From cdd01791055122cd3c68478d33db02d5d1e6f31c Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Wed, 11 Dec 2024 19:22:42 +0100
Subject: [PATCH 5/8] Preserve visibility information of the concurrent data
 changes.

As explained in the commit message of the preceding patch of the series, the
data changes done by applications while VACUUM FULL / CLUSTER CONCURRENTLY is
copying the table contents to a new file are decoded from WAL and eventually
also applied to the new file. To reduce the complexity a little bit, the
preceding patch uses the current transaction (i.e. transaction opened by the
VACUUM FULL / CLUSTER command) to execute those INSERT, UPDATE and DELETE
commands.

However, neither VACUUM nor CLUSTER is expected to change visibility of
tuples. Therefore, this patch fixes the handling of the "concurrent data
changes". Now the tuples written into the new table storage have the same XID
and command ID (CID) as they had in the old storage.

A related change made here is that the data changes (INSERT, UPDATE, DELETE) we
"replay" on the new storage are not logically decoded. First, the logical
decoding subsystem does not expect an already-committed transaction to be
decoded again. Second, repeated decoding would just be wasted effort.
---
 src/backend/access/common/toast_internals.c   |   3 +-
 src/backend/access/heap/heapam.c              |  73 ++++++++----
 src/backend/access/heap/heapam_handler.c      |  14 ++-
 src/backend/access/transam/xact.c             |  52 ++++++++
 src/backend/commands/cluster.c                | 111 ++++++++++++++++--
 src/backend/replication/logical/decode.c      |  76 ++++++++++--
 src/backend/replication/logical/snapbuild.c   |  22 ++--
 .../pgoutput_cluster/pgoutput_cluster.c       |  68 +++++++++--
 src/include/access/heapam.h                   |  15 ++-
 src/include/access/heapam_xlog.h              |   2 +
 src/include/access/xact.h                     |   2 +
 src/include/commands/cluster.h                |  18 +++
 src/include/utils/snapshot.h                  |   3 +
 13 files changed, 389 insertions(+), 70 deletions(-)

diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
index 1939cfb4d2..47b05b2135 100644
--- a/src/backend/access/common/toast_internals.c
+++ b/src/backend/access/common/toast_internals.c
@@ -320,7 +320,8 @@ toast_save_datum(Relation rel, Datum value,
 		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
 		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
 
-		heap_insert(toastrel, toasttup, mycid, options, NULL);
+		heap_insert(toastrel, toasttup, GetCurrentTransactionId(), mycid,
+					options, NULL);
 
 		/*
 		 * Create the index entry.  We cheat a little here by not using
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a842b84415..d56d0f5164 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -58,7 +58,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 								  Buffer newbuf, HeapTuple oldtup,
 								  HeapTuple newtup, HeapTuple old_key_tuple,
-								  bool all_visible_cleared, bool new_all_visible_cleared);
+								  bool all_visible_cleared, bool new_all_visible_cleared,
+								  bool wal_logical);
 #ifdef USE_ASSERT_CHECKING
 static void check_lock_if_inplace_updateable_rel(Relation relation,
 												 ItemPointer otid,
@@ -1966,7 +1967,7 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
 /*
  *	heap_insert		- insert tuple into a heap
  *
- * The new tuple is stamped with current transaction ID and the specified
+ * The new tuple is stamped with specified transaction ID and the specified
  * command ID.
  *
  * See table_tuple_insert for comments about most of the input flags, except
@@ -1982,15 +1983,16 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
  * reflected into *tup.
  */
 void
-heap_insert(Relation relation, HeapTuple tup, CommandId cid,
-			int options, BulkInsertState bistate)
+heap_insert(Relation relation, HeapTuple tup, TransactionId xid,
+			CommandId cid, int options, BulkInsertState bistate)
 {
-	TransactionId xid = GetCurrentTransactionId();
 	HeapTuple	heaptup;
 	Buffer		buffer;
 	Buffer		vmbuffer = InvalidBuffer;
 	bool		all_visible_cleared = false;
 
+	Assert(TransactionIdIsValid(xid));
+
 	/* Cheap, simplistic check that the tuple matches the rel's rowtype. */
 	Assert(HeapTupleHeaderGetNatts(tup->t_data) <=
 		   RelationGetNumberOfAttributes(relation));
@@ -2621,7 +2623,8 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 void
 simple_heap_insert(Relation relation, HeapTuple tup)
 {
-	heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL);
+	heap_insert(relation, tup, GetCurrentTransactionId(),
+				GetCurrentCommandId(true), 0, NULL);
 }
 
 /*
@@ -2678,11 +2681,11 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
  */
 TM_Result
 heap_delete(Relation relation, ItemPointer tid,
-			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, bool changingPart)
+			TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait,
+			TM_FailureData *tmfd, bool changingPart,
+			bool wal_logical)
 {
 	TM_Result	result;
-	TransactionId xid = GetCurrentTransactionId();
 	ItemId		lp;
 	HeapTupleData tp;
 	Page		page;
@@ -2699,6 +2702,7 @@ heap_delete(Relation relation, ItemPointer tid,
 	bool		old_key_copied = false;
 
 	Assert(ItemPointerIsValid(tid));
+	Assert(TransactionIdIsValid(xid));
 
 	/*
 	 * Forbid this during a parallel operation, lest it allocate a combo CID.
@@ -2924,7 +2928,8 @@ l1:
 	 * Compute replica identity tuple before entering the critical section so
 	 * we don't PANIC upon a memory allocation failure.
 	 */
-	old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
+	old_key_tuple = wal_logical ?
+		ExtractReplicaIdentity(relation, &tp, true, &old_key_copied) : NULL;
 
 	/*
 	 * If this is the first possibly-multixact-able operation in the current
@@ -2992,8 +2997,12 @@ l1:
 		/*
 		 * For logical decode we need combo CIDs to properly decode the
 		 * catalog
+		 *
+		 * Like in heap_insert(), visibility is unchanged when called from
+		 * VACUUM FULL / CLUSTER.
 		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
+		if (wal_logical &&
+			RelationIsAccessibleInLogicalDecoding(relation))
 			log_heap_new_cid(relation, &tp);
 
 		xlrec.flags = 0;
@@ -3014,6 +3023,15 @@ l1:
 				xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
 		}
 
+		/*
+		 * Unlike UPDATE, DELETE is decoded even if there is no old key, so it
+		 * does not help to clear both XLH_DELETE_CONTAINS_OLD_TUPLE and
+		 * XLH_DELETE_CONTAINS_OLD_KEY. Thus we need an extra flag. TODO
+		 * Consider not decoding tuples w/o the old tuple/key instead.
+		 */
+		if (!wal_logical)
+			xlrec.flags |= XLH_DELETE_NO_LOGICAL;
+
 		XLogBeginInsert();
 		XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);
 
@@ -3103,10 +3121,11 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 	TM_Result	result;
 	TM_FailureData tmfd;
 
-	result = heap_delete(relation, tid,
+	result = heap_delete(relation, tid, GetCurrentTransactionId(),
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, false /* changingPart */ );
+						 &tmfd, false, /* changingPart */
+						 true /* wal_logical */);
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -3145,12 +3164,11 @@ simple_heap_delete(Relation relation, ItemPointer tid)
  */
 TM_Result
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
-			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, LockTupleMode *lockmode,
-			TU_UpdateIndexes *update_indexes)
+			TransactionId xid, CommandId cid, Snapshot crosscheck,
+			bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
+			TU_UpdateIndexes *update_indexes, bool wal_logical)
 {
 	TM_Result	result;
-	TransactionId xid = GetCurrentTransactionId();
 	Bitmapset  *hot_attrs;
 	Bitmapset  *sum_attrs;
 	Bitmapset  *key_attrs;
@@ -3190,6 +3208,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 				infomask2_new_tuple;
 
 	Assert(ItemPointerIsValid(otid));
+	Assert(TransactionIdIsValid(xid));
 
 	/* Cheap, simplistic check that the tuple matches the rel's rowtype. */
 	Assert(HeapTupleHeaderGetNatts(newtup->t_data) <=
@@ -3982,8 +4001,12 @@ l2:
 		/*
 		 * For logical decoding we need combo CIDs to properly decode the
 		 * catalog.
+		 *
+		 * Like in heap_insert(), visibility is unchanged when called from
+		 * VACUUM FULL / CLUSTER.
 		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
+		if (wal_logical &&
+			RelationIsAccessibleInLogicalDecoding(relation))
 		{
 			log_heap_new_cid(relation, &oldtup);
 			log_heap_new_cid(relation, heaptup);
@@ -3993,7 +4016,8 @@ l2:
 								 newbuf, &oldtup, heaptup,
 								 old_key_tuple,
 								 all_visible_cleared,
-								 all_visible_cleared_new);
+								 all_visible_cleared_new,
+								 wal_logical);
 		if (newbuf != buffer)
 		{
 			PageSetLSN(BufferGetPage(newbuf), recptr);
@@ -4348,10 +4372,10 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
 	TM_FailureData tmfd;
 	LockTupleMode lockmode;
 
-	result = heap_update(relation, otid, tup,
+	result = heap_update(relation, otid, tup, GetCurrentTransactionId(),
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, &lockmode, update_indexes);
+						 &tmfd, &lockmode, update_indexes, true);
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -8682,7 +8706,8 @@ static XLogRecPtr
 log_heap_update(Relation reln, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
 				HeapTuple old_key_tuple,
-				bool all_visible_cleared, bool new_all_visible_cleared)
+				bool all_visible_cleared, bool new_all_visible_cleared,
+				bool wal_logical)
 {
 	xl_heap_update xlrec;
 	xl_heap_header xlhdr;
@@ -8693,10 +8718,12 @@ log_heap_update(Relation reln, Buffer oldbuf,
 				suffixlen = 0;
 	XLogRecPtr	recptr;
 	Page		page = BufferGetPage(newbuf);
-	bool		need_tuple_data = RelationIsLogicallyLogged(reln);
+	bool		need_tuple_data;
 	bool		init;
 	int			bufflags;
 
+	need_tuple_data = RelationIsLogicallyLogged(reln) && wal_logical;
+
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
 
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 06cd85b34b..16525a4669 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -252,7 +252,8 @@ heapam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
 	tuple->t_tableOid = slot->tts_tableOid;
 
 	/* Perform the insertion, and copy the resulting ItemPointer */
-	heap_insert(relation, tuple, cid, options, bistate);
+	heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options,
+				bistate);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	if (shouldFree)
@@ -275,7 +276,8 @@ heapam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
 	options |= HEAP_INSERT_SPECULATIVE;
 
 	/* Perform the insertion, and copy the resulting ItemPointer */
-	heap_insert(relation, tuple, cid, options, bistate);
+	heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options,
+				bistate);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	if (shouldFree)
@@ -309,7 +311,8 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
 	 * the storage itself is cleaning the dead tuples by itself, it is the
 	 * time to call the index tuple deletion also.
 	 */
