From a624052d0db98cc04bc07f55200c51facb6f3191 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Mon, 31 Mar 2025 15:47:08 +0200
Subject: [PATCH 5/9] Preserve visibility information of the concurrent data
 changes.

As explained in the commit message of the preceding patch of the series, the
data changes done by applications while REPACK CONCURRENTLY is copying the
table contents to a new file are decoded from WAL and eventually also applied
to the new file. To reduce the complexity a little bit, the preceding patch
uses the current transaction (i.e. transaction opened by the REPACK command)
to execute those INSERT, UPDATE and DELETE commands.

However, REPACK is not expected to change visibility of tuples. Therefore,
this patch fixes the handling of the "concurrent data changes". It ensures
that tuples written into the new table have the same XID and command ID (CID)
as they had in the old table.

To "replay" an UPDATE or DELETE command on the new table, we need the
appropriate snapshot to find the previous tuple version in the new table. The
(historic) snapshot we used to decode the UPDATE / DELETE should (by
definition) see the state of the catalog prior to that UPDATE / DELETE. Thus
we can use the same snapshot to find the "old tuple" for UPDATE / DELETE in
the new table if:

1) REPACK CONCURRENTLY preserves visibility information of all tuples - that's
the purpose of this part of the patch series.

2) The table being REPACKed is treated as a system catalog by all transactions
that modify its data. This ensures that reorderbuffer.c generates a new
snapshot for each data change in the table.

We ensure 2) by maintaining a shared hashtable of tables being REPACKed
CONCURRENTLY and by adjusting the RelationIsAccessibleInLogicalDecoding()
macro so it checks this hashtable. (The corresponding flag is also added to
the relation cache, so that the shared hashtable does not have to be accessed
too often.) It's essential that after adding an entry to the hashtable we wait
for completion of all the transactions that might have started to modify our
table before our entry was added. We achieve that by upgrading our lock on
the table to ShareLock temporarily: as soon as we acquire it, no DML command
should be running on the table. (This lock upgrade shouldn't cause any
deadlock because we take care not to hold a lock on other objects at the same
time.)

As long as we preserve the tuple visibility information (which includes XID),
it's important to avoid logical decoding of the WAL generated by DMLs on the
new table: the logical decoding subsystem probably does not expect that the
incoming WAL records contain XIDs of already decoded transactions. (And of
course, repeated decoding would be wasted effort.)
---
 src/backend/access/common/toast_internals.c   |   3 +-
 src/backend/access/heap/heapam.c              |  82 ++--
 src/backend/access/heap/heapam_handler.c      |  14 +-
 src/backend/access/transam/xact.c             |  52 +++
 src/backend/commands/cluster.c                | 406 +++++++++++++++++-
 src/backend/replication/logical/decode.c      |  77 +++-
 src/backend/replication/logical/snapbuild.c   |  22 +-
 .../pgoutput_repack/pgoutput_repack.c         |  68 ++-
 src/backend/storage/ipc/ipci.c                |   2 +
 src/backend/utils/cache/inval.c               |  21 +
 src/backend/utils/cache/relcache.c            |   4 +
 src/include/access/heapam.h                   |  15 +-
 src/include/access/heapam_xlog.h              |   2 +
 src/include/access/xact.h                     |   2 +
 src/include/commands/cluster.h                |  22 +
 src/include/utils/inval.h                     |   2 +
 src/include/utils/rel.h                       |   7 +-
 src/include/utils/snapshot.h                  |   3 +
 src/tools/pgindent/typedefs.list              |   1 +
 19 files changed, 722 insertions(+), 83 deletions(-)

diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
index 7d8be8346ce..75d889ec72c 100644
--- a/src/backend/access/common/toast_internals.c
+++ b/src/backend/access/common/toast_internals.c
@@ -320,7 +320,8 @@ toast_save_datum(Relation rel, Datum value,
 		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
 		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
 
-		heap_insert(toastrel, toasttup, mycid, options, NULL);
+		heap_insert(toastrel, toasttup, GetCurrentTransactionId(), mycid,
+					options, NULL);
 
 		/*
 		 * Create the index entry.  We cheat a little here by not using
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 6e433db039e..c5baff18bc2 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -60,7 +60,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 								  Buffer newbuf, HeapTuple oldtup,
 								  HeapTuple newtup, HeapTuple old_key_tuple,
-								  bool all_visible_cleared, bool new_all_visible_cleared);
+								  bool all_visible_cleared, bool new_all_visible_cleared,
+								  bool wal_logical);
 #ifdef USE_ASSERT_CHECKING
 static void check_lock_if_inplace_updateable_rel(Relation relation,
 												 ItemPointer otid,
@@ -2084,7 +2085,7 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
 /*
  *	heap_insert		- insert tuple into a heap
  *
- * The new tuple is stamped with current transaction ID and the specified
+ * The new tuple is stamped with specified transaction ID and the specified
  * command ID.
  *
  * See table_tuple_insert for comments about most of the input flags, except
@@ -2100,15 +2101,16 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
  * reflected into *tup.
  */
 void
-heap_insert(Relation relation, HeapTuple tup, CommandId cid,
-			int options, BulkInsertState bistate)
+heap_insert(Relation relation, HeapTuple tup, TransactionId xid,
+			CommandId cid, int options, BulkInsertState bistate)
 {
-	TransactionId xid = GetCurrentTransactionId();
 	HeapTuple	heaptup;
 	Buffer		buffer;
 	Buffer		vmbuffer = InvalidBuffer;
 	bool		all_visible_cleared = false;
 
+	Assert(TransactionIdIsValid(xid));
+
 	/* Cheap, simplistic check that the tuple matches the rel's rowtype. */
 	Assert(HeapTupleHeaderGetNatts(tup->t_data) <=
 		   RelationGetNumberOfAttributes(relation));
@@ -2188,8 +2190,15 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		/*
 		 * If this is a catalog, we need to transmit combo CIDs to properly
 		 * decode, so log that as well.
+		 *
+		 * HEAP_INSERT_NO_LOGICAL should be set when applying data changes
+		 * done by other transactions during REPACK CONCURRENTLY. In such a
+		 * case, the insertion should not be decoded at all - see
+		 * heap_decode(). (It's also set by raw_heap_insert() for TOAST, but
+		 * TOAST does not pass this test anyway.)
 		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
+		if ((options & HEAP_INSERT_NO_LOGICAL) == 0 &&
+			RelationIsAccessibleInLogicalDecoding(relation))
 			log_heap_new_cid(relation, heaptup);
 
 		/*
@@ -2733,7 +2742,8 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 void
 simple_heap_insert(Relation relation, HeapTuple tup)
 {
-	heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL);
+	heap_insert(relation, tup, GetCurrentTransactionId(),
+				GetCurrentCommandId(true), 0, NULL);
 }
 
 /*
@@ -2790,11 +2800,11 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
  */
 TM_Result
 heap_delete(Relation relation, ItemPointer tid,
-			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, bool changingPart)
+			TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait,
+			TM_FailureData *tmfd, bool changingPart,
+			bool wal_logical)
 {
 	TM_Result	result;
-	TransactionId xid = GetCurrentTransactionId();
 	ItemId		lp;
 	HeapTupleData tp;
 	Page		page;
@@ -2811,6 +2821,7 @@ heap_delete(Relation relation, ItemPointer tid,
 	bool		old_key_copied = false;
 
 	Assert(ItemPointerIsValid(tid));
+	Assert(TransactionIdIsValid(xid));
 
 	/*
 	 * Forbid this during a parallel operation, lest it allocate a combo CID.
@@ -3036,7 +3047,8 @@ l1:
 	 * Compute replica identity tuple before entering the critical section so
 	 * we don't PANIC upon a memory allocation failure.
 	 */
-	old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
+	old_key_tuple = wal_logical ?
+		ExtractReplicaIdentity(relation, &tp, true, &old_key_copied) : NULL;
 
 	/*
 	 * If this is the first possibly-multixact-able operation in the current
@@ -3104,8 +3116,12 @@ l1:
 		/*
 		 * For logical decode we need combo CIDs to properly decode the
 		 * catalog
+		 *
+		 * Like in heap_insert(), visibility is unchanged when called from
+		 * VACUUM FULL / CLUSTER.
 		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
+		if (wal_logical &&
+			RelationIsAccessibleInLogicalDecoding(relation))
 			log_heap_new_cid(relation, &tp);
 
 		xlrec.flags = 0;
@@ -3126,6 +3142,15 @@ l1:
 				xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
 		}
 
+		/*
+		 * Unlike UPDATE, DELETE is decoded even if there is no old key, so it
+		 * does not help to clear both XLH_DELETE_CONTAINS_OLD_TUPLE and
+		 * XLH_DELETE_CONTAINS_OLD_KEY. Thus we need an extra flag. TODO
+		 * Consider not decoding tuples w/o the old tuple/key instead.
+		 */
+		if (!wal_logical)
+			xlrec.flags |= XLH_DELETE_NO_LOGICAL;
+
 		XLogBeginInsert();
 		XLogRegisterData(&xlrec, SizeOfHeapDelete);
 
@@ -3215,10 +3240,11 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 	TM_Result	result;
 	TM_FailureData tmfd;
 
-	result = heap_delete(relation, tid,
+	result = heap_delete(relation, tid, GetCurrentTransactionId(),
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, false /* changingPart */ );
+						 &tmfd, false,	/* changingPart */
+						 true /* wal_logical */ );
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -3257,12 +3283,11 @@ simple_heap_delete(Relation relation, ItemPointer tid)
  */
 TM_Result
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
-			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, LockTupleMode *lockmode,
-			TU_UpdateIndexes *update_indexes)
+			TransactionId xid, CommandId cid, Snapshot crosscheck,
+			bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
+			TU_UpdateIndexes *update_indexes, bool wal_logical)
 {
 	TM_Result	result;
-	TransactionId xid = GetCurrentTransactionId();
 	Bitmapset  *hot_attrs;
 	Bitmapset  *sum_attrs;
 	Bitmapset  *key_attrs;
@@ -3302,6 +3327,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 				infomask2_new_tuple;
 
 	Assert(ItemPointerIsValid(otid));
+	Assert(TransactionIdIsValid(xid));
 
 	/* Cheap, simplistic check that the tuple matches the rel's rowtype. */
 	Assert(HeapTupleHeaderGetNatts(newtup->t_data) <=
@@ -4139,8 +4165,12 @@ l2:
 		/*
 		 * For logical decoding we need combo CIDs to properly decode the
 		 * catalog.
+		 *
+		 * Like in heap_insert(), visibility is unchanged when called from
+		 * VACUUM FULL / CLUSTER.
 		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
+		if (wal_logical &&
+			RelationIsAccessibleInLogicalDecoding(relation))
 		{
 			log_heap_new_cid(relation, &oldtup);
 			log_heap_new_cid(relation, heaptup);
@@ -4150,7 +4180,8 @@ l2:
 								 newbuf, &oldtup, heaptup,
 								 old_key_tuple,
 								 all_visible_cleared,
-								 all_visible_cleared_new);
+								 all_visible_cleared_new,
+								 wal_logical);
 		if (newbuf != buffer)
 		{
 			PageSetLSN(BufferGetPage(newbuf), recptr);
@@ -4505,10 +4536,10 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
 	TM_FailureData tmfd;
 	LockTupleMode lockmode;
 
-	result = heap_update(relation, otid, tup,
+	result = heap_update(relation, otid, tup, GetCurrentTransactionId(),
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, &lockmode, update_indexes);
+						 &tmfd, &lockmode, update_indexes, true);
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -8841,7 +8872,8 @@ static XLogRecPtr
 log_heap_update(Relation reln, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
 				HeapTuple old_key_tuple,
-				bool all_visible_cleared, bool new_all_visible_cleared)
+				bool all_visible_cleared, bool new_all_visible_cleared,
+				bool wal_logical)
 {
 	xl_heap_update xlrec;
 	xl_heap_header xlhdr;
@@ -8852,10 +8884,12 @@ log_heap_update(Relation reln, Buffer oldbuf,
 				suffixlen = 0;
 	XLogRecPtr	recptr;
 	Page		page = BufferGetPage(newbuf);
-	bool		need_tuple_data = RelationIsLogicallyLogged(reln);
+	bool		need_tuple_data;
 	bool		init;
 	int			bufflags;
 
+	need_tuple_data = RelationIsLogicallyLogged(reln) && wal_logical;
+
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
 
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 371afa6ad59..ea1d6f299b3 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -256,7 +256,8 @@ heapam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
 	tuple->t_tableOid = slot->tts_tableOid;
 
 	/* Perform the insertion, and copy the resulting ItemPointer */
-	heap_insert(relation, tuple, cid, options, bistate);
+	heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options,
+				bistate);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	if (shouldFree)
@@ -279,7 +280,8 @@ heapam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
 	options |= HEAP_INSERT_SPECULATIVE;
 
 	/* Perform the insertion, and copy the resulting ItemPointer */
-	heap_insert(relation, tuple, cid, options, bistate);
+	heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options,
+				bistate);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	if (shouldFree)
@@ -313,7 +315,8 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
 	 * the storage itself is cleaning the dead tuples by itself, it is the
 	 * time to call the index tuple deletion also.
 	 */
-	return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
+	return heap_delete(relation, tid, GetCurrentTransactionId(), cid,
+					   crosscheck, wait, tmfd, changingPart, true);
 }
 
 
@@ -331,8 +334,9 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 	slot->tts_tableOid = RelationGetRelid(relation);
 	tuple->t_tableOid = slot->tts_tableOid;
 
