contrib/postgres_fdw/postgres_fdw.c | 297 ++++++++++++++++++++++++++++++------ contrib/postgres_fdw/postgres_fdw.h | 2 + 2 files changed, 249 insertions(+), 50 deletions(-) diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 36b407d..c0ff341 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -22,8 +22,10 @@ #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" +#include "nodes/extensible.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "nodes/readfuncs.h" #include "optimizer/cost.h" #include "optimizer/pathnode.h" #include "optimizer/paths.h" @@ -57,15 +59,18 @@ PG_MODULE_MAGIC; * can be fetched with list_nth(). For example, to get the SELECT statement: * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); */ -enum FdwScanPrivateIndex +typedef struct fdwScanPrivate { + ExtensibleNode xnode; /* SQL statement to execute remotely (as a String node) */ - FdwScanPrivateSelectSql, + char *selectSql; /* Integer list of attribute numbers retrieved by the SELECT */ - FdwScanPrivateRetrievedAttrs, + List *retrievedAttrs; /* Integer representing the desired fetch_size */ - FdwScanPrivateFetchSize -}; + int fetchSize; +} fdwScanPrivate; + +static ExtensibleNodeMethods fdwScanPrivateMethods; /* * Similarly, this enum describes what's kept in the fdw_private list for @@ -77,17 +82,20 @@ enum FdwScanPrivateIndex * 3) Boolean flag showing if the remote query has a RETURNING clause * 4) Integer list of attribute numbers retrieved by RETURNING, if any */ -enum FdwModifyPrivateIndex +typedef struct fdwModifyPrivate { + ExtensibleNode xnode; /* SQL statement to execute remotely (as a String node) */ - FdwModifyPrivateUpdateSql, + char *updateSql; /* Integer list of target attribute numbers for INSERT/UPDATE */ - FdwModifyPrivateTargetAttnums, + List *targetAttnums; /* has-returning flag (as an integer Value node) */ - FdwModifyPrivateHasReturning, + bool hasReturning; /* Integer list of attribute numbers retrieved by RETURNING */ - FdwModifyPrivateRetrievedAttrs -}; + List *retrievedAttrs; +} fdwModifyPrivate; + +static ExtensibleNodeMethods fdwModifyPrivateMethods; /* * Execution state of a foreign scan using postgres_fdw. @@ -221,13 +229,13 @@ static void postgresEndForeignScan(ForeignScanState *node); static void postgresAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation); -static List *postgresPlanForeignModify(PlannerInfo *root, +static Node *postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index); static void postgresBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, - List *fdw_private, + Node *fdw_private, int subplan_index, int eflags); static TupleTableSlot *postgresExecForeignInsert(EState *estate, @@ -249,7 +257,7 @@ static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es); static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, - List *fdw_private, + Node *fdw_private, int subplan_index, ExplainState *es); static bool postgresAnalyzeForeignTable(Relation relation, @@ -944,7 +952,7 @@ postgresGetForeignPlan(PlannerInfo *root, { PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; Index scan_relid = baserel->relid; - List *fdw_private; + fdwScanPrivate *spriv; List *remote_conds = NIL; List *remote_exprs = NIL; List *local_exprs = NIL; @@ -1011,9 +1019,12 @@ postgresGetForeignPlan(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwScanPrivateIndex, above. */ - fdw_private = list_make3(makeString(sql.data), - retrieved_attrs, - makeInteger(fpinfo->fetch_size)); + spriv = (fdwScanPrivate *) newNode(sizeof(fdwScanPrivate), + T_ExtensibleNode); + spriv->xnode.extnodename = fdwScanPrivateMethods.extnodename; + spriv->selectSql = sql.data; + spriv->retrievedAttrs = retrieved_attrs; + spriv->fetchSize = fpinfo->fetch_size; /* * Create the ForeignScan node from target list, filtering expressions, @@ -1027,7 +1038,7 @@ postgresGetForeignPlan(PlannerInfo *root, local_exprs, scan_relid, params_list, - (Node *)fdw_private, + (Node *)spriv, NIL, /* no custom tlist */ remote_exprs, outer_plan); @@ -1042,6 +1053,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) { ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; + fdwScanPrivate *spriv; PgFdwScanState *fsstate; RangeTblEntry *rte; Oid userid; @@ -1058,6 +1070,11 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) return; /* + * DEBUG of node operations + */ + fsplan = stringToNode(nodeToString(copyObject(fsplan))); + + /* * We'll save private state in node->fdw_state. */ fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); @@ -1086,12 +1103,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) fsstate->cursor_exists = false; /* Get private info created by planner functions. */ - fsstate->query = strVal(list_nth((List *)fsplan->fdw_private, - FdwScanPrivateSelectSql)); - fsstate->retrieved_attrs = (List *) list_nth((List *)fsplan->fdw_private, - FdwScanPrivateRetrievedAttrs); - fsstate->fetch_size = intVal(list_nth((List *)fsplan->fdw_private, - FdwScanPrivateFetchSize)); + spriv = (fdwScanPrivate *)fsplan->fdw_private; + fsstate->query = spriv->selectSql; + fsstate->retrieved_attrs = spriv->retrievedAttrs; + fsstate->fetch_size = spriv->fetchSize; /* Create contexts for batches of tuples and per-tuple temp workspace. */ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, @@ -1316,7 +1331,7 @@ postgresAddForeignUpdateTargets(Query *parsetree, * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING * and then do nothing at ModifyTable. Room for future optimization ... */ -static List * +static Node * postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, @@ -1330,6 +1345,7 @@ postgresPlanForeignModify(PlannerInfo *root, List *returningList = NIL; List *retrieved_attrs = NIL; bool doNothing = false; + fdwModifyPrivate *mpriv; initStringInfo(&sql); @@ -1424,10 +1440,15 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ - return list_make4(makeString(sql.data), - targetAttrs, - makeInteger((retrieved_attrs != NIL)), - retrieved_attrs); + mpriv = (fdwModifyPrivate *) newNode(sizeof(fdwModifyPrivate), + T_ExtensibleNode); + mpriv->xnode.extnodename = fdwModifyPrivateMethods.extnodename; + mpriv->updateSql = sql.data; + mpriv->targetAttnums = targetAttrs; + mpriv->hasReturning = (retrieved_attrs != NIL); + mpriv->retrievedAttrs = retrieved_attrs; + + return (Node *) mpriv; } /* @@ -1437,11 +1458,12 @@ postgresPlanForeignModify(PlannerInfo *root, static void postgresBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, - List *fdw_private, + Node *fdw_private, int subplan_index, int eflags) { PgFdwModifyState *fmstate; + fdwModifyPrivate *mpriv = (fdwModifyPrivate *) fdw_private; EState *estate = mtstate->ps.state; CmdType operation = mtstate->operation; Relation rel = resultRelInfo->ri_RelationDesc; @@ -1481,14 +1503,10 @@ postgresBeginForeignModify(ModifyTableState *mtstate, fmstate->p_name = NULL; /* prepared statement not made yet */ /* Deconstruct fdw_private data. */ - fmstate->query = strVal(list_nth(fdw_private, - FdwModifyPrivateUpdateSql)); - fmstate->target_attrs = (List *) list_nth(fdw_private, - FdwModifyPrivateTargetAttnums); - fmstate->has_returning = intVal(list_nth(fdw_private, - FdwModifyPrivateHasReturning)); - fmstate->retrieved_attrs = (List *) list_nth(fdw_private, - FdwModifyPrivateRetrievedAttrs); + fmstate->query = mpriv->updateSql; + fmstate->target_attrs = mpriv->targetAttnums; + fmstate->has_returning = mpriv->hasReturning; + fmstate->retrieved_attrs = mpriv->retrievedAttrs; /* Create context for per-tuple temp workspace. */ fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, @@ -1831,14 +1849,11 @@ postgresIsForeignRelUpdatable(Relation rel) static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) { - List *fdw_private; - char *sql; - if (es->verbose) { - fdw_private = (List *)((ForeignScan *) node->ss.ps.plan)->fdw_private; - sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); - ExplainPropertyText("Remote SQL", sql, es); + fdwScanPrivate *spriv = (fdwScanPrivate *) + ((ForeignScan *) node->ss.ps.plan)->fdw_private; + ExplainPropertyText("Remote SQL", spriv->selectSql, es); } } @@ -1849,16 +1864,14 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, - List *fdw_private, + Node *fdw_private, int subplan_index, ExplainState *es) { if (es->verbose) { - char *sql = strVal(list_nth(fdw_private, - FdwModifyPrivateUpdateSql)); - - ExplainPropertyText("Remote SQL", sql, es); + fdwModifyPrivate *mpriv = (fdwModifyPrivate *)fdw_private; + ExplainPropertyText("Remote SQL", mpriv->updateSql, es); } } @@ -3214,3 +3227,187 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel) /* We didn't find any suitable equivalence class expression */ return NULL; } + +/* + * node operations for fdwScanPrivate + */ +static void +fdwScanPrivateCopy(ExtensibleNode *_newnode, const ExtensibleNode *_oldnode) +{ + fdwScanPrivate *newnode = (fdwScanPrivate *) _newnode; + fdwScanPrivate *oldnode = (fdwScanPrivate *) _oldnode; + + newnode->selectSql = (!oldnode->selectSql + ? NULL + : pstrdup(oldnode->selectSql)); + newnode->retrievedAttrs = copyObject(oldnode->retrievedAttrs); + newnode->fetchSize = oldnode->fetchSize; +} + +static bool +fdwScanPrivateEqual(const ExtensibleNode *node1, const ExtensibleNode *node2) +{ + fdwScanPrivate *a = (fdwScanPrivate *) node1; + fdwScanPrivate *b = (fdwScanPrivate *) node2; + + /* selectSql */ + if (a->selectSql != NULL && b->selectSql != NULL) + { + if (strcmp(a->selectSql, b->selectSql) != 0) + return false; + } + else if (a->selectSql != b->selectSql) + return false; + + /* retrievedAttrs */ + if (!equal(a->retrievedAttrs, b->retrievedAttrs)) + return false; + + /* fetchSize */ + if (a->fetchSize != b->fetchSize) + return false; + + return true; +} + +static void +fdwScanPrivateOut(StringInfo str, const ExtensibleNode *node) +{ + fdwScanPrivate *spriv = (fdwScanPrivate *)node; + /* selectSql */ + appendStringInfo(str, " :selectSql "); + outToken(str, spriv->selectSql); + /* retrievedAttrs */ + appendStringInfo(str, " :retrievedAttrs %s", + nodeToString(spriv->retrievedAttrs)); + /* fetchSize */ + appendStringInfo(str, " :fetchSize %d", + spriv->fetchSize); +} + +static void +fdwScanPrivateRead(ExtensibleNode *node) +{ + fdwScanPrivate *spriv = (fdwScanPrivate *)node; + char *token; + int length; + + token = pg_strtok(&length); /* skip :selectSql */ + token = pg_strtok(&length); + spriv->selectSql = (length > 0 ? debackslash(token, length) : NULL); + + token = pg_strtok(&length); /* skip :retrievedAttrs */ + spriv->retrievedAttrs = nodeRead(NULL, 0); + + token = pg_strtok(&length); /* skip :fetchSize */ + token = pg_strtok(&length); + spriv->fetchSize = atoi(token); +} + +/* + * node operations for fdwModifyPrivate + */ +static void +fdwModifyPrivateCopy(ExtensibleNode *_newnode, const ExtensibleNode *_oldnode) +{ + fdwModifyPrivate *newnode = (fdwModifyPrivate *) _newnode; + fdwModifyPrivate *oldnode = (fdwModifyPrivate *) _oldnode; + + newnode->updateSql = (!oldnode->updateSql + ? NULL + : pstrdup(oldnode->updateSql)); + newnode->targetAttnums = copyObject(oldnode->targetAttnums); + newnode->hasReturning = oldnode->hasReturning; + newnode->retrievedAttrs = copyObject(oldnode->retrievedAttrs); +} + +static bool +fdwModifyPrivateEqual(const ExtensibleNode *node1, const ExtensibleNode *node2) +{ + fdwModifyPrivate *a = (fdwModifyPrivate *) node1; + fdwModifyPrivate *b = (fdwModifyPrivate *) node2; + + /* updateSql */ + if (a->updateSql != NULL && b->updateSql != NULL) + { + if (strcmp(a->updateSql, b->updateSql) != 0) + return false; + } + else if (a->updateSql != b->updateSql) + return false; + /* targetAttnums */ + if (!equal(a->targetAttnums, b->targetAttnums)) + return false; + /* hasReturning */ + if (a->hasReturning != b->hasReturning) + return false; + /* retrievedAttrs */ + if (!equal(a->retrievedAttrs, b->retrievedAttrs)) + return false; + return true; +} + +static void +fdwModifyPrivateOut(StringInfo str, const ExtensibleNode *node) +{ + fdwModifyPrivate *mpriv = (fdwModifyPrivate *) node; + /* updateSql */ + appendStringInfo(str, " :updateSql "); + outToken(str, mpriv->updateSql); + /* targetAttnums */ + appendStringInfo(str, " :targetAttnums %s", + nodeToString(mpriv->targetAttnums)); + /* hasReturning */ + appendStringInfo(str, " :hasReturning %s", + mpriv->hasReturning ? "true" : "false"); + /* retrievedAttrs */ + appendStringInfo(str, " :retrievedAttrs %s", + nodeToString(mpriv->retrievedAttrs)); +} + +static void +fdwModifyPrivateRead(ExtensibleNode *node) +{ + fdwModifyPrivate *mpriv = (fdwModifyPrivate *) node; + char *token; + int length; + + token = pg_strtok(&length); /* skip :updateSql */ + token = pg_strtok(&length); + mpriv->updateSql = (length > 0 ? debackslash(token, length) : NULL); + + token = pg_strtok(&length); /* skip :targetAttnums*/ + mpriv->targetAttnums = nodeRead(NULL, 0); + + token = pg_strtok(&length); /* skip :hasReturning */ + token = pg_strtok(&length); + mpriv->hasReturning = (*token == 't' ? true : false); + + token = pg_strtok(&length); /* skip :retrievedAttrs */ + mpriv->retrievedAttrs = nodeRead(NULL, 0); +} + +/* + * Entrypoint of postgres_fdw + */ +void +_PG_init(void) +{ + /* register fdwScanPrivate node */ + fdwScanPrivateMethods.extnodename = "postgres_fdw::fdwScanPrivate"; + fdwScanPrivateMethods.node_size = sizeof(fdwScanPrivate); + fdwScanPrivateMethods.nodeCopy = fdwScanPrivateCopy; + fdwScanPrivateMethods.nodeEqual = fdwScanPrivateEqual; + fdwScanPrivateMethods.nodeOut = fdwScanPrivateOut; + fdwScanPrivateMethods.nodeRead = fdwScanPrivateRead; + RegisterExtensibleNodeMethods(&fdwScanPrivateMethods); + + /* register fdwModifyPrivate node */ + fdwModifyPrivateMethods.extnodename = "postgres_fdw::fdwModifyPrivate"; + fdwModifyPrivateMethods.node_size = sizeof(fdwModifyPrivate); + fdwModifyPrivateMethods.nodeCopy = fdwModifyPrivateCopy; + fdwModifyPrivateMethods.nodeEqual = fdwModifyPrivateEqual; + fdwModifyPrivateMethods.nodeOut = fdwModifyPrivateOut; + fdwModifyPrivateMethods.nodeRead = fdwModifyPrivateRead; + RegisterExtensibleNodeMethods(&fdwModifyPrivateMethods); +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 2b63281..98c8527 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -61,6 +61,8 @@ typedef struct PgFdwRelationInfo extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); +extern void _PG_init(void); + /* in connection.c */ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); extern void ReleaseConnection(PGconn *conn);