-	return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
+	return heap_delete(relation, tid, GetCurrentTransactionId(), cid,
+					   crosscheck, wait, tmfd, changingPart, true);
 }
 
 
@@ -327,8 +330,9 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 	slot->tts_tableOid = RelationGetRelid(relation);
 	tuple->t_tableOid = slot->tts_tableOid;
 
-	result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
-						 tmfd, lockmode, update_indexes);
+	result = heap_update(relation, otid, tuple, GetCurrentTransactionId(),
+						 cid, crosscheck, wait,
+						 tmfd, lockmode, update_indexes, true);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	/*
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3ebd7c4041..c227520757 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -125,6 +125,18 @@ static FullTransactionId XactTopFullTransactionId = {InvalidTransactionId};
 static int	nParallelCurrentXids = 0;
 static TransactionId *ParallelCurrentXids;
 
+/*
+ * Another case that requires TransactionIdIsCurrentTransactionId() to behave
+ * specially is when CLUSTER CONCURRENTLY is processing data changes made in
+ * the old storage of a table by other transactions. When applying the changes
+ * to the new storage, the backend executing the CLUSTER command needs to act
+ * on behalf on those other transactions. The transactions responsible for the
+ * changes in the old storage are stored in this array, sorted by
+ * xidComparator.
+ */
+static int				 nClusterCurrentXids = 0;
+static TransactionId	*ClusterCurrentXids = NULL;
+
 /*
  * Miscellaneous flag bits to record events which occur on the top level
  * transaction. These flags are only persisted in MyXactFlags and are intended
@@ -971,6 +983,8 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
 		int			low,
 					high;
 
+		Assert(nClusterCurrentXids == 0);
+
 		low = 0;
 		high = nParallelCurrentXids - 1;
 		while (low <= high)
@@ -990,6 +1004,21 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
 		return false;
 	}
 
+	/*
+	 * When executing CLUSTER CONCURRENTLY, the array of current transactions
+	 * is given.
+	 */
+	if (nClusterCurrentXids > 0)
+	{
+		Assert(nParallelCurrentXids == 0);
+
+		return bsearch(&xid,
+					   ClusterCurrentXids,
+					   nClusterCurrentXids,
+					   sizeof(TransactionId),
+					   xidComparator) != NULL;
+	}
+
 	/*
 	 * We will return true for the Xid of the current subtransaction, any of
 	 * its subcommitted children, any of its parents, or any of their
@@ -5628,6 +5657,29 @@ EndParallelWorkerTransaction(void)
 	CurrentTransactionState->blockState = TBLOCK_DEFAULT;
 }
 
+/*
+ * SetClusterCurrentXids
+ *		Set the XID array that TransactionIdIsCurrentTransactionId() should
+ *		use.
+ */
+void
+SetClusterCurrentXids(TransactionId *xip, int xcnt)
+{
+	ClusterCurrentXids = xip;
+	nClusterCurrentXids = xcnt;
+}
+
+/*
+ * ResetClusterCurrentXids
+ *		Undo the effect of SetClusterCurrentXids().
+ */
+void
+ResetClusterCurrentXids(void)
+{
+	ClusterCurrentXids = NULL;
+	nClusterCurrentXids = 0;
+}
+
 /*
  * ShowTransactionState
  *		Debug support
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 4a4b51f77d..1cb3201c5a 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -200,6 +200,7 @@ static void apply_concurrent_delete(Relation rel, HeapTuple tup_target,
 									ConcurrentChange *change);
 static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
 								   HeapTuple tup_key,
+								   Snapshot snapshot,
 								   IndexInsertState *iistate,
 								   TupleTableSlot *ident_slot,
 								   IndexScanDesc *scan_p);
@@ -2963,6 +2964,9 @@ setup_logical_decoding(Oid relid, const char *slotname, TupleDesc tupdesc)
 	dstate->relid = relid;
 	dstate->tstore = tuplestore_begin_heap(false, false,
 										   maintenance_work_mem);
+#ifdef USE_ASSERT_CHECKING
+	dstate->last_change_xid = InvalidTransactionId;
+#endif
 	dstate->tupdesc = tupdesc;
 
 	/* Initialize the descriptor to store the changes ... */
