From 108dc421cec88ab5afd092f40da3fa31b8fcfbc5 Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <a.lepikhov@postgrespro.ru>
Date: Mon, 22 Jun 2020 10:28:42 +0500
Subject: [PATCH] Fast COPY FROM into the foreign or sharded table.

This feature enables bulk COPY into a foreign table when multi-insert is
possible and the foreign table has a non-zero number of columns.
---
 contrib/postgres_fdw/deparse.c                |  60 ++++-
 .../postgres_fdw/expected/postgres_fdw.out    |  33 ++-
 contrib/postgres_fdw/postgres_fdw.c           |  87 ++++++++
 contrib/postgres_fdw/postgres_fdw.h           |   1 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  28 +++
 src/backend/commands/copy.c                   | 206 ++++++++++++------
 src/include/commands/copy.h                   |   5 +
 src/include/foreign/fdwapi.h                  |   8 +
 8 files changed, 344 insertions(+), 84 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index ad37a74221..a37981ff66 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -184,6 +184,8 @@ static void appendAggOrderBy(List *orderList, List *targetList,
 static void appendFunctionName(Oid funcid, deparse_expr_cxt *context);
 static Node *deparseSortGroupClause(Index ref, List *tlist, bool force_colno,
 									deparse_expr_cxt *context);
+static List *deparseRelColumnList(StringInfo buf, Relation rel,
+								  bool enclose_in_parens);
 
 /*
  * Helper functions
@@ -1758,6 +1760,20 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 						 withCheckOptionList, returningList, retrieved_attrs);
 }
 
+/*
+ * Deparse a remote "COPY rel(columns) FROM STDIN" command into given buf.
+ * The parenthesized column list is emitted by deparseRelColumnList().
+ */
+void
+deparseCopyFromSql(StringInfo buf, Relation rel)
+{
+	appendStringInfoString(buf, "COPY ");
+	deparseRelation(buf, rel);
+	(void) deparseRelColumnList(buf, rel, true);
+
+	appendStringInfoString(buf, " FROM STDIN ");
+}
+
 /*
  * deparse remote UPDATE statement
  *
@@ -2061,6 +2077,30 @@ deparseAnalyzeSizeSql(StringInfo buf, Relation rel)
  */
 void
 deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
+{
+	appendStringInfoString(buf, "SELECT ");
+	*retrieved_attrs = deparseRelColumnList(buf, rel, false);
+
+	/* No column was emitted: output NULL so the SELECT list is not empty. */
+	if (list_length(*retrieved_attrs) == 0)
+		appendStringInfoString(buf, "NULL");
+
+	/*
+	 * Construct FROM clause
+	 */
+	appendStringInfoString(buf, " FROM ");
+	deparseRelation(buf, rel);
+}
+
+/*
+ * Construct the list of columns of given foreign relation in the order they
+ * appear in the tuple descriptor of the relation. Ignore any dropped columns.
+ * Use column names on the foreign server instead of local names.
+ *
+ * Optionally enclose the list in parentheses.
+ */
+static List *
+deparseRelColumnList(StringInfo buf, Relation rel, bool enclose_in_parens)
 {
 	Oid			relid = RelationGetRelid(rel);
 	TupleDesc	tupdesc = RelationGetDescr(rel);
@@ -2069,10 +2109,8 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
 	List	   *options;
 	ListCell   *lc;
 	bool		first = true;
+	List	   *retrieved_attrs = NIL;
 
-	*retrieved_attrs = NIL;
-
-	appendStringInfoString(buf, "SELECT ");
 	for (i = 0; i < tupdesc->natts; i++)
 	{
 		/* Ignore dropped columns. */
@@ -2081,6 +2119,9 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
 
 		if (!first)
 			appendStringInfoString(buf, ", ");
+		else if (enclose_in_parens)
+			appendStringInfoChar(buf, '(');
+
 		first = false;
 
 		/* Use attribute name or column_name option. */
@@ -2100,18 +2141,13 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
 
 		appendStringInfoString(buf, quote_identifier(colname));
 
-		*retrieved_attrs = lappend_int(*retrieved_attrs, i + 1);
+		retrieved_attrs = lappend_int(retrieved_attrs, i + 1);
 	}
 
-	/* Don't generate bad syntax for zero-column relation. */
-	if (first)
-		appendStringInfoString(buf, "NULL");
+	if (enclose_in_parens && list_length(retrieved_attrs) > 0)
+		appendStringInfoChar(buf, ')');
 
-	/*
-	 * Construct FROM clause
-	 */
-	appendStringInfoString(buf, " FROM ");
-	deparseRelation(buf, rel);
+	return retrieved_attrs;
 }
 
 /*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 82fc1290ef..3a3cca5047 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8063,8 +8063,9 @@ copy rem2 from stdin;
 copy rem2 from stdin; -- ERROR
 ERROR:  new row for relation "loc2" violates check constraint "loc2_f1positive"
 DETAIL:  Failing row contains (-1, xyzzy).
-CONTEXT:  remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
-COPY rem2, line 1: "-1	xyzzy"
+CONTEXT:  COPY loc2, line 1: "-1	xyzzy"
+remote SQL command: COPY public.loc2(f1, f2) FROM STDIN 
+COPY rem2, line 2
 select * from rem2;
  f1 | f2  
 ----+-----
@@ -8183,6 +8184,34 @@ drop trigger rem2_trig_row_before on rem2;
 drop trigger rem2_trig_row_after on rem2;
 drop trigger loc2_trig_row_before_insert on loc2;
 delete from rem2;
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+ERROR:  column "f1" of relation "loc2" does not exist
+CONTEXT:  remote SQL command: COPY public.loc2(f1, f2) FROM STDIN 
+COPY rem2, line 3
+alter table loc2 add column f1 int;
+alter table loc2 add column f2 int;
+select * from rem2;
+ f1 | f2 
+----+----
+(0 rows)
+
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(2 rows)
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(4 rows)
+
 -- test COPY FROM with foreign table created in the same transaction
 create table loc3 (f1 int, f2 text);
 begin;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9fc53cad68..2b3d7d6dfb 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -18,6 +18,7 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "catalog/pg_class.h"
+#include "commands/copy.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
@@ -190,6 +191,7 @@ typedef struct PgFdwModifyState
 	/* for update row movement if subplan result rel */
 	struct PgFdwModifyState *aux_fmstate;	/* foreign-insert state, if
 											 * created */
+	CopyState fdwcstate;
 } PgFdwModifyState;
 
 /*
@@ -350,6 +352,9 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
 												 TupleTableSlot *planSlot);
+static void postgresExecForeignBulkInsert(ResultRelInfo *resultRelInfo,
+									TupleTableSlot **slots,
+									int nslots);
 static void postgresEndForeignModify(EState *estate,
 									 ResultRelInfo *resultRelInfo);
 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
@@ -530,6 +535,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->ExecForeignInsert = postgresExecForeignInsert;
 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
 	routine->ExecForeignDelete = postgresExecForeignDelete;
+	routine->ExecForeignBulkInsert = postgresExecForeignBulkInsert;
 	routine->EndForeignModify = postgresEndForeignModify;
 	routine->BeginForeignInsert = postgresBeginForeignInsert;
 	routine->EndForeignInsert = postgresEndForeignInsert;
@@ -1890,6 +1896,87 @@ postgresExecForeignDelete(EState *estate,
 								  slot, planSlot);
 }
 
+/*
+ * postgresExecForeignBulkInsert
+ *		Copy rows into a foreign table by COPY .. FROM STDIN machinery
+ */
+static void
+postgresExecForeignBulkInsert(ResultRelInfo *resultRelInfo,
+							  TupleTableSlot **slots,
+							  int nslots)
+{
+	Relation rel = resultRelInfo->ri_RelationDesc;
+	PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState;
+	StringInfoData sql;
+	PGresult *res;
+	volatile bool status = false;	/* set in PG_TRY, read in PG_FINALLY */
+	PGconn *conn;
+	int i;
+
+	Assert(resultRelInfo->ri_FdwRoutine != NULL &&
+		   resultRelInfo->ri_FdwState != NULL);
+
+	conn = fmstate->conn;		/* dereference only after the Assert */
+	fmstate->target_attrs = NULL;
+	fmstate->has_returning = false;
+	fmstate->retrieved_attrs = NULL;
+	fmstate->fdwcstate = BeginForeignCopyTo(rel);
+
+	initStringInfo(&sql);
+	deparseCopyFromSql(&sql, rel);
+	fmstate->query = sql.data;
+
+	res = PQexec(conn, fmstate->query);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
+	PQclear(res);
+
+	PG_TRY();
+	{
+		for (i = 0; i < nslots; i++)
+		{
+			char *buf = NextForeignCopyRow(fmstate->fdwcstate, slots[i]);
+
+			if (PQputCopyData(conn, buf, strlen(buf)) <= 0)
+			{
+				res = PQgetResult(conn);
+				pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
+			}
+		}
+
+		status = true;
+	}
+	PG_FINALLY();
+	{
+		/*
+		 * Finish the COPY IN protocol; this is needed after success or error.
+		 */
+		if (PQputCopyEnd(conn, status ? NULL : _("canceled by server")) <= 0 ||
+			PQflush(conn))
+			ereport(ERROR,
+					(errmsg("error returned by PQputCopyEnd: %s",
+							PQerrorMessage(conn))));
+
+		/* After successfully sending an EOF signal, check command status. */
+		res = PQgetResult(conn);
+		if ((!status && PQresultStatus(res) != PGRES_FATAL_ERROR) ||
+			(status && PQresultStatus(res) != PGRES_COMMAND_OK))
+			pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
+
+		PQclear(res);
+		/* Do this to ensure we've pumped libpq back to idle state */
+		if (PQgetResult(conn) != NULL)
+			ereport(ERROR,
+					(errmsg("unexpected extra results during COPY of table: %s",
+							PQerrorMessage(conn))));
+
+		EndForeignCopyTo(fmstate->fdwcstate);
+		pfree(fmstate->fdwcstate);
+		fmstate->fdwcstate = NULL;	/* don't leave a dangling pointer */
+		/* No explicit re-throw needed: PG_END_TRY() re-throws after PG_FINALLY. */
+	}
+	PG_END_TRY();
+}
 /*
  * postgresEndForeignModify
  *		Finish an insert/update/delete operation on a foreign table
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..8fc5ff018f 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -162,6 +162,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 							 List *targetAttrs, bool doNothing,
 							 List *withCheckOptionList, List *returningList,
 							 List **retrieved_attrs);
+extern void deparseCopyFromSql(StringInfo buf, Relation rel);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 83971665e3..73f98a3152 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2293,6 +2293,34 @@ drop trigger loc2_trig_row_before_insert on loc2;
 
 delete from rem2;
 
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+1	foo
+2	bar
+\.
+
+alter table loc2 add column f1 int;
+alter table loc2 add column f2 int;
+select * from rem2;
+
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
 -- test COPY FROM with foreign table created in the same transaction
 create table loc3 (f1 int, f2 text);
 begin;
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6d53dc463c..ddf3c10146 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -133,6 +133,7 @@ typedef struct CopyStateData
 	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
 	bool		is_program;		/* is 'filename' a program to popen? */
 	copy_data_source_cb data_source_cb; /* function for reading data */
