From 208c733d759592402901599446b4f7e7197c1777 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Fri, 5 Jun 2020 16:42:34 +0200
Subject: [PATCH 4/5] Introduce infrastructure for batch processing RI events.

Separate storage is used for the RI trigger events because the "transient
table" that we provide to statement triggers would not be available for
deferred constraints. Also, the regular statement level trigger is not ideal
for the RI checks because it requires the query execution to complete before
the RI checks even start. On the other hand, if we use batches of row trigger
events, we only need to tune the batch size so that the user gets an RI
violation error reasonably soon.

This patch only introduces the infrastructure; however, the trigger function is
still called once per event. This is just to reduce the size of the diffs.
---
 src/backend/commands/tablecmds.c    |   68 +-
 src/backend/commands/trigger.c      |  406 ++++++--
 src/backend/executor/spi.c          |   16 +-
 src/backend/utils/adt/ri_triggers.c | 1385 +++++++++++++++++++--------
 src/include/commands/trigger.h      |   25 +
 5 files changed, 1381 insertions(+), 519 deletions(-)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 2ab02e01a0..8f4dd07bf7 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -10326,6 +10326,15 @@ validateForeignKeyConstraint(char *conname,
 	MemoryContext oldcxt;
 	MemoryContext perTupCxt;
 
+	LOCAL_FCINFO(fcinfo, 0);
+	TriggerData trigdata = {0};
+	ResourceOwner saveResourceOwner;
+	Tuplestorestate *table;
+	TupleDesc	tupdesc;
+	const int16 *attnums;
+	Datum	   *values;
+	bool	   *isnull;
+
 	ereport(DEBUG1,
 			(errmsg("validating foreign key constraint \"%s\"", conname)));
 
@@ -10360,41 +10369,58 @@ validateForeignKeyConstraint(char *conname,
 	slot = table_slot_create(rel, NULL);
 	scan = table_beginscan(rel, snapshot, 0, NULL);
 
+	saveResourceOwner = CurrentResourceOwner;
+	CurrentResourceOwner = CurTransactionResourceOwner;
+	table = tuplestore_begin_heap(false, false, work_mem);
+	CurrentResourceOwner = saveResourceOwner;
+
+	/* Retrieve information on FK attributes. */
+	tupdesc = RI_FKey_fk_attributes(&trig, rel, &attnums);
+	values = (Datum *) palloc(tupdesc->natts * sizeof(Datum));
+	isnull = (bool *) palloc(tupdesc->natts * sizeof(bool));
+
 	perTupCxt = AllocSetContextCreate(CurrentMemoryContext,
 									  "validateForeignKeyConstraint",
 									  ALLOCSET_SMALL_SIZES);
 	oldcxt = MemoryContextSwitchTo(perTupCxt);
 
+	/* Store the rows to be checked, but only the FK attributes. */
 	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
 	{
-		LOCAL_FCINFO(fcinfo, 0);
-		TriggerData trigdata = {0};
+		int			i;
 
 		CHECK_FOR_INTERRUPTS();
 
-		/*
-		 * Make a call to the trigger function
-		 *
-		 * No parameters are passed, but we do set a context
-		 */
-		MemSet(fcinfo, 0, SizeForFunctionCallInfo(0));
-
-		/*
-		 * We assume RI_FKey_check_ins won't look at flinfo...
-		 */
-		trigdata.type = T_TriggerData;
-		trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
-		trigdata.tg_relation = rel;
-		trigdata.tg_trigtuple = ExecFetchSlotHeapTuple(slot, false, NULL);
-		trigdata.tg_trigslot = slot;
-		trigdata.tg_trigger = &trig;
-
-		fcinfo->context = (Node *) &trigdata;
+		for (i = 0; i < slot->tts_tupleDescriptor->natts; i++)
+			values[i] = slot_getattr(slot, attnums[i], &isnull[i]);
 
-		RI_FKey_check_ins(fcinfo);
+		tuplestore_putvalues(table, tupdesc, values, isnull);
 
 		MemoryContextReset(perTupCxt);
 	}
+	pfree(values);
+	pfree(isnull);
+
+	/*
+	 * Make a call to the trigger function
+	 *
+	 * No parameters are passed, but we do set a context
+	 */
+	MemSet(fcinfo, 0, SizeForFunctionCallInfo(0));
+
+	/*
+	 * We assume RI_FKey_check_ins won't look at flinfo...
+	 */
+	trigdata.type = T_TriggerData;
+	trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
+	trigdata.tg_relation = rel;
+	trigdata.tg_trigslot = slot;
+	trigdata.tg_trigger = &trig;
+	trigdata.tg_oldtable = table;
+
+	fcinfo->context = (Node *) &trigdata;
+
+	RI_FKey_check_ins(fcinfo);
 
 	MemoryContextSwitchTo(oldcxt);
 	MemoryContextDelete(perTupCxt);
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 672fccff5b..c240988471 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -105,6 +105,8 @@ static void AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
 static void AfterTriggerEnlargeQueryState(void);
 static bool before_stmt_triggers_fired(Oid relid, CmdType cmdType);
 
+static TIDArray *alloc_tid_array(void);
+static void add_tid(TIDArray *ta, ItemPointer item);
 
 /*
  * Create a trigger.  Returns the address of the created trigger.
@@ -3337,10 +3339,14 @@ typedef struct AfterTriggerEventList
 /* Macros to help in iterating over a list of events */
 #define for_each_chunk(cptr, evtlist) \
 	for (cptr = (evtlist).head; cptr != NULL; cptr = cptr->next)
+#define next_event_in_chunk(eptr, cptr) \
+	(AfterTriggerEvent) (((char *) eptr) + SizeofTriggerEvent(eptr))
 #define for_each_event(eptr, cptr) \
 	for (eptr = (AfterTriggerEvent) CHUNK_DATA_START(cptr); \
 		 (char *) eptr < (cptr)->freeptr; \
-		 eptr = (AfterTriggerEvent) (((char *) eptr) + SizeofTriggerEvent(eptr)))
+		 eptr = next_event_in_chunk(eptr, cptr))
+#define is_last_event_in_chunk(eptr, cptr) \
+	((((char *) eptr) + SizeofTriggerEvent(eptr)) >= (cptr)->freeptr)
 /* Use this if no special per-chunk processing is needed */
 #define for_each_event_chunk(eptr, cptr, evtlist) \
 	for_each_chunk(cptr, evtlist) for_each_event(eptr, cptr)
@@ -3488,9 +3494,17 @@ static void AfterTriggerExecute(EState *estate,
 								TriggerDesc *trigdesc,
 								FmgrInfo *finfo,
 								Instrumentation *instr,
+								TriggerData *trig_last,
 								MemoryContext per_tuple_context,
+								MemoryContext batch_context,
 								TupleTableSlot *trig_tuple_slot1,
 								TupleTableSlot *trig_tuple_slot2);
+static void AfterTriggerExecuteRI(EState *estate,
+								  ResultRelInfo *relInfo,
+								  FmgrInfo *finfo,
+								  Instrumentation *instr,
+								  TriggerData *trig_last,
+								  MemoryContext batch_context);
 static AfterTriggersTableData *GetAfterTriggersTableData(Oid relid,
 														 CmdType cmdType);
 static void AfterTriggerFreeQuery(AfterTriggersQueryData *qs);
@@ -3807,13 +3821,16 @@ afterTriggerDeleteHeadEventChunk(AfterTriggersQueryData *qs)
  *	fmgr lookup cache space at the caller level.  (For triggers fired at
  *	the end of a query, we can even piggyback on the executor's state.)
  *
- *	event: event currently being fired.
+ *	event: event currently being fired. Pass NULL if the current batch of RI
+ *		trigger events should be processed.
  *	rel: open relation for event.
  *	trigdesc: working copy of rel's trigger info.
  *	finfo: array of fmgr lookup cache entries (one per trigger in trigdesc).
  *	instr: array of EXPLAIN ANALYZE instrumentation nodes (one per trigger),
  *		or NULL if no instrumentation is wanted.
+ *	trig_last: trigger info used for the last trigger execution.
  *	per_tuple_context: memory context to call trigger function in.
+ *	batch_context: memory context to store tuples for RI triggers.
  *	trig_tuple_slot1: scratch slot for tg_trigtuple (foreign tables only)
  *	trig_tuple_slot2: scratch slot for tg_newtuple (foreign tables only)
  * ----------
@@ -3824,39 +3841,55 @@ AfterTriggerExecute(EState *estate,
 					ResultRelInfo *relInfo,
 					TriggerDesc *trigdesc,
 					FmgrInfo *finfo, Instrumentation *instr,
+					TriggerData *trig_last,
 					MemoryContext per_tuple_context,
+					MemoryContext batch_context,
 					TupleTableSlot *trig_tuple_slot1,
 					TupleTableSlot *trig_tuple_slot2)
 {
 	Relation	rel = relInfo->ri_RelationDesc;
 	AfterTriggerShared evtshared = GetTriggerSharedData(event);
 	Oid			tgoid = evtshared->ats_tgoid;
-	TriggerData LocTriggerData = {0};
 	HeapTuple	rettuple;
-	int			tgindx;
 	bool		should_free_trig = false;
 	bool		should_free_new = false;
+	bool		is_new = false;
 
-	/*
-	 * Locate trigger in trigdesc.
-	 */
-	for (tgindx = 0; tgindx < trigdesc->numtriggers; tgindx++)
+	if (trig_last->tg_trigger == NULL)
 	{
-		if (trigdesc->triggers[tgindx].tgoid == tgoid)
+		int			tgindx;
+
+		/*
+		 * Locate trigger in trigdesc.
+		 */
+		for (tgindx = 0; tgindx < trigdesc->numtriggers; tgindx++)
 		{
-			LocTriggerData.tg_trigger = &(trigdesc->triggers[tgindx]);
-			break;
+			if (trigdesc->triggers[tgindx].tgoid == tgoid)
+			{
+				trig_last->tg_trigger = &(trigdesc->triggers[tgindx]);
+				trig_last->tgindx = tgindx;
+				break;
+			}
 		}
+		if (trig_last->tg_trigger == NULL)
+			elog(ERROR, "could not find trigger %u", tgoid);
+
+		if (RI_FKey_trigger_type(trig_last->tg_trigger->tgfoid) !=
+			RI_TRIGGER_NONE)
+			trig_last->is_ri_trigger = true;
+
+		is_new = true;
 	}
-	if (LocTriggerData.tg_trigger == NULL)
-		elog(ERROR, "could not find trigger %u", tgoid);
+
+	/* trig_last for non-RI trigger should always be initialized again. */
+	Assert(trig_last->is_ri_trigger || is_new);
 
 	/*
 	 * If doing EXPLAIN ANALYZE, start charging time to this trigger. We want
 	 * to include time spent re-fetching tuples in the trigger cost.
 	 */
-	if (instr)
-		InstrStartNode(instr + tgindx);
+	if (instr && !trig_last->is_ri_trigger)
+		InstrStartNode(instr + trig_last->tgindx);
 
 	/*
 	 * Fetch the required tuple(s).
@@ -3864,6 +3897,9 @@ AfterTriggerExecute(EState *estate,
 	switch (event->ate_flags & AFTER_TRIGGER_TUP_BITS)
 	{
 		case AFTER_TRIGGER_FDW_FETCH:
+			/* Foreign keys are not supported on foreign tables. */
+			Assert(!trig_last->is_ri_trigger);
+
 			{
 				Tuplestorestate *fdw_tuplestore = GetCurrentFDWTuplestore();
 
@@ -3879,6 +3915,8 @@ AfterTriggerExecute(EState *estate,
 			}
 			/* fall through */
 		case AFTER_TRIGGER_FDW_REUSE:
+			/* Foreign keys are not supported on foreign tables. */
+			Assert(!trig_last->is_ri_trigger);
 
 			/*
 			 * Store tuple in the slot so that tg_trigtuple does not reference
@@ -3889,38 +3927,56 @@ AfterTriggerExecute(EState *estate,
 			 * that is stored as a heap tuple, constructed in different memory
 			 * context, in the slot anyway.
 			 */
-			LocTriggerData.tg_trigslot = trig_tuple_slot1;
-			LocTriggerData.tg_trigtuple =
+			trig_last->tg_trigslot = trig_tuple_slot1;
+			trig_last->tg_trigtuple =
 				ExecFetchSlotHeapTuple(trig_tuple_slot1, true, &should_free_trig);
 
 			if ((evtshared->ats_event & TRIGGER_EVENT_OPMASK) ==
 				TRIGGER_EVENT_UPDATE)
 			{
-				LocTriggerData.tg_newslot = trig_tuple_slot2;
-				LocTriggerData.tg_newtuple =
+				trig_last->tg_newslot = trig_tuple_slot2;
+				trig_last->tg_newtuple =
 					ExecFetchSlotHeapTuple(trig_tuple_slot2, true, &should_free_new);
 			}
 			else
 			{
-				LocTriggerData.tg_newtuple = NULL;
+				trig_last->tg_newtuple = NULL;
 			}
 			break;
 
 		default:
 			if (ItemPointerIsValid(&(event->ate_ctid1)))
 			{
-				LocTriggerData.tg_trigslot = ExecGetTriggerOldSlot(estate, relInfo);
+				if (!trig_last->is_ri_trigger)
+				{
+					trig_last->tg_trigslot = ExecGetTriggerOldSlot(estate,
+																   relInfo);
 
-				if (!table_tuple_fetch_row_version(rel, &(event->ate_ctid1),
-												   SnapshotAny,
-												   LocTriggerData.tg_trigslot))
-					elog(ERROR, "failed to fetch tuple1 for AFTER trigger");
-				LocTriggerData.tg_trigtuple =
-					ExecFetchSlotHeapTuple(LocTriggerData.tg_trigslot, false, &should_free_trig);
+					if (!table_tuple_fetch_row_version(rel, &(event->ate_ctid1),
+													   SnapshotAny,
+													   trig_last->tg_trigslot))
+						elog(ERROR, "failed to fetch tuple1 for AFTER trigger");
+
+					trig_last->tg_trigtuple =
+						ExecFetchSlotHeapTuple(trig_last->tg_trigslot, false,
+											   &should_free_trig);
+				}
+				else
+				{
+					if (trig_last->ri_tids_old == NULL)
+					{
+						MemoryContext oldcxt;
+
+						oldcxt = MemoryContextSwitchTo(batch_context);
+						trig_last->ri_tids_old = alloc_tid_array();
+						MemoryContextSwitchTo(oldcxt);
+					}
+					add_tid(trig_last->ri_tids_old, &(event->ate_ctid1));
+				}
 			}
 			else
 			{
-				LocTriggerData.tg_trigtuple = NULL;
+				trig_last->tg_trigtuple = NULL;
 			}
 
 			/* don't touch ctid2 if not there */
@@ -3928,18 +3984,36 @@ AfterTriggerExecute(EState *estate,
 				AFTER_TRIGGER_2CTID &&
 				ItemPointerIsValid(&(event->ate_ctid2)))
 			{
-				LocTriggerData.tg_newslot = ExecGetTriggerNewSlot(estate, relInfo);
+				if (!trig_last->is_ri_trigger)
+				{
+					trig_last->tg_newslot = ExecGetTriggerNewSlot(estate,
+																  relInfo);
 
-				if (!table_tuple_fetch_row_version(rel, &(event->ate_ctid2),
-												   SnapshotAny,
-												   LocTriggerData.tg_newslot))
-					elog(ERROR, "failed to fetch tuple2 for AFTER trigger");
-				LocTriggerData.tg_newtuple =
-					ExecFetchSlotHeapTuple(LocTriggerData.tg_newslot, false, &should_free_new);
+					if (!table_tuple_fetch_row_version(rel, &(event->ate_ctid2),
+													   SnapshotAny,
+													   trig_last->tg_newslot))
+						elog(ERROR, "failed to fetch tuple2 for AFTER trigger");
+
+					trig_last->tg_newtuple =
+						ExecFetchSlotHeapTuple(trig_last->tg_newslot, false,
+											   &should_free_new);
+				}
+				else
+				{
+					if (trig_last->ri_tids_new == NULL)
+					{
+						MemoryContext oldcxt;
+
+						oldcxt = MemoryContextSwitchTo(batch_context);
+						trig_last->ri_tids_new = alloc_tid_array();
+						MemoryContextSwitchTo(oldcxt);
+					}
+					add_tid(trig_last->ri_tids_new, &(event->ate_ctid2));
+				}
 			}
 			else
 			{
-				LocTriggerData.tg_newtuple = NULL;
+				trig_last->tg_newtuple = NULL;
 			}
 	}
 
@@ -3949,19 +4023,26 @@ AfterTriggerExecute(EState *estate,
 	 * a trigger, mark it "closed" so that it cannot change anymore.  If any
 	 * additional events of the same type get queued in the current trigger
 	 * query level, they'll go into new transition tables.
+	 *
+	 * RI triggers treat the tuplestores specially, see above.
 	 */
-	LocTriggerData.tg_oldtable = LocTriggerData.tg_newtable = NULL;
+	if (!trig_last->is_ri_trigger)
+		trig_last->tg_oldtable = trig_last->tg_newtable = NULL;
+
 	if (evtshared->ats_table)
 	{
-		if (LocTriggerData.tg_trigger->tgoldtable)
+		/* There shouldn't be any transition table for an RI trigger event. */
+		Assert(!trig_last->is_ri_trigger);
+
+		if (trig_last->tg_trigger->tgoldtable)
 		{
-			LocTriggerData.tg_oldtable = evtshared->ats_table->old_tuplestore;
+			trig_last->tg_oldtable = evtshared->ats_table->old_tuplestore;
 			evtshared->ats_table->closed = true;
 		}
 
-		if (LocTriggerData.tg_trigger->tgnewtable)
+		if (trig_last->tg_trigger->tgnewtable)
 		{
-			LocTriggerData.tg_newtable = evtshared->ats_table->new_tuplestore;
+			trig_last->tg_newtable = evtshared->ats_table->new_tuplestore;
 			evtshared->ats_table->closed = true;
 		}
 	}
@@ -3969,54 +4050,139 @@ AfterTriggerExecute(EState *estate,
 	/*
 	 * Setup the remaining trigger information
 	 */
-	LocTriggerData.type = T_TriggerData;
-	LocTriggerData.tg_event =
-		evtshared->ats_event & (TRIGGER_EVENT_OPMASK | TRIGGER_EVENT_ROW);
-	LocTriggerData.tg_relation = rel;
-	if (TRIGGER_FOR_UPDATE(LocTriggerData.tg_trigger->tgtype))
-		LocTriggerData.tg_updatedcols = evtshared->ats_modifiedcols;
-
-	MemoryContextReset(per_tuple_context);
+	if (is_new)
+	{
+		trig_last->type = T_TriggerData;
+		trig_last->tg_event =
+			evtshared->ats_event & (TRIGGER_EVENT_OPMASK | TRIGGER_EVENT_ROW);
+		trig_last->tg_relation = rel;
+		if (TRIGGER_FOR_UPDATE(trig_last->tg_trigger->tgtype))
+			trig_last->tg_updatedcols = evtshared->ats_modifiedcols;
+	}
 
 	/*
-	 * Call the trigger and throw away any possibly returned updated tuple.
-	 * (Don't let ExecCallTriggerFunc measure EXPLAIN time.)
+	 * RI triggers are executed in batches, see the top of the function.
 	 */
-	rettuple = ExecCallTriggerFunc(&LocTriggerData,
-								   tgindx,
-								   finfo,
-								   NULL,
-								   per_tuple_context);
-	if (rettuple != NULL &&
-		rettuple != LocTriggerData.tg_trigtuple &&
-		rettuple != LocTriggerData.tg_newtuple)
-		heap_freetuple(rettuple);
+	if (!trig_last->is_ri_trigger)
+	{
+		MemoryContextReset(per_tuple_context);
+
+		/*
+		 * Call the trigger and throw away any possibly returned updated
+		 * tuple. (Don't let ExecCallTriggerFunc measure EXPLAIN time.)
+		 */
+		rettuple = ExecCallTriggerFunc(trig_last,
+									   trig_last->tgindx,
+									   finfo,
+									   NULL,
+									   per_tuple_context);
+		if (rettuple != NULL &&
+			rettuple != trig_last->tg_trigtuple &&
+			rettuple != trig_last->tg_newtuple)
+			heap_freetuple(rettuple);
+	}
 
 	/*
 	 * Release resources
 	 */
 	if (should_free_trig)
-		heap_freetuple(LocTriggerData.tg_trigtuple);
+		heap_freetuple(trig_last->tg_trigtuple);
 	if (should_free_new)
-		heap_freetuple(LocTriggerData.tg_newtuple);
+		heap_freetuple(trig_last->tg_newtuple);
 
-	/* don't clear slots' contents if foreign table */
-	if (trig_tuple_slot1 == NULL)
+	/*
+	 * Don't clear slots' contents if foreign table.
+	 *
+	 * For an RI trigger we manage these slots separately, see
+	 * AfterTriggerExecuteRI().
+	 */
+	if (trig_tuple_slot1 == NULL && !trig_last->is_ri_trigger)
 	{
-		if (LocTriggerData.tg_trigslot)
-			ExecClearTuple(LocTriggerData.tg_trigslot);
-		if (LocTriggerData.tg_newslot)
-			ExecClearTuple(LocTriggerData.tg_newslot);
+		if (trig_last->tg_trigslot)
+			ExecClearTuple(trig_last->tg_trigslot);
+		if (trig_last->tg_newslot)
+			ExecClearTuple(trig_last->tg_newslot);
 	}
 
 	/*
 	 * If doing EXPLAIN ANALYZE, stop charging time to this trigger, and count
 	 * one "tuple returned" (really the number of firings).
 	 */
-	if (instr)
-		InstrStopNode(instr + tgindx, 1);
+	if (instr && !trig_last->is_ri_trigger)
+		InstrStopNode(instr + trig_last->tgindx, 1);
+
+	/* RI triggers use trig_last across calls. */
+	if (!trig_last->is_ri_trigger)
+		memset(trig_last, 0, sizeof(TriggerData));
 }
 
+/*
+ * AfterTriggerExecuteRI()
+ *
+ * Execute an RI trigger. It's assumed that AfterTriggerExecute() recognized
+ * RI trigger events and only added them to the batch instead of executing
+ * them. The actual processing of the batch is done by this function.
+ */
+static void
+AfterTriggerExecuteRI(EState *estate,
+					  ResultRelInfo *relInfo,
+					  FmgrInfo *finfo,
+					  Instrumentation *instr,
+					  TriggerData *trig_last,
+					  MemoryContext batch_context)
+{
+	HeapTuple	rettuple;
+
+	/*
+	 * AfterTriggerExecute() must have been called for this trigger already.
+	 */
+	Assert(trig_last->tg_trigger);
+	Assert(trig_last->is_ri_trigger);
+
+	/*
+	 * RI trigger constructs a local tuplestore when it needs it. The point is
+	 * that it might need to check visibility first. If we put the tuples into
+	 * a tuplestore now, it'd be hard to keep pins of the containing buffers,
+	 * and so table_tuple_satisfies_snapshot check wouldn't work.
+	 */
+	Assert(trig_last->tg_oldtable == NULL);
+	Assert(trig_last->tg_newtable == NULL);
+
+	/* Initialize the slots to retrieve the rows by TID. */
+	trig_last->tg_trigslot = ExecGetTriggerOldSlot(estate, relInfo);
+	trig_last->tg_newslot = ExecGetTriggerNewSlot(estate, relInfo);
+
+	if (instr)
+		InstrStartNode(instr + trig_last->tgindx);
+
+	/*
+	 * Call the trigger and throw away any possibly returned updated tuple.
+	 * (Don't let ExecCallTriggerFunc measure EXPLAIN time.)
+	 *
+	 * batch_context already contains the TIDs of the affected rows. The RI
+	 * trigger should also use this context to create the tuplestore for them.
+	 */
+	rettuple = ExecCallTriggerFunc(trig_last,
+								   trig_last->tgindx,
+								   finfo,
+								   NULL,
+								   batch_context);
+	if (rettuple != NULL &&
+		rettuple != trig_last->tg_trigtuple &&
+		rettuple != trig_last->tg_newtuple)
+		heap_freetuple(rettuple);
+
+	if (instr)
+		InstrStopNode(instr + trig_last->tgindx, 1);
+
+	ExecClearTuple(trig_last->tg_trigslot);
+	ExecClearTuple(trig_last->tg_newslot);
+
+	MemoryContextReset(batch_context);
+
+	memset(trig_last, 0, sizeof(TriggerData));
+	return;
+}
 
 /*
  * afterTriggerMarkEvents()
@@ -4112,7 +4278,8 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
 {
 	bool		all_fired = true;
 	AfterTriggerEventChunk *chunk;
-	MemoryContext per_tuple_context;
+	MemoryContext per_tuple_context,
+				batch_context;
 	bool		local_estate = false;
 	ResultRelInfo *rInfo = NULL;
 	Relation	rel = NULL;
@@ -4121,6 +4288,7 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
 	Instrumentation *instr = NULL;
 	TupleTableSlot *slot1 = NULL,
 			   *slot2 = NULL;
+	TriggerData trig_last;
 
 	/* Make a local EState if need be */
 	if (estate == NULL)
@@ -4134,6 +4302,14 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
 		AllocSetContextCreate(CurrentMemoryContext,
 							  "AfterTriggerTupleContext",
 							  ALLOCSET_DEFAULT_SIZES);
+	/* Separate context for a batch of RI trigger events. */
+	batch_context =
+		AllocSetContextCreate(CurrentMemoryContext,
+							  "AfterTriggerBatchContext",
+							  ALLOCSET_DEFAULT_SIZES);
+
+	/* No trigger executed yet in this batch. */
+	memset(&trig_last, 0, sizeof(TriggerData));
 
 	for_each_chunk(chunk, *events)
 	{
@@ -4150,6 +4326,8 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
 			if ((event->ate_flags & AFTER_TRIGGER_IN_PROGRESS) &&
 				evtshared->ats_firing_id == firing_id)
 			{
+				bool		fire_ri_batch = false;
+
 				/*
 				 * So let's fire it... but first, find the correct relation if
 				 * this is not the same relation as before.
@@ -4180,12 +4358,60 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
 				}
 
 				/*
-				 * Fire it.  Note that the AFTER_TRIGGER_IN_PROGRESS flag is
-				 * still set, so recursive examinations of the event list
-				 * won't try to re-fire it.
+				 * Fire it (or add the corresponding tuple(s) to the current
+				 * batch if it's RI trigger).
+				 *
+				 * Note that the AFTER_TRIGGER_IN_PROGRESS flag is still set,
+				 * so recursive examinations of the event list won't try to
+				 * re-fire it.
 				 */
-				AfterTriggerExecute(estate, event, rInfo, trigdesc, finfo, instr,
-									per_tuple_context, slot1, slot2);
+				AfterTriggerExecute(estate, event, rInfo, trigdesc, finfo,
+									instr, &trig_last,
+									per_tuple_context, batch_context,
+									slot1, slot2);
+
+				/*
+				 * RI trigger events are processed in batches, so extra work
+				 * might be needed to finish the current batch. It's important
+				 * to do this before the chunk iteration ends because the
+				 * trigger execution may generate other events.
+				 *
+				 * XXX Implement maximum batch size so that constraint
+				 * violations are reported as soon as possible?
+				 */
+				if (trig_last.tg_trigger && trig_last.is_ri_trigger)
+				{
+					if (is_last_event_in_chunk(event, chunk))
+						fire_ri_batch = true;
+					else
+					{
+						AfterTriggerEvent evtnext;
+						AfterTriggerShared evtshnext;
+
+						/*
+						 * We even need to look ahead because the next event
+						 * might be affected by execution of the current one.
+						 * For example if the next event is an AS trigger
+						 * event to be cancelled (cancel_prior_stmt_triggers)
+						 * because the current event, during its execution,
+						 * generates a new AS event for the same trigger.
+						 */
+						evtnext = next_event_in_chunk(event, chunk);
+						evtshnext = GetTriggerSharedData(evtnext);
+
+						if (evtshnext != evtshared)
+							fire_ri_batch = true;
+					}
+				}
+
+				if (fire_ri_batch)
+					AfterTriggerExecuteRI(estate,
+										  rInfo,
+										  finfo,
+										  instr,
+										  &trig_last,
+										  batch_context);
+
 
 				/*
 				 * Mark the event as done.
@@ -4216,6 +4442,7 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
 				events->tailfree = chunk->freeptr;
 		}
 	}
+
 	if (slot1 != NULL)
 	{
 		ExecDropSingleTupleTableSlot(slot1);
@@ -4224,6 +4451,7 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
 
 	/* Release working resources */
 	MemoryContextDelete(per_tuple_context);
+	MemoryContextDelete(batch_context);
 
 	if (local_estate)
 	{
@@ -5812,3 +6040,29 @@ pg_trigger_depth(PG_FUNCTION_ARGS)
 {
 	PG_RETURN_INT32(MyTriggerDepth);
 }
+
+static TIDArray *
+alloc_tid_array(void)
+{
+	TIDArray   *result = (TIDArray *) palloc(sizeof(TIDArray));
+
+	/* XXX Tune the chunk size. */
+	result->nmax = 1024;
+	result->tids = (ItemPointer) palloc(result->nmax *
+										sizeof(ItemPointerData));
+	result->n = 0;
+	return result;
+}
+
+static void
+add_tid(TIDArray *ta, ItemPointer item)
+{
+	if (ta->n == ta->nmax)
+	{
+		ta->nmax += 1024;
+		ta->tids = (ItemPointer) repalloc(ta->tids,
+										  ta->nmax * sizeof(ItemPointerData));
+	}
+	memcpy(ta->tids + ta->n, item, sizeof(ItemPointerData));
+	ta->n++;
+}
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index b108168821..37026219b6 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -2929,12 +2929,14 @@ SPI_register_trigger_data(TriggerData *tdata)
 	if (tdata->tg_newtable)
 	{
 		EphemeralNamedRelation enr =
-		palloc(sizeof(EphemeralNamedRelationData));
+		palloc0(sizeof(EphemeralNamedRelationData));
 		int			rc;
 
 		enr->md.name = tdata->tg_trigger->tgnewtable;
-		enr->md.reliddesc = tdata->tg_relation->rd_id;
-		enr->md.tupdesc = NULL;
+		if (tdata->desc)
+			enr->md.tupdesc = tdata->desc;
+		else
+			enr->md.reliddesc = tdata->tg_relation->rd_id;
 		enr->md.enrtype = ENR_NAMED_TUPLESTORE;
 		enr->md.enrtuples = tuplestore_tuple_count(tdata->tg_newtable);
 		enr->reldata = tdata->tg_newtable;
@@ -2946,12 +2948,14 @@ SPI_register_trigger_data(TriggerData *tdata)
 	if (tdata->tg_oldtable)
 	{
 		EphemeralNamedRelation enr =
-		palloc(sizeof(EphemeralNamedRelationData));
+		palloc0(sizeof(EphemeralNamedRelationData));
 		int			rc;
 
 		enr->md.name = tdata->tg_trigger->tgoldtable;
-		enr->md.reliddesc = tdata->tg_relation->rd_id;
-		enr->md.tupdesc = NULL;
+		if (tdata->desc)
+			enr->md.tupdesc = tdata->desc;
+		else
+			enr->md.reliddesc = tdata->tg_relation->rd_id;
 		enr->md.enrtype = ENR_NAMED_TUPLESTORE;
 		enr->md.enrtuples = tuplestore_tuple_count(tdata->tg_oldtable);
 		enr->reldata = tdata->tg_oldtable;
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 647b102be1..44d1e12a81 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -114,6 +114,10 @@ typedef struct RI_ConstraintInfo
 	Oid			pf_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = FK) */
 	Oid			pp_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = PK) */
 	Oid			ff_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (FK = FK) */
+	TupleTableSlot *slot_pk;	/* slot for PK attributes */
+	TupleTableSlot *slot_fk;	/* slot for FK attributes */
+	TupleTableSlot *slot_both;	/* Both OLD and NEW version of PK table row. */
+	MemoryContext slot_mcxt;	/* the slots will exist in this context */
 	dlist_node	valid_link;		/* Link in list of valid entries */
 } RI_ConstraintInfo;
 
@@ -173,11 +177,29 @@ static int	ri_constraint_cache_valid_count = 0;
 /*
  * Local function prototypes
  */
+static char *RI_FKey_check_query_single_row(const RI_ConstraintInfo *riinfo,
+											Relation fk_rel, Relation pk_rel,
+											Oid *paramtypes);
+static bool RI_FKey_check_query_required(Trigger *trigger, Relation fk_rel,
+										 TupleTableSlot *newslot);
 static bool ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 							  TupleTableSlot *oldslot,
 							  const RI_ConstraintInfo *riinfo);
-static Datum ri_restrict(TriggerData *trigdata, bool is_no_action);
+static Datum ri_restrict(TriggerData *trigdata, bool is_no_action,
+						 TupleTableSlot *oldslot);
+static char *ri_restrict_query_single_row(const RI_ConstraintInfo *riinfo,
+										  Relation fk_rel,
+										  Relation pk_rel, Oid *paramtypes);
+static char *ri_cascade_del_query_single_row(const RI_ConstraintInfo *riinfo,
+											 Relation fk_rel, Relation pk_rel,
+											 Oid *paramtypes);
+static char *ri_cascade_upd_query_single_row(const RI_ConstraintInfo *riinfo,
+											 Relation fk_rel, Relation pk_rel,
+											 Oid *paramtypes);
 static Datum ri_set(TriggerData *trigdata, bool is_set_null);
+static char *ri_set_query_single_row(const RI_ConstraintInfo *riinfo,
+									 Relation fk_rel, Relation pk_rel,
+									 Oid *paramtypes, bool is_set_null);
 static void quoteOneName(char *buffer, const char *name);
 static void quoteRelationName(char *buffer, Relation rel);
 static char *ri_ColNameQuoted(const char *tabname, const char *attname);
@@ -200,6 +222,7 @@ static void ri_GenerateQual(StringInfo buf, char *sep, int nkeys,
 							const char *rtabname, Relation rrel,
 							const int16 *rattnums, const Oid *eq_oprs,
 							GenQualParams params, Oid *paramtypes);
+
 static void ri_GenerateQualComponent(StringInfo buf,
 									 const char *sep,
 									 const char *leftop, Oid leftoptype,
@@ -207,7 +230,8 @@ static void ri_GenerateQualComponent(StringInfo buf,
 									 const char *rightop, Oid rightoptype);
 static void ri_GenerateQualCollation(StringInfo buf, Oid collation);
 static int	ri_NullCheck(TupleDesc tupdesc, TupleTableSlot *slot,
-						 const RI_ConstraintInfo *riinfo, bool rel_is_pk);
+						 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
+						 bool ignore_attnums);
 static void ri_BuildQueryKey(RI_QueryKey *key,
 							 const RI_ConstraintInfo *riinfo,
 							 int32 constr_queryno);
@@ -226,22 +250,36 @@ static void ri_CheckTrigger(FunctionCallInfo fcinfo, const char *funcname,
 							int tgkind);
 static const RI_ConstraintInfo *ri_FetchConstraintInfo(Trigger *trigger,
 													   Relation trig_rel, bool rel_is_pk);
-static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid);
+static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid,
+													  Relation trig_rel,
+													  bool rel_is_pk);
 static SPIPlanPtr ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
-							   RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
+							   RI_QueryKey *qkey,
+							   Relation fk_rel, Relation pk_rel);
 static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 							RI_QueryKey *qkey, SPIPlanPtr qplan,
 							Relation fk_rel, Relation pk_rel,
-							TupleTableSlot *oldslot, TupleTableSlot *newslot,
+							TupleTableSlot *oldslot,
 							bool detectNewRows, int expect_OK);
-static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
-							 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
+static void ri_ExtractValues(TupleTableSlot *slot, int first, int nkeys,
 							 Datum *vals, char *nulls);
 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 							   Relation pk_rel, Relation fk_rel,
 							   TupleTableSlot *violatorslot, TupleDesc tupdesc,
 							   int queryno, bool partgone) pg_attribute_noreturn();
