From 3b58990c088936122f38d855a5a3900602deacf7 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 6 Apr 2020 21:28:55 -0700
Subject: [PATCH 1/2] TMP: work around missing snapshot registrations.

This is just what's hit by the tests. It's not an actual fix.
---
 src/backend/catalog/namespace.c             |  7 +++++++
 src/backend/catalog/pg_subscription.c       |  4 ++++
 src/backend/commands/indexcmds.c            |  9 +++++++++
 src/backend/commands/tablecmds.c            |  8 ++++++++
 src/backend/replication/logical/tablesync.c | 12 ++++++++++++
 src/backend/replication/logical/worker.c    |  4 ++++
 src/backend/utils/time/snapmgr.c            |  4 ++++
 7 files changed, 48 insertions(+)

diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 2ec23016fe5..e4696d8d417 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -55,6 +55,7 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -4244,12 +4245,18 @@ RemoveTempRelationsCallback(int code, Datum arg)
 {
 	if (OidIsValid(myTempNamespace))	/* should always be true */
 	{
+		Snapshot snap;
+
 		/* Need to ensure we have a usable transaction. */
 		AbortOutOfAnyTransaction();
 		StartTransactionCommand();
 
+		/* ensure xmin stays set */
+		snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
+
 		RemoveTempRelations(myTempNamespace);
 
+		UnregisterSnapshot(snap);
 		CommitTransactionCommand();
 	}
 }
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index cb157311154..4a324dfb4f1 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -31,6 +31,7 @@
 #include "utils/fmgroids.h"
 #include "utils/pg_lsn.h"
 #include "utils/rel.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
 static List *textarray_to_stringlist(ArrayType *textarray);
@@ -286,6 +287,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 	bool		replaces[Natts_pg_subscription_rel];
+	Snapshot snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
 
 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 
@@ -321,6 +323,8 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 
 	/* Cleanup. */
 	table_close(rel, NoLock);
+
+	UnregisterSnapshot(snap);
 }
 
 /*
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 2baca12c5f4..094bf6139f0 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -2837,6 +2837,7 @@ ReindexRelationConcurrently(Oid relationOid, int options)
 	char	   *relationName = NULL;
 	char	   *relationNamespace = NULL;
 	PGRUsage	ru0;
+	Snapshot	snap;
 
 	/*
 	 * Create a memory context that will survive forced transaction commits we
@@ -3306,6 +3307,7 @@ ReindexRelationConcurrently(Oid relationOid, int options)
 	 */
 
 	StartTransactionCommand();
+	snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
 
 	forboth(lc, indexIds, lc2, newIndexIds)
 	{
@@ -3354,8 +3356,11 @@ ReindexRelationConcurrently(Oid relationOid, int options)
 	}
 
 	/* Commit this transaction and make index swaps visible */
+	UnregisterSnapshot(snap);
 	CommitTransactionCommand();
+
 	StartTransactionCommand();
+	snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
 
 	/*
 	 * Phase 5 of REINDEX CONCURRENTLY
@@ -3386,7 +3391,9 @@ ReindexRelationConcurrently(Oid relationOid, int options)
 	}
 
 	/* Commit this transaction to make the updates visible. */
+	UnregisterSnapshot(snap);
 	CommitTransactionCommand();
+
 	StartTransactionCommand();
 
 	/*
@@ -3400,6 +3407,7 @@ ReindexRelationConcurrently(Oid relationOid, int options)
 	WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
 
 	PushActiveSnapshot(GetTransactionSnapshot());
+	snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
 
 	{
 		ObjectAddresses *objects = new_object_addresses();
@@ -3425,6 +3433,7 @@ ReindexRelationConcurrently(Oid relationOid, int options)
 	}
 
 	PopActiveSnapshot();
+	UnregisterSnapshot(snap);
 	CommitTransactionCommand();
 
 	/*
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 6162fb018c7..e1eacc6a4a6 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -15200,6 +15200,7 @@ PreCommit_on_commit_actions(void)
 	ListCell   *l;
 	List	   *oids_to_truncate = NIL;
 	List	   *oids_to_drop = NIL;
+	Snapshot	snap;
 
 	foreach(l, on_commits)
 	{
@@ -15231,6 +15232,11 @@ PreCommit_on_commit_actions(void)
 		}
 	}
 
+	if (oids_to_truncate == NIL && oids_to_drop == NIL)
+		return;
+
+	snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
+
 	/*
 	 * Truncate relations before dropping so that all dependencies between
 	 * relations are removed after they are worked on.  Doing it like this
@@ -15284,6 +15290,8 @@ PreCommit_on_commit_actions(void)
 		}
 #endif
 	}
+
+	UnregisterSnapshot(snap);
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c27d9705895..aec5a044790 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -863,6 +863,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 			{
 				Relation	rel;
 				WalRcvExecResult *res;
+				Snapshot	snap;
 
 				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 				MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -871,10 +872,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 				/* Update the state and make it visible to others. */
 				StartTransactionCommand();
+				snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
+
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   MyLogicalRepWorker->relid,
 										   MyLogicalRepWorker->relstate,
 										   MyLogicalRepWorker->relstate_lsn);
+
+				UnregisterSnapshot(snap);
 				CommitTransactionCommand();
 				pgstat_report_stat(false);
 
@@ -918,6 +923,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 								   CRS_USE_SNAPSHOT, origin_startpos);
 
 				PushActiveSnapshot(GetTransactionSnapshot());
+				snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
 				copy_table(rel);
 				PopActiveSnapshot();
 
@@ -933,6 +939,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				/* Make the copy visible. */
 				CommandCounterIncrement();
 
+				UnregisterSnapshot(snap);
+
 				/*
 				 * We are done with the initial data synchronization, update
 				 * the state.
@@ -957,6 +965,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				 */
 				if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
 				{
+					snap = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));
+
 					/*
 					 * Update the new state in catalog.  No need to bother
 					 * with the shmem state as we are exiting for good.
@@ -965,6 +975,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 											   MyLogicalRepWorker->relid,
 											   SUBREL_STATE_SYNCDONE,
 											   *origin_startpos);
+					UnregisterSnapshot(snap);
+
 					finish_sync_worker();
 				}
 				break;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a752a1224d6..f10f3f843d1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1245,6 +1245,9 @@ apply_handle_truncate(StringInfo s)
 
 	ensure_transaction();
 
+	/* catalog modifications need a set snapshot */
+	PushActiveSnapshot(GetTransactionSnapshot());
+
 	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
 
 	foreach(lc, remote_relids)
@@ -1332,6 +1335,7 @@ apply_handle_truncate(StringInfo s)
 	}
 
 	CommandCounterIncrement();
+	PopActiveSnapshot();
 }
 
 
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 1c063c592ce..b5cff157bf6 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -441,6 +441,8 @@ GetOldestSnapshot(void)
 Snapshot
 GetCatalogSnapshot(Oid relid)
 {
+	Assert(IsTransactionState());
+
 	/*
 	 * Return historic snapshot while we're doing logical decoding, so we can
 	 * see the appropriate state of the catalog.
@@ -1017,6 +1019,8 @@ SnapshotResetXmin(void)
 	if (pairingheap_is_empty(&RegisteredSnapshots))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
+		TransactionXmin = InvalidTransactionId;
+		RecentXmin = InvalidTransactionId;
 		return;
 	}
 
-- 
2.25.0.114.g5b0ca878e0