+	copy_data_dest_cb data_dest_cb;
 	bool		binary;			/* binary format? */
 	bool		freeze;			/* freeze rows on loading? */
 	bool		csv_mode;		/* Comma Separated Value format? */
@@ -358,8 +359,11 @@ static void EndCopy(CopyState cstate);
 static void ClosePipeToProgram(CopyState cstate);
 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
 							 Oid queryRelId, const char *filename, bool is_program,
-							 List *attnamelist, List *options);
+							 copy_data_dest_cb data_dest_cb, List *attnamelist,
+							 List *options);
 static void EndCopyTo(CopyState cstate);
+static void CopyToStart(CopyState cstate);
+static void CopyToFinish(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
@@ -587,7 +591,9 @@ CopySendEndOfRow(CopyState cstate)
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
 		case COPY_CALLBACK:
-			Assert(false);		/* Not yet supported. */
+			CopySendChar(cstate, '\n');
+			CopySendChar(cstate, '\0');
+			cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
 			break;
 	}
 
@@ -1075,7 +1081,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 	else
 	{
 		cstate = BeginCopyTo(pstate, rel, query, relid,
-							 stmt->filename, stmt->is_program,
+							 stmt->filename, stmt->is_program, NULL,
 							 stmt->attlist, stmt->options);
 		*processed = DoCopyTo(cstate);	/* copy from database to file */
 		EndCopyTo(cstate);
@@ -1815,6 +1821,32 @@ EndCopy(CopyState cstate)
 	pfree(cstate);
 }
 