-
+static Tuplestorestate *get_event_tuplestore(TriggerData *trigdata, int nkeys,
+											 const int16 *attnums, bool old,
+											 TupleDesc tupdesc, Snapshot snapshot);
+static Tuplestorestate *get_event_tuplestore_for_cascade_update(TriggerData *trigdata,
+																const RI_ConstraintInfo *riinfo);
+static void add_key_attrs_to_tupdesc(TupleDesc tupdesc, Relation rel,
+									 const RI_ConstraintInfo *riinfo, int16 *attnums,
+									 int first, bool generate_attnames);
+static void add_key_values(TupleTableSlot *slot,
+						   const RI_ConstraintInfo *riinfo,
+						   Relation rel, ItemPointer ip,
+						   Datum *key_values, bool *key_nulls,
+						   Datum *values, bool *nulls, int first);
 
 /*
  * RI_FKey_check -
@@ -254,29 +292,17 @@ RI_FKey_check(TriggerData *trigdata)
 	const RI_ConstraintInfo *riinfo;
 	Relation	fk_rel;
 	Relation	pk_rel;
-	TupleTableSlot *newslot;
+	bool		is_insert;
 	RI_QueryKey qkey;
 	SPIPlanPtr	qplan;
+	Tuplestorestate *oldtable = NULL;
+	Tuplestorestate *newtable = NULL;
+	Tuplestorestate *table;
+	TupleTableSlot *slot = NULL;
 
 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
 									trigdata->tg_relation, false);
 
-	if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
-		newslot = trigdata->tg_newslot;
-	else
-		newslot = trigdata->tg_trigslot;
-
-	/*
-	 * We should not even consider checking the row if it is no longer valid,
-	 * since it was either deleted (so the deferred check should be skipped)
-	 * or updated (in which case only the latest version of the row should be
-	 * checked).  Test its liveness according to SnapshotSelf.  We need pin
-	 * and lock on the buffer to call HeapTupleSatisfiesVisibility.  Caller
-	 * should be holding pin, but not lock.
-	 */
-	if (!table_tuple_satisfies_snapshot(trigdata->tg_relation, newslot, SnapshotSelf))
-		return PointerGetDatum(NULL);
-
 	/*
 	 * Get the relation descriptors of the FK and PK tables.
 	 *
@@ -286,7 +312,142 @@ RI_FKey_check(TriggerData *trigdata)
 	fk_rel = trigdata->tg_relation;
 	pk_rel = table_open(riinfo->pk_relid, RowShareLock);
 
-	switch (ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false))
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	/* Fetch or prepare a saved plan for the real check */
+	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
+	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
+	{
+		char	   *query;
+		Oid			paramtypes[RI_MAX_NUMKEYS];
+
+		query = RI_FKey_check_query_single_row(riinfo, fk_rel, pk_rel,
+											   paramtypes);
+
+		/* Prepare and save the plan */
+		qplan = ri_PlanCheck(query, riinfo->nkeys, paramtypes, &qkey, fk_rel,
+							 pk_rel);
+	}
+
+	/*
+	 * Retrieve the changed rows and put them into the appropriate tuplestore.
+	 */
+	is_insert = TRIGGER_FIRED_BY_INSERT(trigdata->tg_event);
+	if (is_insert)
+	{
+		if (trigdata->ri_tids_old)
+			oldtable = get_event_tuplestore(trigdata,
+											riinfo->nkeys,
+											riinfo->fk_attnums,
+											true,
+											riinfo->slot_fk->tts_tupleDescriptor,
+											SnapshotSelf);
+		else
+		{
+			/* The table is passed by caller if not called from trigger.c */
+			oldtable = trigdata->tg_oldtable;
+		}
+		table = oldtable;
+	}
+	else
+	{
+		Assert((TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)));
+
+		if (trigdata->ri_tids_new)
+			newtable = get_event_tuplestore(trigdata,
+											riinfo->nkeys,
+											riinfo->fk_attnums,
+											false,
+											riinfo->slot_fk->tts_tupleDescriptor,
+											SnapshotSelf);
+		else
+		{
+			/* The table is passed by caller if not called from trigger.c */
+			newtable = trigdata->tg_newtable;
+		}
+		table = newtable;
+	}
+
+	/*
+	 * Retrieve and check the inserted / updated rows, one after another.
+	 */
+	slot = riinfo->slot_fk;
+	while (tuplestore_gettupleslot(table, true, false, slot))
+	{
+		if (!ri_PerformCheck(riinfo, &qkey, qplan,
+							 fk_rel, pk_rel,
+							 slot,
+							 false,
+							 SPI_OK_SELECT))
+			ri_ReportViolation(riinfo,
+							   pk_rel, fk_rel,
+							   slot,
+							   NULL,
+							   qkey.constr_queryno, false);
+	}
+
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+
+	table_close(pk_rel, RowShareLock);
+
+	return PointerGetDatum(NULL);
+}
+
+/* ----------
+ * Like RI_FKey_check_query(), but check a single row.
+ *
+ * The query string built is
+ *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
+ *		   FOR KEY SHARE OF x
+ * The type id's for the $ parameters are those of the
+ * corresponding FK attributes.
+ *
+ * The query is quite a bit simpler than the one for bulk processing, and so
+ * it should execute faster.
+ *
+ * "paramtypes" will receive types of the query parameters (FK attributes).
+ * ----------
+ */
+static char *
+RI_FKey_check_query_single_row(const RI_ConstraintInfo *riinfo,
+							   Relation fk_rel, Relation pk_rel,
+							   Oid *paramtypes)
+{
+	StringInfo	querybuf = makeStringInfo();
+	const char *pk_only;
+	char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
+
+	pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
+	quoteRelationName(pkrelname, pk_rel);
+	appendStringInfo(querybuf, "SELECT 1 FROM %s%s p WHERE ",
+					 pk_only, pkrelname);
+	ri_GenerateQual(querybuf, "AND", riinfo->nkeys,
+					NULL, pk_rel, riinfo->pk_attnums,
+					NULL, fk_rel, riinfo->fk_attnums,
+					riinfo->pf_eq_oprs,
+					GQ_PARAMS_RIGHT, paramtypes);
+	appendStringInfoString(querybuf, " FOR KEY SHARE OF p");
+
+	return querybuf->data;
+}
+
+/*
+ * Check if the PK table needs to be queried (using the query generated by
+ * RI_FKey_check_query).
+ */
+static bool
+RI_FKey_check_query_required(Trigger *trigger, Relation fk_rel,
+							 TupleTableSlot *newslot)
+{
+	const RI_ConstraintInfo *riinfo;
+
+	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
+
+	switch (ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false,
+						 false))
 	{
 		case RI_KEYS_ALL_NULL:
 
@@ -294,8 +455,7 @@ RI_FKey_check(TriggerData *trigdata)
 			 * No further check needed - an all-NULL key passes every type of
 			 * foreign key constraint.
 			 */
-			table_close(pk_rel, RowShareLock);
-			return PointerGetDatum(NULL);
+			return false;
 
 		case RI_KEYS_SOME_NULL:
 
@@ -319,8 +479,7 @@ RI_FKey_check(TriggerData *trigdata)
 							 errdetail("MATCH FULL does not allow mixing of null and nonnull key values."),
 							 errtableconstraint(fk_rel,
 												NameStr(riinfo->conname))));