-	result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
-						 tmfd, lockmode, update_indexes);
+	result = heap_update(relation, otid, tuple, GetCurrentTransactionId(),
+						 cid, crosscheck, wait,
+						 tmfd, lockmode, update_indexes, true);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	/*
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 23f2de587a1..3db4cac030e 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -126,6 +126,18 @@ static FullTransactionId XactTopFullTransactionId = {InvalidTransactionId};
 static int	nParallelCurrentXids = 0;
 static TransactionId *ParallelCurrentXids;
 
+/*
+ * Another case that requires TransactionIdIsCurrentTransactionId() to behave
+ * specially is when REPACK CONCURRENTLY is processing data changes made in
+ * the old storage of a table by other transactions. When applying the changes
+ * to the new storage, the backend executing the REPACK command needs to act
+ * on behalf of those other transactions. The transactions responsible for the
+ * changes in the old storage are stored in this array, sorted by
+ * xidComparator.
+ */
+static int	nRepackCurrentXids = 0;
+static TransactionId *RepackCurrentXids = NULL;
+
 /*
  * Miscellaneous flag bits to record events which occur on the top level
  * transaction. These flags are only persisted in MyXactFlags and are intended
@@ -973,6 +985,8 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
 		int			low,
 					high;
 
+		Assert(nRepackCurrentXids == 0);
+
 		low = 0;
 		high = nParallelCurrentXids - 1;
 		while (low <= high)
@@ -992,6 +1006,21 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
 		return false;
 	}
 
+	/*
+	 * When executing REPACK CONCURRENTLY, the array of current transactions
+	 * is given.
+	 */
+	if (nRepackCurrentXids > 0)
+	{
+		Assert(nParallelCurrentXids == 0);
+
+		return bsearch(&xid,
+					   RepackCurrentXids,
+					   nRepackCurrentXids,
+					   sizeof(TransactionId),
+					   xidComparator) != NULL;
+	}
+
 	/*
 	 * We will return true for the Xid of the current subtransaction, any of
 	 * its subcommitted children, any of its parents, or any of their
@@ -5649,6 +5678,29 @@ EndParallelWorkerTransaction(void)
 	CurrentTransactionState->blockState = TBLOCK_DEFAULT;
 }
 
+/*
+ * SetRepackCurrentXids
+ *		Set the XID array that TransactionIdIsCurrentTransactionId() should
+ *		use.
+ */
+void
+SetRepackCurrentXids(TransactionId *xip, int xcnt)
+{
+	RepackCurrentXids = xip;
+	nRepackCurrentXids = xcnt;
+}
+
+/*
+ * ResetRepackCurrentXids
+ *		Undo the effect of SetRepackCurrentXids().
+ */
+void
+ResetRepackCurrentXids(void)
+{
+	RepackCurrentXids = NULL;
+	nRepackCurrentXids = 0;
+}
+
 /*
  * ShowTransactionState
  *		Debug support
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index b1aa1e8d820..78380c882c0 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -90,6 +90,11 @@ typedef struct
  * The following definitions are used for concurrent processing.
  */
 
+/*
+ * OID of the table being repacked by this backend.
+ */
+static Oid	repacked_rel = InvalidOid;
+
 /*
  * The locators are used to avoid logical decoding of data that we do not need
  * for our table.
@@ -133,8 +138,10 @@ static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context,
 											   ClusterCommand cmd);
 static bool cluster_is_permitted_for_relation(Oid relid, Oid userid,
 											  ClusterCommand cmd);
-static void begin_concurrent_repack(Relation rel);
-static void end_concurrent_repack(void);
+static void begin_concurrent_repack(Relation rel, Relation *index_p,
+									bool *entered_p);
+static void end_concurrent_repack(bool error);
+static void cluster_before_shmem_exit_callback(int code, Datum arg);
 static LogicalDecodingContext *setup_logical_decoding(Oid relid,
 													  const char *slotname,
 													  TupleDesc tupdesc);
@@ -154,6 +161,7 @@ static void apply_concurrent_delete(Relation rel, HeapTuple tup_target,
 									ConcurrentChange *change);
 static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
 								   HeapTuple tup_key,
+								   Snapshot snapshot,
 								   IndexInsertState *iistate,
 								   TupleTableSlot *ident_slot,
 								   IndexScanDesc *scan_p);
@@ -379,6 +387,8 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params,
 	const char *cmd_str = CLUSTER_COMMAND_STR(cmd);
 	bool		concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
 	LOCKMODE	lmode;
+	bool		entered,
+				success;
 
 	/*
 	 * Check that the correct lock is held. The lock mode is
@@ -535,23 +545,30 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params,
 		TransferPredicateLocksToHeapRelation(OldHeap);
 
 	/* rebuild_relation does all the dirty work */
+	entered = false;
+	success = false;
 	PG_TRY();
 	{
 		/*
-		 * For concurrent processing, make sure that our logical decoding
-		 * ignores data changes of other tables than the one we are
-		 * processing.
+		 * For concurrent processing, make sure that
+		 *
+		 * 1) our logical decoding ignores data changes of other tables than
+		 * the one we are processing.
+		 *
+		 * 2) other transactions treat this table as if it was a system / user
+		 * catalog, and WAL the relevant additional information.
 		 */
 		if (concurrent)
-			begin_concurrent_repack(OldHeap);
+			begin_concurrent_repack(OldHeap, &index, &entered);
 
 		rebuild_relation(OldHeap, index, verbose, cmd, concurrent,
 						 save_userid);
+		success = true;
 	}
 	PG_FINALLY();
 	{
-		if (concurrent)
-			end_concurrent_repack();
+		if (concurrent && entered)
+			end_concurrent_repack(!success);
 	}
 	PG_END_TRY();
 
@@ -2161,6 +2178,47 @@ cluster_is_permitted_for_relation(Oid relid, Oid userid, ClusterCommand cmd)
 
 #define REPL_PLUGIN_NAME	"pgoutput_repack"
 
+/*
+ * Each relation being processed by REPACK CONCURRENTLY must be in the
+ * repackedRels hashtable.
+ */
+typedef struct RepackedRel
+{
+	Oid			relid;
+	Oid			dbid;
+} RepackedRel;
+
+static HTAB *RepackedRelsHash = NULL;
+
+/*
+ * Maximum number of entries in the hashtable.
+ *
+ * A replication slot is needed for the processing, so use this GUC to
+ * allocate memory for the hashtable.
+ */
+#define	MAX_REPACKED_RELS	(max_replication_slots)
+
+Size
+RepackShmemSize(void)
+{
+	return hash_estimate_size(MAX_REPACKED_RELS, sizeof(RepackedRel));
+}
+
+void
+RepackShmemInit(void)
+{
+	HASHCTL		info;
+
+	info.keysize = sizeof(RepackedRel);
+	info.entrysize = info.keysize;
+
+	RepackedRelsHash = ShmemInitHash("Repacked Relations",
+									 MAX_REPACKED_RELS,
+									 MAX_REPACKED_RELS,
+									 &info,
+									 HASH_ELEM | HASH_BLOBS);
+}
+
 /*
  * Call this function before REPACK CONCURRENTLY starts to setup logical
  * decoding. It makes sure that other users of the table put enough
@@ -2175,11 +2233,120 @@ cluster_is_permitted_for_relation(Oid relid, Oid userid, ClusterCommand cmd)
  *
  * Note that TOAST table needs no attention here as it's not scanned using
  * historic snapshot.
+ *
+ * 'index_p' is in/out argument because the function unlocks the index
+ * temporarily.
+ *
+ * 'entered_p' receives a bool value telling whether relation OID was entered
+ * into RepackedRelsHash or not.
  */
 static void
