From ed6a28c6832c29b7a7831dc5a30366d2fb67f052 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Sat, 30 Mar 2024 01:38:01 -0400
Subject: [PATCH v10 10/10] Combine freezing and pruning

Execute both freezing and pruning of tuples and emit a single WAL record
containing all changes.
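
Previously, VACUUM's first heap pass could emit two WAL records per
page: one for pruning and a second for freezing.  heap_page_prune() and
heap_freeze_execute_prepared() are now merged into
heap_page_prune_and_freeze(), which applies both sets of changes under
a single critical section and emits one combined record.  In outline (a
simplified sketch of the new pruneheap.c flow; the hint-only and no-op
paths are omitted):

    START_CRIT_SECTION();

    if (do_prune)
        heap_page_prune_execute(buffer, false,
                                prstate.redirected, prstate.nredirected,
                                prstate.nowdead, prstate.ndead,
                                prstate.nowunused, prstate.nunused);

    if (do_freeze)
        heap_freeze_prepared_tuples(buffer, prstate.frozen, prstate.nfrozen);

    MarkBufferDirty(buffer);

    if (RelationNeedsWAL(relation))
        log_heap_prune_and_freeze(relation, buffer, conflict_xid,
                                  true, reason,
                                  prstate.frozen, prstate.nfrozen,
                                  prstate.redirected, prstate.nredirected,
                                  prstate.nowdead, prstate.ndead,
                                  prstate.nowunused, prstate.nunused);

    END_CRIT_SECTION();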
---
 src/backend/access/heap/heapam.c         |  76 +--
 src/backend/access/heap/heapam_handler.c |   2 +-
 src/backend/access/heap/pruneheap.c      | 712 ++++++++++++++++++-----
 src/backend/access/heap/vacuumlazy.c     | 352 ++---------
 src/include/access/heapam.h              |  75 +--
 src/tools/pgindent/typedefs.list         |   2 +-
 6 files changed, 680 insertions(+), 539 deletions(-)
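
For reviewers, a condensed sketch of how lazy_scan_prune() now drives
pruning and freezing through the combined entry point (variable setup
and instrumentation elided; see the vacuumlazy.c hunks below for the
real thing):

    PruneFreezeResult presult;
    uint8       actions = PRUNE_DO_TRY_FREEZE;

    if (vacrel->nindexes == 0)
        actions |= PRUNE_DO_MARK_UNUSED_NOW;

    heap_page_prune_and_freeze(rel, buf, actions, vacrel->vistest,
                               &vacrel->cutoffs, &presult,
                               PRUNE_VACUUM_SCAN, &vacrel->offnum,
                               &vacrel->NewRelfrozenXid,
                               &vacrel->NewRelminMxid);

    /*
     * presult now reports ndeleted, nfrozen, lpdead_items, live_tuples,
     * recently_dead_tuples, hastup, all_visible/all_frozen and the
     * vm_conflict_horizon to use for a later visibility-map update.
     */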

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f8fddce03b..e07c959abe 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -6125,9 +6125,9 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
  */
 static TransactionId
 FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
-				  const struct VacuumCutoffs *cutoffs, uint16 *flags,
-				  HeapPageFreeze *pagefrz)
+				  uint16 *flags, HeapPageFreeze *pagefrz)
 {
+	const struct VacuumCutoffs *cutoffs = pagefrz->cutoffs;
 	TransactionId newxmax;
 	MultiXactMember *members;
 	int			nmembers;
@@ -6445,9 +6445,9 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
  * XIDs or MultiXactIds that will need to be processed by a future VACUUM.
  *
  * VACUUM caller must assemble HeapTupleFreeze freeze plan entries for every
- * tuple that we returned true for, and call heap_freeze_execute_prepared to
- * execute freezing.  Caller must initialize pagefrz fields for page as a
- * whole before first call here for each heap page.
+ * tuple that we returned true for, and then execute freezing.  Caller must
+ * initialize pagefrz fields for page as a whole before first call here for
+ * each heap page.
  *
  * VACUUM caller decides on whether or not to freeze the page as a whole.
  * We'll often prepare freeze plans for a page that caller just discards.
@@ -6550,8 +6550,7 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 		 * perform no-op xmax processing.  The only constraint is that the
 		 * FreezeLimit/MultiXactCutoff postcondition must never be violated.
 		 */
-		newxmax = FreezeMultiXactId(xid, tuple->t_infomask, cutoffs,
-									&flags, pagefrz);
+		newxmax = FreezeMultiXactId(xid, tuple->t_infomask, &flags, pagefrz);
 
 		if (flags & FRM_NOOP)
 		{
@@ -6729,7 +6728,7 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 		 * Does this tuple force caller to freeze the entire page?
 		 */
 		pagefrz->freeze_required =
-			heap_tuple_should_freeze(tuple, cutoffs,
+			heap_tuple_should_freeze(tuple, pagefrz->cutoffs,
 									 &pagefrz->NoFreezePageRelfrozenXid,
 									 &pagefrz->NoFreezePageRelminMxid);
 	}
@@ -6763,35 +6762,19 @@ heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz)
 }
 
 /*
- * heap_freeze_execute_prepared
- *
- * Executes freezing of one or more heap tuples on a page on behalf of caller.
- * Caller passes an array of tuple plans from heap_prepare_freeze_tuple.
- * Caller must set 'offset' in each plan for us.  Note that we destructively
- * sort caller's tuples array in-place, so caller had better be done with it.
- *
- * WAL-logs the changes so that VACUUM can advance the rel's relfrozenxid
- * later on without any risk of unsafe pg_xact lookups, even following a hard
- * crash (or when querying from a standby).  We represent freezing by setting
- * infomask bits in tuple headers, but this shouldn't be thought of as a hint.
- * See section on buffer access rules in src/backend/storage/buffer/README.
+ * Perform xmin/xmax XID status sanity checks before actually executing freeze
+ * plans.
+ *
+ * heap_prepare_freeze_tuple doesn't perform these checks directly because
+ * pg_xact lookups are relatively expensive.  They shouldn't be repeated
+ * by successive VACUUMs that each decide against freezing the same page.
  */
 void
-heap_freeze_execute_prepared(Relation rel, Buffer buffer,
-							 TransactionId snapshotConflictHorizon,
-							 HeapTupleFreeze *tuples, int ntuples)
+heap_pre_freeze_checks(Buffer buffer,
+					   HeapTupleFreeze *tuples, int ntuples)
 {
 	Page		page = BufferGetPage(buffer);
 
-	Assert(ntuples > 0);
-
-	/*
-	 * Perform xmin/xmax XID status sanity checks before critical section.
-	 *
-	 * heap_prepare_freeze_tuple doesn't perform these checks directly because
-	 * pg_xact lookups are relatively expensive.  They shouldn't be repeated
-	 * by successive VACUUMs that each decide against freezing the same page.
-	 */
 	for (int i = 0; i < ntuples; i++)
 	{
 		HeapTupleFreeze *frz = tuples + i;
@@ -6830,8 +6813,19 @@ heap_freeze_execute_prepared(Relation rel, Buffer buffer,
 										 xmax)));
 		}
 	}
+}
 
-	START_CRIT_SECTION();
+/*
+ * Helper which executes freezing of one or more heap tuples on a page on
+ * behalf of caller. Caller passes an array of tuple plans from
+ * heap_prepare_freeze_tuple. Caller must set 'offset' in each plan for us.
+ * Must be called in a critical section that also marks the buffer dirty and,
+ * if needed, emits WAL.
+ */
+void
+heap_freeze_prepared_tuples(Buffer buffer, HeapTupleFreeze *tuples, int ntuples)
+{
+	Page		page = BufferGetPage(buffer);
 
 	for (int i = 0; i < ntuples; i++)
 	{
@@ -6842,22 +6836,6 @@ heap_freeze_execute_prepared(Relation rel, Buffer buffer,
 		htup = (HeapTupleHeader) PageGetItem(page, itemid);
 		heap_execute_freeze_tuple(htup, frz);
 	}
-
-	MarkBufferDirty(buffer);
-
-	/* Now WAL-log freezing if necessary */
-	if (RelationNeedsWAL(rel))
-	{
-		log_heap_prune_and_freeze(rel, buffer, snapshotConflictHorizon,
-								  false,	/* no cleanup lock required */
-								  PRUNE_VACUUM_SCAN,
-								  tuples, ntuples,
-								  NULL, 0,	/* redirected */
-								  NULL, 0,	/* dead */
-								  NULL, 0); /* unused */
-	}
-
-	END_CRIT_SECTION();
 }
 
 /*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6abfe36dec..a793c0f56e 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1106,7 +1106,7 @@ heapam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
 		 * We ignore unused and redirect line pointers.  DEAD line pointers
 		 * should be counted as dead, because we need vacuum to run to get rid
 		 * of them.  Note that this rule agrees with the way that
-		 * heap_page_prune() counts things.
+		 * heap_page_prune_and_freeze() counts things.
 		 */
 		if (!ItemIdIsNormal(itemid))
 		{
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 7f55e9c839..4059e6d0c2 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -21,13 +21,15 @@
 #include "access/transam.h"
 #include "access/xlog.h"
 #include "access/xloginsert.h"
+#include "commands/vacuum.h"
+#include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/* Working data for heap_page_prune and subroutines */
+/* Working data for heap_page_prune_and_freeze() and subroutines */
 typedef struct
 {
 	/* PRUNE_DO_* arguments */
@@ -36,38 +38,56 @@ typedef struct
 	/* tuple visibility test, initialized for the relation */
 	GlobalVisState *vistest;
 
-	TransactionId new_prune_xid;	/* new prune hint value for page */
-	TransactionId snapshotConflictHorizon;	/* latest xid removed */
+	/*
+	 * Fields describing what to do to the page
+	 */
+	TransactionId new_prune_xid;	/* new prune hint value */
+	TransactionId latest_xid_removed;
 	int			nredirected;	/* numbers of entries in arrays below */
 	int			ndead;
 	int			nunused;
+	int			nfrozen;
 	/* arrays that accumulate indexes of items to be changed */
 	OffsetNumber redirected[MaxHeapTuplesPerPage * 2];
 	OffsetNumber nowdead[MaxHeapTuplesPerPage];
 	OffsetNumber nowunused[MaxHeapTuplesPerPage];
+	HeapTupleFreeze frozen[MaxHeapTuplesPerPage];
+
+	HeapPageFreeze pagefrz;
 
 	/*
-	 * Chain candidates contains indexes of all LP_NORMAL and LP_REDIRECT
-	 * items. The first partition are the indexes of the LP_NORMAL and
-	 * LP_REDIRECT items we know to be part of a chain. The second partition
-	 * are the indexes of HOT tuples that may or may not be part of a HOT
-	 * chain. Those which are part of a HOT chain will be visited and marked
-	 * by heap_prune_chain() and the others will be processed afterward.
+	 * marked[i] is true when heap_prune_chain() has already processed item i.
+	 *
+	 * This needs to be MaxHeapTuplesPerPage + 1 long as FirstOffsetNumber is
+	 * 1. Otherwise every access would need to subtract 1.
 	 */
-	int			nchain_members;
-	int			nchain_candidates;
-	OffsetNumber chain_candidates[MaxHeapTuplesPerPage];
+	bool		marked[MaxHeapTuplesPerPage + 1];
 
 	/*
-	 * marked[i] is true if item i is entered in one of the above arrays.
+	 * Tuple visibility is only computed once for each tuple, for correctness
+	 * and efficiency reasons; see comment in heap_page_prune_and_freeze() for
+	 * details.  This is of type int8[], instead of HTSV_Result[], so we can
+	 * use -1 to indicate no visibility has been computed, e.g. for LP_DEAD
+	 * items.
 	 *
 	 * This needs to be MaxHeapTuplesPerPage + 1 long as FirstOffsetNumber is
 	 * 1. Otherwise every access would need to subtract 1.
 	 */
-	bool		marked[MaxHeapTuplesPerPage + 1];
+	int8		htsv[MaxHeapTuplesPerPage + 1];
+
+	/*
+	 * The rest of the fields are not used by pruning itself, but are used to
+	 * collect information about what was pruned and what state the page is in
+	 * after pruning, for the benefit of the caller.  They are copied to
+	 * PruneFreezeResult at the end.
+	 */
 
 	int			ndeleted;		/* Number of tuples deleted from the page */
 
+	/* Number of live and recently dead tuples, after pruning */
+	int			live_tuples;
+	int			recently_dead_tuples;
+
 	/* Whether or not the page makes rel truncation unsafe */
 	bool		hastup;
 
@@ -77,24 +97,59 @@ typedef struct
 	 */
 	int			lpdead_items;	/* includes existing LP_DEAD items */
 	OffsetNumber *deadoffsets;	/* points directly to PruneResult->deadoffsets */
+
+	/*
+	 * all_visible and all_frozen indicate if the all-visible and all-frozen
+	 * bits in the visibility map can be set for this page, after pruning.
+	 *
+	 * visibility_cutoff_xid is the newest xmin of live tuples on the page.
+	 * The caller can use it as the conflict horizon when setting the VM
+	 * bits.  It is only valid if we froze some tuples and all_frozen is
+	 * true.
+	 *
+	 * These are only set if the PRUNE_DO_TRY_FREEZE action flag is set.
+	 *
+	 * NOTE: This 'all_visible' doesn't include LP_DEAD items.  That's
+	 * convenient for heap_page_prune_and_freeze(), which uses it to decide
+	 * whether to freeze the page or not.  The 'all_visible' value returned
+	 * to the caller is adjusted to account for LP_DEAD items at the end.
+	 */
+	bool		all_visible;
+	bool		all_frozen;
+	TransactionId visibility_cutoff_xid;
+
+	/*
+	 * Chain candidates contains indexes of all LP_NORMAL and LP_REDIRECT
+	 * items. The first partition are the indexes of the LP_NORMAL and
+	 * LP_REDIRECT items we know to be part of a chain. The second partition
+	 * are the indexes of HOT tuples that may or may not be part of a HOT
+	 * chain. Those which are part of a HOT chain will be visited and marked
+	 * by heap_prune_chain() and the others will be processed afterward.
+	 */
+	int			nchain_members;
+	int			nchain_candidates;
+	OffsetNumber chain_candidates[MaxHeapTuplesPerPage];
 } PruneState;
 
 /* Local functions */
 static HTSV_Result heap_prune_satisfies_vacuum(PruneState *prstate,
 											   HeapTuple tup,
 											   Buffer buffer);
-static void heap_prune_chain(Buffer buffer,
-							 OffsetNumber rootoffnum,
-							 int8 *htsv,
-							 PruneState *prstate, PruneResult *presult);
+static inline HTSV_Result htsv_get_valid_status(int status);
+static void heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
+							 PruneState *prstate);
+
 static void heap_prune_record_prunable(PruneState *prstate, TransactionId xid);
 static void heap_prune_record_redirect(PruneState *prstate,
-									   OffsetNumber offnum, OffsetNumber rdoffnum, bool was_normal);
-static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum, bool was_normal);
-static void heap_prune_record_dead_or_unused(PruneState *prstate, OffsetNumber offnum, bool was_normal);
+									   OffsetNumber offnum, OffsetNumber rdoffnum,
+									   bool was_normal);
+static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
+								   bool was_normal);
+static void heap_prune_record_dead_or_unused(PruneState *prstate, OffsetNumber offnum,
+											 bool was_normal);
 static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum, bool was_normal);
 
-static void heap_prune_record_unchanged(Page page, int8 *htsv, PruneState *prstate, PruneResult *presult, OffsetNumber offnum);
+static void heap_prune_record_unchanged(Page page, PruneState *prstate, OffsetNumber offnum);
 static void heap_prune_record_unchanged_lp_dead(PruneState *prstate, OffsetNumber offnum);
 static void heap_prune_record_unchanged_lp_redirect(PruneState *prstate, OffsetNumber offnum);
 
@@ -176,14 +231,7 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
 		 */
 		if (PageIsFull(page) || PageGetHeapFreeSpace(page) < minfree)
 		{
-			PruneResult presult;
-
-			presult.pagefrz.freeze_required = false;
-			presult.pagefrz.FreezePageRelfrozenXid = InvalidTransactionId;
-			presult.pagefrz.FreezePageRelminMxid = InvalidMultiXactId;
-			presult.pagefrz.NoFreezePageRelfrozenXid = InvalidTransactionId;
-			presult.pagefrz.NoFreezePageRelminMxid = InvalidMultiXactId;
-			presult.pagefrz.cutoffs = NULL;
+			PruneFreezeResult presult;
 
 			/*
 			 * For now, do not set PRUNE_DO_MARK_UNUSED_NOW regardless of
@@ -191,8 +239,8 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
 			 * determine that during on-access pruning with the current
 			 * implementation.
 			 */
-			heap_page_prune(relation, buffer, vistest, 0,
-							&presult, PRUNE_ON_ACCESS, NULL);
+			heap_page_prune_and_freeze(relation, buffer, 0, vistest,
+									   NULL, &presult, PRUNE_ON_ACCESS, NULL, NULL, NULL);
 
 			/*
 			 * Report the number of tuples reclaimed to pgstats.  This is
@@ -226,35 +274,52 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
 
 
 /*
- * Prune and repair fragmentation in the specified page.
+ * Prune and repair fragmentation and potentially freeze tuples on the
+ * specified page.
+ *
+ * If the page can be marked all-frozen in the visibility map, we may
+ * opportunistically freeze tuples on the page if its tuples are old enough
+ * or if freezing will be cheap enough.
  *
  * Caller must have pin and buffer cleanup lock on the page.  Note that we
  * don't update the FSM information for page on caller's behalf.  Caller might
  * also need to account for a reduction in the length of the line pointer
  * array following array truncation by us.
  *
+ * actions are the pruning actions that heap_page_prune_and_freeze() should
+ * take.
+ *
  * vistest is used to distinguish whether tuples are DEAD or RECENTLY_DEAD
  * (see heap_prune_satisfies_vacuum).
  *
- * actions are the pruning actions that heap_page_prune() should take.
+ * cutoffs contains the freeze cutoffs, established by VACUUM at the
+ * beginning of vacuuming the relation.  It will be NULL for callers other
+ * than VACUUM.
  *
  * presult contains output parameters needed by callers such as the number of
  * tuples removed and the number of line pointers newly marked LP_DEAD.
- * heap_page_prune() is responsible for initializing it.
+ * heap_page_prune_and_freeze() is responsible for initializing it.
  *
  * reason indicates why the pruning is performed.  It is included in the WAL
  * record for debugging and analysis purposes, but otherwise has no effect.
  *
  * off_loc is the offset location required by the caller to use in error
  * callback.
+ *
+ * new_relfrozen_xid and new_relmin_mxid are provided by the caller if it
+ * would like the current values of those updated as part of advancing
+ * relfrozenxid/relminmxid.
  */
 void
-heap_page_prune(Relation relation, Buffer buffer,
-				GlobalVisState *vistest,
-				uint8 actions,
-				PruneResult *presult,
-				PruneReason reason,
-				OffsetNumber *off_loc)
+heap_page_prune_and_freeze(Relation relation, Buffer buffer,
+						   uint8 actions,
+						   GlobalVisState *vistest,
+						   struct VacuumCutoffs *cutoffs,
+						   PruneFreezeResult *presult,
+						   PruneReason reason,
+						   OffsetNumber *off_loc,
+						   TransactionId *new_relfrozen_xid,
+						   MultiXactId *new_relmin_mxid)
 {
 	Page		page = BufferGetPage(buffer);
 	BlockNumber blockno = BufferGetBlockNumber(buffer);
@@ -262,6 +327,41 @@ heap_page_prune(Relation relation, Buffer buffer,
 				maxoff;
 	PruneState	prstate;
 	HeapTupleData tup;
+	bool		do_freeze;
+	bool		do_prune;
+	bool		do_hint;
+	bool		hint_bit_fpi;
+	int64		fpi_before = pgWalUsage.wal_fpi;
+
+	/*
+	 * pagefrz contains the freeze cutoff information and the candidate
+	 * relfrozenxid and relminmxid values, used if the caller is interested
+	 * in freezing tuples on the page.
+	 */
+	prstate.pagefrz.cutoffs = cutoffs;
+	prstate.pagefrz.freeze_required = false;
+
+	if (new_relmin_mxid)
+	{
+		prstate.pagefrz.FreezePageRelminMxid = *new_relmin_mxid;
+		prstate.pagefrz.NoFreezePageRelminMxid = *new_relmin_mxid;
+	}
+	else
+	{
+		prstate.pagefrz.FreezePageRelminMxid = InvalidMultiXactId;
+		prstate.pagefrz.NoFreezePageRelminMxid = InvalidMultiXactId;
+	}
+
+	if (new_relfrozen_xid)
+	{
+		prstate.pagefrz.FreezePageRelfrozenXid = *new_relfrozen_xid;
+		prstate.pagefrz.NoFreezePageRelfrozenXid = *new_relfrozen_xid;
+	}
+	else
+	{
+		prstate.pagefrz.FreezePageRelfrozenXid = InvalidTransactionId;
+		prstate.pagefrz.NoFreezePageRelfrozenXid = InvalidTransactionId;
+	}
 
 	/*
 	 * Our strategy is to scan the page and make lists of items to change,
@@ -277,38 +377,73 @@ heap_page_prune(Relation relation, Buffer buffer,
 	prstate.new_prune_xid = InvalidTransactionId;
 	prstate.vistest = vistest;
 	prstate.actions = actions;
-	prstate.snapshotConflictHorizon = InvalidTransactionId;
-	prstate.nredirected = prstate.ndead = prstate.nunused = 0;
+	prstate.latest_xid_removed = InvalidTransactionId;
+	prstate.nredirected = prstate.ndead = prstate.nunused = prstate.nfrozen = 0;
 	memset(prstate.marked, 0, sizeof(prstate.marked));
+
+	/*
+	 * prstate.htsv is not initialized here because all ntuple spots in the
+	 * array will be set either to a valid HTSV_Result value or -1.
+	 */
+
 	prstate.ndeleted = 0;
-	prstate.nchain_members = 0;
-	prstate.nchain_candidates = 0;
 	prstate.hastup = false;
+	prstate.live_tuples = 0;
+	prstate.recently_dead_tuples = 0;
 	prstate.lpdead_items = 0;
 	prstate.deadoffsets = presult->deadoffsets;
 
 	/*
-	 * If we will prepare to freeze tuples, consider that it might be possible
-	 * to set the page all-frozen in the visibility map.
+	 * Caller may update the VM after we're done.  We keep track of whether
+	 * the page will be all_visible and all_frozen, once we're done with the
+	 * pruning and freezing, to help the caller do that.
+	 *
+	 * Currently, only VACUUM sets the VM bits.  To save the effort, we only
+	 * do the bookkeeping if the caller needs it.  That's currently tied to
+	 * PRUNE_DO_TRY_FREEZE, but it could be a separate flag if you wanted to
+	 * update the VM bits without also freezing, or freeze without setting
+	 * the VM bits.
+	 *
+	 * In addition to telling the caller whether it can set the VM bit, we
+	 * also use 'all_visible' and 'all_frozen' for our own decision-making. If
+	 * the whole page will become frozen, we consider opportunistically
+	 * freezing tuples.  We will not be able to freeze the whole page if there
+	 * are tuples present which are not visible to everyone or if there are
+	 * dead tuples which are not yet removable.  However, dead tuples which
+	 * will be removed by the end of vacuuming should not preclude us from
+	 * opportunistically freezing.  Because of that, we do not clear
+	 * all_visible when we see LP_DEAD items.  We fix that at the end of the
+	 * function, when we return the value to the caller, so that the caller
+	 * doesn't set the VM bit incorrectly.
 	 */
 	if (prstate.actions & PRUNE_DO_TRY_FREEZE)
-		presult->all_frozen = true;
+	{
+		prstate.all_visible = true;
+		prstate.all_frozen = true;
+	}
 	else
-		presult->all_frozen = false;
-	presult->hastup = prstate.hastup;
+	{
+		prstate.all_visible = false;
+		prstate.all_frozen = false;
+	}
 
 	/*
-	 * presult->htsv is not initialized here because all ntuple spots in the
-	 * array will be set either to a valid HTSV_Result value or -1.
+	 * The visibility cutoff xid is the newest xmin of live tuples on the
+	 * page. In the common case, this will be set as the conflict horizon the
+	 * caller can use for updating the VM.  If, at the end of freezing and
+	 * pruning, the page is all-frozen, every running transaction on the
+	 * standby must already see the tuples on the page as all-visible, so
+	 * the conflict horizon remains InvalidTransactionId.
 	 */
-	presult->ndeleted = 0;
-	presult->nnewlpdead = 0;
+	prstate.visibility_cutoff_xid = InvalidTransactionId;
 
-	presult->nfrozen = 0;
+	prstate.nchain_members = 0;
+	prstate.nchain_candidates = 0;
 
 	maxoff = PageGetMaxOffsetNumber(page);
 	tup.t_tableOid = RelationGetRelid(relation);
 
 	/*
 	 * Determine HTSV for all tuples.
 	 *
@@ -336,7 +471,7 @@ heap_page_prune(Relation relation, Buffer buffer,
 		ItemId		itemid = PageGetItemId(page, offnum);
 		HeapTupleHeader htup;
 
-		presult->htsv[offnum] = -1;
+		prstate.htsv[offnum] = -1;
 
 		/* Nothing to do if slot doesn't contain a tuple */
 		if (!ItemIdIsUsed(itemid))
@@ -386,8 +521,8 @@ heap_page_prune(Relation relation, Buffer buffer,
 		tup.t_len = ItemIdGetLength(itemid);
 		ItemPointerSet(&tup.t_self, blockno, offnum);
 
-		presult->htsv[offnum] = heap_prune_satisfies_vacuum(&prstate, &tup,
-															buffer);
+		prstate.htsv[offnum] = heap_prune_satisfies_vacuum(&prstate, &tup,
+														   buffer);
 
 		if (!HeapTupleHeaderIsHeapOnly(htup))
 		{
@@ -404,6 +539,12 @@ heap_page_prune(Relation relation, Buffer buffer,
 		prstate.chain_candidates[prstate.nchain_candidates++] = offnum;
 	}
 
+	/*
+	 * If checksums are enabled, heap_prune_satisfies_vacuum() may have caused
+	 * an FPI to be emitted.
+	 */
+	hint_bit_fpi = fpi_before != pgWalUsage.wal_fpi;
+
 	/* Process HOT chains */
 	for (int i = 0; i < prstate.nchain_members; i++)
 	{
@@ -417,7 +558,7 @@ heap_page_prune(Relation relation, Buffer buffer,
 			*off_loc = offnum;
 
 		/* Process this item or chain of items */
-		heap_prune_chain(buffer, offnum, presult->htsv, &prstate, presult);
+		heap_prune_chain(buffer, offnum, &prstate);
 	}
 
 	/*
@@ -436,7 +577,7 @@ heap_page_prune(Relation relation, Buffer buffer,
 		if (off_loc)
 			*off_loc = offnum;
 
-		if (presult->htsv[offnum] == HEAPTUPLE_DEAD)
+		if (prstate.htsv[offnum] == HEAPTUPLE_DEAD)
 		{
 			ItemId		itemid = PageGetItemId(page, offnum);
 			HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, itemid);
@@ -475,7 +616,7 @@ heap_page_prune(Relation relation, Buffer buffer,
 			if (!HeapTupleHeaderIsHotUpdated(htup))
 			{
 				HeapTupleHeaderAdvanceConflictHorizon(htup,
-													  &prstate.snapshotConflictHorizon);
+													  &prstate.latest_xid_removed);
 				heap_prune_record_unused(&prstate, offnum, true);
 				continue;
 			}
@@ -487,7 +628,7 @@ heap_page_prune(Relation relation, Buffer buffer,
 		 * marked by heap_prune_chain() and heap_prune_record_unchanged() will
 		 * return immediately.
 		 */
-		heap_prune_record_unchanged(page, presult->htsv, &prstate, presult, offnum);
+		heap_prune_record_unchanged(page, &prstate, offnum);
 	}
 
 /* We should now have processed every tuple exactly once  */
@@ -510,21 +651,80 @@ heap_page_prune(Relation relation, Buffer buffer,
 	if (off_loc)
 		*off_loc = InvalidOffsetNumber;
 
-	/* Any error while applying the changes is critical */
-	START_CRIT_SECTION();
+	do_prune = prstate.nredirected > 0 ||
+		prstate.ndead > 0 ||
+		prstate.nunused > 0;
+
+	/*
+	 * Even if we don't prune anything, if we found a new value for the
+	 * pd_prune_xid field or the page was marked full, we will update the hint
+	 * bit.
+	 */
+	do_hint = ((PageHeader) page)->pd_prune_xid != prstate.new_prune_xid ||
+		PageIsFull(page);
+
+	/*
+	 * Freeze the page when heap_prepare_freeze_tuple indicates that at least
+	 * one XID/MXID from before FreezeLimit/MultiXactCutoff is present.  Also
+	 * freeze when pruning generated an FPI, if doing so means that we set the
+	 * page all-frozen afterwards (might not happen until final heap pass).
+	 *
+	 * XXX: Previously, we knew if pruning emitted an FPI by checking
+	 * pgWalUsage.wal_fpi before and after pruning.  Now that the freeze and
+	 * prune records are combined, that heuristic can no longer be used
+	 * as-is.  The opportunistic freeze heuristic must be improved; for now,
+	 * we try to approximate it.
+	 */
+	do_freeze = false;
+	if (prstate.actions & PRUNE_DO_TRY_FREEZE)
+	{
+		/* Is the whole page freezable? And is there something to freeze? */
+		bool		whole_page_freezable = prstate.all_visible &&
+			prstate.all_frozen;
+
+		if (prstate.pagefrz.freeze_required)
+			do_freeze = true;
+		else if (whole_page_freezable && prstate.nfrozen > 0)
+		{
+			/*
+			 * Freezing would make the page all-frozen. In this case, we will
+			 * freeze if we have already emitted an FPI or will do so anyway.
+			 * Be sure only to incur the overhead of checking if we will do an
+			 * FPI if we may use that information.
+			 */
+			if (hint_bit_fpi ||
+				((do_prune || do_hint) && XLogCheckBufferNeedsBackup(buffer)))
+			{
+				do_freeze = true;
+			}
+		}
+	}
 
-	/* Have we found any prunable items? */
-	if (prstate.nredirected > 0 || prstate.ndead > 0 || prstate.nunused > 0)
+	/*
+	 * Validate the tuples we are considering freezing.  We do this even if
+	 * pruning and hint-bit setting have not emitted an FPI so far, because
+	 * an FPI may still be emitted later.  Either way, the pre-freeze checks
+	 * must happen before we enter the critical section.
+	 */
+	if (do_freeze)
+		heap_pre_freeze_checks(buffer, prstate.frozen, prstate.nfrozen);
+	else if (!prstate.all_frozen || prstate.nfrozen > 0)
 	{
+		Assert(!prstate.pagefrz.freeze_required);
+
 		/*
-		 * Apply the planned item changes, then repair page fragmentation, and
-		 * update the page's hint bit about whether it has free line pointers.
+		 * If we will neither freeze tuples on the page nor set the page
+		 * all-frozen in the visibility map, the page is not all-frozen and
+		 * there will be no newly frozen tuples.
 		 */
-		heap_page_prune_execute(buffer, false,
-								prstate.redirected, prstate.nredirected,
-								prstate.nowdead, prstate.ndead,
-								prstate.nowunused, prstate.nunused);
+		prstate.all_frozen = false;
+		prstate.nfrozen = 0;	/* avoid miscounts in instrumentation */
+	}
+
+	/* Any error while applying the changes is critical */
+	START_CRIT_SECTION();
 
+	if (do_hint)
+	{
 		/*
 		 * Update the page's pd_prune_xid field to either zero, or the lowest
 		 * XID of any soon-prunable tuple.
@@ -532,12 +732,35 @@ heap_page_prune(Relation relation, Buffer buffer,
 		((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
 
 		/*
-		 * Also clear the "page is full" flag, since there's no point in
-		 * repeating the prune/defrag process until something else happens to
-		 * the page.
+		 * Clear the "page is full" flag if it is set since there's no point
+		 * in repeating the prune/defrag process until something else happens
+		 * to the page.
 		 */
 		PageClearFull(page);
 
+		/*
+		 * We only needed to update pd_prune_xid and clear the page-is-full
+		 * hint bit; this is a non-WAL-logged hint.  If we will also freeze or
+		 * prune the page, we will mark the buffer dirty below.
+		 */
+		if (!do_freeze && !do_prune)
+			MarkBufferDirtyHint(buffer, true);
+	}
+
+	if (do_prune || do_freeze)
+	{
+		/* Apply the planned item changes, then repair page fragmentation. */
+		if (do_prune)
+		{
+			heap_page_prune_execute(buffer, false,
+									prstate.redirected, prstate.nredirected,
+									prstate.nowdead, prstate.ndead,
+									prstate.nowunused, prstate.nunused);
+		}
+
+		if (do_freeze)
+			heap_freeze_prepared_tuples(buffer, prstate.frozen, prstate.nfrozen);
+
 		MarkBufferDirty(buffer);
 
 		/*
@@ -545,42 +768,123 @@ heap_page_prune(Relation relation, Buffer buffer,
 		 */
 		if (RelationNeedsWAL(relation))
 		{
+			/*
+			 * The snapshotConflictHorizon for the whole record should be the
+			 * most conservative of all the horizons calculated for any of the
+			 * possible modifications. If this record will prune tuples, any
+			 * transactions on the standby older than the youngest xmax of the
+			 * most recently removed tuple this record will prune will
+			 * conflict. If this record will freeze tuples, any transactions
+			 * on the standby with xids older than the youngest tuple this
+			 * record will freeze will conflict.
+			 */
+			TransactionId frz_conflict_horizon = InvalidTransactionId;
+			TransactionId conflict_xid;
+
+			/*
+			 * We can use the visibility_cutoff_xid as our cutoff for
+			 * conflicts when the whole page is eligible to become all-frozen
+			 * in the VM once we're done with it.  Otherwise we generate a
+			 * conservative cutoff by stepping back from OldestXmin.
+			 */
+			if (do_freeze)
+			{
+				if (prstate.all_visible && prstate.all_frozen)
+					frz_conflict_horizon = prstate.visibility_cutoff_xid;
+				else
+				{
+					/* Avoids false conflicts when hot_standby_feedback in use */
+					frz_conflict_horizon = prstate.pagefrz.cutoffs->OldestXmin;
+					TransactionIdRetreat(frz_conflict_horizon);
+				}
+			}
+
+			if (TransactionIdFollows(frz_conflict_horizon, prstate.latest_xid_removed))
+				conflict_xid = frz_conflict_horizon;
+			else
+				conflict_xid = prstate.latest_xid_removed;
+
 			log_heap_prune_and_freeze(relation, buffer,
-									  prstate.snapshotConflictHorizon,
+									  conflict_xid,
 									  true, reason,
-									  NULL, 0,
+									  prstate.frozen, prstate.nfrozen,
 									  prstate.redirected, prstate.nredirected,
 									  prstate.nowdead, prstate.ndead,
 									  prstate.nowunused, prstate.nunused);
 		}
 	}
-	else
-	{
-		/*
-		 * If we didn't prune anything, but have found a new value for the
-		 * pd_prune_xid field, update it and mark the buffer dirty. This is
-		 * treated as a non-WAL-logged hint.
-		 *
-		 * Also clear the "page is full" flag if it is set, since there's no
-		 * point in repeating the prune/defrag process until something else
-		 * happens to the page.
-		 */
-		if (((PageHeader) page)->pd_prune_xid != prstate.new_prune_xid ||
-			PageIsFull(page))
-		{
-			((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
-			PageClearFull(page);
-			MarkBufferDirtyHint(buffer, true);
-		}
-	}
 
 	END_CRIT_SECTION();
 
 	/* Copy data back to 'presult' */
-	presult->nnewlpdead = prstate.ndead;
 	presult->ndeleted = prstate.ndeleted;
+	presult->nnewlpdead = prstate.ndead;
+	presult->nfrozen = prstate.nfrozen;
+	presult->live_tuples = prstate.live_tuples;
+	presult->recently_dead_tuples = prstate.recently_dead_tuples;
+
+	/*
+	 * It was convenient to ignore LP_DEAD items in all_visible earlier on to
+	 * make the choice of whether or not to freeze the page unaffected by the
+	 * short-term presence of LP_DEAD items.  These LP_DEAD items were
+	 * effectively assumed to be LP_UNUSED items in the making.  It doesn't
+	 * matter which heap pass (initial pass or final pass) ends up setting the
+	 * page all-frozen, as long as the ongoing VACUUM does it.
+	 *
+	 * Now that freezing has been finalized, unset all_visible if there are
+	 * any LP_DEAD items on the page.  It needs to reflect the present state
+	 * of things, as expected by our caller.
+	 */
+	if (prstate.lpdead_items == 0)
+	{
+		presult->all_visible = prstate.all_visible;
+		presult->all_frozen = prstate.all_frozen;
+	}
+	else
+	{
+		presult->all_visible = false;
+		presult->all_frozen = false;
+	}
+	presult->hastup = prstate.hastup;
+
+	/*
+	 * For callers planning to update the visibility map, the conflict horizon
+	 * for that record must be the newest xmin on the page.  However, if the
+	 * page is completely frozen, there can be no conflict and the
+	 * vm_conflict_horizon should remain InvalidTransactionId.  This includes
+	 * the case where we just froze all the tuples; the prune-freeze record
+	 * already included the conflict XID, so the caller doesn't need it.
+	 */
+	if (!presult->all_frozen)
+		presult->vm_conflict_horizon = prstate.visibility_cutoff_xid;
+	else
+		presult->vm_conflict_horizon = InvalidTransactionId;
+
 	presult->lpdead_items = prstate.lpdead_items;
 	/* the presult->deadoffsets array was already filled in */
+
+	/*
+	 * If we will freeze tuples on the page, or if we will set the page
+	 * all-frozen in the visibility map even without freezing any tuples, we
+	 * can advance relfrozenxid and relminmxid to the values in
+	 * pagefrz.FreezePageRelfrozenXid and pagefrz.FreezePageRelminMxid.
+	 */
+	Assert(presult->nfrozen > 0 || !prstate.pagefrz.freeze_required);
+
+	if (new_relfrozen_xid)
+	{
+		if (presult->nfrozen > 0)
+			*new_relfrozen_xid = prstate.pagefrz.FreezePageRelfrozenXid;
+		else
+			*new_relfrozen_xid = prstate.pagefrz.NoFreezePageRelfrozenXid;
+	}
+	if (new_relmin_mxid)
+	{
+		if (presult->nfrozen > 0)
+			*new_relmin_mxid = prstate.pagefrz.FreezePageRelminMxid;
+		else
+			*new_relmin_mxid = prstate.pagefrz.NoFreezePageRelminMxid;
+	}
 }
 
 
@@ -605,10 +909,24 @@ heap_prune_satisfies_vacuum(PruneState *prstate, HeapTuple tup, Buffer buffer)
 }
 
 
+/*
+ * Pruning calculates tuple visibility once and saves the results in an array
+ * of int8. See PruneState.htsv for details. This helper function is meant to
+ * guard against examining visibility status array members which have not yet
+ * been computed.
+ */
+static inline HTSV_Result
+htsv_get_valid_status(int status)
+{
+	Assert(status >= HEAPTUPLE_DEAD &&
+		   status <= HEAPTUPLE_DELETE_IN_PROGRESS);
+	return (HTSV_Result) status;
+}
+
 /*
  * Prune specified line pointer or a HOT chain originating at line pointer.
  *
- * Tuple visibility information is provided in htsv.
+ * Tuple visibility information is provided in prstate->htsv.
  *
  * If the item is an index-referenced tuple (i.e. not a heap-only tuple),
  * the HOT chain is pruned by removing all DEAD tuples at the start of the HOT
@@ -628,11 +946,17 @@ heap_prune_satisfies_vacuum(PruneState *prstate, HeapTuple tup, Buffer buffer)
  * prstate showing the changes to be made.  Items to be redirected are added
  * to the redirected[] array (two entries per redirection); items to be set to
  * LP_DEAD state are added to nowdead[]; and items to be set to LP_UNUSED
- * state are added to nowunused[].
+ * state are added to nowunused[].  We perform bookkeeping of live tuples,
+ * visibility etc. based on what the page will look like after the changes
+ * applied.  All that bookkeeping is performed in the heap_prune_record_*()
+ * subroutines.  The division of labor is that heap_prune_chain() decides the
+ * fate of each tuple, i.e. whether it's going to be removed, redirected or
+ * left unchanged, and the heap_prune_record_*() subroutines update PruneState
+ * based on that outcome.
  */
 static void
 heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
-				 int8 *htsv, PruneState *prstate, PruneResult *presult)
+				 PruneState *prstate)
 {
 	Page		page = (Page) BufferGetPage(buffer);
 	ItemId		rootlp = PageGetItemId(page, rootoffnum);
@@ -711,7 +1035,7 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 		 */
 		chainitems[nchain++] = offnum;
 
-		switch (htsv_get_valid_status(htsv[offnum]))
+		switch (htsv_get_valid_status(prstate->htsv[offnum]))
 		{
 			case HEAPTUPLE_DEAD:
 
@@ -726,7 +1050,7 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 				 */
 				ndeadchain = nchain;
 				HeapTupleHeaderAdvanceConflictHorizon(htup,
-													  &prstate->snapshotConflictHorizon);
+													  &prstate->latest_xid_removed);
 				break;
 
 			case HEAPTUPLE_RECENTLY_DEAD:
@@ -775,10 +1099,11 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 	{
 		/*
 		 * We found a redirect item that doesn't point to a valid follow-on
-		 * item.  This can happen if the loop in heap_page_prune caused us to
-		 * visit the dead successor of a redirect item before visiting the
-		 * redirect item.  We can clean up by setting the redirect item to
-		 * LP_DEAD state or LP_UNUSED if the caller indicated.
+		 * item.  This can happen if the loop in heap_page_prune_and_freeze()
+		 * caused us to visit the dead successor of a redirect item before
+		 * visiting the redirect item.  We can clean up by setting the
+		 * redirect item to LP_DEAD state or LP_UNUSED if the caller
+		 * indicated.
 		 */
 		heap_prune_record_dead_or_unused(prstate, rootoffnum, false);
 		return;
@@ -799,7 +1124,7 @@ process_chains:
 
 		/* the rest of tuples in the chain are normal, unchanged tuples */
 		for (; i < nchain; i++)
-			heap_prune_record_unchanged(page, htsv, prstate, presult, chainitems[i]);
+			heap_prune_record_unchanged(page, prstate, chainitems[i]);
 	}
 	else if (ndeadchain == nchain)
 	{
@@ -831,7 +1156,7 @@ process_chains:
 
 		/* the rest of tuples in the chain are normal, unchanged tuples */
 		for (int i = ndeadchain; i < nchain; i++)
-			heap_prune_record_unchanged(page, htsv, prstate, presult, chainitems[i]);
+			heap_prune_record_unchanged(page, prstate, chainitems[i]);
 	}
 }
 
@@ -892,6 +1217,18 @@ heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
 	prstate->nowdead[prstate->ndead] = offnum;
 	prstate->ndead++;
 
+	/*
+	 * Deliberately delay unsetting all_visible until the end of
+	 * heap_page_prune_and_freeze().  Removable dead tuples shouldn't
+	 * preclude freezing the page, so we assume here that any LP_DEAD
+	 * items will become LP_UNUSED before the ongoing VACUUM finishes.
+	 * This allows us to attempt to freeze the page after pruning.  The
+	 * all_visible value returned to the caller is adjusted to account
+	 * for LP_DEAD items at the end of the function.  As long as that
+	 * happens before the caller updates the visibility map, this will
+	 * be correct.
+	 */
+
 	/* Record the dead offset for vacuum */
 	prstate->deadoffsets[prstate->lpdead_items++] = offnum;
 
@@ -947,37 +1284,121 @@ heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum, bool was_norm
 }
 
 /*
- * Record LP_NORMAL line pointer that is left unchanged.
+ * Record line pointer that is left unchanged.  We consider freezing it, and
+ * update bookkeeping of tuple counts and page visibility.
  */
 static void
-heap_prune_record_unchanged(Page page, int8 *htsv, PruneState *prstate,
-							PruneResult *presult, OffsetNumber offnum)
+heap_prune_record_unchanged(Page page, PruneState *prstate, OffsetNumber offnum)
 {
 	HeapTupleHeader htup;
 
 	Assert(!prstate->marked[offnum]);
 	prstate->marked[offnum] = true;
 
-	presult->hastup = true;		/* the page is not empty */
+	prstate->hastup = true;		/* the page is not empty */
 
+	/*
+	 * The criteria for counting a tuple as live in this block need to match
+	 * what analyze.c's acquire_sample_rows() does, otherwise VACUUM and
+	 * ANALYZE may produce wildly different reltuples values, e.g. when there
+	 * are many recently-dead tuples.
+	 *
+	 * The logic here is a bit simpler than acquire_sample_rows(), as VACUUM
+	 * can't run inside a transaction block, which makes some cases impossible
+	 * (e.g. in-progress insert from the same transaction).
+	 *
+	 * HEAPTUPLE_DEAD tuples are handled by the other heap_prune_record_*()
+	 * subroutines.  They don't count dead items like acquire_sample_rows()
+	 * does, because we assume that all dead items will become LP_UNUSED
+	 * before VACUUM finishes.  This difference is only superficial.  VACUUM
+	 * effectively agrees with ANALYZE about DEAD items, in the end.  VACUUM
+	 * won't remember LP_DEAD items, but only because they're not supposed to
+	 * be left behind when it is done. (Cases where we bypass index vacuuming
+	 * will violate this optimistic assumption, but the overall impact of that
+	 * should be negligible.)
+	 */
 	htup = (HeapTupleHeader) PageGetItem(page, PageGetItemId(page, offnum));
 
-	switch (htsv[offnum])
+	switch (prstate->htsv[offnum])
 	{
 		case HEAPTUPLE_LIVE:
-		case HEAPTUPLE_INSERT_IN_PROGRESS:
 
 			/*
-			 * If we wanted to optimize for aborts, we might consider marking
-			 * the page prunable when we see INSERT_IN_PROGRESS.  But we
-			 * don't.  See related decisions about when to mark the page
-			 * prunable in heapam.c.
+			 * Count it as live.  Not only is this natural, but it's also what
+			 * acquire_sample_rows() does.
+			 */
+			prstate->live_tuples++;
+
+			/*
+			 * Is the tuple definitely visible to all transactions?
+			 *
+			 * NB: Like with per-tuple hint bits, we can't set the
+			 * PD_ALL_VISIBLE flag if the inserter committed asynchronously.
+			 * See SetHintBits for more info. Check that the tuple is hinted
+			 * xmin-committed because of that.
 			 */
+			if (prstate->all_visible)
+			{
+				TransactionId xmin;
+
+				if (!HeapTupleHeaderXminCommitted(htup))
+				{
+					prstate->all_visible = false;
+					break;
+				}
+
+				/*
+				 * The inserter definitely committed. But is it old enough
+				 * that everyone sees it as committed? A FrozenTransactionId
+				 * is seen as committed to everyone. Otherwise, we check if
+				 * there is a snapshot that considers this xid to still be
+				 * running, and if so, we don't consider the page all-visible.
+				 */
+				xmin = HeapTupleHeaderGetXmin(htup);
+
+				/* For now always use pagefrz->cutoffs */
+				Assert(prstate->pagefrz.cutoffs);
+				if (!TransactionIdPrecedes(xmin, prstate->pagefrz.cutoffs->OldestXmin))
+				{
+					prstate->all_visible = false;
+					break;
+				}
+
+				/* Track newest xmin on page. */
+				if (TransactionIdFollows(xmin, prstate->visibility_cutoff_xid) &&
+					TransactionIdIsNormal(xmin))
+					prstate->visibility_cutoff_xid = xmin;
+			}
 			break;
 
 		case HEAPTUPLE_RECENTLY_DEAD:
+
+			/*
+			 * If tuple is recently dead then we must not remove it from the
+			 * relation.  (We only remove items that are LP_DEAD from
+			 * pruning.)
+			 */
+			prstate->recently_dead_tuples++;
+			prstate->all_visible = false;
+
+			/*
+			 * This tuple may soon become DEAD.  Update the hint field so that
+			 * the page is reconsidered for pruning in future.
+			 */
+			heap_prune_record_prunable(prstate,
+									   HeapTupleHeaderGetUpdateXid(htup));
+			break;
+
 		case HEAPTUPLE_DELETE_IN_PROGRESS:
 
+			/*
+			 * This is an expected case during concurrent vacuum. Count such rows
+			 * as live.  As above, we assume the deleting transaction will
+			 * commit and update the counters after we report.
+			 */
+			prstate->live_tuples++;
+			prstate->all_visible = false;
+
 			/*
 			 * This tuple may soon become DEAD.  Update the hint field so that
 			 * the page is reconsidered for pruning in future.
@@ -986,6 +1407,24 @@ heap_prune_record_unchanged(Page page, int8 *htsv, PruneState *prstate,
 									   HeapTupleHeaderGetUpdateXid(htup));
 			break;
 
+		case HEAPTUPLE_INSERT_IN_PROGRESS:
+
+			/*
+			 * We do not count these rows as live, because we expect the
+			 * inserting transaction to update the counters at commit, and we
+			 * assume that will happen only after we report our results.  This
+			 * assumption is a bit shaky, but it is what acquire_sample_rows()
+			 * does, so be consistent.
+			 */
+			prstate->all_visible = false;
+
+			/*
+			 * If we wanted to optimize for aborts, we might consider marking
+			 * the page prunable when we see INSERT_IN_PROGRESS.  But we
+			 * don't.  See related decisions about when to mark the page
+			 * prunable in heapam.c.
+			 */
+			break;
 
 		default:
 
@@ -993,7 +1432,8 @@ heap_prune_record_unchanged(Page page, int8 *htsv, PruneState *prstate,
 			 * DEAD tuples should've been passed to heap_prune_record_dead()
 			 * or heap_prune_record_unused() instead.
 			 */
-			elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result %d", htsv[offnum]);
+			elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result %d",
+				 prstate->htsv[offnum]);
 			break;
 	}
 
@@ -1003,12 +1443,12 @@ heap_prune_record_unchanged(Page page, int8 *htsv, PruneState *prstate,
 		/* Tuple with storage -- consider need to freeze */
 		bool		totally_frozen;
 
-		if ((heap_prepare_freeze_tuple(htup, &presult->pagefrz,
-									   &presult->frozen[presult->nfrozen],
+		if ((heap_prepare_freeze_tuple(htup, &prstate->pagefrz,
+									   &prstate->frozen[prstate->nfrozen],
 									   &totally_frozen)))
 		{
 			/* Save prepared freeze plan for later */
-			presult->frozen[presult->nfrozen++].offset = offnum;
+			prstate->frozen[prstate->nfrozen++].offset = offnum;
 		}
 
 		/*
@@ -1017,7 +1457,7 @@ heap_prune_record_unchanged(Page page, int8 *htsv, PruneState *prstate,
 		 * definitely cannot be set all-frozen in the visibility map later on
 		 */
 		if (!totally_frozen)
-			presult->all_frozen = false;
+			prstate->all_frozen = false;
 	}
 }
 
@@ -1028,9 +1468,6 @@ heap_prune_record_unchanged(Page page, int8 *htsv, PruneState *prstate,
 static void
 heap_prune_record_unchanged_lp_dead(PruneState *prstate, OffsetNumber offnum)
 {
-	Assert(!prstate->marked[offnum]);
-	prstate->marked[offnum] = true;
-
 	/*
 	 * Deliberately don't set hastup for LP_DEAD items.  We make the soft
 	 * assumption that any LP_DEAD items encountered here will become
@@ -1039,12 +1476,19 @@ heap_prune_record_unchanged_lp_dead(PruneState *prstate, OffsetNumber offnum)
 	 * other VACUUM, at most.  Besides, VACUUM must treat
 	 * hastup/nonempty_pages as provisional no matter how LP_DEAD items are
 	 * handled (handled here, or handled later on).
+	 *
+	 * Similarly, don't unset all_visible until later, at the end of
+	 * heap_page_prune_and_freeze().  This will allow us to attempt to freeze
+	 * the page after pruning.  As long as we unset it before updating the
+	 * visibility map, this will be correct.
 	 */
 
 	/* Record the dead offset for vacuum */
 	prstate->deadoffsets[prstate->lpdead_items++] = offnum;
-}
 
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+}
 
 static void
 heap_prune_record_unchanged_lp_redirect(PruneState *prstate, OffsetNumber offnum)
@@ -1062,7 +1506,7 @@ heap_prune_record_unchanged_lp_redirect(PruneState *prstate, OffsetNumber offnum
 }
 
 /*
- * Perform the actual page changes needed by heap_page_prune.
+ * Perform the actual page changes needed by heap_page_prune_and_freeze().
  *
  * If 'lp_truncate_only' is set, we are merely marking LP_DEAD line pointers
  * as unused, not redirecting or removing anything else.  The
@@ -1193,12 +1637,12 @@ heap_page_prune_execute(Buffer buffer, bool lp_truncate_only,
 		else
 		{
 			/*
-			 * When heap_page_prune() was called, PRUNE_DO_MARK_UNUSED_NOW may
-			 * have been set, which allows would-be LP_DEAD items to be made
-			 * LP_UNUSED instead.  This is only possible if the relation has
-			 * no indexes.  If there are any dead items, then
-			 * PRUNE_DO_MARK_UNUSED_NOW was not set and every item being
-			 * marked LP_UNUSED must refer to a heap-only tuple.
+			 * When heap_page_prune_and_freeze() was called,
+			 * PRUNE_DO_MARK_UNUSED_NOW may have been set, which allows
+			 * would-be LP_DEAD items to be made LP_UNUSED instead.  This is
+			 * only possible if the relation has no indexes.  If there are any
+			 * dead items, then PRUNE_DO_MARK_UNUSED_NOW was not set and every
+			 * item being marked LP_UNUSED must refer to a heap-only tuple.
 			 */
 			if (ndead > 0)
 			{
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 7f1e4db55c..3913da7e16 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -430,12 +430,13 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	 * as an upper bound on the XIDs stored in the pages we'll actually scan
 	 * (NewRelfrozenXid tracking must never be allowed to miss unfrozen XIDs).
 	 *
-	 * Next acquire vistest, a related cutoff that's used in heap_page_prune.
-	 * We expect vistest will always make heap_page_prune remove any deleted
-	 * tuple whose xmax is < OldestXmin.  lazy_scan_prune must never become
-	 * confused about whether a tuple should be frozen or removed.  (In the
-	 * future we might want to teach lazy_scan_prune to recompute vistest from
-	 * time to time, to increase the number of dead tuples it can prune away.)
+	 * Next acquire vistest, a related cutoff that's used in
+	 * heap_page_prune_and_freeze(). We expect vistest will always make
+	 * heap_page_prune_and_freeze() remove any deleted tuple whose xmax is <
+	 * OldestXmin.  lazy_scan_prune must never become confused about whether a
+	 * tuple should be frozen or removed.  (In the future we might want to
+	 * teach lazy_scan_prune to recompute vistest from time to time, to
+	 * increase the number of dead tuples it can prune away.)
 	 */
 	vacrel->aggressive = vacuum_get_cutoffs(rel, params, &vacrel->cutoffs);
 	vacrel->rel_pages = orig_rel_pages = RelationGetNumberOfBlocks(rel);
@@ -1387,22 +1388,6 @@ OffsetNumber_cmp(const void *a, const void *b)
  *
  * Caller must hold pin and buffer cleanup lock on the buffer.
  *
- * Prior to PostgreSQL 14 there were very rare cases where heap_page_prune()
- * was allowed to disagree with our HeapTupleSatisfiesVacuum() call about
- * whether or not a tuple should be considered DEAD.  This happened when an
- * inserting transaction concurrently aborted (after our heap_page_prune()
- * call, before our HeapTupleSatisfiesVacuum() call).  There was rather a lot
- * of complexity just so we could deal with tuples that were DEAD to VACUUM,
- * but nevertheless were left with storage after pruning.
- *
- * As of Postgres 17, we circumvent this problem altogether by reusing the
- * result of heap_page_prune()'s visibility check. Without the second call to
- * HeapTupleSatisfiesVacuum(), there is no new HTSV_Result and there can be no
- * disagreement. We'll just handle such tuples as if they had become fully dead
- * right after this operation completes instead of in the middle of it. Note that
- * any tuple that becomes dead after the call to heap_page_prune() can't need to
- * be frozen, because it was visible to another session when vacuum started.
- *
  * vmbuffer is the buffer containing the VM block with visibility information
  * for the heap block, blkno. all_visible_according_to_vm is the saved
  * visibility status of the heap block looked up earlier by the caller. We
@@ -1421,292 +1406,50 @@ lazy_scan_prune(LVRelState *vacrel,
 				bool *has_lpdead_items)
 {
 	Relation	rel = vacrel->rel;
-	OffsetNumber offnum,
-				maxoff;
-	ItemId		itemid;
-	PruneResult presult;
-	int			live_tuples,
-				recently_dead_tuples;
-	bool		all_visible;
-	TransactionId visibility_cutoff_xid;
+	PruneFreezeResult presult;
 	uint8		actions = 0;
-	int64		fpi_before = pgWalUsage.wal_fpi;
 
 	Assert(BufferGetBlockNumber(buf) == blkno);
 
 	/*
-	 * maxoff might be reduced following line pointer array truncation in
-	 * heap_page_prune.  That's safe for us to ignore, since the reclaimed
-	 * space will continue to look like LP_UNUSED items below.
-	 */
-	maxoff = PageGetMaxOffsetNumber(page);
-
-	/* Initialize (or reset) page-level state */
-	presult.pagefrz.freeze_required = false;
-	presult.pagefrz.FreezePageRelfrozenXid = vacrel->NewRelfrozenXid;
-	presult.pagefrz.FreezePageRelminMxid = vacrel->NewRelminMxid;
-	presult.pagefrz.NoFreezePageRelfrozenXid = vacrel->NewRelfrozenXid;
-	presult.pagefrz.NoFreezePageRelminMxid = vacrel->NewRelminMxid;
-	presult.pagefrz.cutoffs = &vacrel->cutoffs;
-	live_tuples = 0;
-	recently_dead_tuples = 0;
-
-	/*
-	 * Prune all HOT-update chains in this page.
+	 * Prune all HOT-update chains and potentially freeze tuples on this page.
 	 *
 	 * We count the number of tuples removed from the page by the pruning step
-	 * in presult.ndeleted. It should not be confused with lpdead_items;
-	 * lpdead_items's final value can be thought of as the number of tuples
-	 * that were deleted from indexes.
+	 * in presult.ndeleted. It should not be confused with
+	 * presult.lpdead_items; presult.lpdead_items's final value can be thought
+	 * of as the number of tuples that were deleted from indexes.
 	 *
 	 * If the relation has no indexes, we can immediately mark would-be dead
 	 * items LP_UNUSED, so PRUNE_DO_MARK_UNUSED_NOW should be set if no
 	 * indexes and unset otherwise.
+	 *
+	 * We will update the VM after collecting LP_DEAD items and freezing
+	 * tuples. Pruning will have determined whether or not the page is
+	 * all-visible.
 	 */
 	actions |= PRUNE_DO_TRY_FREEZE;
 
 	if (vacrel->nindexes == 0)
 		actions |= PRUNE_DO_MARK_UNUSED_NOW;
 
-	heap_page_prune(rel, buf, vacrel->vistest, actions,
-					&presult, PRUNE_VACUUM_SCAN, &vacrel->offnum);
-
-	/*
-	 * We will update the VM after collecting LP_DEAD items and freezing
-	 * tuples. Keep track of whether or not the page is all_visible and
-	 * all_frozen and use this information to update the VM. all_visible
-	 * implies 0 lpdead_items, but don't trust all_frozen result unless
-	 * all_visible is also set to true.
-	 *
-	 * Also keep track of the visibility cutoff xid for recovery conflicts.
-	 */
-	all_visible = true;
-	visibility_cutoff_xid = InvalidTransactionId;
-
-	/*
-	 * Now scan the page to collect LP_DEAD items and update the variables set
-	 * just above.
-	 */
-	for (offnum = FirstOffsetNumber;
-		 offnum <= maxoff;
-		 offnum = OffsetNumberNext(offnum))
-	{
-		HeapTupleHeader htup;
-
-		/*
-		 * Set the offset number so that we can display it along with any
-		 * error that occurred while processing this tuple.
-		 */
-		vacrel->offnum = offnum;
-		itemid = PageGetItemId(page, offnum);
-
-		if (!ItemIdIsUsed(itemid))
-			continue;
-
-		/* Redirect items mustn't be touched */
-		if (ItemIdIsRedirected(itemid))
-			continue;
-
-		if (ItemIdIsDead(itemid))
-		{
-			/*
-			 * Also deliberately delay unsetting all_visible until just before
-			 * we return to lazy_scan_heap caller, as explained in full below.
-			 * (This is another case where it's useful to anticipate that any
-			 * LP_DEAD items will become LP_UNUSED during the ongoing VACUUM.)
-			 */
-			continue;
-		}
-
-		Assert(ItemIdIsNormal(itemid));
-
-		htup = (HeapTupleHeader) PageGetItem(page, itemid);
-
-		/*
-		 * The criteria for counting a tuple as live in this block need to
-		 * match what analyze.c's acquire_sample_rows() does, otherwise VACUUM
-		 * and ANALYZE may produce wildly different reltuples values, e.g.
-		 * when there are many recently-dead tuples.
-		 *
-		 * The logic here is a bit simpler than acquire_sample_rows(), as
-		 * VACUUM can't run inside a transaction block, which makes some cases
-		 * impossible (e.g. in-progress insert from the same transaction).
-		 *
-		 * We treat LP_DEAD items (which are the closest thing to DEAD tuples
-		 * that might be seen here) differently, too: we assume that they'll
-		 * become LP_UNUSED before VACUUM finishes.  This difference is only
-		 * superficial.  VACUUM effectively agrees with ANALYZE about DEAD
-		 * items, in the end.  VACUUM won't remember LP_DEAD items, but only
-		 * because they're not supposed to be left behind when it is done.
-		 * (Cases where we bypass index vacuuming will violate this optimistic
-		 * assumption, but the overall impact of that should be negligible.)
-		 */
-		switch (htsv_get_valid_status(presult.htsv[offnum]))
-		{
-			case HEAPTUPLE_LIVE:
-
-				/*
-				 * Count it as live.  Not only is this natural, but it's also
-				 * what acquire_sample_rows() does.
-				 */
-				live_tuples++;
-
-				/*
-				 * Is the tuple definitely visible to all transactions?
-				 *
-				 * NB: Like with per-tuple hint bits, we can't set the
-				 * PD_ALL_VISIBLE flag if the inserter committed
-				 * asynchronously. See SetHintBits for more info. Check that
-				 * the tuple is hinted xmin-committed because of that.
-				 */
-				if (all_visible)
-				{
-					TransactionId xmin;
-
-					if (!HeapTupleHeaderXminCommitted(htup))
-					{
-						all_visible = false;
-						break;
-					}
-
-					/*
-					 * The inserter definitely committed. But is it old enough
-					 * that everyone sees it as committed?
-					 */
-					xmin = HeapTupleHeaderGetXmin(htup);
-					if (!TransactionIdPrecedes(xmin,
-											   vacrel->cutoffs.OldestXmin))
-					{
-						all_visible = false;
-						break;
-					}
-
-					/* Track newest xmin on page. */
-					if (TransactionIdFollows(xmin, visibility_cutoff_xid) &&
-						TransactionIdIsNormal(xmin))
-						visibility_cutoff_xid = xmin;
-				}
-				break;
-			case HEAPTUPLE_RECENTLY_DEAD:
-
-				/*
-				 * If tuple is recently dead then we must not remove it from
-				 * the relation.  (We only remove items that are LP_DEAD from
-				 * pruning.)
-				 */
-				recently_dead_tuples++;
-				all_visible = false;
-				break;
-			case HEAPTUPLE_INSERT_IN_PROGRESS:
-
-				/*
-				 * We do not count these rows as live, because we expect the
-				 * inserting transaction to update the counters at commit, and
-				 * we assume that will happen only after we report our
-				 * results.  This assumption is a bit shaky, but it is what
-				 * acquire_sample_rows() does, so be consistent.
-				 */
-				all_visible = false;
-				break;
-			case HEAPTUPLE_DELETE_IN_PROGRESS:
-				/* This is an expected case during concurrent vacuum */
-				all_visible = false;
+	heap_page_prune_and_freeze(rel, buf, actions, vacrel->vistest,
+							   &vacrel->cutoffs, &presult, PRUNE_VACUUM_SCAN,
+							   &vacrel->offnum, &vacrel->NewRelfrozenXid,
+							   &vacrel->NewRelminMxid);
 
-				/*
-				 * Count such rows as live.  As above, we assume the deleting
-				 * transaction will commit and update the counters after we
-				 * report.
-				 */
-				live_tuples++;
-				break;
-			default:
-				elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
-				break;
-		}
-	}
+	Assert(MultiXactIdIsValid(vacrel->NewRelminMxid));
+	Assert(TransactionIdIsValid(vacrel->NewRelfrozenXid));
 
-	/*
-	 * We have now divided every item on the page into either an LP_DEAD item
-	 * that will need to be vacuumed in indexes later, or a LP_NORMAL tuple
-	 * that remains and needs to be considered for freezing now (LP_UNUSED and
-	 * LP_REDIRECT items also remain, but are of no further interest to us).
-	 */
 	vacrel->offnum = InvalidOffsetNumber;
 
-	/*
-	 * Freeze the page when heap_prepare_freeze_tuple indicates that at least
-	 * one XID/MXID from before FreezeLimit/MultiXactCutoff is present.  Also
-	 * freeze when pruning generated an FPI, if doing so means that we set the
-	 * page all-frozen afterwards (might not happen until final heap pass).
-	 */
-	if (presult.pagefrz.freeze_required || presult.nfrozen == 0 ||
-		(all_visible && presult.all_frozen &&
-		 fpi_before != pgWalUsage.wal_fpi))
+	if (presult.nfrozen > 0)
 	{
 		/*
-		 * We're freezing the page.  Our final NewRelfrozenXid doesn't need to
-		 * be affected by the XIDs that are just about to be frozen anyway.
+		 * The frozen_pages instrumentation counter only counts pages with
+		 * newly frozen tuples, so only increment it when nfrozen > 0 (don't
+		 * confuse that with pages newly set all-frozen in the VM).
 		 */
-		vacrel->NewRelfrozenXid = presult.pagefrz.FreezePageRelfrozenXid;
-		vacrel->NewRelminMxid = presult.pagefrz.FreezePageRelminMxid;
-
-		if (presult.nfrozen == 0)
-		{
-			/*
-			 * We have no freeze plans to execute, so there's no added cost
-			 * from following the freeze path.  That's why it was chosen. This
-			 * is important in the case where the page only contains totally
-			 * frozen tuples at this point (perhaps only following pruning).
-			 * Such pages can be marked all-frozen in the VM by our caller,
-			 * even though none of its tuples were newly frozen here (note
-			 * that the "no freeze" path never sets pages all-frozen).
-			 *
-			 * We never increment the frozen_pages instrumentation counter
-			 * here, since it only counts pages with newly frozen tuples
-			 * (don't confuse that with pages newly set all-frozen in VM).
-			 */
-		}
-		else
-		{
-			TransactionId snapshotConflictHorizon;
-
-			vacrel->frozen_pages++;
-
-			/*
-			 * We can use visibility_cutoff_xid as our cutoff for conflicts
-			 * when the whole page is eligible to become all-frozen in the VM
-			 * once we're done with it.  Otherwise we generate a conservative
-			 * cutoff by stepping back from OldestXmin.
-			 */
-			if (all_visible && presult.all_frozen)
-			{
-				/* Using same cutoff when setting VM is now unnecessary */
-				snapshotConflictHorizon = visibility_cutoff_xid;
-				visibility_cutoff_xid = InvalidTransactionId;
-			}
-			else
-			{
-				/* Avoids false conflicts when hot_standby_feedback in use */
-				snapshotConflictHorizon = vacrel->cutoffs.OldestXmin;
-				TransactionIdRetreat(snapshotConflictHorizon);
-			}
+		vacrel->frozen_pages++;
 
-			/* Execute all freeze plans for page as a single atomic action */
-			heap_freeze_execute_prepared(vacrel->rel, buf,
-										 snapshotConflictHorizon,
-										 presult.frozen, presult.nfrozen);
-		}
-	}
-	else
-	{
-		/*
-		 * Page requires "no freeze" processing.  It might be set all-visible
-		 * in the visibility map, but it can never be set all-frozen.
-		 */
-		vacrel->NewRelfrozenXid = presult.pagefrz.NoFreezePageRelfrozenXid;
-		vacrel->NewRelminMxid = presult.pagefrz.NoFreezePageRelminMxid;
-		presult.all_frozen = false;
-		presult.nfrozen = 0;	/* avoid miscounts in instrumentation */
 	}
 
 	/*
@@ -1718,17 +1461,21 @@ lazy_scan_prune(LVRelState *vacrel,
 	 */
 #ifdef USE_ASSERT_CHECKING
 	/* Note that all_frozen value does not matter when !all_visible */
-	if (all_visible && presult.lpdead_items == 0)
+	if (presult.all_visible)
 	{
 		TransactionId debug_cutoff;
 		bool		debug_all_frozen;
 
+		Assert(presult.lpdead_items == 0);
+
 		if (!heap_page_is_all_visible(vacrel, buf,
 									  &debug_cutoff, &debug_all_frozen))
 			Assert(false);
 
+		Assert(presult.all_frozen == debug_all_frozen);
+
 		Assert(!TransactionIdIsValid(debug_cutoff) ||
-			   debug_cutoff == visibility_cutoff_xid);
+			   debug_cutoff == presult.vm_conflict_horizon);
 	}
 #endif
 
@@ -1762,27 +1509,14 @@ lazy_scan_prune(LVRelState *vacrel,
 		Assert(dead_items->num_items <= dead_items->max_items);
 		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
 									 dead_items->num_items);
-
-		/*
-		 * It was convenient to ignore LP_DEAD items in all_visible earlier on
-		 * to make the choice of whether or not to freeze the page unaffected
-		 * by the short-term presence of LP_DEAD items.  These LP_DEAD items
-		 * were effectively assumed to be LP_UNUSED items in the making.  It
-		 * doesn't matter which heap pass (initial pass or final pass) ends up
-		 * setting the page all-frozen, as long as the ongoing VACUUM does it.
-		 *
-		 * Now that freezing has been finalized, unset all_visible.  It needs
-		 * to reflect the present state of things, as expected by our caller.
-		 */
-		all_visible = false;
 	}
 
 	/* Finally, add page-local counts to whole-VACUUM counts */
 	vacrel->tuples_deleted += presult.ndeleted;
 	vacrel->tuples_frozen += presult.nfrozen;
 	vacrel->lpdead_items += presult.lpdead_items;
-	vacrel->live_tuples += live_tuples;
-	vacrel->recently_dead_tuples += recently_dead_tuples;
+	vacrel->live_tuples += presult.live_tuples;
+	vacrel->recently_dead_tuples += presult.recently_dead_tuples;
 
 	/* Can't truncate this page */
 	if (presult.hastup)
@@ -1791,20 +1525,20 @@ lazy_scan_prune(LVRelState *vacrel,
 	/* Did we find LP_DEAD items? */
 	*has_lpdead_items = (presult.lpdead_items > 0);
 
-	Assert(!all_visible || !(*has_lpdead_items));
+	Assert(!presult.all_visible || !(*has_lpdead_items));
 
 	/*
 	 * Handle setting visibility map bit based on information from the VM (as
 	 * of last heap_vac_scan_next_block() call), and from all_visible and
 	 * all_frozen variables
 	 */
-	if (!all_visible_according_to_vm && all_visible)
+	if (!all_visible_according_to_vm && presult.all_visible)
 	{
 		uint8		flags = VISIBILITYMAP_ALL_VISIBLE;
 
 		if (presult.all_frozen)
 		{
-			Assert(!TransactionIdIsValid(visibility_cutoff_xid));
+			Assert(!TransactionIdIsValid(presult.vm_conflict_horizon));
 			flags |= VISIBILITYMAP_ALL_FROZEN;
 		}
 
@@ -1824,7 +1558,7 @@ lazy_scan_prune(LVRelState *vacrel,
 		PageSetAllVisible(page);
 		MarkBufferDirty(buf);
 		visibilitymap_set(vacrel->rel, blkno, buf, InvalidXLogRecPtr,
-						  vmbuffer, visibility_cutoff_xid,
+						  vmbuffer, presult.vm_conflict_horizon,
 						  flags);
 	}
 
@@ -1872,7 +1606,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * it as all-frozen.  Note that all_frozen is only valid if all_visible is
 	 * true, so we must check both all_visible and all_frozen.
 	 */
-	else if (all_visible_according_to_vm && all_visible &&
+	else if (all_visible_according_to_vm && presult.all_visible &&
 			 presult.all_frozen && !VM_ALL_FROZEN(vacrel->rel, blkno, &vmbuffer))
 	{
 		/*
@@ -1889,11 +1623,11 @@ lazy_scan_prune(LVRelState *vacrel,
 		/*
 		 * Set the page all-frozen (and all-visible) in the VM.
 		 *
-		 * We can pass InvalidTransactionId as our visibility_cutoff_xid,
-		 * since a snapshotConflictHorizon sufficient to make everything safe
-		 * for REDO was logged when the page's tuples were frozen.
+		 * We can pass InvalidTransactionId as our vm_conflict_horizon, since
+		 * a snapshotConflictHorizon sufficient to make everything safe for
+		 * REDO was logged when the page's tuples were frozen.
 		 */
-		Assert(!TransactionIdIsValid(visibility_cutoff_xid));
+		Assert(!TransactionIdIsValid(presult.vm_conflict_horizon));
 		visibilitymap_set(vacrel->rel, blkno, buf, InvalidXLogRecPtr,
 						  vmbuffer, InvalidTransactionId,
 						  VISIBILITYMAP_ALL_VISIBLE |
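
(Not part of the patch: a condensed sketch of the resulting lazy_scan_prune()
flow, for review convenience.  All names come from the hunks above; VM
handling and error context are elided.)

	actions |= PRUNE_DO_TRY_FREEZE;
	if (vacrel->nindexes == 0)
		actions |= PRUNE_DO_MARK_UNUSED_NOW;

	/* One call now prunes, freezes, and computes page-level visibility */
	heap_page_prune_and_freeze(rel, buf, actions, vacrel->vistest,
							   &vacrel->cutoffs, &presult, PRUNE_VACUUM_SCAN,
							   &vacrel->offnum, &vacrel->NewRelfrozenXid,
							   &vacrel->NewRelminMxid);

	if (presult.nfrozen > 0)
		vacrel->frozen_pages++; /* pages with newly frozen tuples only */

	/* Per-page counts are computed during pruning now */
	vacrel->tuples_deleted += presult.ndeleted;
	vacrel->tuples_frozen += presult.nfrozen;
	vacrel->lpdead_items += presult.lpdead_items;
	vacrel->live_tuples += presult.live_tuples;
	vacrel->recently_dead_tuples += presult.recently_dead_tuples;
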
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index e346312471..dfb36ea404 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -215,21 +215,15 @@ typedef struct HeapPageFreeze
 /*
  * Per-page state returned from pruning
  */
-typedef struct PruneResult
+typedef struct PruneFreezeResult
 {
 	int			ndeleted;		/* Number of tuples deleted from the page */
 	int			nnewlpdead;		/* Number of newly LP_DEAD items */
+	int			nfrozen;		/* Number of tuples we froze */
 
-	/*
-	 * Tuple visibility is only computed once for each tuple, for correctness
-	 * and efficiency reasons; see comment in heap_page_prune() for details.
-	 * This is of type int8[], instead of HTSV_Result[], so we can use -1 to
-	 * indicate no visibility has been computed, e.g. for LP_DEAD items.
-	 *
-	 * This needs to be MaxHeapTuplesPerPage + 1 long as FirstOffsetNumber is
-	 * 1. Otherwise every access would need to subtract 1.
-	 */
-	int8		htsv[MaxHeapTuplesPerPage + 1];
+	/* Number of live and recently dead tuples on the page, after pruning */
+	int			live_tuples;
+	int			recently_dead_tuples;
 
 	/*
 	 * Whether or not the page makes rel truncation unsafe
@@ -240,18 +234,18 @@ typedef struct PruneResult
 	bool		hastup;
 
 	/*
-	 * Prepare to freeze in heap_page_prune(). lazy_scan_prune() will use the
-	 * returned freeze plans to execute freezing.
-	 */
-	HeapPageFreeze pagefrz;
-
-	/*
-	 * Whether or not the page can be set all-frozen in the visibility map.
-	 * This is only set if the PRUNE_DO_TRY_FREEZE action flag is set.
+	 * all_visible and all_frozen indicate if the all-visible and all-frozen
+	 * bits in the visibility map can be set for this page, after pruning.
+	 *
+	 * vm_conflict_horizon is the newest xmin of live tuples on the page.  The
+	 * caller can use it as the conflict horizon when setting the VM bits.  It
+	 * is only meaningful when all_visible is true and all_frozen is false;
+	 * when the page can be set all-frozen, the freeze WAL record has already
+	 * logged a sufficient conflict horizon, and vm_conflict_horizon is left
+	 * InvalidTransactionId.
+	 *
+	 * These are only set if the PRUNE_DO_TRY_FREEZE action flag is set.
 	 */
+	bool		all_visible;
 	bool		all_frozen;
-	int			nfrozen;
-	HeapTupleFreeze frozen[MaxHeapTuplesPerPage];
+	TransactionId vm_conflict_horizon;
 
 	/*
 	 * LP_DEAD items on the page after pruning. Includes existing LP_DEAD
@@ -259,7 +253,7 @@ typedef struct PruneResult
 	 */
 	int			lpdead_items;
 	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
-} PruneResult;
+} PruneFreezeResult;
 
-/* 'reason' codes for heap_page_prune() */
+/* 'reason' codes for heap_page_prune_and_freeze() */
 typedef enum
@@ -269,20 +263,6 @@ typedef enum
 	PRUNE_VACUUM_CLEANUP,		/* VACUUM 2nd heap pass */
 } PruneReason;
 
-/*
- * Pruning calculates tuple visibility once and saves the results in an array
- * of int8. See PruneResult.htsv for details. This helper function is meant to
- * guard against examining visibility status array members which have not yet
- * been computed.
- */
-static inline HTSV_Result
-htsv_get_valid_status(int status)
-{
-	Assert(status >= HEAPTUPLE_DEAD &&
-		   status <= HEAPTUPLE_DELETE_IN_PROGRESS);
-	return (HTSV_Result) status;
-}
-
 /* ----------------
  *		function prototypes for heap access method
  *
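
(Illustrative, not part of the patch: how a caller is expected to consume the
PruneFreezeResult fields above when updating the visibility map, mirroring
the lazy_scan_prune() hunks earlier in this patch.  Assumes the caller still
holds the buffer lock and a pinned vmbuffer.)

	if (presult.all_visible)
	{
		uint8		flags = VISIBILITYMAP_ALL_VISIBLE;

		/* all_visible implies no LP_DEAD items remain on the page */
		Assert(presult.lpdead_items == 0);

		if (presult.all_frozen)
		{
			/*
			 * The prune/freeze record already logged a conflict horizon,
			 * so vm_conflict_horizon is invalid here.
			 */
			Assert(!TransactionIdIsValid(presult.vm_conflict_horizon));
			flags |= VISIBILITYMAP_ALL_FROZEN;
		}

		PageSetAllVisible(page);
		MarkBufferDirty(buf);
		visibilitymap_set(rel, blkno, buf, InvalidXLogRecPtr,
						  vmbuffer, presult.vm_conflict_horizon, flags);
	}
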
@@ -355,9 +335,11 @@ extern void heap_inplace_update(Relation relation, HeapTuple tuple);
 extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 									  HeapPageFreeze *pagefrz,
 									  HeapTupleFreeze *frz, bool *totally_frozen);
-extern void heap_freeze_execute_prepared(Relation rel, Buffer buffer,
-										 TransactionId snapshotConflictHorizon,
-										 HeapTupleFreeze *tuples, int ntuples);
+
+extern void heap_pre_freeze_checks(Buffer buffer,
+								   HeapTupleFreeze *tuples, int ntuples);
+extern void heap_freeze_prepared_tuples(Buffer buffer,
+										HeapTupleFreeze *tuples, int ntuples);
 extern bool heap_freeze_tuple(HeapTupleHeader tuple,
 							  TransactionId relfrozenxid, TransactionId relminmxid,
 							  TransactionId FreezeLimit, TransactionId MultiXactCutoff);
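
(Why the split: heap_pre_freeze_checks() can run the sanity checks before the
critical section, and heap_freeze_prepared_tuples() can then be combined with
pruning under a single WAL record.  A rough sketch of the expected call
pattern inside heap_page_prune_and_freeze(); "prstate" and "do_freeze" are
stand-ins for pruneheap.c internals that this excerpt does not show.)

	if (do_freeze)
		heap_pre_freeze_checks(buffer, prstate.frozen, prstate.nfrozen);

	START_CRIT_SECTION();

	heap_page_prune_execute(buffer, false,
							prstate.redirected, prstate.nredirected,
							prstate.nowdead, prstate.ndead,
							prstate.nowunused, prstate.nunused);

	if (do_freeze)
		heap_freeze_prepared_tuples(buffer, prstate.frozen, prstate.nfrozen);

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(relation))
	{
		/* ... emit one WAL record covering both pruning and freezing ... */
	}

	END_CRIT_SECTION();
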
@@ -378,12 +360,15 @@ extern TransactionId heap_index_delete_tuples(Relation rel,
 /* in heap/pruneheap.c */
 struct GlobalVisState;
 extern void heap_page_prune_opt(Relation relation, Buffer buffer);
-extern void heap_page_prune(Relation relation, Buffer buffer,
-							struct GlobalVisState *vistest,
-							uint8 actions,
-							PruneResult *presult,
-							PruneReason reason,
-							OffsetNumber *off_loc);
+extern void heap_page_prune_and_freeze(Relation relation, Buffer buffer,
+									   uint8 actions,
+									   struct GlobalVisState *vistest,
+									   struct VacuumCutoffs *cutoffs,
+									   PruneFreezeResult *presult,
+									   PruneReason reason,
+									   OffsetNumber *off_loc,
+									   TransactionId *new_relfrozen_xid,
+									   MultiXactId *new_relmin_mxid);
 extern void heap_page_prune_execute(Buffer buffer, bool lp_truncate_only,
 									OffsetNumber *redirected, int nredirected,
 									OffsetNumber *nowdead, int ndead,
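
(For contrast with the VACUUM call site: a hypothetical on-access call of the
renamed entry point.  This assumes, as these hunks do not show, that cutoffs,
off_loc, and the relfrozenxid/relminmxid out-parameters may be passed as NULL
when PRUNE_DO_TRY_FREEZE is not requested, and that PruneReason retains an
on-access member such as PRUNE_ON_ACCESS.)

	/* Opportunistic pruning: no freezing requested, so no cutoffs and no
	 * relfrozenxid/relminmxid tracking are needed. */
	heap_page_prune_and_freeze(relation, buffer, 0, vistest,
							   NULL, &presult, PRUNE_ON_ACCESS,
							   NULL, NULL, NULL);
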
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cfa9d5aaea..cbb9707b6a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2192,7 +2192,7 @@ PromptInterruptContext
 ProtocolVersion
 PrsStorage
+PruneFreezeResult
 PruneReason
-PruneResult
 PruneState
 PruneStepResult
 PsqlScanCallbacks
-- 
2.40.1