-					table_close(pk_rel, RowShareLock);
-					return PointerGetDatum(NULL);
+					break;
 
 				case FKCONSTR_MATCH_SIMPLE:
 
@@ -328,17 +487,16 @@ RI_FKey_check(TriggerData *trigdata)
 					 * MATCH SIMPLE - if ANY column is null, the key passes
 					 * the constraint.
 					 */
-					table_close(pk_rel, RowShareLock);
-					return PointerGetDatum(NULL);
+					return false;
 
 #ifdef NOT_USED
 				case FKCONSTR_MATCH_PARTIAL:
 
 					/*
 					 * MATCH PARTIAL - all non-null columns must match. (not
-					 * implemented, can be done by modifying the query below
-					 * to only include non-null columns, or by writing a
-					 * special version here)
+					 * implemented, can be done by modifying the query to only
+					 * include non-null columns, or by writing a special
+					 * version)
 					 */
 					break;
 #endif
@@ -347,71 +505,12 @@ RI_FKey_check(TriggerData *trigdata)
 		case RI_KEYS_NONE_NULL:
 
 			/*
-			 * Have a full qualified key - continue below for all three kinds
-			 * of MATCH.
+			 * Have a full qualified key - regular check is needed.
 			 */
 			break;
 	}
 
-	if (SPI_connect() != SPI_OK_CONNECT)
-		elog(ERROR, "SPI_connect failed");
-
-	/* Fetch or prepare a saved plan for the real check */
-	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
-
-	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
-	{
-		StringInfoData querybuf;
-		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
-		Oid			queryoids[RI_MAX_NUMKEYS];
-		const char *pk_only;
-
-		/* ----------
-		 * The query string built is
-		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
-		 *		   FOR KEY SHARE OF x
-		 * The type id's for the $ parameters are those of the
-		 * corresponding FK attributes.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s p WHERE ",
-						 pk_only, pkrelname);
-		ri_GenerateQual(&querybuf, "AND", riinfo->nkeys,
-						NULL, pk_rel, riinfo->pk_attnums,
-						NULL, fk_rel, riinfo->fk_attnums,
-						riinfo->pf_eq_oprs,
-						GQ_PARAMS_RIGHT, queryoids);
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF p");
-
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
-							 &qkey, fk_rel, pk_rel);
-	}
-
-	/*
-	 * Now check that foreign key exists in PK table
-	 */
-	if (!ri_PerformCheck(riinfo, &qkey, qplan,
-						 fk_rel, pk_rel,
-						 NULL, newslot,
-						 false,
-						 SPI_OK_SELECT))
-		ri_ReportViolation(riinfo,
-						   pk_rel, fk_rel,
-						   newslot,
-						   NULL,
-						   qkey.constr_queryno, false);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
-
-	table_close(pk_rel, RowShareLock);
-
-	return PointerGetDatum(NULL);
+	return true;
 }
 
 