-begin_concurrent_repack(Relation rel)
+begin_concurrent_repack(Relation rel, Relation *index_p, bool *entered_p)
 {
-	Oid			toastrelid;
+	Oid			relid,
+				toastrelid;
+	Relation	index = NULL;
+	Oid			indexid = InvalidOid;
+	RepackedRel key,
+			   *entry;
+	bool		found;
+	static bool before_shmem_exit_callback_setup = false;
+
+	relid = RelationGetRelid(rel);
+	index = index_p ? *index_p : NULL;
+
+	/*
+	 * Make sure that we do not leave an entry in RepackedRelsHash if exiting
+	 * due to FATAL.
+	 */
+	if (!before_shmem_exit_callback_setup)
+	{
+		before_shmem_exit(cluster_before_shmem_exit_callback, 0);
+		before_shmem_exit_callback_setup = true;
+	}
+
+	memset(&key, 0, sizeof(key));
+	key.relid = relid;
+	key.dbid = MyDatabaseId;
+
+	*entered_p = false;
+	LWLockAcquire(RepackedRelsLock, LW_EXCLUSIVE);
+	entry = (RepackedRel *)
+		hash_search(RepackedRelsHash, &key, HASH_ENTER_NULL, &found);
+	if (found)
+	{
+		/*
+		 * Since REPACK CONCURRENTLY takes ShareRowExclusiveLock, a conflict
+		 * should occur much earlier. However that lock may be released
+		 * temporarily, see below.  Anyway, we should complain whatever the
+		 * reason of the conflict might be.
+		 */
+		ereport(ERROR,
+				(errmsg("relation \"%s\" is already being processed by REPACK CONCURRENTLY",
+						RelationGetRelationName(rel))));
+	}
+	if (entry == NULL)
+		ereport(ERROR,
+				(errmsg("too many requests for REPACK CONCURRENTLY at a time")),
+				(errhint("Please consider increasing the \"max_replication_slots\" configuration parameter.")));
+
+	/*
+	 * Even if anything fails below, the caller has to do cleanup in the
+	 * shared memory.
+	 */
+	*entered_p = true;
+
+	/*
+	 * Enable the callback to remove the entry in case of exit. We should not
+	 * do this earlier, otherwise an attempt to insert already existing entry
+	 * could make us remove that entry (inserted by another backend) during
+	 * ERROR handling.
+	 */
+	Assert(!OidIsValid(repacked_rel));
+	repacked_rel = relid;
+
+	LWLockRelease(RepackedRelsLock);
+
+	/*
+	 * Make sure that other backends are aware of the new hash entry as soon
+	 * as they open our table.
+	 */
+	CacheInvalidateRelcacheImmediate(relid);
+
+	/*
+	 * Also make sure that the existing users of the table update their
+	 * relcache entry as soon as they try to run DML commands on it.
+	 *
+	 * ShareLock is the weakest lock that conflicts with DMLs. If any backend
+	 * has a lower lock, we assume it'll accept our invalidation message when
+	 * it changes the lock mode.
+	 *
+	 * Before upgrading the lock on the relation, close the index temporarily
+	 * to avoid a deadlock if another backend running DML already has its lock
+	 * (ShareLock) on the table and waits for the lock on the index.
+	 */
+	if (index)
+	{
+		index_close(index, ShareUpdateExclusiveLock);
+		indexid = RelationGetRelid(index);
+	}
+	LockRelationOid(relid, ShareLock);
+	UnlockRelationOid(relid, ShareLock);
+	if (OidIsValid(indexid))
+	{
+		/*
+		 * Re-open the index and check that it hasn't changed while unlocked.
+		 */
+		check_index_is_clusterable(rel, indexid, ShareUpdateExclusiveLock,
+								   CLUSTER_COMMAND_REPACK);
+
+		/*
+		 * Return the new relcache entry to the caller. (It's been locked by
+		 * the call above.)
+		 */
+		index = index_open(indexid, NoLock);
+		*index_p = index;
+	}
 
 	/* Avoid logical decoding of other relations by this backend. */
 	repacked_rel_locator = rel->rd_locator;
@@ -2197,15 +2364,122 @@ begin_concurrent_repack(Relation rel)
 
 /*
  * Call this when done with REPACK CONCURRENTLY.
+ *
+ * 'error' tells whether the function is being called in order to handle
+ * error.
  */
 static void
-end_concurrent_repack(void)
+end_concurrent_repack(bool error)
 {
+	RepackedRel key;
+	RepackedRel *entry = NULL;
+	Oid			relid = repacked_rel;
+
+	/* Remove the relation from the hash if we managed to insert one. */
+	if (OidIsValid(repacked_rel))
+	{
+		memset(&key, 0, sizeof(key));
+		key.relid = repacked_rel;
+		key.dbid = MyDatabaseId;
+		LWLockAcquire(RepackedRelsLock, LW_EXCLUSIVE);
+		entry = hash_search(RepackedRelsHash, &key, HASH_REMOVE, NULL);
+		LWLockRelease(RepackedRelsLock);
+
+		/*
+		 * Make others refresh their information whether they should still
+		 * treat the table as catalog from the perspective of writing WAL.
+		 *
+		 * XXX Unlike entering the entry into the hashtable, we do not bother
+		 * with locking and unlocking the table here:
+		 *
+		 * 1) On normal completion (and sometimes even on ERROR), the caller
+		 * is already holding AccessExclusiveLock on the table, so there
+		 * should be no relcache reference unaware of this change.
+		 *
+		 * 2) In the other cases, the worst scenario is that the other
+		 * backends will write unnecessary information to WAL until they close
+		 * the relation.
+		 *
+		 * Should we use ShareLock mode to fix 2) at least for the non-FATAL
+		 * errors? (Our before_shmem_exit callback is in charge of FATAL, and
+		 * that probably should not try to acquire any lock.)
+		 */
+		CacheInvalidateRelcacheImmediate(repacked_rel);
+
+		/*
+		 * By clearing this variable we also disable
+		 * cluster_before_shmem_exit_callback().
+		 */
+		repacked_rel = InvalidOid;
+	}
+
 	/*
 	 * Restore normal function of (future) logical decoding for this backend.
 	 */
 	repacked_rel_locator.relNumber = InvalidOid;
 	repacked_rel_toast_locator.relNumber = InvalidOid;
+
+	/*
+	 * On normal completion (!error), we should not really fail to remove the
+	 * entry. But if it wasn't there for any reason, raise ERROR to make sure
+	 * the transaction is aborted: if other transactions, while changing the
+	 * contents of the relation, didn't know that REPACK CONCURRENTLY was in
+	 * progress, they could have missed to WAL enough information, and thus we
+	 * could have produced an inconsistent table contents.
+	 *
+	 * On the other hand, if we are already handling an error, there's no
+	 * reason to worry about inconsistent contents of the new storage because
+	 * the transaction is going to be rolled back anyway. Furthermore, by
+	 * raising ERROR here we'd shadow the original error.
+	 */
+	if (!error)
+	{
+		char	   *relname;
+
+		if (OidIsValid(relid) && entry == NULL)
+		{
+			relname = get_rel_name(relid);
+			if (!relname)
+				ereport(ERROR,
+						(errmsg("cache lookup failed for relation %u",
+								relid)));
+
+			ereport(ERROR,
+					(errmsg("relation \"%s\" not found among repacked relations",
+							relname)));
+		}
+	}
+}
+
+/*
+ * A wrapper to call end_concurrent_repack() as a before_shmem_exit callback.
+ */
+static void
+cluster_before_shmem_exit_callback(int code, Datum arg)
+{
+	if (OidIsValid(repacked_rel))
+		end_concurrent_repack(true);
+}
+
+/*
+ * Check if relation is currently being processed by REPACK CONCURRENTLY.
+ */
+bool
+is_concurrent_repack_in_progress(Oid relid)
+{
+	RepackedRel key,
+			   *entry;
+
+	memset(&key, 0, sizeof(key));
+	key.relid = relid;
+	key.dbid = MyDatabaseId;
+
+	LWLockAcquire(RepackedRelsLock, LW_SHARED);
+	entry = (RepackedRel *)
+		hash_search(RepackedRelsHash, &key, HASH_FIND, NULL);
+	LWLockRelease(RepackedRelsLock);
+
+	return entry != NULL;
 }
 
 /*
@@ -2267,6 +2541,9 @@ setup_logical_decoding(Oid relid, const char *slotname, TupleDesc tupdesc)
 	dstate->relid = relid;
 	dstate->tstore = tuplestore_begin_heap(false, false,
 										   maintenance_work_mem);
+#ifdef USE_ASSERT_CHECKING
+	dstate->last_change_xid = InvalidTransactionId;
+#endif
 
 	dstate->tupdesc = tupdesc;
 
@@ -2414,6 +2691,7 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 		char	   *change_raw,
 				   *src;
 		ConcurrentChange change;
+		Snapshot	snapshot;
 		bool		isnull[1];
 		Datum		values[1];
 
@@ -2482,8 +2760,30 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 
 			/*
 			 * Find the tuple to be updated or deleted.
+			 *
+			 * As the table being REPACKed concurrently is treated like a
+			 * catalog, new CID is WAL-logged and decoded. And since we use
+			 * the same XID that the original DMLs did, the snapshot used for
+			 * the logical decoding (by now converted to a non-historic MVCC
+			 * snapshot) should see the tuples inserted previously into the
+			 * new heap and/or updated there.
+			 */
+			snapshot = change.snapshot;
+
+			/*
+			 * Set what should be considered current transaction (and
+			 * subtransactions) during visibility check.
+			 *
+			 * Note that this snapshot was created from a historic snapshot
+			 * using SnapBuildMVCCFromHistoric(), which does not touch
+			 * 'subxip'. Thus, unlike in a regular MVCC snapshot, the array
+			 * only contains the transactions whose data changes we are
+			 * applying, and its subtransactions. That's exactly what we need
+			 * to check if a particular xact is a "current transaction".
 			 */
-			tup_exist = find_target_tuple(rel, key, nkeys, tup_key,
+			SetRepackCurrentXids(snapshot->subxip, snapshot->subxcnt);
+
+			tup_exist = find_target_tuple(rel, key, nkeys, tup_key, snapshot,
 										  iistate, ident_slot, &ind_scan);
 			if (tup_exist == NULL)
 				elog(ERROR, "Failed to find target tuple");
@@ -2494,6 +2794,8 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 			else
 				apply_concurrent_delete(rel, tup_exist, &change);
 
+			ResetRepackCurrentXids();
+
 			if (tup_old != NULL)
 			{
 				pfree(tup_old);
@@ -2506,11 +2808,14 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 		else
 			elog(ERROR, "Unrecognized kind of change: %d", change.kind);
 
-		/* If there's any change, make it visible to the next iteration. */
-		if (change.kind != CHANGE_UPDATE_OLD)
+		/* Free the snapshot if this is the last change that needed it. */
+		Assert(change.snapshot->active_count > 0);
+		change.snapshot->active_count--;
+		if (change.snapshot->active_count == 0)
 		{
-			CommandCounterIncrement();
-			UpdateActiveSnapshotCommandId();
+			if (change.snapshot == dstate->snapshot)
+				dstate->snapshot = NULL;
+			FreeSnapshot(change.snapshot);
 		}
 
 		/* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */
@@ -2530,10 +2835,30 @@ static void
 apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 						IndexInsertState *iistate, TupleTableSlot *index_slot)
 {
+	Snapshot	snapshot = change->snapshot;
 	List	   *recheck;
 
+	/*
+	 * For INSERT, the visibility information is not important, but we use the
+	 * snapshot to get CID. Index functions might need the whole snapshot
+	 * anyway.
+	 */
+	SetRepackCurrentXids(snapshot->subxip, snapshot->subxcnt);
+
+	/*
+	 * Write the tuple into the new heap.
+	 *
+	 * The snapshot is the one we used to decode the insert (though converted
+	 * to "non-historic" MVCC snapshot), i.e. the snapshot's curcid is the
+	 * tuple CID incremented by one (due to the "new CID" WAL record that got
+	 * written along with the INSERT record). Thus if we want to use the
+	 * original CID, we need to subtract 1 from curcid.
+	 */
+	Assert(snapshot->curcid != InvalidCommandId &&
+		   snapshot->curcid > FirstCommandId);
 
-	simple_heap_insert(rel, tup);
+	heap_insert(rel, tup, change->xid, snapshot->curcid - 1,
+				HEAP_INSERT_NO_LOGICAL, NULL);
 
 	/*
 	 * Update indexes.
@@ -2541,6 +2866,7 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	 * In case functions in the index need the active snapshot and caller
 	 * hasn't set one.
 	 */
+	PushActiveSnapshot(snapshot);
 	ExecStoreHeapTuple(tup, index_slot, false);
 	recheck = ExecInsertIndexTuples(iistate->rri,
 									index_slot,
@@ -2551,6 +2877,8 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 									NIL,	/* arbiterIndexes */
 									false	/* onlySummarizing */
 		);
+	PopActiveSnapshot();
+	ResetRepackCurrentXids();
 
 	/*
+	 * If recheck is required, it must have been performed on the source
@@ -2568,18 +2896,36 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 						TupleTableSlot *index_slot)
 {
 	List	   *recheck;
+	LockTupleMode lockmode;
 	TU_UpdateIndexes update_indexes;
+	TM_Result	res;
+	Snapshot	snapshot = change->snapshot;
+	TM_FailureData tmfd;
 
 	/*
 	 * Write the new tuple into the new heap. ('tup' gets the TID assigned
 	 * here.)
+	 *
+	 * Regarding CID, see the comment in apply_concurrent_insert().
 	 */
-	simple_heap_update(rel, &tup_target->t_self, tup, &update_indexes);
+	Assert(snapshot->curcid != InvalidCommandId &&
+		   snapshot->curcid > FirstCommandId);
+
+	res = heap_update(rel, &tup_target->t_self, tup,
+					  change->xid, snapshot->curcid - 1,
+					  InvalidSnapshot,
+					  false,	/* no wait - only we are doing changes */
+					  &tmfd, &lockmode, &update_indexes,
+	/* wal_logical */
+					  false);
+	if (res != TM_Ok)
+		ereport(ERROR, (errmsg("failed to apply concurrent UPDATE")));
 
 	ExecStoreHeapTuple(tup, index_slot, false);
 
 	if (update_indexes != TU_None)
 	{
+		PushActiveSnapshot(snapshot);
 		recheck = ExecInsertIndexTuples(iistate->rri,
 										index_slot,
 										iistate->estate,
@@ -2589,6 +2935,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 										NIL,	/* arbiterIndexes */
 		/* onlySummarizing */
 										update_indexes == TU_Summarizing);
+		PopActiveSnapshot();
 		list_free(recheck);
 	}
 
@@ -2599,7 +2946,22 @@ static void
 apply_concurrent_delete(Relation rel, HeapTuple tup_target,
 						ConcurrentChange *change)
 {
-	simple_heap_delete(rel, &tup_target->t_self);
+	TM_Result	res;
+	TM_FailureData tmfd;
+	Snapshot	snapshot = change->snapshot;
+
+	/* Regarding CID, see the comment in apply_concurrent_insert(). */
+	Assert(snapshot->curcid != InvalidCommandId &&
+		   snapshot->curcid > FirstCommandId);
+
+	res = heap_delete(rel, &tup_target->t_self, change->xid,
+					  snapshot->curcid - 1, InvalidSnapshot, false,
+					  &tmfd, false,
+	/* wal_logical */
+					  false);
+
+	if (res != TM_Ok)
+		ereport(ERROR, (errmsg("failed to apply concurrent DELETE")));
 
 	pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1);
 }
@@ -2617,7 +2979,7 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target,
  */
 static HeapTuple
 find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
-				  IndexInsertState *iistate,
+				  Snapshot snapshot, IndexInsertState *iistate,
 				  TupleTableSlot *ident_slot, IndexScanDesc *scan_p)
 {
 	IndexScanDesc scan;
@@ -2626,7 +2988,7 @@ find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
 	HeapTuple	result = NULL;
 
 	/* XXX no instrumentation for now */
-	scan = index_beginscan(rel, iistate->ident_index, GetActiveSnapshot(),
+	scan = index_beginscan(rel, iistate->ident_index, snapshot,
 						   NULL, nkeys, 0);
 	*scan_p = scan;
 	index_rescan(scan, key, nkeys, NULL, 0);
@@ -2698,6 +3060,8 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 	}
 	PG_FINALLY();
 	{
+		ResetRepackCurrentXids();
+
 		if (rel_src)
 			rel_dst->rd_toastoid = InvalidOid;
 	}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 00f7bbc5f59..25bb92b33f2 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -469,9 +469,18 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	SnapBuild  *builder = ctx->snapshot_builder;
 
 	/*
-	 * Check if REPACK CONCURRENTLY is being performed by this backend. If so,
-	 * only decode data changes of the table that it is processing, and the
-	 * changes of its TOAST relation.
+	 * If the change is not intended for logical decoding, do not even
+	 * establish transaction for it. This is particularly important if the
+	 * record was generated by REPACK CONCURRENTLY because this command uses
+	 * the original XID when doing changes in the new storage. The decoding
+	 * system probably does not expect to see the same transaction multiple
+	 * times.
+	 */
+
+	/*
+	 * First, check if REPACK CONCURRENTLY is being performed by this backend.
+	 * If so, only decode data changes of the table that it is processing, and
+	 * the changes of its TOAST relation.
 	 *
 	 * (TOAST locator should not be set unless the main is.)
 	 */
@@ -491,6 +500,61 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			return;
 	}
 
+	/*
+	 * Second, skip records which do not contain sufficient information for
+	 * the decoding.
+	 *
+	 * One particular problem we solve here is that REPACK CONCURRENTLY
+	 * generates WAL when doing changes in the new table. Those changes should
+	 * not be decoded because reorderbuffer.c considers their XID already
+	 * committed. (REPACK CONCURRENTLY deliberately generates WAL records in
+	 * such a way that they are skipped here.)
+	 */
+	switch (info)
+	{
+		case XLOG_HEAP_INSERT:
+			{
+				xl_heap_insert *rec;
+
+				rec = (xl_heap_insert *) XLogRecGetData(buf->record);
+
+				/*
+				 * This does happen when 1) raw_heap_insert marks the TOAST
+				 * record as HEAP_INSERT_NO_LOGICAL, 2) REPACK CONCURRENTLY
+				 * replays inserts performed by other backends.
+				 */
+				if ((rec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) == 0)
+					return;
+
+				break;
+			}
+
+		case XLOG_HEAP_HOT_UPDATE:
+		case XLOG_HEAP_UPDATE:
+			{
+				xl_heap_update *rec;
+
+				rec = (xl_heap_update *) XLogRecGetData(buf->record);
+				if ((rec->flags &
+					 (XLH_UPDATE_CONTAINS_NEW_TUPLE |
+					  XLH_UPDATE_CONTAINS_OLD_TUPLE |
+					  XLH_UPDATE_CONTAINS_OLD_KEY)) == 0)
+					return;
+
+				break;
+			}
+
+		case XLOG_HEAP_DELETE:
+			{
+				xl_heap_delete *rec;
+
+				rec = (xl_heap_delete *) XLogRecGetData(buf->record);
+				if (rec->flags & XLH_DELETE_NO_LOGICAL)
+					return;
+				break;
+			}
+	}
+
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
 	/*
@@ -923,13 +987,6 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	xlrec = (xl_heap_insert *) XLogRecGetData(r);
 
-	/*
-	 * Ignore insert records without new tuples (this does happen when
-	 * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
-	 */
-	if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
-		return;
-
 	/* only interested in our database */
 	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
 	if (target_locator.dbOid != ctx->slot->data.database)
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index c32e459411b..fde4955c328 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -155,7 +155,7 @@ static bool ExportInProgress = false;
 static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
 
 /* snapshot building/manipulation/distribution functions */
-static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
+static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, XLogRecPtr lsn);
 
 static void SnapBuildFreeSnapshot(Snapshot snap);
 