@@ -3118,6 +3122,7 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 					tup_exist;
 		char	   *change_raw, *src;
 		ConcurrentChange change;
+		Snapshot	snapshot;
 		bool		isnull[1];
 		Datum		values[1];
 
@@ -3186,8 +3191,30 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 
 			/*
 			 * Find the tuple to be updated or deleted.
+			 *
+			 * As the table being CLUSTERed concurrently is considered a
+			 * "user catalog", new CID is WAL-logged and decoded. And since we
+			 * use the same XID that the original DMLs did, the snapshot used
+			 * for the logical decoding (by now converted to a non-historic
+			 * MVCC snapshot) should see the tuples inserted previously into
+			 * the new heap and/or updated there.
+			 */
+			snapshot = change.snapshot;
+
+			/*
+			 * Set what should be considered current transaction (and
+			 * subtransactions) during visibility check.
+			 *
+			 * Note that this snapshot was created from a historic snapshot
+			 * using SnapBuildMVCCFromHistoric(), which does not touch
+			 * 'subxip'. Thus, unlike in a regular MVCC snapshot, the array
+			 * only contains the transactions whose data changes we are
+			 * applying, and its subtransactions. That's exactly what we need
+			 * to check if a particular xact is a "current transaction".
 			 */
-			tup_exist = find_target_tuple(rel, key, nkeys, tup_key,
+			SetClusterCurrentXids(snapshot->subxip, snapshot->subxcnt);
+
+			tup_exist = find_target_tuple(rel, key, nkeys, tup_key, snapshot,
 										  iistate, ident_slot, &ind_scan);
 			if (tup_exist == NULL)
 				elog(ERROR, "Failed to find target tuple");
@@ -3198,6 +3225,8 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 			else
 				apply_concurrent_delete(rel, tup_exist, &change);
 
+			ResetClusterCurrentXids();
+
 			if (tup_old != NULL)
 			{
 				pfree(tup_old);
@@ -3210,11 +3239,14 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 		else
 			elog(ERROR, "Unrecognized kind of change: %d", change.kind);
 
-		/* If there's any change, make it visible to the next iteration. */
-		if (change.kind != CHANGE_UPDATE_OLD)
+		/* Free the snapshot if this is the last change that needed it. */
+		Assert(change.snapshot->active_count > 0);
+		change.snapshot->active_count--;
+		if (change.snapshot->active_count == 0)
 		{
-			CommandCounterIncrement();
-			UpdateActiveSnapshotCommandId();
+			if (change.snapshot == dstate->snapshot)
+				dstate->snapshot = NULL;
+			FreeSnapshot(change.snapshot);
 		}
 
 		/* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */
@@ -3234,10 +3266,30 @@ static void
 apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 						IndexInsertState *iistate, TupleTableSlot *index_slot)
 {
+	Snapshot	snapshot = change->snapshot;
 	List	   *recheck;
 
+	/*
+	 * For INSERT, the visibility information is not important, but we use the
+	 * snapshot to get CID. Index functions might need the whole snapshot
+	 * anyway.
+	 */
+	SetClusterCurrentXids(snapshot->subxip, snapshot->subxcnt);
 
-	heap_insert(rel, tup, GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL);
+	/*
+	 * Write the tuple into the new heap.
+	 *
+	 * The snapshot is the one we used to decode the insert (though converted
+	 * to "non-historic" MVCC snapshot), i.e. the snapshot's curcid is the
+	 * tuple CID incremented by one (due to the "new CID" WAL record that got
+	 * written along with the INSERT record). Thus if we want to use the
+	 * original CID, we need to subtract 1 from curcid.
+	 */
+	Assert(snapshot->curcid != InvalidCommandId &&
+		   snapshot->curcid > FirstCommandId);
+
+	heap_insert(rel, tup, change->xid, snapshot->curcid - 1,
+				HEAP_INSERT_NO_LOGICAL, NULL);
 
 	/*
 	 * Update indexes.
@@ -3245,6 +3297,7 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	 * In case functions in the index need the active snapshot and caller
 	 * hasn't set one.
 	 */
+	PushActiveSnapshot(snapshot);
 	ExecStoreHeapTuple(tup, index_slot, false);
 	recheck = ExecInsertIndexTuples(iistate->rri,
 									index_slot,
@@ -3255,6 +3308,8 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 									NIL, /* arbiterIndexes */
 									false	/* onlySummarizing */
 		);
+	PopActiveSnapshot();
+	ResetClusterCurrentXids();
 
 	/*
	 * If recheck is required, it must have been performed on the source
@@ -3272,18 +3327,36 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 						TupleTableSlot *index_slot)
 {
 	List	   *recheck;
+	LockTupleMode	lockmode;
 	TU_UpdateIndexes	update_indexes;
+	TM_Result	res;
+	Snapshot snapshot	= change->snapshot;
+	TM_FailureData tmfd;
 
 	/*
 	 * Write the new tuple into the new heap. ('tup' gets the TID assigned
 	 * here.)
+	 *
+	 * Regarding CID, see the comment in apply_concurrent_insert().
 	 */
-	simple_heap_update(rel, &tup_target->t_self, tup, &update_indexes);
+	Assert(snapshot->curcid != InvalidCommandId &&
+		   snapshot->curcid > FirstCommandId);
+
+	res = heap_update(rel, &tup_target->t_self, tup,
+					  change->xid, snapshot->curcid - 1,
+					  InvalidSnapshot,
+					  false, /* no wait - only we are doing changes */
+					  &tmfd, &lockmode, &update_indexes,
+					  /* wal_logical */
+					  false);
+	if (res != TM_Ok)
+		ereport(ERROR, (errmsg("failed to apply concurrent UPDATE")));
 
 	ExecStoreHeapTuple(tup, index_slot, false);
 
 	if (update_indexes != TU_None)
 	{
+		PushActiveSnapshot(snapshot);
 		recheck = ExecInsertIndexTuples(iistate->rri,
 										index_slot,
 										iistate->estate,
@@ -3293,6 +3366,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 										NIL, /* arbiterIndexes */
 										/* onlySummarizing */
 										update_indexes == TU_Summarizing);
+		PopActiveSnapshot();
 		list_free(recheck);
 	}
 
@@ -3303,7 +3377,22 @@ static void
 apply_concurrent_delete(Relation rel, HeapTuple tup_target,
 						ConcurrentChange *change)
 {
-	simple_heap_delete(rel, &tup_target->t_self);
+	TM_Result	res;
+	TM_FailureData tmfd;
+	Snapshot	snapshot = change->snapshot;
+
+	/* Regarding CID, see the comment in apply_concurrent_insert(). */
+	Assert(snapshot->curcid != InvalidCommandId &&
+		   snapshot->curcid > FirstCommandId);
+
+	res = heap_delete(rel, &tup_target->t_self, change->xid,
+					  snapshot->curcid - 1, InvalidSnapshot, false,
+					  &tmfd, false,
+					  /* wal_logical */
+					  false);
+
+	if (res != TM_Ok)
+		ereport(ERROR, (errmsg("failed to apply concurrent DELETE")));
 
 	pgstat_progress_incr_param(PROGRESS_CLUSTER_HEAP_TUPLES_DELETED, 1);
 }
@@ -3321,7 +3410,7 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target,
  */
 static HeapTuple
 find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
-				  IndexInsertState *iistate,
+				  Snapshot snapshot, IndexInsertState *iistate,
 				  TupleTableSlot *ident_slot, IndexScanDesc *scan_p)
 {
 	IndexScanDesc scan;
@@ -3329,7 +3418,7 @@ find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
 	int2vector *ident_indkey;
 	HeapTuple	result = NULL;
 
-	scan = index_beginscan(rel, iistate->ident_index, GetActiveSnapshot(),
+	scan = index_beginscan(rel, iistate->ident_index, snapshot,
 						   nkeys, 0);
 	*scan_p = scan;
 	index_rescan(scan, key, nkeys, NULL, 0);
@@ -3401,6 +3490,8 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 	}
 	PG_FINALLY();
 	{
+		ResetClusterCurrentXids();
+
 		if (rel_src)
 			rel_dst->rd_toastoid = InvalidOid;
 	}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 06a9d4a61f..140b063a6c 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -469,9 +469,18 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	SnapBuild  *builder = ctx->snapshot_builder;
 
 	/*
-	 * Check if CLUSTER CONCURRENTLY is being performed by this backend. If
-	 * so, only decode data changes of the table that it is processing, and
-	 * the changes of its TOAST relation.
+	 * If the change is not intended for logical decoding, do not even
+	 * establish transaction for it. This is particularly important if the
+	 * record was generated by CLUSTER CONCURRENTLY because this command uses
+	 * the original XID when doing changes in the new storage. The decoding
+	 * subsystem probably does not expect to see the same transaction multiple
+	 * times.
+	 */
+
+	/*
+	 * First, check if CLUSTER CONCURRENTLY is being performed by this
+	 * backend. If so, only decode data changes of the table that it is
+	 * processing, and the changes of its TOAST relation.
 	 *
 	 * (TOAST locator should not be set unless the main is.)
 	 */
@@ -491,6 +500,60 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			return;
 	}
 
+	/*
+	 * Second, skip records which do not contain sufficient information for
+	 * the decoding.
+	 *
+	 * The backend executing CLUSTER CONCURRENTLY should not return here
+	 * because the records which passed the checks above should be
+	 * eligible for decoding. However, CLUSTER CONCURRENTLY generates WAL when
+	 * writing data into the new table, which should not be decoded by the
+	 * other backends. This is where the other backends skip them.
+	 */
+	switch (info)
+	{
+		case XLOG_HEAP_INSERT:
+		{
+			xl_heap_insert	*rec;
+
+			rec = (xl_heap_insert *) XLogRecGetData(buf->record);
+			/*
+			 * (Besides insertion into the main heap by CLUSTER CONCURRENTLY,
+			 * this does happen when raw_heap_insert marks the TOAST record as
+			 * HEAP_INSERT_NO_LOGICAL).
+			 */
+			if ((rec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) == 0)
+				return;
+
+			break;
+		}
+
+		case XLOG_HEAP_HOT_UPDATE:
+		case XLOG_HEAP_UPDATE:
+		{
+			xl_heap_update	*rec;
+
+			rec = (xl_heap_update *) XLogRecGetData(buf->record);
+			if ((rec->flags &
+				 (XLH_UPDATE_CONTAINS_NEW_TUPLE |
+				  XLH_UPDATE_CONTAINS_OLD_TUPLE |
+				  XLH_UPDATE_CONTAINS_OLD_KEY)) == 0)
+				return;
+
+			break;
+		}
+
+		case XLOG_HEAP_DELETE:
+		{
+			xl_heap_delete	*rec;
+
+			rec = (xl_heap_delete *) XLogRecGetData(buf->record);
+			if (rec->flags & XLH_DELETE_NO_LOGICAL)
+				return;
+			break;
+		}
+	}
+
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
 	/*
@@ -923,13 +986,6 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	xlrec = (xl_heap_insert *) XLogRecGetData(r);
 
-	/*
-	 * Ignore insert records without new tuples (this does happen when
-	 * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
-	 */
-	if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
-		return;
-
 	/* only interested in our database */
 	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
 	if (target_locator.dbOid != ctx->slot->data.database)
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 61a57053c7..09ce0db562 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -155,7 +155,7 @@ static bool ExportInProgress = false;
 static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
 
 /* snapshot building/manipulation/distribution functions */
-static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
+static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, XLogRecPtr lsn);
 
 static void SnapBuildFreeSnapshot(Snapshot snap);
 