@@ -467,7 +566,8 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 	bool		result;
 
 	/* Only called for non-null rows */
-	Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
+	Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true,
+						true) == RI_KEYS_NONE_NULL);
 
 	if (SPI_connect() != SPI_OK_CONNECT)
 		elog(ERROR, "SPI_connect failed");
@@ -480,10 +580,10 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
+		StringInfo	querybuf = makeStringInfo();
 		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
 		const char *pk_only;
-		Oid			queryoids[RI_MAX_NUMKEYS];
+		Oid			paramtypes[RI_MAX_NUMKEYS];
 
 		/* ----------
 		 * The query string built is
@@ -493,23 +593,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 		 * PK attributes themselves.
 		 * ----------
 		 */
-		initStringInfo(&querybuf);
 		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
 			"" : "ONLY ";
 		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x WHERE ",
+		appendStringInfo(querybuf, "SELECT 1 FROM %s%s x WHERE ",
 						 pk_only, pkrelname);
 
-		ri_GenerateQual(&querybuf, "AND", riinfo->nkeys,
+		ri_GenerateQual(querybuf, "AND", riinfo->nkeys,
 						NULL, pk_rel, riinfo->pk_attnums,
 						NULL, fk_rel, riinfo->fk_attnums,
 						riinfo->pf_eq_oprs,
 						GQ_PARAMS_RIGHT,
-						queryoids);
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
+						paramtypes);
+
+		appendStringInfoString(querybuf, " FOR KEY SHARE OF x");
 
 		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		qplan = ri_PlanCheck(querybuf->data, riinfo->nkeys, paramtypes,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -518,7 +618,7 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 	 */
 	result = ri_PerformCheck(riinfo, &qkey, qplan,
 							 fk_rel, pk_rel,
-							 oldslot, NULL,
+							 oldslot,
 							 true,	/* treat like update */
 							 SPI_OK_SELECT);
 
@@ -543,7 +643,7 @@ RI_FKey_noaction_del(PG_FUNCTION_ARGS)
 	ri_CheckTrigger(fcinfo, "RI_FKey_noaction_del", RI_TRIGTYPE_DELETE);
 
 	/* Share code with RESTRICT/UPDATE cases. */
-	return ri_restrict((TriggerData *) fcinfo->context, true);
+	return ri_restrict((TriggerData *) fcinfo->context, true, NULL);
 }
 
 /*
@@ -563,7 +663,7 @@ RI_FKey_restrict_del(PG_FUNCTION_ARGS)
 	ri_CheckTrigger(fcinfo, "RI_FKey_restrict_del", RI_TRIGTYPE_DELETE);
 
 	/* Share code with NO ACTION/UPDATE cases. */
-	return ri_restrict((TriggerData *) fcinfo->context, false);
+	return ri_restrict((TriggerData *) fcinfo->context, false, NULL);
 }
 
 /*
@@ -580,7 +680,7 @@ RI_FKey_noaction_upd(PG_FUNCTION_ARGS)
 	ri_CheckTrigger(fcinfo, "RI_FKey_noaction_upd", RI_TRIGTYPE_UPDATE);
 
 	/* Share code with RESTRICT/DELETE cases. */
-	return ri_restrict((TriggerData *) fcinfo->context, true);
+	return ri_restrict((TriggerData *) fcinfo->context, true, NULL);
 }
 
 /*
@@ -600,7 +700,7 @@ RI_FKey_restrict_upd(PG_FUNCTION_ARGS)
 	ri_CheckTrigger(fcinfo, "RI_FKey_restrict_upd", RI_TRIGTYPE_UPDATE);
 
 	/* Share code with NO ACTION/DELETE cases. */
-	return ri_restrict((TriggerData *) fcinfo->context, false);
+	return ri_restrict((TriggerData *) fcinfo->context, false, NULL);
 }
 
 /*
@@ -608,16 +708,20 @@ RI_FKey_restrict_upd(PG_FUNCTION_ARGS)
  *
  * Common code for ON DELETE RESTRICT, ON DELETE NO ACTION,
  * ON UPDATE RESTRICT, and ON UPDATE NO ACTION.
+ *
+ * If NULL is passed for oldslot, retrieve the rows from
+ * trigdata->ri_tids_old.
  */
 static Datum
-ri_restrict(TriggerData *trigdata, bool is_no_action)
+ri_restrict(TriggerData *trigdata, bool is_no_action, TupleTableSlot *oldslot)
 {
 	const RI_ConstraintInfo *riinfo;
 	Relation	fk_rel;
 	Relation	pk_rel;
-	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
 	SPIPlanPtr	qplan;
+	Tuplestorestate *oldtable = NULL;
+	bool		first_tuple;
 
 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
 									trigdata->tg_relation, true);
@@ -630,79 +734,76 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 	 */
 	fk_rel = table_open(riinfo->fk_relid, RowShareLock);
 	pk_rel = trigdata->tg_relation;
-	oldslot = trigdata->tg_trigslot;
-
-	/*
-	 * If another PK row now exists providing the old key values, we should
-	 * not do anything.  However, this check should only be made in the NO
-	 * ACTION case; in RESTRICT cases we don't wish to allow another row to be
-	 * substituted.
-	 */
-	if (is_no_action &&
-		ri_Check_Pk_Match(pk_rel, fk_rel, oldslot, riinfo))
-	{
-		table_close(fk_rel, RowShareLock);
-		return PointerGetDatum(NULL);
-	}
 
 	if (SPI_connect() != SPI_OK_CONNECT)
 		elog(ERROR, "SPI_connect failed");
 
