From a5cb8877b001e9ad5e46ba565778f41bfa47ffec Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 26 Mar 2024 13:54:19 -0400
Subject: [PATCH v7 16/16] Move live tuple accounting to heap_prune_chain()

ci-os-only:
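
Move the live and recently dead tuple accounting done in the first
tuple visibility-check loop of heap_page_prune_and_freeze() into
heap_prune_chain(), using a new helper,
heap_prune_record_live_or_recently_dead(). A new PruneState.counted[]
array ensures each tuple is counted exactly once, even when an item is
reached both as the target of a redirect and on its own.

To support this, replace the mark_unused_now boolean with an "actions"
bitmask (PRUNE_DO_MARK_UNUSED_NOW, PRUNE_DO_TRY_FREEZE), move the array
of freeze plans into HeapPageFreeze and embed that struct in PruneState,
and report candidate relfrozenxid/relminmxid values through the new
new_relfrozen_xid and new_relmin_mxid output parameters rather than
through PruneFreezeResult. PruneFreezeResult.all_frozen is renamed to
set_all_frozen.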
---
 src/backend/access/heap/pruneheap.c  | 636 ++++++++++++++++-----------
 src/backend/access/heap/vacuumlazy.c |  38 +-
 src/include/access/heapam.h          |  59 ++-
 3 files changed, 424 insertions(+), 309 deletions(-)
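
For review, a minimal sketch of how the two call sites use the revised
heap_page_prune_and_freeze() interface, restated from the hunks below;
variable names match the patch and the surrounding logic is omitted.

    /* VACUUM's first heap pass (lazy_scan_prune) */
    uint8 actions = PRUNE_DO_TRY_FREEZE;

    if (vacrel->nindexes == 0)
        actions |= PRUNE_DO_MARK_UNUSED_NOW;

    heap_page_prune_and_freeze(rel, buf, actions, vacrel->vistest,
                               &vacrel->cutoffs, &presult,
                               PRUNE_VACUUM_SCAN, &vacrel->offnum,
                               &vacrel->NewRelfrozenXid,
                               &vacrel->NewRelminMxid);

    /*
     * On-access pruning: no optional actions, no cutoffs, and no
     * relfrozenxid/relminmxid tracking.
     */
    heap_page_prune_and_freeze(relation, buffer, 0, vistest,
                               NULL, &presult, PRUNE_ON_ACCESS, NULL,
                               NULL, NULL);

And a condensed view of the double-counting guard in the new helper,
abbreviated from the full function added in pruneheap.c:

    static void
    heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate,
                                            OffsetNumber offnum,
                                            PruneFreezeResult *presult)
    {
        /* Items reached both via a redirect and directly are counted once. */
        if (prstate->counted[offnum])
            return;
        prstate->counted[offnum] = true;

        /* ... count live/recently-dead, track all-visible, maybe freeze ... */
    }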

diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 6d5f8ba4417..744f3b5fabd 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -34,8 +34,9 @@ typedef struct
 {
 	/* tuple visibility test, initialized for the relation */
 	GlobalVisState *vistest;
-	/* whether or not dead items can be set LP_UNUSED during pruning */
-	bool		mark_unused_now;
+	uint8		actions;
+	TransactionId visibility_cutoff_xid;
+	bool		all_visible_except_removable;
 
 	TransactionId new_prune_xid;	/* new prune hint value for page */
 	TransactionId latest_xid_removed;
@@ -67,10 +68,14 @@ typedef struct
 	 */
 	int8		htsv[MaxHeapTuplesPerPage + 1];
 
+	HeapPageFreeze pagefrz;
+
 	/*
-	 * One entry for every tuple that we may freeze.
+	 * Whether or not each tuple has been counted toward vacuum stats. In
+	 * heap_prune_chain(), we have to be sure that heap-only tuples that are
+	 * not part of any chain are counted exactly once.
 	 */
-	HeapTupleFreeze frozen[MaxHeapTuplesPerPage];
+	bool		counted[MaxHeapTuplesPerPage + 1];
 } PruneState;
 
 /* Local functions */
@@ -83,7 +88,7 @@ static int	heap_prune_chain(Buffer buffer,
 
 static inline HTSV_Result htsv_get_valid_status(int status);
 static void heap_prune_record_prunable(PruneState *prstate, TransactionId xid);
-static void heap_prune_record_redirect(PruneState *prstate,
+static void heap_prune_record_redirect(Page page, PruneState *prstate,
 									   OffsetNumber offnum, OffsetNumber rdoffnum,
 									   PruneFreezeResult *presult);
 static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
@@ -91,6 +96,9 @@ static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
 static void heap_prune_record_dead_or_unused(PruneState *prstate, OffsetNumber offnum,
 											 PruneFreezeResult *presult);
 static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum);
+
+static void heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate,
+													OffsetNumber offnum, PruneFreezeResult *presult);
 static void page_verify_redirects(Page page);
 
 
@@ -172,12 +180,13 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
 			PruneFreezeResult presult;
 
 			/*
-			 * For now, pass mark_unused_now as false regardless of whether or
-			 * not the relation has indexes, since we cannot safely determine
-			 * that during on-access pruning with the current implementation.
+			 * For now, do not set PRUNE_DO_MARK_UNUSED_NOW regardless of
+			 * whether or not the relation has indexes, since we cannot safely
+			 * determine that during on-access pruning with the current
+			 * implementation.
 			 */
-			heap_page_prune_and_freeze(relation, buffer, vistest, false, NULL,
-									   &presult, PRUNE_ON_ACCESS, NULL);
+			heap_page_prune_and_freeze(relation, buffer, 0, vistest,
+									   NULL, &presult, PRUNE_ON_ACCESS, NULL, NULL, NULL);
 
 			/*
 			 * Report the number of tuples reclaimed to pgstats.  This is
@@ -209,7 +218,6 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
 	}
 }
 
-
 /*
  * Prune and repair fragmentation and potentially freeze tuples on the
  * specified page.
@@ -223,16 +231,12 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
  * also need to account for a reduction in the length of the line pointer
  * array following array truncation by us.
  *
+ * actions is a bitmask of the pruning actions (PRUNE_DO_*) that
+ * heap_page_prune_and_freeze() should take.
+ *
  * vistest is used to distinguish whether tuples are DEAD or RECENTLY_DEAD
  * (see heap_prune_satisfies_vacuum).
  *
- * mark_unused_now indicates whether or not dead items can be set LP_UNUSED
- * during pruning.
- *
- * pagefrz is an input parameter containing visibility cutoff information and
- * the current relfrozenxid and relminmxids used if the caller is interested in
- * freezing tuples on the page.
- *
  * presult contains output parameters needed by callers such as the number of
  * tuples removed and the number of line pointers newly marked LP_DEAD.
  * heap_page_prune_and_freeze() is responsible for initializing it.
@@ -242,15 +246,21 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
  *
  * off_loc is the offset location required by the caller to use in error
  * callback.
+ *
+ * new_relfrozen_xid and new_relmin_mxid are provided by the caller if it
+ * would like the current values of those counters updated as part of
+ * advancing relfrozenxid/relminmxid.
  */
 void
 heap_page_prune_and_freeze(Relation relation, Buffer buffer,
+						   uint8 actions,
 						   GlobalVisState *vistest,
-						   bool mark_unused_now,
-						   HeapPageFreeze *pagefrz,
+						   struct VacuumCutoffs *cutoffs,
 						   PruneFreezeResult *presult,
 						   PruneReason reason,
-						   OffsetNumber *off_loc)
+						   OffsetNumber *off_loc,
+						   TransactionId *new_relfrozen_xid,
+						   MultiXactId *new_relmin_mxid)
 {
 	Page		page = BufferGetPage(buffer);
 	BlockNumber blockno = BufferGetBlockNumber(buffer);
@@ -258,15 +268,43 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 				maxoff;
 	PruneState	prstate;
 	HeapTupleData tup;
-	TransactionId visibility_cutoff_xid;
 	TransactionId frz_conflict_horizon;
 	bool		do_freeze;
-	bool		all_visible_except_removable;
 	bool		do_prune;
 	bool		do_hint;
 	bool		hint_bit_fpi;
 	int64		fpi_before = pgWalUsage.wal_fpi;
 
+	/*
+	 * pagefrz contains visibility cutoff information and the current
+	 * relfrozenxid and relminmxid, used if the caller is interested in
+	 * freezing tuples on the page.
+	 */
+	prstate.pagefrz.cutoffs = cutoffs;
+	prstate.pagefrz.freeze_required = false;
+
+	if (new_relmin_mxid)
+	{
+		prstate.pagefrz.FreezePageRelminMxid = *new_relmin_mxid;
+		prstate.pagefrz.NoFreezePageRelminMxid = *new_relmin_mxid;
+	}
+	else
+	{
+		prstate.pagefrz.FreezePageRelminMxid = InvalidMultiXactId;
+		prstate.pagefrz.NoFreezePageRelminMxid = InvalidMultiXactId;
+	}
+
+	if (new_relfrozen_xid)
+	{
+		prstate.pagefrz.FreezePageRelfrozenXid = *new_relfrozen_xid;
+		prstate.pagefrz.NoFreezePageRelfrozenXid = *new_relfrozen_xid;
+	}
+	else
+	{
+		prstate.pagefrz.FreezePageRelfrozenXid = InvalidTransactionId;
+		prstate.pagefrz.NoFreezePageRelfrozenXid = InvalidTransactionId;
+	}
+
 	/*
 	 * Our strategy is to scan the page and make lists of items to change,
 	 * then apply the changes within a critical section.  This keeps as much
@@ -280,10 +318,11 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 */
 	prstate.new_prune_xid = InvalidTransactionId;
 	prstate.vistest = vistest;
-	prstate.mark_unused_now = mark_unused_now;
+	prstate.actions = actions;
 	prstate.latest_xid_removed = InvalidTransactionId;
 	prstate.nredirected = prstate.ndead = prstate.nunused = 0;
 	memset(prstate.marked, 0, sizeof(prstate.marked));
+	memset(prstate.counted, 0, sizeof(prstate.counted));
 
 	/*
 	 * prstate.htsv is not initialized here because all ntuple spots in the
@@ -291,7 +330,6 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 */
 	presult->ndeleted = 0;
 	presult->nnewlpdead = 0;
-	presult->nfrozen = 0;
 
 	presult->hastup = false;
 
@@ -300,13 +338,45 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	presult->lpdead_items = 0;
 
 	/*
-	 * Caller will update the VM after pruning, collecting LP_DEAD items, and
+	 * Caller may update the VM after pruning, collecting LP_DEAD items, and
 	 * freezing tuples. Keep track of whether or not the page is all_visible
 	 * and all_frozen and use this information to update the VM. all_visible
 	 * implies lpdead_items == 0, but don't trust all_frozen result unless
-	 * all_visible is also set to true.
+	 * all_visible is also set to true. If we won't even try freezing,
+	 * initialize all_frozen to false.
+	 *
+	 * For vacuum, if the whole page will become frozen, we consider
+	 * opportunistically freezing tuples. Dead tuples which will be removed by
+	 * the end of vacuuming should not preclude us from opportunistically
+	 * freezing. We will not be able to freeze the whole page if there are
+	 * tuples present which are not visible to everyone or if there are dead
+	 * tuples which are not yet removable. We need all_visible to be false if
+	 * LP_DEAD tuples remain after pruning so that we do not incorrectly
+	 * update the visibility map or page hint bit. So, we will update
+	 * presult->all_visible to reflect the presence of LP_DEAD items while
+	 * pruning and keep all_visible_except_removable to permit freezing if the
+	 * whole page will eventually become all visible after removing tuples.
 	 */
-	presult->all_frozen = true;
+	presult->all_visible = true;
+
+	if (prstate.actions & PRUNE_DO_TRY_FREEZE)
+		presult->set_all_frozen = true;
+	else
+		presult->set_all_frozen = false;
+	presult->nfrozen = 0;
+
+	/*
+	 * Deliberately delay unsetting all_visible until later during pruning.
+	 * Removable dead tuples shouldn't preclude freezing the page. After
+	 * finishing this first pass of tuple visibility checks, initialize
+	 * all_visible_except_removable with the current value of all_visible to
+	 * indicate whether or not the page is all visible except for dead tuples.
+	 * This will allow us to attempt to freeze the page after pruning. Later
+	 * during pruning, if we encounter an LP_DEAD item or are setting an item
+	 * LP_DEAD, we will unset all_visible. As long as we unset it before
+	 * updating the visibility map, this will be correct.
+	 */
+	prstate.all_visible_except_removable = true;
 
 	/*
 	 * The visibility cutoff xid is the newest xmin of live tuples on the
@@ -316,13 +386,9 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 * running transaction on the standby does not see tuples on the page as
 	 * all-visible, so the conflict horizon remains InvalidTransactionId.
 	 */
-	presult->vm_conflict_horizon = visibility_cutoff_xid = InvalidTransactionId;
+	presult->vm_conflict_horizon = prstate.visibility_cutoff_xid = InvalidTransactionId;
 	frz_conflict_horizon = InvalidTransactionId;
 
-	/* For advancing relfrozenxid and relminmxid */
-	presult->new_relfrozenxid = InvalidTransactionId;
-	presult->new_relminmxid = InvalidMultiXactId;
-
 	maxoff = PageGetMaxOffsetNumber(page);
 	tup.t_tableOid = RelationGetRelid(relation);
 
@@ -346,7 +412,6 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 * prefetching efficiency significantly / decreases the number of cache
 	 * misses.
 	 */
-	all_visible_except_removable = true;
 	for (offnum = maxoff;
 		 offnum >= FirstOffsetNumber;
 		 offnum = OffsetNumberPrev(offnum))
@@ -375,168 +440,6 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 
 		prstate.htsv[offnum] = heap_prune_satisfies_vacuum(&prstate, &tup,
 														   buffer);
-
-		if (reason == PRUNE_ON_ACCESS)
-			continue;
-
-		/*
-		 * The criteria for counting a tuple as live in this block need to
-		 * match what analyze.c's acquire_sample_rows() does, otherwise VACUUM
-		 * and ANALYZE may produce wildly different reltuples values, e.g.
-		 * when there are many recently-dead tuples.
-		 *
-		 * The logic here is a bit simpler than acquire_sample_rows(), as
-		 * VACUUM can't run inside a transaction block, which makes some cases
-		 * impossible (e.g. in-progress insert from the same transaction).
-		 *
-		 * We treat LP_DEAD items (which are the closest thing to DEAD tuples
-		 * that might be seen here) differently, too: we assume that they'll
-		 * become LP_UNUSED before VACUUM finishes.  This difference is only
-		 * superficial.  VACUUM effectively agrees with ANALYZE about DEAD
-		 * items, in the end.  VACUUM won't remember LP_DEAD items, but only
-		 * because they're not supposed to be left behind when it is done.
-		 * (Cases where we bypass index vacuuming will violate this optimistic
-		 * assumption, but the overall impact of that should be negligible.)
-		 */
-		switch (prstate.htsv[offnum])
-		{
-			case HEAPTUPLE_DEAD:
-
-				/*
-				 * Deliberately delay unsetting all_visible until later during
-				 * pruning. Removable dead tuples shouldn't preclude freezing
-				 * the page. After finishing this first pass of tuple
-				 * visibility checks, initialize all_visible_except_removable
-				 * with the current value of all_visible to indicate whether
-				 * or not the page is all visible except for dead tuples. This
-				 * will allow us to attempt to freeze the page after pruning.
-				 * Later during pruning, if we encounter an LP_DEAD item or
-				 * are setting an item LP_DEAD, we will unset all_visible. As
-				 * long as we unset it before updating the visibility map,
-				 * this will be correct.
-				 */
-				break;
-			case HEAPTUPLE_LIVE:
-
-				/*
-				 * Count it as live.  Not only is this natural, but it's also
-				 * what acquire_sample_rows() does.
-				 */
-				presult->live_tuples++;
-
-				/*
-				 * Is the tuple definitely visible to all transactions?
-				 *
-				 * NB: Like with per-tuple hint bits, we can't set the
-				 * PD_ALL_VISIBLE flag if the inserter committed
-				 * asynchronously. See SetHintBits for more info. Check that
-				 * the tuple is hinted xmin-committed because of that.
-				 */
-				if (all_visible_except_removable)
-				{
-					TransactionId xmin;
-
-					if (!HeapTupleHeaderXminCommitted(htup))
-					{
-						all_visible_except_removable = false;
-						break;
-					}
-
-					/*
-					 * The inserter definitely committed. But is it old enough
-					 * that everyone sees it as committed? A
-					 * FrozenTransactionId is seen as committed to everyone.
-					 * Otherwise, we check if there is a snapshot that
-					 * considers this xid to still be running, and if so, we
-					 * don't consider the page all-visible.
-					 */
-					xmin = HeapTupleHeaderGetXmin(htup);
-					if (xmin != FrozenTransactionId &&
-						!GlobalVisTestIsRemovableXid(vistest, xmin))
-					{
-						all_visible_except_removable = false;
-						break;
-					}
-
-					/* Track newest xmin on page. */
-					if (TransactionIdFollows(xmin, visibility_cutoff_xid) &&
-						TransactionIdIsNormal(xmin))
-						visibility_cutoff_xid = xmin;
-				}
-				break;
-			case HEAPTUPLE_RECENTLY_DEAD:
-
-				/*
-				 * If tuple is recently dead then we must not remove it from
-				 * the relation.  (We only remove items that are LP_DEAD from
-				 * pruning.)
-				 */
-				presult->recently_dead_tuples++;
-				all_visible_except_removable = false;
-				break;
-			case HEAPTUPLE_INSERT_IN_PROGRESS:
-
-				/*
-				 * We do not count these rows as live, because we expect the
-				 * inserting transaction to update the counters at commit, and
-				 * we assume that will happen only after we report our
-				 * results.  This assumption is a bit shaky, but it is what
-				 * acquire_sample_rows() does, so be consistent.
-				 */
-				all_visible_except_removable = false;
-				break;
-			case HEAPTUPLE_DELETE_IN_PROGRESS:
-
-				/*
-				 * This an expected case during concurrent vacuum. Count such
-				 * rows as live.  As above, we assume the deleting transaction
-				 * will commit and update the counters after we report.
-				 */
-				presult->live_tuples++;
-				all_visible_except_removable = false;
-				break;
-			default:
-				elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
-				break;
-		}
-
-		if (prstate.htsv[offnum] != HEAPTUPLE_DEAD)
-		{
-			/*
-			 * Deliberately don't set hastup for LP_DEAD items.  We make the
-			 * soft assumption that any LP_DEAD items encountered here will
-			 * become LP_UNUSED later on, before count_nondeletable_pages is
-			 * reached.  If we don't make this assumption then rel truncation
-			 * will only happen every other VACUUM, at most.  Besides, VACUUM
-			 * must treat hastup/nonempty_pages as provisional no matter how
-			 * LP_DEAD items are handled (handled here, or handled later on).
-			 */
-			presult->hastup = true;
-
-			/* Consider freezing any normal tuples which will not be removed */
-			if (pagefrz)
-			{
-				bool		totally_frozen;
-
-				/* Tuple with storage -- consider need to freeze */
-				if ((heap_prepare_freeze_tuple(htup, pagefrz,
-											   &prstate.frozen[presult->nfrozen],
-											   &totally_frozen)))
-				{
-					/* Save prepared freeze plan for later */
-					prstate.frozen[presult->nfrozen++].offset = offnum;
-				}
-
-				/*
-				 * If any tuple isn't either totally frozen already or
-				 * eligible to become totally frozen (according to its freeze
-				 * plan), then the page definitely cannot be set all-frozen in
-				 * the visibility map later on
-				 */
-				if (!totally_frozen)
-					presult->all_frozen = false;
-			}
-		}
 	}
 
 	/*
@@ -545,21 +448,6 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 */
 	hint_bit_fpi = fpi_before != pgWalUsage.wal_fpi;
 
-	/*
-	 * For vacuum, if the whole page will become frozen, we consider
-	 * opportunistically freezing tuples. Dead tuples which will be removed by
-	 * the end of vacuuming should not preclude us from opportunistically
-	 * freezing. We will not be able to freeze the whole page if there are
-	 * tuples present which are not visible to everyone or if there are dead
-	 * tuples which are not yet removable. We need all_visible to be false if
-	 * LP_DEAD tuples remain after pruning so that we do not incorrectly
-	 * update the visibility map or page hint bit. So, we will update
-	 * presult->all_visible to reflect the presence of LP_DEAD items while
-	 * pruning and keep all_visible_except_removable to permit freezing if the
-	 * whole page will eventually become all visible after removing tuples.
-	 */
-	presult->all_visible = all_visible_except_removable;
-
 	/* Scan the page */
 	for (offnum = FirstOffsetNumber;
 		 offnum <= maxoff;
@@ -615,15 +503,14 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 * opportunistic freeze heuristic must be improved; however, for now, try
 	 * to approximate it.
 	 */
-
 	do_freeze = false;
-	if (pagefrz)
+	if (prstate.actions & PRUNE_DO_TRY_FREEZE)
 	{
 		/* Is the whole page freezable? And is there something to freeze? */
-		bool		whole_page_freezable = all_visible_except_removable &&
-			presult->all_frozen;
+		bool		whole_page_freezable = prstate.all_visible_except_removable &&
+			presult->set_all_frozen;
 
-		if (pagefrz->freeze_required)
+		if (prstate.pagefrz.freeze_required)
 			do_freeze = true;
 		else if (whole_page_freezable && presult->nfrozen > 0)
 		{
@@ -648,17 +535,16 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 * want to avoid doing the pre-freeze checks in a critical section.
 	 */
 	if (do_freeze)
-		heap_pre_freeze_checks(buffer, prstate.frozen, presult->nfrozen);
-
-	if (!do_freeze && (!pagefrz || !presult->all_frozen || presult->nfrozen > 0))
+		heap_pre_freeze_checks(buffer, prstate.pagefrz.frozen, presult->nfrozen);
+	else if (!presult->set_all_frozen || presult->nfrozen > 0)
 	{
 		/*
 		 * If we will neither freeze tuples on the page nor set the page all
 		 * frozen in the visibility map, the page is not all-frozen and there
 		 * will be no newly frozen tuples.
 		 */
-		presult->all_frozen = false;
-		presult->nfrozen = 0;	/* avoid miscounts in instrumentation */
+		presult->set_all_frozen = false;
+		presult->nfrozen = 0;	/* avoid miscounts in instrumentation */
 	}
 
 	/* Any error while applying the changes is critical */
@@ -708,15 +594,15 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 			 * conservative cutoff by stepping back from OldestXmin. This
 			 * avoids false conflicts when hot_standby_feedback is in use.
 			 */
-			if (all_visible_except_removable && presult->all_frozen)
-				frz_conflict_horizon = visibility_cutoff_xid;
+			if (prstate.all_visible_except_removable && presult->set_all_frozen)
+				frz_conflict_horizon = prstate.visibility_cutoff_xid;
 			else
 			{
 				/* Avoids false conflicts when hot_standby_feedback in use */
-				frz_conflict_horizon = pagefrz->cutoffs->OldestXmin;
+				frz_conflict_horizon = prstate.pagefrz.cutoffs->OldestXmin;
 				TransactionIdRetreat(frz_conflict_horizon);
 			}
-			heap_freeze_prepared_tuples(buffer, prstate.frozen, presult->nfrozen);
+			heap_freeze_prepared_tuples(buffer, prstate.pagefrz.frozen, presult->nfrozen);
 		}
 
 		MarkBufferDirty(buffer);
@@ -746,7 +632,7 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 			log_heap_prune_and_freeze(relation, buffer,
 									  conflict_xid,
 									  true, reason,
-									  prstate.frozen, presult->nfrozen,
+									  prstate.pagefrz.frozen, presult->nfrozen,
 									  prstate.redirected, prstate.nredirected,
 									  prstate.nowdead, prstate.ndead,
 									  prstate.nowunused, prstate.nunused);
@@ -761,29 +647,31 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 * page is completely frozen, there can be no conflict and the
 	 * vm_conflict_horizon should remain InvalidTransactionId.
 	 */
-	if (!presult->all_frozen)
-		presult->vm_conflict_horizon = visibility_cutoff_xid;
+	if (!presult->set_all_frozen)
+		presult->vm_conflict_horizon = prstate.visibility_cutoff_xid;
+
+	/*
+	 * If we will freeze tuples on the page, or will set the page all-frozen
+	 * in the visibility map even without freezing any tuples, we can advance
+	 * relfrozenxid and relminmxid to the values in
+	 * pagefrz->FreezePageRelfrozenXid and pagefrz->FreezePageRelminMxid.
+	 * MFIXME: which one should we pick if presult->nfrozen == 0 and
+	 * presult->set_all_frozen == true?
+	 */
+	if (new_relfrozen_xid)
+	{
+		if (presult->nfrozen > 0)
+			*new_relfrozen_xid = prstate.pagefrz.FreezePageRelfrozenXid;
+		else
+			*new_relfrozen_xid = prstate.pagefrz.NoFreezePageRelfrozenXid;
+	}
 
-	if (pagefrz)
+	if (new_relmin_mxid)
 	{
-		/*
-		 * If we will freeze tuples on the page or, even if we don't freeze
-		 * tuples on the page, if we will set the page all-frozen in the
-		 * visibility map, we can advance relfrozenxid and relminmxid to the
-		 * values in pagefrz->FreezePageRelfrozenXid and
-		 * pagefrz->FreezePageRelminMxid. MFIXME: which one should be pick if
-		 * presult->nfrozen == 0 and presult->all_frozen = True.
-		 */
 		if (presult->nfrozen > 0)
-		{
-			presult->new_relfrozenxid = pagefrz->FreezePageRelfrozenXid;
-			presult->new_relminmxid = pagefrz->FreezePageRelminMxid;
-		}
+			*new_relmin_mxid = prstate.pagefrz.FreezePageRelminMxid;
 		else
-		{
-			presult->new_relfrozenxid = pagefrz->NoFreezePageRelfrozenXid;
-			presult->new_relminmxid = pagefrz->NoFreezePageRelminMxid;
-		}
+			*new_relmin_mxid = prstate.pagefrz.NoFreezePageRelminMxid;
 	}
 }
 
@@ -900,13 +788,32 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 			 * either here or while following a chain below.  Whichever path
 			 * gets there first will mark the tuple unused.
 			 */
-			if (prstate->htsv[rootoffnum] == HEAPTUPLE_DEAD &&
-				!HeapTupleHeaderIsHotUpdated(htup))
+			if (!HeapTupleHeaderIsHotUpdated(htup))
 			{
-				heap_prune_record_unused(prstate, rootoffnum);
-				HeapTupleHeaderAdvanceConflictHorizon(htup,
-													  &prstate->latest_xid_removed);
-				ndeleted++;
+				if (prstate->htsv[rootoffnum] == HEAPTUPLE_DEAD)
+				{
+					heap_prune_record_unused(prstate, rootoffnum);
+					HeapTupleHeaderAdvanceConflictHorizon(htup,
+														  &prstate->latest_xid_removed);
+					ndeleted++;
+				}
+				else
+				{
+					Assert(!prstate->marked[rootoffnum]);
+
+					/*
+					 * MFIXME: not sure if this is right -- we may be
+					 * counting too many tuples here.
+					 */
+
+					/*
+					 * Ensure that this tuple is counted. If it later turns
+					 * out to be the target of a redirect, it would be
+					 * counted again there, but we avoid double counting by
+					 * first checking whether it has already been counted.
+					 */
+					heap_prune_record_live_or_recently_dead(dp, prstate, rootoffnum, presult);
+				}
 			}
 
 			/* Nothing more to do */
@@ -967,13 +874,13 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 		if (ItemIdIsDead(lp))
 		{
 			/*
-			 * If the caller set mark_unused_now true, we can set dead line
-			 * pointers LP_UNUSED now. We don't increment ndeleted here since
-			 * the LP was already marked dead. If it will not be marked
+			 * If the caller set PRUNE_DO_MARK_UNUSED_NOW, we can set dead
+			 * line pointers LP_UNUSED now. We don't increment ndeleted here
+			 * since the LP was already marked dead. If it will not be marked
 			 * LP_UNUSED, it will remain LP_DEAD, making the page not
 			 * all_visible.
 			 */
-			if (unlikely(prstate->mark_unused_now))
+			if (unlikely(prstate->actions & PRUNE_DO_MARK_UNUSED_NOW))
 				heap_prune_record_unused(prstate, offnum);
 			else
 			{
@@ -1118,7 +1025,7 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 		if (i >= nchain)
 			heap_prune_record_dead_or_unused(prstate, rootoffnum, presult);
 		else
-			heap_prune_record_redirect(prstate, rootoffnum, chainitems[i], presult);
+			heap_prune_record_redirect(dp, prstate, rootoffnum, chainitems[i], presult);
 	}
 	else if (nchain < 2 && ItemIdIsRedirected(rootlp))
 	{
@@ -1132,6 +1039,14 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 		heap_prune_record_dead_or_unused(prstate, rootoffnum, presult);
 	}
 
+	/*
+	 * If the root item was not marked for pruning, consider whether the
+	 * tuple should be counted as live or recently dead. Note that targets
+	 * of redirects will already have been counted.
+	 */
+	if (ItemIdIsNormal(rootlp) && !prstate->marked[rootoffnum])
+		heap_prune_record_live_or_recently_dead(dp, prstate, rootoffnum, presult);
+
 	return ndeleted;
 }
 
@@ -1151,13 +1066,15 @@ heap_prune_record_prunable(PruneState *prstate, TransactionId xid)
 
 /* Record line pointer to be redirected */
 static void
-heap_prune_record_redirect(PruneState *prstate,
+heap_prune_record_redirect(Page page, PruneState *prstate,
 						   OffsetNumber offnum, OffsetNumber rdoffnum,
 						   PruneFreezeResult *presult)
 {
 	Assert(prstate->nredirected < MaxHeapTuplesPerPage);
 	prstate->redirected[prstate->nredirected * 2] = offnum;
 	prstate->redirected[prstate->nredirected * 2 + 1] = rdoffnum;
+	heap_prune_record_live_or_recently_dead(page, prstate, rdoffnum, presult);
+
 	prstate->nredirected++;
 	Assert(!prstate->marked[offnum]);
 	prstate->marked[offnum] = true;
@@ -1189,22 +1106,22 @@ heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
 }
 
 /*
- * Depending on whether or not the caller set mark_unused_now to true, record that a
- * line pointer should be marked LP_DEAD or LP_UNUSED. There are other cases in
- * which we will mark line pointers LP_UNUSED, but we will not mark line
- * pointers LP_DEAD if mark_unused_now is true.
+ * Depending on whether or not the caller set PRUNE_DO_MARK_UNUSED_NOW, record
+ * that a line pointer should be marked LP_DEAD or LP_UNUSED. There are other
+ * cases in which we will mark line pointers LP_UNUSED, but we will not mark
+ * line pointers LP_DEAD if PRUNE_DO_MARK_UNUSED_NOW is set.
  */
 static void
 heap_prune_record_dead_or_unused(PruneState *prstate, OffsetNumber offnum,
 								 PruneFreezeResult *presult)
 {
 	/*
-	 * If the caller set mark_unused_now to true, we can remove dead tuples
+	 * If the caller set PRUNE_DO_MARK_UNUSED_NOW, we can remove dead tuples
 	 * during pruning instead of marking their line pointers dead. Set this
 	 * tuple's line pointer LP_UNUSED. We hint that this option is less
 	 * likely.
 	 */
-	if (unlikely(prstate->mark_unused_now))
+	if (unlikely(prstate->actions & PRUNE_DO_MARK_UNUSED_NOW))
 		heap_prune_record_unused(prstate, offnum);
 	else
 		heap_prune_record_dead(prstate, offnum, presult);
@@ -1221,6 +1138,187 @@ heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum)
 	prstate->marked[offnum] = true;
 }
 
+static void
+heap_prune_record_live_or_recently_dead(Page page, PruneState *prstate, OffsetNumber offnum,
+										PruneFreezeResult *presult)
+{
+	HTSV_Result status;
+	HeapTupleHeader htup;
+	bool		totally_frozen;
+
+	/* This can happen for items that are the target of a redirect. */
+	if (prstate->counted[offnum])
+		return;
+
+	prstate->counted[offnum] = true;
+
+	/*
+	 * If the caller requested none of the optional pruning actions, there
+	 * is nothing more to do.
+	 */
+	if (prstate->actions == 0)
+		return;
+
+	status = htsv_get_valid_status(prstate->htsv[offnum]);
+
+	Assert(status != HEAPTUPLE_DEAD);
+
+	/*
+	 * Deliberately don't set hastup for LP_DEAD items.  We make the soft
+	 * assumption that any LP_DEAD items encountered here will become
+	 * LP_UNUSED later on, before count_nondeletable_pages is reached.  If we
+	 * don't make this assumption then rel truncation will only happen every
+	 * other VACUUM, at most.  Besides, VACUUM must treat
+	 * hastup/nonempty_pages as provisional no matter how LP_DEAD items are
+	 * handled (handled here, or handled later on).
+	 */
+	presult->hastup = true;
+
+	/*
+	 * The criteria for counting a tuple as live in this block need to match
+	 * what analyze.c's acquire_sample_rows() does, otherwise VACUUM and
+	 * ANALYZE may produce wildly different reltuples values, e.g. when there
+	 * are many recently-dead tuples.
+	 *
+	 * The logic here is a bit simpler than acquire_sample_rows(), as VACUUM
+	 * can't run inside a transaction block, which makes some cases impossible
+	 * (e.g. in-progress insert from the same transaction).
+	 *
+	 * We treat LP_DEAD items (which are the closest thing to DEAD tuples that
+	 * might be seen here) differently, too: we assume that they'll become
+	 * LP_UNUSED before VACUUM finishes.  This difference is only superficial.
+	 * VACUUM effectively agrees with ANALYZE about DEAD items, in the end.
+	 * VACUUM won't remember LP_DEAD items, but only because they're not
+	 * supposed to be left behind when it is done. (Cases where we bypass
+	 * index vacuuming will violate this optimistic assumption, but the
+	 * overall impact of that should be negligible.)
+	 *
+	 * HEAPTUPLE_LIVE tuples are naturally counted as live. This is also what
+	 * acquire_sample_rows() does.
+	 *
+	 * HEAPTUPLE_DELETE_IN_PROGRESS tuples are expected during concurrent
+	 * vacuum. We expect the deleting transaction to update the counters at
+	 * commit after we report our results, so count these tuples as live to
+	 * ensure the math works out. The assumption that the transaction will
+	 * commit and update the counters after we report is a bit shaky; but it
+	 * is what acquire_sample_rows() does, so we do the same to be consistent.
+	 */
+	htup = (HeapTupleHeader) PageGetItem(page, PageGetItemId(page, offnum));
+
+	switch (status)
+	{
+		case HEAPTUPLE_LIVE:
+
+			/*
+			 * Count it as live.  Not only is this natural, but it's also what
+			 * acquire_sample_rows() does.
+			 */
+			presult->live_tuples++;
+
+			/*
+			 * Is the tuple definitely visible to all transactions?
+			 *
+			 * NB: Like with per-tuple hint bits, we can't set the
+			 * PD_ALL_VISIBLE flag if the inserter committed asynchronously.
+			 * See SetHintBits for more info. Check that the tuple is hinted
+			 * xmin-committed because of that.
+			 */
+			if (prstate->all_visible_except_removable)
+			{
+				TransactionId xmin;
+
+				if (!HeapTupleHeaderXminCommitted(htup))
+				{
+					prstate->all_visible_except_removable = false;
+					presult->all_visible = false;
+					break;
+				}
+
+				/*
+				 * The inserter definitely committed. But is it old enough
+				 * that everyone sees it as committed? A FrozenTransactionId
+				 * is seen as committed to everyone. Otherwise, we check if
+				 * there is a snapshot that considers this xid to still be
+				 * running, and if so, we don't consider the page all-visible.
+				 */
+				xmin = HeapTupleHeaderGetXmin(htup);
+
+				/* For now always use pagefrz->cutoffs */
+				Assert(prstate->pagefrz.cutoffs);
+				if (!TransactionIdPrecedes(xmin, prstate->pagefrz.cutoffs->OldestXmin))
+				{
+					prstate->all_visible_except_removable = false;
+					presult->all_visible = false;
+					break;
+				}
+
+				/* Track newest xmin on page. */
+				if (TransactionIdFollows(xmin, prstate->visibility_cutoff_xid) &&
+					TransactionIdIsNormal(xmin))
+					prstate->visibility_cutoff_xid = xmin;
+			}
+			break;
+		case HEAPTUPLE_RECENTLY_DEAD:
+
+			/*
+			 * If tuple is recently dead then we must not remove it from the
+			 * relation.  (We only remove items that are LP_DEAD from
+			 * pruning.)
+			 */
+			presult->recently_dead_tuples++;
+			prstate->all_visible_except_removable = false;
+			presult->all_visible = false;
+			break;
+		case HEAPTUPLE_INSERT_IN_PROGRESS:
+
+			/*
+			 * We do not count these rows as live, because we expect the
+			 * inserting transaction to update the counters at commit, and we
+			 * assume that will happen only after we report our results.  This
+			 * assumption is a bit shaky, but it is what acquire_sample_rows()
+			 * does, so be consistent.
+			 */
+			prstate->all_visible_except_removable = false;
+			presult->all_visible = false;
+			break;
+		case HEAPTUPLE_DELETE_IN_PROGRESS:
+
+			/*
+			 * This is an expected case during concurrent vacuum. Count such rows
+			 * as live.  As above, we assume the deleting transaction will
+			 * commit and update the counters after we report.
+			 */
+			presult->live_tuples++;
+			prstate->all_visible_except_removable = false;
+			presult->all_visible = false;
+			break;
+		default:
+			elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+			break;
+	}
+
+	/* Consider freezing any normal tuples which will not be removed */
+	if (prstate->actions & PRUNE_DO_TRY_FREEZE)
+	{
+		/* Tuple with storage -- consider need to freeze */
+		if ((heap_prepare_freeze_tuple(htup, &prstate->pagefrz,
+									   &prstate->pagefrz.frozen[presult->nfrozen],
+									   &totally_frozen)))
+		{
+			/* Save prepared freeze plan for later */
+			prstate->pagefrz.frozen[presult->nfrozen++].offset = offnum;
+		}
+
+		/*
+		 * If any tuple isn't either totally frozen already or eligible to
+		 * become totally frozen (according to its freeze plan), then the page
+		 * definitely cannot be set all-frozen in the visibility map later on
+		 */
+		if (!totally_frozen)
+			presult->set_all_frozen = false;
+	}
+
+}
 
 /*
  * Perform the actual page changes needed by heap_page_prune.
@@ -1354,12 +1452,12 @@ heap_page_prune_execute(Buffer buffer, bool lp_truncate_only,
 		else
 		{
 			/*
-			 * When heap_page_prune() was called, mark_unused_now may have
-			 * been passed as true, which allows would-be LP_DEAD items to be
-			 * made LP_UNUSED instead.  This is only possible if the relation
-			 * has no indexes.  If there are any dead items, then
-			 * mark_unused_now was not true and every item being marked
-			 * LP_UNUSED must refer to a heap-only tuple.
+			 * When heap_page_prune() was called, PRUNE_DO_MARK_UNUSED_NOW may
+			 * have been set, which allows would-be LP_DEAD items to be made
+			 * LP_UNUSED instead.  This is only possible if the relation has
+			 * no indexes.  If there are any dead items, then
+			 * PRUNE_DO_MARK_UNUSED_NOW was not set and every item being
+			 * marked LP_UNUSED must refer to a heap-only tuple.
 			 */
 			if (ndead > 0)
 			{
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 0fb5a7dd24d..04e86347a0b 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1397,18 +1397,10 @@ lazy_scan_prune(LVRelState *vacrel,
 {
 	Relation	rel = vacrel->rel;
 	PruneFreezeResult presult;
-	HeapPageFreeze pagefrz;
+	uint8		actions = 0;
 
 	Assert(BufferGetBlockNumber(buf) == blkno);
 
-	/* Initialize pagefrz */
-	pagefrz.freeze_required = false;
-	pagefrz.FreezePageRelfrozenXid = vacrel->NewRelfrozenXid;
-	pagefrz.FreezePageRelminMxid = vacrel->NewRelminMxid;
-	pagefrz.NoFreezePageRelfrozenXid = vacrel->NewRelfrozenXid;
-	pagefrz.NoFreezePageRelminMxid = vacrel->NewRelminMxid;
-	pagefrz.cutoffs = &vacrel->cutoffs;
-
 	/*
 	 * Prune all HOT-update chains and potentially freeze tuples on this page.
 	 *
@@ -1418,22 +1410,26 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * of as the number of tuples that were deleted from indexes.
 	 *
 	 * If the relation has no indexes, we can immediately mark would-be dead
-	 * items LP_UNUSED, so mark_unused_now should be true if no indexes and
-	 * false otherwise.
+	 * items LP_UNUSED, so PRUNE_DO_MARK_UNUSED_NOW should be set if no
+	 * indexes and unset otherwise.
 	 *
 	 * We will update the VM after collecting LP_DEAD items and freezing
 	 * tuples. Pruning will have determined whether or not the page is
 	 * all-visible.
 	 */
-	heap_page_prune_and_freeze(rel, buf, vacrel->vistest, vacrel->nindexes == 0,
-							   &pagefrz, &presult, PRUNE_VACUUM_SCAN, &vacrel->offnum);
+	actions |= PRUNE_DO_TRY_FREEZE;
 
-	vacrel->offnum = InvalidOffsetNumber;
+	if (vacrel->nindexes == 0)
+		actions |= PRUNE_DO_MARK_UNUSED_NOW;
 
-	Assert(MultiXactIdIsValid(presult.new_relminmxid));
-	vacrel->NewRelfrozenXid = presult.new_relfrozenxid;
-	Assert(TransactionIdIsValid(presult.new_relfrozenxid));
-	vacrel->NewRelminMxid = presult.new_relminmxid;
+	heap_page_prune_and_freeze(rel, buf, actions, vacrel->vistest,
+							   &vacrel->cutoffs, &presult, PRUNE_VACUUM_SCAN, &vacrel->offnum,
+							   &vacrel->NewRelfrozenXid, &vacrel->NewRelminMxid);
+
+	Assert(MultiXactIdIsValid(vacrel->NewRelminMxid));
+	Assert(TransactionIdIsValid(vacrel->NewRelfrozenXid));
+
+	vacrel->offnum = InvalidOffsetNumber;
 
 	if (presult.nfrozen > 0)
 	{
@@ -1466,7 +1462,7 @@ lazy_scan_prune(LVRelState *vacrel,
 									  &debug_cutoff, &debug_all_frozen))
 			Assert(false);
 
-		Assert(presult.all_frozen == debug_all_frozen);
+		Assert(presult.set_all_frozen == debug_all_frozen);
 
 		Assert(!TransactionIdIsValid(debug_cutoff) ||
 			   debug_cutoff == presult.vm_conflict_horizon);
@@ -1521,7 +1517,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	{
 		uint8		flags = VISIBILITYMAP_ALL_VISIBLE;
 
-		if (presult.all_frozen)
+		if (presult.set_all_frozen)
 		{
 			Assert(!TransactionIdIsValid(presult.vm_conflict_horizon));
 			flags |= VISIBILITYMAP_ALL_FROZEN;
@@ -1592,7 +1588,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * true, so we must check both all_visible and all_frozen.
 	 */
 	else if (all_visible_according_to_vm && presult.all_visible &&
-			 presult.all_frozen && !VM_ALL_FROZEN(vacrel->rel, blkno, &vmbuffer))
+			 presult.set_all_frozen && !VM_ALL_FROZEN(vacrel->rel, blkno, &vmbuffer))
 	{
 		/*
 		 * Avoid relying on all_visible_according_to_vm as a proxy for the
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 2740eaac13e..747a9ea0052 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -191,8 +191,35 @@ typedef struct HeapPageFreeze
 	MultiXactId NoFreezePageRelminMxid;
 
 	struct VacuumCutoffs *cutoffs;
+
+	/*
+	 * One entry for every tuple that we may freeze.
+	 */
+	HeapTupleFreeze frozen[MaxHeapTuplesPerPage];
 } HeapPageFreeze;
 
+/*
+ * Actions that can be taken during pruning and freezing. By default, we will
+ * at least attempt regular pruning.
+ */
+
+/*
+ * PRUNE_DO_MARK_UNUSED_NOW indicates that dead items can be set LP_UNUSED
+ * during pruning.
+ */
+#define		PRUNE_DO_MARK_UNUSED_NOW (1 << 1)
+
+/*
+ * PRUNE_DO_TRY_FREEZE: freeze tuples if advantageous or required, and try to
+ * advance relfrozenxid and relminmxid. To attempt freezing, we need to
+ * determine whether the page is all-frozen. So, if this action is set, we
+ * also inform the caller whether the page is all-visible and/or all-frozen
+ * and calculate a snapshot conflict horizon for updating the visibility map.
+ * While doing this, we also count live and recently dead tuples.
+ */
+#define		PRUNE_DO_TRY_FREEZE (1 << 2)
+
+
 /*
  * Per-page state returned from pruning
  */
@@ -203,14 +230,17 @@ typedef struct PruneFreezeResult
 
 	/*
 	 * The rest of the fields in PruneFreezeResult are only guaranteed to be
-	 * initialized if heap_page_prune_and_freeze() is passed a PruneReason
-	 * other than PRUNE_ON_ACCESS.
+	 * initialized if heap_page_prune_and_freeze() is passed
+	 * PRUNE_DO_TRY_FREEZE.
 	 */
-	int			live_tuples;
-	int			recently_dead_tuples;
-
 	/* Number of tuples we froze */
 	int			nfrozen;
+	/* Whether or not the page should be set all-frozen in the VM */
+	bool		set_all_frozen;
+
+	/* Number of live and recently dead tuples */
+	int			live_tuples;
+	int			recently_dead_tuples;
 
 	/*
 	 * Whether or not the page is truly all-visible after pruning. If there
@@ -219,8 +249,6 @@ typedef struct PruneFreezeResult
 	 */
 	bool		all_visible;
 
-	/* Whether or not the page can be set all-frozen in the VM */
-	bool		all_frozen;
 
 	/* Whether or not the page makes rel truncation unsafe */
 	bool		hastup;
@@ -232,15 +260,6 @@ typedef struct PruneFreezeResult
 	 */
 	TransactionId vm_conflict_horizon;
 
-	/*
-	 * One entry for every tuple that we may freeze.
-	 */
-	HeapTupleFreeze frozen[MaxHeapTuplesPerPage];
-	/* New value of relfrozenxid found by heap_page_prune_and_freeze() */
-	TransactionId new_relfrozenxid;
-
-	/* New value of relminmxid found by heap_page_prune_and_freeze() */
-	MultiXactId new_relminmxid;
 	int			lpdead_items;	/* includes existing LP_DEAD items */
 	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
 } PruneFreezeResult;
@@ -352,12 +371,14 @@ extern TransactionId heap_index_delete_tuples(Relation rel,
 struct GlobalVisState;
 extern void heap_page_prune_opt(Relation relation, Buffer buffer);
 extern void heap_page_prune_and_freeze(Relation relation, Buffer buffer,
+									   uint8 actions,
 									   struct GlobalVisState *vistest,
-									   bool mark_unused_now,
-									   HeapPageFreeze *pagefrz,
+									   struct VacuumCutoffs *cutoffs,
 									   PruneFreezeResult *presult,
 									   PruneReason reason,
-									   OffsetNumber *off_loc);
+									   OffsetNumber *off_loc,
+									   TransactionId *new_relfrozen_xid,
+									   MultiXactId *new_relmin_mxid);
 extern void heap_page_prune_execute(Buffer buffer, bool lp_truncate_only,
 									OffsetNumber *redirected, int nredirected,
 									OffsetNumber *nowdead, int ndead,
-- 
2.40.1