@@ -352,12 +352,17 @@ SnapBuildSnapDecRefcount(Snapshot snap)
  * Build a new snapshot, based on currently committed catalog-modifying
  * transactions.
  *
+ * 'lsn' is the location of the commit record (of a catalog-changing
+ * transaction) that triggered creation of the snapshot. Pass
+ * InvalidXLogRecPtr for the transaction base snapshot or if the user of
+ * the snapshot should not need the LSN.
+ *
  * In-progress transactions with catalog access are *not* allowed to modify
  * these snapshots; they have to copy them and fill in appropriate ->curcid
  * and ->subxip/subxcnt values.
  */
 static Snapshot
-SnapBuildBuildSnapshot(SnapBuild *builder)
+SnapBuildBuildSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 {
 	Snapshot	snapshot;
 	Size		ssize;
@@ -425,6 +430,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder)
 	snapshot->active_count = 0;
 	snapshot->regd_count = 0;
 	snapshot->snapXactCompletionCount = 0;
+	snapshot->lsn = lsn;
 
 	return snapshot;
 }
@@ -461,7 +467,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 	if (TransactionIdIsValid(MyProc->xmin))
 		elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
 
-	snap = SnapBuildBuildSnapshot(builder);
+	snap = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 
 	/*
 	 * We know that snap->xmin is alive, enforced by the logical xmin
@@ -502,7 +508,7 @@ SnapBuildInitialSnapshotForCluster(SnapBuild *builder)
 
 	Assert(builder->state == SNAPBUILD_CONSISTENT);
 
-	snap = SnapBuildBuildSnapshot(builder);
+	snap = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 	return SnapBuildMVCCFromHistoric(snap, false);
 }
 
@@ -636,7 +642,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
 	/* only build a new snapshot if we don't have a prebuilt one */
 	if (builder->snapshot == NULL)
 	{
-		builder->snapshot = SnapBuildBuildSnapshot(builder);
+		builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 		/* increase refcount for the snapshot builder */
 		SnapBuildSnapIncRefcount(builder->snapshot);
 	}
