From 98e842769cf7b2a12628d019e3531904b4936b28 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>
Date: Wed, 18 Apr 2018 10:33:59 +0530
Subject: [PATCH 3/3] An incomplete fix for problem in non-direct UPDATE of a
 foreign table pointing to a partitioned table or an
 inheritance parent on the foreign server

When a foreign table points to a partitioned table or an inheritance
parent on the foreign server, a non-direct DML can affect multiple
rows when only one row is intended to be affected. This
happens because postgres_fdw uses only ctid to identify a row to work
on. Though ctid uniquely identifies a row in a single table, in a
partitioned table or in an inheritance hierarchy, there can be
multiple rows, in different partitions, with the same ctid. So a
DML statement sent to the foreign server by postgres_fdw ends up
affecting more than one row, only one of which is intended to be
affected.

(ctid, tableoid) is unique across a partitioning or inheritance
hierarchy. Thus instead of using just ctid, we can use qualification
based on both ctid and tableoid. This partial commit tries to do that.
The commit adds code to add tableoid as a resjunk column in the
targetlist and pushes it down to the foreign table. But right now,
when tableoid is requested from a foreign table, foreign table's local
tableoid is returned instead of the tableoid of the table pointed to by
the foreign table. The commit adds some code to handle that, but it's
not foolproof. We need to find a way to keep these two tableoids
separate. When a user requests a tableoid, we return local tableoid of
the foreign table. For an UPDATE or a DELETE, however, we want to
fetch the tableoid from the foreign server and use it in the
qualification alongside ctid. (Note that when a user requests a ctid, we
always return ctid on the foreign server since there are no rows
locally.)