@@ -352,12 +352,17 @@ SnapBuildSnapDecRefcount(Snapshot snap)
  * Build a new snapshot, based on currently committed catalog-modifying
  * transactions.
  *
+ * 'lsn' is the location of the commit record (of a catalog-changing
+ * transaction) that triggered creation of the snapshot. Pass
+ * InvalidXLogRecPtr for the transaction base snapshot or if the user of
+ * the snapshot does not need the LSN.
+ *
  * In-progress transactions with catalog access are *not* allowed to modify
  * these snapshots; they have to copy them and fill in appropriate ->curcid
  * and ->subxip/subxcnt values.
  */
 static Snapshot
-SnapBuildBuildSnapshot(SnapBuild *builder)
+SnapBuildBuildSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 {
 	Snapshot	snapshot;
 	Size		ssize;
@@ -425,6 +430,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder)
 	snapshot->active_count = 0;
 	snapshot->regd_count = 0;
 	snapshot->snapXactCompletionCount = 0;
+	snapshot->lsn = lsn;
 
 	return snapshot;
 }
@@ -461,7 +467,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 	if (TransactionIdIsValid(MyProc->xmin))
 		elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
 
-	snap = SnapBuildBuildSnapshot(builder);
+	snap = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 
 	/*
 	 * We know that snap->xmin is alive, enforced by the logical xmin
@@ -502,7 +508,7 @@ SnapBuildInitialSnapshotForRepack(SnapBuild *builder)
 
 	Assert(builder->state == SNAPBUILD_CONSISTENT);
 
-	snap = SnapBuildBuildSnapshot(builder);
+	snap = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 	return SnapBuildMVCCFromHistoric(snap, false);
 }
 
@@ -636,7 +642,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
 	/* only build a new snapshot if we don't have a prebuilt one */
 	if (builder->snapshot == NULL)
 	{
-		builder->snapshot = SnapBuildBuildSnapshot(builder);
+		builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 		/* increase refcount for the snapshot builder */
 		SnapBuildSnapIncRefcount(builder->snapshot);
 	}