-	/*
-	 * Fetch or prepare a saved plan for the restrict lookup (it's the same
-	 * query for delete and update cases)
-	 */
+	/* Fetch or prepare a saved plan for the real check */
 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_RESTRICT_CHECKREF);
-
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
-		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
-		Oid			queryoids[RI_MAX_NUMKEYS];
-		const char *fk_only;
-
-		/* ----------
-		 * The query string built is
-		 *	SELECT 1 FROM [ONLY] <fktable> x WHERE $1 = fkatt1 [AND ...]
-		 *		   FOR KEY SHARE OF x
-		 * The type id's for the $ parameters are those of the
-		 * corresponding PK attributes.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(fkrelname, fk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x WHERE ",
-						 fk_only, fkrelname);
-
-		ri_GenerateQual(&querybuf, "AND", riinfo->nkeys,
-						NULL, pk_rel, riinfo->pk_attnums,
-						NULL, fk_rel, riinfo->fk_attnums,
-						riinfo->pf_eq_oprs,
-						GQ_PARAMS_LEFT,
-						queryoids);
+		char	   *query;
+		Oid			paramtypes[RI_MAX_NUMKEYS];
 
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
+		query = ri_restrict_query_single_row(riinfo, fk_rel, pk_rel,
+											 paramtypes);
 
 		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
-							 &qkey, fk_rel, pk_rel);
+		qplan = ri_PlanCheck(query, riinfo->nkeys, paramtypes, &qkey,
+							 fk_rel, pk_rel);
 	}
 
+	if (oldslot == NULL)
+	{
+		oldtable = get_event_tuplestore(trigdata,
+										riinfo->nkeys,
+										riinfo->pk_attnums,
+										true,
+										riinfo->slot_pk->tts_tupleDescriptor,
+										NULL);
+		oldslot = riinfo->slot_pk;
+	}
+
+	first_tuple = true;
+
 	/*
-	 * We have a plan now. Run it to check for existing references.
+	 * Retrieve and check the rows, one after another.
+	 *
+	 * NOTE(review): when "oldtable" is NULL a valid "oldslot" must be passed;
+	 * the "continue" below then loops forever since "first_tuple" stays set.
 	 */
-	if (ri_PerformCheck(riinfo, &qkey, qplan,
-						fk_rel, pk_rel,
-						oldslot, NULL,
-						true,	/* must detect new rows */
-						SPI_OK_SELECT))
-		ri_ReportViolation(riinfo,
-						   pk_rel, fk_rel,
-						   oldslot,
-						   NULL,
-						   qkey.constr_queryno, false);
+	while ((oldtable && tuplestore_gettupleslot(oldtable, true, false, oldslot))
+		   || first_tuple)
+	{
+		/*
+		 * If another PK row now exists providing the old key values, we
+		 * should not do anything.  However, this check should only be made in
+		 * the NO ACTION case; in RESTRICT cases we don't wish to allow
+		 * another row to be substituted.
+		 */
+		if (is_no_action &&
+			ri_Check_Pk_Match(pk_rel, fk_rel, oldslot, riinfo))
+			continue;
+
+		if (ri_PerformCheck(riinfo, &qkey, qplan,
+							fk_rel, pk_rel,
+							oldslot,
+							true,	/* must detect new rows */
+							SPI_OK_SELECT))
+			ri_ReportViolation(riinfo,
+							   pk_rel, fk_rel,
+							   oldslot,
+							   NULL,
+							   qkey.constr_queryno, false);
+
+		if (first_tuple)
+		{
+			if (oldtable == NULL)
+				break;
+
+			first_tuple = false;
+		}
+	}
 
 	if (SPI_finish() != SPI_OK_FINISH)
 		elog(ERROR, "SPI_finish failed");
@@ -712,6 +813,44 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 	return PointerGetDatum(NULL);
 }
 
+/*
+ * Like ri_restrict_query(), but check a single row.
+ */
+static char *
+ri_restrict_query_single_row(const RI_ConstraintInfo *riinfo, Relation fk_rel,
+							 Relation pk_rel, Oid *paramtypes)
+{
+	StringInfo	querybuf = makeStringInfo();
+	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
+	const char *fk_only;
+
+	/* ----------
+	 * The query string built is
+	 *
+	 *	SELECT 1 FROM [ONLY] <fktable> x WHERE $1 = fkatt1 [AND ...]
+	 *		   FOR KEY SHARE OF x
+	 *
+	 * The type id's for the $ parameters are those of the
+	 * corresponding PK attributes.
+	 * ----------
+	 */
+	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
+	quoteRelationName(fkrelname, fk_rel);
+	appendStringInfo(querybuf, "SELECT 1 FROM %s%s x WHERE ",
+					 fk_only, fkrelname);
+
+	ri_GenerateQual(querybuf, "AND", riinfo->nkeys,
+					NULL, pk_rel, riinfo->pk_attnums,
+					NULL, fk_rel, riinfo->fk_attnums,
+					riinfo->pf_eq_oprs,
+					GQ_PARAMS_LEFT,
+					paramtypes);
+
+	appendStringInfoString(querybuf, " FOR KEY SHARE OF x");
+
+	return querybuf->data;
+}
 
 /*
  * RI_FKey_cascade_del -
@@ -725,9 +864,10 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 	const RI_ConstraintInfo *riinfo;
 	Relation	fk_rel;
 	Relation	pk_rel;
-	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
 	SPIPlanPtr	qplan;
+	Tuplestorestate *oldtable;
+	TupleTableSlot *oldslot;
 
 	/* Check that this is a valid trigger call on the right time and event. */
 	ri_CheckTrigger(fcinfo, "RI_FKey_cascade_del", RI_TRIGTYPE_DELETE);
@@ -743,56 +883,46 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 	 */
 	fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
 	pk_rel = trigdata->tg_relation;
-	oldslot = trigdata->tg_trigslot;
 
 	if (SPI_connect() != SPI_OK_CONNECT)
 		elog(ERROR, "SPI_connect failed");
 
-	/* Fetch or prepare a saved plan for the cascaded delete */
 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_DEL_DODELETE);
 
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
-		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
-		Oid			queryoids[RI_MAX_NUMKEYS];
-		const char *fk_only;
-
-		/* ----------
-		 * The query string built is
-		 *	DELETE FROM [ONLY] <fktable> WHERE $1 = fkatt1 [AND ...]
-		 * The type id's for the $ parameters are those of the
-		 * corresponding PK attributes.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(fkrelname, fk_rel);
-
-		appendStringInfo(&querybuf, "DELETE FROM %s%s WHERE ", fk_only,
-						 fkrelname);
-		ri_GenerateQual(&querybuf, "AND", riinfo->nkeys,
-						NULL, pk_rel, riinfo->pk_attnums,
-						NULL, fk_rel, riinfo->fk_attnums,
-						riinfo->pf_eq_oprs,
-						GQ_PARAMS_LEFT,
-						queryoids);
+		Oid			paramtypes[RI_MAX_NUMKEYS];
+		char	   *query = ri_cascade_del_query_single_row(riinfo,
+															fk_rel,
+															pk_rel,
+															paramtypes);
 
 		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
-							 &qkey, fk_rel, pk_rel);
+		qplan = ri_PlanCheck(query, riinfo->nkeys, paramtypes, &qkey,
+							 fk_rel, pk_rel);
 	}
 
-	/*
-	 * We have a plan now. Build up the arguments from the key values in the
-	 * deleted PK tuple and delete the referencing rows
-	 */
-	ri_PerformCheck(riinfo, &qkey, qplan,
-					fk_rel, pk_rel,
-					oldslot, NULL,
-					true,		/* must detect new rows */
-					SPI_OK_DELETE);
+	oldtable = get_event_tuplestore(trigdata,
+									riinfo->nkeys,
+									riinfo->pk_attnums,
+									true,
+									riinfo->slot_pk->tts_tupleDescriptor,
+									NULL);
+
+	/* Retrieve and check the rows, one after another. */
+	oldslot = riinfo->slot_pk;
+	while (tuplestore_gettupleslot(oldtable, true, false, oldslot))
+	{
+		/*
+		 * We have a plan now. Build up the arguments from the key values in
+		 * the deleted PK tuple and delete the referencing rows
+		 */
+		ri_PerformCheck(riinfo, &qkey, qplan,
+						fk_rel, pk_rel,
+						oldslot,
+						true,	/* must detect new rows */
+						SPI_OK_DELETE);
+	}
 
 	if (SPI_finish() != SPI_OK_FINISH)
 		elog(ERROR, "SPI_finish failed");
@@ -802,6 +932,41 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 	return PointerGetDatum(NULL);
 }
 
+static char *
+ri_cascade_del_query_single_row(const RI_ConstraintInfo *riinfo,
+								Relation fk_rel, Relation pk_rel,
+								Oid *paramtypes)
+{
+	StringInfo	querybuf = makeStringInfo();
+	const char *fk_only;
+	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
+
+	/* ----------
+	 * The query string built is
+	 *
+	 *	DELETE FROM [ONLY] <fktable> WHERE $1 = fkatt1 [AND ...]
+	 *
+	 * The type id's for the $ parameters are those of the
+	 * corresponding PK attributes.
+	 * ----------
+	 */
+
+	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
+	quoteRelationName(fkrelname, fk_rel);
+
+	appendStringInfo(querybuf, "DELETE FROM %s%s WHERE ", fk_only,
+					 fkrelname);
+
+	ri_GenerateQual(querybuf, "AND", riinfo->nkeys,
+					NULL, pk_rel, riinfo->pk_attnums,
+					NULL, fk_rel, riinfo->fk_attnums,
+					riinfo->pf_eq_oprs,
+					GQ_PARAMS_LEFT,
+					paramtypes);
+
+	return querybuf->data;
+}
 
 /*
  * RI_FKey_cascade_upd -
@@ -815,10 +980,10 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 	const RI_ConstraintInfo *riinfo;
 	Relation	fk_rel;
 	Relation	pk_rel;
-	TupleTableSlot *newslot;
-	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
 	SPIPlanPtr	qplan;
+	Tuplestorestate *newtable;
+	TupleTableSlot *slot;
 
 	/* Check that this is a valid trigger call on the right time and event. */
 	ri_CheckTrigger(fcinfo, "RI_FKey_cascade_upd", RI_TRIGTYPE_UPDATE);
@@ -835,85 +1000,44 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 	 */
 	fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
 	pk_rel = trigdata->tg_relation;
-	newslot = trigdata->tg_newslot;
-	oldslot = trigdata->tg_trigslot;
 
 	if (SPI_connect() != SPI_OK_CONNECT)
 		elog(ERROR, "SPI_connect failed");
 
 	/* Fetch or prepare a saved plan for the cascaded update */
 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_UPD_DOUPDATE);
-
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
-		StringInfoData qualbuf;
-		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
-		char		attname[MAX_QUOTED_NAME_LEN];
-		char		paramname[16];
-		const char *querysep;
-		const char *qualsep;
-		Oid			queryoids[RI_MAX_NUMKEYS * 2];
-		const char *fk_only;
-
-		/* ----------
-		 * The query string built is
-		 *	UPDATE [ONLY] <fktable> SET fkatt1 = $1 [, ...]
-		 *			WHERE $n = fkatt1 [AND ...]
-		 * The type id's for the $ parameters are those of the
-		 * corresponding PK attributes.  Note that we are assuming
-		 * there is an assignment cast from the PK to the FK type;
-		 * else the parser will fail.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		initStringInfo(&qualbuf);
-		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(fkrelname, fk_rel);
-		appendStringInfo(&querybuf, "UPDATE %s%s SET",
-						 fk_only, fkrelname);
-		querysep = "";
-		qualsep = "WHERE";
-		for (int i = 0, j = riinfo->nkeys; i < riinfo->nkeys; i++, j++)
-		{
-			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
-			Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
-			Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
-
-			quoteOneName(attname,
-						 RIAttName(fk_rel, riinfo->fk_attnums[i]));
-			appendStringInfo(&querybuf,
-							 "%s %s = $%d",
-							 querysep, attname, i + 1);
-			sprintf(paramname, "$%d", j + 1);
-			ri_GenerateQualComponent(&qualbuf, qualsep,
-									 paramname, pk_type,
-									 riinfo->pf_eq_oprs[i],
-									 attname, fk_type);
-			if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
-				ri_GenerateQualCollation(&querybuf, pk_coll);
-			querysep = ",";
-			qualsep = "AND";
-			queryoids[i] = pk_type;
-			queryoids[j] = pk_type;
-		}
-		appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
+		Oid			paramtypes[RI_MAX_NUMKEYS * 2];
+		char	   *query = ri_cascade_upd_query_single_row(riinfo,
+															fk_rel,
+															pk_rel,
+															paramtypes);
 
 		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys * 2, queryoids,
-							 &qkey, fk_rel, pk_rel);
+		qplan = ri_PlanCheck(query, 2 * riinfo->nkeys, paramtypes, &qkey,
+							 fk_rel, pk_rel);
 	}
 
 	/*
-	 * We have a plan now. Run it to update the existing references.
+	 * For cascaded updates, the old and new key values are stored in a single
+	 * tuplestore, since there is no useful join column to match them otherwise.
 	 */
