From b946b65744d2e7906d8ea39085d5749c4b6be4d5 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 28 Mar 2024 00:16:09 +0200
Subject: [PATCH v9 21/21] WIP refactor

- The revisit array is removed. With the restructuring of
  heap_prune_chain(), we can ensure that record_unchanged() is called
  for all of the correct tuples without stashing them away.

- In heap_prune_chain(), I cleaned up the chain traversal logic and
  grouped the post-traversal chain processing into three separate
  branches (sketched below). I find this to be a big clarity improvement.
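
As a quick illustration of the three branches mentioned above, here is a
toy, self-contained C program (not PostgreSQL code; TupStatus and
classify_chain() are hypothetical stand-ins) showing how the refactored
heap_prune_chain() classifies a HOT chain after traversal, using the same
'survivor'/'nchain' bookkeeping as the patch:

    #include <stdio.h>

    typedef enum { TUP_DEAD, TUP_RECENTLY_DEAD, TUP_LIVE } TupStatus;

    /* chain[0] stands for the root item of a HOT chain */
    static void
    classify_chain(const TupStatus *chain, int nchain)
    {
        int survivor = 0;

        /* Traversal: remember the index just past the last DEAD tuple. */
        for (int i = 0; i < nchain; i++)
        {
            if (chain[i] == TUP_DEAD)
                survivor = i + 1;
            else if (chain[i] == TUP_LIVE)
                break;          /* cannot advance past a live tuple */
        }

        if (survivor == 0)
            printf("no DEAD tuples: record all %d members unchanged\n",
                   nchain);
        else if (survivor == nchain)
            printf("whole chain dead: root becomes LP_DEAD (or LP_UNUSED), "
                   "the rest LP_UNUSED\n");
        else
            printf("redirect root to member %d, members 1..%d LP_UNUSED, "
                   "the rest unchanged\n", survivor, survivor - 1);
    }

    int
    main(void)
    {
        TupStatus chain1[] = { TUP_RECENTLY_DEAD, TUP_DEAD, TUP_LIVE };
        TupStatus chain2[] = { TUP_DEAD, TUP_DEAD };

        classify_chain(chain1, 3);      /* redirect root to member 2 */
        classify_chain(chain2, 2);      /* whole chain dead */
        return 0;
    }

The three printf branches correspond to the three branches after the
process_chains label in heap_prune_chain() below.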
---
 src/backend/access/heap/pruneheap.c  | 794 ++++++++++++++++-----------
 src/backend/access/heap/vacuumlazy.c |   6 +-
 src/include/access/heapam.h          |  40 +-
 3 files changed, 504 insertions(+), 336 deletions(-)

diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index e1eed42004f..1cb692dd25f 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -32,16 +32,16 @@
 /* Working data for heap_page_prune_and_freeze() and subroutines */
 typedef struct
 {
+	/* PRUNE_DO_* arguments */
+	uint8		actions;
+
 	/* tuple visibility test, initialized for the relation */
 	GlobalVisState *vistest;
-	uint8		actions;
-	TransactionId visibility_cutoff_xid;
-	bool		all_visible_except_removable;
 
 	/*
 	 * Fields describing what to do to the page
 	 */
-	TransactionId new_prune_xid;		/* new prune hint value */
+	TransactionId new_prune_xid;	/* new prune hint value */
 	TransactionId latest_xid_removed;
 	int			nredirected;	/* numbers of entries in arrays below */
 	int			ndead;
@@ -53,8 +53,10 @@ typedef struct
 	OffsetNumber nowunused[MaxHeapTuplesPerPage];
 	HeapTupleFreeze frozen[MaxHeapTuplesPerPage];
 
+	HeapPageFreeze pagefrz;
+
 	/*
-	 * marked[i] is true if item i is entered in one of the above arrays.
+	 * marked[i] is true when item i has already been processed.
 	 *
 	 * This needs to be MaxHeapTuplesPerPage + 1 long as FirstOffsetNumber is
 	 * 1. Otherwise every access would need to subtract 1.
@@ -73,37 +75,73 @@ typedef struct
 	 */
 	int8		htsv[MaxHeapTuplesPerPage + 1];
 
-	HeapPageFreeze pagefrz;
+	/*
+	 * The rest of the fields are not used by pruning itself, but are used to
+	 * collect information about what was pruned and what state the page is in
+	 * after pruning, for the benefit of the caller.  They are copied to
+	 * PruneFreezeResult at the end.
+	 */
+
+	int			ndeleted;		/* Number of tuples deleted from the page */
+
+	/* Number of live and recently dead tuples, after pruning */
+	int			live_tuples;
+	int			recently_dead_tuples;
+
+	/* Whether or not the page makes rel truncation unsafe */
+	bool		hastup;
 
 	/*
-	 * Whether or not this tuple has been counted toward vacuum stats. In
-	 * heap_prune_chain(), we have to be sure that Heap Only Tuples that are
-	 * not part of any chain are counted correctly.
+	 * LP_DEAD items on the page after pruning.  Includes existing LP_DEAD
+	 * items.
+	 */
+	int			lpdead_items;
+	OffsetNumber *deadoffsets;	/* points directly to PruneFreezeResult->deadoffsets */
+
+	/*
+	 * all_visible and all_frozen indicate if the all-visible and all-frozen
+	 * bits in the visibility map can be set for this page, after pruning.
+	 *
+	 * visibility_cutoff_xid is the newest xmin of live tuples on the page.
+	 * The caller can use it as the conflict horizon, when setting the VM
+	 * bits.  It is only valid if we froze some tuples, and all_frozen is
+	 * true.
+	 *
+	 * These are only set if the PRUNE_DO_TRY_FREEZE action flag is set.
+	 *
+	 * NOTE: This 'all_visible' doesn't include LP_DEAD items.  That's
+	 * convenient for heap_page_prune_and_freeze(), which uses it to decide
+	 * whether to freeze the page or not.  The 'all_visible' value returned to
+	 * the caller is adjusted to include LP_DEAD items at the end.
 	 */
-	bool		counted[MaxHeapTuplesPerPage + 1];
+	bool		all_visible;
+	bool		all_frozen;
+	TransactionId visibility_cutoff_xid;
+
 } PruneState;
 
 /* Local functions */
 static HTSV_Result heap_prune_satisfies_vacuum(PruneState *prstate,
 											   HeapTuple tup,
 											   Buffer buffer);
-static int	heap_prune_chain(Buffer buffer,
-							 OffsetNumber rootoffnum,
-							 PruneState *prstate, PruneFreezeResult *presult);
-
 static inline HTSV_Result htsv_get_valid_status(int status);
+static void heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
+							 PruneState *prstate);
+
 static void heap_prune_record_prunable(PruneState *prstate, TransactionId xid);
-static void heap_prune_record_redirect(Page page, PruneState *prstate,
+static void heap_prune_record_redirect(PruneState *prstate,
 									   OffsetNumber offnum, OffsetNumber rdoffnum,
-									   PruneFreezeResult *presult);
+									   bool was_normal);
 static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
-								   PruneFreezeResult *presult);
+								   bool was_normal);
 static void heap_prune_record_dead_or_unused(PruneState *prstate, OffsetNumber offnum,
-											 PruneFreezeResult *presult);
-static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum);
+											 bool was_normal);
+static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum, bool was_normal);
+
+static void heap_prune_record_unchanged(Page page, PruneState *prstate, OffsetNumber offnum);
+static void heap_prune_record_unchanged_lp_dead(PruneState *prstate, OffsetNumber offnum);
+static void heap_prune_record_unchanged_lp_redirect(PruneState *prstate, OffsetNumber offnum);
 
-static void heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate,
-													OffsetNumber offnum, PruneFreezeResult *presult);
 static void page_verify_redirects(Page page);
 
 
@@ -242,6 +280,10 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
  * vistest is used to distinguish whether tuples are DEAD or RECENTLY_DEAD
  * (see heap_prune_satisfies_vacuum).
  *
+ * cutoffs contains the information on visibility for the whole relation
+ * collected by vacuum at the beginning of vacuuming the relation. It will be
+ * NULL for callers other than vacuum.
+ *
  * presult contains output parameters needed by callers such as the number of
  * tuples removed and the number of line pointers newly marked LP_DEAD.
  * heap_page_prune_and_freeze() is responsible for initializing it.
@@ -326,70 +368,62 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	prstate.latest_xid_removed = InvalidTransactionId;
 	prstate.nredirected = prstate.ndead = prstate.nunused = prstate.nfrozen = 0;
 	memset(prstate.marked, 0, sizeof(prstate.marked));
-	memset(prstate.counted, 0, sizeof(prstate.counted));
 
 	/*
 	 * prstate.htsv is not initialized here because all ntuple spots in the
 	 * array will be set either to a valid HTSV_Result value or -1.
 	 */
-	presult->ndeleted = 0;
-	presult->nnewlpdead = 0;
-
-	presult->hastup = false;
 
-	presult->live_tuples = 0;
-	presult->recently_dead_tuples = 0;
-	presult->lpdead_items = 0;
+	prstate.ndeleted = 0;
+	prstate.hastup = false;
+	prstate.live_tuples = 0;
+	prstate.recently_dead_tuples = 0;
+	prstate.lpdead_items = 0;
+	prstate.deadoffsets = presult->deadoffsets;
 
 	/*
-	 * Caller may update the VM after pruning, collecting LP_DEAD items, and
-	 * freezing tuples. Keep track of whether or not the page is all_visible
-	 * and all_frozen and use this information to update the VM. all_visible
-	 * implies lpdead_items == 0, but don't trust all_frozen result unless
-	 * all_visible is also set to true. If we won't even try freezing,
-	 * initialize all_frozen to false.
+	 * Caller may update the VM after we're done.  We keep track of whether
+	 * the page will be all_visible and all_frozen, once we're done with the
+	 * pruning and freezing, to help the caller do that.
 	 *
-	 * For vacuum, if the whole page will become frozen, we consider
-	 * opportunistically freezing tuples. Dead tuples which will be removed by
-	 * the end of vacuuming should not preclude us from opportunistically
-	 * freezing. We will not be able to freeze the whole page if there are
-	 * tuples present which are not visible to everyone or if there are dead
-	 * tuples which are not yet removable. We need all_visible to be false if
-	 * LP_DEAD tuples remain after pruning so that we do not incorrectly
-	 * update the visibility map or page hint bit. So, we will update
-	 * presult->all_visible to reflect the presence of LP_DEAD items while
-	 * pruning and keep all_visible_except_removable to permit freezing if the
-	 * whole page will eventually become all visible after removing tuples.
+	 * Currently, only VACUUM sets the VM bits.  To save effort, do the
+	 * bookkeeping only if the caller needs it.  Currently, that's tied to
+	 * PRUNE_DO_TRY_FREEZE, but it could be a separate flag if you wanted to
+	 * update the VM bits without also freezing, or freeze without setting
+	 * the VM bits.
+	 *
+	 * In addition to telling the caller whether it can set the VM bit, we
+	 * also use 'all_visible' and 'all_frozen' for our own decision-making. If
+	 * the whole page will become frozen, we consider opportunistically
+	 * freezing tuples.  We will not be able to freeze the whole page if there
+	 * are tuples present which are not visible to everyone or if there are
+	 * dead tuples which are not yet removable.  However, dead tuples which
+	 * will be removed by the end of vacuuming should not preclude us from
+	 * opportunistically freezing.  Because of that, we do not clear
+	 * all_visible when we see LP_DEAD items.  We fix that at the end of the
+	 * function, when we return the value to the caller, so that the caller
+	 * doesn't set the VM bit incorrectly.
 	 */
-	presult->all_visible = true;
-
 	if (prstate.actions & PRUNE_DO_TRY_FREEZE)
-		presult->set_all_frozen = true;
+	{
+		prstate.all_visible = true;
+		prstate.all_frozen = true;
+	}
 	else
-		presult->set_all_frozen = false;
-
-	/*
-	 * Deliberately delay unsetting all_visible until later during pruning.
-	 * Removable dead tuples shouldn't preclude freezing the page. After
-	 * finishing this first pass of tuple visibility checks, initialize
-	 * all_visible_except_removable with the current value of all_visible to
-	 * indicate whether or not the page is all visible except for dead tuples.
-	 * This will allow us to attempt to freeze the page after pruning. Later
-	 * during pruning, if we encounter an LP_DEAD item or are setting an item
-	 * LP_DEAD, we will unset all_visible. As long as we unset it before
-	 * updating the visibility map, this will be correct.
-	 */
-	prstate.all_visible_except_removable = true;
+	{
+		prstate.all_visible = false;
+		prstate.all_frozen = false;
+	}
 
 	/*
 	 * The visibility cutoff xid is the newest xmin of live tuples on the
 	 * page. In the common case, this will be set as the conflict horizon the
-	 * caller can use for updating the VM. If, at the end of freezing and
+	 * caller can use for updating the VM.  If, at the end of freezing and
 	 * pruning, the page is all-frozen, there is no possibility that any
 	 * running transaction on the standby does not see tuples on the page as
 	 * all-visible, so the conflict horizon remains InvalidTransactionId.
 	 */
-	presult->vm_conflict_horizon = prstate.visibility_cutoff_xid = InvalidTransactionId;
+	prstate.visibility_cutoff_xid = InvalidTransactionId;
 
 	maxoff = PageGetMaxOffsetNumber(page);
 	tup.t_tableOid = RelationGetRelid(relation);
@@ -450,7 +484,13 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 */
 	hint_bit_fpi = fpi_before != pgWalUsage.wal_fpi;
 
-	/* Scan the page */
+	/*
+	 * Scan the page, processing each tuple.
+	 *
+	 * heap_prune_chain() decides, for each tuple, whether it can be pruned,
+	 * redirected or frozen.  It follows HOT chains, processing each HOT chain
+	 * as a unit.
+	 */
 	for (offnum = FirstOffsetNumber;
 		 offnum <= maxoff;
 		 offnum = OffsetNumberNext(offnum))
@@ -471,10 +511,44 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 			continue;
 
 		/* Process this item or chain of items */
-		presult->ndeleted += heap_prune_chain(buffer, offnum,
-											  &prstate, presult);
+		heap_prune_chain(buffer, offnum, &prstate);
 	}
 
+	/*
+	 * If PruneReason is PRUNE_ON_ACCESS, there may have been in-progress
+	 * deletes or inserts of HOT tuples which broke up the HOT chain and left
+	 * unchanged tuples unprocessed. MFIXME: should we just skip the below
+	 * since we don't care about most of it if ON_ACCESS? We would have to move
+	 * the record_prunable() calls back out, so maybe it's not worth it...
+	 */
+	for (offnum = FirstOffsetNumber;
+		 offnum <= maxoff;
+		 offnum = OffsetNumberNext(offnum))
+	{
+		ItemId		itemid = PageGetItemId(page, offnum);
+
+		if (ItemIdIsUsed(itemid) && !prstate.marked[offnum])
+			heap_prune_record_unchanged(page, &prstate, offnum);
+	}
+
+	/* We should now have processed every tuple exactly once */
+#ifdef USE_ASSERT_CHECKING
+	for (offnum = FirstOffsetNumber;
+		 offnum <= maxoff;
+		 offnum = OffsetNumberNext(offnum))
+	{
+		ItemId		itemid;
+
+		if (off_loc)
+			*off_loc = offnum;
+		itemid = PageGetItemId(page, offnum);
+		if (ItemIdIsUsed(itemid))
+			Assert(prstate.marked[offnum]);
+		else
+			Assert(!prstate.marked[offnum]);
+	}
+#endif
+
 	/* Clear the offset information once we have processed the given page. */
 	if (off_loc)
 		*off_loc = InvalidOffsetNumber;
@@ -483,9 +557,6 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 		prstate.ndead > 0 ||
 		prstate.nunused > 0;
 
-	/* Record number of newly-set-LP_DEAD items for caller */
-	presult->nnewlpdead = prstate.ndead;
-
 	/*
 	 * Even if we don't prune anything, if we found a new value for the
 	 * pd_prune_xid field or the page was marked full, we will update the hint
@@ -509,8 +580,8 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	if (prstate.actions & PRUNE_DO_TRY_FREEZE)
 	{
 		/* Is the whole page freezable? And is there something to freeze? */
-		bool		whole_page_freezable = prstate.all_visible_except_removable &&
-			presult->set_all_frozen;
+		bool		whole_page_freezable = prstate.all_visible &&
+			prstate.all_frozen;
 
 		if (prstate.pagefrz.freeze_required)
 			do_freeze = true;
@@ -538,14 +609,16 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 */
 	if (do_freeze)
 		heap_pre_freeze_checks(buffer, prstate.frozen, prstate.nfrozen);
-	else if (!presult->set_all_frozen || prstate.nfrozen > 0)
+	else if (!prstate.all_frozen || prstate.nfrozen > 0)
 	{
+		Assert(!prstate.pagefrz.freeze_required);
+
 		/*
 		 * If we will neither freeze tuples on the page nor set the page all
 		 * frozen in the visibility map, the page is not all-frozen and there
 		 * will be no newly frozen tuples.
 		 */
-		presult->set_all_frozen = false;
+		prstate.all_frozen = false;
 		prstate.nfrozen = 0;	/* avoid miscounts in instrumentation */
 	}
 
@@ -618,7 +691,7 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 			 */
 			if (do_freeze)
 			{
-				if (prstate.all_visible_except_removable && presult->set_all_frozen)
+				if (prstate.all_visible && prstate.all_frozen)
 					frz_conflict_horizon = prstate.visibility_cutoff_xid;
 				else
 				{
@@ -645,24 +718,61 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 
 	END_CRIT_SECTION();
 
+	/* Copy data back to 'presult' */
+	presult->ndeleted = prstate.ndeleted;
+	presult->nnewlpdead = prstate.ndead;
+	presult->nfrozen = prstate.nfrozen;
+	presult->live_tuples = prstate.live_tuples;
+	presult->recently_dead_tuples = prstate.recently_dead_tuples;
+
+	/*
+	 * It was convenient to ignore LP_DEAD items in all_visible earlier on to
+	 * make the choice of whether or not to freeze the page unaffected by the
+	 * short-term presence of LP_DEAD items.  These LP_DEAD items were
+	 * effectively assumed to be LP_UNUSED items in the making.  It doesn't
+	 * matter which heap pass (initial pass or final pass) ends up setting the
+	 * page all-frozen, as long as the ongoing VACUUM does it.
+	 *
+	 * Now that freezing has been finalized, unset all_visible if there are
+	 * any LP_DEAD items on the page.  It needs to reflect the present state
+	 * of things, as expected by our caller.
+	 */
+	if (prstate.lpdead_items == 0)
+	{
+		presult->all_visible = prstate.all_visible;
+		presult->all_frozen = prstate.all_frozen;
+	}
+	else
+	{
+		presult->all_visible = false;
+		presult->all_frozen = false;
+	}
+	presult->hastup = prstate.hastup;
+
 	/*
 	 * For callers planning to update the visibility map, the conflict horizon
-	 * for that record must be the newest xmin on the page. However, if the
+	 * for that record must be the newest xmin on the page.  However, if the
 	 * page is completely frozen, there can be no conflict and the
-	 * vm_conflict_horizon should remain InvalidTransactionId.
+	 * vm_conflict_horizon should remain InvalidTransactionId.  This includes
+	 * the case that we just froze all the tuples; the prune-freeze record
+	 * included the conflict XID already so the caller doesn't need it.
 	 */
-	if (!presult->set_all_frozen)
+	if (!presult->all_frozen)
 		presult->vm_conflict_horizon = prstate.visibility_cutoff_xid;
-	presult->nfrozen = prstate.nfrozen;
+	else
+		presult->vm_conflict_horizon = InvalidTransactionId;
+
+	presult->lpdead_items = prstate.lpdead_items;
+	/* the presult->deadoffsets array was already filled in */
 
 	/*
 	 * If we will freeze tuples on the page or, even if we don't freeze tuples
 	 * on the page, if we will set the page all-frozen in the visibility map,
 	 * we can advance relfrozenxid and relminmxid to the values in
 	 * pagefrz->FreezePageRelfrozenXid and pagefrz->FreezePageRelminMxid.
-	 * MFIXME: which one should be pick if presult->nfrozen == 0 and
-	 * presult->all_frozen = True.
 	 */
+	Assert(presult->nfrozen > 0 || !prstate.pagefrz.freeze_required);
+
 	if (new_relfrozen_xid)
 	{
 		if (presult->nfrozen > 0)
@@ -670,7 +780,6 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 		else
 			*new_relfrozen_xid = prstate.pagefrz.NoFreezePageRelfrozenXid;
 	}
-
 	if (new_relmin_mxid)
 	{
 		if (presult->nfrozen > 0)
@@ -739,25 +848,32 @@ htsv_get_valid_status(int status)
  * prstate showing the changes to be made.  Items to be redirected are added
  * to the redirected[] array (two entries per redirection); items to be set to
  * LP_DEAD state are added to nowdead[]; and items to be set to LP_UNUSED
- * state are added to nowunused[].
- *
- * Returns the number of tuples (to be) deleted from the page.
+ * state are added to nowunused[].  We perform bookkeeping of live tuples,
+ * visibility, etc. based on what the page will look like after the changes
+ * are applied.  All that bookkeeping is performed in the heap_prune_record_*()
+ * subroutines.  The division of labor is that heap_prune_chain() decides the
+ * fate of each tuple, ie. whether it's going to be removed, redirected or
+ * left unchanged, and the heap_prune_record_*() subroutines update PruneState
+ * based on that outcome.
  */
-static int
+static void
 heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
-				 PruneState *prstate, PruneFreezeResult *presult)
+				 PruneState *prstate)
 {
-	int			ndeleted = 0;
 	Page		dp = (Page) BufferGetPage(buffer);
 	TransactionId priorXmax = InvalidTransactionId;
 	ItemId		rootlp;
 	HeapTupleHeader htup;
-	OffsetNumber latestdead = InvalidOffsetNumber,
-				maxoff = PageGetMaxOffsetNumber(dp),
+	OffsetNumber maxoff = PageGetMaxOffsetNumber(dp),
 				offnum;
 	OffsetNumber chainitems[MaxHeapTuplesPerPage];
-	int			nchain = 0,
-				i;
+
+	/*
+	 * After traversing the HOT chain, survivor is the index in chainitems of
+	 * the first live sucessor after the last dead item.
+	 */
+	int			survivor = 0,
+				nchain = 0;
 
 	rootlp = PageGetItemId(dp, rootoffnum);
 
@@ -788,49 +904,62 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 			 * Note that we might first arrive at a dead heap-only tuple
 			 * either here or while following a chain below.  Whichever path
 			 * gets there first will mark the tuple unused.
+			 *
+			 * Whether we arrive at the dead HOT tuple first here or while
+			 * following a chain below affects whether preceding RECENTLY_DEAD
+			 * tuples in the chain can be removed or not.  Imagine that you
+			 * have a chain with two tuples: RECENTLY_DEAD -> DEAD.  If we
+			 * reach the RECENTLY_DEAD tuple first, the chain-following logic
+			 * will find the DEAD tuple and conclude that both tuples are in
+			 * fact dead and can be removed.  But if we reach the DEAD tuple
+			 * at the end of the chain first, when we reach the RECENTLY_DEAD
+			 * tuple later, we will not follow the chain because the DEAD
+			 * TUPLE is already 'marked', and will not remove the
+			 * RECENTLY_DEAD tuple.  This is not a correctness issue, and the
+			 * RECENTLY_DEAD tuple will be removed by a later VACUUM.
 			 */
-			if (!HeapTupleHeaderIsHotUpdated(htup))
+			if (prstate->htsv[rootoffnum] == HEAPTUPLE_DEAD &&
+				!HeapTupleHeaderIsHotUpdated(htup))
 			{
-				if (prstate->htsv[rootoffnum] == HEAPTUPLE_DEAD)
-				{
-					heap_prune_record_unused(prstate, rootoffnum);
-					HeapTupleHeaderAdvanceConflictHorizon(htup,
-														  &prstate->latest_xid_removed);
-					ndeleted++;
-				}
-				else
-				{
-					Assert(!prstate->marked[rootoffnum]);
-
-					/*
-					 * MFIXME: not sure if this is right -- maybe counting too
-					 * many
-					 */
-
-					/*
-					 * Ensure that this tuple is counted. If it is later
-					 * redirected to, it would have been counted then, but we
-					 * won't double count because we check if it has already
-					 * been counted first.
-					 */
-					heap_prune_record_live_or_recently_dead(dp, prstate, rootoffnum, presult);
-				}
+				heap_prune_record_unused(prstate, rootoffnum, true);
+				HeapTupleHeaderAdvanceConflictHorizon(htup,
+													  &prstate->latest_xid_removed);
 			}
 
-			/* Nothing more to do */
-			return ndeleted;
+			/*
+			 * MFIXME: I think we could use an example like the one you
+			 * suggested below here for on-access pruning. For example, given
+			 * the following tuple versions (this is an UPDATE), we bail out
+			 * here without recording anything, because there are no prunable
+			 * tuples in this chain. This is one of the cases that the
+			 * record_unchanged() loop in heap_page_prune_and_freeze() catches.
+			 *
+			 * REDIRECT -> DELETE IN PROGRESS -> INSERT IN PROGRESS
+			 */
+			return;
 		}
 	}
 
 	/* Start from the root tuple */
-	offnum = rootoffnum;
+
+	/*----
+	 * MFIXME: make this into something...
+	 * This helped me to visualize how different chains might look here.
+	 * It's not an exhaustive list, just some examples to help with thinking.
+	 * Remove this comment from the final version, or refine it.
+	 *
+	 * REDIRECT -> LIVE (stop) -> ...
+	 * REDIRECT -> RECENTLY_DEAD -> LIVE (stop) -> ...
+	 * REDIRECT -> RECENTLY_DEAD -> RECENTLY_DEAD
+	 * REDIRECT -> RECENTLY_DEAD -> DEAD
+	 * REDIRECT -> RECENTLY_DEAD -> DEAD -> RECENTLY_DEAD -> DEAD
+	 * RECENTLY_DEAD -> ...
+	 */
 
 	/* while not end of the chain */
-	for (;;)
+	for (offnum = rootoffnum;;)
 	{
 		ItemId		lp;
-		bool		tupdead,
-					recent_dead;
 
 		/* Sanity check (pure paranoia) */
 		if (offnum < FirstOffsetNumber)
@@ -876,19 +1005,12 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 		{
 			/*
 			 * If the caller set PRUNE_DO_MARK_UNUSED_NOW, we can set dead
-			 * line pointers LP_UNUSED now. We don't increment ndeleted here
-			 * since the LP was already marked dead. If it will not be marked
-			 * LP_UNUSED, it will remain LP_DEAD, making the page not
-			 * all_visible.
+			 * line pointers LP_UNUSED now.
 			 */
 			if (unlikely(prstate->actions & PRUNE_DO_MARK_UNUSED_NOW))
-				heap_prune_record_unused(prstate, offnum);
+				heap_prune_record_unused(prstate, offnum, false);
 			else
-			{
-				presult->all_visible = false;
-				presult->deadoffsets[presult->lpdead_items++] = offnum;
-			}
-
+				heap_prune_record_unchanged_lp_dead(prstate, offnum);
 			break;
 		}
 
@@ -910,67 +1032,37 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 		/*
 		 * Check tuple's visibility status.
 		 */
-		tupdead = recent_dead = false;
-
 		switch (htsv_get_valid_status(prstate->htsv[offnum]))
 		{
 			case HEAPTUPLE_DEAD:
-				tupdead = true;
-				break;
-
-			case HEAPTUPLE_RECENTLY_DEAD:
-				recent_dead = true;
 
 				/*
-				 * This tuple may soon become DEAD.  Update the hint field so
-				 * that the page is reconsidered for pruning in future.
+				 * Remember the last DEAD tuple seen.  We will advance past
+				 * RECENTLY_DEAD tuples just in case there's a DEAD one after
+				 * them; but we can't advance past anything else.  We want to
+				 * ensure that any line pointers for DEAD tuples are set
+				 * LP_DEAD or LP_UNUSED. It is important that line pointers
+				 * whose offsets are added to deadoffsets are in fact set
+				 * LP_DEAD.
 				 */
-				heap_prune_record_prunable(prstate,
-										   HeapTupleHeaderGetUpdateXid(htup));
+				survivor = nchain;
+				HeapTupleHeaderAdvanceConflictHorizon(htup,
+													  &prstate->latest_xid_removed);
 				break;
 
-			case HEAPTUPLE_DELETE_IN_PROGRESS:
-
-				/*
-				 * This tuple may soon become DEAD.  Update the hint field so
-				 * that the page is reconsidered for pruning in future.
-				 */
-				heap_prune_record_prunable(prstate,
-										   HeapTupleHeaderGetUpdateXid(htup));
+			case HEAPTUPLE_RECENTLY_DEAD:
 				break;
 
+			case HEAPTUPLE_DELETE_IN_PROGRESS:
 			case HEAPTUPLE_LIVE:
 			case HEAPTUPLE_INSERT_IN_PROGRESS:
-
-				/*
-				 * If we wanted to optimize for aborts, we might consider
-				 * marking the page prunable when we see INSERT_IN_PROGRESS.
-				 * But we don't.  See related decisions about when to mark the
-				 * page prunable in heapam.c.
-				 */
-				break;
+				goto process_chains;
 
 			default:
 				elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
 				break;
 		}
 
-		/*
-		 * Remember the last DEAD tuple seen.  We will advance past
-		 * RECENTLY_DEAD tuples just in case there's a DEAD one after them;
-		 * but we can't advance past anything else.  We have to make sure that
-		 * we don't miss any DEAD tuples, since DEAD tuples that still have
-		 * tuple storage after pruning will confuse VACUUM.
-		 */
-		if (tupdead)
-		{
-			latestdead = offnum;
-			HeapTupleHeaderAdvanceConflictHorizon(htup,
-												  &prstate->latest_xid_removed);
-		}
-		else if (!recent_dead)
-			break;
-
 		/*
 		 * If the tuple is not HOT-updated, then we are at the end of this
 		 * HOT-update chain.
@@ -990,65 +1082,67 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 		priorXmax = HeapTupleHeaderGetUpdateXid(htup);
 	}
 
-	/*
-	 * If we found a DEAD tuple in the chain, adjust the HOT chain so that all
-	 * the DEAD tuples at the start of the chain are removed and the root line
-	 * pointer is appropriately redirected.
-	 */
-	if (OffsetNumberIsValid(latestdead))
+	if (ItemIdIsRedirected(rootlp) && nchain < 2)
 	{
 		/*
-		 * Mark as unused each intermediate item that we are able to remove
-		 * from the chain.
-		 *
-		 * When the previous item is the last dead tuple seen, we are at the
-		 * right candidate for redirection.
+		 * We found a redirect item that doesn't point to a valid follow-on
+		 * item.  This can happen if the loop in heap_page_prune_and_freeze()
+		 * caused us to visit the dead successor of a redirect item before
+		 * visiting the redirect item.  We can clean up by setting the
+		 * redirect item to DEAD state or LP_UNUSED if the caller indicated.
 		 */
-		for (i = 1; (i < nchain) && (chainitems[i - 1] != latestdead); i++)
-		{
-			heap_prune_record_unused(prstate, chainitems[i]);
-			ndeleted++;
-		}
+		heap_prune_record_dead_or_unused(prstate, rootoffnum, false);
+		return;
+	}
+
+process_chains:
+	if (!survivor)
+	{
+		int			i;
 
 		/*
-		 * If the root entry had been a normal tuple, we are deleting it, so
-		 * count it in the result.  But changing a redirect (even to DEAD
-		 * state) doesn't count.
+		 * If no DEAD tuple was found, and the root is redirected, record it
+		 * as an unchanged redirect.  (ItemIdIsRedirected() evaluates to 1,
+		 * so the loop below starts after the redirect item.)
 		 */
-		if (ItemIdIsNormal(rootlp))
-			ndeleted++;
+		if ((i = ItemIdIsRedirected(rootlp)))
+			heap_prune_record_unchanged_lp_redirect(prstate, rootoffnum);
 
+		/* the rest of the tuples in the chain are normal, unchanged tuples */
+		for (; i < nchain; i++)
+			heap_prune_record_unchanged(dp, prstate, chainitems[i]);
+	}
+	else if (survivor == nchain)
+	{
 		/*
 		 * If the DEAD tuple is at the end of the chain, the entire chain is
-		 * dead and the root line pointer can be marked dead.  Otherwise just
-		 * redirect the root to the correct chain member.
+		 * dead and the root line pointer can be marked dead.
 		 */
-		if (i >= nchain)
-			heap_prune_record_dead_or_unused(prstate, rootoffnum, presult);
-		else
-			heap_prune_record_redirect(dp, prstate, rootoffnum, chainitems[i], presult);
+		heap_prune_record_dead_or_unused(prstate, rootoffnum, ItemIdIsNormal(rootlp));
+
+		for (int i = 1; i < nchain; i++)
+			heap_prune_record_unused(prstate, chainitems[i], true);
 	}
-	else if (nchain < 2 && ItemIdIsRedirected(rootlp))
+	else
 	{
 		/*
-		 * We found a redirect item that doesn't point to a valid follow-on
-		 * item.  This can happen if the loop in heap_page_prune_and_freeze()
-		 * caused us to visit the dead successor of a redirect item before
-		 * visiting the redirect item.  We can clean up by setting the
-		 * redirect item to DEAD state or LP_UNUSED if the caller indicated.
+		 * If we found a DEAD tuple in the chain, adjust the HOT chain so that
+		 * all the DEAD tuples at the start of the chain are removed and the
+		 * root line pointer is appropriately redirected.
 		 */
-		heap_prune_record_dead_or_unused(prstate, rootoffnum, presult);
-	}
+		heap_prune_record_redirect(prstate, rootoffnum, chainitems[survivor],
+								   ItemIdIsNormal(rootlp));
 
-	/*
-	 * If not marked for pruning, consider if the tuple should be counted as
-	 * live or recently dead. Note that line pointers redirected to will
-	 * already have been counted.
-	 */
-	if (ItemIdIsNormal(rootlp) && !prstate->marked[rootoffnum])
-		heap_prune_record_live_or_recently_dead(dp, prstate, rootoffnum, presult);
+		/*
+		 * Mark as unused each intermediate item that we are able to remove
+		 * from the chain.
+		 */
+		for (int i = 1; i < survivor; i++)
+			heap_prune_record_unused(prstate, chainitems[i], true);
 
-	return ndeleted;
+		/* the rest of the tuples in the chain are normal, unchanged tuples */
+		for (int i = survivor; i < nchain; i++)
+			heap_prune_record_unchanged(dp, prstate, chainitems[i]);
+	}
 }
 
 /* Record lowest soon-prunable XID */
@@ -1067,43 +1161,69 @@ heap_prune_record_prunable(PruneState *prstate, TransactionId xid)
 
 /* Record line pointer to be redirected */
 static void
-heap_prune_record_redirect(Page page, PruneState *prstate,
+heap_prune_record_redirect(PruneState *prstate,
 						   OffsetNumber offnum, OffsetNumber rdoffnum,
-						   PruneFreezeResult *presult)
+						   bool was_normal)
 {
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+
+	/*
+	 * Do not mark the redirect target here.  It needs to be counted
+	 * separately as an unchanged tuple.
+	 */
+
 	Assert(prstate->nredirected < MaxHeapTuplesPerPage);
 	prstate->redirected[prstate->nredirected * 2] = offnum;
 	prstate->redirected[prstate->nredirected * 2 + 1] = rdoffnum;
-	heap_prune_record_live_or_recently_dead(page, prstate, rdoffnum, presult);
 
 	prstate->nredirected++;
-	Assert(!prstate->marked[offnum]);
-	prstate->marked[offnum] = true;
-	Assert(!prstate->marked[rdoffnum]);
-	prstate->marked[rdoffnum] = true;
 
-	presult->hastup = true;
+	/*
+	 * If the root entry had been a normal tuple, we are deleting it, so count
+	 * it in the result.  But changing a redirect (even to DEAD state) doesn't
+	 * count.
+	 */
+	if (was_normal)
+		prstate->ndeleted++;
+
+	prstate->hastup = true;
 }
 
 /* Record line pointer to be marked dead */
 static void
 heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
-					   PruneFreezeResult *presult)
+					   bool was_normal)
 {
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+
 	Assert(prstate->ndead < MaxHeapTuplesPerPage);
 	prstate->nowdead[prstate->ndead] = offnum;
 	prstate->ndead++;
-	Assert(!prstate->marked[offnum]);
-	prstate->marked[offnum] = true;
 
 	/*
-	 * Setting the line pointer LP_DEAD means the page will definitely not be
-	 * all_visible.
+	 * Deliberately delay unsetting all_visible until the end of
+	 * heap_page_prune_and_freeze().  Removable dead tuples shouldn't
+	 * preclude freezing the page.  This allows us to attempt to freeze the
+	 * page after pruning.  As long as we unset all_visible before updating
+	 * the visibility map, this is correct.
 	 */
-	presult->all_visible = false;
 
 	/* Record the dead offset for vacuum */
-	presult->deadoffsets[presult->lpdead_items++] = offnum;
+	prstate->deadoffsets[prstate->lpdead_items++] = offnum;
+
+	/*
+	 * If the root entry had been a normal tuple, we are deleting it, so count
+	 * it in the result.  But changing a redirect (even to DEAD state) doesn't
+	 * count.
+	 */
+	if (was_normal)
+		prstate->ndeleted++;
 }
 
 /*
@@ -1114,7 +1234,7 @@ heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
  */
 static void
 heap_prune_record_dead_or_unused(PruneState *prstate, OffsetNumber offnum,
-								 PruneFreezeResult *presult)
+								 bool was_normal)
 {
 	/*
 	 * If the caller set PRUNE_DO_MARK_UNUSED_NOW, we can remove dead tuples
@@ -1123,57 +1243,45 @@ heap_prune_record_dead_or_unused(PruneState *prstate, OffsetNumber offnum,
 	 * likely.
 	 */
 	if (unlikely(prstate->actions & PRUNE_DO_MARK_UNUSED_NOW))
-		heap_prune_record_unused(prstate, offnum);
+		heap_prune_record_unused(prstate, offnum, was_normal);
 	else
-		heap_prune_record_dead(prstate, offnum, presult);
+		heap_prune_record_dead(prstate, offnum, was_normal);
 }
 
 /* Record line pointer to be marked unused */
 static void
-heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum)
+heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum, bool was_normal)
 {
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+
 	Assert(prstate->nunused < MaxHeapTuplesPerPage);
 	prstate->nowunused[prstate->nunused] = offnum;
 	prstate->nunused++;
-	Assert(!prstate->marked[offnum]);
-	prstate->marked[offnum] = true;
+
+	/*
+	 * If the item was a normal tuple, we are deleting it, so count it in the
+	 * result.  But removing an already-dead or redirect line pointer doesn't
+	 * count.
+	 */
+	if (was_normal)
+		prstate->ndeleted++;
 }
 
+
+/*
+ * Record line pointer that is left unchanged.  We consider freezing it, and
+ * update bookkeeping of tuple counts and page visibility.
+ */
 static void
-heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNumber offnum,
-										PruneFreezeResult *presult)
+heap_prune_record_unchanged(Page page, PruneState *prstate, OffsetNumber offnum)
 {
-	HTSV_Result status;
 	HeapTupleHeader htup;
-	bool		totally_frozen;
-
-	/* This could happen for items which are redirected to. */
-	if (prstate->counted[offnum])
-		return;
 
-	prstate->counted[offnum] = true;
-
-	/*
-	 * If we don't want to do any of the special defined actions, we don't
-	 * need to continue.
-	 */
-	if (prstate->actions == 0)
-		return;
-
-	status = htsv_get_valid_status(prstate->htsv[offnum]);
-
-	Assert(status != HEAPTUPLE_DEAD);
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
 
-	/*
-	 * Deliberately don't set hastup for LP_DEAD items.  We make the soft
-	 * assumption that any LP_DEAD items encountered here will become
-	 * LP_UNUSED later on, before count_nondeletable_pages is reached.  If we
-	 * don't make this assumption then rel truncation will only happen every
-	 * other VACUUM, at most.  Besides, VACUUM must treat
-	 * hastup/nonempty_pages as provisional no matter how LP_DEAD items are
-	 * handled (handled here, or handled later on).
-	 */
-	presult->hastup = true;
+	prstate->hastup = true;		/* the page is not empty */
 
 	/*
 	 * The criteria for counting a tuple as live in this block need to match
@@ -1185,28 +1293,19 @@ heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNu
 	 * can't run inside a transaction block, which makes some cases impossible
 	 * (e.g. in-progress insert from the same transaction).
 	 *
-	 * We treat LP_DEAD items (which are the closest thing to DEAD tuples that
-	 * might be seen here) differently, too: we assume that they'll become
-	 * LP_UNUSED before VACUUM finishes.  This difference is only superficial.
-	 * VACUUM effectively agrees with ANALYZE about DEAD items, in the end.
-	 * VACUUM won't remember LP_DEAD items, but only because they're not
-	 * supposed to be left behind when it is done. (Cases where we bypass
-	 * index vacuuming will violate this optimistic assumption, but the
-	 * overall impact of that should be negligible.)
-	 *
-	 * HEAPTUPLE_LIVE tuples are naturally counted as live. This is also what
-	 * acquire_sample_rows() does.
-	 *
-	 * HEAPTUPLE_DELETE_IN_PROGRESS tuples are expected during concurrent
-	 * vacuum. We expect the deleting transaction to update the counters at
-	 * commit after we report our results, so count these tuples as live to
-	 * ensure the math works out. The assumption that the transaction will
-	 * commit and update the counters after we report is a bit shaky; but it
-	 * is what acquire_sample_rows() does, so we do the same to be consistent.
+	 * HEAPTUPLE_DEAD tuples are handled by the other heap_prune_record_*()
+	 * subroutines.  They don't count dead items like acquire_sample_rows()
+	 * does, because we assume that all dead items will become LP_UNUSED
+	 * before VACUUM finishes.  This difference is only superficial.  VACUUM
+	 * effectively agrees with ANALYZE about DEAD items, in the end.  VACUUM
+	 * won't remember LP_DEAD items, but only because they're not supposed to
+	 * be left behind when it is done. (Cases where we bypass index vacuuming
+	 * will violate this optimistic assumption, but the overall impact of that
+	 * should be negligible.)
 	 */
 	htup = (HeapTupleHeader) PageGetItem(page, PageGetItemId(page, offnum));
 
-	switch (status)
+	switch (prstate->htsv[offnum])
 	{
 		case HEAPTUPLE_LIVE:
 
@@ -1214,7 +1313,7 @@ heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNu
 			 * Count it as live.  Not only is this natural, but it's also what
 			 * acquire_sample_rows() does.
 			 */
-			presult->live_tuples++;
+			prstate->live_tuples++;
 
 			/*
 			 * Is the tuple definitely visible to all transactions?
@@ -1224,14 +1323,13 @@ heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNu
 			 * See SetHintBits for more info. Check that the tuple is hinted
 			 * xmin-committed because of that.
 			 */
-			if (prstate->all_visible_except_removable)
+			if (prstate->all_visible)
 			{
 				TransactionId xmin;
 
 				if (!HeapTupleHeaderXminCommitted(htup))
 				{
-					prstate->all_visible_except_removable = false;
-					presult->all_visible = false;
+					prstate->all_visible = false;
 					break;
 				}
 
@@ -1248,8 +1346,7 @@ heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNu
 				Assert(prstate->pagefrz.cutoffs);
 				if (!TransactionIdPrecedes(xmin, prstate->pagefrz.cutoffs->OldestXmin))
 				{
-					prstate->all_visible_except_removable = false;
-					presult->all_visible = false;
+					prstate->all_visible = false;
 					break;
 				}
 
@@ -1259,6 +1356,7 @@ heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNu
 					prstate->visibility_cutoff_xid = xmin;
 			}
 			break;
+
 		case HEAPTUPLE_RECENTLY_DEAD:
 
 			/*
@@ -1266,10 +1364,35 @@ heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNu
 			 * relation.  (We only remove items that are LP_DEAD from
 			 * pruning.)
 			 */
-			presult->recently_dead_tuples++;
-			prstate->all_visible_except_removable = false;
-			presult->all_visible = false;
+			prstate->recently_dead_tuples++;
+			prstate->all_visible = false;
+
+			/*
+			 * This tuple may soon become DEAD.  Update the hint field so that
+			 * the page is reconsidered for pruning in future.
+			 */
+			heap_prune_record_prunable(prstate,
+									   HeapTupleHeaderGetUpdateXid(htup));
 			break;
+
+		case HEAPTUPLE_DELETE_IN_PROGRESS:
+
+			/*
+			 * This is an expected case during concurrent vacuum. Count such rows
+			 * as live.  As above, we assume the deleting transaction will
+			 * commit and update the counters after we report.
+			 */
+			prstate->live_tuples++;
+			prstate->all_visible = false;
+
+			/*
+			 * This tuple may soon become DEAD.  Update the hint field so that
+			 * the page is reconsidered for pruning in future.
+			 */
+			heap_prune_record_prunable(prstate,
+									   HeapTupleHeaderGetUpdateXid(htup));
+			break;
+
 		case HEAPTUPLE_INSERT_IN_PROGRESS:
 
 			/*
@@ -1279,22 +1402,24 @@ heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNu
 			 * assumption is a bit shaky, but it is what acquire_sample_rows()
 			 * does, so be consistent.
 			 */
-			prstate->all_visible_except_removable = false;
-			presult->all_visible = false;
-			break;
-		case HEAPTUPLE_DELETE_IN_PROGRESS:
+			prstate->all_visible = false;
 
 			/*
-			 * This an expected case during concurrent vacuum. Count such rows
-			 * as live.  As above, we assume the deleting transaction will
-			 * commit and update the counters after we report.
+			 * If we wanted to optimize for aborts, we might consider marking
+			 * the page prunable when we see INSERT_IN_PROGRESS.  But we
+			 * don't.  See related decisions about when to mark the page
+			 * prunable in heapam.c.
 			 */
-			presult->live_tuples++;
-			prstate->all_visible_except_removable = false;
-			presult->all_visible = false;
 			break;
+
 		default:
-			elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+
+			/*
+			 * DEAD tuples should've been passed to heap_prune_record_dead()
+			 * or heap_prune_record_unused() instead.
+			 */
+			elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result %d",
+				 prstate->htsv[offnum]);
 			break;
 	}
 
@@ -1302,12 +1427,14 @@ heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNu
 	if (prstate->actions & PRUNE_DO_TRY_FREEZE)
 	{
 		/* Tuple with storage -- consider need to freeze */
+		bool		totally_frozen;
+
 		if ((heap_prepare_freeze_tuple(htup, &prstate->pagefrz,
-									   &prstate->frozen[presult->nfrozen],
+									   &prstate->frozen[prstate->nfrozen],
 									   &totally_frozen)))
 		{
 			/* Save prepared freeze plan for later */
-			prstate->frozen[presult->nfrozen++].offset = offnum;
+			prstate->frozen[prstate->nfrozen++].offset = offnum;
 		}
 
 		/*
@@ -1316,9 +1443,50 @@ heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNu
 		 * definitely cannot be set all-frozen in the visibility map later on
 		 */
 		if (!totally_frozen)
-			presult->set_all_frozen = false;
+			prstate->all_frozen = false;
 	}
+}
 
+/*
+ * Record line pointer that was already LP_DEAD and is left unchanged.
+ */
+static void
+heap_prune_record_unchanged_lp_dead(PruneState *prstate, OffsetNumber offnum)
+{
+	/*
+	 * Deliberately don't set hastup for LP_DEAD items.  We make the soft
+	 * assumption that any LP_DEAD items encountered here will become
+	 * LP_UNUSED later on, before count_nondeletable_pages is reached.  If we
+	 * don't make this assumption then rel truncation will only happen every
+	 * other VACUUM, at most.  Besides, VACUUM must treat
+	 * hastup/nonempty_pages as provisional no matter how LP_DEAD items are
+	 * handled (handled here, or handled later on).
+	 *
+	 * Similarly, don't unset all_visible until later, at the end of
+	 * heap_page_prune_and_freeze().  This will allow us to attempt to freeze
+	 * the page after pruning.  As long as we unset it before updating the
+	 * visibility map, this will be correct.
+	 */
+
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+
+	prstate->deadoffsets[prstate->lpdead_items++] = offnum;
+}
+
+static void
+heap_prune_record_unchanged_lp_redirect(PruneState *prstate, OffsetNumber offnum)
+{
+	/*
+	 * A redirect line pointer doesn't count as a live tuple.
+	 *
+	 * If we leave a redirect line pointer in place, there will be another
+	 * tuple on the page that it points to.  We will do the bookkeeping for
+	 * that separately. So we have nothing to do here, except remember that we
+	 * processed this item.
+	 */
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
 }
 
 /*
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 04e86347a0b..92e02863e2d 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1462,7 +1462,7 @@ lazy_scan_prune(LVRelState *vacrel,
 									  &debug_cutoff, &debug_all_frozen))
 			Assert(false);
 
-		Assert(presult.set_all_frozen == debug_all_frozen);
+		Assert(presult.all_frozen == debug_all_frozen);
 
 		Assert(!TransactionIdIsValid(debug_cutoff) ||
 			   debug_cutoff == presult.vm_conflict_horizon);
@@ -1517,7 +1517,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	{
 		uint8		flags = VISIBILITYMAP_ALL_VISIBLE;
 
-		if (presult.set_all_frozen)
+		if (presult.all_frozen)
 		{
 			Assert(!TransactionIdIsValid(presult.vm_conflict_horizon));
 			flags |= VISIBILITYMAP_ALL_FROZEN;
@@ -1588,7 +1588,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * true, so we must check both all_visible and all_frozen.
 	 */
 	else if (all_visible_according_to_vm && presult.all_visible &&
-			 presult.set_all_frozen && !VM_ALL_FROZEN(vacrel->rel, blkno, &vmbuffer))
+			 presult.all_frozen && !VM_ALL_FROZEN(vacrel->rel, blkno, &vmbuffer))
 	{
 		/*
 		 * Avoid relying on all_visible_according_to_vm as a proxy for the
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index ef61e0277ee..9a20fef3a79 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -224,37 +224,37 @@ typedef struct PruneFreezeResult
 	int			nnewlpdead;		/* Number of newly LP_DEAD items */
 	int			nfrozen;		/* Number of tuples we froze */
 
-	/*
-	 * The rest of the fields in PruneFreezeResult are only guaranteed to be
-	 * initialized if heap_page_prune_and_freeze() is passed
-	 * PRUNE_DO_TRY_FREEZE.
-	 */
-	/* Whether or not the page should be set all-frozen in the VM */
-	bool		set_all_frozen;
-
-	/* Number of live and recently dead tuples */
+	/* Number of live and recently dead tuples on the page, after pruning */
 	int			live_tuples;
 	int			recently_dead_tuples;
 
 	/*
-	 * Whether or not the page is truly all-visible after pruning. If there
-	 * are LP_DEAD items on the page which cannot be removed until vacuum's
-	 * second pass, this will be false.
+	 * Whether or not the page makes rel truncation unsafe
+	 *
+	 * This is set to 'true' even if the page contains LP_DEAD items.  VACUUM
+	 * will remove them before attempting to truncate.
 	 */
-	bool		all_visible;
-
-
-	/* Whether or not the page makes rel truncation unsafe */
 	bool		hastup;
 
 	/*
-	 * If the page is all-visible and not all-frozen this is the oldest xid
-	 * that can see the page as all-visible. It is to be used as the snapshot
-	 * conflict horizon when emitting a XLOG_HEAP2_VISIBLE record.
+	 * all_visible and all_frozen indicate if the all-visible and all-frozen
+	 * bits in the visibility map can be set for this page, after pruning.
+	 *
+	 * vm_conflict_horizon is the newest xmin of live tuples on the page.  The
+	 * caller can use it as the conflict horizon, when setting the VM bits. It
+	 * is only valid if we froze some tuples, and all_frozen is true.
+	 *
+	 * These are only set if the PRUNE_DO_TRY_FREEZE action flag is set.
 	 */
+	bool		all_visible;
+	bool		all_frozen;
 	TransactionId vm_conflict_horizon;
 
-	int			lpdead_items;	/* includes existing LP_DEAD items */
+	/*
+	 * LP_DEAD items on the page after pruning. Includes existing LP_DEAD
+	 * items
+	 */
+	int			lpdead_items;
 	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
 } PruneFreezeResult;
 
-- 
2.40.1