@@ -716,7 +722,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 		/* only build a new snapshot if we don't have a prebuilt one */
 		if (builder->snapshot == NULL)
 		{
-			builder->snapshot = SnapBuildBuildSnapshot(builder);
+			builder->snapshot = SnapBuildBuildSnapshot(builder, lsn);
 			/* increase refcount for the snapshot builder */
 			SnapBuildSnapIncRefcount(builder->snapshot);
 		}
@@ -1085,7 +1091,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		if (builder->snapshot)
 			SnapBuildSnapDecRefcount(builder->snapshot);
 
-		builder->snapshot = SnapBuildBuildSnapshot(builder);
+		builder->snapshot = SnapBuildBuildSnapshot(builder, lsn);
 
 		/* we might need to execute invalidations, add snapshot */
 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
@@ -1910,7 +1916,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	{
 		SnapBuildSnapDecRefcount(builder->snapshot);
 	}
-	builder->snapshot = SnapBuildBuildSnapshot(builder);
+	builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 	SnapBuildSnapIncRefcount(builder->snapshot);
 
 	ReorderBufferSetRestartPoint(builder->reorder, lsn);
diff --git a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
index 43f7b34297..8e915c55fb 100644
--- a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
+++ b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
@@ -33,7 +33,8 @@ static void plugin_truncate(struct LogicalDecodingContext *ctx,
 							Relation relations[],
 							ReorderBufferChange *change);
 static void store_change(LogicalDecodingContext *ctx,
-						 ConcurrentChangeKind kind, HeapTuple tuple);
+						 ConcurrentChangeKind kind, HeapTuple tuple,
+						 TransactionId xid);
 
 void
 _PG_output_plugin_init(OutputPluginCallbacks *cb)