-	ri_PerformCheck(riinfo, &qkey, qplan,
-					fk_rel, pk_rel,
-					oldslot, newslot,
-					true,		/* must detect new rows */
-					SPI_OK_UPDATE);
+	newtable = get_event_tuplestore_for_cascade_update(trigdata, riinfo);
+
+	/* Retrieve and check the rows, one after another. */
+	slot = riinfo->slot_both;
+	while (tuplestore_gettupleslot(newtable, true, false, slot))
+	{
+		/*
+		 * We have a plan now. Run it to update the existing references.
+		 */
+		ri_PerformCheck(riinfo, &qkey, qplan,
+						fk_rel, pk_rel,
+						slot,
+						true,	/* must detect new rows */
+						SPI_OK_UPDATE);
+	}
 
 	if (SPI_finish() != SPI_OK_FINISH)
 		elog(ERROR, "SPI_finish failed");
@@ -923,6 +1047,69 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 	return PointerGetDatum(NULL);
 }
 
+static char *
+ri_cascade_upd_query_single_row(const RI_ConstraintInfo *riinfo,
+								Relation fk_rel, Relation pk_rel,
+								Oid *paramtypes)
+{
+	StringInfo	querybuf = makeStringInfo();
+	StringInfo	qualbuf = makeStringInfo();
+	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
+	char		attname[MAX_QUOTED_NAME_LEN];
+	char		paramname[16];
+	const char *querysep;
+	const char *qualsep;
+	const char *fk_only;
+
+	/* ----------
+	 * The query string built is
+	 *
+	 *	UPDATE [ONLY] <fktable> SET fkatt1 = $1 [, ...]
+	 *			WHERE $n = fkatt1 [AND ...]
+	 *
+	 * The type id's for the $ parameters are those of the
+	 * corresponding PK attributes.  Note that we are assuming
+	 * there is an assignment cast from the PK to the FK type;
+	 * else the parser will fail.
+	 * ----------
+	 */
+	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
+	quoteRelationName(fkrelname, fk_rel);
+	appendStringInfo(querybuf, "UPDATE %s%s SET",
+					 fk_only, fkrelname);
+	querysep = "";
+	qualsep = "WHERE";
+	for (int i = 0, j = riinfo->nkeys; i < riinfo->nkeys; i++, j++)
+	{
+		Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+		Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+		Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
+		Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
+
+		quoteOneName(attname,
+					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
+		appendStringInfo(querybuf,
+						 "%s %s = $%d",
+						 querysep, attname, i + 1);
+		sprintf(paramname, "$%d", j + 1);
+		ri_GenerateQualComponent(qualbuf, qualsep,
+								 paramname, pk_type,
+								 riinfo->pf_eq_oprs[i],
+								 attname, fk_type);
+
+		if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
+			ri_GenerateQualCollation(querybuf, pk_coll);
+
+		querysep = ",";
+		qualsep = "AND";
+		paramtypes[i] = pk_type;
+		paramtypes[j] = pk_type;
+	}
+	appendBinaryStringInfo(querybuf, qualbuf->data, qualbuf->len);
+
+	return querybuf->data;
+}
 
 /*
  * RI_FKey_setnull_del -
@@ -996,9 +1183,10 @@ ri_set(TriggerData *trigdata, bool is_set_null)
 	const RI_ConstraintInfo *riinfo;
 	Relation	fk_rel;
 	Relation	pk_rel;
-	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
 	SPIPlanPtr	qplan;
+	Tuplestorestate *oldtable;
+	TupleTableSlot *oldslot;
 
 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
 									trigdata->tg_relation, true);
@@ -1011,7 +1199,6 @@ ri_set(TriggerData *trigdata, bool is_set_null)
 	 */
 	fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
 	pk_rel = trigdata->tg_relation;
-	oldslot = trigdata->tg_trigslot;
 
 	if (SPI_connect() != SPI_OK_CONNECT)
 		elog(ERROR, "SPI_connect failed");
@@ -1024,90 +1211,110 @@ ri_set(TriggerData *trigdata, bool is_set_null)
 					 (is_set_null
 					  ? RI_PLAN_SETNULL_DOUPDATE
 					  : RI_PLAN_SETDEFAULT_DOUPDATE));
-
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
-		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
-		Oid			queryoids[RI_MAX_NUMKEYS];
-		const char *fk_only;
+		Oid			paramtypes[RI_MAX_NUMKEYS];
+		char	   *query = ri_set_query_single_row(riinfo, fk_rel, pk_rel,
+													paramtypes, is_set_null);
 
-		/* ----------
-		 * The query string built is
-		 *	UPDATE [ONLY] <fktable> SET fkatt1 = {NULL|DEFAULT} [, ...]
-		 *			WHERE $1 = fkatt1 [AND ...]
-		 * The type id's for the $ parameters are those of the
-		 * corresponding PK attributes.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(fkrelname, fk_rel);
-		appendStringInfo(&querybuf, "UPDATE %s%s SET",
-						 fk_only, fkrelname);
+		/* Prepare and save the plan */
+		qplan = ri_PlanCheck(query, riinfo->nkeys, paramtypes, &qkey,
+							 fk_rel, pk_rel);
+	}
 
-		for (int i = 0; i < riinfo->nkeys; i++)
-		{
-			char		attname[MAX_QUOTED_NAME_LEN];
-			const char *sep = i > 0 ? "," : "";
+	oldtable = get_event_tuplestore(trigdata,
+									riinfo->nkeys,
+									riinfo->pk_attnums,
+									true,
+									riinfo->slot_pk->tts_tupleDescriptor,
+									NULL);
 
-			quoteOneName(attname,
-						 RIAttName(fk_rel, riinfo->fk_attnums[i]));
+	/* The query needs parameters, so retrieve them now. */
+	oldslot = riinfo->slot_pk;
+	while (tuplestore_gettupleslot(oldtable, true, false, oldslot))
+	{
+		/*
+		 * We have a plan now. Run it to update the existing references.
+		 */
+		ri_PerformCheck(riinfo, &qkey, qplan,
+						fk_rel, pk_rel,
+						oldslot,
+						true,	/* must detect new rows */
+						SPI_OK_UPDATE);
 
-			appendStringInfo(&querybuf,
-							 "%s %s = %s",
-							 sep, attname,
-							 is_set_null ? "NULL" : "DEFAULT");
+		if (!is_set_null)
+		{
+			/*
+			 * If we just deleted or updated the PK row whose key was equal to
+			 * the FK columns' default values, and a referencing row exists in
+			 * the FK table, we would have updated that row to the same values
+			 * it already had --- and RI_FKey_fk_upd_check_required would
+			 * hence believe no check is necessary.  So we need to do another
+			 * lookup now and in case a reference still exists, abort the
+			 * operation.  That is already implemented in the NO ACTION
+			 * trigger, so just run it. (This recheck is only needed in the
+			 * SET DEFAULT case, since CASCADE would remove such rows in case
+			 * of a DELETE operation or would change the FK key values in case
+			 * of an UPDATE, while SET NULL is certain to result in rows that
+			 * satisfy the FK constraint.)
+			 */
+			ri_restrict(trigdata, true, oldslot);
 		}
-
-		appendStringInfo(&querybuf, " WHERE ");
-		ri_GenerateQual(&querybuf, "AND", riinfo->nkeys,
-						NULL, pk_rel, riinfo->pk_attnums,
-						NULL, fk_rel, riinfo->fk_attnums,
-						riinfo->pf_eq_oprs,
-						GQ_PARAMS_LEFT, queryoids);
-
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
-							 &qkey, fk_rel, pk_rel);
 	}
 
-	/*
-	 * We have a plan now. Run it to update the existing references.
-	 */
-	ri_PerformCheck(riinfo, &qkey, qplan,
-					fk_rel, pk_rel,
-					oldslot, NULL,
-					true,		/* must detect new rows */
-					SPI_OK_UPDATE);
-
 	if (SPI_finish() != SPI_OK_FINISH)
 		elog(ERROR, "SPI_finish failed");
 
 	table_close(fk_rel, RowExclusiveLock);
 
-	if (is_set_null)
-		return PointerGetDatum(NULL);
-	else
+	return PointerGetDatum(NULL);
+}
+
+static char *
+ri_set_query_single_row(const RI_ConstraintInfo *riinfo, Relation fk_rel,
+						Relation pk_rel, Oid *paramtypes, bool is_set_null)
+{
+	StringInfo	querybuf = makeStringInfo();
+	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
+	const char *fk_only;
+
+	/* ----------
+	 * The query string built is
+	 *	UPDATE [ONLY] <fktable> SET fkatt1 = {NULL|DEFAULT} [, ...]
+	 *			WHERE $1 = fkatt1 [AND ...]
+	 * The type id's for the $ parameters are those of the
+	 * corresponding PK attributes.
+	 * ----------
+	 */
+	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
+	quoteRelationName(fkrelname, fk_rel);
+	appendStringInfo(querybuf, "UPDATE %s%s SET",
+					 fk_only, fkrelname);
+
+	for (int i = 0; i < riinfo->nkeys; i++)
 	{
-		/*
-		 * If we just deleted or updated the PK row whose key was equal to the
-		 * FK columns' default values, and a referencing row exists in the FK
-		 * table, we would have updated that row to the same values it already
-		 * had --- and RI_FKey_fk_upd_check_required would hence believe no
-		 * check is necessary.  So we need to do another lookup now and in
-		 * case a reference still exists, abort the operation.  That is
-		 * already implemented in the NO ACTION trigger, so just run it. (This
-		 * recheck is only needed in the SET DEFAULT case, since CASCADE would
-		 * remove such rows in case of a DELETE operation or would change the
-		 * FK key values in case of an UPDATE, while SET NULL is certain to
-		 * result in rows that satisfy the FK constraint.)
-		 */
-		return ri_restrict(trigdata, true);
+		char		attname[MAX_QUOTED_NAME_LEN];
+		const char *sep = i > 0 ? "," : "";
+
+		quoteOneName(attname,
+					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
+
+		appendStringInfo(querybuf,
+						 "%s %s = %s",
+						 sep, attname,
+						 is_set_null ? "NULL" : "DEFAULT");
 	}
-}
 
+	appendStringInfo(querybuf, " WHERE ");
+	ri_GenerateQual(querybuf, "AND", riinfo->nkeys,
+					NULL, pk_rel, riinfo->pk_attnums,
+					NULL, fk_rel, riinfo->fk_attnums,
+					riinfo->pf_eq_oprs,
+					GQ_PARAMS_LEFT, paramtypes);
+
+	return querybuf->data;
+}
 
 /*
  * RI_FKey_pk_upd_check_required -
@@ -1132,7 +1339,8 @@ RI_FKey_pk_upd_check_required(Trigger *trigger, Relation pk_rel,
 	 * If any old key value is NULL, the row could not have been referenced by
 	 * an FK row, so no check is needed.
 	 */
-	if (ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) != RI_KEYS_NONE_NULL)
+	if (ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true,
+					 false) != RI_KEYS_NONE_NULL)
 		return false;
 
 	/* If all old and new key values are equal, no check is needed */
@@ -1164,7 +1372,8 @@ RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
 
 	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
 
-	ri_nullcheck = ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false);
+	ri_nullcheck = ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false,
+								false);
 
 	/*
 	 * If all new key values are NULL, the row satisfies the constraint, so no
@@ -1236,6 +1445,24 @@ RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
 	return true;
 }
 
+/*
+ * RI_FKey_fk_attributes -
+ *
+ * Return tuple descriptor containing the FK attributes of given FK constraint
+ * and only those. In addition, array containing the numbers of the key
+ * attributes within the whole table is stored to *attnums_p.
+ *
+ * Both the returned descriptor and *attnums_p point into the cached
+ * RI_ConstraintInfo entry, so the caller must not free or modify them.
+ */
+TupleDesc
+RI_FKey_fk_attributes(Trigger *trigger, Relation trig_rel, const int16 **attnums_p)
+{
+	const RI_ConstraintInfo *riinfo;
+
+	/* trig_rel is the FK relation here, hence rel_is_pk = false. */
+	riinfo = ri_FetchConstraintInfo(trigger, trig_rel, false);
+	*attnums_p = riinfo->fk_attnums;
+
+	return riinfo->slot_fk->tts_tupleDescriptor;
+}
+
 /*
  * RI_Initial_Check -
  *
@@ -1471,7 +1698,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 		 * disallows partially-null FK rows.
 		 */
 		if (fake_riinfo.confmatchtype == FKCONSTR_MATCH_FULL &&
-			ri_NullCheck(tupdesc, slot, &fake_riinfo, false) != RI_KEYS_NONE_NULL)
+			ri_NullCheck(tupdesc, slot, &fake_riinfo, false, false) != RI_KEYS_NONE_NULL)
 			ereport(ERROR,
 					(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
 					 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
@@ -1974,7 +2201,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
 				 errhint("Remove this referential integrity trigger and its mates, then do ALTER TABLE ADD CONSTRAINT.")));
 
 	/* Find or create a hashtable entry for the constraint */
