From 6cc545a00f6f6b18926602880819eeed9313550b Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <a.lepikhov@postgrespro.ru>
Date: Fri, 29 May 2020 10:39:57 +0500
Subject: [PATCH] Fast COPY FROM into the foreign (or sharded) table.

---
 contrib/postgres_fdw/deparse.c                |  25 ++
 .../postgres_fdw/expected/postgres_fdw.out    |   5 +-
 contrib/postgres_fdw/postgres_fdw.c           |  95 ++++++++
 contrib/postgres_fdw/postgres_fdw.h           |   1 +
 src/backend/commands/copy.c                   | 216 ++++++++++++------
 src/include/commands/copy.h                   |   5 +
 src/include/foreign/fdwapi.h                  |   9 +
 7 files changed, 289 insertions(+), 67 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index ad37a74221..427402c8eb 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1758,6 +1758,31 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 						 withCheckOptionList, returningList, retrieved_attrs);
 }
 
+/*
+ * Deparse COPY FROM into given buf.
+ * We need to use list of parameters at each query.
+ */
+void
+deparseCopyFromSql(StringInfo buf, Relation rel)
+{
+	int attnum;
+
+	appendStringInfoString(buf, "COPY ");
+	deparseRelation(buf, rel);
+	appendStringInfoString(buf, " ( ");
+
+	for(attnum = 0; attnum < rel->rd_att->natts; attnum++)
+	{
+		appendStringInfoString(buf, NameStr(rel->rd_att->attrs[attnum].attname));
+
+		if (attnum != rel->rd_att->natts-1)
+			appendStringInfoString(buf, ", ");
+	}
+
+	appendStringInfoString(buf, " ) ");
+	appendStringInfoString(buf, " FROM STDIN ");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 90db550b92..5ae24fef7c 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8063,8 +8063,9 @@ copy rem2 from stdin;
 copy rem2 from stdin; -- ERROR
 ERROR:  new row for relation "loc2" violates check constraint "loc2_f1positive"
 DETAIL:  Failing row contains (-1, xyzzy).
-CONTEXT:  remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
-COPY rem2, line 1: "-1	xyzzy"
+CONTEXT:  COPY loc2, line 1: "-1	xyzzy"
+remote SQL command: COPY public.loc2 ( f1, f2 )  FROM STDIN 
+COPY rem2, line 2
 select * from rem2;
  f1 | f2  
 ----+-----
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9fc53cad68..bd2a8f596f 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -18,6 +18,7 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "catalog/pg_class.h"
+#include "commands/copy.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
@@ -190,6 +191,7 @@ typedef struct PgFdwModifyState
 	/* for update row movement if subplan result rel */
 	struct PgFdwModifyState *aux_fmstate;	/* foreign-insert state, if
 											 * created */
+	CopyState fdwcstate;
 } PgFdwModifyState;
 
 /*
@@ -350,12 +352,16 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
 												 TupleTableSlot *planSlot);
+static void postgresExecForeignCopy(ResultRelInfo *resultRelInfo,
+												 TupleTableSlot *slot);
 static void postgresEndForeignModify(EState *estate,
 									 ResultRelInfo *resultRelInfo);
 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
 									   ResultRelInfo *resultRelInfo);
 static void postgresEndForeignInsert(EState *estate,
 									 ResultRelInfo *resultRelInfo);
+static void postgresBeginForeignCopy(ResultRelInfo *resultRelInfo);
+static void postgresEndForeignCopy(ResultRelInfo *resultRelInfo, bool status);
 static int	postgresIsForeignRelUpdatable(Relation rel);
 static bool postgresPlanDirectModify(PlannerInfo *root,
 									 ModifyTable *plan,
@@ -530,9 +536,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->ExecForeignInsert = postgresExecForeignInsert;
 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
 	routine->ExecForeignDelete = postgresExecForeignDelete;
+	routine->ExecForeignCopy = postgresExecForeignCopy;
 	routine->EndForeignModify = postgresEndForeignModify;
 	routine->BeginForeignInsert = postgresBeginForeignInsert;
 	routine->EndForeignInsert = postgresEndForeignInsert;
+	routine->BeginForeignCopy = postgresBeginForeignCopy;
+	routine->EndForeignCopy = postgresEndForeignCopy;
 	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
 	routine->PlanDirectModify = postgresPlanDirectModify;
 	routine->BeginDirectModify = postgresBeginDirectModify;
@@ -1890,6 +1899,27 @@ postgresExecForeignDelete(EState *estate,
 								  slot, planSlot);
 }
 
+/*
+ * postgresExecForeignCopy
+ *		Copy one row into a foreign table
+ */
+static void
+postgresExecForeignCopy(ResultRelInfo *resultRelInfo, TupleTableSlot *slot)
+{
+	PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState;
+	char *buf;
+
+	buf = NextForeignCopyRow(fmstate->fdwcstate, slot);
+
+	if (PQputCopyData(fmstate->conn, buf, strlen(buf)) <= 0)
+	{
+		PGresult *res;
+
+		res = PQgetResult(fmstate->conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+			pgfdw_report_error(ERROR, res, fmstate->conn, false, fmstate->query);
+	}
+}
 /*
  * postgresEndForeignModify
  *		Finish an insert/update/delete operation on a foreign table
@@ -2051,6 +2081,71 @@ postgresEndForeignInsert(EState *estate,
 	finish_foreign_modify(fmstate);
 }
 
+/*
+ * postgresBeginForeignCopy
+ *		Begin an COPY operation on a foreign table
+ */
+static void
+postgresBeginForeignCopy(ResultRelInfo *resultRelInfo)
+{
+	Relation rel = resultRelInfo->ri_RelationDesc;
+	PgFdwModifyState *fmstate = (PgFdwModifyState *) (resultRelInfo->ri_FdwState);
+	StringInfoData sql;
+	PGresult *res;
+
+	Assert(resultRelInfo->ri_FdwRoutine != NULL);
+
+	fmstate->target_attrs = NULL;
+	fmstate->has_returning = false;
+	fmstate->retrieved_attrs = NULL;
+
+	if (fmstate->fdwcstate == NULL)
+		fmstate->fdwcstate = BeginForeignCopyTo(rel);
+
+	initStringInfo(&sql);
+	deparseCopyFromSql(&sql, rel);
+	fmstate->query = sql.data;
+
+	res = PQexec(fmstate->conn, fmstate->query);
+}
+
+/*
+ * postgresEndForeignCopy
+ *		Finish an COPY operation on a foreign table
+ */
+static void
+postgresEndForeignCopy(ResultRelInfo *resultRelInfo, bool status)
+{
+	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+	PGresult *res;
+
+	Assert(fmstate != NULL);
+
+	if (!status)
+	{
+		PQputCopyEnd(fmstate->conn, (PQprotocolVersion(fmstate->conn) < 3) ?
+					NULL :
+					_("aborted foreign copy"));
+		pfree(fmstate->fdwcstate);
+		fmstate->fdwcstate = NULL;
+		EndForeignCopyTo(fmstate->fdwcstate);
+		return;
+	}
+
+	while (res = PQgetResult(fmstate->conn), PQresultStatus(res) == PGRES_COPY_IN)
+	{
+		/* We can't send an error message if we're using protocol version 2 */
+		PQputCopyEnd(fmstate->conn, (status || PQprotocolVersion(fmstate->conn) < 3) ? NULL :
+					 _("aborted foreign copy"));
+		PQclear(res);
+	}
+
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(ERROR, res, fmstate->conn, false, fmstate->query);
+
+	while (PQgetResult(fmstate->conn) != NULL);
+}
+
 /*
  * postgresIsForeignRelUpdatable
  *		Determine whether a foreign table supports INSERT, UPDATE and/or
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..8fc5ff018f 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -162,6 +162,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 							 List *targetAttrs, bool doNothing,
 							 List *withCheckOptionList, List *returningList,
 							 List **retrieved_attrs);
+extern void deparseCopyFromSql(StringInfo buf, Relation rel);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6d53dc463c..9459e031c7 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -133,6 +133,7 @@ typedef struct CopyStateData
 	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
 	bool		is_program;		/* is 'filename' a program to popen? */
 	copy_data_source_cb data_source_cb; /* function for reading data */
+	copy_data_dest_cb data_dest_cb;
 	bool		binary;			/* binary format? */
 	bool		freeze;			/* freeze rows on loading? */
 	bool		csv_mode;		/* Comma Separated Value format? */
@@ -358,8 +359,11 @@ static void EndCopy(CopyState cstate);
 static void ClosePipeToProgram(CopyState cstate);
 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
 							 Oid queryRelId, const char *filename, bool is_program,
-							 List *attnamelist, List *options);
+							 copy_data_dest_cb data_dest_cb, List *attnamelist,
+							 List *options);
 static void EndCopyTo(CopyState cstate);