@@ -101,6 +102,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			  Relation relation, ReorderBufferChange *change)
 {
 	ClusterDecodingState *dstate;
+	Snapshot	snapshot;
 
 	dstate = (ClusterDecodingState *) ctx->output_writer_private;
 
@@ -108,6 +110,48 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (relation->rd_id != dstate->relid)
 		return;
 
+	/*
+	 * Catalog snapshot is fine because the table we are processing is
+	 * temporarily considered a user catalog table.
+	 */
+	snapshot = GetCatalogSnapshot(InvalidOid);
+	Assert(snapshot->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
+	Assert(!snapshot->suboverflowed);
+
+	/*
+	 * This should not happen, but if we don't have enough information to
+	 * apply a new snapshot, the consequences would be bad. Thus prefer ERROR
+	 * to Assert().
+	 */
+	if (XLogRecPtrIsInvalid(snapshot->lsn))
+		ereport(ERROR, (errmsg("snapshot has invalid LSN")));
+
+	/*
+	 * reorderbuffer.c changes the catalog snapshot as soon as it sees a new
+	 * CID or a commit record of a catalog-changing transaction.
+	 */
+	if (dstate->snapshot == NULL || snapshot->lsn != dstate->snapshot_lsn ||
+		snapshot->curcid != dstate->snapshot->curcid)
+	{
+		/* Within a single transaction, the CID should not go backwards. */
+		Assert(dstate->snapshot == NULL ||
+			   snapshot->curcid >= dstate->snapshot->curcid ||
+			   change->txn->xid != dstate->last_change_xid);
+
+		/*
+		 * XXX Is it a problem that the copy is created in
+		 * TopTransactionContext?
+		 *
+		 * XXX Wouldn't it be o.k. for SnapBuildMVCCFromHistoric() to set xcnt
+		 * to 0 instead of converting xip in this case? The point is that
+		 * transactions which are still in progress from the perspective of
+		 * reorderbuffer.c could not be replayed yet, so we do not need to
+		 * examine their XIDs.
+		 */
+		dstate->snapshot = SnapBuildMVCCFromHistoric(snapshot, false);
+		dstate->snapshot_lsn = snapshot->lsn;
+	}
+
 	/* Decode entry depending on its type */
 	switch (change->action)
 	{
@@ -125,7 +169,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (newtuple == NULL)
 					elog(ERROR, "Incomplete insert info.");
 
-				store_change(ctx, CHANGE_INSERT, newtuple);
+				store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -142,9 +186,11 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					elog(ERROR, "Incomplete update info.");
 
 				if (oldtuple != NULL)
-					store_change(ctx, CHANGE_UPDATE_OLD, oldtuple);
+					store_change(ctx, CHANGE_UPDATE_OLD, oldtuple,
+								 change->txn->xid);
 
-				store_change(ctx, CHANGE_UPDATE_NEW, newtuple);
+				store_change(ctx, CHANGE_UPDATE_NEW, newtuple,
+							 change->txn->xid);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
@@ -157,7 +203,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (oldtuple == NULL)
 					elog(ERROR, "Incomplete delete info.");
 
-				store_change(ctx, CHANGE_DELETE, oldtuple);
+				store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid);
 			}
 			break;
 		default:
@@ -191,13 +237,13 @@ plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (i == nrelations)
 		return;
 
-	store_change(ctx, CHANGE_TRUNCATE, NULL);
+	store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId);
 }
 
 /* Store concurrent data change. */
 static void
 store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