-	riinfo = ri_LoadConstraintInfo(constraintOid);
+	riinfo = ri_LoadConstraintInfo(constraintOid, trig_rel, rel_is_pk);
 
 	/* Do some easy cross-checks against the trigger call data */
 	if (rel_is_pk)
@@ -2010,12 +2237,16 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
  * Fetch or create the RI_ConstraintInfo struct for an FK constraint.
  */
 static const RI_ConstraintInfo *
-ri_LoadConstraintInfo(Oid constraintOid)
+ri_LoadConstraintInfo(Oid constraintOid, Relation trig_rel, bool rel_is_pk)
 {
 	RI_ConstraintInfo *riinfo;
 	bool		found;
 	HeapTuple	tup;
 	Form_pg_constraint conForm;
+	MemoryContext oldcxt;
+	TupleDesc	tupdesc;
+	Relation	pk_rel,
+				fk_rel;
 
 	/*
 	 * On the first call initialize the hashtable
@@ -2030,7 +2261,12 @@ ri_LoadConstraintInfo(Oid constraintOid)
 											   (void *) &constraintOid,
 											   HASH_ENTER, &found);
 	if (!found)
+	{
 		riinfo->valid = false;
+		riinfo->slot_mcxt = AllocSetContextCreate(TopMemoryContext,
+												  "RI_ConstraintInfoSlots",
+												  ALLOCSET_SMALL_SIZES);
+	}
 	else if (riinfo->valid)
 		return riinfo;
 
@@ -2067,6 +2303,60 @@ ri_LoadConstraintInfo(Oid constraintOid)
 
 	ReleaseSysCache(tup);
 
+	/*
+	 * Construct auxiliary tuple descriptors containing only the key
+	 * attributes.
+	 */
+	if (rel_is_pk)
+	{
+		pk_rel = trig_rel;
+		fk_rel = table_open(riinfo->fk_relid, AccessShareLock);
+	}
+	else
+	{
+		pk_rel = table_open(riinfo->pk_relid, AccessShareLock);
+		fk_rel = trig_rel;
+	}
+
+	/*
+	 * Use a separate memory context for the slots so that memory does not
+	 * leak if the riinfo needs to be reloaded.
+	 */
+	MemoryContextReset(riinfo->slot_mcxt);
+	oldcxt = MemoryContextSwitchTo(riinfo->slot_mcxt);
+
+	/* The PK attributes. */
+	tupdesc = CreateTemplateTupleDesc(riinfo->nkeys);
+	add_key_attrs_to_tupdesc(tupdesc, pk_rel, riinfo, riinfo->pk_attnums, 1,
+							 false);
+	riinfo->slot_pk = MakeSingleTupleTableSlot(tupdesc, &TTSOpsMinimalTuple);
+
+	/* The FK attributes. */
+	tupdesc = CreateTemplateTupleDesc(riinfo->nkeys);
+	add_key_attrs_to_tupdesc(tupdesc, fk_rel, riinfo, riinfo->fk_attnums, 1,
+							 false);
+	riinfo->slot_fk = MakeSingleTupleTableSlot(tupdesc, &TTSOpsMinimalTuple);
+
+	/*
+	 * The descriptor to store both NEW and OLD tuple into when processing ON
+	 * UPDATE CASCADE.
+	 */
+	tupdesc = CreateTemplateTupleDesc(2 * riinfo->nkeys);
+	/* Add the key attributes for both NEW and OLD. */
+	add_key_attrs_to_tupdesc(tupdesc, pk_rel, riinfo, riinfo->pk_attnums, 1,
+							 true);
+	add_key_attrs_to_tupdesc(tupdesc, pk_rel, riinfo, riinfo->pk_attnums,
+							 riinfo->nkeys + 1, true);
+	riinfo->slot_both = MakeSingleTupleTableSlot(tupdesc,
+												 &TTSOpsMinimalTuple);
+
+	MemoryContextSwitchTo(oldcxt);
+
+	if (rel_is_pk)
+		table_close(fk_rel, AccessShareLock);
+	else
+		table_close(pk_rel, AccessShareLock);
+
 	/*
 	 * For efficient processing of invalidation messages below, we keep a
 	 * doubly-linked list, and a count, of all currently valid entries.
@@ -2133,7 +2423,8 @@ InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue)
  */
 static SPIPlanPtr
 ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
-			 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
+			 RI_QueryKey *qkey, Relation fk_rel,
+			 Relation pk_rel)
 {
 	SPIPlanPtr	qplan;
 	Relation	query_rel;
@@ -2156,7 +2447,7 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
 						   SECURITY_NOFORCE_RLS);
 
 	/* Create the plan */
-	qplan = SPI_prepare(querystr, nargs, argtypes);
+	qplan = SPI_prepare(querystr, nargs, nargs > 0 ? argtypes : NULL);
 
 	if (qplan == NULL)
 		elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), querystr);
@@ -2178,20 +2469,20 @@ static bool
 ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 				RI_QueryKey *qkey, SPIPlanPtr qplan,
 				Relation fk_rel, Relation pk_rel,
-				TupleTableSlot *oldslot, TupleTableSlot *newslot,
+				TupleTableSlot *slot,
 				bool detectNewRows, int expect_OK)
 {
-	Relation	query_rel,
-				source_rel;
-	bool		source_is_pk;
+	Relation	query_rel;
 	Snapshot	test_snapshot;
 	Snapshot	crosscheck_snapshot;
 	int			limit;
 	int			spi_result;
 	Oid			save_userid;
 	int			save_sec_context;
-	Datum		vals[RI_MAX_NUMKEYS * 2];
-	char		nulls[RI_MAX_NUMKEYS * 2];
+	Datum		vals_loc[RI_MAX_NUMKEYS * 2];
+	char		nulls_loc[RI_MAX_NUMKEYS * 2];
+	Datum	   *vals = NULL;
+	char	   *nulls = NULL;
 
 	/*
 	 * Use the query type code to determine whether the query is run against
@@ -2202,37 +2493,26 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 	else
 		query_rel = fk_rel;
 
-	/*
-	 * The values for the query are taken from the table on which the trigger
-	 * is called - it is normally the other one with respect to query_rel. An
-	 * exception is ri_Check_Pk_Match(), which uses the PK table for both (and
-	 * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK).  We might eventually
-	 * need some less klugy way to determine this.
-	 */
-	if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
+	if (slot)
 	{
-		source_rel = fk_rel;
-		source_is_pk = false;
-	}
-	else
-	{
-		source_rel = pk_rel;
-		source_is_pk = true;
-	}
+		int			nparams = riinfo->nkeys;
 
-	/* Extract the parameters to be passed into the query */
-	if (newslot)
-	{
-		ri_ExtractValues(source_rel, newslot, riinfo, source_is_pk,
-						 vals, nulls);
-		if (oldslot)
-			ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
-							 vals + riinfo->nkeys, nulls + riinfo->nkeys);
-	}
-	else
-	{
-		ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
-						 vals, nulls);
+		vals = vals_loc;
+		nulls = nulls_loc;
+
+		/* Extract the parameters to be passed into the query */
+		ri_ExtractValues(slot, 0, nparams, vals, nulls);
+
+		if (slot->tts_tupleDescriptor->natts != nparams)
+		{
+			/*
+			 * In a special case (ON UPDATE CASCADE) the slot may contain both
+			 * new and old values of the key.
+			 */
+			Assert(slot->tts_tupleDescriptor->natts == nparams * 2);
+
+			ri_ExtractValues(slot, nparams, nparams, vals, nulls);
+		}
 	}
 
 	/*
@@ -2295,28 +2575,21 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 						RelationGetRelationName(fk_rel)),
 				 errhint("This is most likely due to a rule having rewritten the query.")));
 
-	return SPI_processed != 0;
+	return SPI_processed > 0;
 }
 
 /*
  * Extract fields from a tuple into Datum/nulls arrays
  */
 static void
-ri_ExtractValues(Relation rel, TupleTableSlot *slot,
-				 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
-				 Datum *vals, char *nulls)
+ri_ExtractValues(TupleTableSlot *slot, int first, int nkeys, Datum *vals,
+				 char *nulls)
 {
-	const int16 *attnums;
 	bool		isnull;
 
-	if (rel_is_pk)
-		attnums = riinfo->pk_attnums;
-	else
-		attnums = riinfo->fk_attnums;
-
-	for (int i = 0; i < riinfo->nkeys; i++)
+	for (int i = first; i < first + nkeys; i++)
 	{
-		vals[i] = slot_getattr(slot, attnums[i], &isnull);
+		vals[i] = slot_getattr(slot, i + 1, &isnull);
 		nulls[i] = isnull ? 'n' : ' ';
 	}
 }
@@ -2345,25 +2618,27 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 	bool		has_perm = true;
 
 	/*
-	 * Determine which relation to complain about.  If tupdesc wasn't passed
-	 * by caller, assume the violator tuple came from there.
+	 * Determine which relation to complain about.
 	 */
-	onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK);
+	onfk = queryno == RI_PLAN_CHECK_LOOKUPPK;
 	if (onfk)
 	{
 		attnums = riinfo->fk_attnums;
 		rel_oid = fk_rel->rd_id;
-		if (tupdesc == NULL)
-			tupdesc = fk_rel->rd_att;
 	}
 	else
 	{
 		attnums = riinfo->pk_attnums;
 		rel_oid = pk_rel->rd_id;
-		if (tupdesc == NULL)
-			tupdesc = pk_rel->rd_att;
 	}
 
