From 05443030201d59216b3125d51c641b68decd4379 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Mon, 31 Mar 2025 21:46:54 +0300
Subject: [PATCH v6 07/12] Split MVCCSnapshot into inner and outer parts

Split MVCCSnapshot into two parts: an inner struct to hold the xmin, xmax
and XID arrays that determine which transactions are visible, and an
outer shell that includes the command ID and a pointer to the inner
struct. That way, the inner struct can be shared by snapshots derived
from the same original snapshot, just with different command IDs.

The inner struct, MVCCSnapshotShared, is reference counted separately
so that we can avoid copying it when pushing or registering a snapshot
for the first time. Also, GetMVCCSnapshotData() can reuse it more
aggressively: we always keep a pointer to the latest shared struct
(latestSnapshotShared), and GetMVCCSnapshotData() always tries to
reuse the same latest snapshot, regardless of whether it was called
from GetTransactionSnapshot(), GetLatestSnapshot(), or
GetCatalogSnapshot(). That avoids unnecessary copying. Snapshots are
usually small enough that the copying doesn't matter, but avoiding it
can help in extreme cases where you have thousands of (sub-)XIDs in progress.

Now that the shared inner structs are reference counted, it seems
unnecessary to reference count the outer MVCCSnapshots
separately. That means that RegisterSnapshot() always makes a new
palloc'd copy of the outer struct, but that's pretty small. The
ActiveSnapshot stack entries now embed the outer struct directly, so
the 'active_count' is gone too.

The ValidSnapshots list now tracks the shared structs rather than the
outer snapshots. That's sufficient for finding the oldest xmin, but if
we ever wanted to also know the oldest command ID in use, we'd need to
track the outer structs instead.
---
 contrib/amcheck/verify_heapam.c             |   2 +-
 contrib/amcheck/verify_nbtree.c             |   2 +-
 src/backend/access/heap/heapam.c            |   2 +-
 src/backend/access/heap/heapam_handler.c    |   2 +-
 src/backend/access/heap/heapam_visibility.c |  18 +-
 src/backend/access/spgist/spgvacuum.c       |   2 +-
 src/backend/access/transam/README           |  26 +-
 src/backend/catalog/pg_inherits.c           |   6 +-
 src/backend/commands/async.c                |   2 +-
 src/backend/commands/indexcmds.c            |   4 +-
 src/backend/commands/tablecmds.c            |   2 +-
 src/backend/executor/execMain.c             |  12 +-
 src/backend/executor/execParallel.c         |   3 +-
 src/backend/partitioning/partdesc.c         |   2 +-
 src/backend/replication/logical/snapbuild.c |  40 +-
 src/backend/replication/walsender.c         |   2 +-
 src/backend/storage/ipc/procarray.c         | 138 +++--
 src/backend/storage/lmgr/predicate.c        | 109 ++--
 src/backend/utils/adt/xid8funcs.c           |   8 +-
 src/backend/utils/time/snapmgr.c            | 605 ++++++++++----------
 src/include/access/transam.h                |   4 +-
 src/include/storage/predicate.h             |   8 +-
 src/include/storage/proc.h                  |   2 +-
 src/include/storage/procarray.h             |   2 +-
 src/include/utils/snapmgr.h                 |  11 +-
 src/include/utils/snapshot.h                |  51 +-
 src/tools/pgindent/typedefs.list            |   2 +
 27 files changed, 536 insertions(+), 531 deletions(-)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 6665cafc179..d7f0b772f94 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -310,7 +310,7 @@ verify_heapam(PG_FUNCTION_ARGS)
 	 * Any xmin newer than the xmin of our snapshot can't become all-visible
 	 * while we're running.
 	 */
-	ctx.safe_xmin = GetTransactionSnapshot()->mvcc.xmin;
+	ctx.safe_xmin = GetTransactionSnapshot()->mvcc.shared->xmin;
 
 	/*
 	 * If we report corruption when not examining some individual attribute,
diff --git a/contrib/amcheck/verify_nbtree.c b/contrib/amcheck/verify_nbtree.c
index e90b4a2ad5a..d77ded4cc40 100644
--- a/contrib/amcheck/verify_nbtree.c
+++ b/contrib/amcheck/verify_nbtree.c
@@ -458,7 +458,7 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
 			 */
 			if (IsolationUsesXactSnapshot() && rel->rd_index->indcheckxmin &&
 				!TransactionIdPrecedes(HeapTupleHeaderGetXmin(rel->rd_indextuple->t_data),
-									   snapshot->mvcc.xmin))
+									   snapshot->mvcc.shared->xmin))
 				ereport(ERROR,
 						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 						 errmsg("index \"%s\" cannot be verified using transaction snapshot",
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 0cfa100cbd1..0615ffa2bd1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -606,7 +606,7 @@ heap_prepare_pagescan(TableScanDesc sscan)
 	 * tuple for visibility the hard way.
 	 */
 	all_visible = PageIsAllVisible(page) &&
-		(snapshot->snapshot_type != SNAPSHOT_MVCC || !snapshot->mvcc.takenDuringRecovery);
+		(snapshot->snapshot_type != SNAPSHOT_MVCC || !snapshot->mvcc.shared->takenDuringRecovery);
 	check_serializable =
 		CheckForSerializableConflictOutNeeded(scan->rs_base.rs_rd, snapshot);
 
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index fce657f00f6..b9a5b38dd08 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2308,7 +2308,7 @@ heapam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
 
 	page = (Page) BufferGetPage(hscan->rs_cbuf);
 	all_visible = PageIsAllVisible(page) &&
-		(scan->rs_snapshot->snapshot_type != SNAPSHOT_MVCC || !scan->rs_snapshot->mvcc.takenDuringRecovery);
+		(scan->rs_snapshot->snapshot_type != SNAPSHOT_MVCC || !scan->rs_snapshot->mvcc.shared->takenDuringRecovery);
 	maxoffset = PageGetMaxOffsetNumber(page);
 
 	for (;;)
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index f5d69b558f1..07f155498d4 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -19,7 +19,7 @@
  * That fixes that problem, but it also means there is a window where
  * TransactionIdIsInProgress and TransactionIdDidCommit will both return true.
  * If we check only TransactionIdDidCommit, we could consider a tuple
- * committed when a later GetSnapshotData call will still think the
+ * committed when a later GetMVCCSnapshotData call will still think the
  * originating transaction is in progress, which leads to application-level
  * inconsistency.  The upshot is that we gotta check TransactionIdIsInProgress
  * first in all code paths, except for a few cases where we are looking at
@@ -969,7 +969,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, MVCCSnapshot snapshot,
 	 * get invalidated while it's still in use, and this is a convenient place
 	 * to check for that.
 	 */
-	Assert(snapshot->regd_count > 0 || snapshot->active_count > 0);
+	Assert(snapshot->kind == SNAPSHOT_ACTIVE || snapshot->kind == SNAPSHOT_REGISTERED);
 
 	Assert(ItemPointerIsValid(&htup->t_self));
 	Assert(htup->t_tableOid != InvalidOid);
@@ -986,7 +986,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, MVCCSnapshot snapshot,
 
 			if (TransactionIdIsCurrentTransactionId(xvac))
 				return false;
-			if (!XidInMVCCSnapshot(xvac, snapshot))
+			if (!XidInMVCCSnapshot(xvac, snapshot->shared))
 			{
 				if (TransactionIdDidCommit(xvac))
 				{
@@ -1005,7 +1005,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, MVCCSnapshot snapshot,
 
 			if (!TransactionIdIsCurrentTransactionId(xvac))
 			{
-				if (XidInMVCCSnapshot(xvac, snapshot))
+				if (XidInMVCCSnapshot(xvac, snapshot->shared))
 					return false;
 				if (TransactionIdDidCommit(xvac))
 					SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
@@ -1060,7 +1060,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, MVCCSnapshot snapshot,
 			else
 				return false;	/* deleted before scan started */
 		}
-		else if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmin(tuple), snapshot))
+		else if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmin(tuple), snapshot->shared))
 			return false;
 		else if (TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuple)))
 			SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
@@ -1077,7 +1077,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, MVCCSnapshot snapshot,
 	{
 		/* xmin is committed, but maybe not according to our snapshot */
 		if (!HeapTupleHeaderXminFrozen(tuple) &&
-			XidInMVCCSnapshot(HeapTupleHeaderGetRawXmin(tuple), snapshot))
+			XidInMVCCSnapshot(HeapTupleHeaderGetRawXmin(tuple), snapshot->shared))
 			return false;		/* treat as still in progress */
 	}
 
@@ -1108,7 +1108,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, MVCCSnapshot snapshot,
 			else
 				return false;	/* deleted before scan started */
 		}
-		if (XidInMVCCSnapshot(xmax, snapshot))
+		if (XidInMVCCSnapshot(xmax, snapshot->shared))
 			return true;
 		if (TransactionIdDidCommit(xmax))
 			return false;		/* updating transaction committed */
@@ -1126,7 +1126,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, MVCCSnapshot snapshot,
 				return false;	/* deleted before scan started */
 		}
 
-		if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmax(tuple), snapshot))
+		if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmax(tuple), snapshot->shared))
 			return true;
 
 		if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuple)))
@@ -1144,7 +1144,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, MVCCSnapshot snapshot,
 	else
 	{
 		/* xmax is committed, but maybe not according to our snapshot */
-		if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmax(tuple), snapshot))
+		if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmax(tuple), snapshot->shared))
 			return true;		/* treat as still in progress */
 	}
 
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index 850ad36cd0a..0a8d7b0a0d6 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -808,7 +808,7 @@ spgvacuumscan(spgBulkDeleteState *bds)
 	/* Finish setting up spgBulkDeleteState */
 	initSpGistState(&bds->spgstate, index);
 	bds->pendingList = NULL;
-	bds->myXmin = GetActiveSnapshot()->mvcc.xmin;
+	bds->myXmin = GetActiveSnapshot()->mvcc.shared->xmin;
 	bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO;
 
 	/*
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 231106270fd..81792f0eab3 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -231,7 +231,7 @@ we must ensure consistency about the commit order of transactions.
 For example, suppose an UPDATE in xact A is blocked by xact B's prior
 update of the same row, and xact B is doing commit while xact C gets a
 snapshot.  Xact A can complete and commit as soon as B releases its locks.
-If xact C's GetSnapshotData sees xact B as still running, then it had
+If xact C's GetMVCCSnapshotData sees xact B as still running, then it had
 better see xact A as still running as well, or it will be able to see two
 tuple versions - one deleted by xact B and one inserted by xact A.  Another
 reason why this would be bad is that C would see (in the row inserted by A)
@@ -248,8 +248,8 @@ with snapshot-taking: we do not allow any transaction to exit the set of
 running transactions while a snapshot is being taken.  (This rule is
 stronger than necessary for consistency, but is relatively simple to
 enforce, and it assists with some other issues as explained below.)  The
-implementation of this is that GetSnapshotData takes the ProcArrayLock in
-shared mode (so that multiple backends can take snapshots in parallel),
+implementation of this is that GetMVCCSnapshotData takes the ProcArrayLock
+in shared mode (so that multiple backends can take snapshots in parallel),
 but ProcArrayEndTransaction must take the ProcArrayLock in exclusive mode
 while clearing the ProcGlobal->xids[] entry at transaction end (either
 commit or abort). (To reduce context switching, when multiple transactions
@@ -257,7 +257,7 @@ commit nearly simultaneously, we have one backend take ProcArrayLock and
 clear the XIDs of multiple processes at once.)
 
 ProcArrayEndTransaction also holds the lock while advancing the shared
-latestCompletedXid variable.  This allows GetSnapshotData to use
+latestCompletedXid variable.  This allows GetMVCCSnapshotData to use
 latestCompletedXid + 1 as xmax for its snapshot: there can be no
 transaction >= this xid value that the snapshot needs to consider as
 completed.
@@ -301,7 +301,7 @@ if it currently has no live snapshots (eg, if it's between transactions or
 hasn't yet set a snapshot for a new transaction).  ComputeXidHorizons takes
 the MIN() of the valid xmin fields.  It does this with only shared lock on
 ProcArrayLock, which means there is a potential race condition against other
-backends doing GetSnapshotData concurrently: we must be certain that a
+backends doing GetMVCCSnapshotData concurrently: we must be certain that a
 concurrent backend that is about to set its xmin does not compute an xmin
 less than what ComputeXidHorizons determines.  We ensure that by including
 all the active XIDs into the MIN() calculation, along with the valid xmins.
@@ -310,27 +310,27 @@ ensures that concurrent holders of shared ProcArrayLock will compute the
 same minimum of currently-active XIDs: no xact, in particular not the
 oldest, can exit while we hold shared ProcArrayLock.  So
 ComputeXidHorizons's view of the minimum active XID will be the same as that
-of any concurrent GetSnapshotData, and so it can't produce an overestimate.
+of any concurrent GetMVCCSnapshotData, and so it can't produce an overestimate.
 If there is no active transaction at all, ComputeXidHorizons uses
 latestCompletedXid + 1, which is a lower bound for the xmin that might
-be computed by concurrent or later GetSnapshotData calls.  (We know that no
+be computed by concurrent or later GetMVCCSnapshotData calls.  (We know that no
 XID less than this could be about to appear in the ProcArray, because of the
 XidGenLock interlock discussed above.)
 
-As GetSnapshotData is performance critical, it does not perform an accurate
+As GetMVCCSnapshotData is performance critical, it does not perform an accurate
 oldest-xmin calculation (it used to, until v14). The contents of a snapshot
 only depend on the xids of other backends, not their xmin. As backend's xmin
-changes much more often than its xid, having GetSnapshotData look at xmins
+changes much more often than its xid, having GetMVCCSnapshotData look at xmins
 can lead to a lot of unnecessary cacheline ping-pong.  Instead
-GetSnapshotData updates approximate thresholds (one that guarantees that all
-deleted rows older than it can be removed, another determining that deleted
+GetMVCCSnapshotData updates approximate thresholds (one that guarantees that
+all deleted rows older than it can be removed, another determining that deleted
 rows newer than it can not be removed). GlobalVisTest* uses those thresholds
 to make invisibility decision, falling back to ComputeXidHorizons if
 necessary.
 
 Note that while it is certain that two concurrent executions of
-GetSnapshotData will compute the same xmin for their own snapshots, there is
-no such guarantee for the horizons computed by ComputeXidHorizons.  This is
+GetMVCCSnapshotData will compute the same xmin for their own snapshots, there
+is no such guarantee for the horizons computed by ComputeXidHorizons.  This is
 because we allow XID-less transactions to clear their MyProc->xmin
 asynchronously (without taking ProcArrayLock), so one execution might see
 what had been the oldest xmin, and another not.  This is OK since the
diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c
index b658601bf77..f1148dbe4a3 100644
--- a/src/backend/catalog/pg_inherits.c
+++ b/src/backend/catalog/pg_inherits.c
@@ -143,12 +143,12 @@ find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
 			if (omit_detached && ActiveSnapshotSet())
 			{
 				TransactionId xmin;
-				Snapshot	snap;
+				MVCCSnapshot snap;
 
 				xmin = HeapTupleHeaderGetXmin(inheritsTuple->t_data);
-				snap = GetActiveSnapshot();
+				snap = (MVCCSnapshot) GetActiveSnapshot();
 
-				if (!XidInMVCCSnapshot(xmin, (MVCCSnapshot) snap))
+				if (!XidInMVCCSnapshot(xmin, snap->shared))
 				{
 					if (detached_xmin)
 					{
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 1ffb6f5fa70..037ca6c5444 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -2043,7 +2043,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		/* Ignore messages destined for other databases */
 		if (qe->dboid == MyDatabaseId)
 		{
-			if (XidInMVCCSnapshot(qe->xid, (MVCCSnapshot) snapshot))
+			if (XidInMVCCSnapshot(qe->xid, ((MVCCSnapshot) snapshot)->shared))
 			{
 				/*
 				 * The source transaction is still in progress, so we can't
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index da3e02398bb..7fa044f6f1c 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -1761,7 +1761,7 @@ DefineIndex(Oid tableId,
 	 * they must wait for.  But first, save the snapshot's xmin to use as
 	 * limitXmin for GetCurrentVirtualXIDs().
 	 */
-	limitXmin = snapshot->mvcc.xmin;
+	limitXmin = snapshot->mvcc.shared->xmin;
 
 	PopActiveSnapshot();
 	UnregisterSnapshot(snapshot);
@@ -4156,7 +4156,7 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein
 		 * We can now do away with our active snapshot, we still need to save
 		 * the xmin limit to wait for older snapshots.
 		 */
-		limitXmin = snapshot->mvcc.xmin;
+		limitXmin = snapshot->mvcc.shared->xmin;
 
 		PopActiveSnapshot();
 		UnregisterSnapshot(snapshot);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c55b5a7a014..9aca810f9d5 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -20797,7 +20797,7 @@ ATExecDetachPartitionFinalize(Relation rel, RangeVar *name)
 	 * all such queries are complete (otherwise we would present them with an
 	 * inconsistent view of catalogs).
 	 */
-	WaitForOlderSnapshots(snap->mvcc.xmin, false);
+	WaitForOlderSnapshots(snap->mvcc.shared->xmin, false);
 
 	DetachPartitionFinalize(rel, partRel, true, InvalidOid);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 2da848970be..9ee10050873 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -157,8 +157,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
 	Assert(queryDesc != NULL);
 	Assert(queryDesc->estate == NULL);
 
-	/* caller must ensure the query's snapshot is active */
-	Assert(GetActiveSnapshot() == queryDesc->snapshot);
+	/* ensure the query's snapshot is active */
+	PushActiveSnapshot(queryDesc->snapshot);
 
 	/*
 	 * If the transaction is read-only, we need to check if any writes are
@@ -272,6 +272,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
 
 	MemoryContextSwitchTo(oldcontext);
 
+	PopActiveSnapshot();
+
 	return ExecPlanStillValid(queryDesc->estate);
 }
 
@@ -390,8 +392,8 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 	Assert(!estate->es_aborted);
 	Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
 
-	/* caller must ensure the query's snapshot is active */
-	Assert(GetActiveSnapshot() == estate->es_snapshot);
+	/* ensure the query's snapshot is active */
+	PushActiveSnapshot(estate->es_snapshot);
 
 	/*
 	 * Switch into per-query memory context
@@ -455,6 +457,8 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 		InstrStopNode(queryDesc->totaltime, estate->es_processed);
 
 	MemoryContextSwitchTo(oldcontext);
+
+	PopActiveSnapshot();
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 39c990ae638..af3f8f28144 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -737,7 +737,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	 * worker, which uses it to set es_snapshot.  Make sure we don't set
 	 * es_snapshot differently in the child.
 	 */
-	Assert(GetActiveSnapshot() == estate->es_snapshot);
+	Assert(((MVCCSnapshot) GetActiveSnapshot())->shared == ((MVCCSnapshot) estate->es_snapshot)->shared);
+	Assert(((MVCCSnapshot) GetActiveSnapshot())->curcid == ((MVCCSnapshot) estate->es_snapshot)->curcid);
 
 	/* Everyone's had a chance to ask for space, so now create the DSM. */
 	InitializeParallelDSM(pcxt);
diff --git a/src/backend/partitioning/partdesc.c b/src/backend/partitioning/partdesc.c
index 7c15c634181..c5000b37b87 100644
--- a/src/backend/partitioning/partdesc.c
+++ b/src/backend/partitioning/partdesc.c
@@ -102,7 +102,7 @@ RelationGetPartitionDesc(Relation rel, bool omit_detached)
 		Assert(TransactionIdIsValid(rel->rd_partdesc_nodetached_xmin));
 		activesnap = GetActiveSnapshot();
 
-		if (!XidInMVCCSnapshot(rel->rd_partdesc_nodetached_xmin, &activesnap->mvcc))
+		if (!XidInMVCCSnapshot(rel->rd_partdesc_nodetached_xmin, activesnap->mvcc.shared))
 			return rel->rd_partdesc_nodetached;
 	}
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 50dca7cb758..3c94a62cdf6 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -389,6 +389,12 @@ SnapBuildBuildSnapshot(SnapBuild *builder)
  *
  * The snapshot will be usable directly in current transaction or exported
  * for loading in different transaction.
+ *
+ * XXX: The snapshot manager doesn't know anything about the returned
+ * snapshot.  It does not hold back MyProc->xmin, nor is it registered with
+ * any resource owner.  There's also no good way to free it, but leaking it is
+ * acceptable for the current usage where only one snapshot is built for the
+ * whole session.
  */
 MVCCSnapshot
 SnapBuildInitialSnapshot(SnapBuild *builder)
@@ -440,11 +446,14 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 	MyProc->xmin = historicsnap->xmin;
 
 	/* allocate in transaction context */
-	mvccsnap = palloc(sizeof(MVCCSnapshotData) + sizeof(TransactionId) * GetMaxSnapshotXidCount());
+	mvccsnap = palloc(sizeof(MVCCSnapshotData));
+	mvccsnap->kind = SNAPSHOT_STATIC;
+	mvccsnap->shared = AllocMVCCSnapshotShared();
+	mvccsnap->shared->refcount = 1;
 	mvccsnap->snapshot_type = SNAPSHOT_MVCC;
-	mvccsnap->xmin = historicsnap->xmin;
-	mvccsnap->xmax = historicsnap->xmax;
-	mvccsnap->xip = (TransactionId *) ((char *) mvccsnap + sizeof(MVCCSnapshotData));
+	mvccsnap->shared->xmin = historicsnap->xmin;
+	mvccsnap->shared->xmax = historicsnap->xmax;
+	mvccsnap->shared->xip = (TransactionId *) ((char *) mvccsnap->shared + sizeof(MVCCSnapshotData));
 
 	/*
 	 * snapbuild.c builds transactions in an "inverted" manner, which means it
@@ -470,23 +479,20 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 						 errmsg("initial slot snapshot too large")));
 
-			mvccsnap->xip[newxcnt++] = xid;
+			mvccsnap->shared->xip[newxcnt++] = xid;
 		}
 
 		TransactionIdAdvance(xid);
 	}
-	mvccsnap->xcnt = newxcnt;
+	mvccsnap->shared->xcnt = newxcnt;
 
 	/* Initialize remaining MVCCSnapshot fields */
-	mvccsnap->subxip = NULL;
-	mvccsnap->subxcnt = 0;
-	mvccsnap->suboverflowed = false;
-	mvccsnap->takenDuringRecovery = false;
-	mvccsnap->copied = true;
+	mvccsnap->shared->subxip = NULL;
+	mvccsnap->shared->subxcnt = 0;
+	mvccsnap->shared->suboverflowed = false;
+	mvccsnap->shared->takenDuringRecovery = false;
+	mvccsnap->shared->snapXactCompletionCount = 0;
 	mvccsnap->curcid = FirstCommandId;
-	mvccsnap->active_count = 0;
-	mvccsnap->regd_count = 0;
-	mvccsnap->snapXactCompletionCount = 0;
 
 	pfree(historicsnap);
 
@@ -528,13 +534,13 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 	 * now that we've built a plain snapshot, make it active and use the
 	 * normal mechanisms for exporting it
 	 */
-	snapname = ExportSnapshot(snap);
+	snapname = ExportSnapshot(snap->shared);
 
 	ereport(LOG,
 			(errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
 						   "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
-						   snap->xcnt,
-						   snapname, snap->xcnt)));
+						   snap->shared->xcnt,
+						   snapname, snap->shared->xcnt)));
 	return snapname;
 }
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1a7a35e25eb..513449ea9de 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2620,7 +2620,7 @@ ProcessStandbyHSFeedbackMessage(void)
 
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
-	 * the xmin will be taken into account by GetSnapshotData() /
+	 * the xmin will be taken into account by GetMVCCSnapshotData() /
 	 * ComputeXidHorizons().  This will hold back the removal of dead rows and
 	 * thereby prevent the generation of cleanup conflicts on the standby
 	 * server.
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index ba5ed8960dd..819649741f6 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -62,6 +62,7 @@
 #include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
@@ -105,7 +106,7 @@ typedef struct ProcArrayStruct
  * MVCC semantics: If the deleted row's xmax is not considered to be running
  * by anyone, the row can be removed.
  *
- * To avoid slowing down GetSnapshotData(), we don't calculate a precise
+ * To avoid slowing down GetMVCCSnapshotData(), we don't calculate a precise
  * cutoff XID while building a snapshot (looking at the frequently changing
  * xmins scales badly). Instead we compute two boundaries while building the
  * snapshot:
@@ -159,7 +160,7 @@ typedef struct ProcArrayStruct
  *
  * The boundaries are FullTransactionIds instead of TransactionIds to avoid
  * wraparound dangers. There e.g. would otherwise exist no procarray state to
- * prevent maybe_needed to become old enough after the GetSnapshotData()
+ * prevent maybe_needed to become old enough after the GetMVCCSnapshotData()
  * call.
  *
  * The typedef is in the header.
@@ -386,7 +387,7 @@ ProcArrayShmemSize(void)
 	/*
 	 * During Hot Standby processing we have a data structure called
 	 * KnownAssignedXids, created in shared memory. Local data structures are
-	 * also created in various backends during GetSnapshotData(),
+	 * also created in various backends during GetMVCCSnapshotData(),
 	 * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the
 	 * main structures created in those functions must be identically sized,
 	 * since we may at times copy the whole of the data structures around. We
@@ -938,7 +939,7 @@ ProcArrayClearTransaction(PGPROC *proc)
 
 	/*
 	 * Need to increment completion count even though transaction hasn't
-	 * really committed yet. The reason for that is that GetSnapshotData()
+	 * really committed yet. The reason for that is that GetMVCCSnapshotData()
 	 * omits the xid of the current transaction, thus without the increment we
 	 * otherwise could end up reusing the snapshot later. Which would be bad,
 	 * because it might not count the prepared transaction as running.
@@ -2083,7 +2084,7 @@ GetMaxSnapshotSubxidCount(void)
 }
 
 /*
- * Helper function for GetSnapshotData() that checks if the bulk of the
+ * Helper function for GetMVCCSnapshotData() that checks if the bulk of the
  * visibility information in the snapshot is still valid. If so, it updates
  * the fields that need to change and returns true. Otherwise it returns
  * false.
@@ -2092,7 +2093,7 @@ GetMaxSnapshotSubxidCount(void)
  * least in the case we already hold a snapshot), but that's for another day.
  */
 static bool
-GetSnapshotDataReuse(MVCCSnapshot snapshot)
+GetMVCCSnapshotDataReuse(MVCCSnapshotShared snapshot)
 {
 	uint64		curXactCompletionCount;
 
@@ -2112,17 +2113,18 @@ GetSnapshotDataReuse(MVCCSnapshot snapshot)
 	 * contents:
 	 *
 	 * As explained in transam/README, the set of xids considered running by
-	 * GetSnapshotData() cannot change while ProcArrayLock is held. Snapshot
-	 * contents only depend on transactions with xids and xactCompletionCount
-	 * is incremented whenever a transaction with an xid finishes (while
-	 * holding ProcArrayLock exclusively). Thus the xactCompletionCount check
-	 * ensures we would detect if the snapshot would have changed.
+	 * GetMVCCSnapshotData() cannot change while ProcArrayLock is held.
+	 * Snapshot contents only depend on transactions with xids and
+	 * xactCompletionCount is incremented whenever a transaction with an xid
+	 * finishes (while holding ProcArrayLock exclusively). Thus the
+	 * xactCompletionCount check ensures we would detect if the snapshot would
+	 * have changed.
 	 *
 	 * As the snapshot contents are the same as it was before, it is safe to
 	 * re-enter the snapshot's xmin into the PGPROC array. None of the rows
 	 * visible under the snapshot could already have been removed (that'd
 	 * require the set of running transactions to change) and it fulfills the
-	 * requirement that concurrent GetSnapshotData() calls yield the same
+	 * requirement that concurrent GetMVCCSnapshotData() calls yield the same
 	 * xmin.
 	 */
 	if (!TransactionIdIsValid(MyProc->xmin))
@@ -2131,17 +2133,11 @@ GetSnapshotDataReuse(MVCCSnapshot snapshot)
 	RecentXmin = snapshot->xmin;
 	Assert(TransactionIdPrecedesOrEquals(TransactionXmin, RecentXmin));
 
-	snapshot->curcid = GetCurrentCommandId(false);
-	snapshot->active_count = 0;
-	snapshot->regd_count = 0;
-	snapshot->copied = false;
-	snapshot->valid = true;
-
 	return true;
 }
 
 /*
- * GetSnapshotData -- returns information about running transactions.
+ * GetMVCCSnapshotData -- returns information about running transactions.
  *
  * The returned snapshot includes xmin (lowest still-running xact ID),
  * xmax (highest completed xact ID + 1), and a list of running xact IDs
@@ -2168,12 +2164,9 @@ GetSnapshotDataReuse(MVCCSnapshot snapshot)
  *
  * And try to advance the bounds of GlobalVis{Shared,Catalog,Data,Temp}Rels
  * for the benefit of the GlobalVisTest* family of functions.
- *
- * Note: this function should probably not be called with an argument that's
- * not statically allocated (see xip allocation below).
  */
-MVCCSnapshot
-GetSnapshotData(MVCCSnapshot snapshot)
+MVCCSnapshotShared
+GetMVCCSnapshotData(void)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId *other_xids = ProcGlobal->xids;
@@ -2187,43 +2180,34 @@ GetSnapshotData(MVCCSnapshot snapshot)
 	int			mypgxactoff;
 	TransactionId myxid;
 	uint64		curXactCompletionCount;
+	MVCCSnapshotShared snapshot;
 
 	TransactionId replication_slot_xmin = InvalidTransactionId;
 	TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
 
-	Assert(snapshot != NULL);
-
-	/*
-	 * Allocating space for maxProcs xids is usually overkill; numProcs would
-	 * be sufficient.  But it seems better to do the malloc while not holding
-	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
-	 * more subxip storage than is probably needed.
+	/*---
+	 * Allocate an MVCCSnapshotShared struct.  There are three cases:
+	 *
+	 * 1. No transactions have completed since the last call: we can reuse the
+	 *    latest snapshot information.  See GetMVCCSnapshotDataReuse().
+	 *
+	 * 2. Need to recalculate the snapshot, and 'latestSnapshotShared' is not
+	 *    currently in use by any snapshot.  We can overwrite its contents.
+	 *
+	 * 3. Need to recalculate the XID list and 'latestSnapshotShared' is still
+	 *    in use.  We need to allocate a new MVCCSnapshotShared struct.
 	 *
-	 * This does open a possibility for avoiding repeated malloc/free: since
-	 * maxProcs does not change at runtime, we can simply reuse the previous
-	 * xip arrays if any.  (This relies on the fact that all callers pass
-	 * static SnapshotData structs.)
+	 * We don't know if 'latestSnapshotShared' can be reused before we acquire
+	 * the lock, but if we do need to allocate, we want to do it before
+	 * acquiring the lock.  Therefore, we always make the allocation if we
+	 * might need it and if it turns out to have been unnecessary, we stash
+	 * away the allocated struct in 'spareSnapshotShared' to be reused on next
+	 * call.  This way, the unnecessary allocation is very cheap.
 	 */
-	if (snapshot->xip == NULL)
-	{
-		/*
-		 * First call for this snapshot. Snapshot is same size whether or not
-		 * we are in recovery, see later comments.
-		 */
-		snapshot->xip = (TransactionId *)
-			malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
-		if (snapshot->xip == NULL)
-			ereport(ERROR,
-					(errcode(ERRCODE_OUT_OF_MEMORY),
-					 errmsg("out of memory")));
-		Assert(snapshot->subxip == NULL);
-		snapshot->subxip = (TransactionId *)
-			malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
-		if (snapshot->subxip == NULL)
-			ereport(ERROR,
-					(errcode(ERRCODE_OUT_OF_MEMORY),
-					 errmsg("out of memory")));
-	}
+	if (latestSnapshotShared && latestSnapshotShared->refcount == 0)
+		snapshot = latestSnapshotShared;	/* case 1 or 2 */
+	else
+		snapshot = AllocMVCCSnapshotShared();	/* case 1 or 3 */
 
 	/*
 	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
@@ -2231,10 +2215,14 @@ GetSnapshotData(MVCCSnapshot snapshot)
 	 */
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
-	if (GetSnapshotDataReuse(snapshot))
+	if (latestSnapshotShared && GetMVCCSnapshotDataReuse(latestSnapshotShared))
 	{
 		LWLockRelease(ProcArrayLock);
-		return snapshot;
+
+		/* if we made an allocation, stash it away for next call */
+		if (snapshot != latestSnapshotShared)
+			spareSnapshotShared = snapshot;
+		return latestSnapshotShared;
 	}
 
 	latest_completed = TransamVariables->latestCompletedXid;
@@ -2506,16 +2494,18 @@ GetSnapshotData(MVCCSnapshot snapshot)
 	snapshot->suboverflowed = suboverflowed;
 	snapshot->snapXactCompletionCount = curXactCompletionCount;
 
-	snapshot->curcid = GetCurrentCommandId(false);
-
 	/*
-	 * This is a new snapshot, so set both refcounts are zero, and mark it as
-	 * not copied in persistent memory.
+	 * If we allocated a new struct for this, remember that it is the latest
+	 * now, and free the previous latest one if nothing references it anymore.
 	 */
-	snapshot->active_count = 0;
-	snapshot->regd_count = 0;
-	snapshot->copied = false;
-	snapshot->valid = true;
+	if (snapshot != latestSnapshotShared)
+	{
+		Assert(snapshot->refcount == 0);
+
+		if (latestSnapshotShared && latestSnapshotShared->refcount == 0)
+			FreeMVCCSnapshotShared(latestSnapshotShared);
+		latestSnapshotShared = snapshot;
+	}
 
 	return snapshot;
 }
@@ -2585,10 +2575,10 @@ ProcArrayInstallImportedXmin(TransactionId xmin,
 			continue;
 
 		/*
-		 * We're good.  Install the new xmin.  As in GetSnapshotData, set
+		 * We're good.  Install the new xmin.  As in GetMVCCSnapshotData, set
 		 * TransactionXmin too.  (Note that because snapmgr.c called
-		 * GetSnapshotData first, we'll be overwriting a valid xmin here, so
-		 * we don't check that.)
+		 * GetMVCCSnapshotData first, we'll be overwriting a valid xmin here,
+		 * so we don't check that.)
 		 */
 		MyProc->xmin = TransactionXmin = xmin;
 
@@ -2659,7 +2649,7 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
 /*
  * GetRunningTransactionData -- returns information about running transactions.
  *
- * Similar to GetSnapshotData but returns more information. We include
+ * Similar to GetMVCCSnapshotData but returns more information. We include
  * all PGPROCs with an assigned TransactionId, even VACUUM processes and
  * prepared transactions.
  *
@@ -2681,7 +2671,7 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
  * entries here to not hold on ProcArrayLock more than necessary.
  *
  * We don't worry about updating other counters, we want to keep this as
- * simple as possible and leave GetSnapshotData() as the primary code for
+ * simple as possible and leave GetMVCCSnapshotData() as the primary code for
  * that bookkeeping.
  *
  * Note that if any transaction has overflowed its cached subtransactions
@@ -2866,8 +2856,8 @@ GetRunningTransactionData(void)
 /*
  * GetOldestActiveTransactionId()
  *
- * Similar to GetSnapshotData but returns just oldestActiveXid. We include
- * all PGPROCs with an assigned TransactionId, even VACUUM processes.
+ * Similar to GetMVCCSnapshotData but returns just oldestActiveXid. We
+ * include all PGPROCs with an assigned TransactionId, even VACUUM processes.
  * We look at all databases, though there is no need to include WALSender
  * since this has no effect on hot standby conflicts.
  *
@@ -2875,7 +2865,7 @@ GetRunningTransactionData(void)
  * KnownAssignedXids.
  *
  * We don't worry about updating other counters, we want to keep this as
- * simple as possible and leave GetSnapshotData() as the primary code for
+ * simple as possible and leave GetMVCCSnapshotData() as the primary code for
  * that bookkeeping.
  */
 TransactionId
@@ -4356,7 +4346,7 @@ FullXidRelativeTo(FullTransactionId rel, TransactionId xid)
  * During hot standby we do not fret too much about the distinction between
  * top-level XIDs and subtransaction XIDs. We store both together in the
  * KnownAssignedXids list.  In backends, this is copied into snapshots in
- * GetSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
+ * GetMVCCSnapshotData(), taking advantage of the fact that XidInMVCCSnapshot()
  * doesn't care about the distinction either.  Subtransaction XIDs are
  * effectively treated as top-level XIDs and in the typical case pg_subtrans
  * links are *not* maintained (which does not affect visibility).
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index dd52782ff22..edc6b9de7ca 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -449,10 +449,10 @@ static void SerialSetActiveSerXmin(TransactionId xid);
 
 static uint32 predicatelock_hash(const void *key, Size keysize);
 static void SummarizeOldestCommittedSxact(void);
-static MVCCSnapshot GetSafeSnapshot(MVCCSnapshot origSnapshot);
-static MVCCSnapshot GetSerializableTransactionSnapshotInt(MVCCSnapshot snapshot,
-														  VirtualTransactionId *sourcevxid,
-														  int sourcepid);
+static MVCCSnapshotShared GetSafeSnapshot(void);
+static MVCCSnapshotShared GetSerializableTransactionSnapshotInt(VirtualTransactionId *sourcevxid,
+																TransactionId sourcexmin,
+																int sourcepid);
 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
 									  PREDICATELOCKTARGETTAG *parent);
@@ -1542,25 +1542,20 @@ SummarizeOldestCommittedSxact(void)
  *
  *		As with GetSerializableTransactionSnapshot (which this is a subroutine
  *		for), the passed-in Snapshot pointer should reference a static data
- *		area that can safely be passed to GetSnapshotData.
+ *		area that can safely be passed to GetMVCCSnapshotData.
  */
-static MVCCSnapshot
-GetSafeSnapshot(MVCCSnapshot origSnapshot)
+static MVCCSnapshotShared
+GetSafeSnapshot(void)
 {
-	MVCCSnapshot snapshot;
+	MVCCSnapshotShared snapshot;
 
 	Assert(XactReadOnly && XactDeferrable);
 
 	while (true)
 	{
-		/*
-		 * GetSerializableTransactionSnapshotInt is going to call
-		 * GetSnapshotData, so we need to provide it the static snapshot area
-		 * our caller passed to us.  The pointer returned is actually the same
-		 * one passed to it, but we avoid assuming that here.
-		 */
-		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
-														 NULL, InvalidPid);
+		snapshot = GetSerializableTransactionSnapshotInt(NULL,
+														 InvalidTransactionId,
+														 InvalidPid);
 
 		if (MySerializableXact == InvalidSerializableXact)
 			return snapshot;	/* no concurrent r/w xacts; it's safe */
@@ -1663,13 +1658,11 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
  * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
  * It should be current for this process and be contained in PredXact.
  *
- * The passed-in Snapshot pointer should reference a static data area that
- * can safely be passed to GetSnapshotData.  The return value is actually
- * always this same pointer; no new snapshot data structure is allocated
- * within this function.
+ * This calls GetMVCCSnapshotData to do the heavy lifting, but also sets up
+ * shared memory data structures specific to serializable transactions.
  */
-MVCCSnapshot
-GetSerializableTransactionSnapshot(MVCCSnapshot snapshot)
+MVCCSnapshotShared
+GetSerializableTransactionSnapshotData(void)
 {
 	Assert(IsolationIsSerializable());
 
@@ -1692,26 +1685,25 @@ GetSerializableTransactionSnapshot(MVCCSnapshot snapshot)
 	 * thereby avoid all SSI overhead once it's running.
 	 */
 	if (XactReadOnly && XactDeferrable)
-		return GetSafeSnapshot(snapshot);
+		return GetSafeSnapshot();
 
-	return GetSerializableTransactionSnapshotInt(snapshot,
-												 NULL, InvalidPid);
+	return GetSerializableTransactionSnapshotInt(NULL, InvalidTransactionId, InvalidPid);
 }
 
 /*
  * Import a snapshot to be used for the current transaction.
  *
- * This is nearly the same as GetSerializableTransactionSnapshot, except that
- * we don't take a new snapshot, but rather use the data we're handed.
+ * This is nearly the same as GetSerializableTransactionSnapshotData, except
+ * that we don't take a new snapshot, but rather use the data we're handed.
  *
  * The caller must have verified that the snapshot came from a serializable
  * transaction; and if we're read-write, the source transaction must not be
  * read-only.
  */
 void
-SetSerializableTransactionSnapshot(MVCCSnapshot snapshot,
-								   VirtualTransactionId *sourcevxid,
-								   int sourcepid)
+SetSerializableTransactionSnapshotData(MVCCSnapshotShared snapshot,
+									   VirtualTransactionId *sourcevxid,
+									   int sourcepid)
 {
 	Assert(IsolationIsSerializable());
 
@@ -1737,28 +1729,29 @@ SetSerializableTransactionSnapshot(MVCCSnapshot snapshot,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
 
-	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
-												 sourcepid);
+	(void) GetSerializableTransactionSnapshotInt(sourcevxid, snapshot->xmin, sourcepid);
 }
 
 /*
  * Guts of GetSerializableTransactionSnapshot
  *
  * If sourcevxid is valid, this is actually an import operation and we should
- * skip calling GetSnapshotData, because the snapshot contents are already
+ * skip calling GetMVCCSnapshotData, because the snapshot contents are already
  * loaded up.  HOWEVER: to avoid race conditions, we must check that the
  * source xact is still running after we acquire SerializableXactHashLock.
  * We do that by calling ProcArrayInstallImportedXmin.
  */
-static MVCCSnapshot
-GetSerializableTransactionSnapshotInt(MVCCSnapshot snapshot,
-									  VirtualTransactionId *sourcevxid,
+static MVCCSnapshotShared
+GetSerializableTransactionSnapshotInt(VirtualTransactionId *sourcevxid,
+									  TransactionId sourcexmin,
 									  int sourcepid)
 {
 	PGPROC	   *proc;
 	VirtualTransactionId vxid;
 	SERIALIZABLEXACT *sxact,
 			   *othersxact;
+	MVCCSnapshotShared snapshot;
+	TransactionId xmin;
 
 	/* We only do this for serializable transactions.  Once. */
 	Assert(MySerializableXact == InvalidSerializableXact);
@@ -1783,7 +1776,7 @@ GetSerializableTransactionSnapshotInt(MVCCSnapshot snapshot,
 	 *
 	 * We must hold SerializableXactHashLock when taking/checking the snapshot
 	 * to avoid race conditions, for much the same reasons that
-	 * GetSnapshotData takes the ProcArrayLock.  Since we might have to
+	 * GetMVCCSnapshotData takes the ProcArrayLock.  Since we might have to
 	 * release SerializableXactHashLock to call SummarizeOldestCommittedSxact,
 	 * this means we have to create the sxact first, which is a bit annoying
 	 * (in particular, an elog(ERROR) in procarray.c would cause us to leak
@@ -1807,16 +1800,24 @@ GetSerializableTransactionSnapshotInt(MVCCSnapshot snapshot,
 
 	/* Get the snapshot, or check that it's safe to use */
 	if (!sourcevxid)
-		snapshot = GetSnapshotData(snapshot);
-	else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcevxid))
 	{
-		ReleasePredXact(sxact);
-		LWLockRelease(SerializableXactHashLock);
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("could not import the requested snapshot"),
-				 errdetail("The source process with PID %d is not running anymore.",
-						   sourcepid)));
+		snapshot = GetMVCCSnapshotData();
+		xmin = snapshot->xmin;
+	}
+	else
+	{
+		if (!ProcArrayInstallImportedXmin(sourcexmin, sourcevxid))
+		{
+			ReleasePredXact(sxact);
+			LWLockRelease(SerializableXactHashLock);
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("could not import the requested snapshot"),
+					 errdetail("The source process with PID %d is not running anymore.",
+							   sourcepid)));
+		}
+		snapshot = NULL;
+		xmin = sourcexmin;
 	}
 
 	/*
@@ -1848,7 +1849,7 @@ GetSerializableTransactionSnapshotInt(MVCCSnapshot snapshot,
 	dlist_init(&(sxact->possibleUnsafeConflicts));
 	sxact->topXid = GetTopTransactionIdIfAny();
 	sxact->finishedBefore = InvalidTransactionId;
-	sxact->xmin = snapshot->xmin;
+	sxact->xmin = xmin;
 	sxact->pid = MyProcPid;
 	sxact->pgprocno = MyProcNumber;
 	dlist_init(&sxact->predicateLocks);
@@ -1902,18 +1903,18 @@ GetSerializableTransactionSnapshotInt(MVCCSnapshot snapshot,
 	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
 	{
 		Assert(PredXact->SxactGlobalXminCount == 0);
-		PredXact->SxactGlobalXmin = snapshot->xmin;
+		PredXact->SxactGlobalXmin = xmin;
 		PredXact->SxactGlobalXminCount = 1;
-		SerialSetActiveSerXmin(snapshot->xmin);
+		SerialSetActiveSerXmin(xmin);
 	}
-	else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
+	else if (TransactionIdEquals(xmin, PredXact->SxactGlobalXmin))
 	{
 		Assert(PredXact->SxactGlobalXminCount > 0);
 		PredXact->SxactGlobalXminCount++;
 	}
 	else
 	{
-		Assert(TransactionIdFollows(snapshot->xmin, PredXact->SxactGlobalXmin));
+		Assert(TransactionIdFollows(xmin, PredXact->SxactGlobalXmin));
 	}
 
 	MySerializableXact = sxact;
@@ -3968,13 +3969,13 @@ XidIsConcurrent(TransactionId xid)
 
 	snap = (MVCCSnapshot) GetTransactionSnapshot();
 
-	if (TransactionIdPrecedes(xid, snap->xmin))
+	if (TransactionIdPrecedes(xid, snap->shared->xmin))
 		return false;
 
-	if (TransactionIdFollowsOrEquals(xid, snap->xmax))
+	if (TransactionIdFollowsOrEquals(xid, snap->shared->xmax))
 		return true;
 
-	return pg_lfind32(xid, snap->xip, snap->xcnt);
+	return pg_lfind32(xid, snap->shared->xip, snap->shared->xcnt);
 }
 
 bool
diff --git a/src/backend/utils/adt/xid8funcs.c b/src/backend/utils/adt/xid8funcs.c
index d4aa8ef9e4e..eef632390cb 100644
--- a/src/backend/utils/adt/xid8funcs.c
+++ b/src/backend/utils/adt/xid8funcs.c
@@ -380,7 +380,7 @@ pg_current_snapshot(PG_FUNCTION_ARGS)
 		elog(ERROR, "no active snapshot set");
 
 	/* allocate */
-	nxip = cur->xcnt;
+	nxip = cur->shared->xcnt;
 	snap = palloc(PG_SNAPSHOT_SIZE(nxip));
 
 	/*
@@ -389,12 +389,12 @@ pg_current_snapshot(PG_FUNCTION_ARGS)
 	 * advance past any of these XIDs.  Hence, these XIDs remain allowable
 	 * relative to next_fxid.
 	 */
-	snap->xmin = FullTransactionIdFromAllowableAt(next_fxid, cur->xmin);
-	snap->xmax = FullTransactionIdFromAllowableAt(next_fxid, cur->xmax);
+	snap->xmin = FullTransactionIdFromAllowableAt(next_fxid, cur->shared->xmin);
+	snap->xmax = FullTransactionIdFromAllowableAt(next_fxid, cur->shared->xmax);
 	snap->nxip = nxip;
 	for (i = 0; i < nxip; i++)
 		snap->xip[i] =
-			FullTransactionIdFromAllowableAt(next_fxid, cur->xip[i]);
+			FullTransactionIdFromAllowableAt(next_fxid, cur->shared->xip[i]);
 
 	/*
 	 * We want them guaranteed to be in ascending order.  This also removes
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 1c39cc11609..5f9f2b9d8b2 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -122,9 +122,6 @@
  * special-purpose code (say, RI checking.)  CatalogSnapshot points to an
  * MVCC snapshot intended to be used for catalog scans; we must invalidate it
  * whenever a system catalog change occurs.
- *
- * These SnapshotData structs are static to simplify memory allocation
- * (see the hack in GetSnapshotData to avoid repeated malloc/free).
  */
 static MVCCSnapshotData CurrentSnapshotData = {SNAPSHOT_MVCC};
 static MVCCSnapshotData SecondarySnapshotData = {SNAPSHOT_MVCC};
@@ -137,7 +134,7 @@ SnapshotData SnapshotToastData = {SNAPSHOT_TOAST};
 static HistoricMVCCSnapshot HistoricSnapshot = NULL;
 
 /*
- * These are updated by GetSnapshotData.  We initialize them this way
+ * These are updated by GetMVCCSnapshotData.  We initialize them this way
  * for the convenience of TransactionIdIsInProgress: even in bootstrap
  * mode, we don't want it to say that BootstrapTransactionId is in progress.
  */
@@ -150,14 +147,12 @@ static HTAB *tuplecid_data = NULL;
 /*
  * Elements of the active snapshot stack.
  *
- * Each element here accounts for exactly one active_count on SnapshotData.
- *
  * NB: the code assumes that elements in this list are in non-increasing
  * order of as_level; also, the list must be NULL-terminated.
  */
 typedef struct ActiveSnapshotElt
 {
-	MVCCSnapshot as_snap;
+	MVCCSnapshotData as_snap;
 	int			as_level;
 	struct ActiveSnapshotElt *as_next;
 } ActiveSnapshotElt;
@@ -188,19 +183,23 @@ static bool FirstXactSnapshotRegistered = false;
 typedef struct ExportedSnapshot
 {
 	char	   *snapfile;
-	MVCCSnapshot snapshot;
+	MVCCSnapshotShared snapshot;
 } ExportedSnapshot;
 
 /* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
 static List *exportedSnapshots = NIL;
 
+MVCCSnapshotShared latestSnapshotShared = NULL;
+MVCCSnapshotShared spareSnapshotShared = NULL;
+
 /* Prototypes for local functions */
-static MVCCSnapshot CopyMVCCSnapshot(MVCCSnapshot snapshot);
+static void UpdateStaticMVCCSnapshot(MVCCSnapshot snapshot, MVCCSnapshotShared shared);
 static void UnregisterSnapshotNoOwner(Snapshot snapshot);
-static void FreeMVCCSnapshot(MVCCSnapshot snapshot);
 static void SnapshotResetXmin(void);
-static void valid_snapshots_push_tail(MVCCSnapshot snapshot);
-static void valid_snapshots_push_out_of_order(MVCCSnapshot snapshot);
+static void ReleaseMVCCSnapshotShared(MVCCSnapshotShared shared);
+static void valid_snapshots_push_tail(MVCCSnapshotShared snapshot);
+static void valid_snapshots_push_out_of_order(MVCCSnapshotShared snapshot);
+
 
 /* ResourceOwner callbacks to track snapshot references */
 static void ResOwnerReleaseSnapshot(Datum res);
@@ -266,6 +265,8 @@ GetTransactionSnapshot(void)
 	/* First call in transaction? */
 	if (!FirstSnapshotSet)
 	{
+		MVCCSnapshotShared shared;
+
 		/*
 		 * Don't allow catalog snapshot to be older than xact snapshot.  Must
 		 * do this first to allow the empty-heap Assert to succeed.
@@ -287,23 +288,18 @@ GetTransactionSnapshot(void)
 		 * mode, predicate.c needs to wrap the snapshot fetch in its own
 		 * processing.
 		 */
+		if (IsolationIsSerializable())
+			shared = GetSerializableTransactionSnapshotData();
+		else
+			shared = GetMVCCSnapshotData();
+
+		UpdateStaticMVCCSnapshot(&CurrentSnapshotData, shared);
+
 		if (IsolationUsesXactSnapshot())
 		{
-			/* First, create the snapshot in CurrentSnapshotData */
-			if (IsolationIsSerializable())
-				GetSerializableTransactionSnapshot(&CurrentSnapshotData);
-			else
-				GetSnapshotData(&CurrentSnapshotData);
-
-			/* Mark it as "registered" */
+			/* keep it: InvalidateCatalogSnapshot() will not release it */
 			FirstXactSnapshotRegistered = true;
 		}
-		else
-		{
-			GetSnapshotData(&CurrentSnapshotData);
-		}
-		valid_snapshots_push_tail(&CurrentSnapshotData);
-
 		FirstSnapshotSet = true;
 		return (Snapshot) &CurrentSnapshotData;
 	}
@@ -318,14 +314,31 @@ GetTransactionSnapshot(void)
 	/* Don't allow catalog snapshot to be older than xact snapshot. */
 	InvalidateCatalogSnapshot();
 
-	if (CurrentSnapshotData.valid)
-		dlist_delete(&CurrentSnapshotData.node);
-	GetSnapshotData(&CurrentSnapshotData);
-	valid_snapshots_push_tail(&CurrentSnapshotData);
-
+	UpdateStaticMVCCSnapshot(&CurrentSnapshotData, GetMVCCSnapshotData());
 	return (Snapshot) &CurrentSnapshotData;
 }
 
+/*
+ * Point a static snapshot at the given shared struct and take a reference.
+ * Any 'shared' struct it already holds is released first, then curcid is
+ * refreshed.  NOTE(review): the old struct is released before the new one is
+ * pinned; confirm that is safe when both are the same struct.
+ */
+static void
+UpdateStaticMVCCSnapshot(MVCCSnapshot snapshot, MVCCSnapshotShared shared)
+{
+	/* Swap the 'shared' reference: release the old one, then pin the new */
+	if (snapshot->shared)
+		ReleaseMVCCSnapshotShared(snapshot->shared);
+	snapshot->shared = shared;
+	snapshot->shared->refcount++;
+	if (snapshot->shared->refcount == 1)
+		valid_snapshots_push_tail(shared);	/* first reference: track it */
+
+	snapshot->curcid = GetCurrentCommandId(false);
+	snapshot->valid = true;
+}
+
 /*
  * GetLatestSnapshot
  *		Get a snapshot that is up-to-date as of the current instant,
@@ -352,10 +365,7 @@ GetLatestSnapshot(void)
 	if (!FirstSnapshotSet)
 		return GetTransactionSnapshot();
 
-	if (SecondarySnapshotData.valid)
-		dlist_delete(&SecondarySnapshotData.node);
-	GetSnapshotData(&SecondarySnapshotData);
-	valid_snapshots_push_tail(&SecondarySnapshotData);
+	UpdateStaticMVCCSnapshot(&SecondarySnapshotData, GetMVCCSnapshotData());
 
 	return (Snapshot) &SecondarySnapshotData;
 }
@@ -405,7 +415,7 @@ GetNonHistoricCatalogSnapshot(Oid relid)
 	if (!CatalogSnapshotData.valid)
 	{
 		/* Get new snapshot. */
-		GetSnapshotData(&CatalogSnapshotData);
+		UpdateStaticMVCCSnapshot(&CatalogSnapshotData, GetMVCCSnapshotData());
 
 		/*
 		 * Make sure the catalog snapshot will be accounted for in decisions
@@ -419,7 +429,6 @@ GetNonHistoricCatalogSnapshot(Oid relid)
 		 * NB: it had better be impossible for this to throw error, since the
 		 * CatalogSnapshot pointer is already valid.
 		 */
-		valid_snapshots_push_tail(&CatalogSnapshotData);
 	}
 
 	return (Snapshot) &CatalogSnapshotData;
@@ -440,17 +449,20 @@ InvalidateCatalogSnapshot(void)
 {
 	if (CatalogSnapshotData.valid)
 	{
-		dlist_delete(&CatalogSnapshotData.node);
+		ReleaseMVCCSnapshotShared(CatalogSnapshotData.shared);
+		CatalogSnapshotData.shared = NULL;
 		CatalogSnapshotData.valid = false;
 	}
 	if (!FirstXactSnapshotRegistered && CurrentSnapshotData.valid)
 	{
-		dlist_delete(&CurrentSnapshotData.node);
+		ReleaseMVCCSnapshotShared(CurrentSnapshotData.shared);
+		CurrentSnapshotData.shared = NULL;
 		CurrentSnapshotData.valid = false;
 	}
 	if (SecondarySnapshotData.valid)
 	{
-		dlist_delete(&SecondarySnapshotData.node);
+		ReleaseMVCCSnapshotShared(SecondarySnapshotData.shared);
+		SecondarySnapshotData.shared = NULL;
 		SecondarySnapshotData.valid = false;
 	}
 
@@ -465,13 +477,14 @@ InvalidateCatalogSnapshot(void)
  * want to continue holding the catalog snapshot if it might mean that the
  * global xmin horizon can't advance.  However, if there are other snapshots
  * still active or registered, the catalog snapshot isn't likely to be the
- * oldest one, so we might as well keep it.
+ * oldest one, so we might as well keep it. XXX
  */
 void
 InvalidateCatalogSnapshotConditionally(void)
 {
 	if (CatalogSnapshotData.valid &&
-		dlist_head_node(&ValidSnapshots) == &CatalogSnapshotData.node)
+		dlist_tail_node(&ValidSnapshots) == &CatalogSnapshotData.shared->node &&
+		CatalogSnapshotData.shared->refcount == 1)
 		InvalidateCatalogSnapshot();
 }
 
@@ -501,7 +514,7 @@ SnapshotSetCommandId(CommandId curcid)
  * in GetTransactionSnapshot.
  */
 static void
-SetTransactionSnapshot(MVCCSnapshot sourcesnap, VirtualTransactionId *sourcevxid,
+SetTransactionSnapshot(MVCCSnapshotShared sourcesnap, VirtualTransactionId *sourcevxid,
 					   int sourcepid, PGPROC *sourceproc)
 {
 	/* Caller should have checked this already */
@@ -512,38 +525,25 @@ SetTransactionSnapshot(MVCCSnapshot sourcesnap, VirtualTransactionId *sourcevxid
 
 	Assert(!FirstXactSnapshotRegistered);
 	Assert(!HistoricSnapshotActive());
+	Assert(sourcesnap->refcount > 0);
 
 	/*
 	 * Even though we are not going to use the snapshot it computes, we must
-	 * call GetSnapshotData, for two reasons: (1) to be sure that
-	 * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
-	 * the state for GlobalVis*.
+	 * call GetMVCCSnapshotData to update the state for GlobalVis*.
 	 */
-	GetSnapshotData(&CurrentSnapshotData);
+	UpdateStaticMVCCSnapshot(&CurrentSnapshotData, GetMVCCSnapshotData());
 
 	/*
 	 * Now copy appropriate fields from the source snapshot.
 	 */
-	CurrentSnapshotData.xmin = sourcesnap->xmin;
-	CurrentSnapshotData.xmax = sourcesnap->xmax;
-	CurrentSnapshotData.xcnt = sourcesnap->xcnt;
-	Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
-	if (sourcesnap->xcnt > 0)
-		memcpy(CurrentSnapshotData.xip, sourcesnap->xip,
-			   sourcesnap->xcnt * sizeof(TransactionId));
-	CurrentSnapshotData.subxcnt = sourcesnap->subxcnt;
-	Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
-	if (sourcesnap->subxcnt > 0)
-		memcpy(CurrentSnapshotData.subxip, sourcesnap->subxip,
-			   sourcesnap->subxcnt * sizeof(TransactionId));
-	CurrentSnapshotData.suboverflowed = sourcesnap->suboverflowed;
-	CurrentSnapshotData.takenDuringRecovery = sourcesnap->takenDuringRecovery;
-	/* NB: curcid should NOT be copied, it's a local matter */
+	ReleaseMVCCSnapshotShared(CurrentSnapshotData.shared);
+	CurrentSnapshotData.shared = sourcesnap;
+	CurrentSnapshotData.shared->refcount++;
 
-	CurrentSnapshotData.snapXactCompletionCount = 0;
+	/* NB: curcid should NOT be copied, it's a local matter */
 
 	/*
-	 * Now we have to fix what GetSnapshotData did with MyProc->xmin and
+	 * Now we have to fix what GetMVCCSnapshotData did with MyProc->xmin and
 	 * TransactionXmin.  There is a race condition: to make sure we are not
 	 * causing the global xmin to go backwards, we have to test that the
 	 * source transaction is still running, and that has to be done
@@ -555,13 +555,13 @@ SetTransactionSnapshot(MVCCSnapshot sourcesnap, VirtualTransactionId *sourcevxid
 	 */
 	if (sourceproc != NULL)
 	{
-		if (!ProcArrayInstallRestoredXmin(CurrentSnapshotData.xmin, sourceproc))
+		if (!ProcArrayInstallRestoredXmin(CurrentSnapshotData.shared->xmin, sourceproc))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("could not import the requested snapshot"),
 					 errdetail("The source transaction is not running anymore.")));
 	}
-	else if (!ProcArrayInstallImportedXmin(CurrentSnapshotData.xmin, sourcevxid))
+	else if (!ProcArrayInstallImportedXmin(CurrentSnapshotData.shared->xmin, sourcevxid))
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("could not import the requested snapshot"),
@@ -577,96 +577,22 @@ SetTransactionSnapshot(MVCCSnapshot sourcesnap, VirtualTransactionId *sourcevxid
 	if (IsolationUsesXactSnapshot())
 	{
 		if (IsolationIsSerializable())
-			SetSerializableTransactionSnapshot(&CurrentSnapshotData, sourcevxid,
-											   sourcepid);
-		/* Mark it as "registered" */
+			SetSerializableTransactionSnapshotData(CurrentSnapshotData.shared,
+												   sourcevxid, sourcepid);
+		/* keep it: InvalidateCatalogSnapshot() will not release it */
 		FirstXactSnapshotRegistered = true;
 	}
-	valid_snapshots_push_tail(&CurrentSnapshotData);
 
 	FirstSnapshotSet = true;
 }
 
-/*
- * CopyMVCCSnapshot
- *		Copy the given snapshot.
- *
- * The copy is palloc'd in TopTransactionContext and has initial refcounts set
- * to 0.  The returned snapshot has the copied flag set.
- */
-static MVCCSnapshot
-CopyMVCCSnapshot(MVCCSnapshot snapshot)
-{
-	MVCCSnapshot newsnap;
-	Size		subxipoff;
-	Size		size;
-
-	/* We allocate any XID arrays needed in the same palloc block. */
-	size = subxipoff = sizeof(MVCCSnapshotData) +
-		snapshot->xcnt * sizeof(TransactionId);
-	if (snapshot->subxcnt > 0)
-		size += snapshot->subxcnt * sizeof(TransactionId);
-
-	newsnap = (MVCCSnapshot) MemoryContextAlloc(TopTransactionContext, size);
-	memcpy(newsnap, snapshot, sizeof(MVCCSnapshotData));
-
-	newsnap->regd_count = 0;
-	newsnap->active_count = 0;
-	newsnap->copied = true;
-	newsnap->valid = true;
-	newsnap->snapXactCompletionCount = 0;
-
-	/* setup XID array */
-	if (snapshot->xcnt > 0)
-	{
-		newsnap->xip = (TransactionId *) (newsnap + 1);
-		memcpy(newsnap->xip, snapshot->xip,
-			   snapshot->xcnt * sizeof(TransactionId));
-	}
-	else
-		newsnap->xip = NULL;
-
-	/*
-	 * Setup subXID array. Don't bother to copy it if it had overflowed,
-	 * though, because it's not used anywhere in that case. Except if it's a
-	 * snapshot taken during recovery; all the top-level XIDs are in subxip as
-	 * well in that case, so we mustn't lose them.
-	 */
-	if (snapshot->subxcnt > 0 &&
-		(!snapshot->suboverflowed || snapshot->takenDuringRecovery))
-	{
-		newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
-		memcpy(newsnap->subxip, snapshot->subxip,
-			   snapshot->subxcnt * sizeof(TransactionId));
-	}
-	else
-		newsnap->subxip = NULL;
-
-	return newsnap;
-}
-
-/*
- * FreeMVCCSnapshot
- *		Free the memory associated with a snapshot.
- */
-static void
-FreeMVCCSnapshot(MVCCSnapshot snapshot)
-{
-	Assert(snapshot->regd_count == 0);
-	Assert(snapshot->active_count == 0);
-	Assert(snapshot->copied);
-	Assert(snapshot->valid);
-
-	pfree(snapshot);
-}
-
 /*
  * PushActiveSnapshot
  *		Set the given snapshot as the current active snapshot
  *
  * If the passed snapshot is a statically-allocated one, or it is possibly
  * subject to a future command counter update, create a new long-lived copy
- * with active refcount=1.  Otherwise, only increment the refcount.
+ * with active refcount=1.  Otherwise, only increment the refcount. XXX
  *
  * Only regular MVCC snaphots can be used as the active snapshot.
  */
@@ -697,24 +623,13 @@ PushActiveSnapshotWithLevel(Snapshot snapshot, int snap_level)
 	Assert(ActiveSnapshot == NULL || snap_level >= ActiveSnapshot->as_level);
 
 	newactive = MemoryContextAlloc(TopTransactionContext, sizeof(ActiveSnapshotElt));
-
-	/*
-	 * Checking SecondarySnapshot is probably useless here, but it seems
-	 * better to be sure.
-	 */
-	if (!origsnap->copied)
-	{
-		newactive->as_snap = CopyMVCCSnapshot(origsnap);
-		dlist_insert_after(&origsnap->node, &newactive->as_snap->node);
-	}
-	else
-		newactive->as_snap = origsnap;
+	memcpy(&newactive->as_snap, origsnap, sizeof(MVCCSnapshotData));
+	newactive->as_snap.kind = SNAPSHOT_ACTIVE;
+	newactive->as_snap.shared->refcount++;
 
 	newactive->as_next = ActiveSnapshot;
 	newactive->as_level = snap_level;
 
-	newactive->as_snap->active_count++;
-
 	ActiveSnapshot = newactive;
 }
 
@@ -729,20 +644,20 @@ PushActiveSnapshotWithLevel(Snapshot snapshot, int snap_level)
 void
 PushCopiedSnapshot(Snapshot snapshot)
 {
-	MVCCSnapshot copy;
-
 	Assert(snapshot->snapshot_type == SNAPSHOT_MVCC);
 
-	copy = CopyMVCCSnapshot(&snapshot->mvcc);
-	dlist_insert_after(&snapshot->mvcc.node, &copy->node);
-	PushActiveSnapshot((Snapshot) copy);
+	/*
+	 * This used to be different from PushActiveSnapshot, but these days
+	 * PushActiveSnapshot creates a copy too and there's no difference.
+	 */
+	PushActiveSnapshot(snapshot);
 }
 
 /*
  * UpdateActiveSnapshotCommandId
  *
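- * Update the current CID of the active snapshot.  This can only be applied
- * to a snapshot that is not referenced elsewhere.
+ * Update the current CID of the active snapshot.  Only the stack entry's own
+ * copy of the outer struct is modified; the shared inner struct is untouched.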
  */
 void
 UpdateActiveSnapshotCommandId(void)
@@ -751,8 +666,6 @@ UpdateActiveSnapshotCommandId(void)
 				curcid;
 
 	Assert(ActiveSnapshot != NULL);
-	Assert(ActiveSnapshot->as_snap->active_count == 1);
-	Assert(ActiveSnapshot->as_snap->regd_count == 0);
 
 	/*
 	 * Don't allow modification of the active snapshot during parallel
@@ -762,11 +675,12 @@ UpdateActiveSnapshotCommandId(void)
 	 * CommandCounterIncrement, but there are a few places that call this
 	 * directly, so we put an additional guard here.
 	 */
-	save_curcid = ActiveSnapshot->as_snap->curcid;
+	save_curcid = ActiveSnapshot->as_snap.curcid;
 	curcid = GetCurrentCommandId(false);
 	if (IsInParallelMode() && save_curcid != curcid)
 		elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
-	ActiveSnapshot->as_snap->curcid = curcid;
+
+	ActiveSnapshot->as_snap.curcid = curcid;
 }
 
 /*
@@ -782,16 +696,7 @@ PopActiveSnapshot(void)
 
 	newstack = ActiveSnapshot->as_next;
 
-	Assert(ActiveSnapshot->as_snap->active_count > 0);
-
-	ActiveSnapshot->as_snap->active_count--;
-
-	if (ActiveSnapshot->as_snap->active_count == 0 &&
-		ActiveSnapshot->as_snap->regd_count == 0)
-	{
-		dlist_delete(&ActiveSnapshot->as_snap->node);
-		FreeMVCCSnapshot(ActiveSnapshot->as_snap);
-	}
+	ReleaseMVCCSnapshotShared(ActiveSnapshot->as_snap.shared);
 
 	pfree(ActiveSnapshot);
 	ActiveSnapshot = newstack;
@@ -808,7 +713,7 @@ GetActiveSnapshot(void)
 {
 	Assert(ActiveSnapshot != NULL);
 
-	return (Snapshot) ActiveSnapshot->as_snap;
+	return (Snapshot) &ActiveSnapshot->as_snap;
 }
 
 /*
@@ -844,7 +749,7 @@ RegisterSnapshot(Snapshot snapshot)
 Snapshot
 RegisterSnapshotOnOwner(Snapshot orig_snapshot, ResourceOwner owner)
 {
-	MVCCSnapshot snapshot;
+	MVCCSnapshot newsnap;
 
 	if (orig_snapshot == InvalidSnapshot)
 		return InvalidSnapshot;
@@ -861,22 +766,19 @@ RegisterSnapshotOnOwner(Snapshot orig_snapshot, ResourceOwner owner)
 	}
 
 	Assert(orig_snapshot->snapshot_type == SNAPSHOT_MVCC);
-	snapshot = &orig_snapshot->mvcc;
-	Assert(snapshot->valid);
+	Assert(orig_snapshot->mvcc.valid);
 
-	/* Static snapshot?  Create a persistent copy */
-	if (!snapshot->copied)
-	{
-		snapshot = CopyMVCCSnapshot(snapshot);
-		dlist_insert_after(&orig_snapshot->mvcc.node, &snapshot->node);
-	}
+	/* Copy the outer struct; the ref on 'shared' is taken after Enlarge below */
+	newsnap = MemoryContextAlloc(TopTransactionContext, sizeof(MVCCSnapshotData));
+	memcpy(newsnap, &orig_snapshot->mvcc, sizeof(MVCCSnapshotData));
+	newsnap->kind = SNAPSHOT_REGISTERED;
 
 	/* and tell resowner.c about it */
 	ResourceOwnerEnlarge(owner);
-	snapshot->regd_count++;
-	ResourceOwnerRememberSnapshot(owner, (Snapshot) snapshot);
+	newsnap->shared->refcount++;
+	ResourceOwnerRememberSnapshot(owner, (Snapshot) newsnap);
 
-	return (Snapshot) snapshot;
+	return (Snapshot) newsnap;
 }
 
 /*
@@ -914,18 +816,12 @@ UnregisterSnapshotNoOwner(Snapshot snapshot)
 {
 	if (snapshot->snapshot_type == SNAPSHOT_MVCC)
 	{
-		MVCCSnapshot mvccsnap = &snapshot->mvcc;
-
-		Assert(mvccsnap->regd_count > 0);
+		Assert(snapshot->mvcc.kind == SNAPSHOT_REGISTERED);
 		Assert(!dlist_is_empty(&ValidSnapshots));
 
-		mvccsnap->regd_count--;
-		if (mvccsnap->regd_count == 0 && mvccsnap->active_count == 0)
-		{
-			dlist_delete(&mvccsnap->node);
-			FreeMVCCSnapshot(mvccsnap);
-			SnapshotResetXmin();
-		}
+		ReleaseMVCCSnapshotShared(snapshot->mvcc.shared);
+		pfree(snapshot);
+		SnapshotResetXmin();
 	}
 	else if (snapshot->snapshot_type == SNAPSHOT_HISTORIC_MVCC)
 	{
@@ -963,19 +859,21 @@ UnregisterSnapshotNoOwner(Snapshot snapshot)
 static void
 SnapshotResetXmin(void)
 {
-	MVCCSnapshot minSnapshot;
+	MVCCSnapshotShared minSnapshot;
 
 	/*
 	 * Invalidate these static snapshots so that we can advance xmin.
 	 */
 	if (!FirstXactSnapshotRegistered && CurrentSnapshotData.valid)
 	{
-		dlist_delete(&CurrentSnapshotData.node);
+		ReleaseMVCCSnapshotShared(CurrentSnapshotData.shared);
+		CurrentSnapshotData.shared = NULL;
 		CurrentSnapshotData.valid = false;
 	}
 	if (SecondarySnapshotData.valid)
 	{
-		dlist_delete(&SecondarySnapshotData.node);
+		ReleaseMVCCSnapshotShared(SecondarySnapshotData.shared);
+		SecondarySnapshotData.shared = NULL;
 		SecondarySnapshotData.valid = false;
 	}
 
@@ -988,7 +886,7 @@ SnapshotResetXmin(void)
 		return;
 	}
 
-	minSnapshot = dlist_head_element(MVCCSnapshotData, node, &ValidSnapshots);
+	minSnapshot = dlist_head_element(MVCCSnapshotSharedData, node, &ValidSnapshots);
 
 	if (TransactionIdPrecedes(MyProc->xmin, minSnapshot->xmin))
 		MyProc->xmin = TransactionXmin = minSnapshot->xmin;
@@ -1028,21 +926,7 @@ AtSubAbort_Snapshot(int level)
 
 		next = ActiveSnapshot->as_next;
 
-		/*
-		 * Decrement the snapshot's active count.  If it's still registered or
-		 * marked as active by an outer subtransaction, we can't free it yet.
-		 */
-		Assert(ActiveSnapshot->as_snap->active_count >= 1);
-		ActiveSnapshot->as_snap->active_count -= 1;
-
-		if (ActiveSnapshot->as_snap->active_count == 0 &&
-			ActiveSnapshot->as_snap->regd_count == 0)
-		{
-			dlist_delete(&ActiveSnapshot->as_snap->node);
-			FreeMVCCSnapshot(ActiveSnapshot->as_snap);
-		}
-
-		/* and free the stack element */
+		ReleaseMVCCSnapshotShared(ActiveSnapshot->as_snap.shared);
 		pfree(ActiveSnapshot);
 
 		ActiveSnapshot = next;
@@ -1058,6 +942,8 @@ AtSubAbort_Snapshot(int level)
 void
 AtEOXact_Snapshot(bool isCommit, bool resetXmin)
 {
+	dlist_mutable_iter iter;
+
 	/*
 	 * If we exported any snapshots, clean them up.
 	 */
@@ -1084,7 +970,7 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin)
 				elog(WARNING, "could not unlink file \"%s\": %m",
 					 esnap->snapfile);
 
-			dlist_delete(&esnap->snapshot->node);
+			ReleaseMVCCSnapshotShared(esnap->snapshot);
 		}
 
 		exportedSnapshots = NIL;
@@ -1093,17 +979,20 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin)
 	/* Drop all static snapshot */
 	if (CatalogSnapshotData.valid)
 	{
-		dlist_delete(&CatalogSnapshotData.node);
+		ReleaseMVCCSnapshotShared(CatalogSnapshotData.shared);
+		CatalogSnapshotData.shared = NULL;
 		CatalogSnapshotData.valid = false;
 	}
 	if (CurrentSnapshotData.valid)
 	{
-		dlist_delete(&CurrentSnapshotData.node);
+		ReleaseMVCCSnapshotShared(CurrentSnapshotData.shared);
+		CurrentSnapshotData.shared = NULL;
 		CurrentSnapshotData.valid = false;
 	}
 	if (SecondarySnapshotData.valid)
 	{
-		dlist_delete(&SecondarySnapshotData.node);
+		ReleaseMVCCSnapshotShared(SecondarySnapshotData.shared);
+		SecondarySnapshotData.shared = NULL;
 		SecondarySnapshotData.valid = false;
 	}
 
@@ -1124,11 +1013,23 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin)
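-	 * And reset our state.  We don't need to free the memory explicitly --
-	 * it'll go away with TopTransactionContext.
+	 * And reset our state.  Outer structs go away with TopTransactionContext,
+	 * but shared structs live in TopMemoryContext: recycle or free them here.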
 	 */
-	ActiveSnapshot = NULL;
-	dlist_init(&ValidSnapshots);
+	dlist_foreach_modify(iter, &ValidSnapshots)
+	{
+		MVCCSnapshotShared cur = dlist_container(MVCCSnapshotSharedData, node, iter.cur);
 
-	CurrentSnapshotData.valid = false;
-	SecondarySnapshotData.valid = false;
+		dlist_delete(iter.cur);
+		cur->refcount = 0;
+		if (cur == latestSnapshotShared)
+		{
+			/* keep it */
+		}
+		else if (spareSnapshotShared == NULL)
+			spareSnapshotShared = cur;
+		else
+			pfree(cur);
+	}
+
+	ActiveSnapshot = NULL;
 	FirstSnapshotSet = false;
 	FirstXactSnapshotRegistered = false;
 
@@ -1151,9 +1052,8 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin)
  *		snapshot.
  */
 char *
-ExportSnapshot(MVCCSnapshot snapshot)
+ExportSnapshot(MVCCSnapshotShared snapshot)
 {
-	MVCCSnapshot orig_snapshot;
 	TransactionId topXid;
 	TransactionId *children;
 	ExportedSnapshot *esnap;
@@ -1214,21 +1114,16 @@ ExportSnapshot(MVCCSnapshot snapshot)
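-	 * Copy the snapshot into TopTransactionContext, add it to the
-	 * exportedSnapshots list, and mark it pseudo-registered.  We do this to
-	 * ensure that the snapshot's xmin is honored for the rest of the
-	 * transaction.
+	 * Add the snapshot to the exportedSnapshots list, holding a reference on
+	 * the shared struct for as long as it is exported.  We do this to
+	 * ensure that the snapshot's xmin is honored for the rest of the
+	 * transaction.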
 	 */
-	orig_snapshot = snapshot;
-	snapshot = CopyMVCCSnapshot(orig_snapshot);
-
 	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
 	esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
 	esnap->snapfile = pstrdup(path);
 	esnap->snapshot = snapshot;
+	snapshot->refcount++;
 	exportedSnapshots = lappend(exportedSnapshots, esnap);
 	MemoryContextSwitchTo(oldcxt);
 
-	snapshot->regd_count++;
-	dlist_insert_after(&orig_snapshot->node, &snapshot->node);
-
 	/*
 	 * Fill buf with a text serialization of the snapshot, plus identification
 	 * data about this transaction.  The format expected by ImportSnapshot is
@@ -1248,8 +1143,8 @@ ExportSnapshot(MVCCSnapshot snapshot)
 	/*
 	 * We must include our own top transaction ID in the top-xid data, since
 	 * by definition we will still be running when the importing transaction
-	 * adopts the snapshot, but GetSnapshotData never includes our own XID in
-	 * the snapshot.  (There must, therefore, be enough room to add it.)
+	 * adopts the snapshot, but GetMVCCSnapshotData never includes our own XID
+	 * in the snapshot.  (There must, therefore, be enough room to add it.)
 	 *
 	 * However, it could be that our topXid is after the xmax, in which case
 	 * we shouldn't include it because xip[] members are expected to be before
@@ -1334,7 +1229,7 @@ pg_export_snapshot(PG_FUNCTION_ARGS)
 {
 	char	   *snapshotName;
 
-	snapshotName = ExportSnapshot((MVCCSnapshot) GetActiveSnapshot());
+	snapshotName = ExportSnapshot(((MVCCSnapshot) GetActiveSnapshot())->shared);
 	PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
 }
 
@@ -1438,7 +1333,7 @@ ImportSnapshot(const char *idstr)
 	Oid			src_dbid;
 	int			src_isolevel;
 	bool		src_readonly;
-	MVCCSnapshotData snapshot;
+	MVCCSnapshotShared snapshot;
 
 	/*
 	 * Must be at top level of a fresh transaction.  Note in particular that
@@ -1508,8 +1403,6 @@ ImportSnapshot(const char *idstr)
 	/*
 	 * Construct a snapshot struct by parsing the file content.
 	 */
-	memset(&snapshot, 0, sizeof(snapshot));
-
 	parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
 	src_pid = parseIntFromText("pid:", &filebuf, path);
 	/* we abuse parseXidFromText a bit here ... */
@@ -1517,12 +1410,11 @@ ImportSnapshot(const char *idstr)
 	src_isolevel = parseIntFromText("iso:", &filebuf, path);
 	src_readonly = parseIntFromText("ro:", &filebuf, path);
 
-	snapshot.snapshot_type = SNAPSHOT_MVCC;
-
-	snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
-	snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
+	snapshot = AllocMVCCSnapshotShared();
+	snapshot->xmin = parseXidFromText("xmin:", &filebuf, path);
+	snapshot->xmax = parseXidFromText("xmax:", &filebuf, path);
 
-	snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
+	snapshot->xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
 
 	/* sanity-check the xid count before palloc */
 	if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
@@ -1530,15 +1422,15 @@ ImportSnapshot(const char *idstr)
 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
 				 errmsg("invalid snapshot data in file \"%s\"", path)));
 
-	snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
+	/* xip[] was preallocated at max size by AllocMVCCSnapshotShared() */
 	for (i = 0; i < xcnt; i++)
-		snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
+		snapshot->xip[i] = parseXidFromText("xip:", &filebuf, path);
 
-	snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
+	snapshot->suboverflowed = parseIntFromText("sof:", &filebuf, path);
 
-	if (!snapshot.suboverflowed)
+	if (!snapshot->suboverflowed)
 	{
-		snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
+		snapshot->subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
 
 		/* sanity-check the xid count before palloc */
 		if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
@@ -1546,17 +1438,19 @@ ImportSnapshot(const char *idstr)
 					(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
 					 errmsg("invalid snapshot data in file \"%s\"", path)));
 
-		snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
+		/* subxip[] is preallocated too; it must not be repointed */
 		for (i = 0; i < xcnt; i++)
-			snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
+			snapshot->subxip[i] = parseXidFromText("sxp:", &filebuf, path);
 	}
 	else
 	{
-		snapshot.subxcnt = 0;
-		snapshot.subxip = NULL;
+		snapshot->subxcnt = 0;
 	}
 
-	snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
+	snapshot->takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
+
+	snapshot->refcount = 1;
+	valid_snapshots_push_out_of_order(snapshot);
 
 	/*
 	 * Do some additional sanity checking, just to protect ourselves.  We
@@ -1565,8 +1459,8 @@ ImportSnapshot(const char *idstr)
 	 */
 	if (!VirtualTransactionIdIsValid(src_vxid) ||
 		!OidIsValid(src_dbid) ||
-		!TransactionIdIsNormal(snapshot.xmin) ||
-		!TransactionIdIsNormal(snapshot.xmax))
+		!TransactionIdIsNormal(snapshot->xmin) ||
+		!TransactionIdIsNormal(snapshot->xmax))
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
 				 errmsg("invalid snapshot data in file \"%s\"", path)));
@@ -1604,7 +1498,7 @@ ImportSnapshot(const char *idstr)
 				 errmsg("cannot import a snapshot from a different database")));
 
 	/* OK, install the snapshot */
-	SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
+	SetTransactionSnapshot(snapshot, &src_vxid, src_pid, NULL);
 }
 
 /*
@@ -1670,18 +1564,21 @@ ThereAreNoPriorRegisteredSnapshots(void)
 
 	dlist_foreach(iter, &ValidSnapshots)
 	{
-		MVCCSnapshot cur = dlist_container(MVCCSnapshotData, node, iter.cur);
+		MVCCSnapshotShared cur =
+			dlist_container(MVCCSnapshotSharedData, node, iter.cur);
+		uint32		allowedcount = 0;
 
 		if (FirstXactSnapshotRegistered)
 		{
 			Assert(CurrentSnapshotData.valid);
-			if (cur != &CurrentSnapshotData)
-				continue;
+			if (cur == CurrentSnapshotData.shared)
+				allowedcount++;
 		}
-		if (ActiveSnapshot && cur == ActiveSnapshot->as_snap)
-			continue;
+		if (ActiveSnapshot && cur == ActiveSnapshot->as_snap.shared)
+			allowedcount++;
 
-		return false;
+		if (cur->refcount != allowedcount)
+			return false;
 	}
 
 	return true;
@@ -1707,8 +1604,9 @@ HaveRegisteredOrActiveSnapshot(void)
 	 * registered more than one snapshot has to be in ValidSnapshots.
 	 */
 	if (CatalogSnapshotData.valid &&
-		dlist_head_node(&ValidSnapshots) == &CatalogSnapshotData.node &&
-		dlist_tail_node(&ValidSnapshots) == &CatalogSnapshotData.node)
+		CatalogSnapshotData.shared->refcount == 1 &&
+		dlist_head_node(&ValidSnapshots) == &CatalogSnapshotData.shared->node &&
+		dlist_tail_node(&ValidSnapshots) == &CatalogSnapshotData.shared->node)
 	{
 		return false;
 	}
@@ -1775,11 +1673,11 @@ EstimateSnapshotSpace(MVCCSnapshot snapshot)
 
 	/* We allocate any XID arrays needed in the same palloc block. */
 	size = add_size(sizeof(SerializedSnapshotData),
-					mul_size(snapshot->xcnt, sizeof(TransactionId)));
-	if (snapshot->subxcnt > 0 &&
-		(!snapshot->suboverflowed || snapshot->takenDuringRecovery))
+					mul_size(snapshot->shared->xcnt, sizeof(TransactionId)));
+	if (snapshot->shared->subxcnt > 0 &&
+		(!snapshot->shared->suboverflowed || snapshot->shared->takenDuringRecovery))
 		size = add_size(size,
-						mul_size(snapshot->subxcnt, sizeof(TransactionId)));
+						mul_size(snapshot->shared->subxcnt, sizeof(TransactionId)));
 
 	return size;
 }
@@ -1794,15 +1692,15 @@ SerializeSnapshot(MVCCSnapshot snapshot, char *start_address)
 {
 	SerializedSnapshotData serialized_snapshot;
 
-	Assert(snapshot->subxcnt >= 0);
+	Assert(snapshot->shared->subxcnt >= 0);
 
 	/* Copy all required fields */
-	serialized_snapshot.xmin = snapshot->xmin;
-	serialized_snapshot.xmax = snapshot->xmax;
-	serialized_snapshot.xcnt = snapshot->xcnt;
-	serialized_snapshot.subxcnt = snapshot->subxcnt;
-	serialized_snapshot.suboverflowed = snapshot->suboverflowed;
-	serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
+	serialized_snapshot.xmin = snapshot->shared->xmin;
+	serialized_snapshot.xmax = snapshot->shared->xmax;
+	serialized_snapshot.xcnt = snapshot->shared->xcnt;
+	serialized_snapshot.subxcnt = snapshot->shared->subxcnt;
+	serialized_snapshot.suboverflowed = snapshot->shared->suboverflowed;
+	serialized_snapshot.takenDuringRecovery = snapshot->shared->takenDuringRecovery;
 	serialized_snapshot.curcid = snapshot->curcid;
 
 	/*
@@ -1810,7 +1708,7 @@ SerializeSnapshot(MVCCSnapshot snapshot, char *start_address)
 	 * taken during recovery - in that case, top-level XIDs are in subxip as
 	 * well, and we mustn't lose them.
 	 */
-	if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery)
+	if (serialized_snapshot.suboverflowed && !snapshot->shared->takenDuringRecovery)
 		serialized_snapshot.subxcnt = 0;
 
 	/* Copy struct to possibly-unaligned buffer */
@@ -1818,10 +1716,10 @@ SerializeSnapshot(MVCCSnapshot snapshot, char *start_address)
 		   &serialized_snapshot, sizeof(SerializedSnapshotData));
 
 	/* Copy XID array */
-	if (snapshot->xcnt > 0)
+	if (snapshot->shared->xcnt > 0)
 		memcpy((TransactionId *) (start_address +
 								  sizeof(SerializedSnapshotData)),
-			   snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
+			   snapshot->shared->xip, snapshot->shared->xcnt * sizeof(TransactionId));
 
 	/*
 	 * Copy SubXID array. Don't bother to copy it if it had overflowed,
@@ -1832,10 +1730,10 @@ SerializeSnapshot(MVCCSnapshot snapshot, char *start_address)
 	if (serialized_snapshot.subxcnt > 0)
 	{
 		Size		subxipoff = sizeof(SerializedSnapshotData) +
-			snapshot->xcnt * sizeof(TransactionId);
+			snapshot->shared->xcnt * sizeof(TransactionId);
 
 		memcpy((TransactionId *) (start_address + subxipoff),
-			   snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
+			   snapshot->shared->subxip, snapshot->shared->subxcnt * sizeof(TransactionId));
 	}
 }
 
@@ -1863,49 +1761,46 @@ RestoreSnapshot(char *start_address)
 	size = sizeof(MVCCSnapshotData)
 		+ serialized_snapshot.xcnt * sizeof(TransactionId)
 		+ serialized_snapshot.subxcnt * sizeof(TransactionId);
+	Assert(serialized_snapshot.xcnt <= GetMaxSnapshotXidCount());
+	Assert(serialized_snapshot.subxcnt <= GetMaxSnapshotSubxidCount());
 
 	/* Copy all required fields */
 	snapshot = (MVCCSnapshot) MemoryContextAlloc(TopTransactionContext, size);
 	snapshot->snapshot_type = SNAPSHOT_MVCC;
-	snapshot->xmin = serialized_snapshot.xmin;
-	snapshot->xmax = serialized_snapshot.xmax;
-	snapshot->xip = NULL;
-	snapshot->xcnt = serialized_snapshot.xcnt;
-	snapshot->subxip = NULL;
-	snapshot->subxcnt = serialized_snapshot.subxcnt;
-	snapshot->suboverflowed = serialized_snapshot.suboverflowed;
-	snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
+	snapshot->kind = SNAPSHOT_REGISTERED;
+	snapshot->shared = AllocMVCCSnapshotShared();
+	snapshot->shared->xmin = serialized_snapshot.xmin;
+	snapshot->shared->xmax = serialized_snapshot.xmax;
+	snapshot->shared->xcnt = serialized_snapshot.xcnt;
+	snapshot->shared->subxcnt = serialized_snapshot.subxcnt;
+	snapshot->shared->suboverflowed = serialized_snapshot.suboverflowed;
+	snapshot->shared->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
+	snapshot->shared->snapXactCompletionCount = 0;
+
+	snapshot->shared->refcount = 1;
+	valid_snapshots_push_out_of_order(snapshot->shared);
+
 	snapshot->curcid = serialized_snapshot.curcid;
-	snapshot->snapXactCompletionCount = 0;
 
 	/* Copy XIDs, if present. */
 	if (serialized_snapshot.xcnt > 0)
 	{
-		snapshot->xip = (TransactionId *) (snapshot + 1);
-		memcpy(snapshot->xip, serialized_xids,
+		memcpy(snapshot->shared->xip, serialized_xids,
 			   serialized_snapshot.xcnt * sizeof(TransactionId));
 	}
 
 	/* Copy SubXIDs, if present. */
 	if (serialized_snapshot.subxcnt > 0)
 	{
-		snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
-			serialized_snapshot.xcnt;
-		memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt,
+		memcpy(snapshot->shared->subxip, serialized_xids + serialized_snapshot.xcnt,
 			   serialized_snapshot.subxcnt * sizeof(TransactionId));
 	}
 
-	/* Set the copied flag so that the caller will set refcounts correctly. */
-	snapshot->regd_count = 0;
-	snapshot->active_count = 0;
-	snapshot->copied = true;
 	snapshot->valid = true;
 
 	/* and tell resowner.c about it, just like RegisterSnapshot() */
 	ResourceOwnerEnlarge(CurrentResourceOwner);
-	snapshot->regd_count++;
 	ResourceOwnerRememberSnapshot(CurrentResourceOwner, (Snapshot) snapshot);
-	valid_snapshots_push_out_of_order(snapshot);
 
 	return snapshot;
 }
@@ -1919,21 +1814,21 @@ RestoreSnapshot(char *start_address)
 void
 RestoreTransactionSnapshot(MVCCSnapshot snapshot, void *source_pgproc)
 {
-	SetTransactionSnapshot(snapshot, NULL, InvalidPid, source_pgproc);
+	SetTransactionSnapshot(snapshot->shared, NULL, InvalidPid, source_pgproc);
 }
 
 /*
  * XidInMVCCSnapshot
  *		Is the given XID still-in-progress according to the snapshot?
  *
- * Note: GetSnapshotData never stores either top xid or subxids of our own
- * backend into a snapshot, so these xids will not be reported as "running"
- * by this function.  This is OK for current uses, because we always check
- * TransactionIdIsCurrentTransactionId first, except when it's known the
- * XID could not be ours anyway.
+ * Note: GetMVCCSnapshotData never stores either top xid or subxids of our own
+ * backend into a snapshot, so these xids will not be reported as "running" by
+ * this function.  This is OK for current uses, because we always check
+ * TransactionIdIsCurrentTransactionId first, except when it's known the XID
+ * could not be ours anyway.
  */
 bool
-XidInMVCCSnapshot(TransactionId xid, MVCCSnapshot snapshot)
+XidInMVCCSnapshot(TransactionId xid, MVCCSnapshotShared snapshot)
 {
 	/*
 	 * Make a quick range check to eliminate most XIDs without looking at the
@@ -2029,6 +1924,84 @@ XidInMVCCSnapshot(TransactionId xid, MVCCSnapshot snapshot)
 	return false;
 }
 
+/*
+ * Allocate an MVCCSnapshotShared struct
+ *
+ * The 'xip' and 'subxip' arrays live in the same chunk as the struct, sized
+ * for the max number of XIDs.  That's usually overkill, but it allows us to
+ * allocate while not holding ProcArrayLock.  Fill them in place; never repoint.
+ *
+ * MVCCSnapshotShared structs are kept in TopMemoryContext and refcounted.
+ * The refcount is initially zero; the caller is expected to increment it.
+ */
+MVCCSnapshotShared
+AllocMVCCSnapshotShared(void)
+{
+	MemoryContext save_cxt;
+	MVCCSnapshotShared shared;
+	size_t		size;
+	char	   *p;
+
+	/*
+	 * To reduce alloc/free overhead in GetMVCCSnapshotData(), we keep a
+	 * single-element pool.  A recycled struct is handed back as it was freed.
+	 */
+	if (spareSnapshotShared)
+	{
+		shared = spareSnapshotShared;
+		spareSnapshotShared = NULL;
+		return shared;
+	}
+
+	save_cxt = MemoryContextSwitchTo(TopMemoryContext);
+
+	size = sizeof(MVCCSnapshotSharedData) +
+		GetMaxSnapshotXidCount() * sizeof(TransactionId) +
+		GetMaxSnapshotSubxidCount() * sizeof(TransactionId);
+	p = palloc(size);
+
+	shared = (MVCCSnapshotShared) p;
+	p += sizeof(MVCCSnapshotSharedData);
+	shared->xip = (TransactionId *) p;
+	p += GetMaxSnapshotXidCount() * sizeof(TransactionId);
+	shared->subxip = (TransactionId *) p;
+
+	shared->snapXactCompletionCount = 0;
+	shared->refcount = 0;
+
+	MemoryContextSwitchTo(save_cxt);
+
+	return shared;
+}
+
+/*
+ * Decrement the refcount on an MVCCSnapshotShared struct.  When it reaches
+ * zero, unlink its list node and free it, unless it is latestSnapshotShared.
+ */
+static void
+ReleaseMVCCSnapshotShared(MVCCSnapshotShared shared)
+{
+	Assert(shared->refcount > 0);
+	shared->refcount--;
+
+	if (shared->refcount == 0)
+	{
+		dlist_delete(&shared->node);
+		if (shared != latestSnapshotShared)
+			FreeMVCCSnapshotShared(shared);
+	}
+}
+
+void
+FreeMVCCSnapshotShared(MVCCSnapshotShared shared)
+{
+	Assert(shared->refcount == 0);
+	if (spareSnapshotShared != NULL)
+		pfree(shared);
+	else
+		spareSnapshotShared = shared;
+}
+
 /* ResourceOwner callbacks */
 
 static void
@@ -2042,12 +2015,13 @@ ResOwnerReleaseSnapshot(Datum res)
 
 /* dlist_push_tail, with assertion that the list stays ordered by xmin */
 static void
-valid_snapshots_push_tail(MVCCSnapshot snapshot)
+valid_snapshots_push_tail(MVCCSnapshotShared snapshot)
 {
 #ifdef USE_ASSERT_CHECKING
 	if (!dlist_is_empty(&ValidSnapshots))
 	{
-		MVCCSnapshot tail = dlist_tail_element(MVCCSnapshotData, node, &ValidSnapshots);
+		MVCCSnapshotShared tail =
+			dlist_tail_element(MVCCSnapshotSharedData, node, &ValidSnapshots);
 
 		Assert(TransactionIdFollowsOrEquals(snapshot->xmin, tail->xmin));
 	}
@@ -2062,13 +2036,14 @@ valid_snapshots_push_tail(MVCCSnapshot snapshot)
  * the list is small.
  */
 static void
-valid_snapshots_push_out_of_order(MVCCSnapshot snapshot)
+valid_snapshots_push_out_of_order(MVCCSnapshotShared snapshot)
 {
 	dlist_iter	iter;
 
 	dlist_foreach(iter, &ValidSnapshots)
 	{
-		MVCCSnapshot cur = dlist_container(MVCCSnapshotData, node, iter.cur);
+		MVCCSnapshotShared cur =
+			dlist_container(MVCCSnapshotSharedData, node, iter.cur);
 
 		if (TransactionIdFollowsOrEquals(snapshot->xmin, cur->xmin))
 		{
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 7d82cd2eb56..e71c660118e 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -242,8 +242,8 @@ typedef struct TransamVariablesData
 	 * Number of top-level transactions with xids (i.e. which may have
 	 * modified the database) that completed in some form since the start of
 	 * the server. This currently is solely used to check whether
-	 * GetSnapshotData() needs to recompute the contents of the snapshot, or
-	 * not. There are likely other users of this.  Always above 1.
+	 * GetMVCCSnapshotData() needs to recompute the contents of the snapshot,
+	 * or not. There are likely other users of this.  Always above 1.
 	 */
 	uint64		xactCompletionCount;
 
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 6a78dfeac96..e68862576ee 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -47,10 +47,10 @@ extern void CheckPointPredicate(void);
 extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno);
 
 /* predicate lock maintenance */
-extern MVCCSnapshot GetSerializableTransactionSnapshot(MVCCSnapshot snapshot);
-extern void SetSerializableTransactionSnapshot(MVCCSnapshot snapshot,
-											   VirtualTransactionId *sourcevxid,
-											   int sourcepid);
+extern MVCCSnapshotShared GetSerializableTransactionSnapshotData(void);
+extern void SetSerializableTransactionSnapshotData(MVCCSnapshotShared snapshot,
+												   VirtualTransactionId *sourcevxid,
+												   int sourcepid);
 extern void RegisterPredicateLockingXid(TransactionId xid);
 extern void PredicateLockRelation(Relation relation, Snapshot snapshot);
 extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index f51b03d3822..46b58a17489 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -324,7 +324,7 @@ extern PGDLLIMPORT PGPROC *MyProc;
  * Adding/Removing an entry into the procarray requires holding *both*
  * ProcArrayLock and XidGenLock in exclusive mode (in that order). Both are
  * needed because the dense arrays (see below) are accessed from
- * GetNewTransactionId() and GetSnapshotData(), and we don't want to add
+ * GetNewTransactionId() and GetMVCCSnapshotData(), and we don't want to add
  * further contention by both using the same lock. Adding/Removing a procarray
  * entry is much less frequent.
  *
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 7f5727c2586..8eedc2d6b9f 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -44,7 +44,7 @@ extern void KnownAssignedTransactionIdsIdleMaintenance(void);
 extern int	GetMaxSnapshotXidCount(void);
 extern int	GetMaxSnapshotSubxidCount(void);
 
-extern MVCCSnapshot GetSnapshotData(MVCCSnapshot snapshot);
+extern MVCCSnapshotShared GetMVCCSnapshotData(void);
 
 extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
 										 VirtualTransactionId *sourcevxid);
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 1f627ff966d..36c6043740f 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -56,6 +56,13 @@ extern PGDLLIMPORT SnapshotData SnapshotToastData;
 	((snapshot)->snapshot_type == SNAPSHOT_MVCC || \
 	 (snapshot)->snapshot_type == SNAPSHOT_HISTORIC_MVCC)
 
+/* exported so that GetMVCCSnapshotData() can access these */
+extern MVCCSnapshotShared latestSnapshotShared;
+extern MVCCSnapshotShared spareSnapshotShared;
+
+extern MVCCSnapshotShared AllocMVCCSnapshotShared(void);
+extern void FreeMVCCSnapshotShared(MVCCSnapshotShared shared);
+
 extern Snapshot GetTransactionSnapshot(void);
 extern Snapshot GetLatestSnapshot(void);
 extern void SnapshotSetCommandId(CommandId curcid);
@@ -89,7 +96,7 @@ extern void WaitForOlderSnapshots(TransactionId limitXmin, bool progress);
 extern bool ThereAreNoPriorRegisteredSnapshots(void);
 extern bool HaveRegisteredOrActiveSnapshot(void);
 
-extern char *ExportSnapshot(MVCCSnapshot snapshot);
+extern char *ExportSnapshot(MVCCSnapshotShared snapshot);
 
 /*
  * These live in procarray.c because they're intimately linked to the
@@ -105,7 +112,7 @@ extern bool GlobalVisCheckRemovableFullXid(Relation rel, FullTransactionId fxid)
 /*
  * Utility functions for implementing visibility routines in table AMs.
  */
-extern bool XidInMVCCSnapshot(TransactionId xid, MVCCSnapshot snapshot);
+extern bool XidInMVCCSnapshot(TransactionId xid, MVCCSnapshotShared snapshot);
 
 /* Support for catalog timetravel for logical decoding */
 struct HTAB;
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 44b3b20f73c..193366ce052 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -119,17 +119,44 @@ typedef enum SnapshotType
 	SNAPSHOT_NON_VACUUMABLE,
 } SnapshotType;
 
+typedef struct MVCCSnapshotSharedData *MVCCSnapshotShared;
+
+typedef enum MVCCSnapshotKind
+{
+	SNAPSHOT_STATIC,			/* presumably a static, non-copied snapshot */
+	SNAPSHOT_ACTIVE,			/* presumably on the ActiveSnapshot stack */
+	SNAPSHOT_REGISTERED,		/* presumably made by RegisterSnapshot() */
+} MVCCSnapshotKind;
+
 /*
  * Struct representing a normal MVCC snapshot.
  *
  * MVCC snapshots come in two variants: those taken during recovery in hot
  * standby mode, and "normal" MVCC snapshots.  They are distinguished by
- * takenDuringRecovery.
+ * shared->takenDuringRecovery.
  */
 typedef struct MVCCSnapshotData
 {
 	SnapshotType snapshot_type; /* type of snapshot, must be first */
 
+	/*
+	 * Most fields are in this separate struct which can be reused and shared
+	 * between snapshots that only differ in the command ID.  It is reference
+	 * counted separately.
+	 */
+	MVCCSnapshotShared shared;
+
+	CommandId	curcid;			/* in my xact, CID < curcid are visible */
+
+	/*
+	 * Book-keeping information, used by the snapshot manager
+	 */
+	MVCCSnapshotKind kind;		/* static, active or registered */
+	bool		valid;			/* is this snapshot valid? */
+} MVCCSnapshotData;
+
+typedef struct MVCCSnapshotSharedData
+{
 	/*
 	 * An MVCC snapshot can never see the effects of XIDs >= xmax. It can see
 	 * the effects of all older XIDs except those listed in the snapshot. xmin
@@ -160,25 +187,17 @@ typedef struct MVCCSnapshotData
 	bool		suboverflowed;	/* has the subxip array overflowed? */
 
 	bool		takenDuringRecovery;	/* recovery-shaped snapshot? */
-	bool		copied;			/* false if it's a static snapshot */
-	bool		valid;			/* is this snapshot valid? */
-
-	CommandId	curcid;			/* in my xact, CID < curcid are visible */
-
-	/*
-	 * Book-keeping information, used by the snapshot manager
-	 */
-	uint32		active_count;	/* refcount on ActiveSnapshot stack */
-	uint32		regd_count;		/* refcount of registrations in resowners */
-	dlist_node	node;			/* link in ValidSnapshots */
 
 	/*
-	 * The transaction completion count at the time GetSnapshotData() built
-	 * this snapshot. Allows to avoid re-computing static snapshots when no
-	 * transactions completed since the last GetSnapshotData().
+	 * The transaction completion count at the time GetMVCCSnapshotData()
+	 * built this snapshot. This lets us avoid re-computing static snapshots
+	 * when no transactions completed since the last GetMVCCSnapshotData().
 	 */
 	uint64		snapXactCompletionCount;
-} MVCCSnapshotData;
+
+	uint32		refcount;
+	dlist_node	node;			/* link in ValidSnapshots */
+} MVCCSnapshotSharedData;
 
 typedef struct MVCCSnapshotData *MVCCSnapshot;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c8ed18cf580..990c83c902a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1636,6 +1636,8 @@ MINIDUMP_TYPE
 MJEvalResult
 MTTargetRelLookup
 MVCCSnapshotData
+MVCCSnapshotKind
+MVCCSnapshotSharedData
 MVDependencies
 MVDependency
 MVNDistinct
-- 
2.39.5