@@ -716,7 +722,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 		/* only build a new snapshot if we don't have a prebuilt one */
 		if (builder->snapshot == NULL)
 		{
-			builder->snapshot = SnapBuildBuildSnapshot(builder);
+			builder->snapshot = SnapBuildBuildSnapshot(builder, lsn);
 			/* increase refcount for the snapshot builder */
 			SnapBuildSnapIncRefcount(builder->snapshot);
 		}
@@ -1085,7 +1091,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		if (builder->snapshot)
 			SnapBuildSnapDecRefcount(builder->snapshot);
 
-		builder->snapshot = SnapBuildBuildSnapshot(builder);
+		builder->snapshot = SnapBuildBuildSnapshot(builder, lsn);
 
 		/* we might need to execute invalidations, add snapshot */
 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
@@ -1910,7 +1916,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	{
 		SnapBuildSnapDecRefcount(builder->snapshot);
 	}
-	builder->snapshot = SnapBuildBuildSnapshot(builder);
+	builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 	SnapBuildSnapIncRefcount(builder->snapshot);
 
 	ReorderBufferSetRestartPoint(builder->reorder, lsn);
diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
index 687fbbc59bb..28bd16f9cc7 100644
--- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c
+++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
@@ -32,7 +32,8 @@ static void plugin_truncate(struct LogicalDecodingContext *ctx,
 							Relation relations[],
 							ReorderBufferChange *change);
 static void store_change(LogicalDecodingContext *ctx,
-						 ConcurrentChangeKind kind, HeapTuple tuple);
+						 ConcurrentChangeKind kind, HeapTuple tuple,
+						 TransactionId xid);
 
 void
 _PG_output_plugin_init(OutputPluginCallbacks *cb)