Ashutosh Bapat, reviewed by Kyotaro Horiguchi
---
 contrib/postgres_fdw/deparse.c         |   24 +++++-
 contrib/postgres_fdw/postgres_fdw.c    |  128 ++++++++++++++++++++++++++------
 src/backend/executor/nodeForeignscan.c |    5 +-
 3 files changed, 128 insertions(+), 29 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 6e2fa14..d3c98d3 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1144,7 +1144,7 @@ deparseTargetList(StringInfo buf,
 	}
 
 	/*
-	 * Add ctid and oid if needed.  We currently don't support retrieving any
+	 * Add ctid, tableoid and oid if needed.  We currently don't support retrieving any
 	 * other system columns.
 	 */
 	if (bms_is_member(SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber,
@@ -1179,6 +1179,22 @@ deparseTargetList(StringInfo buf,
 		*retrieved_attrs = lappend_int(*retrieved_attrs,
 									   ObjectIdAttributeNumber);
 	}
+	if (bms_is_member(TableOidAttributeNumber - FirstLowInvalidHeapAttributeNumber,
+					  attrs_used))
+	{
+		if (!first)
+			appendStringInfoString(buf, ", ");
+		else if (is_returning)
+			appendStringInfoString(buf, " RETURNING ");
+		first = false;
+
+		if (qualify_col)
+			ADD_REL_QUALIFIER(buf, rtindex);
+		appendStringInfoString(buf, "tableoid");
+
+		*retrieved_attrs = lappend_int(*retrieved_attrs,
+									   TableOidAttributeNumber);
+	}
 
 	/* Don't generate bad syntax if no undropped columns */
 	if (first && !is_returning)
@@ -1725,7 +1741,7 @@ deparseUpdateSql(StringInfo buf, PlannerInfo *root,
 	deparseRelation(buf, rel);
 	appendStringInfoString(buf, " SET ");
 
-	pindex = 2;					/* ctid is always the first param */
+	pindex = 3;					/* ctid, tableoid params appear first */
 	first = true;
 	foreach(lc, targetAttrs)
 	{
@@ -1739,7 +1755,7 @@ deparseUpdateSql(StringInfo buf, PlannerInfo *root,
 		appendStringInfo(buf, " = $%d", pindex);
 		pindex++;
 	}
-	appendStringInfoString(buf, " WHERE ctid = $1");
+	appendStringInfoString(buf, " WHERE ctid = $1 AND tableoid = $2");
 
 	deparseReturningList(buf, root, rtindex, rel,
 						 rel->trigdesc && rel->trigdesc->trig_update_after_row,
@@ -1854,7 +1870,7 @@ deparseDeleteSql(StringInfo buf, PlannerInfo *root,
 {
 	appendStringInfoString(buf, "DELETE FROM ");
 	deparseRelation(buf, rel);
-	appendStringInfoString(buf, " WHERE ctid = $1");
+	appendStringInfoString(buf, " WHERE ctid = $1 AND tableoid = $2");
 
 	deparseReturningList(buf, root, rtindex, rel,
 						 rel->trigdesc && rel->trigdesc->trig_delete_after_row,
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 0404e21..8761ab2 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -178,6 +178,7 @@ typedef struct PgFdwModifyState
 
 	/* info about parameters for prepared statement */
 	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
+	AttrNumber	tableoidAttno;	/* attnum of input resjunk tableoid column */
 	int			p_nums;			/* number of parameters to transmit */
 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
 
@@ -390,7 +391,7 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
 					  List *retrieved_attrs);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
-						 ItemPointer tupleid,
+						 ItemPointer tupleid, Oid tableoid,
 						 TupleTableSlot *slot);
 static void store_returning_result(PgFdwModifyState *fmstate,
 					   TupleTableSlot *slot, PGresult *res);
@@ -1543,26 +1544,39 @@ postgresAddForeignUpdateTargets(Query *parsetree,
 	TargetEntry *tle;
 
 	/*
-	 * In postgres_fdw, what we need is the ctid, same as for a regular table.
+	 * ctid is used to locate a row in a given table and tableoid is used to
+	 * identify a table in a partition or inheritance hierarchy.
 	 */
 
-	/* Make a Var representing the desired value */
+	/*
+	 * Make a Var representing the ctid, wrap it in a resjunk TLE with the
+	 * right name and add it to the query's targetlist.
+	 */
 	var = makeVar(parsetree->resultRelation,
 				  SelfItemPointerAttributeNumber,
 				  TIDOID,
 				  -1,
 				  InvalidOid,
 				  0);
-
-	/* Wrap it in a resjunk TLE with the right name ... */
 	attrname = "ctid";
-
 	tle = makeTargetEntry((Expr *) var,
 						  list_length(parsetree->targetList) + 1,
 						  pstrdup(attrname),
 						  true);
+	parsetree->targetList = lappend(parsetree->targetList, tle);
 
-	/* ... and add it to the query's targetlist */
+	/* Do the same for tableoid */
+	var = makeVar(parsetree->resultRelation,
+				  TableOidAttributeNumber,
+				  OIDOID,
+				  -1,
+				  InvalidOid,
+				  0);
+	attrname = "tableoid";
+	tle = makeTargetEntry((Expr *) var,
+						  list_length(parsetree->targetList) + 1,
+						  pstrdup(attrname),
+						  true);
 	parsetree->targetList = lappend(parsetree->targetList, tle);
 }
 
@@ -1751,7 +1765,7 @@ postgresExecForeignInsert(EState *estate,
 		prepare_foreign_modify(fmstate);
 
 	/* Convert parameters needed by prepared statement to text form */
-	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
+	p_values = convert_prep_stmt_params(fmstate, NULL, InvalidOid, slot);
 
 	/*
 	 * Execute the prepared statement.
@@ -1806,7 +1820,8 @@ postgresExecForeignUpdate(EState *estate,
 						  TupleTableSlot *planSlot)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-	Datum		datum;
+	Datum		ctid_datum;
+	Datum		tableoid_datum;
 	bool		isNull;
 	const char **p_values;
 	PGresult   *res;
@@ -1818,16 +1833,29 @@ postgresExecForeignUpdate(EState *estate,
 		prepare_foreign_modify(fmstate);
 
 	/* Get the ctid that was passed up as a resjunk column */
-	datum = ExecGetJunkAttribute(planSlot,
-								 fmstate->ctidAttno,
-								 &isNull);
+	ctid_datum = ExecGetJunkAttribute(planSlot,
+									  fmstate->ctidAttno,
+									  &isNull);
 	/* shouldn't ever get a null result... */
 	if (isNull)
 		elog(ERROR, "ctid is NULL");
 
+	/* Get the tableoid that was passed up as a resjunk column */
+	tableoid_datum = ExecGetJunkAttribute(planSlot,
+										  fmstate->tableoidAttno,
+										  &isNull);
+	/* shouldn't ever get a null result... */
+	if (isNull)
+		elog(ERROR, "tableoid is NULL");
+
+	/* ... and it should always be a valid OID */
+	if (!OidIsValid(DatumGetObjectId(tableoid_datum)))
+		elog(ERROR, "tableoid is invalid");
+
 	/* Convert parameters needed by prepared statement to text form */
 	p_values = convert_prep_stmt_params(fmstate,
-										(ItemPointer) DatumGetPointer(datum),
+										(ItemPointer) DatumGetPointer(ctid_datum),
+										DatumGetObjectId(tableoid_datum),
 										slot);
 
 	/*
@@ -1894,7 +1922,8 @@ postgresExecForeignDelete(EState *estate,
 						  TupleTableSlot *planSlot)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-	Datum		datum;
+	Datum		ctid_datum;
+	Datum		tableoid_datum;
 	bool		isNull;
 	const char **p_values;
 	PGresult   *res;
@@ -1906,16 +1935,29 @@ postgresExecForeignDelete(EState *estate,
 		prepare_foreign_modify(fmstate);
 
 	/* Get the ctid that was passed up as a resjunk column */
-	datum = ExecGetJunkAttribute(planSlot,
-								 fmstate->ctidAttno,
-								 &isNull);
+	ctid_datum = ExecGetJunkAttribute(planSlot,
+									  fmstate->ctidAttno,
+									  &isNull);
 	/* shouldn't ever get a null result... */
 	if (isNull)
 		elog(ERROR, "ctid is NULL");
 
+	/* Get the tableoid that was passed up as a resjunk column */
+	tableoid_datum = ExecGetJunkAttribute(planSlot,
+										  fmstate->tableoidAttno,
+										  &isNull);
+	/* shouldn't ever get a null result... */
+	if (isNull)
+		elog(ERROR, "tableoid is NULL");
+
+	/* ... and it should always be a valid OID */
+	if (!OidIsValid(DatumGetObjectId(tableoid_datum)))
+		elog(ERROR, "tableoid is invalid");
+
 	/* Convert parameters needed by prepared statement to text form */
 	p_values = convert_prep_stmt_params(fmstate,
-										(ItemPointer) DatumGetPointer(datum),
+										(ItemPointer) DatumGetPointer(ctid_datum),
+										DatumGetObjectId(tableoid_datum),
 										NULL);
 
 	/*
@@ -3334,7 +3376,7 @@ create_foreign_modify(EState *estate,
 		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
 
 	/* Prepare for output conversion of parameters used in prepared stmt. */
-	n_params = list_length(fmstate->target_attrs) + 1;
+	n_params = list_length(fmstate->target_attrs) + 2;
 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
 	fmstate->p_nums = 0;
 
@@ -3342,16 +3384,30 @@ create_foreign_modify(EState *estate,
 	{
 		Assert(subplan != NULL);
 
-		/* Find the ctid resjunk column in the subplan's result */
+		/*
+		 * Find the ctid, tableoid resjunk columns in the subplan's result and
+		 * record those as transmittable parameters.
+		 */
+
+
+		/* First transmittable parameter will be ctid */
 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
 														  "ctid");
 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
 			elog(ERROR, "could not find junk ctid column");
-
-		/* First transmittable parameter will be ctid */
 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
 		fmstate->p_nums++;
+
+		/* Second transmittable parameter will be tableoid */
+		fmstate->tableoidAttno =
+			ExecFindJunkAttributeInTlist(subplan->targetlist,
+										 "tableoid");
+		if (!AttributeNumberIsValid(fmstate->tableoidAttno))
+			elog(ERROR, "could not find junk tableoid column");
+		getTypeOutputInfo(OIDOID, &typefnoid, &isvarlena);
+		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+		fmstate->p_nums++;
 	}
 
 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
@@ -3425,13 +3481,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
  *		Create array of text strings representing parameter values
  *
  * tupleid is ctid to send, or NULL if none
+ * tableoid is tableoid to send or InvalidOid if none
  * slot is slot to get remaining parameters from, or NULL if none
  *
  * Data is constructed in temp_cxt; caller should reset that after use.
  */
 static const char **
 convert_prep_stmt_params(PgFdwModifyState *fmstate,
-						 ItemPointer tupleid,
+						 ItemPointer tupleid, Oid tableoid,
 						 TupleTableSlot *slot)
 {
 	const char **p_values;
@@ -3451,6 +3508,15 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
 		pindex++;
 	}
 
+	/* 2nd parameter should be tableoid, if it's in use */
+	if (OidIsValid(tableoid))
+	{
+		/* don't need set_transmission_modes for OID output */
+		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+											  ObjectIdGetDatum(tableoid));
+		pindex++;
+	}
+
 	/* get following parameters from slot */
 	if (slot != NULL && fmstate->target_attrs != NIL)
 	{
@@ -5549,6 +5615,7 @@ make_tuple_from_result_row(PGresult *res,
 	bool	   *nulls;
 	ItemPointer ctid = NULL;
 	Oid			oid = InvalidOid;
+	Oid			tableoid = InvalidOid;
 	ConversionLocation errpos;
 	ErrorContextCallback errcallback;
 	MemoryContext oldcontext;
@@ -5642,6 +5709,18 @@ make_tuple_from_result_row(PGresult *res,
 				oid = DatumGetObjectId(datum);
 			}
 		}
+		else if (i == TableOidAttributeNumber)
+		{
+			/* tableoid */
+			if (valstr != NULL)
+			{
+				Datum		datum;
+
+				datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
+				tableoid = DatumGetObjectId(datum);
+			}
+		}
+
 		errpos.cur_attno = 0;
 
 		j++;
@@ -5691,6 +5770,9 @@ make_tuple_from_result_row(PGresult *res,
 	if (OidIsValid(oid))
 		HeapTupleSetOid(tuple, oid);
 
+	if (OidIsValid(tableoid))
+		tuple->t_tableOid = tableoid;
+
 	/* Clean up */
 	MemoryContextReset(temp_context);
 
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index a2a28b7..8ebfdfd 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -58,13 +58,14 @@ ForeignNext(ForeignScanState *node)
 	 * If any system columns are requested, we have to force the tuple into
 	 * physical-tuple form to avoid "cannot extract system attribute from
 	 * virtual tuple" errors later.  We also insert a valid value for
-	 * tableoid, which is the only actually-useful system column.
+	 * tableoid, in case FDW has not set it as per its needs.
 	 */
 	if (plan->fsSystemCol && !TupIsNull(slot))
 	{
 		HeapTuple	tup = ExecMaterializeSlot(slot);
 
-		tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
+		if (!OidIsValid(tup->t_tableOid))
+			tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
 	}
 
 	return slot;
-- 
1.7.9.5

