From 29ff4fbd97386eb0f2ab8a9bc80420c74fc4fd82 Mon Sep 17 00:00:00 2001 From: Takayuki Tsunakawa Date: Tue, 10 Nov 2020 09:27:56 +0900 Subject: [PATCH v1] Add bulk insert for foreign tables --- contrib/postgres_fdw/deparse.c | 3 +- contrib/postgres_fdw/postgres_fdw.c | 233 ++++++++++++++++++++++++++------- contrib/postgres_fdw/postgres_fdw.h | 2 +- doc/src/sgml/fdwhandler.sgml | 64 ++++++++- src/backend/executor/nodeModifyTable.c | 135 +++++++++++++++++++ src/include/foreign/fdwapi.h | 7 + src/include/nodes/execnodes.h | 6 + 7 files changed, 395 insertions(+), 55 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index 2d44df1..5aa81db 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -1706,7 +1706,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, - List **retrieved_attrs) + List **retrieved_attrs, int *values_end_len) { AttrNumber pindex; bool first; @@ -1749,6 +1749,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, } else appendStringInfoString(buf, " DEFAULT VALUES"); + *values_end_len = buf->len; if (doNothing) appendStringInfoString(buf, " ON CONFLICT DO NOTHING"); diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9c5aaac..f7be4be 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -86,8 +86,10 @@ enum FdwScanPrivateIndex * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server * 2) Integer list of target attribute numbers for INSERT/UPDATE * (NIL for a DELETE) - * 3) Boolean flag showing if the remote query has a RETURNING clause - * 4) Integer list of attribute numbers retrieved by RETURNING, if any + * 3) Length till the end of VALUES clause for INSERT + * (-1 for a DELETE/UPDATE) + * 4) Boolean flag showing if the remote query has a RETURNING clause + * 5) 
Integer list of attribute numbers retrieved by RETURNING, if any */ enum FdwModifyPrivateIndex { @@ -95,6 +97,8 @@ enum FdwModifyPrivateIndex FdwModifyPrivateUpdateSql, /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, + /* Length till the end of VALUES clause (as an integer Value node) */ + FdwModifyPrivateLen, /* has-returning flag (as an integer Value node) */ FdwModifyPrivateHasReturning, /* Integer list of attribute numbers retrieved by RETURNING */ @@ -175,7 +179,9 @@ typedef struct PgFdwModifyState /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ + char *orig_query; /* original text of INSERT command */ List *target_attrs; /* list of target attribute numbers */ + int len; /* length of some part of query */ bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ @@ -184,6 +190,9 @@ typedef struct PgFdwModifyState int p_nums; /* number of parameters to transmit */ FmgrInfo *p_flinfo; /* output conversion functions for them */ + /* bulk operation stuff */ + int num_slots; /* number of slots to insert */ + /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -342,6 +351,11 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +static TupleTableSlot **postgresExecForeignBulkInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); static TupleTableSlot *postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, @@ -428,20 +442,23 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, Plan *subplan, char *query, List *target_attrs, + int len, bool has_returning, List *retrieved_attrs); -static TupleTableSlot *execute_foreign_modify(EState *estate, +static 
TupleTableSlot **execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, - TupleTableSlot *slot, - TupleTableSlot *planSlot); + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, - TupleTableSlot *slot); + TupleTableSlot **slots, + int numSlots); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); -static void finish_foreign_modify(PgFdwModifyState *fmstate); +static void finish_foreign_modify(PgFdwModifyState *fmstate, bool release_conn); static List *build_remote_returning(Index rtindex, Relation rel, List *returningList); static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); @@ -529,6 +546,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->PlanForeignModify = postgresPlanForeignModify; routine->BeginForeignModify = postgresBeginForeignModify; routine->ExecForeignInsert = postgresExecForeignInsert; + routine->ExecForeignBulkInsert = postgresExecForeignBulkInsert; routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; routine->EndForeignModify = postgresEndForeignModify; @@ -1663,7 +1681,9 @@ postgresPlanForeignModify(PlannerInfo *root, List *withCheckOptionList = NIL; List *returningList = NIL; List *retrieved_attrs = NIL; + List *retvalList; bool doNothing = false; + int values_end_len = -1; initStringInfo(&sql); @@ -1751,7 +1771,7 @@ postgresPlanForeignModify(PlannerInfo *root, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, withCheckOptionList, returningList, - &retrieved_attrs); + &retrieved_attrs, &values_end_len); break; case CMD_UPDATE: deparseUpdateSql(&sql, rte, resultRelation, rel, @@ -1775,10 +1795,12 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to 
the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ - return list_make4(makeString(sql.data), + retvalList = list_make4(makeString(sql.data), targetAttrs, - makeInteger((retrieved_attrs != NIL)), - retrieved_attrs); + makeInteger(values_end_len), + makeInteger((retrieved_attrs != NIL))); + retvalList = lappend(retvalList, retrieved_attrs); + return retvalList; } /* @@ -1796,6 +1818,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, char *query; List *target_attrs; bool has_returning; + int values_end_len; List *retrieved_attrs; RangeTblEntry *rte; @@ -1811,6 +1834,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate, FdwModifyPrivateUpdateSql)); target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums); + values_end_len = intVal(list_nth(fdw_private, + FdwModifyPrivateLen)); has_returning = intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning)); retrieved_attrs = (List *) list_nth(fdw_private, @@ -1828,6 +1853,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, mtstate->mt_plans[subplan_index]->plan, query, target_attrs, + values_end_len, has_returning, retrieved_attrs); @@ -1845,7 +1871,37 @@ postgresExecForeignInsert(EState *estate, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - TupleTableSlot *rslot; + TupleTableSlot **rslot; + int numSlots = 1; + + /* + * If the fmstate has aux_fmstate set, use the aux_fmstate (see + * postgresBeginForeignInsert()) + */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate->aux_fmstate; + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, + &slot, &planSlot, &numSlots); + /* Revert that change */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate; + + return rslot ? 
*rslot : NULL; +} + +/* + * postgresExecForeignBulkInsert + * Insert multiple rows into a foreign table + */ +static TupleTableSlot ** +postgresExecForeignBulkInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + TupleTableSlot **rslot; /* * If the fmstate has aux_fmstate set, use the aux_fmstate (see @@ -1854,7 +1910,7 @@ postgresExecForeignInsert(EState *estate, if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate->aux_fmstate; rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, - slot, planSlot); + slots, planSlots, numSlots); /* Revert that change */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate; @@ -1872,8 +1928,13 @@ postgresExecForeignUpdate(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, - slot, planSlot); + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, + &slot, &planSlot, &numSlots); + + return rslot ? *rslot : NULL; } /* @@ -1886,8 +1947,13 @@ postgresExecForeignDelete(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, - slot, planSlot); + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, + &slot, &planSlot, &numSlots); + + return rslot ? 
*rslot : NULL; } /* @@ -1905,7 +1971,7 @@ postgresEndForeignModify(EState *estate, return; /* Destroy the execution state */ - finish_foreign_modify(fmstate); + finish_foreign_modify(fmstate, true); } /* @@ -1924,6 +1990,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RangeTblEntry *rte; TupleDesc tupdesc = RelationGetDescr(rel); int attnum; + int values_end_len; StringInfoData sql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; @@ -2000,7 +2067,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, resultRelInfo->ri_WithCheckOptions, resultRelInfo->ri_returningList, - &retrieved_attrs); + &retrieved_attrs, &values_end_len); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, @@ -2010,6 +2077,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, NULL, sql.data, targetAttrs, + values_end_len, retrieved_attrs != NIL, retrieved_attrs); @@ -2048,7 +2116,7 @@ postgresEndForeignInsert(EState *estate, fmstate = fmstate->aux_fmstate; /* Destroy the execution state */ - finish_foreign_modify(fmstate); + finish_foreign_modify(fmstate, true); } /* @@ -3538,6 +3606,7 @@ create_foreign_modify(EState *estate, Plan *subplan, char *query, List *target_attrs, + int len, bool has_returning, List *retrieved_attrs) { @@ -3572,7 +3641,10 @@ create_foreign_modify(EState *estate, /* Set up remote query information. 
*/ fmstate->query = query; + if (operation == CMD_INSERT) + fmstate->orig_query = pstrdup(fmstate->query); fmstate->target_attrs = target_attrs; + fmstate->len = len; fmstate->has_returning = has_returning; fmstate->retrieved_attrs = retrieved_attrs; @@ -3624,6 +3696,8 @@ create_foreign_modify(EState *estate, Assert(fmstate->p_nums <= n_params); + fmstate->num_slots = 1; + /* Initialize auxiliary state */ fmstate->aux_fmstate = NULL; @@ -3634,26 +3708,75 @@ create_foreign_modify(EState *estate, * execute_foreign_modify * Perform foreign-table modification as required, and fetch RETURNING * result if any. (This is the shared guts of postgresExecForeignInsert, - * postgresExecForeignUpdate, and postgresExecForeignDelete.) + * postgresExecForeignBulkInsert, postgresExecForeignUpdate, and + * postgresExecForeignDelete.) */ -static TupleTableSlot * +static TupleTableSlot ** execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, - TupleTableSlot *slot, - TupleTableSlot *planSlot) + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; ItemPointer ctid = NULL; const char **p_values; PGresult *res; int n_rows; + int i, j; + int pindex; + bool first; + StringInfoData sql; /* The operation should be INSERT, UPDATE, or DELETE */ Assert(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE); + if (operation == CMD_INSERT && fmstate->num_slots != *numSlots) + { + /* Destroy the prepared statement created previously */ + if (fmstate->p_name) + finish_foreign_modify(fmstate, false); + + /* + * Recreate INSERT command string with numSlots records in its + * VALUES clause + */ + + /* Copy up to the end of the first record from the original query */ + initStringInfo(&sql); + appendBinaryStringInfo(&sql, fmstate->orig_query, fmstate->len); + + /* Add records to VALUES clause */ + pindex = fmstate->p_nums + 1; + for (i = 0; i < 
*numSlots - 1; i++) + { + appendStringInfoString(&sql, ", ("); + + first = true; + for (j = 0; j < fmstate->p_nums; j++) + { + if (!first) + appendStringInfoString(&sql, ", "); + first = false; + + appendStringInfo(&sql, "$%d", pindex); + pindex++; + } + + appendStringInfoChar(&sql, ')'); + } + + /* Copy stuff after VALUES clause from the original query */ + appendStringInfoString(&sql, fmstate->orig_query + fmstate->len); + + pfree(fmstate->query); + fmstate->query = sql.data; + fmstate->num_slots = *numSlots; + } + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -3666,7 +3789,7 @@ execute_foreign_modify(EState *estate, Datum datum; bool isNull; - datum = ExecGetJunkAttribute(planSlot, + datum = ExecGetJunkAttribute(planSlots[0], fmstate->ctidAttno, &isNull); /* shouldn't ever get a null result... */ @@ -3676,14 +3799,14 @@ execute_foreign_modify(EState *estate, } /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, ctid, slot); + p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots); /* * Execute the prepared statement. */ if (!PQsendQueryPrepared(fmstate->conn, fmstate->p_name, - fmstate->p_nums, + fmstate->p_nums * (*numSlots), p_values, NULL, NULL, @@ -3704,9 +3827,10 @@ execute_foreign_modify(EState *estate, /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { + Assert(*numSlots == 1); n_rows = PQntuples(res); if (n_rows > 0) - store_returning_result(fmstate, slot, res); + store_returning_result(fmstate, slots[0], res); } else n_rows = atoi(PQcmdTuples(res)); @@ -3716,10 +3840,12 @@ execute_foreign_modify(EState *estate, MemoryContextReset(fmstate->temp_cxt); + *numSlots = n_rows; + /* * Return NULL if nothing was inserted/updated/deleted on the remote end */ - return (n_rows > 0) ? slot : NULL; + return (n_rows > 0) ? 
slots : NULL; } /* @@ -3779,19 +3905,23 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, - TupleTableSlot *slot) + TupleTableSlot **slots, + int numSlots) { const char **p_values; + int i; + int j; int pindex = 0; MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); - p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); + p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots); /* 1st parameter should be ctid, if it's in use */ if (tupleid != NULL) { + Assert(numSlots == 1); /* don't need set_transmission_modes for TID output */ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid)); @@ -3799,32 +3929,37 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate, } /* get following parameters from slot */ - if (slot != NULL && fmstate->target_attrs != NIL) + if (slots != NULL && fmstate->target_attrs != NIL) { int nestlevel; ListCell *lc; nestlevel = set_transmission_modes(); - foreach(lc, fmstate->target_attrs) + for (i = 0; i < numSlots; i++) { - int attnum = lfirst_int(lc); - Datum value; - bool isnull; + j = (tupleid != NULL) ? 
1 : 0; + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Datum value; + bool isnull; - value = slot_getattr(slot, attnum, &isnull); - if (isnull) - p_values[pindex] = NULL; - else - p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], - value); - pindex++; + value = slot_getattr(slots[i], attnum, &isnull); + if (isnull) + p_values[pindex] = NULL; + else + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j], + value); + pindex++; + j++; + } } reset_transmission_modes(nestlevel); } - Assert(pindex == fmstate->p_nums); + Assert(pindex == fmstate->p_nums * numSlots); MemoryContextSwitchTo(oldcontext); @@ -3873,7 +4008,8 @@ store_returning_result(PgFdwModifyState *fmstate, * Release resources for a foreign insert/update/delete operation */ static void -finish_foreign_modify(PgFdwModifyState *fmstate) +finish_foreign_modify(PgFdwModifyState *fmstate, + bool release_conn) { Assert(fmstate != NULL); @@ -3897,8 +4033,11 @@ finish_foreign_modify(PgFdwModifyState *fmstate) } /* Release remote connection */ - ReleaseConnection(fmstate->conn); - fmstate->conn = NULL; + if (release_conn) + { + ReleaseConnection(fmstate->conn); + fmstate->conn = NULL; + } } /* diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410d..459a9ca 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -161,7 +161,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, - List **retrieved_attrs); + List **retrieved_attrs, int *values_end_len); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index 9c92934..cdf2959 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -523,8 +523,9 @@ 
BeginForeignModify(ModifyTableState *mtstate, Begin executing a foreign table modification operation. This routine is called during executor startup. It should perform any initialization needed prior to the actual table modifications. Subsequently, - ExecForeignInsert, ExecForeignUpdate or - ExecForeignDelete will be called for each tuple to be + ExecForeignInsert/ExecForeignBulkInsert, + ExecForeignUpdate or + ExecForeignDelete will be called for tuple(s) to be inserted, updated, or deleted. @@ -614,6 +615,56 @@ ExecForeignInsert(EState *estate, +TupleTableSlot ** +ExecForeignBulkInsert(EState *estate, + ResultRelInfo *rinfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); + + + Insert multiple tuples in bulk into the foreign table. + The parameters are the same as for ExecForeignInsert + except that slots and planSlots contain + multiple tuples and *numSlots specifies the number of + tuples in those arrays. + + + + The return value is an array of slots containing the data that was + actually inserted (this might differ from the data supplied, for + example as a result of trigger actions.) + The passed-in slots can be re-used for this purpose. + The number of successfully inserted tuples is returned in + *numSlots. + + + + The data in the returned slots is used only if the INSERT + statement involves a view + WITH CHECK OPTION; or if the foreign table has + an AFTER ROW trigger. Triggers require all columns, + but the FDW could choose to optimize away returning some or all columns + depending on the contents of the + WITH CHECK OPTION constraints. + + + + If the ExecForeignBulkInsert pointer is set to + NULL, attempts to insert into the foreign table will + use ExecForeignInsert. + This function is not used if the INSERT has the + RETURNING clause. + + + + Note that this function is also called when inserting routed tuples into + a foreign-table partition. See the callback functions + described below that allow the FDW to support that. 
+ + + + TupleTableSlot * ExecForeignUpdate(EState *estate, ResultRelInfo *rinfo, @@ -741,8 +792,9 @@ BeginForeignInsert(ModifyTableState *mtstate, in both cases when it is the partition chosen for tuple routing and the target specified in a COPY FROM command. It should perform any initialization needed prior to the actual insertion. - Subsequently, ExecForeignInsert will be called for - each tuple to be inserted into the foreign table. + Subsequently, ExecForeignInsert or + ExecForeignBulkInsert will be called for + tuple(s) to be inserted into the foreign table. @@ -773,8 +825,8 @@ BeginForeignInsert(ModifyTableState *mtstate, Note that if the FDW does not support routable foreign-table partitions and/or executing COPY FROM on foreign tables, this - function or ExecForeignInsert subsequently called - must throw error as needed. + function or ExecForeignInsert/ExecForeignBulkInsert + subsequently called must throw error as needed. diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 29e07b7..6c4b33e 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -58,6 +58,15 @@ #include "utils/rel.h" +#define BULK_INSERT_ROWS 100 + +static void ExecBulkInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int numSlots, + EState *estate, + bool canSetTag); static bool ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer conflictTid, @@ -389,6 +398,7 @@ ExecInsert(ModifyTableState *mtstate, ModifyTable *node = (ModifyTable *) mtstate->ps.plan; OnConflictAction onconflict = node->onConflictAction; PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing; + MemoryContext oldContext; /* * If the input result relation is a partitioned table, find the leaf @@ -442,6 +452,55 @@ ExecInsert(ModifyTableState *mtstate, CMD_INSERT); /* + * If the FDW supports bulk insert, 
accumulate tuples and insert them + * in bulk + */ + if (resultRelInfo->ri_FdwRoutine->ExecForeignBulkInsert && + resultRelInfo->ri_projectReturning == NULL) + { + /* + * If a certain number of tuples have already been accumulated, + * or a tuple has come for a different relation than that for + * the accumulated tuples, perform the bulk insert + */ + if (mtstate->mt_nslots == BULK_INSERT_ROWS || + (mtstate->mt_nslots > 0 && + mtstate->bulk_rri != resultRelInfo)) + { + ExecBulkInsert(mtstate, resultRelInfo, + mtstate->mt_slots, mtstate->mt_planslots, + mtstate->mt_nslots, + estate, canSetTag); + mtstate->mt_nslots = 0; + } + + oldContext = MemoryContextSwitchTo(estate->es_query_cxt); + + if (mtstate->mt_slots == NULL) + { + mtstate->mt_slots = palloc(sizeof(TupleTableSlot *) * + BULK_INSERT_ROWS); + mtstate->mt_planslots = palloc(sizeof(TupleTableSlot *) * + BULK_INSERT_ROWS); + } + + mtstate->mt_slots[mtstate->mt_nslots] = + MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, + slot->tts_ops); + ExecCopySlot(mtstate->mt_slots[mtstate->mt_nslots], slot); + mtstate->mt_planslots[mtstate->mt_nslots] = + MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor, + planSlot->tts_ops); + ExecCopySlot(mtstate->mt_planslots[mtstate->mt_nslots], planSlot); + + mtstate->mt_nslots++; + mtstate->bulk_rri = resultRelInfo; + + MemoryContextSwitchTo(oldContext); + return NULL; + } + + /* * insert into foreign table: let the FDW do it */ slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate, @@ -702,6 +761,73 @@ ExecInsert(ModifyTableState *mtstate, } /* ---------------------------------------------------------------- + * ExecBulkInsert + * + * Insert multiple tuples in an efficient way. + * Currently, this handles inserting into a foreign table without + * RETURNING clause. 
+ * ---------------------------------------------------------------- + */ +static void +ExecBulkInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int numSlots, + EState *estate, + bool canSetTag) +{ + int i; + int numInserted = numSlots; + TupleTableSlot *slot = NULL; + TupleTableSlot **rslots; + + /* + * insert into foreign table: let the FDW do it + */ + rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBulkInsert(estate, + resultRelInfo, + slots, + planSlots, + &numInserted); + + for (i = 0; i < numInserted; i++) + { + slot = rslots[i]; + + /* + * AFTER ROW Triggers or RETURNING expressions might reference the + * tableoid column, so (re-)initialize tts_tableOid before evaluating + * them. + */ + slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + + /* AFTER ROW INSERT Triggers */ + ExecARInsertTriggers(estate, resultRelInfo, slot, NIL, + mtstate->mt_transition_capture); + + /* + * Check any WITH CHECK OPTION constraints from parent views. See the + * comment in ExecInsert. + */ + if (resultRelInfo->ri_WithCheckOptions != NIL) + ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate); + } + + if (canSetTag && numInserted > 0) + { + estate->es_processed += numInserted; + setLastTid(&slot->tts_tid); + } + + for (i = 0; i < numSlots; i++) + { + ExecDropSingleTupleTableSlot(slots[i]); + ExecDropSingleTupleTableSlot(planSlots[i]); + } +} + +/* ---------------------------------------------------------------- * ExecDelete * * DELETE is like UPDATE, except that we delete the tuple and no @@ -2156,6 +2282,15 @@ ExecModifyTable(PlanState *pstate) } /* + * Insert remaining tuples for bulk insert. + */ + if (node->mt_nslots > 0) + ExecBulkInsert(node, node->bulk_rri, + node->mt_slots, node->mt_planslots, + node->mt_nslots, + estate, node->canSetTag); + + /* * We're done, but fire AFTER STATEMENT triggers before exiting. 
*/ fireASTriggers(node); diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 95556df..c7eeff2 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -85,6 +85,12 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot); +typedef TupleTableSlot **(*ExecForeignBulkInsert_function) (EState *estate, + ResultRelInfo *rinfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); + typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, @@ -209,6 +215,7 @@ typedef struct FdwRoutine PlanForeignModify_function PlanForeignModify; BeginForeignModify_function BeginForeignModify; ExecForeignInsert_function ExecForeignInsert; + ExecForeignBulkInsert_function ExecForeignBulkInsert; ExecForeignUpdate_function ExecForeignUpdate; ExecForeignDelete_function ExecForeignDelete; EndForeignModify_function EndForeignModify; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 6c0a7d6..e16d7a1 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1191,6 +1191,12 @@ typedef struct ModifyTableState /* controls transition table population for INSERT...ON CONFLICT UPDATE */ struct TransitionCaptureState *mt_oc_transition_capture; + + /* bulk insert stuff */ + int mt_nslots; /* number of slots in the array */ + TupleTableSlot **mt_slots; /* input tuples for bulk insert */ + TupleTableSlot **mt_planslots; + ResultRelInfo *bulk_rri; /* target relation for bulk insert */ } ModifyTableState; /* ---------------- -- 2.10.1