@@ -100,6 +101,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			  Relation relation, ReorderBufferChange *change)
 {
 	RepackDecodingState *dstate;
+	Snapshot	snapshot;
 
 	dstate = (RepackDecodingState *) ctx->output_writer_private;
 
@@ -107,6 +109,48 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (relation->rd_id != dstate->relid)
 		return;
 
+	/*
+	 * Catalog snapshot is fine because the table we are processing is
+	 * temporarily considered a user catalog table.
+	 */
+	snapshot = GetCatalogSnapshot(InvalidOid);
+	Assert(snapshot->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
+	Assert(!snapshot->suboverflowed);
+
+	/*
+	 * This should not happen, but if we don't have enough information to
+	 * apply a new snapshot, the consequences would be bad. Thus prefer ERROR
+	 * to Assert().
+	 */
+	if (XLogRecPtrIsInvalid(snapshot->lsn))
+		ereport(ERROR, (errmsg("snapshot has invalid LSN")));
+
+	/*
+	 * reorderbuffer.c changes the catalog snapshot as soon as it sees a new
+	 * CID or a commit record of a catalog-changing transaction.
+	 */
+	if (dstate->snapshot == NULL || snapshot->lsn != dstate->snapshot_lsn ||
+		snapshot->curcid != dstate->snapshot->curcid)
+	{
+		/* CID should not go backwards. */
+		Assert(dstate->snapshot == NULL ||
+			   snapshot->curcid >= dstate->snapshot->curcid ||
+			   change->txn->xid != dstate->last_change_xid);
+
+		/*
+		 * XXX Is it a problem that the copy is created in
+		 * TopTransactionContext?
+		 *
+		 * XXX Wouldn't it be o.k. for SnapBuildMVCCFromHistoric() to set xcnt
+		 * to 0 instead of converting xip in this case? The point is that
+		 * transactions which are still in progress from the perspective of
+		 * reorderbuffer.c could not be replayed yet, so we do not need to
+		 * examine their XIDs.
+		 */
+		dstate->snapshot = SnapBuildMVCCFromHistoric(snapshot, false);
+		dstate->snapshot_lsn = snapshot->lsn;
+	}
+
 	/* Decode entry depending on its type */
 	switch (change->action)
 	{
@@ -124,7 +168,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (newtuple == NULL)
 					elog(ERROR, "Incomplete insert info.");
 
-				store_change(ctx, CHANGE_INSERT, newtuple);
+				store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -141,9 +185,11 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					elog(ERROR, "Incomplete update info.");
 
 				if (oldtuple != NULL)
-					store_change(ctx, CHANGE_UPDATE_OLD, oldtuple);
+					store_change(ctx, CHANGE_UPDATE_OLD, oldtuple,
+								 change->txn->xid);
 
-				store_change(ctx, CHANGE_UPDATE_NEW, newtuple);
+				store_change(ctx, CHANGE_UPDATE_NEW, newtuple,
+							 change->txn->xid);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
@@ -156,7 +202,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (oldtuple == NULL)
 					elog(ERROR, "Incomplete delete info.");
 
-				store_change(ctx, CHANGE_DELETE, oldtuple);
+				store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid);
 			}
 			break;
 		default:
@@ -190,13 +236,13 @@ plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (i == nrelations)
 		return;
 
-	store_change(ctx, CHANGE_TRUNCATE, NULL);
+	store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId);
 }
 
 /* Store concurrent data change. */
 static void
 store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