+static char *buf = NULL;		/* most recent row copied by data_dest_cb() */
+static void
+data_dest_cb(void *outbuf, int len)
+{
+	buf = (char *) palloc(len);	/* NOTE(review): old buf is not freed here */
+	memcpy(buf, (char *) outbuf, len);
+}
+
+CopyState
+BeginForeignCopyTo(Relation rel)
+{
+	CopyState cstate;
+
+	cstate = BeginCopy(NULL, false, rel, NULL, InvalidOid, NIL, NIL);
+	cstate->copy_dest = COPY_CALLBACK;	/* rows are handed to data_dest_cb */
+	cstate->data_dest_cb = data_dest_cb;
+	CopyToStart(cstate);
+	return cstate;				/* caller finishes with EndForeignCopyTo() */
+}
+
+void
+EndForeignCopyTo(CopyState cstate)
+{
+	CopyToFinish(cstate);		/* NOTE(review): cstate itself is not freed here */
+}
+
 /*
  * Setup CopyState to read tuples from a table or a query for COPY TO.
  */
@@ -1825,6 +1857,7 @@ BeginCopyTo(ParseState *pstate,
 			Oid queryRelId,
 			const char *filename,
 			bool is_program,
+			copy_data_dest_cb data_dest_cb,
 			List *attnamelist,
 			List *options)
 {
@@ -1880,6 +1913,11 @@ BeginCopyTo(ParseState *pstate,
 		if (whereToSendOutput != DestRemote)
 			cstate->copy_file = stdout;
 	}
+	else if (data_dest_cb)
+	{
+		cstate->copy_dest = COPY_CALLBACK;
+		cstate->data_dest_cb = data_dest_cb;
+	}
 	else
 	{
 		cstate->filename = pstrdup(filename);
@@ -1950,6 +1988,13 @@ BeginCopyTo(ParseState *pstate,
 	return cstate;
 }
 
+char *
+NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot)
+{
+	CopyOneRowTo(cstate, slot);	/* presumably fills buf via data_dest_cb */
+	return buf;					/* file-level static set by data_dest_cb() */
+}
+
 /*
  * This intermediate routine exists mainly to localize the effects of setjmp
  * so we don't need to plaster a lot of variables with "volatile".
@@ -1966,7 +2011,9 @@ DoCopyTo(CopyState cstate)
 		if (fe_copy)
 			SendCopyBegin(cstate);
 
+		CopyToStart(cstate);
 		processed = CopyTo(cstate);
+		CopyToFinish(cstate);
 
 		if (fe_copy)
 			SendCopyEnd(cstate);
@@ -2005,16 +2052,12 @@ EndCopyTo(CopyState cstate)
 	EndCopy(cstate);
 }
 
-/*
- * Copy from relation or query TO file.
- */
-static uint64
-CopyTo(CopyState cstate)
+static void
+CopyToStart(CopyState cstate)
 {
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	ListCell   *cur;
-	uint64		processed;
 
 	if (cstate->rel)
 		tupDesc = RelationGetDescr(cstate->rel);
@@ -2104,6 +2147,29 @@ CopyTo(CopyState cstate)
 			CopySendEndOfRow(cstate);
 		}
 	}
