Eliminating SPI / SQL from some RI triggers - take 3

Started by Amit Langote about 1 year ago; 6 messages
#1 Amit Langote
amitlangote09@gmail.com
3 attachment(s)

Hi,

We discussed $subject at [1] and [2], and I'd like to continue that
work in the hope of committing some part of it for v18.

In short, performing the RI checks for inserts and updates of a
referencing table as direct scans of the PK index improves their
performance by up to 40%, especially when the checks are done in bulk,
as in the following example:

create unlogged table pk (a int primary key);
create unlogged table fk (a int references pk);
insert into pk select generate_series(1, 10000000);
insert into fk select generate_series(1, 10000000);

On my machine, the last query took 20 seconds with master versus 12
seconds with the patches. With master, a significant portion of the
time is spent in ExecutorStart() and ExecutorEnd() on the plan for the
RI query, which adds up because they are run for each row in a bulk
load. The patch avoids that overhead by calling the index AM directly.
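
To illustrate, the core of what the attached 0003 patch does in
ri_LookupKeyInPkRel() amounts to a plain index-AM probe; the sketch
below is simplified, with partition routing, the FK-to-PK casts, and
the key-share locking of the matched tuple omitted:

#include "postgres.h"

#include "access/genam.h"
#include "access/tableam.h"
#include "executor/tuptable.h"
#include "utils/snapmgr.h"

/*
 * Simplified sketch of the direct PK-index probe that replaces the
 * "SELECT 1 FROM [ONLY] <pktable> WHERE ... FOR KEY SHARE" plan.
 */
static bool
pk_key_exists(Relation pk_rel, Oid pk_idxoid, ScanKeyData *skeys, int nkeys)
{
	Relation	idxrel = index_open(pk_idxoid, RowShareLock);
	TupleTableSlot *slot = table_slot_create(pk_rel, NULL);
	IndexScanDesc scan;
	bool		found;

	scan = index_beginscan(pk_rel, idxrel, GetActiveSnapshot(), nkeys, 0);
	index_rescan(scan, skeys, nkeys, NULL, 0);

	/* One visible matching tuple is enough to satisfy the RI check. */
	found = index_getnext_slot(scan, ForwardScanDirection, slot);

	index_endscan(scan);
	ExecDropSingleTupleTableSlot(slot);
	index_close(idxrel, NoLock);

	return found;
}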

The patches haven't changed in their basic design since the last
update at [2], though there are a few changes:

1. I noticed a few additions to the RI trigger functions the patch
touches, such as those to support temporal foreign keys. I decided to
leave the SQL in place for the temporal queries, as their plan does
not, at a glance, reduce to a simple index scan.

2. As I mentioned at [3], the new way of doing the PK lookup didn't
have a way to recheck the PK tuple after detecting a concurrent update
of the PK, so it would cause an error under the READ COMMITTED
isolation level. The old way of executing an SQL plan would deal with
that using the EvalPlanQual() mechanism in the executor. In the
updated patch, I've added an equivalent rechecking function that is
called in the same situations in which EvalPlanQual() would get called
in the old method.
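
Concretely, the recheck in the attached 0003 patch
(recheck_matched_pk_tuple()) re-forms the index column values from the
updated tuple version and compares them against the original scan
keys, standing in for EvalPlanQual(); condensed:

static bool
recheck_matched_pk_tuple(Relation idxrel, ScanKeyData *skeys,
						 TupleTableSlot *new_slot)
{
	IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
	Datum		values[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];

	/* Compute the index column values of the updated tuple version. */
	FormIndexDatum(indexInfo, new_slot, NULL, values, isnull);

	for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
	{
		ScanKeyData *skey = &skeys[i];

		/* A PK column can never be NULL. */
		Assert(!isnull[i]);
		if (!DatumGetBool(FunctionCall2Coll(&skey->sk_func,
											skey->sk_collation,
											skey->sk_argument,
											values[i])))
			return false;	/* the key changed; treat it as no match */
	}

	return true;
}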

3. I reordered the patches as Robert suggested at [5], mainly because
the patch set includes changes to address a bug where PK lookups could
return incorrect results under the REPEATABLE READ isolation level.
This issue arises because RI lookups on partitioned PK tables
manipulate ActiveSnapshot to pass find_inheritance_children() the
snapshot it uses to decide whether detach-pending partitions should be
visible to those lookups. To address this, the
patch set introduces refactoring of the PartitionDesc interface,
included in patch 0001. This refactoring eliminates the need to
manipulate ActiveSnapshot by explicitly passing the correct snapshot
for detach-pending visibility handling. The main patch (0002+0003),
which focuses on improving performance by avoiding SQL queries for RI
checks, builds upon these refactoring changes to pass the snapshot
directly instead of manipulating the ActiveSnapshot. Reordering the
patches this way ensures a logical progression of changes, as Robert
suggested, while avoiding any impression that the bug was introduced
by the ri_triggers.c changes.
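
The relevant piece of ri_LookupKeyInPkRel() in the 0003 patch,
condensed, shows how the snapshot now travels explicitly instead of
via ActiveSnapshot:

	/*
	 * The latest snapshot decides only whether detach-pending partitions
	 * are visible to this lookup; it is handed to the partition directory
	 * directly (per the 0001 refactoring) and never installed as the
	 * ActiveSnapshot, so the PK index scan keeps using the transaction
	 * snapshot under REPEATABLE READ.
	 */
	partdir = CreatePartitionDirectory(CurrentMemoryContext,
									   GetLatestSnapshot());
	leaf_pk_rel = ExecGetLeafPartitionForKey(partdir,
											 pk_rel, riinfo->nkeys,
											 riinfo->pk_attnums,
											 pk_values, pk_isnulls,
											 idxoid, RowShareLock,
											 &leaf_idxoid);
	DestroyPartitionDirectory(partdir);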

However, I need to spend some time addressing Robert's feedback on the
basic design, as outlined at [5]. Specifically, the new PK lookup
function could benefit significantly from caching information rather
than recomputing it for each row. This implies that the PlanCreate
function should create a struct to store reusable information across
PlanExecute calls for different rows being checked.
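
For illustration only, the reusable state could be hung off the
existing plan_exec_arg field of RI_Plan; the struct below is a
hypothetical sketch of that direction, not something in the attached
patches (its name and fields are made up):

/*
 * Hypothetical sketch -- not in the attached patches.
 * ri_LookupKeyInPkRelPlanCreate() would allocate this in the plan's
 * memory context and store it in plan->plan_exec_arg, so that
 * ri_LookupKeyInPkRel() stops re-resolving the same catalog
 * information for every row it checks.
 */
typedef struct RI_LookupKeyPlanData
{
	Oid			pk_idxoid;		/* the constraint's unique index */
	int			nkeys;
	StrategyNumber strats[INDEX_MAX_KEYS];	/* equality strategy per column */
	RegProcedure eq_procs[INDEX_MAX_KEYS];	/* comparison proc per column */
	Oid			collations[INDEX_MAX_KEYS];
	FmgrInfo	cast_finfo[INDEX_MAX_KEYS];	/* FK -> PK casts, if needed */
} RI_LookupKeyPlanData;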

Beyond implementing these changes, I also need to confirm that the new
plan execution preserves all operations performed by the SQL plan for
the same checks, particularly those affecting user-visible behavior.
I've already verified that permission checks are preserved: revoking
access to the PK table during the checks causes them to fail, as
expected. This behavior is maintained because permission checks are
performed during each execution. The planned changes to separate the
"plan" and "execute" steps should continue to uphold this and other
behaviors that might need to be preserved.
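
For reference, the 0003 patch keeps this behavior by running
ri_CheckPermissions() on every execution of the lookup; condensed:

	/*
	 * Condensed from ri_CheckPermissions() in the 0003 patch: the same
	 * schema-USAGE and table-SELECT checks the SQL plan would have made
	 * are repeated on each execution, so a REVOKE issued between checks
	 * still causes later RI checks to fail.
	 */
	aclresult = object_aclcheck(NamespaceRelationId,
								RelationGetNamespace(pk_rel),
								GetUserId(), ACL_USAGE);
	if (aclresult != ACLCHECK_OK)
		aclcheck_error(aclresult, OBJECT_SCHEMA,
					   get_namespace_name(RelationGetNamespace(pk_rel)));

	aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(),
								  ACL_SELECT);
	if (aclresult != ACLCHECK_OK)
		aclcheck_error(aclresult, OBJECT_TABLE,
					   RelationGetRelationName(pk_rel));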

--
Thanks, Amit Langote

[1]: Simplifying foreign key/RI checks
/messages/by-id/CA+HiwqG5e8pk8s7+7zhr1Nc_PGyhEdM5f=pHkMOdK1RYWXfJsg@mail.gmail.com

[2]: Eliminating SPI from RI triggers - take 2
/messages/by-id/CA+HiwqG5e8pk8s7+7zhr1Nc_PGyhEdM5f=pHkMOdK1RYWXfJsg@mail.gmail.com

[3]: /messages/by-id/CA+TgmoaiTNj4DgQy42OT9JmTTP1NWcMV+ke0i=+a7=VgnzqGXw@mail.gmail.com

[4]: /messages/by-id/CA+Tgmoa1DCQ0MdojD9o6Ppbfj=abXxe4FUkwA4O_6qBHwOMVjw@mail.gmail.com

[5]: /messages/by-id/CA+TgmoaiTNj4DgQy42OT9JmTTP1NWcMV+ke0i=+a7=VgnzqGXw@mail.gmail.com

Attachments:

v1-0003-Avoid-using-an-SQL-query-for-some-RI-checks.patch
From 6a2d606abf3fa17c94fa9facbf82f9fdab8135e6 Mon Sep 17 00:00:00 2001
From: Amit Langote <amitlan@postgresql.org>
Date: Thu, 19 Dec 2024 21:52:32 +0900
Subject: [PATCH v1 3/3] Avoid using an SQL query for some RI checks

For RI triggers that check whether a referenced value exists in the
referenced relation, it is sufficient to scan the foreign key
constraint's unique index directly, instead of issuing an SQL query.
This optimization improves the performance of these checks by nearly
40%, especially for bulk inserts.

This commit builds on the RIPlan infrastructure introduced in the
previous commit. It replaces ri_SqlStringPlanCreate() in
RI_FKey_check() and ri_Check_Pk_Match() with
ri_LookupKeyInPkRelPlanCreate(), which installs
ri_LookupKeyInPkRel() as the plan to implement these checks.
ri_LookupKeyInPkRel() contains logic to directly scan the unique key
index associated with the foreign key constraint.

Additionally, ri_LookupKeyInPkRel() explicitly passes LatestSnapshot
as omit_detached_snapshot to CreatePartitionDirectory(), avoiding
the reliance on setting ActiveSnapshot. The previous approach caused
primary key lookups on partitioned tables to return incorrect results
under REPEATABLE READ isolation level, as demonstrated by a test case
added in commit 00cb86e75d. This issue has now been fixed, and the
buggy output in src/test/isolation/expected/fk-snapshot.out has been
updated to reflect the correct behavior.

Lastly, this commit introduces an isolation test suite to verify
foreign key insertions under concurrent updates to referenced values.

Reviewed-by: Robert Haas
Discussion: https://postgr.es/m/CA+HiwqGkfJfYdeq5vHPh6eqPKjSbfpDDY+j-kXYFePQedtSLeg@mail.gmail.com
Discussion: https://postgr.es/m/CA+HiwqG5e8pk8s7+7zhr1Nc_PGyhEdM5f=pHkMOdK1RYWXfJsg@mail.gmail.com
---
 src/backend/executor/execPartition.c          | 167 ++++-
 src/backend/executor/nodeLockRows.c           | 163 +++--
 src/backend/utils/adt/ri_triggers.c           | 668 ++++++++++++++----
 src/include/executor/execPartition.h          |   7 +
 src/include/executor/executor.h               |   9 +
 .../expected/fk-concurrent-pk-upd.out         |  58 ++
 src/test/isolation/expected/fk-snapshot.out   |   4 +-
 src/test/isolation/isolation_schedule         |   1 +
 .../isolation/specs/fk-concurrent-pk-upd.spec |  42 ++
 src/test/isolation/specs/fk-snapshot.spec     |   5 +-
 10 files changed, 918 insertions(+), 206 deletions(-)
 create mode 100644 src/test/isolation/expected/fk-concurrent-pk-upd.out
 create mode 100644 src/test/isolation/specs/fk-concurrent-pk-upd.spec

diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index d26cf20003..3494089b8e 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -174,8 +174,9 @@ static void FormPartitionKeyDatum(PartitionDispatch pd,
 								  EState *estate,
 								  Datum *values,
 								  bool *isnull);
-static int	get_partition_for_tuple(PartitionDispatch pd, Datum *values,
-									bool *isnull);
+static int get_partition_for_tuple(PartitionKey key,
+						PartitionDesc partdesc,
+						Datum *values, bool *isnull);
 static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
 												  Datum *values,
 												  bool *isnull,
@@ -316,7 +317,9 @@ ExecFindPartition(ModifyTableState *mtstate,
 		 * these values, error out.
 		 */
 		if (partdesc->nparts == 0 ||
-			(partidx = get_partition_for_tuple(dispatch, values, isnull)) < 0)
+			(partidx = get_partition_for_tuple(dispatch->key,
+											   dispatch->partdesc,
+											   values, isnull)) < 0)
 		{
 			char	   *val_desc;
 
@@ -1394,12 +1397,12 @@ FormPartitionKeyDatum(PartitionDispatch pd,
  * found or -1 if none found.
  */
 static int
-get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
+get_partition_for_tuple(PartitionKey key,
+						PartitionDesc partdesc,
+						Datum *values, bool *isnull)
 {
 	int			bound_offset = -1;
 	int			part_index = -1;
-	PartitionKey key = pd->key;
-	PartitionDesc partdesc = pd->partdesc;
 	PartitionBoundInfo boundinfo = partdesc->boundinfo;
 
 	/*
@@ -1606,6 +1609,158 @@ get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
 	return part_index;
 }
 
+/*
+ * ExecGetLeafPartitionForKey
+ *		Finds the leaf partition of a partitioned table 'root_rel' that might
+ *		contain the specified primary key tuple containing a subset of the
+ *		table's columns (including all of the partition key columns)
+ *
+ * 'key_natts' specifies the number columns contained in the key,
+ * 'key_attnums' their attribute numbers as defined in 'root_rel', and
+ * 'key_vals' and 'key_nulls' specify the key tuple.
+ *
+ * Partition descriptors for tuple routing are obtained by referring to the
+ * caller-specified partition directory.
+ *
+ * Any intermediate parent tables encountered on the way to finding the leaf
+ * partition are locked using 'lockmode' when opening.
+ *
+ * Returns NULL if no leaf partition is found for the key.
+ *
+ * This also finds the index in thus found leaf partition that is recorded as
+ * descending from 'root_idxoid' and returns it in '*leaf_idxoid'.
+ *
+ * Caller must close the returned relation, if any.
+ *
+ * This works because the unique key defined on the root relation is required
+ * to contain the partition key columns of all of the ancestors that lead up to
+ * a given leaf partition.
+ */
+Relation
+ExecGetLeafPartitionForKey(PartitionDirectory partdir,
+						   Relation root_rel, int key_natts,
+						   const AttrNumber *key_attnums,
+						   Datum *key_vals, bool *key_nulls,
+						   Oid root_idxoid, int lockmode,
+						   Oid *leaf_idxoid)
+{
+	Relation	rel = root_rel;
+	Oid			constr_idxoid = root_idxoid;
+
+	*leaf_idxoid = InvalidOid;
+
+	/*
+	 * Descend through partitioned parents to find the leaf partition that
+	 * would accept a row with the provided key values, starting with the root
+	 * parent.
+	 */
+	while (true)
+	{
+		PartitionKey partkey = RelationGetPartitionKey(rel);
+		PartitionDesc partdesc;
+		Datum	partkey_vals[PARTITION_MAX_KEYS];
+		bool	partkey_isnull[PARTITION_MAX_KEYS];
+		AttrNumber *root_partattrs = partkey->partattrs;
+		int		i,
+				j;
+		int		partidx;
+		Oid		partoid;
+		bool	is_leaf;
+
+		/*
+		 * Collect partition key values from the unique key.
+		 *
+		 * Because we only have the root table's copy of pk_attnums, we must map
+		 * any non-root table's partition key attribute numbers to the root
+		 * table's.
+		 */
+		if (rel != root_rel)
+		{
+			/*
+			 * map->attnums will contain root table attribute numbers for each
+			 * attribute of the current partitioned relation.
+			 */
+			AttrMap *map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
+														RelationGetDescr(rel),
+														false);
+
+			if (map)
+			{
+				root_partattrs = palloc(partkey->partnatts *
+										sizeof(AttrNumber));
+				for (i = 0; i < partkey->partnatts; i++)
+				{
+					AttrNumber	partattno = partkey->partattrs[i];
+
+					root_partattrs[i] = map->attnums[partattno - 1];
+				}
+
+				free_attrmap(map);
+			}
+		}
+
+		/*
+		 * Referenced key specification does not allow expressions, so there
+		 * would not be expressions in the partition keys either.
+		 */
+		Assert(partkey->partexprs == NIL);
+		for (i = 0, j = 0; i < partkey->partnatts; i++)
+		{
+			int		k;
+
+			for (k = 0; k < key_natts; k++)
+			{
+				if (root_partattrs[i] == key_attnums[k])
+				{
+					partkey_vals[j] = key_vals[k];
+					partkey_isnull[j] = key_nulls[k];
+					j++;
+					break;
+				}
+			}
+		}
+		/* Had better have found values for all of the partition keys. */
+		Assert(j == partkey->partnatts);
+
+		if (root_partattrs != partkey->partattrs)
+			pfree(root_partattrs);
+
+		/* Get the PartitionDesc using the partition directory machinery.  */
+		partdesc = PartitionDirectoryLookup(partdir, rel);
+
+		/* Find the partition for the key. */
+		partidx = get_partition_for_tuple(partkey, partdesc, partkey_vals,
+										  partkey_isnull);
+		Assert(partidx < 0 || partidx < partdesc->nparts);
+
+		/* Close any intermediate parents we opened, but keep the lock. */
+		if (rel != root_rel)
+			table_close(rel, NoLock);
+
+		/* No partition found. */
+		if (partidx < 0)
+			return NULL;
+
+		partoid = partdesc->oids[partidx];
+		rel = table_open(partoid, lockmode);
+		constr_idxoid = index_get_partition(rel, constr_idxoid);
+
+		/*
+		 * Return if the partition is a leaf, else find its partition in the
+		 * next iteration.
+		 */
+		is_leaf = partdesc->is_leaf[partidx];
+		if (is_leaf)
+		{
+			*leaf_idxoid = constr_idxoid;
+			return rel;
+		}
+	}
+
+	Assert(false);
+	return NULL;
+}
+
 /*
  * ExecBuildSlotPartitionKeyDescription
  *
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index 41754ddfea..fb3ebcd309 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -79,10 +79,7 @@ lnext:
 		Datum		datum;
 		bool		isNull;
 		ItemPointerData tid;
-		TM_FailureData tmfd;
 		LockTupleMode lockmode;
-		int			lockflags = 0;
-		TM_Result	test;
 		TupleTableSlot *markSlot;
 
 		/* clear any leftover test tuple for this rel */
@@ -178,74 +175,11 @@ lnext:
 				break;
 		}
 
-		lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
-		if (!IsolationUsesXactSnapshot())
-			lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
-
-		test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
-								markSlot, estate->es_output_cid,
-								lockmode, erm->waitPolicy,
-								lockflags,
-								&tmfd);
-
-		switch (test)
-		{
-			case TM_WouldBlock:
-				/* couldn't lock tuple in SKIP LOCKED mode */
-				goto lnext;
-
-			case TM_SelfModified:
-
-				/*
-				 * The target tuple was already updated or deleted by the
-				 * current command, or by a later command in the current
-				 * transaction.  We *must* ignore the tuple in the former
-				 * case, so as to avoid the "Halloween problem" of repeated
-				 * update attempts.  In the latter case it might be sensible
-				 * to fetch the updated tuple instead, but doing so would
-				 * require changing heap_update and heap_delete to not
-				 * complain about updating "invisible" tuples, which seems
-				 * pretty scary (table_tuple_lock will not complain, but few
-				 * callers expect TM_Invisible, and we're not one of them). So
-				 * for now, treat the tuple as deleted and do not process.
-				 */
-				goto lnext;
-
-			case TM_Ok:
-
-				/*
-				 * Got the lock successfully, the locked tuple saved in
-				 * markSlot for, if needed, EvalPlanQual testing below.
-				 */
-				if (tmfd.traversed)
-					epq_needed = true;
-				break;
-
-			case TM_Updated:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				elog(ERROR, "unexpected table_tuple_lock status: %u",
-					 test);
-				break;
-
-			case TM_Deleted:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				/* tuple was deleted so don't return it */
-				goto lnext;
-
-			case TM_Invisible:
-				elog(ERROR, "attempted to lock invisible tuple");
-				break;
-
-			default:
-				elog(ERROR, "unrecognized table_tuple_lock status: %u",
-					 test);
-		}
+		/* skip tuple if it couldn't be locked */
+		if (!ExecLockTableTuple(erm->relation, &tid, markSlot,
+								estate->es_snapshot, estate->es_output_cid,
+								lockmode, erm->waitPolicy, &epq_needed))
+			goto lnext;
 
 		/* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
 		erm->curCtid = tid;
@@ -280,6 +214,93 @@ lnext:
 	return slot;
 }
 
+/*
+ * ExecLockTableTuple
+ * 		Locks tuple with the specified TID in lockmode following given wait
+ * 		policy
+ *
+ * Returns true if the tuple was successfully locked.  Locked tuple is loaded
+ * into provided slot.
+ */
+bool
+ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+				   Snapshot snapshot, CommandId cid,
+				   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+				   bool *tuple_concurrently_updated)
+{
+	TM_FailureData tmfd;
+	int			lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+	TM_Result	test;
+
+	if (tuple_concurrently_updated)
+		*tuple_concurrently_updated = false;
+
+	if (!IsolationUsesXactSnapshot())
+		lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+	test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
+							waitPolicy, lockflags, &tmfd);
+
+	switch (test)
+	{
+		case TM_WouldBlock:
+			/* couldn't lock tuple in SKIP LOCKED mode */
+			return false;
+
+		case TM_SelfModified:
+			/*
+			 * The target tuple was already updated or deleted by the
+			 * current command, or by a later command in the current
+			 * transaction.  We *must* ignore the tuple in the former
+			 * case, so as to avoid the "Halloween problem" of repeated
+			 * update attempts.  In the latter case it might be sensible
+			 * to fetch the updated tuple instead, but doing so would
+			 * require changing heap_update and heap_delete to not
+			 * complain about updating "invisible" tuples, which seems
+			 * pretty scary (table_tuple_lock will not complain, but few
+			 * callers expect TM_Invisible, and we're not one of them). So
+			 * for now, treat the tuple as deleted and do not process.
+			 */
+			return false;
+
+		case TM_Ok:
+			/*
+			 * Got the lock successfully, the locked tuple saved in
+			 * slot for EvalPlanQual, if asked by the caller.
+			 */
+			if (tmfd.traversed && tuple_concurrently_updated)
+				*tuple_concurrently_updated = true;
+			break;
+
+		case TM_Updated:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			elog(ERROR, "unexpected table_tuple_lock status: %u",
+				 test);
+			break;
+
+		case TM_Deleted:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			/* tuple was deleted so don't return it */
+			return false;
+
+		case TM_Invisible:
+			elog(ERROR, "attempted to lock invisible tuple");
+			return false;
+
+		default:
+			elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
+			return false;
+	}
+
+	return true;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitLockRows
  *
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 804a2a69e4..81546655fd 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -23,21 +23,31 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/skey.h"
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/index.h"
+#include "catalog/partition.h"
+#include "catalog/pg_class.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_type.h"
 #include "commands/trigger.h"
+#include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_relation.h"
+#include "partitioning/partdesc.h"
 #include "storage/bufmgr.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
@@ -50,6 +60,7 @@
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rangetypes.h"
+#include "utils/partcache.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/ruleutils.h"
@@ -157,6 +168,12 @@ typedef void (*RI_PlanFreeFunc_type) (struct RI_Plan *plan);
  */
 typedef struct RI_Plan
 {
+	/* Constraint for this plan. */
+	const RI_ConstraintInfo *riinfo;
+
+	/* RI query type code. */
+	int				constr_queryno;
+
 	/*
 	 * Context under which this struct and its subsidiary data gets allocated.
 	 * It is made a child of CacheMemoryContext.
@@ -270,7 +287,8 @@ static const RI_ConstraintInfo *ri_FetchConstraintInfo(Trigger *trigger,
 													   Relation trig_rel, bool rel_is_pk);
 static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid);
 static Oid	get_ri_constraint_root(Oid constrOid);
-static RI_Plan *ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
+static RI_Plan *ri_PlanCheck(const RI_ConstraintInfo *riinfo,
+							 RI_PlanCreateFunc_type plan_create_func,
 							 const char *querystr, int nargs, Oid *argtypes,
 							 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
 static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
@@ -294,6 +312,15 @@ static int ri_SqlStringPlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_r
 						Snapshot crosscheck_snapshot,
 						int limit, CmdType *last_stmt_cmdtype);
 static void ri_SqlStringPlanFree(RI_Plan *plan);
+static void ri_LookupKeyInPkRelPlanCreate(RI_Plan *plan,
+							  const char *querystr, int nargs, Oid *paramtypes);
+static int ri_LookupKeyInPkRel(struct RI_Plan *plan,
+					Relation fk_rel, Relation pk_rel,
+					Datum *pk_vals, char *pk_nulls,
+					Snapshot crosscheck_snapshot,
+					int limit, CmdType *last_stmt_cmdtype);
+static bool ri_LookupKeyInPkRelPlanIsValid(RI_Plan *plan);
+static void ri_LookupKeyInPkRelPlanFree(RI_Plan *plan);
 
 
 /*
@@ -389,9 +416,9 @@ RI_FKey_check(TriggerData *trigdata)
 
 					/*
 					 * MATCH PARTIAL - all non-null columns must match. (not
-					 * implemented, can be done by modifying the query below
-					 * to only include non-null columns, or by writing a
-					 * special version here)
+					 * implemented, can be done by modifying
+					 * ri_LookupKeyInPkRel() to only include non-null
+					 * columns.)
 					 */
 					break;
 #endif
@@ -411,24 +438,15 @@ RI_FKey_check(TriggerData *trigdata)
 
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
-		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
-		char		attname[MAX_QUOTED_NAME_LEN];
-		char		paramname[16];
-		const char *querysep;
-		Oid			queryoids[RI_MAX_NUMKEYS];
-		const char *pk_only;
-
 		/* ----------
-		 * The query string built is
+		 * For simple FKs, use ri_LookupKeyInPkRelPlanCreate() to create
+		 * the plan to check the row, which is equivalent to doing
 		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
 		 *		   FOR KEY SHARE OF x
-		 * The type id's for the $ parameters are those of the
-		 * corresponding FK attributes.
 		 *
-		 * But for temporal FKs we need to make sure
-		 * the FK's range is completely covered.
-		 * So we use this query instead:
+		 * But for temporal FKs we use ri_SqlStringPlanCreate() because we need
+		 * to make sure the FK's range is completely covered, which is done
+		 * with this query instead:
 		 *  SELECT 1
 		 *	FROM	(
 		 *		SELECT pkperiodatt AS r
@@ -442,45 +460,45 @@ RI_FKey_check(TriggerData *trigdata)
 		 * we can make this a bit simpler.
 		 * ----------
 		 */
-		initStringInfo(&querybuf);
-		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(pkrelname, pk_rel);
 		if (riinfo->hasperiod)
 		{
+			StringInfoData querybuf;
+			char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
+			char		attname[MAX_QUOTED_NAME_LEN];
+			char		paramname[16];
+			const char *querysep;
+			Oid			queryoids[RI_MAX_NUMKEYS];
+			const char *pk_only;
+			Oid			fk_type;
+
+			initStringInfo(&querybuf);
+			pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+				"" : "ONLY ";
+			quoteRelationName(pkrelname, pk_rel);
 			quoteOneName(attname,
 						 RIAttName(pk_rel, riinfo->pk_attnums[riinfo->nkeys - 1]));
-
 			appendStringInfo(&querybuf,
 							 "SELECT 1 FROM (SELECT %s AS r FROM %s%s x",
 							 attname, pk_only, pkrelname);
-		}
-		else
-		{
-			appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
-							 pk_only, pkrelname);
-		}
-		querysep = "WHERE";
-		for (int i = 0; i < riinfo->nkeys; i++)
-		{
-			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
-
-			quoteOneName(attname,
-						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
-			sprintf(paramname, "$%d", i + 1);
-			ri_GenerateQual(&querybuf, querysep,
-							attname, pk_type,
-							riinfo->pf_eq_oprs[i],
-							paramname, fk_type);
-			querysep = "AND";
-			queryoids[i] = fk_type;
-		}
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-		if (riinfo->hasperiod)
-		{
-			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[riinfo->nkeys - 1]);
+			querysep = "WHERE";
+			for (int i = 0; i < riinfo->nkeys; i++)
+			{
+				Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+
+				fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+				quoteOneName(attname,
+							 RIAttName(pk_rel, riinfo->pk_attnums[i]));
+				sprintf(paramname, "$%d", i + 1);
+				ri_GenerateQual(&querybuf, querysep,
+								attname, pk_type,
+								riinfo->pf_eq_oprs[i],
+								paramname, fk_type);
+				querysep = "AND";
+				queryoids[i] = fk_type;
+			}
+			appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
 
+			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[riinfo->nkeys - 1]);
 			appendStringInfo(&querybuf, ") x1 HAVING ");
 			sprintf(paramname, "$%d", riinfo->nkeys);
 			ri_GenerateQual(&querybuf, "",
@@ -488,26 +506,24 @@ RI_FKey_check(TriggerData *trigdata)
 							riinfo->agged_period_contained_by_oper,
 							"pg_catalog.range_agg", ANYMULTIRANGEOID);
 			appendStringInfo(&querybuf, "(x1.r)");
-		}
 
-		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
-							 querybuf.data, riinfo->nkeys, queryoids,
-							 &qkey, fk_rel, pk_rel);
+			/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+			qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
+								 querybuf.data, riinfo->nkeys, queryoids,
+								 &qkey, fk_rel, pk_rel);
+		}
+		else
+			qplan = ri_PlanCheck(riinfo, ri_LookupKeyInPkRelPlanCreate,
+								 NULL, 0 /* nargs */, NULL /* argtypes */,
+								 &qkey, fk_rel, pk_rel);
 	}
 
-	/*
-	 * Now check that foreign key exists in PK table
-	 *
-	 * XXX detectNewRows must be true when a partitioned table is on the
-	 * referenced side.  The reason is that our snapshot must be fresh in
-	 * order for the hack in find_inheritance_children() to work.
-	 */
+	/* Now check that foreign key exists in PK table */
 	ri_PerformCheck(riinfo, &qkey, qplan,
 					fk_rel, pk_rel,
 					NULL, newslot,
 					false,
-					pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
+					false,
 					CMD_SELECT);
 
 	table_close(pk_rel, RowShareLock);
@@ -578,24 +594,15 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
-		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
-		char		attname[MAX_QUOTED_NAME_LEN];
-		char		paramname[16];
-		const char *querysep;
-		const char *pk_only;
-		Oid			queryoids[RI_MAX_NUMKEYS];
-
 		/* ----------
-		 * The query string built is
+		 * For simple FKs, use ri_LookupKeyInPkRelPlanCreate() to create
+		 * the plan to check the row, which is equivalent to doing
 		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
 		 *		   FOR KEY SHARE OF x
-		 * The type id's for the $ parameters are those of the
-		 * PK attributes themselves.
 		 *
-		 * But for temporal FKs we need to make sure
-		 * the old PK's range is completely covered.
-		 * So we use this query instead:
+		 * But for temporal FKs we use ri_SqlStringPlanCreate() because we need
+		 * to make sure the old PK's range is completely covered, which is done
+		 * with this query instead:
 		 *  SELECT 1
 		 *  FROM    (
 		 *	  SELECT pkperiodatt AS r
@@ -609,43 +616,44 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 		 * we can make this a bit simpler.
 		 * ----------
 		 */
-		initStringInfo(&querybuf);
-		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(pkrelname, pk_rel);
 		if (riinfo->hasperiod)
 		{
+			StringInfoData querybuf;
+			char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
+			char		attname[MAX_QUOTED_NAME_LEN];
+			char		paramname[16];
+			const char *querysep;
+			const char *pk_only;
+			Oid			queryoids[RI_MAX_NUMKEYS];
+			Oid			fk_type;
+
+			initStringInfo(&querybuf);
+			pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+				"" : "ONLY ";
+			quoteRelationName(pkrelname, pk_rel);
 			quoteOneName(attname, RIAttName(pk_rel, riinfo->pk_attnums[riinfo->nkeys - 1]));
 
 			appendStringInfo(&querybuf,
 							 "SELECT 1 FROM (SELECT %s AS r FROM %s%s x",
 							 attname, pk_only, pkrelname);
-		}
-		else
-		{
-			appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
-							 pk_only, pkrelname);
-		}
-		querysep = "WHERE";
-		for (int i = 0; i < riinfo->nkeys; i++)
-		{
-			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-
-			quoteOneName(attname,
-						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
-			sprintf(paramname, "$%d", i + 1);
-			ri_GenerateQual(&querybuf, querysep,
-							attname, pk_type,
-							riinfo->pp_eq_oprs[i],
-							paramname, pk_type);
-			querysep = "AND";
-			queryoids[i] = pk_type;
-		}
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-		if (riinfo->hasperiod)
-		{
-			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[riinfo->nkeys - 1]);
+			querysep = "WHERE";
+			for (int i = 0; i < riinfo->nkeys; i++)
+			{
+				Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+
+				quoteOneName(attname,
+							 RIAttName(pk_rel, riinfo->pk_attnums[i]));
+				sprintf(paramname, "$%d", i + 1);
+				ri_GenerateQual(&querybuf, querysep,
+								attname, pk_type,
+								riinfo->pp_eq_oprs[i],
+								paramname, pk_type);
+				querysep = "AND";
+				queryoids[i] = pk_type;
+			}
+			appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
 
+			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[riinfo->nkeys - 1]);
 			appendStringInfo(&querybuf, ") x1 HAVING ");
 			sprintf(paramname, "$%d", riinfo->nkeys);
 			ri_GenerateQual(&querybuf, "",
@@ -653,12 +661,15 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 							riinfo->agged_period_contained_by_oper,
 							"pg_catalog.range_agg", ANYMULTIRANGEOID);
 			appendStringInfo(&querybuf, "(x1.r)");
+			/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+			qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
+								 querybuf.data, riinfo->nkeys, queryoids,
+								 &qkey, fk_rel, pk_rel);
 		}
-
-		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
-							 querybuf.data, riinfo->nkeys, queryoids,
-							 &qkey, fk_rel, pk_rel);
+		else
+			qplan = ri_PlanCheck(riinfo, ri_LookupKeyInPkRelPlanCreate,
+								 NULL, 0 /* nargs */, NULL /* argtypes */,
+								 &qkey, fk_rel, pk_rel);
 	}
 
 	/*
@@ -840,7 +851,7 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -937,7 +948,7 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 		}
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -1051,7 +1062,7 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 		appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys * 2, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -1275,7 +1286,7 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 		}
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -2090,6 +2101,11 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 	 * saving lots of work and memory when there are many partitions with
 	 * similar FK constraints.
 	 *
+	 * We must not share the plan for RI_PLAN_CHECK_LOOKUPPK queries either,
+	 * because its execution function (ri_LookupKeyInPkRel()) expects to see
+	 * the RI_ConstraintInfo of the individual leaf partitions that the
+	 * query fired on.
+	 *
 	 * (Note that we must still have a separate RI_ConstraintInfo for each
 	 * constraint, because partitions can have different column orders,
 	 * resulting in different pk_attnums[] or fk_attnums[] array contents.)
@@ -2097,7 +2113,8 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 	 * We assume struct RI_QueryKey contains no padding bytes, else we'd need
 	 * to use memset to clear them.
 	 */
-	if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
+	if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
+		constr_queryno != RI_PLAN_CHECK_LOOKUPPK)
 		key->constr_id = riinfo->constraint_root_id;
 	else
 		key->constr_id = riinfo->constraint_id;
@@ -2375,10 +2392,17 @@ InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue)
 	}
 }
 
+typedef enum RI_Plantype
+{
+	RI_PLAN_SQL = 0,
+	RI_PLAN_CHECK_FUNCTION
+} RI_Plantype;
+
 /* Query string or an equivalent name to show in the error CONTEXT. */
 typedef struct RIErrorCallbackArg
 {
 	const char *query;
+	RI_Plantype plantype;
 } RIErrorCallbackArg;
 
 /*
@@ -2408,7 +2432,17 @@ _RI_error_callback(void *arg)
 		internalerrquery(query);
 	}
 	else
-		errcontext("SQL statement \"%s\"", query);
+	{
+		switch (carg->plantype)
+		{
+			case RI_PLAN_SQL:
+				errcontext("SQL statement \"%s\"", query);
+				break;
+			case RI_PLAN_CHECK_FUNCTION:
+				errcontext("RI check function \"%s\"", query);
+				break;
+		}
+	}
 }
 
 /*
@@ -2644,14 +2678,387 @@ ri_SqlStringPlanFree(RI_Plan *plan)
 	}
 }
 
+/*
+ * Creates an RI_Plan to look a key up in the PK table.
+ *
+ * Not much to do beside initializing the expected callback members, because
+ * there is no query string to parse and plan.
+ */
+static void
+ri_LookupKeyInPkRelPlanCreate(RI_Plan *plan,
+							  const char *querystr, int nargs, Oid *paramtypes)
+{
+	Assert(querystr == NULL);
+	plan->plan_exec_func = ri_LookupKeyInPkRel;
+	plan->plan_exec_arg = NULL;
+	plan->plan_is_valid_func = ri_LookupKeyInPkRelPlanIsValid;
+	plan->plan_free_func = ri_LookupKeyInPkRelPlanFree;
+}
+
+/*
+ * get_fkey_unique_index
+ * 		Returns the unique index used by the given foreign key constraint
+ */
+static Oid
+get_fkey_unique_index(Oid conoid)
+{
+	Oid			result = InvalidOid;
+	HeapTuple	tp;
+
+	tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
+	if (HeapTupleIsValid(tp))
+	{
+		Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
+
+		if (contup->contype == CONSTRAINT_FOREIGN)
+			result = contup->conindid;
+		ReleaseSysCache(tp);
+	}
+
+	if (!OidIsValid(result))
+		elog(ERROR, "unique index not found for foreign key constraint %u",
+			 conoid);
+
+	return result;
+}
+
+/*
+ * ri_CheckPermissions
+ * 		Check that the new user has permissions to look into the schema of
+ * 		and SELECT from 'query_rel'
+ *
+ * Provided for non-SQL implementors of an RI_Plan.
+ */
+static void
+ri_CheckPermissions(Relation query_rel)
+{
+	AclResult	aclresult;
+
+	/* USAGE on schema. */
+	aclresult = object_aclcheck(NamespaceRelationId,
+								RelationGetNamespace(query_rel),
+								GetUserId(), ACL_USAGE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_SCHEMA,
+					   get_namespace_name(RelationGetNamespace(query_rel)));
+
+	/* SELECT on relation. */
+	aclresult = pg_class_aclcheck(RelationGetRelid(query_rel), GetUserId(),
+								  ACL_SELECT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_TABLE,
+					   RelationGetRelationName(query_rel));
+}
+
+/*
+ * This checks that the index key of the tuple specified in 'new_slot' matches
+ * the key that has already been found in the PK index relation 'idxrel'.
+ *
+ * Returns true if the index key of the tuple matches the existing index
+ * key, false otherwise.
+ */
+static bool
+recheck_matched_pk_tuple(Relation idxrel, ScanKeyData *skeys,
+						 TupleTableSlot *new_slot)
+{
+	IndexInfo *indexInfo = BuildIndexInfo(idxrel);
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	bool		matched = true;
+
+	/* PK indexes never have these. */
+	Assert(indexInfo->ii_Expressions == NIL &&
+		   indexInfo->ii_ExclusionOps == NULL);
+
+	/* Form the index values and isnull flags given the table tuple. */
+	FormIndexDatum(indexInfo, new_slot, NULL, values, isnull);
+	for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+	{
+		ScanKeyData		*skey = &skeys[i];
+
+		/* A PK column can never be set to NULL. */
+		Assert(!isnull[i]);
+		if (!DatumGetBool(FunctionCall2Coll(&skey->sk_func,
+											skey->sk_collation,
+											skey->sk_argument,
+											values[i])))
+		{
+			matched = false;
+			break;
+		}
+	}
+
+	return matched;
+}
+
+/*
+ * Checks whether a tuple containing the given unique key given by pk_vals,
+ * pk_nulls exists in 'pk_rel'.  The key is looked up using the constraint's
+ * index given in plan->riinfo.
+ *
+ * If 'pk_rel' is a partitioned table, the check is performed on its leaf
+ * partition that would contain the key.
+ *
+ * The provided tuple is either the one being inserted into the referencing
+ * relation (fk_rel) or the one being deleted from the referenced relation
+ * (pk_rel).
+ */
+static int
+ri_LookupKeyInPkRel(struct RI_Plan *plan,
+					Relation fk_rel, Relation pk_rel,
+					Datum *pk_vals, char *pk_nulls,
+					Snapshot crosscheck_snapshot,
+					int limit, CmdType *last_stmt_cmdtype)
+{
+	const RI_ConstraintInfo *riinfo = plan->riinfo;
+	Oid			constr_id = riinfo->constraint_id;
+	Oid			idxoid;
+	Relation	idxrel;
+	Relation	leaf_pk_rel = NULL;
+	int			num_pk;
+	int			i;
+	int			tuples_processed = 0;
+	const Oid  *eq_oprs;
+	Datum		pk_values[INDEX_MAX_KEYS];
+	bool		pk_isnulls[INDEX_MAX_KEYS];
+	ScanKeyData skey[INDEX_MAX_KEYS];
+	IndexScanDesc	scan;
+	TupleTableSlot *outslot;
+	RIErrorCallbackArg ricallbackarg;
+	ErrorContextCallback rierrcontext;
+
+	/* We're effectively doing a CMD_SELECT below. */
+	*last_stmt_cmdtype = CMD_SELECT;
+
+	/*
+	 * Setup error traceback support for ereport()
+	 */
+	ricallbackarg.query = pstrdup("ri_LookupKeyInPkRel");
+	ricallbackarg.plantype = RI_PLAN_CHECK_FUNCTION;
+	rierrcontext.callback = _RI_error_callback;
+	rierrcontext.arg = &ricallbackarg;
+	rierrcontext.previous = error_context_stack;
+	error_context_stack = &rierrcontext;
+
+	/* XXX Maybe afterTriggerInvokeEvents() / AfterTriggerExecute() should? */
+	CHECK_FOR_INTERRUPTS();
+
+	ri_CheckPermissions(pk_rel);
+
+	/*
+	 * Choose the equality operators to use when scanning the PK index below.
+	 *
+	 * May need to cast the foreign key value (of the FK column's type) to
+	 * the corresponding PK column's type if the equality operator
+	 * demands it.
+	 */
+	if (plan->constr_queryno == RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
+	{
+		/* Use PK = PK equality operator. */
+		eq_oprs = riinfo->pp_eq_oprs;
+
+		for (i = 0; i < riinfo->nkeys; i++)
+		{
+			if (pk_nulls[i] != 'n')
+			{
+				pk_isnulls[i] = false;
+				pk_values[i] = pk_vals[i];
+			}
+			else
+			{
+				Assert(false);
+			}
+		}
+	}
+	else
+	{
+		Assert(plan->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
+		/* Use PK = FK equality operator. */
+		eq_oprs = riinfo->pf_eq_oprs;
+
+		for (i = 0; i < riinfo->nkeys; i++)
+		{
+			if (pk_nulls[i] != 'n')
+			{
+				Oid		eq_opr = eq_oprs[i];
+				Oid		typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+				RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+
+				pk_isnulls[i] = false;
+				pk_values[i] = pk_vals[i];
+				if (OidIsValid(entry->cast_func_finfo.fn_oid))
+				{
+					pk_values[i] = FunctionCall3(&entry->cast_func_finfo,
+												 pk_vals[i],
+												 Int32GetDatum(-1), /* typmod */
+												 BoolGetDatum(false)); /* implicit coercion */
+				}
+			}
+			else
+			{
+				Assert(false);
+			}
+		}
+	}
+
+	/*
+	 * Open the constraint index to be scanned.
+	 *
+	 * If the target table is partitioned, we must look up the leaf partition
+	 * and its corresponding unique index to search the keys in.
+	 */
+	idxoid = get_fkey_unique_index(constr_id);
+	if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		Oid		leaf_idxoid;
+		PartitionDirectory partdir;
+
+		/*
+		 * Pass the latest snapshot for omit_detached_snapshot so that any
+		 * detach-pending partitions are correctly omitted or included from
+		 * detach-pending partitions are correctly omitted from or included in
+		 * this lookup's considerations.  The PartitionDesc machinery that
+		 * runs as part of this uses the snapshot to determine whether to
+		 * omit or include any detach-pending partition, based on whether the
+		 * pg_inherits row that marks it as detach-pending is visible to it.
+		 */
+		partdir = CreatePartitionDirectory(CurrentMemoryContext,
+										   GetLatestSnapshot());
+		leaf_pk_rel = ExecGetLeafPartitionForKey(partdir,
+												 pk_rel, riinfo->nkeys,
+												 riinfo->pk_attnums,
+												 pk_values, pk_isnulls,
+												 idxoid, RowShareLock,
+												 &leaf_idxoid);
+
+		/*
+		 * XXX - Would be nice if this could be saved across calls. Problem
+		 * with just putting it in RI_Plan.plan_exec_arg is that the RI_Plan
+		 * is cached for the session duration, whereas the PartitionDirectory
+		 * can't last past the transaction.
+		 */
+		DestroyPartitionDirectory(partdir);
+
+		/*
+		 * If no suitable leaf partition exists, neither can the key we're
+		 * looking for.
+		 */
+		if (leaf_pk_rel == NULL)
+			goto done;
+
+		pk_rel = leaf_pk_rel;
+		idxoid = leaf_idxoid;
+	}
+	idxrel = index_open(idxoid, RowShareLock);
+
+	/*
+	 * Set up ScanKeys for the index scan.  This is essentially how
+	 * ExecIndexBuildScanKeys() sets them up.
+	 */
+	num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
+	for (i = 0; i < num_pk; i++)
+	{
+		int			pkattno = i + 1;
+		Oid			lefttype,
+					righttype;
+		Oid			operator = eq_oprs[i];
+		Oid			opfamily = idxrel->rd_opfamily[i];
+		int			strat;
+		RegProcedure regop = get_opcode(operator);
+
+		Assert(!pk_isnulls[i]);
+		get_op_opfamily_properties(operator, opfamily, false, &strat,
+								   &lefttype, &righttype);
+		ScanKeyEntryInitialize(&skey[i], 0, pkattno, strat, righttype,
+							   idxrel->rd_indcollation[i], regop,
+							   pk_values[i]);
+	}
+
+	Assert(ActiveSnapshotSet());
+	scan = index_beginscan(pk_rel, idxrel, GetActiveSnapshot(), num_pk, 0);
+
+	/* Install the ScanKeys. */
+	index_rescan(scan, skey, num_pk, NULL, 0);
+
+	/* Look for the tuple, and if found, try to lock it in key share mode. */
+	outslot = table_slot_create(pk_rel, NULL);
+	while (index_getnext_slot(scan, ForwardScanDirection, outslot))
+	{
+		bool	tuple_concurrently_updated;
+
+		/*
+		 * If we fail to lock the tuple for whatever reason, assume it doesn't
+		 * exist.  If the locked tuple is the one that was found to be updated
+		 * concurrently, retry.
+		 */
+		if (ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
+							   GetActiveSnapshot(),
+							   GetCurrentCommandId(false),
+							   LockTupleKeyShare,
+							   LockWaitBlock,
+							   &tuple_concurrently_updated))
+		{
+			bool	matched = true;
+
+			/*
+			 * If the matched table tuple has been updated, check if the key is
+			 * still the same.
+			 *
+			 * This emulates EvalPlanQual() in the executor.
+			 */
+			if (tuple_concurrently_updated &&
+				!recheck_matched_pk_tuple(idxrel, skey, outslot))
+				matched = false;
+
+			if (matched)
+				tuples_processed = 1;
+		}
+
+		break;
+	}
+
+	index_endscan(scan);
+	ExecDropSingleTupleTableSlot(outslot);
+
+	/* Don't release lock until commit. */
+	index_close(idxrel, NoLock);
+
+	/* Close leaf partition relation if any. */
+	if (leaf_pk_rel)
+		table_close(leaf_pk_rel, NoLock);
+
+done:
+	/*
+	 * Pop the error context stack
+	 */
+	error_context_stack = rierrcontext.previous;
+
+	return tuples_processed;
+}
+
+static bool
+ri_LookupKeyInPkRelPlanIsValid(RI_Plan *plan)
+{
+	/* Never store anything that can be invalidated. */
+	return true;
+}
+
+static void
+ri_LookupKeyInPkRelPlanFree(RI_Plan *plan)
+{
+	/* Nothing to free. */
+}
+
 /*
  * Create an RI_Plan for a given RI check query and initialize the
  * plan callbacks and execution argument using the caller specified
  * function.
  */
 static RI_Plan *
-ri_PlanCreate(RI_PlanCreateFunc_type plan_create_func,
-			  const char *querystr, int nargs, Oid *paramtypes)
+ri_PlanCreate(const RI_ConstraintInfo *riinfo,
+			  RI_PlanCreateFunc_type plan_create_func,
+			  const char *querystr, int nargs, Oid *paramtypes,
+			  int constr_queryno)
 {
 	RI_Plan	   *plan;
 	MemoryContext plancxt,
@@ -2666,6 +3073,8 @@ ri_PlanCreate(RI_PlanCreateFunc_type plan_create_func,
 									ALLOCSET_SMALL_SIZES);
 	oldcxt = MemoryContextSwitchTo(plancxt);
 	plan = (RI_Plan *) palloc0(sizeof(*plan));
+	plan->riinfo = riinfo;
+	plan->constr_queryno = constr_queryno;
 	plan->plancxt = plancxt;
 	plan->nargs = nargs;
 	if (plan->nargs > 0)
@@ -2730,7 +3139,8 @@ ri_FreePlan(RI_Plan *plan)
  * Prepare execution plan for a query to enforce an RI restriction
  */
 static RI_Plan *
-ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
+ri_PlanCheck(const RI_ConstraintInfo *riinfo,
+			 RI_PlanCreateFunc_type plan_create_func,
 			 const char *querystr, int nargs, Oid *argtypes,
 			 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
 {
@@ -2754,7 +3164,8 @@ ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
 						   SECURITY_NOFORCE_RLS);
 	/* Create the plan */
-	qplan = ri_PlanCreate(plan_create_func, querystr, nargs, argtypes);
+	qplan = ri_PlanCreate(riinfo, plan_create_func, querystr, nargs,
+						  argtypes, qkey->constr_queryno);
 
 	/* Restore UID and security context */
 	SetUserIdAndSecContext(save_userid, save_sec_context);
@@ -3399,7 +3810,10 @@ ri_CompareWithCast(Oid eq_opr, Oid typeid, Oid collid,
  * ri_HashCompareOp -
  *
  * See if we know how to compare two values, and create a new hash entry
- * if not.
+ * if not.  The entry contains the FmgrInfo of the equality operator function
+ * and that of the cast function, if one is needed to convert the right
+ * operand (whose type OID has been passed) before passing it to the equality
+ * function.
  */
 static RI_CompareHashEntry *
 ri_HashCompareOp(Oid eq_opr, Oid typeid)
@@ -3455,8 +3869,16 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
 		 * moment since that will never be generated for implicit coercions.
 		 */
 		op_input_types(eq_opr, &lefttype, &righttype);
-		Assert(lefttype == righttype);
-		if (typeid == lefttype)
+
+		/*
+		 * Don't need to cast if the values that will be passed to the
+		 * operator will be of expected operand type(s).  The operator can be
+		 * cross-type (such as when called by ri_LookupKeyInPkRel()), in which
+		 * case, we only need the cast if the right operand value doesn't match
+		 * the type expected by the operator.
+		 */
+		if ((lefttype == righttype && typeid == lefttype) ||
+			(lefttype != righttype && typeid == righttype))
 			castfunc = InvalidOid;	/* simplest case */
 		else
 		{
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index c09bc83b2a..e285427b48 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -31,6 +31,13 @@ extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
 										EState *estate);
 extern void ExecCleanupTupleRouting(ModifyTableState *mtstate,
 									PartitionTupleRouting *proute);
+extern Relation ExecGetLeafPartitionForKey(PartitionDirectory partdir,
+										   Relation root_rel,
+										   int key_natts,
+										   const AttrNumber *key_attnums,
+										   Datum *key_vals, bool *key_nulls,
+										   Oid root_idxoid, int lockmode,
+										   Oid *leaf_idxoid);
 
 
 /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index c8e6befca8..fbf6b6c2c5 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -262,6 +262,15 @@ extern void ExecShutdownNode(PlanState *node);
 extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 
+/*
+ * functions in execLockRows.c
+ */
+
+extern bool ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+				   Snapshot snapshot, CommandId cid,
+				   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+				   bool *tuple_concurrently_updated);
+
 /* ----------------------------------------------------------------
  *		ExecProcNode
  *
diff --git a/src/test/isolation/expected/fk-concurrent-pk-upd.out b/src/test/isolation/expected/fk-concurrent-pk-upd.out
new file mode 100644
index 0000000000..9bbec638ac
--- /dev/null
+++ b/src/test/isolation/expected/fk-concurrent-pk-upd.out
@@ -0,0 +1,58 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s2ukey s1i s2c s1c s2s s1s
+step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1); <waiting ...>
+step s2c: COMMIT;
+step s1i: <... completed>
+ERROR:  insert or update on table "child" violates foreign key constraint "child_parent_key_fkey"
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         2|foo
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+(0 rows)
+
+
+starting permutation: s2uaux s1i s2c s1c s2s s1s
+step s2uaux: UPDATE parent SET aux = 'bar' WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1);
+step s2c: COMMIT;
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         1|bar
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+        1|         1
+(1 row)
+
+
+starting permutation: s2ukey s1i s2ukey2 s2c s1c s2s s1s
+step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1); <waiting ...>
+step s2ukey2: UPDATE parent SET parent_key = 1 WHERE parent_key = 2;
+step s2c: COMMIT;
+step s1i: <... completed>
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         1|foo
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+        1|         1
+(1 row)
+
diff --git a/src/test/isolation/expected/fk-snapshot.out b/src/test/isolation/expected/fk-snapshot.out
index bdd26bac6c..c4a35b69bb 100644
--- a/src/test/isolation/expected/fk-snapshot.out
+++ b/src/test/isolation/expected/fk-snapshot.out
@@ -47,12 +47,12 @@ a
 
 step s2ifn2: INSERT INTO fk_noparted VALUES (2);
 step s2c: COMMIT;
+ERROR:  insert or update on table "fk_noparted" violates foreign key constraint "fk_noparted_a_fkey"
 step s2sfn: SELECT * FROM fk_noparted;
 a
 -
 1
-2
-(2 rows)
+(1 row)
 
 
 starting permutation: s1brc s2brc s2ip2 s1sp s2c s1sp s1ifp2 s2brc s2sfp s1c s1sfp s2ifn2 s2c s2sfn
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 143109aa4d..106deb3034 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -34,6 +34,7 @@ test: fk-deadlock2
 test: fk-partitioned-1
 test: fk-partitioned-2
 test: fk-snapshot
+test: fk-concurrent-pk-upd
 test: subxid-overflow
 test: eval-plan-qual
 test: eval-plan-qual-trigger
diff --git a/src/test/isolation/specs/fk-concurrent-pk-upd.spec b/src/test/isolation/specs/fk-concurrent-pk-upd.spec
new file mode 100644
index 0000000000..4bdd92cd2d
--- /dev/null
+++ b/src/test/isolation/specs/fk-concurrent-pk-upd.spec
@@ -0,0 +1,42 @@
+# Tests that an INSERT on referencing table correctly fails when
+# the referenced value disappears due to a concurrent update
+setup
+{
+  CREATE TABLE parent (
+	parent_key	int		PRIMARY KEY,
+	aux			text	NOT NULL
+  );
+
+  CREATE TABLE child (
+	child_key	int		PRIMARY KEY,
+	parent_key	int		NOT NULL REFERENCES parent
+  );
+
+  INSERT INTO parent VALUES (1, 'foo');
+}
+
+teardown
+{
+  DROP TABLE parent, child;
+}
+
+session s1
+setup		{ BEGIN; }
+step s1i	{ INSERT INTO child VALUES (1, 1); }
+step s1c	{ COMMIT; }
+step s1s	{ SELECT * FROM child; }
+
+session s2
+setup		{ BEGIN; }
+step s2ukey	{ UPDATE parent SET parent_key = 2 WHERE parent_key = 1; }
+step s2uaux	{ UPDATE parent SET aux = 'bar' WHERE parent_key = 1; }
+step s2ukey2	{ UPDATE parent SET parent_key = 1 WHERE parent_key = 2; }
+step s2c	{ COMMIT; }
+step s2s	{ SELECT * FROM parent; }
+
+# fail
+permutation s2ukey s1i s2c s1c s2s s1s
+# ok
+permutation s2uaux s1i s2c s1c s2s s1s
+# ok
+permutation s2ukey s1i s2ukey2 s2c s1c s2s s1s
diff --git a/src/test/isolation/specs/fk-snapshot.spec b/src/test/isolation/specs/fk-snapshot.spec
index 9fad57e768..ec5fe0c50c 100644
--- a/src/test/isolation/specs/fk-snapshot.spec
+++ b/src/test/isolation/specs/fk-snapshot.spec
@@ -53,10 +53,7 @@ step s2sfn	{ SELECT * FROM fk_noparted; }
 # inserting into referencing tables in transaction-snapshot mode
 # PK table is non-partitioned
 permutation s1brr s2brc s2ip2 s1sp s2c s1sp s1ifp2 s1c s1sfp
-# PK table is partitioned: buggy, because s2's serialization transaction can
-# see the uncommitted row thanks to the latest snapshot taken for
-# partition lookup to work correctly also ends up getting used by the PK index
-# scan
+# PK table is partitioned
 permutation s2ip2 s2brr s1brc s1ifp2 s2sfp s1c s2sfp s2ifn2 s2c s2sfn
 
 # inserting into referencing tables in up-to-date snapshot mode
-- 
2.43.0

v1-0001-Explicitly-pass-snapshot-necessary-for-omit_detac.patch
From f6fb0f4b9bf68e58d8d857721bb22ded5130149a Mon Sep 17 00:00:00 2001
From: Amit Langote <amitlan@postgresql.org>
Date: Thu, 19 Dec 2024 21:20:11 +0900
Subject: [PATCH v1 1/3] Explicitly pass snapshot necessary for omit_detached
 logic

This commit changes find_inheritance_children_extended() and
RelationBuildPartitionDesc() to accept the snapshot necessary
to implement the omit_detached logic correctly.

Previously, these functions used ActiveSnapshot to check if a
detach-pending partition's pg_inherits row was visible. This
logic aimed to make RI queries over partitioned PK tables under
REPEATABLE READ isolation handle detach-pending partitions
correctly. However, forcing a snapshot onto ActiveSnapshot led
to isolation violations by making scans in the query see changes
not consistent with the parent transaction's snapshot. A test
added in commit 00cb86e75d demonstrates this issue.

The new interface of RelationBuildPartitionDesc() and relevant
callers allows passing the necessary snapshot explicitly, thus
avoiding modifications to ActiveSnapshot. Default behavior remains
unchanged when no snapshot is provided, maintaining compatibility
with non-RI queries and other uses of
find_inheritance_children_extended().

A future commit will update RI PK lookups to use this interface.

Robert Haas contributed the changes to the PartitionDesc interface.

Co-author: Robert Haas
Reviewed-by: Robert Haas
Discussion: https://postgr.es/m/CA+HiwqGkfJfYdeq5vHPh6eqPKjSbfpDDY+j-kXYFePQedtSLeg@mail.gmail.com
Discussion: https://postgr.es/m/CA+HiwqG5e8pk8s7+7zhr1Nc_PGyhEdM5f=pHkMOdK1RYWXfJsg@mail.gmail.com
---
 src/backend/catalog/pg_inherits.c    | 33 +++++-----
 src/backend/executor/execPartition.c | 23 +++++--
 src/backend/optimizer/util/plancat.c |  6 +-
 src/backend/partitioning/partdesc.c  | 94 +++++++++++++++++-----------
 src/include/catalog/pg_inherits.h    |  7 ++-
 src/include/partitioning/partdesc.h  |  6 +-
 6 files changed, 109 insertions(+), 60 deletions(-)

diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c
index 836b4bfd89..8917a65690 100644
--- a/src/backend/catalog/pg_inherits.c
+++ b/src/backend/catalog/pg_inherits.c
@@ -51,14 +51,18 @@ typedef struct SeenRelsEntry
  * then no locks are acquired, but caller must beware of race conditions
  * against possible DROPs of child relations.
  *
- * Partitions marked as being detached are omitted; see
+ * A partition marked as being detached is omitted from the result if the
+ * pg_inherits row marking it as detach-pending is visible to the
+ * ActiveSnapshot (that is, only when one has been pushed); see
  * find_inheritance_children_extended for details.
  */
 List *
 find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
 {
-	return find_inheritance_children_extended(parentrelId, true, lockmode,
-											  NULL, NULL);
+	return find_inheritance_children_extended(parentrelId,
+											  ActiveSnapshotSet() ?
+											  GetActiveSnapshot() : NULL,
+											  lockmode, NULL, NULL);
 }
 
 /*
@@ -70,16 +74,17 @@ find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
  * If a partition's pg_inherits row is marked "detach pending",
  * *detached_exist (if not null) is set true.
  *
- * If omit_detached is true and there is an active snapshot (not the same as
- * the catalog snapshot used to scan pg_inherits!) and a pg_inherits tuple
- * marked "detach pending" is visible to that snapshot, then that partition is
- * omitted from the output list.  This makes partitions invisible depending on
- * whether the transaction that marked those partitions as detached appears
- * committed to the active snapshot.  In addition, *detached_xmin (if not null)
- * is set to the xmin of the row of the detached partition.
+ * If the caller passed 'omit_detached_snapshot', a partition whose
+ * pg_inherits tuple marks it as "detach pending" is omitted from the output
+ * list if that tuple is visible to the given snapshot.  In other words, the
+ * partition is omitted when the transaction that marked it as detached
+ * appears committed to omit_detached_snapshot.  If the partition is omitted,
+ * *detached_xmin (if non-NULL) is set to the xmin of that pg_inherits
+ * tuple.
  */
 List *
-find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
+find_inheritance_children_extended(Oid parentrelId,
+								   Snapshot omit_detached_snapshot,
 								   LOCKMODE lockmode, bool *detached_exist,
 								   TransactionId *detached_xmin)
 {
@@ -140,15 +145,13 @@ find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
 			if (detached_exist)
 				*detached_exist = true;
 
-			if (omit_detached && ActiveSnapshotSet())
+			if (omit_detached_snapshot)
 			{
 				TransactionId xmin;
-				Snapshot	snap;
 
 				xmin = HeapTupleHeaderGetXmin(inheritsTuple->t_data);
-				snap = GetActiveSnapshot();
 
-				if (!XidInMVCCSnapshot(xmin, snap))
+				if (!XidInMVCCSnapshot(xmin, omit_detached_snapshot))
 				{
 					if (detached_xmin)
 					{
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 7651886229..d26cf20003 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -31,6 +31,7 @@
 #include "utils/partcache.h"
 #include "utils/rls.h"
 #include "utils/ruleutils.h"
+#include "utils/snapmgr.h"
 
 
 /*-----------------------
@@ -1101,17 +1102,24 @@ ExecInitPartitionDispatchInfo(EState *estate,
 	MemoryContext oldcxt;
 
 	/*
-	 * For data modification, it is better that executor does not include
-	 * partitions being detached, except when running in snapshot-isolation
-	 * mode.  This means that a read-committed transaction immediately gets a
+	 * For data modification, it is better that executor omits the partitions
+	 * being detached, except when running in snapshot-isolation mode.  This
+	 * means that a read-committed transaction immediately gets a
 	 * "no partition for tuple" error when a tuple is inserted into a
 	 * partition that's being detached concurrently, but a transaction in
 	 * repeatable-read mode can still use such a partition.
 	 */
 	if (estate->es_partition_directory == NULL)
+	{
+		Snapshot	omit_detached_snapshot = NULL;
+
+		Assert(ActiveSnapshotSet());
+		if (!IsolationUsesXactSnapshot())
+			omit_detached_snapshot = GetActiveSnapshot();
 		estate->es_partition_directory =
 			CreatePartitionDirectory(estate->es_query_cxt,
-									 !IsolationUsesXactSnapshot());
+									 omit_detached_snapshot);
+	}
 
 	oldcxt = MemoryContextSwitchTo(proute->memcxt);
 
@@ -1871,10 +1879,13 @@ CreatePartitionPruneState(PlanState *planstate, PartitionPruneInfo *pruneinfo)
 	int			i;
 	ExprContext *econtext = planstate->ps_ExprContext;
 
-	/* For data reading, executor always includes detached partitions */
+	/*
+	 * For data reading, executor always includes detached partitions,
+	 * so pass NULL for omit_detached_snapshot.
+	 */
 	if (estate->es_partition_directory == NULL)
 		estate->es_partition_directory =
-			CreatePartitionDirectory(estate->es_query_cxt, false);
+			CreatePartitionDirectory(estate->es_query_cxt, NULL);
 
 	n_part_hierarchies = list_length(pruneinfo->prune_infos);
 	Assert(n_part_hierarchies > 0);
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 153390f2dc..ee146db082 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -2378,11 +2378,15 @@ set_relation_partition_info(PlannerInfo *root, RelOptInfo *rel,
 
 	/*
 	 * Create the PartitionDirectory infrastructure if we didn't already.
+	 * Note that the planner always omits the partitions being detached
+	 * concurrently.
 	 */
 	if (root->glob->partition_directory == NULL)
 	{
+		Assert(ActiveSnapshotSet());
 		root->glob->partition_directory =
-			CreatePartitionDirectory(CurrentMemoryContext, true);
+			CreatePartitionDirectory(CurrentMemoryContext,
+									 GetActiveSnapshot());
 	}
 
 	partdesc = PartitionDirectoryLookup(root->glob->partition_directory,
diff --git a/src/backend/partitioning/partdesc.c b/src/backend/partitioning/partdesc.c
index b4e0ed0e71..a80bbe7378 100644
--- a/src/backend/partitioning/partdesc.c
+++ b/src/backend/partitioning/partdesc.c
@@ -36,7 +36,7 @@ typedef struct PartitionDirectoryData
 {
 	MemoryContext pdir_mcxt;
 	HTAB	   *pdir_hash;
-	bool		omit_detached;
+	Snapshot	omit_detached_snapshot;
 }			PartitionDirectoryData;
 
 typedef struct PartitionDirectoryEntry
@@ -47,17 +47,23 @@ typedef struct PartitionDirectoryEntry
 } PartitionDirectoryEntry;
 
 static PartitionDesc RelationBuildPartitionDesc(Relation rel,
-												bool omit_detached);
+												Snapshot omit_detached_snapshot);
 
 
 /*
- * RelationGetPartitionDesc -- get partition descriptor, if relation is partitioned
+ * RelationGetPartitionDescExt
+ * 		Get the partition descriptor of a partitioned table, building and
+ * 		caching one if none is cached yet or if the cached one is not
+ * 		suitable for the given request
  *
  * We keep two partdescs in relcache: rd_partdesc includes all partitions
- * (even those being concurrently marked detached), while rd_partdesc_nodetached
- * omits (some of) those.  We store the pg_inherits.xmin value for the latter,
- * to determine whether it can be validly reused in each case, since that
- * depends on the active snapshot.
+ * (even one being concurrently marked detached), while
+ * rd_partdesc_nodetached omits the detach-pending partition.  If the latter
+ * is present, rd_partdesc_nodetached_xmin will have been set to the xmin of
+ * the detach-pending partition's pg_inherits row, which is used to determine
+ * whether rd_partdesc_nodetached can be validly reused for a given request
+ * by checking whether that xmin appears committed to the
+ * 'omit_detached_snapshot' passed by the caller.
  *
  * Note: we arrange for partition descriptors to not get freed until the
  * relcache entry's refcount goes to zero (see hacks in RelationClose,
@@ -68,7 +74,7 @@ static PartitionDesc RelationBuildPartitionDesc(Relation rel,
  * that the data doesn't become stale.
  */
 PartitionDesc
-RelationGetPartitionDesc(Relation rel, bool omit_detached)
+RelationGetPartitionDescExt(Relation rel, Snapshot omit_detached_snapshot)
 {
 	Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
 
@@ -77,36 +83,51 @@ RelationGetPartitionDesc(Relation rel, bool omit_detached)
 	 * do so when we are asked to include all partitions including detached;
 	 * and also when we know that there are no detached partitions.
 	 *
-	 * If there is no active snapshot, detached partitions aren't omitted
-	 * either, so we can use the cached descriptor too in that case.
+	 * omit_detached_snapshot being NULL means that the caller doesn't care
+	 * that the returned partition descriptor may contain detached partitions,
+	 * so we can use the cached descriptor in that case too.
 	 */
 	if (likely(rel->rd_partdesc &&
-			   (!rel->rd_partdesc->detached_exist || !omit_detached ||
-				!ActiveSnapshotSet())))
+			   (!rel->rd_partdesc->detached_exist ||
+				omit_detached_snapshot == NULL)))
 		return rel->rd_partdesc;
 
 	/*
-	 * If we're asked to omit detached partitions, we may be able to use a
-	 * cached descriptor too.  We determine that based on the pg_inherits.xmin
-	 * that was saved alongside that descriptor: if the xmin that was not in
-	 * progress for that active snapshot is also not in progress for the
-	 * current active snapshot, then we can use it.  Otherwise build one from
-	 * scratch.
+	 * If we're asked to omit the detached partition, we may be able to use
+	 * the other cached descriptor, which was built to omit the detached
+	 * partition.  Whether that descriptor can be reused here is determined
+	 * by cross-checking the visibility of rd_partdesc_nodetached_xmin, that
+	 * is, the pg_inherits.xmin of the detached partition's pg_inherits row:
+	 * if that xmin, which was not in progress for the snapshot used when
+	 * rd_partdesc_nodetached was built, is also not in progress for the
+	 * given omit_detached_snapshot, then we can reuse that descriptor.
+	 * Otherwise we must build one from scratch.
 	 */
-	if (omit_detached &&
-		rel->rd_partdesc_nodetached &&
-		ActiveSnapshotSet())
+	if (rel->rd_partdesc_nodetached && omit_detached_snapshot)
 	{
-		Snapshot	activesnap;
-
 		Assert(TransactionIdIsValid(rel->rd_partdesc_nodetached_xmin));
-		activesnap = GetActiveSnapshot();
 
-		if (!XidInMVCCSnapshot(rel->rd_partdesc_nodetached_xmin, activesnap))
+		if (!XidInMVCCSnapshot(rel->rd_partdesc_nodetached_xmin,
+							   omit_detached_snapshot))
 			return rel->rd_partdesc_nodetached;
 	}
 
-	return RelationBuildPartitionDesc(rel, omit_detached);
+	return RelationBuildPartitionDesc(rel, omit_detached_snapshot);
+}
+
+/*
+ * RelationGetPartitionDesc
+ *		Like RelationGetPartitionDescExt() but for callers that are fine with
+ *		ActiveSnapshot being used as omit_detached_snapshot
+ */
+PartitionDesc
+RelationGetPartitionDesc(Relation rel, bool omit_detached)
+{
+	Snapshot	snapshot = NULL;
+
+	if (omit_detached && ActiveSnapshotSet())
+		snapshot = GetActiveSnapshot();
+	return RelationGetPartitionDescExt(rel, snapshot);
 }
 
 /*
@@ -131,7 +152,8 @@ RelationGetPartitionDesc(Relation rel, bool omit_detached)
  * for them.
  */
 static PartitionDesc
-RelationBuildPartitionDesc(Relation rel, bool omit_detached)
+RelationBuildPartitionDesc(Relation rel,
+						   Snapshot omit_detached_snapshot)
 {
 	PartitionDesc partdesc;
 	PartitionBoundInfo boundinfo = NULL;
@@ -162,7 +184,8 @@ retry:
 	detached_exist = false;
 	detached_xmin = InvalidTransactionId;
 	inhoids = find_inheritance_children_extended(RelationGetRelid(rel),
-												 omit_detached, NoLock,
+												 omit_detached_snapshot,
+												 NoLock,
 												 &detached_exist,
 												 &detached_xmin);
 
@@ -362,11 +385,11 @@ retry:
 	 *
 	 * Note that if a partition was found by the catalog's scan to have been
 	 * detached, but the pg_inherit tuple saying so was not visible to the
-	 * active snapshot (find_inheritance_children_extended will not have set
-	 * detached_xmin in that case), we consider there to be no "omittable"
-	 * detached partitions.
+	 * omit_detached_snapshot (find_inheritance_children_extended() will not
+	 * have set detached_xmin in that case), we consider there to be no
+	 * "omittable" detached partitions.
 	 */
-	is_omit = omit_detached && detached_exist && ActiveSnapshotSet() &&
+	is_omit = detached_exist && omit_detached_snapshot &&
 		TransactionIdIsValid(detached_xmin);
 
 	/*
@@ -420,7 +443,7 @@ retry:
  *		Create a new partition directory object.
  */
 PartitionDirectory
-CreatePartitionDirectory(MemoryContext mcxt, bool omit_detached)
+CreatePartitionDirectory(MemoryContext mcxt, Snapshot omit_detached_snapshot)
 {
 	MemoryContext oldcontext = MemoryContextSwitchTo(mcxt);
 	PartitionDirectory pdir;
@@ -435,7 +458,7 @@ CreatePartitionDirectory(MemoryContext mcxt, bool omit_detached)
 
 	pdir->pdir_hash = hash_create("partition directory", 256, &ctl,
 								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-	pdir->omit_detached = omit_detached;
+	pdir->omit_detached_snapshot = omit_detached_snapshot;
 
 	MemoryContextSwitchTo(oldcontext);
 	return pdir;
@@ -468,7 +491,8 @@ PartitionDirectoryLookup(PartitionDirectory pdir, Relation rel)
 		 */
 		RelationIncrementReferenceCount(rel);
 		pde->rel = rel;
-		pde->pd = RelationGetPartitionDesc(rel, pdir->omit_detached);
+		pde->pd = RelationGetPartitionDescExt(rel,
+											  pdir->omit_detached_snapshot);
 		Assert(pde->pd != NULL);
 	}
 	return pde->pd;
diff --git a/src/include/catalog/pg_inherits.h b/src/include/catalog/pg_inherits.h
index b3da78c24b..465999795d 100644
--- a/src/include/catalog/pg_inherits.h
+++ b/src/include/catalog/pg_inherits.h
@@ -23,6 +23,7 @@
 
 #include "nodes/pg_list.h"
 #include "storage/lock.h"
+#include "utils/snapshot.h"
 
 /* ----------------
  *		pg_inherits definition.  cpp turns this into
@@ -49,8 +50,10 @@ DECLARE_INDEX(pg_inherits_parent_index, 2187, InheritsParentIndexId, pg_inherits
 
 
 extern List *find_inheritance_children(Oid parentrelId, LOCKMODE lockmode);
-extern List *find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
-												LOCKMODE lockmode, bool *detached_exist, TransactionId *detached_xmin);
+extern List *find_inheritance_children_extended(Oid parentrelId,
+												Snapshot omit_detached_snapshot,
+												LOCKMODE lockmode, bool *detached_exist,
+												TransactionId *detached_xmin);
 
 extern List *find_all_inheritors(Oid parentrelId, LOCKMODE lockmode,
 								 List **numparents);
diff --git a/src/include/partitioning/partdesc.h b/src/include/partitioning/partdesc.h
index 87abfd76d7..d4a8ab3fb7 100644
--- a/src/include/partitioning/partdesc.h
+++ b/src/include/partitioning/partdesc.h
@@ -14,6 +14,7 @@
 
 #include "partitioning/partdefs.h"
 #include "utils/relcache.h"
+#include "utils/snapshot.h"
 
 /*
  * Information about partitions of a partitioned table.
@@ -65,8 +66,11 @@ typedef struct PartitionDescData
 
 
 extern PartitionDesc RelationGetPartitionDesc(Relation rel, bool omit_detached);
+extern PartitionDesc RelationGetPartitionDescExt(Relation rel,
+												 Snapshot omit_detached_snapshot);
 
-extern PartitionDirectory CreatePartitionDirectory(MemoryContext mcxt, bool omit_detached);
+extern PartitionDirectory CreatePartitionDirectory(MemoryContext mcxt,
+												   Snapshot omit_detached_snapshot);
 extern PartitionDesc PartitionDirectoryLookup(PartitionDirectory, Relation);
 extern void DestroyPartitionDirectory(PartitionDirectory pdir);
 
-- 
2.43.0

Attachment: v1-0002-Avoid-using-SPI-in-RI-trigger-functions.patch (application/x-patch)
From dc49e938cb6a31034c974f33e268c0a16c778e9d Mon Sep 17 00:00:00 2001
From: Amit Langote <amitlan@postgresql.org>
Date: Thu, 19 Dec 2024 21:35:04 +0900
Subject: [PATCH v1 2/3] Avoid using SPI in RI trigger functions

Currently, ri_PlanCheck() uses SPI_prepare() to create an SPI plan
containing a CachedPlanSource for the SQL query used in RI checks.
Similarly, ri_PerformCheck() calls SPI_execute_snapshot() to
execute the query with a specific snapshot.

This commit introduces ri_PlanCreate() and ri_PlanExecute() to
replace SPI_prepare() and SPI_execute_snapshot(), respectively.

* ri_PlanCreate()

Creates an "RI plan" for a query using a caller-specified callback
function, such as ri_SqlStringPlanCreate(), which produces a
CachedPlanSource for the input SQL string. This mirrors SPI_prepare()
functionality.

* ri_PlanExecute()

Executes an "RI plan" using a callback saved within the "RI plan"
(struct RIPlan). For example, ri_SqlStringPlanExecute() fetches a
CachedPlan from the CachedPlanSource and executes its PlannedStmt
using the executor. Snapshot handling is now fully managed by
ri_PerformCheck(), eliminating dependence on SPI's snapshot logic.

These changes make ri_PlanCreate() and ri_PlanExecute() pluggable,
laying the groundwork for future commits to replace SQL-based RI
checks with optimized C functions for direct table/index scans.

Note:

This only addresses RI_* functions called from RI triggers and
not those called from tablecmds.c, such as RI_Initial_Check() and
RI_PartitionRemove_Check(), which still use SPI_prepare() and
SPI_execute_snapshot().

Reviewed-by: Robert Haas
Discussion: https://postgr.es/m/CA+HiwqGkfJfYdeq5vHPh6eqPKjSbfpDDY+j-kXYFePQedtSLeg@mail.gmail.com
Discussion: https://postgr.es/m/CA+HiwqG5e8pk8s7+7zhr1Nc_PGyhEdM5f=pHkMOdK1RYWXfJsg@mail.gmail.com
---
 src/backend/executor/spi.c          |   2 +-
 src/backend/utils/adt/ri_triggers.c | 595 +++++++++++++++++++++++-----
 2 files changed, 488 insertions(+), 109 deletions(-)

diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index c1d8fd08c6..1d5e6532bf 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -764,7 +764,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
  * end of the command.
  *
  * This is currently not documented in spi.sgml because it is only intended
- * for use by RI triggers.
+ * for use by some functions in ri_triggers.c.
  *
  * Passing snapshot == InvalidSnapshot will select the normal behavior of
  * fetching a new snapshot for each query.
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 3185f48afa..804a2a69e4 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -9,7 +9,7 @@
  *	across query and transaction boundaries, in fact they live as long as
  *	the backend does.  This works because the hashtable structures
  *	themselves are allocated by dynahash.c in its permanent DynaHashCxt,
- *	and the SPI plans they point to are saved using SPI_keepplan().
+ *	and the CachedPlanSources they point to are saved in CacheMemoryContext.
  *	There is not currently any provision for throwing away a no-longer-needed
  *	plan --- consider improving this someday.
  *
@@ -38,6 +38,9 @@
 #include "miscadmin.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_relation.h"
+#include "storage/bufmgr.h"
+#include "tcop/pquery.h"
+#include "tcop/utility.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
@@ -132,10 +135,55 @@ typedef struct RI_ConstraintInfo
 	dlist_node	valid_link;		/* Link in list of valid entries */
 } RI_ConstraintInfo;
 
+/* RI plan callback functions */
+struct RI_Plan;
+typedef void (*RI_PlanCreateFunc_type) (struct RI_Plan *plan, const char *querystr, int nargs, Oid *paramtypes);
+typedef int (*RI_PlanExecFunc_type) (struct RI_Plan *plan, Relation fk_rel, Relation pk_rel,
+									 Datum *param_vals, char *params_isnulls,
+									 Snapshot crosscheck_snapshot,
+									 int limit, CmdType *last_stmt_cmdtype);
+typedef bool (*RI_PlanIsValidFunc_type) (struct RI_Plan *plan);
+typedef void (*RI_PlanFreeFunc_type) (struct RI_Plan *plan);
+
+/*
+ * RI_Plan
+ *
+ * Information related to the implementation of a plan for a given RI query.
+ * ri_PlanCheck() makes and stores these in ri_query_cache.  The callers of
+ * ri_PlanCheck() specify a RI_PlanCreateFunc_type function to fill in the
+ * caller-specific implementation details such as the callback functions
+ * to create, validate, and free a plan, and also the arguments necessary for
+ * the execution of the plan.
+ */
+typedef struct RI_Plan
+{
+	/*
+	 * Context under which this struct and its subsidiary data gets allocated.
+	 * It is made a child of CacheMemoryContext.
+	 */
+	MemoryContext	plancxt;
+
+	/* Query parameter types. */
+	int				nargs;
+	Oid			   *paramtypes;
+
+	/*
+	 * Set of functions specified by a RI trigger function to implement
+	 * the plan for the trigger's RI query.
+	 */
+	RI_PlanExecFunc_type plan_exec_func;	/* execute the plan */
+	void		   *plan_exec_arg;			/* execution argument, such as
+											 * a List of CachedPlanSource */
+	RI_PlanIsValidFunc_type plan_is_valid_func; /* check if the plan is still
+												 * valid for ri_query_cache
+												 * to continue caching it */
+	RI_PlanFreeFunc_type plan_free_func;	/* release plan resources */
+} RI_Plan;
+
 /*
  * RI_QueryKey
  *
- * The key identifying a prepared SPI plan in our query hashtable
+ * The key identifying a plan in our query hashtable
  */
 typedef struct RI_QueryKey
 {
@@ -149,7 +197,7 @@ typedef struct RI_QueryKey
 typedef struct RI_QueryHashEntry
 {
 	RI_QueryKey key;
-	SPIPlanPtr	plan;
+	RI_Plan	   *plan;
 } RI_QueryHashEntry;
 
 /*
@@ -212,8 +260,8 @@ static bool ri_CompareWithCast(Oid eq_opr, Oid typeid, Oid collid,
 
 static void ri_InitHashTables(void);
 static void InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue);
-static SPIPlanPtr ri_FetchPreparedPlan(RI_QueryKey *key);
-static void ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan);
+static RI_Plan *ri_FetchPreparedPlan(RI_QueryKey *key);
+static void ri_HashPreparedPlan(RI_QueryKey *key, RI_Plan *plan);
 static RI_CompareHashEntry *ri_HashCompareOp(Oid eq_opr, Oid typeid);
 
 static void ri_CheckTrigger(FunctionCallInfo fcinfo, const char *funcname,
@@ -222,14 +270,15 @@ static const RI_ConstraintInfo *ri_FetchConstraintInfo(Trigger *trigger,
 													   Relation trig_rel, bool rel_is_pk);
 static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid);
 static Oid	get_ri_constraint_root(Oid constrOid);
-static SPIPlanPtr ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
-							   RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
+static RI_Plan *ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
+							 const char *querystr, int nargs, Oid *argtypes,
+							 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
 static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
-							RI_QueryKey *qkey, SPIPlanPtr qplan,
+							RI_QueryKey *qkey, RI_Plan *qplan,
 							Relation fk_rel, Relation pk_rel,
 							TupleTableSlot *oldslot, TupleTableSlot *newslot,
 							bool is_restrict,
-							bool detectNewRows, int expect_OK);
+							bool detectNewRows, int expected_cmdtype);
 static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 							 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
 							 Datum *vals, char *nulls);
@@ -237,6 +286,14 @@ static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 							   Relation pk_rel, Relation fk_rel,
 							   TupleTableSlot *violatorslot, TupleDesc tupdesc,
 							   int queryno, bool is_restrict, bool partgone) pg_attribute_noreturn();
+static void ri_SqlStringPlanCreate(RI_Plan *plan,
+					   const char *querystr, int nargs, Oid *paramtypes);
+static bool ri_SqlStringPlanIsValid(RI_Plan *plan);
+static int ri_SqlStringPlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_rel,
+						Datum *vals, char *nulls,
+						Snapshot crosscheck_snapshot,
+						int limit, CmdType *last_stmt_cmdtype);
+static void ri_SqlStringPlanFree(RI_Plan *plan);
 
 
 /*
@@ -252,7 +309,7 @@ RI_FKey_check(TriggerData *trigdata)
 	Relation	pk_rel;
 	TupleTableSlot *newslot;
 	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 
 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
 									trigdata->tg_relation, false);
@@ -349,8 +406,6 @@ RI_FKey_check(TriggerData *trigdata)
 			break;
 	}
 
-	SPI_connect();
-
 	/* Fetch or prepare a saved plan for the real check */
 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
 
@@ -435,8 +490,9 @@ RI_FKey_check(TriggerData *trigdata)
 			appendStringInfo(&querybuf, "(x1.r)");
 		}
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -452,10 +508,7 @@ RI_FKey_check(TriggerData *trigdata)
 					NULL, newslot,
 					false,
 					pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
-					SPI_OK_SELECT);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+					CMD_SELECT);
 
 	table_close(pk_rel, RowShareLock);
 
@@ -510,15 +563,13 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 				  TupleTableSlot *oldslot,
 				  const RI_ConstraintInfo *riinfo)
 {
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 	RI_QueryKey qkey;
 	bool		result;
 
 	/* Only called for non-null rows */
 	Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
 
-	SPI_connect();
-
 	/*
 	 * Fetch or prepare a saved plan for checking PK table with values coming
 	 * from a PK row
@@ -604,8 +655,9 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 			appendStringInfo(&querybuf, "(x1.r)");
 		}
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -617,10 +669,7 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 							 oldslot, NULL,
 							 false,
 							 true,	/* treat like update */
-							 SPI_OK_SELECT);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+							 CMD_SELECT);
 
 	return result;
 }
@@ -714,7 +763,7 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 	Relation	pk_rel;
 	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 
 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
 									trigdata->tg_relation, true);
@@ -742,8 +791,6 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 		return PointerGetDatum(NULL);
 	}
 
-	SPI_connect();
-
 	/*
 	 * Fetch or prepare a saved plan for the restrict lookup (it's the same
 	 * query for delete and update cases)
@@ -792,8 +839,9 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 		}
 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -805,10 +853,7 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 					oldslot, NULL,
 					!is_no_action,
 					true,		/* must detect new rows */
-					SPI_OK_SELECT);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+					CMD_SELECT);
 
 	table_close(fk_rel, RowShareLock);
 
@@ -830,7 +875,7 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 	Relation	pk_rel;
 	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 
 	/* Check that this is a valid trigger call on the right time and event. */
 	ri_CheckTrigger(fcinfo, "RI_FKey_cascade_del", RI_TRIGTYPE_DELETE);
@@ -848,8 +893,6 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 	pk_rel = trigdata->tg_relation;
 	oldslot = trigdata->tg_trigslot;
 
-	SPI_connect();
-
 	/* Fetch or prepare a saved plan for the cascaded delete */
 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_ONDELETE);
 
@@ -893,8 +936,9 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 			queryoids[i] = pk_type;
 		}
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -907,10 +951,7 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 					oldslot, NULL,
 					false,
 					true,		/* must detect new rows */
-					SPI_OK_DELETE);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+					CMD_DELETE);
 
 	table_close(fk_rel, RowExclusiveLock);
 
@@ -933,7 +974,7 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 	TupleTableSlot *newslot;
 	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 
 	/* Check that this is a valid trigger call on the right time and event. */
 	ri_CheckTrigger(fcinfo, "RI_FKey_cascade_upd", RI_TRIGTYPE_UPDATE);
@@ -953,8 +994,6 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 	newslot = trigdata->tg_newslot;
 	oldslot = trigdata->tg_trigslot;
 
-	SPI_connect();
-
 	/* Fetch or prepare a saved plan for the cascaded update */
 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_ONUPDATE);
 
@@ -1011,8 +1050,9 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 		}
 		appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys * 2, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys * 2, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -1024,10 +1064,7 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 					oldslot, newslot,
 					false,
 					true,		/* must detect new rows */
-					SPI_OK_UPDATE);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+					CMD_UPDATE);
 
 	table_close(fk_rel, RowExclusiveLock);
 
@@ -1109,7 +1146,7 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 	Relation	pk_rel;
 	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 	int32		queryno;
 
 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
@@ -1125,8 +1162,6 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 	pk_rel = trigdata->tg_relation;
 	oldslot = trigdata->tg_trigslot;
 
-	SPI_connect();
-
 	/*
 	 * Fetch or prepare a saved plan for the trigger.
 	 */
@@ -1239,8 +1274,9 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 			queryoids[i] = pk_type;
 		}
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -1252,10 +1288,7 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 					oldslot, NULL,
 					false,
 					true,		/* must detect new rows */
-					SPI_OK_UPDATE);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+					CMD_UPDATE);
 
 	table_close(fk_rel, RowExclusiveLock);
 
@@ -1445,7 +1478,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	int			save_nestlevel;
 	char		workmembuf[32];
 	int			spi_result;
-	SPIPlanPtr	qplan;
+	SPIPlanPtr  qplan;
 
 	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
 
@@ -2034,7 +2067,7 @@ ri_GenerateQualCollation(StringInfo buf, Oid collation)
 /* ----------
  * ri_BuildQueryKey -
  *
- *	Construct a hashtable key for a prepared SPI plan of an FK constraint.
+ *	Construct a hashtable key for a plan of an FK constraint.
  *
  *		key: output argument, *key is filled in based on the other arguments
  *		riinfo: info derived from pg_constraint entry
@@ -2053,9 +2086,9 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 	 * the FK constraint (i.e., not the table on which the trigger has been
 	 * fired), and so it will be the same for all members of the inheritance
 	 * tree.  So we may use the root constraint's OID in the hash key, rather
-	 * than the constraint's own OID.  This avoids creating duplicate SPI
-	 * plans, saving lots of work and memory when there are many partitions
-	 * with similar FK constraints.
+	 * than the constraint's own OID.  This avoids creating duplicate plans,
+	 * saving lots of work and memory when there are many partitions with
+	 * similar FK constraints.
 	 *
 	 * (Note that we must still have a separate RI_ConstraintInfo for each
 	 * constraint, because partitions can have different column orders,
@@ -2342,15 +2375,366 @@ InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue)
 	}
 }
 
+/* Query string or an equivalent name to show in the error CONTEXT. */
+typedef struct RIErrorCallbackArg
+{
+	const char *query;
+} RIErrorCallbackArg;
+
+/*
+ * _RI_error_callback
+ *
+ * Add context information when a query being processed with ri_PlanCreate()
+ * or ri_PlanExecute() fails.
+ */
+static void
+_RI_error_callback(void *arg)
+{
+	RIErrorCallbackArg *carg = (RIErrorCallbackArg *) arg;
+	const char *query = carg->query;
+	int			syntaxerrposition;
+
+	Assert(query != NULL);
+
+	/*
+	 * If there is a syntax error position, convert to internal syntax error;
+	 * otherwise treat the query as an item of context stack
+	 */
+	syntaxerrposition = geterrposition();
+	if (syntaxerrposition > 0)
+	{
+		errposition(0);
+		internalerrposition(syntaxerrposition);
+		internalerrquery(query);
+	}
+	else
+		errcontext("SQL statement \"%s\"", query);
+}
+
+/*
+ * This creates a plan for a query written in SQL.
+ *
+ * The main product is a list of CachedPlanSource for each of the queries
+ * resulting from the provided query's rewrite that is saved to
+ * plan->plan_exec_arg.
+ */
+static void
+ri_SqlStringPlanCreate(RI_Plan *plan,
+					   const char *querystr, int nargs, Oid *paramtypes)
+{
+	List	   *raw_parsetree_list;
+	List	   *plancache_list = NIL;
+	ListCell   *list_item;
+	RIErrorCallbackArg ricallbackarg;
+	ErrorContextCallback rierrcontext;
+
+	Assert(querystr != NULL);
+
+	/*
+	 * Setup error traceback support for ereport()
+	 */
+	ricallbackarg.query = querystr;
+	rierrcontext.callback = _RI_error_callback;
+	rierrcontext.arg = &ricallbackarg;
+	rierrcontext.previous = error_context_stack;
+	error_context_stack = &rierrcontext;
+
+	/*
+	 * Parse the request string into a list of raw parse trees.
+	 */
+	raw_parsetree_list = raw_parser(querystr, RAW_PARSE_DEFAULT);
+
+	/*
+	 * Do parse analysis and rule rewrite for each raw parsetree, storing the
+	 * results into unsaved plancache entries.
+	 */
+	plancache_list = NIL;
+
+	foreach(list_item, raw_parsetree_list)
+	{
+		RawStmt    *parsetree = lfirst_node(RawStmt, list_item);
+		List	   *stmt_list;
+		CachedPlanSource *plansource;
+
+		/*
+		 * Create the CachedPlanSource before we do parse analysis, since it
+		 * needs to see the unmodified raw parse tree.
+		 */
+		plansource = CreateCachedPlan(parsetree, querystr,
+									  CreateCommandTag(parsetree->stmt));
+
+		stmt_list = pg_analyze_and_rewrite_fixedparams(parsetree, querystr,
+													   paramtypes, nargs,
+													   NULL);
+
+		/* Finish filling in the CachedPlanSource */
+		CompleteCachedPlan(plansource,
+						   stmt_list,
+						   NULL,
+						   paramtypes, nargs,
+						   NULL, NULL, 0,
+						   false);	/* not fixed result */
+
+		SaveCachedPlan(plansource);
+		plancache_list = lappend(plancache_list, plansource);
+	}
+
+	plan->plan_exec_func = ri_SqlStringPlanExecute;
+	plan->plan_exec_arg = (void *) plancache_list;
+	plan->plan_is_valid_func = ri_SqlStringPlanIsValid;
+	plan->plan_free_func = ri_SqlStringPlanFree;
+
+	/*
+	 * Pop the error context stack
+	 */
+	error_context_stack = rierrcontext.previous;
+}
+
+/*
+ * This executes the plan after creating a CachedPlan for each
+ * CachedPlanSource found stored in plan->plan_exec_arg using given
+ * parameter values.
+ *
+ * Return value is the number of tuples returned by the "last" CachedPlan.
+ */
+static int
+ri_SqlStringPlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_rel,
+						Datum *param_vals, char *param_isnulls,
+						Snapshot crosscheck_snapshot,
+						int limit, CmdType *last_stmt_cmdtype)
+{
+	List   *plancache_list = (List *) plan->plan_exec_arg;
+	ListCell   *lc;
+	CachedPlan *cplan;
+	ResourceOwner plan_owner;
+	int			tuples_processed = 0;	/* appease compiler */
+	ParamListInfo paramLI;
+	RIErrorCallbackArg ricallbackarg;
+	ErrorContextCallback rierrcontext;
+
+	Assert(list_length(plancache_list) > 0);
+
+	/*
+	 * Setup error traceback support for ereport()
+	 */
+	ricallbackarg.query = NULL;		/* will be filled below */
+	rierrcontext.callback = _RI_error_callback;
+	rierrcontext.arg = &ricallbackarg;
+	rierrcontext.previous = error_context_stack;
+	error_context_stack = &rierrcontext;
+
+	/*
+	 * Convert the parameters into a format that the planner and the executor
+	 * expect them to be in.
+	 */
+	if (plan->nargs > 0)
+	{
+		paramLI = makeParamList(plan->nargs);
+
+		for (int i = 0; i < plan->nargs; i++)
+		{
+			ParamExternData *prm = &paramLI->params[i];
+
+			prm->value = param_vals[i];
+			prm->isnull = (param_isnulls && param_isnulls[i] == 'n');
+			prm->pflags = PARAM_FLAG_CONST;
+			prm->ptype = plan->paramtypes[i];
+		}
+	}
+	else
+		paramLI = NULL;
+
+	plan_owner = CurrentResourceOwner; /* XXX - why? */
+	foreach(lc, plancache_list)
+	{
+		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
+		List	   *stmt_list;
+		ListCell   *lc2;
+
+		ricallbackarg.query = plansource->query_string;
+
+		/*
+		 * Replan if needed, and increment plan refcount.  If it's a saved
+		 * plan, the refcount must be backed by the plan_owner.
+		 */
+		cplan = GetCachedPlan(plansource, paramLI, plan_owner, NULL);
+
+		stmt_list = cplan->stmt_list;
+
+		foreach(lc2, stmt_list)
+		{
+			PlannedStmt *stmt = lfirst_node(PlannedStmt, lc2);
+			DestReceiver *dest;
+			QueryDesc  *qdesc;
+			int			eflags;
+
+			*last_stmt_cmdtype = stmt->commandType;
+
+			/*
+			 * Advance the command counter before each command and update the
+			 * snapshot.
+			 */
+			CommandCounterIncrement();
+			UpdateActiveSnapshotCommandId();
+
+			dest = CreateDestReceiver(DestNone);
+			qdesc = CreateQueryDesc(stmt, plansource->query_string,
+									GetActiveSnapshot(), crosscheck_snapshot,
+									dest, paramLI, NULL, 0);
+
+			/* Select execution options */
+			eflags = EXEC_FLAG_SKIP_TRIGGERS;
+			ExecutorStart(qdesc, eflags);
+			ExecutorRun(qdesc, ForwardScanDirection, limit);
+
+			/* We return the last executed statement's value. */
+			tuples_processed = qdesc->estate->es_processed;
+
+			ExecutorFinish(qdesc);
+			ExecutorEnd(qdesc);
+			FreeQueryDesc(qdesc);
+		}
+
+		/* Done with this plan, so release refcount */
+		ReleaseCachedPlan(cplan, CurrentResourceOwner);
+		cplan = NULL;
+	}
+
+	Assert(cplan == NULL);
+
+	/*
+	 * Pop the error context stack
+	 */
+	error_context_stack = rierrcontext.previous;
+
+	return tuples_processed;
+}
+
+/*
+ * Have any of the CachedPlanSources been invalidated since being created?
+ */
+static bool
+ri_SqlStringPlanIsValid(RI_Plan *plan)
+{
+	List   *plancache_list = (List *) plan->plan_exec_arg;
+	ListCell *lc;
+
+	foreach(lc, plancache_list)
+	{
+		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
+
+		if (!CachedPlanIsValid(plansource))
+			return false;
+	}
+	return true;
+}
+
+/* Release CachedPlanSources and associated CachedPlans, if any. */
+static void
+ri_SqlStringPlanFree(RI_Plan *plan)
+{
+	List   *plancache_list = (List *) plan->plan_exec_arg;
+	ListCell *lc;
+
+	foreach(lc, plancache_list)
+	{
+		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
+
+		DropCachedPlan(plansource);
+	}
+}
+
+/*
+ * Create an RI_Plan for a given RI check query and initialize the
+ * plan callbacks and execution argument using the caller-specified
+ * function.
+ */
+static RI_Plan *
+ri_PlanCreate(RI_PlanCreateFunc_type plan_create_func,
+			  const char *querystr, int nargs, Oid *paramtypes)
+{
+	RI_Plan	   *plan;
+	MemoryContext plancxt,
+				oldcxt;
+
+	/*
+	 * Create a memory context for the plan underneath CurrentMemoryContext,
+	 * which is later reparented to be underneath CacheMemoryContext.
+	 */
+	plancxt = AllocSetContextCreate(CurrentMemoryContext,
+									"RI Plan",
+									ALLOCSET_SMALL_SIZES);
+	oldcxt = MemoryContextSwitchTo(plancxt);
+	plan = (RI_Plan *) palloc0(sizeof(*plan));
+	plan->plancxt = plancxt;
+	plan->nargs = nargs;
+	if (plan->nargs > 0)
+	{
+		plan->paramtypes = (Oid *) palloc(plan->nargs * sizeof(Oid));
+		memcpy(plan->paramtypes, paramtypes, plan->nargs * sizeof(Oid));
+	}
+
+	plan_create_func(plan, querystr, nargs, paramtypes);
+
+	MemoryContextSetParent(plan->plancxt, CacheMemoryContext);
+	MemoryContextSwitchTo(oldcxt);
+
+	return plan;
+}
+
+/*
+ * Execute the plan by calling plan_exec_func().
+ *
+ * Returns the number of tuples obtained by executing the plan; the caller
+ * typically wants to check whether at least one row was returned.
+ *
+ * *last_stmt_cmdtype is set to the CmdType of the last operation performed
+ * by executing the plan, which may consist of more than 1 executable
+ * statements if, for example, any rules belonging to the tables mentioned in
+ * the original query added additional operations.
+ */
+static int
+ri_PlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_rel,
+			   Datum *param_vals, char *param_isnulls,
+			   Snapshot crosscheck_snapshot,
+			   int limit, CmdType *last_stmt_cmdtype)
+{
+	Assert(ActiveSnapshotSet());
+	return plan->plan_exec_func(plan, fk_rel, pk_rel,
+								param_vals, param_isnulls,
+								crosscheck_snapshot,
+								limit, last_stmt_cmdtype);
+}
+
+/*
+ * Is the plan still valid to continue caching?
+ */
+static bool
+ri_PlanIsValid(RI_Plan *plan)
+{
+	return plan->plan_is_valid_func(plan);
+}
+
+/* Release plan resources. */
+static void
+ri_FreePlan(RI_Plan *plan)
+{
+	/* First call the implementation specific release function. */
+	plan->plan_free_func(plan);
+
+	/* Now get rid of the RI_plan and subsidiary data in its plancxt */
+	MemoryContextDelete(plan->plancxt);
+}
 
 /*
  * Prepare execution plan for a query to enforce an RI restriction
  */
-static SPIPlanPtr
-ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
+static RI_Plan *
+ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
+			 const char *querystr, int nargs, Oid *argtypes,
 			 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
 {
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 	Relation	query_rel;
 	Oid			save_userid;
 	int			save_sec_context;
@@ -2369,18 +2753,12 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
 	SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
 						   SECURITY_NOFORCE_RLS);
-
 	/* Create the plan */
-	qplan = SPI_prepare(querystr, nargs, argtypes);
-
-	if (qplan == NULL)
-		elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), querystr);
+	qplan = ri_PlanCreate(plan_create_func, querystr, nargs, argtypes);
 
 	/* Restore UID and security context */
 	SetUserIdAndSecContext(save_userid, save_sec_context);
 
-	/* Save the plan */
-	SPI_keepplan(qplan);
 	ri_HashPreparedPlan(qkey, qplan);
 
 	return qplan;
@@ -2391,23 +2769,23 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
  */
 static bool
 ri_PerformCheck(const RI_ConstraintInfo *riinfo,
-				RI_QueryKey *qkey, SPIPlanPtr qplan,
+				RI_QueryKey *qkey, RI_Plan *qplan,
 				Relation fk_rel, Relation pk_rel,
 				TupleTableSlot *oldslot, TupleTableSlot *newslot,
 				bool is_restrict,
-				bool detectNewRows, int expect_OK)
+				bool detectNewRows, int expected_cmdtype)
 {
 	Relation	query_rel,
 				source_rel;
 	bool		source_is_pk;
-	Snapshot	test_snapshot;
 	Snapshot	crosscheck_snapshot;
 	int			limit;
-	int			spi_result;
+	int			tuples_processed;
 	Oid			save_userid;
 	int			save_sec_context;
 	Datum		vals[RI_MAX_NUMKEYS * 2];
 	char		nulls[RI_MAX_NUMKEYS * 2];
+	CmdType		last_stmt_cmdtype;
 
 	/*
 	 * Use the query type code to determine whether the query is run against
@@ -2458,30 +2836,34 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 	 * the caller passes detectNewRows == false then it's okay to do the query
 	 * with the transaction snapshot; otherwise we use a current snapshot, and
 	 * tell the executor to error out if it finds any rows under the current
-	 * snapshot that wouldn't be visible per the transaction snapshot.  Note
-	 * that SPI_execute_snapshot will register the snapshots, so we don't need
-	 * to bother here.
+	 * snapshot that wouldn't be visible per the transaction snapshot.
+	 *
+	 * Also push the chosen snapshot so that anyplace that wants to use it
+	 * can get it by calling GetActiveSnapshot().
 	 */
 	if (IsolationUsesXactSnapshot() && detectNewRows)
 	{
-		CommandCounterIncrement();	/* be sure all my own work is visible */
-		test_snapshot = GetLatestSnapshot();
 		crosscheck_snapshot = GetTransactionSnapshot();
+		/* Make sure we have a private copy of the snapshot to modify. */
+		PushCopiedSnapshot(GetLatestSnapshot());
 	}
 	else
 	{
-		/* the default SPI behavior is okay */
-		test_snapshot = InvalidSnapshot;
 		crosscheck_snapshot = InvalidSnapshot;
+		PushActiveSnapshot(GetTransactionSnapshot());
 	}
 
+	/* Also advance the command counter and update the snapshot. */
+	CommandCounterIncrement();
+	UpdateActiveSnapshotCommandId();
+
 	/*
 	 * If this is a select query (e.g., for a 'no action' or 'restrict'
 	 * trigger), we only need to see if there is a single row in the table,
 	 * matching the key.  Otherwise, limit = 0 - because we want the query to
 	 * affect ALL the matching rows.
 	 */
-	limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0;
+	limit = (expected_cmdtype == CMD_SELECT) ? 1 : 0;
 
 	/* Switch to proper UID to perform check as */
 	GetUserIdAndSecContext(&save_userid, &save_sec_context);
@@ -2490,19 +2872,16 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 						   SECURITY_NOFORCE_RLS);
 
 	/* Finally we can run the query. */
-	spi_result = SPI_execute_snapshot(qplan,
-									  vals, nulls,
-									  test_snapshot, crosscheck_snapshot,
-									  false, false, limit);
+	tuples_processed = ri_PlanExecute(qplan, fk_rel, pk_rel, vals, nulls,
+									  crosscheck_snapshot,
+									  limit, &last_stmt_cmdtype);
 
 	/* Restore UID and security context */
 	SetUserIdAndSecContext(save_userid, save_sec_context);
 
-	/* Check result */
-	if (spi_result < 0)
-		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
+	PopActiveSnapshot();
 
-	if (expect_OK >= 0 && spi_result != expect_OK)
+	if (last_stmt_cmdtype != expected_cmdtype)
 		ereport(ERROR,
 				(errcode(ERRCODE_INTERNAL_ERROR),
 				 errmsg("referential integrity query on \"%s\" from constraint \"%s\" on \"%s\" gave unexpected result",
@@ -2513,15 +2892,15 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 
 	/* XXX wouldn't it be clearer to do this part at the caller? */
 	if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
-		expect_OK == SPI_OK_SELECT &&
-		(SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
+		expected_cmdtype == CMD_SELECT &&
+		(tuples_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
 		ri_ReportViolation(riinfo,
 						   pk_rel, fk_rel,
 						   newslot ? newslot : oldslot,
 						   NULL,
 						   qkey->constr_queryno, is_restrict, false);
 
-	return SPI_processed != 0;
+	return tuples_processed != 0;
 }
 
 /*
@@ -2798,14 +3177,14 @@ ri_InitHashTables(void)
 /*
  * ri_FetchPreparedPlan -
  *
- * Lookup for a query key in our private hash table of prepared
- * and saved SPI execution plans. Return the plan if found or NULL.
+ * Lookup for a query key in our private hash table of saved RI plans.
+ * Return the plan if found or NULL.
  */
-static SPIPlanPtr
+static RI_Plan *
 ri_FetchPreparedPlan(RI_QueryKey *key)
 {
 	RI_QueryHashEntry *entry;
-	SPIPlanPtr	plan;
+	RI_Plan *plan;
 
 	/*
 	 * On the first call initialize the hashtable
@@ -2833,7 +3212,7 @@ ri_FetchPreparedPlan(RI_QueryKey *key)
 	 * locked both FK and PK rels.
 	 */
 	plan = entry->plan;
-	if (plan && SPI_plan_is_valid(plan))
+	if (plan && ri_PlanIsValid(plan))
 		return plan;
 
 	/*
@@ -2842,7 +3221,7 @@ ri_FetchPreparedPlan(RI_QueryKey *key)
 	 */
 	entry->plan = NULL;
 	if (plan)
-		SPI_freeplan(plan);
+		ri_FreePlan(plan);
 
 	return NULL;
 }
@@ -2854,7 +3233,7 @@ ri_FetchPreparedPlan(RI_QueryKey *key)
  * Add another plan to our private SPI query plan hashtable.
  */
 static void
-ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan)
+ri_HashPreparedPlan(RI_QueryKey *key, RI_Plan *plan)
 {
 	RI_QueryHashEntry *entry;
 	bool		found;
-- 
2.43.0

#2Amit Langote
amitlangote09@gmail.com
In reply to: Amit Langote (#1)
Re: Eliminating SPI / SQL from some RI triggers - take 3

On Fri, Dec 20, 2024 at 1:23 PM Amit Langote <amitlangote09@gmail.com> wrote:

We discussed $subject at [1] and [2] and I'd like to continue that
work with the hope to commit some part of it for v18.

I did not get a chance to do any further work on this in this cycle,
but plan to start working on it after beta release, so moving this to
the next CF. I will post a rebased patch after the freeze to keep the
bots green for now.

--
Thanks, Amit Langote

#3Amit Langote
amitlangote09@gmail.com
In reply to: Amit Langote (#2)
Re: Eliminating SPI / SQL from some RI triggers - take 3

On Thu, Apr 3, 2025 at 7:19 PM Amit Langote <amitlangote09@gmail.com> wrote:

On Fri, Dec 20, 2024 at 1:23 PM Amit Langote <amitlangote09@gmail.com> wrote:

We discussed $subject at [1] and [2] and I'd like to continue that
work with the hope to commit some part of it for v18.

I did not get a chance to do any further work on this in this cycle,
but plan to start working on it after beta release, so moving this to
the next CF. I will post a rebased patch after the freeze to keep the
bots green for now.

Sorry for the inactivity. I've moved the patch entry in the CF app to
PG19-Drafts, since I don't plan to work on it myself in the immediate
future. However, Junwang Zhao has expressed interest in taking this
work forward, and I look forward to working with him on it.

--
Thanks, Amit Langote

#4Pavel Stehule
pavel.stehule@gmail.com
In reply to: Amit Langote (#3)
Re: Eliminating SPI / SQL from some RI triggers - take 3

Hi

On Tue, Oct 21, 2025 at 6:07 AM Amit Langote <amitlangote09@gmail.com>
wrote:

On Thu, Apr 3, 2025 at 7:19 PM Amit Langote <amitlangote09@gmail.com>
wrote:

On Fri, Dec 20, 2024 at 1:23 PM Amit Langote <amitlangote09@gmail.com>

wrote:

We discussed $subject at [1] and [2] and I'd like to continue that
work with the hope to commit some part of it for v18.

I did not get a chance to do any further work on this in this cycle,
but plan to start working on it after beta release, so moving this to
the next CF. I will post a rebased patch after the freeze to keep the
bots green for now.

Sorry for the inactivity. I've moved the patch entry in the CF app to
PG19-Drafts, since I don't plan to work on it myself in the immediate
future. However, Junwang Zhao has expressed interest in taking this
work forward, and I look forward to working with him on it.

This is a very interesting and important feature - I can help with testing
and review if necessary

Regards

Pavel


#5Amit Langote
amitlangote09@gmail.com
In reply to: Pavel Stehule (#4)
Re: Eliminating SPI / SQL from some RI triggers - take 3

On Tue, Oct 21, 2025 at 2:10 PM Pavel Stehule <pavel.stehule@gmail.com> wrote:

On Tue, Oct 21, 2025 at 6:07 AM Amit Langote <amitlangote09@gmail.com> wrote:

On Thu, Apr 3, 2025 at 7:19 PM Amit Langote <amitlangote09@gmail.com> wrote:

On Fri, Dec 20, 2024 at 1:23 PM Amit Langote <amitlangote09@gmail.com> wrote:

We discussed $subject at [1] and [2] and I'd like to continue that
work with the hope to commit some part of it for v18.

I did not get a chance to do any further work on this in this cycle,
but plan to start working on it after beta release, so moving this to
the next CF. I will post a rebased patch after the freeze to keep the
bots green for now.

Sorry for the inactivity. I've moved the patch entry in the CF app to
PG19-Drafts, since I don't plan to work on it myself in the immediate
future. However, Junwang Zhao has expressed interest in taking this
work forward, and I look forward to working with him on it.

This is a very interesting and important feature - I can help with testing and review if necessary

Thanks for the interest.

Just to add a quick note on the current direction I’ve been discussing
off-list with Junwang:

The next iteration of this work will likely follow a hybrid "fast-path
+ fallback" design rather than the original pure fast-path approach.
The idea is to keep the optimization for straightforward cases where
the foreign key and referenced key can be verified by a direct index
probe, while falling back to the existing SPI path only when the
runtime behavior of the executor is non-trivial to replicate -- such
as visibility rechecks under concurrent updates -- or when the
constraint itself involves richer semantics, like temporal foreign
keys that require range and aggregation logic. That keeps the
optimization safe without changing the meaning of constraint
enforcement.

This direction comes partly in response to the feedback from Robert
and Tom in the earlier Eliminating SPI threads, who raised concerns
that a fast path might silently diverge from what the executor does at
runtime in subtle cases. The fallback design aims to address that
directly: it keeps the optimization where it’s clearly safe, but
defers to the existing SPI-based implementation whenever correctness
might depend on executor behavior that would otherwise be difficult or
risky to reproduce locally.

In practice, this means adding a guarded fast path that performs the
index probe and tuple lock directly under the same snapshot and
security context that SPI would use, while caching stable metadata
such as index descriptors, scan keys, and operator information per
constraint or per statement. The fallback to SPI remains for the few
cases that either depend on executor behavior or need features beyond
a simple index probe (see the sketch after this list):

* Concurrent updates or deletes: If table_tuple_lock() reports that
the target tuple was updated or deleted, we delegate to the SPI path
so that EvalPlanQual and visibility rules are applied as today.

* Partitioned parents: Skipped in v1 for simplicity, since they
require routing the probe through the correct partition using
PartitionDirectory. This can be added later as a separate patch once
the core mechanism is stable.

* Temporal foreign keys: These use range overlap and containment
semantics (&&, <@, range_agg()) that inherently involve aggregation
and multiple-row reasoning, so they stay on the SPI path.
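
To make that dispatch concrete, here is a minimal control-flow sketch
of what I have in mind. It is not code from the attached patches;
ri_PerformCheckFast(), ri_PerformCheckViaSPI() and the hasperiod test
are placeholders for whatever the actual implementation ends up using:

static bool
ri_CheckPkExists(const RI_ConstraintInfo *riinfo,
				 Relation fk_rel, Relation pk_rel, TupleTableSlot *newslot)
{
	/*
	 * Fast path only for plain (non-partitioned) PK parents and
	 * non-temporal constraints, at least in v1.
	 */
	if (pk_rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
		!riinfo->hasperiod)
	{
		bool		found = false;

		/* Direct PK-index probe plus key-share lock of the matched tuple. */
		if (ri_PerformCheckFast(riinfo, fk_rel, pk_rel, newslot, &found))
			return found;

		/*
		 * The fast path bailed out, e.g. because the PK tuple was
		 * concurrently updated or deleted; fall through so that
		 * EvalPlanQual and the usual visibility rules apply as today.
		 */
	}

	/* Existing SQL/SPI machinery, unchanged. */
	return ri_PerformCheckViaSPI(riinfo, fk_rel, pk_rel, newslot);
}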

Everything else -- multi-column keys, cross-type equality supported by
the index opfamily, collation matching, and RLS/ACL enforcement --
will be handled directly in the fast path. The security behavior will
mirror the existing SPI path by temporarily switching to the parent
table's owner with SECURITY_LOCAL_USERID_CHANGE | SECURITY_NOFORCE_RLS
around the probe, like ri_PerformCheck() does.
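
As a fragment, the switch around the probe would look roughly like this
(GetUserIdAndSecContext()/SetUserIdAndSecContext() come from
miscadmin.h; the flags are the same ones ri_PerformCheck() already
uses):

    Oid     saved_userid;
    int     saved_sec_context;

    GetUserIdAndSecContext(&saved_userid, &saved_sec_context);
    SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
                           saved_sec_context |
                           SECURITY_LOCAL_USERID_CHANGE |
                           SECURITY_NOFORCE_RLS);

    /* ... index probe and KEY SHARE lock of the matched PK tuple ... */

    SetUserIdAndSecContext(saved_userid, saved_sec_context);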

For concurrency, the fast path locks the located parent tuple with
LockTupleKeyShare under GetActiveSnapshot(). If that succeeds (TM_Ok),
the check passes immediately. While non-TM_Ok cases fall back for now,
a later refinement could follow the update chain with
table_tuple_fetch_row_version() under the current snapshot and re-lock
the visible version, making the fast path fully self-contained.
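
In code, that step is little more than a table_tuple_lock() call whose
result decides whether we are done or must fall back. A sketch,
assuming it runs inside a boolean helper where pk_rel and outslot hold
the relation and the tuple returned by the index probe (the
serialization-failure errors raised under REPEATABLE READ and
SERIALIZABLE are omitted here):

    TM_FailureData tmfd;
    TM_Result   res;
    int         lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;

    if (!IsolationUsesXactSnapshot())
        lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;

    res = table_tuple_lock(pk_rel, &outslot->tts_tid, GetActiveSnapshot(),
                           outslot, GetCurrentCommandId(false),
                           LockTupleKeyShare, LockWaitBlock,
                           lockflags, &tmfd);

    if (res != TM_Ok)
        return false;       /* fall back to SPI so EPQ/visibility apply */

    /*
     * tmfd.traversed means the lock followed an update chain to a newer
     * row version; the refinement mentioned above would recheck the key
     * here instead of trusting the match blindly.
     */
    return true;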

That’s the direction Junwang and I plan to explore next.

--
Thanks, Amit Langote

#6Junwang Zhao
zhjwpku@gmail.com
In reply to: Amit Langote (#5)
2 attachment(s)
Re: Eliminating SPI / SQL from some RI triggers - take 3

Hi,

As Amit has already stated, we are pursuing a hybrid "fast-path + fallback"
design.

0001 adds a fast-path optimization for foreign key constraint checks
that bypasses the SPI executor. The fast path applies when the
referenced table is not partitioned and the constraint does not
involve temporal semantics.

With the following test:

create table pk (a numeric primary key);
create table fk (a bigint references pk);
insert into pk select generate_series(1, 2000000);

head:

[local] zhjwpku@postgres:5432-90419=# insert into fk select
generate_series(1, 2000000, 2);
INSERT 0 1000000
Time: 13516.177 ms (00:13.516)

[local] zhjwpku@postgres:5432-90419=# update fk set a = a + 1;
UPDATE 1000000
Time: 15057.638 ms (00:15.058)

patched:

[local] zhjwpku@postgres:5432-98673=# insert into fk select
generate_series(1, 2000000, 2);
INSERT 0 1000000
Time: 8248.777 ms (00:08.249)

[local] zhjwpku@postgres:5432-98673=# update fk set a = a + 1;
UPDATE 1000000
Time: 10117.002 ms (00:10.117)

0002 caches the fast-path metadata used by the index probe; for now
that covers only the comparison-operator hash entries, the operator
function OIDs, and the strategy numbers and subtypes for the index
scan keys. However, this cache doesn't buy any performance improvement
so far.

Caching additional metadata should improve performance for foreign key
checks further.

Amit suggested introducing a mechanism for ri_triggers.c to register a
cleanup callback in the EState, which AfterTriggerEndQuery() could then
invoke to release per-statement cached metadata (such as the IndexScanDesc).
However, I haven't been able to implement this mechanism yet.
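
For the record, the rough shape I have in mind is below; note that
none of these hooks exist in core today, so the callback list in the
EState and the registration function are purely hypothetical names:

/* Hypothetical per-query cleanup hook that EState would carry. */
typedef void (*EStateCleanupCallback) (void *arg);

/* ri_triggers.c side: release the per-statement cached scan state. */
static void
ri_fastpath_cleanup(void *arg)
{
    IndexScanDesc scan = (IndexScanDesc) arg;

    index_endscan(scan);
}

/*
 * Where the per-statement IndexScanDesc is first set up, something like
 *
 *     RegisterEStateCleanup(estate, ri_fastpath_cleanup, scan);
 *
 * (hypothetical) would add it to a list in the EState, and
 * AfterTriggerEndQuery() would walk that list and invoke each callback
 * before the executor state is torn down.
 */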

Amit and I agree that we can post the patches here for review now. We are
continuing to work on improving the metadata cache implementation.

--
Regards
Junwang Zhao

Attachments:

v2-0002-Cache-fast-path-metadata-for-foreign-key-checks.patch
From 020729139c824d65a008c6644431be8e8efd7800 Mon Sep 17 00:00:00 2001
From: Junwang Zhao <zhjwpku@gmail.com>
Date: Mon, 1 Dec 2025 12:58:59 +0800
Subject: [PATCH v2 2/2] Cache fast-path metadata for foreign key checks

The metadata is populated lazily on first use via
ri_populate_fastpath_metadata() and reused in subsequent checks via
build_scankeys_from_cache(). This eliminates repeated calls to
ri_HashCompareOp() and get_op_opfamily_properties() during FK checks.
---
 src/backend/utils/adt/ri_triggers.c | 90 +++++++++++++++++++++--------
 1 file changed, 65 insertions(+), 25 deletions(-)

diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index cfb85b9d753..f2e7e4f4ae9 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -94,6 +94,7 @@
 #define RI_TRIGTYPE_UPDATE 2
 #define RI_TRIGTYPE_DELETE 3
 
+struct RI_CompareHashEntry;
 
 /*
  * RI_ConstraintInfo
@@ -133,6 +134,16 @@ typedef struct RI_ConstraintInfo
 	Oid			agged_period_contained_by_oper; /* fkattr <@ range_agg(pkattr) */
 	Oid			period_intersect_oper;	/* anyrange * anyrange */
 	dlist_node	valid_link;		/* Link in list of valid entries */
+
+	/* Fast-path metadata for RI checks on foreign tables */
+	bool		fpmeta_valid; /* is fast-path metadata valid? */
+	// Relation	idxrel;
+	// IndexScanDesc idxscan;
+	// TupleTableSlot *outslot;
+	struct RI_CompareHashEntry *compare_entries[RI_MAX_NUMKEYS];
+	RegProcedure	regops[RI_MAX_NUMKEYS];
+	Oid				subtypes[RI_MAX_NUMKEYS];
+	int				strats[RI_MAX_NUMKEYS];
 } RI_ConstraintInfo;
 
 /*
@@ -295,6 +306,42 @@ get_fkey_unique_index(Oid conoid)
 	return result;
 }
 
+static void
+ri_populate_fastpath_metadata(Oid constraintOid,
+							  Relation pk_rel, Relation fk_rel, Relation idx_rel)
+{
+	RI_ConstraintInfo *riinfo;
+
+	/* Find the constraint info */
+	riinfo = (RI_ConstraintInfo *)
+		hash_search(ri_constraint_cache,
+					&constraintOid,
+					HASH_FIND,
+					NULL);
+	Assert(riinfo != NULL && riinfo->valid);
+
+	for (int i = 0; i < riinfo->nkeys; i++)
+	{
+		/* Use PK = FK equality operator. */
+		Oid eq_opr = riinfo->pf_eq_oprs[i];
+		Oid typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+		Oid lefttype;
+		RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+
+		riinfo->compare_entries[i] = entry;
+		riinfo->regops[i] = get_opcode(eq_opr);
+
+		get_op_opfamily_properties(eq_opr,
+								   idx_rel->rd_opfamily[i],
+								   false,
+								   &riinfo->strats[i],
+								   &lefttype,
+								   &riinfo->subtypes[i]);
+	}
+
+	riinfo->fpmeta_valid = true;
+}
+
 /*
  * ri_CheckPermissions
  *   Check that the new user has permissions to look into the schema of
@@ -365,20 +412,14 @@ recheck_matched_pk_tuple(Relation idxrel, ScanKeyData *skeys,
 }
 
 /*
- * Doesn't include any cache for now.
+ * Build ScanKeys from cached metadata for fast-path foreign key checks
  */
 static void
 build_scankeys_from_cache(const RI_ConstraintInfo *riinfo,
 						  Relation pk_rel, Relation fk_rel,
-						  Relation idx_rel, int num_pk,
-						  Datum *pk_vals, char *pk_nulls,
-						  ScanKey skeys)
+						  Relation idx_rel, Datum *pk_vals,
+						  char *pk_nulls, ScanKey skeys)
 {
-	/* Use PK = FK equality operator. */
-	const Oid *eq_oprs = riinfo->pf_eq_oprs;
-
-	Assert(num_pk == riinfo->nkeys);
-
 	/*
 	 * May need to cast each of the individual values of the foreign key
 	 * to the corresponding PK column's type if the equality operator
@@ -388,9 +429,7 @@ build_scankeys_from_cache(const RI_ConstraintInfo *riinfo,
 	{
 		if (pk_nulls[i] != 'n')
 		{
-			Oid  eq_opr = eq_oprs[i];
-			Oid  typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
-			RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+			RI_CompareHashEntry *entry = riinfo->compare_entries[i];
 
 			if (OidIsValid(entry->cast_func_finfo.fn_oid))
 				pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
@@ -406,20 +445,12 @@ build_scankeys_from_cache(const RI_ConstraintInfo *riinfo,
 	 * Set up ScanKeys for the index scan. This is essentially how
 	 * ExecIndexBuildScanKeys() sets them up.
 	 */
-	for (int i = 0; i < num_pk; i++)
+	for (int i = 0; i < riinfo->nkeys; i++)
 	{
 		int		pkattrno = i + 1;
-		Oid		lefttype,
-				righttype;
-		Oid		operator = eq_oprs[i];
-		Oid		opfamily = idx_rel->rd_opfamily[i];
-		int  strat;
-		RegProcedure regop = get_opcode(operator);
-
-		get_op_opfamily_properties(operator, opfamily, false, &strat,
-								   &lefttype, &righttype);
-		ScanKeyEntryInitialize(&skeys[i], 0, pkattrno, strat, righttype,
-							   idx_rel->rd_indcollation[i], regop,
+
+		ScanKeyEntryInitialize(&skeys[i], 0, pkattrno, riinfo->strats[i], riinfo->subtypes[i],
+							   idx_rel->rd_indcollation[i], riinfo->regops[i],
 							   pk_vals[i]);
 	}
 }
@@ -583,7 +614,15 @@ RI_FKey_check(TriggerData *trigdata)
 		idxrel = index_open(idxoid, RowShareLock);
 		num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
 
-		build_scankeys_from_cache(riinfo, pk_rel, fk_rel, idxrel, num_pk,
+		Assert(num_pk == riinfo->nkeys);
+
+		/* If Fast-path metadata hasn't been populated, do it now */
+		if (!riinfo->fpmeta_valid)
+			ri_populate_fastpath_metadata(riinfo->constraint_id,
+										  pk_rel, fk_rel, idxrel);
+		Assert(riinfo->fpmeta_valid);
+
+		build_scankeys_from_cache(riinfo, pk_rel, fk_rel, idxrel,
 								  pk_vals, pk_nulls, skey);
 
 		scan = index_beginscan(pk_rel, idxrel, GetActiveSnapshot(), NULL, riinfo->nkeys, 0);
@@ -2663,6 +2702,7 @@ ri_LoadConstraintInfo(Oid constraintOid)
 	dclist_push_tail(&ri_constraint_cache_valid_list, &riinfo->valid_link);
 
 	riinfo->valid = true;
+	riinfo->fpmeta_valid = false;
 
 	return riinfo;
 }
-- 
2.41.0

v2-0001-Add-fast-path-for-foreign-key-constraint-checks.patch
From c93ee8b6dfd5f345603c327e82b50f1dd8f31cf0 Mon Sep 17 00:00:00 2001
From: Junwang Zhao <zhjwpku@gmail.com>
Date: Mon, 1 Dec 2025 12:16:46 +0800
Subject: [PATCH v2 1/2] Add fast path for foreign key constraint checks

Add a fast path optimization for foreign key constraint checks that
bypasses the SPI executor for simple foreign keys by directly probing
the unique index on the referenced table.

The fast path applies when the referenced table is not partitioned,
and the constraint does not involve temporal semantics. It extracts
the FK value, scans the unique index directly, and locks the tuple
with KEY SHARE lock, matching SPI behavior.

This avoids SPI overhead and improves performance for bulk operations
with many FK checks.

Refactoring: Extract tuple locking logic into ExecLockTableTuple() for
reuse.

Author: Amit Langote, Junwang Zhao

Discussion:
---
 src/backend/executor/nodeLockRows.c           | 164 +++++----
 src/backend/utils/adt/ri_triggers.c           | 323 +++++++++++++++++-
 src/include/executor/executor.h               |   9 +
 .../expected/fk-concurrent-pk-upd.out         |  58 ++++
 src/test/isolation/isolation_schedule         |   1 +
 .../isolation/specs/fk-concurrent-pk-upd.spec |  42 +++
 src/test/regress/expected/foreign_key.out     |  47 +++
 src/test/regress/sql/foreign_key.sql          |  64 ++++
 8 files changed, 635 insertions(+), 73 deletions(-)
 create mode 100644 src/test/isolation/expected/fk-concurrent-pk-upd.out
 create mode 100644 src/test/isolation/specs/fk-concurrent-pk-upd.spec

diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index a8afbf93b48..06c4784c0f5 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -79,10 +79,7 @@ lnext:
 		Datum		datum;
 		bool		isNull;
 		ItemPointerData tid;
-		TM_FailureData tmfd;
 		LockTupleMode lockmode;
-		int			lockflags = 0;
-		TM_Result	test;
 		TupleTableSlot *markSlot;
 
 		/* clear any leftover test tuple for this rel */
@@ -178,74 +175,11 @@ lnext:
 				break;
 		}
 
-		lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
-		if (!IsolationUsesXactSnapshot())
-			lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
-
-		test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
-								markSlot, estate->es_output_cid,
-								lockmode, erm->waitPolicy,
-								lockflags,
-								&tmfd);
-
-		switch (test)
-		{
-			case TM_WouldBlock:
-				/* couldn't lock tuple in SKIP LOCKED mode */
-				goto lnext;
-
-			case TM_SelfModified:
-
-				/*
-				 * The target tuple was already updated or deleted by the
-				 * current command, or by a later command in the current
-				 * transaction.  We *must* ignore the tuple in the former
-				 * case, so as to avoid the "Halloween problem" of repeated
-				 * update attempts.  In the latter case it might be sensible
-				 * to fetch the updated tuple instead, but doing so would
-				 * require changing heap_update and heap_delete to not
-				 * complain about updating "invisible" tuples, which seems
-				 * pretty scary (table_tuple_lock will not complain, but few
-				 * callers expect TM_Invisible, and we're not one of them). So
-				 * for now, treat the tuple as deleted and do not process.
-				 */
-				goto lnext;
-
-			case TM_Ok:
-
-				/*
-				 * Got the lock successfully, the locked tuple saved in
-				 * markSlot for, if needed, EvalPlanQual testing below.
-				 */
-				if (tmfd.traversed)
-					epq_needed = true;
-				break;
-
-			case TM_Updated:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				elog(ERROR, "unexpected table_tuple_lock status: %u",
-					 test);
-				break;
-
-			case TM_Deleted:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				/* tuple was deleted so don't return it */
-				goto lnext;
-
-			case TM_Invisible:
-				elog(ERROR, "attempted to lock invisible tuple");
-				break;
-
-			default:
-				elog(ERROR, "unrecognized table_tuple_lock status: %u",
-					 test);
-		}
+		/* skip tuple if it couldn't be locked */
+		if (!ExecLockTableTuple(erm->relation, &tid, markSlot,
+								estate->es_snapshot, estate->es_output_cid,
+								lockmode, erm->waitPolicy, &epq_needed))
+			goto lnext;
 
 		/* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
 		erm->curCtid = tid;
@@ -280,6 +214,94 @@ lnext:
 	return slot;
 }
 
+
+/*
+ * ExecLockTableTuple
+ * 		Locks tuple with the specified TID in lockmode following given wait
+ * 		policy
+ *
+ * Returns true if the tuple was successfully locked.  Locked tuple is loaded
+ * into provided slot.
+ */
+bool
+ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+				   Snapshot snapshot, CommandId cid,
+				   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+				   bool *tuple_concurrently_updated)
+{
+	TM_FailureData tmfd;
+	int			lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+	TM_Result	test;
+
+	if (tuple_concurrently_updated)
+		*tuple_concurrently_updated = false;
+
+	if (!IsolationUsesXactSnapshot())
+		lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+	test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
+							waitPolicy, lockflags, &tmfd);
+
+	switch (test)
+	{
+		case TM_WouldBlock:
+			/* couldn't lock tuple in SKIP LOCKED mode */
+			return false;
+
+		case TM_SelfModified:
+			/*
+			 * The target tuple was already updated or deleted by the
+			 * current command, or by a later command in the current
+			 * transaction.  We *must* ignore the tuple in the former
+			 * case, so as to avoid the "Halloween problem" of repeated
+			 * update attempts.  In the latter case it might be sensible
+			 * to fetch the updated tuple instead, but doing so would
+			 * require changing heap_update and heap_delete to not
+			 * complain about updating "invisible" tuples, which seems
+			 * pretty scary (table_tuple_lock will not complain, but few
+			 * callers expect TM_Invisible, and we're not one of them). So
+			 * for now, treat the tuple as deleted and do not process.
+			 */
+			return false;
+
+		case TM_Ok:
+			/*
+			 * Got the lock successfully, the locked tuple saved in
+			 * slot for EvalPlanQual, if asked by the caller.
+			 */
+			if (tmfd.traversed && tuple_concurrently_updated)
+				*tuple_concurrently_updated = true;
+			break;
+
+		case TM_Updated:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			elog(ERROR, "unexpected table_tuple_lock status: %u",
+				 test);
+			break;
+
+		case TM_Deleted:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			/* tuple was deleted so don't return it */
+			return false;
+
+		case TM_Invisible:
+			elog(ERROR, "attempted to lock invisible tuple");
+			return false;
+
+		default:
+			elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
+			return false;
+	}
+
+	return true;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitLockRows
  *
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 059fc5ebf60..cfb85b9d753 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -24,12 +24,15 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/skey.h"
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/index.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_namespace.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
@@ -238,6 +241,188 @@ pg_noreturn static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 										   TupleTableSlot *violatorslot, TupleDesc tupdesc,
 										   int queryno, bool is_restrict, bool partgone);
 
+static bool
+ri_fastpath_is_applicable(const RI_ConstraintInfo *riinfo, Relation pk_rel)
+{
+	/*
+	 * Partitioned referenced tables are skipped for simplicity, since
+	 * they require routing the probe through the correct partition using
+	 * PartitionDirectory.
+	 * This can be added later as a separate patch once the core mechanism
+	 * is stable.
+	 */
+	if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return false;
+
+	/*
+	 * Temporal foreign keys use range overlap and containment semantics
+	 * (&&, <@, range_agg()) that inherently involve aggregation and
+	 * multiple-row reasoning, so they stay on the SPI path.
+	 */
+	if (riinfo->hasperiod)
+		return false;
+
+	return true;
+}
+
+/*
+ * get_fkey_unique_index
+ *  Returns the unique index used by a supposedly foreign key constraint
+ *
+ * XXX This is very similar to get_constraint_index; probably they should be
+ * unified.
+ */
+static Oid
+get_fkey_unique_index(Oid conoid)
+{
+	Oid			result = InvalidOid;
+	HeapTuple	tp;
+
+	tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
+	if (HeapTupleIsValid(tp))
+	{
+		Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
+
+		if (contup->contype == CONSTRAINT_FOREIGN)
+			result = contup->conindid;
+		ReleaseSysCache(tp);
+	}
+
+	if (!OidIsValid(result))
+		elog(ERROR, "unique index not found for foreign key constraint %u",
+			 conoid);
+
+	return result;
+}
+
+/*
+ * ri_CheckPermissions
+ *   Check that the new user has permissions to look into the schema of
+ *   and SELECT from 'query_rel'
+ *
+ * Provided for non-SQL implementors of an RI_Plan.
+ */
+static void
+ri_CheckPermissions(Relation query_rel)
+{
+	AclResult aclresult;
+
+	/* USAGE on schema. */
+	aclresult = object_aclcheck(NamespaceRelationId,
+								RelationGetNamespace(query_rel),
+								GetUserId(), ACL_USAGE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_SCHEMA,
+					   get_namespace_name(RelationGetNamespace(query_rel)));
+
+	/* SELECT on relation. */
+	aclresult = pg_class_aclcheck(RelationGetRelid(query_rel), GetUserId(),
+								  ACL_SELECT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_TABLE,
+					   RelationGetRelationName(query_rel));
+}
+
+/*
+ * This checks that the index key of the tuple specified in 'new_slot' matches
+ * the key that has already been found in the PK index relation 'idxrel'.
+ *
+ * Returns true if the index key of the tuple matches the existing index
+ * key, false otherwise.
+ */
+static bool
+recheck_matched_pk_tuple(Relation idxrel, ScanKeyData *skeys,
+						 TupleTableSlot *new_slot)
+{
+	IndexInfo *indexInfo = BuildIndexInfo(idxrel);
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	bool		matched = true;
+
+	/* PK indexes never have these. */
+	Assert(indexInfo->ii_Expressions == NIL &&
+		   indexInfo->ii_ExclusionOps == NULL);
+
+	/* Form the index values and isnull flags given the table tuple. */
+	FormIndexDatum(indexInfo, new_slot, NULL, values, isnull);
+	for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+	{
+		ScanKeyData		*skey = &skeys[i];
+
+		/* A PK column can never be set to NULL. */
+		Assert(!isnull[i]);
+		if (!DatumGetBool(FunctionCall2Coll(&skey->sk_func,
+											skey->sk_collation,
+											skey->sk_argument,
+											values[i])))
+		{
+			matched = false;
+			break;
+		}
+	}
+
+	return matched;
+}
+
+/*
+ * Doesn't include any cache for now.
+ */
+static void
+build_scankeys_from_cache(const RI_ConstraintInfo *riinfo,
+						  Relation pk_rel, Relation fk_rel,
+						  Relation idx_rel, int num_pk,
+						  Datum *pk_vals, char *pk_nulls,
+						  ScanKey skeys)
+{
+	/* Use PK = FK equality operator. */
+	const Oid *eq_oprs = riinfo->pf_eq_oprs;
+
+	Assert(num_pk == riinfo->nkeys);
+
+	/*
+	 * May need to cast each of the individual values of the foreign key
+	 * to the corresponding PK column's type if the equality operator
+	 * demands it.
+	 */
+	for (int i = 0; i < riinfo->nkeys; i++)
+	{
+		if (pk_nulls[i] != 'n')
+		{
+			Oid  eq_opr = eq_oprs[i];
+			Oid  typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+			RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+
+			if (OidIsValid(entry->cast_func_finfo.fn_oid))
+				pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
+										   pk_vals[i],
+										   Int32GetDatum(-1), /* typmod */
+										   BoolGetDatum(false)); /* implicit coercion */
+		} else {
+			Assert(false);
+		}
+	}
+
+	/*
+	 * Set up ScanKeys for the index scan. This is essentially how
+	 * ExecIndexBuildScanKeys() sets them up.
+	 */
+	for (int i = 0; i < num_pk; i++)
+	{
+		int		pkattrno = i + 1;
+		Oid		lefttype,
+				righttype;
+		Oid		operator = eq_oprs[i];
+		Oid		opfamily = idx_rel->rd_opfamily[i];
+		int  strat;
+		RegProcedure regop = get_opcode(operator);
+
+		get_op_opfamily_properties(operator, opfamily, false, &strat,
+								   &lefttype, &righttype);
+		ScanKeyEntryInitialize(&skeys[i], 0, pkattrno, strat, righttype,
+							   idx_rel->rd_indcollation[i], regop,
+							   pk_vals[i]);
+	}
+}
 
 /*
  * RI_FKey_check -
@@ -349,6 +534,132 @@ RI_FKey_check(TriggerData *trigdata)
 			break;
 	}
 
+	/* Fast path, for simple cases, probe the unique index directly */
+	if (ri_fastpath_is_applicable(riinfo, pk_rel))
+	{
+		Oid			idxoid;
+		Relation	idxrel;
+		int			num_pk;
+		Datum		pk_vals[INDEX_MAX_KEYS];
+		char		pk_nulls[INDEX_MAX_KEYS];
+		ScanKeyData	skey[INDEX_MAX_KEYS];
+		IndexScanDesc	scan;
+		TupleTableSlot *outslot;
+		Oid				saved_userid;
+		int				saved_sec_context;
+		bool			tuple_concurrently_updated;
+		int				tuples_processed = 0;
+
+		elog(DEBUG1,
+			 "RI fastpath: constraint \"%s\" using fast path",
+			 NameStr(riinfo->conname));
+
+		/*
+		 * Extract the unique key from the provided slot and choose the
+		 * equality operators to use when scanning the index below.
+		 */
+		ri_ExtractValues(fk_rel, newslot, riinfo, false, pk_vals, pk_nulls);
+
+		/*
+		 * Switch to referenced table's owner to perform the below operations as.
+		 * This matches what ri_PerformCheck() does.
+		 */
+		GetUserIdAndSecContext(&saved_userid, &saved_sec_context);
+		SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
+							   saved_sec_context | SECURITY_LOCAL_USERID_CHANGE |
+							   SECURITY_NOFORCE_RLS);
+		ri_CheckPermissions(pk_rel);
+
+		PushActiveSnapshot(GetTransactionSnapshot());
+		CommandCounterIncrement();
+		UpdateActiveSnapshotCommandId();
+
+		/*
+		 * Open the constraint index to be scanned.
+		 *
+		 * Handle partitioned 'pk_rel' later, skipped in ri_fastpath_is_applicable
+		 */
+		idxoid = get_fkey_unique_index(riinfo->constraint_id);
+		idxrel = index_open(idxoid, RowShareLock);
+		num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
+
+		build_scankeys_from_cache(riinfo, pk_rel, fk_rel, idxrel, num_pk,
+								  pk_vals, pk_nulls, skey);
+
+		scan = index_beginscan(pk_rel, idxrel, GetActiveSnapshot(), NULL, riinfo->nkeys, 0);
+
+		/* Install the ScanKeys. */
+		index_rescan(scan, skey, num_pk, NULL, 0);
+
+		/* should be cached, avoid create for each row */
+		outslot = table_slot_create(pk_rel, NULL);
+
+		/* Look for the tuple, and if found, try to lock it in key share mode. */
+		if (!index_getnext_slot(scan, ForwardScanDirection, outslot))
+			ri_ReportViolation(riinfo,
+							   pk_rel, fk_rel,
+							   newslot,
+							   NULL,
+							   RI_PLAN_CHECK_LOOKUPPK, false, false);
+
+		/*
+		 * If we fail to lock the tuple for whatever reason, assume it doesn't
+		 * exist.  If the locked tuple is the one that was found to be updated
+		 * concurrently, retry.
+		 */
+		if (ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
+							   GetActiveSnapshot(),
+							   GetCurrentCommandId(false),
+							   LockTupleKeyShare,
+							   LockWaitBlock,
+							   &tuple_concurrently_updated))
+		{
+			bool		matched = true;
+
+			/*
+			 * If the matched table tuple has been updated, check if the key is
+			 * still the same.
+			 *
+			 * This emulates EvalPlanQual() in the executor.
+			 */
+			if (tuple_concurrently_updated &&
+				!recheck_matched_pk_tuple(idxrel, skey, outslot))
+				matched = false;
+
+			if (matched)
+				tuples_processed = 1;
+		}
+
+		index_endscan(scan);
+		ExecDropSingleTupleTableSlot(outslot);
+
+		/* Don't release lock until commit. */
+		index_close(idxrel, NoLock);
+
+		PopActiveSnapshot();
+
+		/* Restore UID and security context */
+		SetUserIdAndSecContext(saved_userid, saved_sec_context);
+
+		if (tuples_processed == 1)
+		{
+			table_close(pk_rel, RowShareLock);
+			return PointerGetDatum(NULL);
+		}
+		else
+		{
+			ri_ReportViolation(riinfo,
+							   pk_rel, fk_rel,
+							   newslot,
+							   NULL,
+							   RI_PLAN_CHECK_LOOKUPPK, false, false);
+		}
+	}
+
+	/* Fall back to SPI */
+	elog(DEBUG1, "RI fastpath: constraint \"%s\" falling back to SPI",
+		 NameStr(riinfo->conname));
+
 	SPI_connect();
 
 	/* Fetch or prepare a saved plan for the real check */
@@ -3165,8 +3476,16 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
 		 * moment since that will never be generated for implicit coercions.
 		 */
 		op_input_types(eq_opr, &lefttype, &righttype);
-		Assert(lefttype == righttype);
-		if (typeid == lefttype)
+
+		/*
+		 * Don't need to cast if the values that will be passed to the
+		 * operator will be of expected operand type(s).  The operator can be
+		 * cross-type (such as when called by ri_LookupKeyInPkRel()), in which
+		 * case, we only need the cast if the right operand value doesn't match
+		 * the type expected by the operator.
+		 */
+		if ((lefttype == righttype && typeid == lefttype) ||
+			(lefttype != righttype && typeid == righttype))
 			castfunc = InvalidOid;	/* simplest case */
 		else
 		{
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index fa2b657fb2f..8155aa7ae79 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -303,6 +303,15 @@ extern void ExecShutdownNode(PlanState *node);
 extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 
+/*
+ * functions in nodeLockRows.c
+ */
+
+extern bool ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+							   Snapshot snapshot, CommandId cid,
+							   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+							   bool *tuple_concurrently_updated);
+
 /* ----------------------------------------------------------------
  *		ExecProcNode
  *
diff --git a/src/test/isolation/expected/fk-concurrent-pk-upd.out b/src/test/isolation/expected/fk-concurrent-pk-upd.out
new file mode 100644
index 00000000000..9bbec638ac9
--- /dev/null
+++ b/src/test/isolation/expected/fk-concurrent-pk-upd.out
@@ -0,0 +1,58 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s2ukey s1i s2c s1c s2s s1s
+step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1); <waiting ...>
+step s2c: COMMIT;
+step s1i: <... completed>
+ERROR:  insert or update on table "child" violates foreign key constraint "child_parent_key_fkey"
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         2|foo
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+(0 rows)
+
+
+starting permutation: s2uaux s1i s2c s1c s2s s1s
+step s2uaux: UPDATE parent SET aux = 'bar' WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1);
+step s2c: COMMIT;
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         1|bar
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+        1|         1
+(1 row)
+
+
+starting permutation: s2ukey s1i s2ukey2 s2c s1c s2s s1s
+step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1); <waiting ...>
+step s2ukey2: UPDATE parent SET parent_key = 1 WHERE parent_key = 2;
+step s2c: COMMIT;
+step s1i: <... completed>
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         1|foo
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+        1|         1
+(1 row)
+
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 112f05a3677..124d4cc289f 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -37,6 +37,7 @@ test: fk-partitioned-2
 test: fk-snapshot
 test: fk-snapshot-2
 test: fk-snapshot-3
+test: fk-concurrent-pk-upd
 test: subxid-overflow
 test: eval-plan-qual
 test: eval-plan-qual-trigger
diff --git a/src/test/isolation/specs/fk-concurrent-pk-upd.spec b/src/test/isolation/specs/fk-concurrent-pk-upd.spec
new file mode 100644
index 00000000000..cba05a85f78
--- /dev/null
+++ b/src/test/isolation/specs/fk-concurrent-pk-upd.spec
@@ -0,0 +1,42 @@
+# Tests that an INSERT on referencing table correctly fails when
+# the referenced value disappears due to a concurrent update
+setup
+{
+  CREATE TABLE parent (
+    parent_key int PRIMARY KEY,
+    aux   text NOT NULL
+  );
+
+  CREATE TABLE child (
+    child_key int PRIMARY KEY,
+    parent_key int NOT NULL REFERENCES parent
+  );
+
+  INSERT INTO parent VALUES (1, 'foo');
+}
+
+teardown
+{
+  DROP TABLE parent, child;
+}
+
+session s1
+setup  { BEGIN; }
+step s1i { INSERT INTO child VALUES (1, 1); }
+step s1c { COMMIT; }
+step s1s { SELECT * FROM child; }
+
+session s2
+setup  { BEGIN; }
+step s2ukey { UPDATE parent SET parent_key = 2 WHERE parent_key = 1; }
+step s2uaux { UPDATE parent SET aux = 'bar' WHERE parent_key = 1; }
+step s2ukey2 { UPDATE parent SET parent_key = 1 WHERE parent_key = 2; }
+step s2c { COMMIT; }
+step s2s { SELECT * FROM parent; }
+
+# fail
+permutation s2ukey s1i s2c s1c s2s s1s
+# ok
+permutation s2uaux s1i s2c s1c s2s s1s
+# ok
+permutation s2ukey s1i s2ukey2 s2c s1c s2s s1s
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 7f9e0ebb82d..eb7d393ea25 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -370,6 +370,53 @@ SELECT * FROM PKTABLE;
 DROP TABLE FKTABLE;
 DROP TABLE PKTABLE;
 --
+-- Check RLS
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+-- Grant privileges on PKTABLE/FKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+GRANT SELECT, INSERT ON FKTABLE TO regress_foreign_key_user;
+-- Enable RLS on PKTABLE and Create policies
+ALTER TABLE PKTABLE ENABLE ROW LEVEL SECURITY;
+CREATE POLICY pktable_view_odd_policy ON PKTABLE TO regress_foreign_key_user USING (ptest1 % 2 = 1);
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+SET ROLE regress_foreign_key_user;
+INSERT INTO FKTABLE VALUES (3, 5);
+INSERT INTO FKTABLE VALUES (2, 5); -- success, REFERENCES are not subject to row security
+RESET ROLE;
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+DROP USER regress_foreign_key_user;
+--
+-- Check ACL
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+-- Grant usage on PKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+-- Inserting into FKTABLE should work
+INSERT INTO FKTABLE VALUES (3, 5);
+-- Revoke usage on PKTABLE from user regress_foreign_key_user
+REVOKE SELECT ON PKTABLE FROM regress_foreign_key_user;
+-- Inserting into FKTABLE should fail
+INSERT INTO FKTABLE VALUES (2, 6);
+ERROR:  permission denied for table pktable
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+DROP USER regress_foreign_key_user;
+--
 -- Check initial check upon ALTER TABLE
 --
 CREATE TABLE PKTABLE ( ptest1 int, ptest2 int, PRIMARY KEY(ptest1, ptest2) );
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 4a6172b8e56..4b2198348d2 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -242,6 +242,70 @@ SELECT * FROM PKTABLE;
 DROP TABLE FKTABLE;
 DROP TABLE PKTABLE;
 
+--
+-- Check RLS
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+
+-- Grant privileges on PKTABLE/FKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+GRANT SELECT, INSERT ON FKTABLE TO regress_foreign_key_user;
+
+-- Enable RLS on PKTABLE and Create policies
+ALTER TABLE PKTABLE ENABLE ROW LEVEL SECURITY;
+CREATE POLICY pktable_view_odd_policy ON PKTABLE TO regress_foreign_key_user USING (ptest1 % 2 = 1);
+
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+
+SET ROLE regress_foreign_key_user;
+
+INSERT INTO FKTABLE VALUES (3, 5);
+INSERT INTO FKTABLE VALUES (2, 5); -- success, REFERENCES are not subject to row security
+
+RESET ROLE;
+
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+DROP USER regress_foreign_key_user;
+
+--
+-- Check ACL
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+
+-- Grant usage on PKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+
+-- Inserting into FKTABLE should work
+INSERT INTO FKTABLE VALUES (3, 5);
+
+-- Revoke usage on PKTABLE from user regress_foreign_key_user
+REVOKE SELECT ON PKTABLE FROM regress_foreign_key_user;
+
+-- Inserting into FKTABLE should fail
+INSERT INTO FKTABLE VALUES (2, 6);
+
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+
+DROP USER regress_foreign_key_user;
+
 --
 -- Check initial check upon ALTER TABLE
 --
-- 
2.41.0