-			 HeapTuple tuple)
+			 HeapTuple tuple, TransactionId xid)
 {
 	RepackDecodingState *dstate;
 	char	   *change_raw;
@@ -266,6 +312,11 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
 	dst = dst_start + SizeOfConcurrentChange;
 	memcpy(dst, tuple->t_data, tuple->t_len);
 
+	/* Initialize the other fields. */
+	change.xid = xid;
+	change.snapshot = dstate->snapshot;
+	dstate->snapshot->active_count++;
+
 	/* The data has been copied. */
 	if (flattened)
 		pfree(tuple);
@@ -279,6 +330,9 @@ store:
 	isnull[0] = false;
 	tuplestore_putvalues(dstate->tstore, dstate->tupdesc_change,
 						 values, isnull);
+#ifdef USE_ASSERT_CHECKING
+	dstate->last_change_xid = xid;
+#endif
 
 	/* Accounting. */
 	dstate->nchanges++;
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index e9ddf39500c..e24e1795aa9 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -151,6 +151,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
 	size = add_size(size, AioShmemSize());
+	size = add_size(size, RepackShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -344,6 +345,7 @@ CreateOrAttachShmemStructs(void)
 	WaitEventCustomShmemInit();
 	InjectionPointShmemInit();
 	AioShmemInit();
+	RepackShmemInit();
 }
 
 /*
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 4eb67720737..14eda1c24ee 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -1633,6 +1633,27 @@ CacheInvalidateRelcache(Relation relation)
 								 databaseId, relationId);
 }
 
+/*
+ * CacheInvalidateRelcacheImmediate
+ *		Send invalidation message for the specified relation's relcache entry.
+ *
+ * Currently this is used in REPACK CONCURRENTLY, to make sure that other
+ * backends are aware that the command is being executed for the relation.
+ */
+void
+CacheInvalidateRelcacheImmediate(Oid relid)
+{
+	SharedInvalidationMessage msg;
+
+	msg.rc.id = SHAREDINVALRELCACHE_ID;
+	msg.rc.dbId = MyDatabaseId;
+	msg.rc.relId = relid;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	SendSharedInvalidMessages(&msg, 1);
+}
+
 /*
  * CacheInvalidateRelcacheAll
  *		Register invalidation of the whole relcache at the end of command.
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index a495f22876d..679cc6be1d1 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1253,6 +1253,10 @@ retry:
 	/* make sure relation is marked as having no open file yet */
 	relation->rd_smgr = NULL;
 
+	/* Is REPACK CONCURRENTLY in progress? */
+	relation->rd_repack_concurrent =
+		is_concurrent_repack_in_progress(targetRelId);
+
 	/*
 	 * now we can free the memory allocated for pg_class_tuple
 	 */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index bdeb2f83540..b0c6f1d916f 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -325,21 +325,24 @@ extern BulkInsertState GetBulkInsertState(void);
 extern void FreeBulkInsertState(BulkInsertState);
 extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
-extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
-						int options, BulkInsertState bistate);
+extern void heap_insert(Relation relation, HeapTuple tup, TransactionId xid,
+						CommandId cid, int options, BulkInsertState bistate);
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
-							 CommandId cid, Snapshot crosscheck, bool wait,
-							 struct TM_FailureData *tmfd, bool changingPart);
+							 TransactionId xid, CommandId cid,
+							 Snapshot crosscheck, bool wait,
+							 struct TM_FailureData *tmfd, bool changingPart,
+							 bool wal_logical);
 extern void heap_finish_speculative(Relation relation, ItemPointer tid);
 extern void heap_abort_speculative(Relation relation, ItemPointer tid);
 extern TM_Result heap_update(Relation relation, ItemPointer otid,
-							 HeapTuple newtup,
+							 HeapTuple newtup, TransactionId xid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, LockTupleMode *lockmode,
-							 TU_UpdateIndexes *update_indexes);
+							 TU_UpdateIndexes *update_indexes,
+							 bool wal_logical);
 extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 								 CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool follow_updates,
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 277df6b3cf0..8d4af07f840 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -104,6 +104,8 @@
 #define XLH_DELETE_CONTAINS_OLD_KEY				(1<<2)
 #define XLH_DELETE_IS_SUPER						(1<<3)
 #define XLH_DELETE_IS_PARTITION_MOVE			(1<<4)
+/* See heap_delete() */
+#define XLH_DELETE_NO_LOGICAL					(1<<5)
 
 /* convenience macro for checking whether any form of old tuple was logged */
 #define XLH_DELETE_CONTAINS_OLD						\
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b2bc10ee041..fbb66d559b6 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -482,6 +482,8 @@ extern Size EstimateTransactionStateSpace(void);
 extern void SerializeTransactionState(Size maxsize, char *start_address);
 extern void StartParallelWorkerTransaction(char *tstatespace);
 extern void EndParallelWorkerTransaction(void);
+extern void SetRepackCurrentXids(TransactionId *xip, int xcnt);
+extern void ResetRepackCurrentXids(void);
 extern bool IsTransactionBlock(void);
 extern bool IsTransactionOrTransactionBlock(void);
 extern char TransactionBlockStatusCode(void);
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 569cc2184b3..ab1d9fc25dc 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -73,6 +73,14 @@ typedef struct ConcurrentChange
 	/* See the enum above. */
 	ConcurrentChangeKind kind;
 
+	/* Transaction that changes the data. */
+	TransactionId xid;
+
+	/*
+	 * Historic catalog snapshot that was used to decode this change.
+	 */
+	Snapshot	snapshot;
+
 	/*
 	 * The actual tuple.
 	 *
@@ -104,6 +112,8 @@ typedef struct RepackDecodingState
 	 * tuplestore does this transparently.
 	 */
 	Tuplestorestate *tstore;
+	/* XID of the last change added to tstore. */
+	TransactionId last_change_xid PG_USED_FOR_ASSERTS_ONLY;
 
 	/* The current number of changes in tstore. */
 	double		nchanges;
@@ -124,6 +134,14 @@ typedef struct RepackDecodingState
 	/* Slot to retrieve data from tstore. */
 	TupleTableSlot *tsslot;
 
+	/*
+	 * Historic catalog snapshot that was used to decode the most recent
+	 * change.
+	 */
+	Snapshot	snapshot;
+	/* LSN of the record for which the snapshot above was created. */
+	XLogRecPtr	snapshot_lsn;
+
 	ResourceOwner resowner;
 } RepackDecodingState;
 
@@ -148,5 +166,9 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
 							 MultiXactId cutoffMulti,
 							 char newrelpersistence);
 
+extern Size RepackShmemSize(void);
+extern void RepackShmemInit(void);
+extern bool is_concurrent_repack_in_progress(Oid relid);
+
 extern void repack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
 #endif							/* CLUSTER_H */
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 9b871caef62..ae9dee394dc 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -50,6 +50,8 @@ extern void CacheInvalidateCatalog(Oid catalogId);
 
 extern void CacheInvalidateRelcache(Relation relation);
 
+extern void CacheInvalidateRelcacheImmediate(Oid relid);
+
 extern void CacheInvalidateRelcacheAll(void);
 
 extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index d94fddd7cef..372065fc570 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -253,6 +253,9 @@ typedef struct RelationData
 	bool		pgstat_enabled; /* should relation stats be counted */
 	/* use "struct" here to avoid needing to include pgstat.h: */
 	struct PgStat_TableStatus *pgstat_info; /* statistics collection area */
+
+	/* Is REPACK CONCURRENTLY being performed on this relation? */
+	bool		rd_repack_concurrent;
 } RelationData;
 
 
@@ -692,7 +695,9 @@ RelationCloseSmgr(Relation relation)
 #define RelationIsAccessibleInLogicalDecoding(relation) \
 	(XLogLogicalInfoActive() && \
 	 RelationNeedsWAL(relation) && \
-	 (IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))
+	 (IsCatalogRelation(relation) || \
+	  RelationIsUsedAsCatalogTable(relation) || \
+	  (relation)->rd_repack_concurrent))
 
 /*
  * RelationIsLogicallyLogged
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 0e546ec1497..014f27db7d7 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -13,6 +13,7 @@
 #ifndef SNAPSHOT_H
 #define SNAPSHOT_H
 
+#include "access/xlogdefs.h"
 #include "lib/pairingheap.h"
 
 
@@ -201,6 +202,8 @@ typedef struct SnapshotData
 	uint32		regd_count;		/* refcount on RegisteredSnapshots */
 	pairingheap_node ph_node;	/* link in the RegisteredSnapshots heap */
 
+	XLogRecPtr	lsn;			/* position in the WAL stream when taken */
+
 	/*
 	 * The transaction completion count at the time GetSnapshotData() built
 	 * this snapshot. Allows to avoid re-computing static snapshots when no
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e89db0a2ee7..e1e3e619c4b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2510,6 +2510,7 @@ ReorderBufferTupleCidKey
 ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
+RepackedRel
 RepackDecodingState
 RepackStmt
 ReparameterizeForeignPathByChild_function
-- 
2.43.5