+}
+
+static void
+CopyToFinish(CopyState cstate)
+{
+	if (cstate->binary)
+	{
+		/* Binary copy ends with a 16-bit -1 trailer */
+		CopySendInt16(cstate, -1);
+		/* Need to flush out the trailer */
+		CopySendEndOfRow(cstate);
+	}
+
+	MemoryContextDelete(cstate->rowcontext);	/* pointer is left dangling */
+}
+
+/*
+ * Copy from relation or query TO file.
+ */
+static uint64
+CopyTo(CopyState cstate)
+{
+	uint64		processed;
 
 	if (cstate->rel)
 	{
@@ -2135,17 +2201,6 @@ CopyTo(CopyState cstate)
 		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
-
-	if (cstate->binary)
-	{
-		/* Generate trailer for a binary copy */
-		CopySendInt16(cstate, -1);
-		/* Need to flush out the trailer */
-		CopySendEndOfRow(cstate);
-	}
-
-	MemoryContextDelete(cstate->rowcontext);
-
 	return processed;
 }
 
@@ -2449,53 +2504,64 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	cstate->line_buf_valid = false;
 	save_cur_lineno = cstate->cur_lineno;
 
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	for (i = 0; i < nused; i++)
+	if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+	{
+		/* Flush into foreign table or partition */
+		resultRelInfo->ri_FdwRoutine->ExecForeignBulkInsert(resultRelInfo,
+															  slots,
+															  nused);
+	}
+	else
 	{
 		/*
-		 * If there are any indexes, update them for all the inserted tuples,
-		 * and run AFTER ROW INSERT triggers.
+		 * table_multi_insert may leak memory, so switch to short-lived memory
+		 * context before calling it.
 		 */
-		if (resultRelInfo->ri_NumIndices > 0)
-		{
-			List	   *recheckIndexes;
-
-			cstate->cur_lineno = buffer->linenos[i];
-			recheckIndexes =
-				ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
-									  NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
-								 cstate->transition_capture);
-			list_free(recheckIndexes);
-		}
+		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+		table_multi_insert(resultRelInfo->ri_RelationDesc,
+						   slots,
+						   nused,
+						   mycid,
+						   ti_options,
+						   buffer->bistate);
+		MemoryContextSwitchTo(oldcontext);
 
-		/*
-		 * There's no indexes, but see if we need to run AFTER ROW INSERT
-		 * triggers anyway.
-		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		for (i = 0; i < nused; i++)
 		{
-			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
-		}
+			/*
+			 * If there are any indexes, update them for all the inserted tuples,
+			 * and run AFTER ROW INSERT triggers.
+			 */
+			if (resultRelInfo->ri_NumIndices > 0)
+			{
+				List	   *recheckIndexes;
+
+				cstate->cur_lineno = buffer->linenos[i];
+				recheckIndexes =
+					ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
+										  NIL);
+				ExecARInsertTriggers(estate, resultRelInfo,
+									 slots[i], recheckIndexes,
+									 cstate->transition_capture);
+				list_free(recheckIndexes);
+			}
+
+			/*
+			 * There's no indexes, but see if we need to run AFTER ROW INSERT
+			 * triggers anyway.
+			 */
+			else if (resultRelInfo->ri_TrigDesc != NULL &&
+					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+			{
+				cstate->cur_lineno = buffer->linenos[i];
+				ExecARInsertTriggers(estate, resultRelInfo,
+									 slots[i], NIL, cstate->transition_capture);
+			}
 
-		ExecClearTuple(slots[i]);
+			ExecClearTuple(slots[i]);
+		}
 	}
 
 	/* Mark that all slots are free */