-			 HeapTuple tuple)
+			 HeapTuple tuple, TransactionId xid)
 {
 	ClusterDecodingState *dstate;
 	char	   *change_raw;
@@ -265,6 +311,11 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
 	dst = dst_start + SizeOfConcurrentChange;
 	memcpy(dst, tuple->t_data, tuple->t_len);
 
+	/* Initialize the other fields. */
+	change.xid = xid;
+	change.snapshot = dstate->snapshot;
+	dstate->snapshot->active_count++;
+
 	/* The data has been copied. */
 	if (flattened)
 		pfree(tuple);
@@ -278,6 +329,9 @@ store:
 	isnull[0] = false;
 	tuplestore_putvalues(dstate->tstore, dstate->tupdesc_change,
 						 values, isnull);
+#ifdef USE_ASSERT_CHECKING
+	dstate->last_change_xid = xid;
+#endif
 
 	/* Accounting. */
 	dstate->nchanges++;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index e4a32fc391..4f8bb5677f 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -317,21 +317,24 @@ extern BulkInsertState GetBulkInsertState(void);
 extern void FreeBulkInsertState(BulkInsertState);
 extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
-extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
-						int options, BulkInsertState bistate);
+extern void heap_insert(Relation relation, HeapTuple tup, TransactionId xid,
+						CommandId cid, int options, BulkInsertState bistate);
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
-							 CommandId cid, Snapshot crosscheck, bool wait,
-							 struct TM_FailureData *tmfd, bool changingPart);
+							 TransactionId xid, CommandId cid,
+							 Snapshot crosscheck, bool wait,
+							 struct TM_FailureData *tmfd, bool changingPart,
+							 bool wal_logical);
 extern void heap_finish_speculative(Relation relation, ItemPointer tid);
 extern void heap_abort_speculative(Relation relation, ItemPointer tid);
 extern TM_Result heap_update(Relation relation, ItemPointer otid,
-							 HeapTuple newtup,
+							 HeapTuple newtup, TransactionId xid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, LockTupleMode *lockmode,
-							 TU_UpdateIndexes *update_indexes);
+							 TU_UpdateIndexes *update_indexes,
+							 bool wal_logical);
 extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 								 CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool follow_updates,
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 4591e9a918..90eea6dcd8 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -104,6 +104,8 @@
 #define XLH_DELETE_CONTAINS_OLD_KEY				(1<<2)
 #define XLH_DELETE_IS_SUPER						(1<<3)
 #define XLH_DELETE_IS_PARTITION_MOVE			(1<<4)
+/* See heap_delete() */
+#define XLH_DELETE_NO_LOGICAL					(1<<5)
 
 /* convenience macro for checking whether any form of old tuple was logged */
 #define XLH_DELETE_CONTAINS_OLD						\
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index fb64d7413a..2f9be7afaa 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -482,6 +482,8 @@ extern Size EstimateTransactionStateSpace(void);
 extern void SerializeTransactionState(Size maxsize, char *start_address);
 extern void StartParallelWorkerTransaction(char *tstatespace);
 extern void EndParallelWorkerTransaction(void);
+extern void SetClusterCurrentXids(TransactionId *xip, int xcnt);
+extern void ResetClusterCurrentXids(void);
 extern bool IsTransactionBlock(void);
 extern bool IsTransactionOrTransactionBlock(void);
 extern char TransactionBlockStatusCode(void);
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index c0f2cdabf0..e64b21c862 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -58,6 +58,14 @@ typedef struct ConcurrentChange
 	/* See the enum above. */
 	ConcurrentChangeKind kind;
 
+	/* Transaction that changes the data. */
+	TransactionId	xid;
+
+	/*
+	 * Historic catalog snapshot that was used to decode this change.
+	 */
+	Snapshot	snapshot;
+
 	/*
 	 * The actual tuple.
 	 *
@@ -89,6 +97,8 @@ typedef struct ClusterDecodingState
 	 * tuplestore does this transparently.
 	 */
 	Tuplestorestate *tstore;
+	/* XID of the last change added to tstore. */
+	TransactionId	last_change_xid	PG_USED_FOR_ASSERTS_ONLY;
 
 	/* The current number of changes in tstore. */
 	double		nchanges;
@@ -109,6 +119,14 @@ typedef struct ClusterDecodingState
 	/* Slot to retrieve data from tstore. */
 	TupleTableSlot *tsslot;
 
+	/*
+	 * Historic catalog snapshot that was used to decode the most recent
+	 * change.
+	 */
+	Snapshot	snapshot;
+	/* LSN of the WAL record for which the snapshot above was built. */
+	XLogRecPtr	snapshot_lsn;
+
 	ResourceOwner resowner;
 } ClusterDecodingState;
 
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 7d3ba38f2c..01d7ca8420 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -13,6 +13,7 @@
 #ifndef SNAPSHOT_H
 #define SNAPSHOT_H
 
+#include "access/xlogdefs.h"
 #include "lib/pairingheap.h"
 
 
@@ -201,6 +202,8 @@ typedef struct SnapshotData
 	uint32		regd_count;		/* refcount on RegisteredSnapshots */
 	pairingheap_node ph_node;	/* link in the RegisteredSnapshots heap */
 
+	XLogRecPtr	lsn;			/* position in the WAL stream when taken */
+
 	/*
 	 * The transaction completion count at the time GetSnapshotData() built
 	 * this snapshot. Allows to avoid re-computing static snapshots when no
-- 
2.45.2