+static void CopyToStart(CopyState cstate);
+static void CopyToFinish(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
@@ -587,7 +591,9 @@ CopySendEndOfRow(CopyState cstate)
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
 		case COPY_CALLBACK:
-			Assert(false);		/* Not yet supported. */
+			CopySendChar(cstate, '\n');
+			CopySendChar(cstate, '\0');
+			cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
 			break;
 	}
 
@@ -1075,7 +1081,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 	else
 	{
 		cstate = BeginCopyTo(pstate, rel, query, relid,
-							 stmt->filename, stmt->is_program,
+							 stmt->filename, stmt->is_program, NULL,
 							 stmt->attlist, stmt->options);
 		*processed = DoCopyTo(cstate);	/* copy from database to file */
 		EndCopyTo(cstate);
@@ -1815,6 +1821,32 @@ EndCopy(CopyState cstate)
 	pfree(cstate);
 }
 
+static char *buf = NULL;
+static void
+data_dest_cb(void *outbuf, int len)
+{
+	buf = (char *) palloc(len);
+	memcpy(buf, (char *) outbuf, len);
+}
+
+CopyState
+BeginForeignCopyTo(Relation rel)
+{
+	CopyState cstate;
+
+	cstate = BeginCopy(NULL, false, rel, NULL, InvalidOid, NIL, NIL);
+	cstate->copy_dest = COPY_CALLBACK;
+	cstate->data_dest_cb = data_dest_cb;
+	CopyToStart(cstate);
+	return cstate;
+}
+
+void
+EndForeignCopyTo(CopyState cstate)
+{
+	CopyToFinish(cstate);
+}
+
 /*
  * Setup CopyState to read tuples from a table or a query for COPY TO.
  */