+	/*
+	 * If tupdesc wasn't passed by caller, assume the violator tuple matches
+	 * the descriptor of the violatorslot.
+	 */
+	if (tupdesc == NULL)
+		tupdesc = violatorslot->tts_tupleDescriptor;
+
 	/*
 	 * Check permissions- if the user does not have access to view the data in
 	 * any of the key columns then we don't include the errdetail() below.
@@ -2410,8 +2685,7 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 		initStringInfo(&key_values);
 		for (int idx = 0; idx < riinfo->nkeys; idx++)
 		{
-			int			fnum = attnums[idx];
-			Form_pg_attribute att = TupleDescAttr(tupdesc, fnum - 1);
+			Form_pg_attribute att = TupleDescAttr(tupdesc, idx);
 			char	   *name,
 					   *val;
 			Datum		datum;
@@ -2419,7 +2693,7 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 
 			name = NameStr(att->attname);
 
-			datum = slot_getattr(violatorslot, fnum, &isnull);
+			datum = slot_getattr(violatorslot, idx + 1, &isnull);
 			if (!isnull)
 			{
 				Oid			foutoid;
@@ -2487,24 +2761,34 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
  * Determine the NULL state of all key values in a tuple
  *
  * Returns one of RI_KEYS_ALL_NULL, RI_KEYS_NONE_NULL or RI_KEYS_SOME_NULL.
+ *
+ * If the slot only contains key columns, pass ignore_attnums=true.
  */
 static int
 ri_NullCheck(TupleDesc tupDesc,
 			 TupleTableSlot *slot,
-			 const RI_ConstraintInfo *riinfo, bool rel_is_pk)
+			 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
+			 bool ignore_attnums)
 {
 	const int16 *attnums;
 	bool		allnull = true;
 	bool		nonenull = true;
 
-	if (rel_is_pk)
-		attnums = riinfo->pk_attnums;
-	else
-		attnums = riinfo->fk_attnums;
+	if (!ignore_attnums)
+	{
+		if (rel_is_pk)
+			attnums = riinfo->pk_attnums;
+		else
+			attnums = riinfo->fk_attnums;
+	}
 
 	for (int i = 0; i < riinfo->nkeys; i++)
 	{
-		if (slot_attisnull(slot, attnums[i]))
+		int16		attnum;
+
+		attnum = !ignore_attnums ? attnums[i] : i + 1;
+
+		if (slot_attisnull(slot, attnum))
 			nonenull = false;
 		else
 			allnull = false;
@@ -2888,3 +3172,272 @@ RI_FKey_trigger_type(Oid tgfoid)
 
 	return RI_TRIGGER_NONE;
 }
+
+/*
+ * Turn the TID array gathered in "trigdata" into a tuplestore.
+ *
+ * Only the "nkeys" key attributes given by "attnums" are stored; "tupdesc"
+ * must describe exactly those columns. "old" chooses between ri_tids_old
+ * (read through tg_trigslot) and ri_tids_new (read through tg_newslot).
+ * If "snapshot" is passed, only tuples visible by this snapshot are used.
+ */
+static Tuplestorestate *
+get_event_tuplestore(TriggerData *trigdata, int nkeys, const int16 *attnums,
+					 bool old, TupleDesc tupdesc, Snapshot snapshot)
+{
+	ResourceOwner saveResourceOwner;
+	Tuplestorestate *result;
+	TIDArray   *ta;
+	ItemPointer it;
+	TupleTableSlot *slot;
+	int			i;
+	Datum		values[RI_MAX_NUMKEYS];
+	bool		isnull[RI_MAX_NUMKEYS];
+
+	/*
+	 * Create the tuplestore under the transaction resource owner, presumably
+	 * so that it can outlive the current query (deferred constraints) ---
+	 * TODO confirm against AfterTriggerExecuteRI().
+	 */
+	saveResourceOwner = CurrentResourceOwner;
+	CurrentResourceOwner = CurTransactionResourceOwner;
+	result = tuplestore_begin_heap(false, false, work_mem);
+	CurrentResourceOwner = saveResourceOwner;
+
+	/* XXX Shouldn't tg_trigslot and tg_newslot be the same? */
+	if (old)
+	{
+		ta = trigdata->ri_tids_old;
+		slot = trigdata->tg_trigslot;
+	}
+	else
+	{
+		ta = trigdata->ri_tids_new;
+		slot = trigdata->tg_newslot;
+	}
+
+	it = ta->tids;
+	for (i = 0; i < ta->n; i++)
+	{
+		int			j;
+
+		CHECK_FOR_INTERRUPTS();
+
+		ExecClearTuple(slot);
+
+		/* Fetch the row version the TID points at, regardless of MVCC. */
+		if (!table_tuple_fetch_row_version(trigdata->tg_relation, it,
+										   SnapshotAny, slot))
+		{
+			const char *tuple_kind = old ? "tuple1" : "tuple2";
+
+			elog(ERROR, "failed to fetch %s for AFTER trigger", tuple_kind);
+		}
+
+		if (snapshot)
+		{
+			/*
+			 * We should not even consider checking the row if it is no longer
+			 * valid, since it was either deleted (so the deferred check
+			 * should be skipped) or updated (in which case only the latest
+			 * version of the row should be checked).  Test its liveness
+			 * according to the given snapshot.
+			 */
+			if (!table_tuple_satisfies_snapshot(trigdata->tg_relation, slot,
+												snapshot))
+				continue;
+
+			/*
+			 * In fact the snapshot is passed iff the slot contains a tuple of
+			 * the FK table being inserted / updated, so perform one more
+			 * related check in this branch while we have the tuple in the
+			 * slot. If we tested this later, we might need to remove the
+			 * tuple from the store, however tuplestore.c does not support
+			 * such an operation.
+			 */
+			if (!RI_FKey_check_query_required(trigdata->tg_trigger,
+											  trigdata->tg_relation, slot))
+				continue;
+		}
+
+		/*
+		 * Only store the key attributes.
+		 */
+		for (j = 0; j < nkeys; j++)
+			values[j] = slot_getattr(slot, attnums[j], &isnull[j]);
+
+		tuplestore_putvalues(result, tupdesc, values, isnull);
+		it++;
+	}
+
+	return result;
+}
+
+/*
+ * Like get_event_tuplestore(), but put both old and new key values into the
+ * same tuple. If the query (see RI_FKey_cascade_upd) used two tuplestores, it
+ * would have to join them somehow, but there's no suitable join column.
+ */
+static Tuplestorestate *
+get_event_tuplestore_for_cascade_update(TriggerData *trigdata,
+										const RI_ConstraintInfo *riinfo)
+{
+	ResourceOwner saveResourceOwner;
+	Tuplestorestate *result;
+	TIDArray   *ta_old,
+			   *ta_new;
+	ItemPointer it_old,
+				it_new;
+	TupleTableSlot *slot_old,
+			   *slot_new;
+	int			i;
+	Datum	   *values,
+			   *key_values;
+	bool	   *nulls,
+			   *key_nulls;
+	MemoryContext tuple_context;
+	Relation	rel = trigdata->tg_relation;
+	TupleDesc	desc_rel = RelationGetDescr(rel);
+
+	/*
+	 * Create the tuplestore under the transaction resource owner, presumably
+	 * so that it can outlive the current query --- TODO confirm, matches
+	 * get_event_tuplestore().
+	 */
+	saveResourceOwner = CurrentResourceOwner;
+	CurrentResourceOwner = CurTransactionResourceOwner;
+	result = tuplestore_begin_heap(false, false, work_mem);
+	CurrentResourceOwner = saveResourceOwner;
+
+	/*
+	 * This context will be used for the contents of "values".
+	 *
+	 * CurrentMemoryContext should be the "batch context", as passed to
+	 * AfterTriggerExecuteRI().
+	 */
+	tuple_context =
+		AllocSetContextCreate(CurrentMemoryContext,
+							  "AfterTriggerCascadeUpdateContext",
+							  ALLOCSET_DEFAULT_SIZES);
+
+	/* The old and new TID arrays must describe the same set of rows. */
+	ta_old = trigdata->ri_tids_old;
+	ta_new = trigdata->ri_tids_new;
+	Assert(ta_old->n == ta_new->n);
+
+	slot_old = trigdata->tg_trigslot;
+	slot_new = trigdata->tg_newslot;
+
+	/* Output holds the key columns twice: NEW values, then OLD values. */
+	key_values = (Datum *) palloc(riinfo->nkeys * 2 * sizeof(Datum));
+	key_nulls = (bool *) palloc(riinfo->nkeys * 2 * sizeof(bool));
+	/* Workspace large enough to deform a whole tuple of the PK relation. */
+	values = (Datum *) palloc(desc_rel->natts * sizeof(Datum));
+	nulls = (bool *) palloc(desc_rel->natts * sizeof(bool));
+
+	it_old = ta_old->tids;
+	it_new = ta_new->tids;
+	for (i = 0; i < ta_old->n; i++)
+	{
+		MemoryContext oldcxt;
+
+		/* Free the values deformed for the previous pair of tuples. */
+		MemoryContextReset(tuple_context);
+		oldcxt = MemoryContextSwitchTo(tuple_context);
+
+		/*
+		 * Add the new values, followed by the old ones. This order is
+		 * expected to satisfy the parameters of the query generated in
+		 * ri_cascade_upd_query_single_row().
+		 */
+		add_key_values(slot_new, riinfo, trigdata->tg_relation, it_new,
+					   key_values, key_nulls, values, nulls, 0);
+		add_key_values(slot_old, riinfo, trigdata->tg_relation, it_old,
+					   key_values, key_nulls, values, nulls, riinfo->nkeys);
+		MemoryContextSwitchTo(oldcxt);
+
+		tuplestore_putvalues(result, riinfo->slot_both->tts_tupleDescriptor,
+							 key_values, key_nulls);
+
+		it_old++;
+		it_new++;
+	}
+	MemoryContextDelete(tuple_context);
+
+	return result;
+}
+
+/*
+ * Add key attributes "attnums" of relation "rel" to "tupdesc", starting at
+ * position "first".
+ *
+ * If "generate_attnames" is true, synthesize unique attribute names instead
+ * of reusing the relation's column names; that is needed when the same key
+ * columns appear twice in one descriptor (NEW and OLD values, see
+ * ri_LoadConstraintInfo()).
+ */
+static void
+add_key_attrs_to_tupdesc(TupleDesc tupdesc, Relation rel,
+						 const RI_ConstraintInfo *riinfo, int16 *attnums,
+						 int first, bool generate_attnames)
+{
+	int			i;
+
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		Oid			atttypid;
+		const char *attname;
+		char		attname_loc[NAMEDATALEN];
+		Form_pg_attribute att;
+
+		atttypid = RIAttType(rel, attnums[i]);
+
+		if (!generate_attnames)
+			attname = RIAttName(rel, attnums[i]);
+		else
+		{
+			const char *kind;
+
+			/*
+			 * The NEW/OLD order does not matter for bulk update, but the
+			 * tuple must start with the NEW values so that it fits the query
+			 * to check a single row when processing ON UPDATE CASCADE --- see
+			 * ri_cascade_upd_query_single_row().
+			 */
+			kind = first == 1 ? "new" : "old";
+
+			/*
+			 * Generate unique names instead of e.g. using prefix to
+			 * distinguish the old values from new ones. The prefix might be a
+			 * problem due to the limited attribute name length.
+			 */
+			snprintf(attname_loc, NAMEDATALEN, "pkatt%d_%s", i + 1, kind);
+			attname = attname_loc;
+		}
+
+		/*
+		 * Copy typmod and dimensionality from the source relation's
+		 * attribute, not from the descriptor being built --- the latter's
+		 * entries are not initialized yet at this point.
+		 */
+		att = TupleDescAttr(RelationGetDescr(rel), attnums[i] - 1);
+		TupleDescInitEntry(tupdesc, first + i, attname, atttypid,
+						   att->atttypmod, att->attndims);
+	}
+}
+
+/*
+ * Retrieve tuple using given slot, deform it and add the attribute values to
+ * "key_values" and "key_nulls" arrays. "values" and "nulls" is a workspace to
+ * deform the tuple into. "first" tells where in the output arrays we should
+ * start.
+ *
+ * Note that only the PK key columns (riinfo->pk_attnums) are picked, so this
+ * is only suitable for tuples of the PK relation.
+ */
+static void
+add_key_values(TupleTableSlot *slot, const RI_ConstraintInfo *riinfo,
+			   Relation rel, ItemPointer ip,
+			   Datum *key_values, bool *key_nulls,
+			   Datum *values, bool *nulls, int first)
+{
+	HeapTuple	tuple;
+	bool		shouldfree;
+	int			i,
+				c;
+
+	/* Fetch the row version the TID points at, regardless of MVCC. */
+	ExecClearTuple(slot);
+	if (!table_tuple_fetch_row_version(rel, ip, SnapshotAny, slot))
+	{
+		const char *tuple_kind = first == 0 ? "tuple1" : "tuple2";
+
+		elog(ERROR, "failed to fetch %s for AFTER trigger", tuple_kind);
+	}
+	tuple = ExecFetchSlotHeapTuple(slot, false, &shouldfree);
+
+	heap_deform_tuple(tuple, slot->tts_tupleDescriptor, values, nulls);
+
+	/* Pick the key values and store them in the output arrays. */
+	c = first;
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		int16		attnum = riinfo->pk_attnums[i];
+
+		key_values[c] = values[attnum - 1];
+		key_nulls[c] = nulls[attnum - 1];
+		c++;
+	}
+
+	if (shouldfree)
+		pfree(tuple);
+}
diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h
index a40ddf5db5..71566a3b7c 100644
--- a/src/include/commands/trigger.h
+++ b/src/include/commands/trigger.h
@@ -27,19 +27,42 @@
 
 typedef uint32 TriggerEvent;
 
+/*
+ * An intermediate storage for TIDs, in order to process multiple events by a
+ * single call of RI trigger.
+ *
+ * XXX Introduce a size limit and make caller of add_tid() aware of it?
+ */
+typedef struct TIDArray
+{
+	ItemPointerData *tids;		/* array of tuple IDs */
+	uint64		n;				/* number of entries used */
+	uint64		nmax;			/* allocated length of "tids" (presumably
+								 * maintained by add_tid() --- confirm) */
+} TIDArray;
+
 typedef struct TriggerData
 {
 	NodeTag		type;
+	int			tgindx;			/* NOTE(review): looks like the index of
+								 * tg_trigger within the relation's trigger
+								 * descriptor --- confirm against callers */
 	TriggerEvent tg_event;
 	Relation	tg_relation;
 	HeapTuple	tg_trigtuple;
 	HeapTuple	tg_newtuple;
 	Trigger    *tg_trigger;
+	bool		is_ri_trigger;	/* batch-processed RI trigger event? */
 	TupleTableSlot *tg_trigslot;
 	TupleTableSlot *tg_newslot;
 	Tuplestorestate *tg_oldtable;
 	Tuplestorestate *tg_newtable;
 	const Bitmapset *tg_updatedcols;
+
+	/*
+	 * NOTE(review): presumably the descriptor of the tuples stored in the RI
+	 * tuplestores --- confirm against callers.
+	 */
+	TupleDesc	desc;
+
+	/*
+	 * RI triggers receive TIDs and retrieve the tuples before they're needed.
+	 */
+	TIDArray   *ri_tids_old;
+	TIDArray   *ri_tids_new;
 } TriggerData;
 
 /*
@@ -262,6 +285,8 @@ extern bool RI_FKey_pk_upd_check_required(Trigger *trigger, Relation pk_rel,
 										  TupleTableSlot *old_slot, TupleTableSlot *new_slot);
 extern bool RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
 										  TupleTableSlot *old_slot, TupleTableSlot *new_slot);
+extern TupleDesc RI_FKey_fk_attributes(Trigger *trigger, Relation trig_rel,
+									   const int16 **attnums_p);
 extern bool RI_Initial_Check(Trigger *trigger,
 							 Relation fk_rel, Relation pk_rel);
 extern void RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel,
-- 
2.20.1

