From 9a310d238e191445c514f036966dd5af5a7a15c2 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Mon, 22 Jan 2024 16:55:28 -0500
Subject: [PATCH v2 15/17] Save dead tuple offsets during heap_page_prune

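After heap_page_prune_and_freeze() returned, lazy_scan_prune() looped
through every line pointer on the page to collect the offsets of LP_DEAD
items, which it later added to LVRelState->dead_items. Instead, save
these offsets in PruneFreezeResult->deadoffsets while pruning: either
when a line pointer is recorded as dead in heap_prune_record_dead(), or
when an existing non-removable LP_DEAD item is encountered in
heap_prune_chain(). lazy_scan_prune() then reads the offsets and their
count (PruneFreezeResult->lpdead_items) directly from the prune result.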
---
 src/backend/access/heap/pruneheap.c  |  7 ++++
 src/backend/access/heap/vacuumlazy.c | 60 ++++++----------------------
 src/include/access/heapam.h          |  2 +
 3 files changed, 22 insertions(+), 47 deletions(-)

diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 46b5173a401..c5046da6d1e 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -298,6 +298,7 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 
 	presult->live_tuples = 0;
 	presult->recently_dead_tuples = 0;
+	presult->lpdead_items = 0;
 
 	/*
 	 * Keep track of whether or not the page is all_visible in case the caller
@@ -987,7 +988,10 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 			if (unlikely(prstate->mark_unused_now))
 				heap_prune_record_unused(prstate, offnum);
 			else
+			{
 				presult->all_visible = false;
+				presult->deadoffsets[presult->lpdead_items++] = offnum;
+			}
 
 			break;
 		}
@@ -1239,6 +1243,9 @@ heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
 	 * all_visible.
 	 */
 	presult->all_visible = false;
+
+	/* Record the dead offset for vacuum */
+	presult->deadoffsets[presult->lpdead_items++] = offnum;
 }
 
 /*
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 634f4da9a17..4b45e8be1ad 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1439,23 +1439,11 @@ lazy_scan_prune(LVRelState *vacrel,
 				bool *has_lpdead_items)
 {
 	Relation	rel = vacrel->rel;
-	OffsetNumber offnum,
-				maxoff;
-	ItemId		itemid;
-	int			lpdead_items = 0;
 	PruneFreezeResult presult;
 	HeapPageFreeze pagefrz;
-	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
 
 	Assert(BufferGetBlockNumber(buf) == blkno);
 
-	/*
-	 * maxoff might be reduced following line pointer array truncation in
-	 * heap_page_prune_and_freeze().  That's safe for us to ignore, since the
-	 * reclaimed space will continue to look like LP_UNUSED items below.
-	 */
-	maxoff = PageGetMaxOffsetNumber(page);
-
 	/* Initialize pagefrz */
 	pagefrz.freeze_required = false;
 	pagefrz.FreezePageRelfrozenXid = vacrel->NewRelfrozenXid;
@@ -1468,9 +1456,9 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * Prune all HOT-update chains and potentially freeze tuples on this page.
 	 *
 	 * We count the number of tuples removed from the page by the pruning step
-	 * in presult.ndeleted. It should not be confused with lpdead_items;
-	 * lpdead_items's final value can be thought of as the number of tuples
-	 * that were deleted from indexes.
+	 * in presult.ndeleted. It should not be confused with
+	 * presult.lpdead_items; presult.lpdead_items's final value can be thought
+	 * of as the number of tuples that were deleted from indexes.
 	 *
 	 * If the relation has no indexes, we can immediately mark would-be dead
 	 * items LP_UNUSED, so mark_unused_now should be true if no indexes and
@@ -1480,32 +1468,10 @@ lazy_scan_prune(LVRelState *vacrel,
 							   &pagefrz, &presult, &vacrel->offnum);
 
 	/*
-	 * Now scan the page to collect LP_DEAD items and check for tuples
-	 * requiring freezing among remaining tuples with storage. We will update
-	 * the VM after collecting LP_DEAD items and freezing tuples. Pruning will
-	 * have determined whether or not the page is all_visible and able to
-	 * become all_frozen.
+	 * We will update the VM after collecting LP_DEAD items and freezing
+	 * tuples. Pruning will have determined whether or not the page is
+	 * all_visible.
 	 */
-	for (offnum = FirstOffsetNumber;
-		 offnum <= maxoff;
-		 offnum = OffsetNumberNext(offnum))
-	{
-		/*
-		 * Set the offset number so that we can display it along with any
-		 * error that occurred while processing this tuple.
-		 */
-		vacrel->offnum = offnum;
-		itemid = PageGetItemId(page, offnum);
-
-
-		if (ItemIdIsDead(itemid))
-		{
-			deadoffsets[lpdead_items++] = offnum;
-			continue;
-		}
-
-	}
-
 	vacrel->offnum = InvalidOffsetNumber;
 
 	Assert(MultiXactIdIsValid(presult.new_relminmxid));
@@ -1541,7 +1507,7 @@ lazy_scan_prune(LVRelState *vacrel,
 		TransactionId debug_cutoff;
 		bool		debug_all_frozen;
 
-		Assert(lpdead_items == 0);
+		Assert(presult.lpdead_items == 0);
 
 		if (!heap_page_is_all_visible(vacrel, buf,
 									  &debug_cutoff, &debug_all_frozen))
@@ -1557,7 +1523,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	/*
 	 * Now save details of the LP_DEAD items from the page in vacrel
 	 */
-	if (lpdead_items > 0)
+	if (presult.lpdead_items > 0)
 	{
 		VacDeadItems *dead_items = vacrel->dead_items;
 		ItemPointerData tmp;
@@ -1566,9 +1532,9 @@ lazy_scan_prune(LVRelState *vacrel,
 
 		ItemPointerSetBlockNumber(&tmp, blkno);
 
-		for (int i = 0; i < lpdead_items; i++)
+		for (int i = 0; i < presult.lpdead_items; i++)
 		{
-			ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
+			ItemPointerSetOffsetNumber(&tmp, presult.deadoffsets[i]);
 			dead_items->items[dead_items->num_items++] = tmp;
 		}
 
@@ -1580,7 +1546,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	/* Finally, add page-local counts to whole-VACUUM counts */
 	vacrel->tuples_deleted += presult.ndeleted;
 	vacrel->tuples_frozen += presult.nfrozen;
-	vacrel->lpdead_items += lpdead_items;
+	vacrel->lpdead_items += presult.lpdead_items;
 	vacrel->live_tuples += presult.live_tuples;
 	vacrel->recently_dead_tuples += presult.recently_dead_tuples;
 
@@ -1589,7 +1555,7 @@ lazy_scan_prune(LVRelState *vacrel,
 		vacrel->nonempty_pages = blkno + 1;
 
 	/* Did we find LP_DEAD items? */
-	*has_lpdead_items = (lpdead_items > 0);
+	*has_lpdead_items = (presult.lpdead_items > 0);
 
 	Assert(!presult.all_visible || !(*has_lpdead_items));
 
@@ -1657,7 +1623,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * There should never be LP_DEAD items on a page with PD_ALL_VISIBLE set,
 	 * however.
 	 */
-	else if (lpdead_items > 0 && PageIsAllVisible(page))
+	else if (presult.lpdead_items > 0 && PageIsAllVisible(page))
 	{
 		elog(WARNING, "page containing LP_DEAD items is marked as all-visible in relation \"%s\" page %u",
 			 vacrel->relname, blkno);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index f4bf60192f8..86524ae0c3d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -219,6 +219,8 @@ typedef struct PruneFreezeResult
 
 	/* New value of relminmxid found by heap_page_prune_and_freeze() */
 	MultiXactId new_relminmxid;
+	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
+	int			lpdead_items;	/* includes existing LP_DEAD items */
 } PruneFreezeResult;
 
 /* ----------------
-- 
2.40.1