@@ -1825,6 +1857,7 @@ BeginCopyTo(ParseState *pstate,
 			Oid queryRelId,
 			const char *filename,
 			bool is_program,
+			copy_data_dest_cb data_dest_cb,
 			List *attnamelist,
 			List *options)
 {
@@ -1880,6 +1913,11 @@ BeginCopyTo(ParseState *pstate,
 		if (whereToSendOutput != DestRemote)
 			cstate->copy_file = stdout;
 	}
+	else if (data_dest_cb)
+	{
+		cstate->copy_dest = COPY_CALLBACK;
+		cstate->data_dest_cb = data_dest_cb;
+	}
 	else
 	{
 		cstate->filename = pstrdup(filename);
@@ -1950,6 +1988,13 @@ BeginCopyTo(ParseState *pstate,
 	return cstate;
 }
 
+char *
+NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot)
+{
+	CopyOneRowTo(cstate, slot);
+	return buf;
+}
+
 /*
  * This intermediate routine exists mainly to localize the effects of setjmp
  * so we don't need to plaster a lot of variables with "volatile".
@@ -1966,7 +2011,9 @@ DoCopyTo(CopyState cstate)
 		if (fe_copy)
 			SendCopyBegin(cstate);
 
+		CopyToStart(cstate);
 		processed = CopyTo(cstate);
+		CopyToFinish(cstate);
 
 		if (fe_copy)
 			SendCopyEnd(cstate);
@@ -2005,16 +2052,12 @@ EndCopyTo(CopyState cstate)
 	EndCopy(cstate);
 }
 
-/*
- * Copy from relation or query TO file.
- */
-static uint64
-CopyTo(CopyState cstate)
+static void
+CopyToStart(CopyState cstate)
 {
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	ListCell   *cur;
-	uint64		processed;
 
 	if (cstate->rel)
 		tupDesc = RelationGetDescr(cstate->rel);
@@ -2104,6 +2147,29 @@ CopyTo(CopyState cstate)
 			CopySendEndOfRow(cstate);
 		}
 	}