@@ -2868,14 +2934,14 @@ CopyFrom(CopyState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
-	else if (resultRelInfo->ri_FdwRoutine != NULL ||
-			 cstate->volatile_defexprs)
+	else if (cstate->volatile_defexprs || (resultRelInfo->ri_FdwRoutine != NULL &&
+			list_length(cstate->attnumlist) == 0))
 	{
 		/*
-		 * Can't support multi-inserts to foreign tables or if there are any
-		 * volatile default expressions in the table.  Similarly to the
-		 * trigger case above, such expressions may query the table we're
-		 * inserting into.
+		 * Can't support multi-inserts into foreign tables without any
+		 * defined columns or if there are any volatile default expressions in
+		 * the table.  Similarly to the trigger case above, such expressions
+		 * may query the table we're inserting into.
 		 *
 		 * Note: It does not matter if any partitions have any volatile
 		 * default expressions as we use the defaults from the target of the
@@ -3037,8 +3103,7 @@ CopyFrom(CopyState cstate)
 				 */
 				leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
 					!has_before_insert_row_trig &&
-					!has_instead_insert_row_trig &&
-					resultRelInfo->ri_FdwRoutine == NULL;
+					!has_instead_insert_row_trig;
 
 				/* Set the multi-insert buffer to use for this partition. */
 				if (leafpart_use_multi_insert)
@@ -3048,7 +3113,8 @@ CopyFrom(CopyState cstate)
 													   resultRelInfo);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
-						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo) &&
+						 resultRelInfo->ri_FdwRoutine == NULL)
 				{
 					/*
 					 * Flush pending inserts if this partition can't use
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833565..ef119a761a 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -22,6 +22,7 @@
 /* CopyStateData is private in commands/copy.c */
 typedef struct CopyStateData *CopyState;
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *outbuf, int len);
 
 extern void DoCopy(ParseState *state, const CopyStmt *stmt,
 				   int stmt_location, int stmt_len,
@@ -41,4 +42,8 @@ extern uint64 CopyFrom(CopyState cstate);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern CopyState BeginForeignCopyTo(Relation rel);
+extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot);
+extern void EndForeignCopyTo(CopyState cstate);
+
 #endif							/* COPY_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..0507c2f96f 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -94,6 +94,9 @@ typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate,
 													   ResultRelInfo *rinfo,
 													   TupleTableSlot *slot,
 													   TupleTableSlot *planSlot);
+typedef void (*ExecForeignBulkInsert_function) (ResultRelInfo *rinfo,
+												TupleTableSlot **slots,
+												int nslots);
 
 typedef void (*EndForeignModify_function) (EState *estate,
 										   ResultRelInfo *rinfo);
@@ -104,6 +107,10 @@ typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate,
 typedef void (*EndForeignInsert_function) (EState *estate,
 										   ResultRelInfo *rinfo);
 
+typedef void (*BeginForeignCopy_function) (ResultRelInfo *rinfo);
+
+typedef void (*EndForeignCopy_function) (ResultRelInfo *rinfo, bool status);
+
 typedef int (*IsForeignRelUpdatable_function) (Relation rel);
 
 typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
@@ -211,6 +218,7 @@ typedef struct FdwRoutine
 	ExecForeignInsert_function ExecForeignInsert;
 	ExecForeignUpdate_function ExecForeignUpdate;
 	ExecForeignDelete_function ExecForeignDelete;
+	ExecForeignBulkInsert_function ExecForeignBulkInsert;
 	EndForeignModify_function EndForeignModify;
 	BeginForeignInsert_function BeginForeignInsert;
 	EndForeignInsert_function EndForeignInsert;
-- 
2.17.1