+}
+
+static void
+CopyToFinish(CopyState cstate)
+{
+	if (cstate->binary)
+	{
+		/* Generate trailer for a binary copy */
+		CopySendInt16(cstate, -1);
+		/* Need to flush out the trailer */
+		CopySendEndOfRow(cstate);
+	}
+
+	MemoryContextDelete(cstate->rowcontext);
+}
+
+/*
+ * Copy from relation or query TO file.
+ */
+static uint64
+CopyTo(CopyState cstate)
+{
+	uint64		processed;
 
 	if (cstate->rel)
 	{
@@ -2135,17 +2201,6 @@ CopyTo(CopyState cstate)
 		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
-
-	if (cstate->binary)
-	{
-		/* Generate trailer for a binary copy */
-		CopySendInt16(cstate, -1);
-		/* Need to flush out the trailer */
-		CopySendEndOfRow(cstate);
-	}
-
-	MemoryContextDelete(cstate->rowcontext);
-
 	return processed;
 }
 
@@ -2449,53 +2504,85 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	cstate->line_buf_valid = false;
 	save_cur_lineno = cstate->cur_lineno;
 
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	for (i = 0; i < nused; i++)
+	if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
 	{
-		/*
-		 * If there are any indexes, update them for all the inserted tuples,
-		 * and run AFTER ROW INSERT triggers.
-		 */
-		if (resultRelInfo->ri_NumIndices > 0)
+		/* Flush into foreign table or partition */
+		int i;
+		bool status = false;
+		CopyState fcstate = NULL;
+
+		Assert(resultRelInfo->ri_FdwRoutine != NULL &&
+			   resultRelInfo->ri_FdwState != NULL);
+
+		PG_TRY();
 		{
-			List	   *recheckIndexes;
-
-			cstate->cur_lineno = buffer->linenos[i];
-			recheckIndexes =
-				ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
-									  NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
-								 cstate->transition_capture);
-			list_free(recheckIndexes);
+			fcstate = resultRelInfo->ri_FdwRoutine->BeginForeignCopy(resultRelInfo);
+			for (i = 0; i < nused; i++)
+				resultRelInfo->ri_FdwRoutine->ExecForeignCopy(resultRelInfo,
+															  fcstate,
+															  slots[i]);
+			status = true;
 		}
-
+		PG_FINALLY();
+		{
+			Assert(fcstate != NULL);
+			resultRelInfo->ri_FdwRoutine->EndForeignCopy(
+														buffer->resultRelInfo,
+														status);
+		}
+		PG_END_TRY();
+	}
+	else
+	{
 		/*
-		 * There's no indexes, but see if we need to run AFTER ROW INSERT
-		 * triggers anyway.
+		 * table_multi_insert may leak memory, so switch to short-lived memory
+		 * context before calling it.
 		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+		table_multi_insert(resultRelInfo->ri_RelationDesc,
+						   slots,
+						   nused,
+						   mycid,
+						   ti_options,
+						   buffer->bistate);
+		MemoryContextSwitchTo(oldcontext);
+
+		for (i = 0; i < nused; i++)
 		{
-			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
-		}
+			/*
+			 * If there are any indexes, update them for all the inserted tuples,
+			 * and run AFTER ROW INSERT triggers.
+			 */
+			if (resultRelInfo->ri_NumIndices > 0)
+			{
+				List	   *recheckIndexes;
+
+				cstate->cur_lineno = buffer->linenos[i];
+				recheckIndexes =
+					ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
+										  NIL);
+				ExecARInsertTriggers(estate, resultRelInfo,
+									 slots[i], recheckIndexes,
+									 cstate->transition_capture);
+				list_free(recheckIndexes);
+			}
 
-		ExecClearTuple(slots[i]);
+			/*
+			 * There's no indexes, but see if we need to run AFTER ROW INSERT
+			 * triggers anyway.
+			 */
+			else if (resultRelInfo->ri_TrigDesc != NULL &&
+					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+			{
+				cstate->cur_lineno = buffer->linenos[i];
+				ExecARInsertTriggers(estate, resultRelInfo,
+									 slots[i], NIL, cstate->transition_capture);
+			}
+
+			ExecClearTuple(slots[i]);
+		}
 	}
 
 	/* Mark that all slots are free */
@@ -2868,8 +2955,7 @@ CopyFrom(CopyState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
-	else if (resultRelInfo->ri_FdwRoutine != NULL ||
-			 cstate->volatile_defexprs)
+	else if (cstate->volatile_defexprs)
 	{
 		/*
 		 * Can't support multi-inserts to foreign tables or if there are any
@@ -3037,8 +3123,7 @@ CopyFrom(CopyState cstate)
 				 */
 				leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
 					!has_before_insert_row_trig &&
-					!has_instead_insert_row_trig &&
-					resultRelInfo->ri_FdwRoutine == NULL;
+					!has_instead_insert_row_trig;
 
 				/* Set the multi-insert buffer to use for this partition. */
 				if (leafpart_use_multi_insert)
@@ -3048,7 +3133,8 @@ CopyFrom(CopyState cstate)
 													   resultRelInfo);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
-						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo) &&
+						 resultRelInfo->ri_FdwRoutine == NULL)
 				{
 					/*
 					 * Flush pending inserts if this partition can't use
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833565..ef119a761a 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -22,6 +22,7 @@
 /* CopyStateData is private in commands/copy.c */
 typedef struct CopyStateData *CopyState;
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *outbuf, int len);
 
 extern void DoCopy(ParseState *state, const CopyStmt *stmt,
 				   int stmt_location, int stmt_len,
@@ -41,4 +42,8 @@ extern uint64 CopyFrom(CopyState cstate);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern CopyState BeginForeignCopyTo(Relation rel);
+extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot);
+extern void EndForeignCopyTo(CopyState cstate);
+
 #endif							/* COPY_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..197301c5a5 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -94,6 +94,8 @@ typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate,
 													   ResultRelInfo *rinfo,
 													   TupleTableSlot *slot,
 													   TupleTableSlot *planSlot);
+typedef void (*ExecForeignCopy_function) (ResultRelInfo *rinfo,
+										  TupleTableSlot *slot);
 
 typedef void (*EndForeignModify_function) (EState *estate,
 										   ResultRelInfo *rinfo);
@@ -104,6 +106,10 @@ typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate,
 typedef void (*EndForeignInsert_function) (EState *estate,
 										   ResultRelInfo *rinfo);
 
+typedef void (*BeginForeignCopy_function) (ResultRelInfo *rinfo);
+
+typedef void (*EndForeignCopy_function) (ResultRelInfo *rinfo, bool status);
+
 typedef int (*IsForeignRelUpdatable_function) (Relation rel);
 
 typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
@@ -211,9 +217,12 @@ typedef struct FdwRoutine
 	ExecForeignInsert_function ExecForeignInsert;
 	ExecForeignUpdate_function ExecForeignUpdate;
 	ExecForeignDelete_function ExecForeignDelete;
+	ExecForeignCopy_function ExecForeignCopy;
 	EndForeignModify_function EndForeignModify;
 	BeginForeignInsert_function BeginForeignInsert;
 	EndForeignInsert_function EndForeignInsert;
+	BeginForeignCopy_function BeginForeignCopy;
+	EndForeignCopy_function EndForeignCopy;
 	IsForeignRelUpdatable_function IsForeignRelUpdatable;
 	PlanDirectModify_function PlanDirectModify;
 	BeginDirectModify_function BeginDirectModify;
-- 
2.17.1

