From 158545b5de5cccd27a3a023875891b9c4459bd89 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 26 Mar 2024 10:16:11 -0400
Subject: [PATCH v9 15/21] Save dead tuple offsets during heap_page_prune

After heap_page_prune_and_freeze() returned, lazy_scan_prune() looped
through all of the line pointers on the page to collect the offsets of
LP_DEAD items, which it later added to LVRelState->dead_items. Instead,
record each offset when a line pointer is marked LP_DEAD in
heap_prune_record_dead() or when an existing non-removable LP_DEAD item
is encountered in heap_prune_chain().
---
 src/backend/access/heap/pruneheap.c  |  7 ++++
 src/backend/access/heap/vacuumlazy.c | 60 +++++++---------------------
 src/include/access/heapam.h          |  2 +
 3 files changed, 23 insertions(+), 46 deletions(-)

diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index fde5f26bb5a..3529ea69520 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -297,6 +297,7 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 
 	presult->live_tuples = 0;
 	presult->recently_dead_tuples = 0;
+	presult->lpdead_items = 0;
 
 	/*
 	 * Caller will update the VM after pruning, collecting LP_DEAD items, and
@@ -971,7 +972,10 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 			if (unlikely(prstate->mark_unused_now))
 				heap_prune_record_unused(prstate, offnum);
 			else
+			{
 				presult->all_visible = false;
+				presult->deadoffsets[presult->lpdead_items++] = offnum;
+			}
 
 			break;
 		}
@@ -1175,6 +1179,9 @@ heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
 	 * all_visible.
 	 */
 	presult->all_visible = false;
+
+	/* Record the dead offset for vacuum */
+	presult->deadoffsets[presult->lpdead_items++] = offnum;
 }
 
 /*
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index c28e786a1e0..0fb5a7dd24d 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1396,23 +1396,11 @@ lazy_scan_prune(LVRelState *vacrel,
 				bool *has_lpdead_items)
 {
 	Relation	rel = vacrel->rel;
-	OffsetNumber offnum,
-				maxoff;
-	ItemId		itemid;
-	int			lpdead_items = 0;
 	PruneFreezeResult presult;
 	HeapPageFreeze pagefrz;
-	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
 
 	Assert(BufferGetBlockNumber(buf) == blkno);
 
-	/*
-	 * maxoff might be reduced following line pointer array truncation in
-	 * heap_page_prune_and_freeze().  That's safe for us to ignore, since the
-	 * reclaimed space will continue to look like LP_UNUSED items below.
-	 */
-	maxoff = PageGetMaxOffsetNumber(page);
-
 	/* Initialize pagefrz */
 	pagefrz.freeze_required = false;
 	pagefrz.FreezePageRelfrozenXid = vacrel->NewRelfrozenXid;
@@ -1425,41 +1413,21 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * Prune all HOT-update chains and potentially freeze tuples on this page.
 	 *
 	 * We count the number of tuples removed from the page by the pruning step
-	 * in presult.ndeleted. It should not be confused with lpdead_items;
-	 * lpdead_items's final value can be thought of as the number of tuples
-	 * that were deleted from indexes.
+	 * in presult.ndeleted. It should not be confused with
+	 * presult.lpdead_items; presult.lpdead_items's final value can be thought
+	 * of as the number of tuples that were deleted from indexes.
 	 *
 	 * If the relation has no indexes, we can immediately mark would-be dead
 	 * items LP_UNUSED, so mark_unused_now should be true if no indexes and
 	 * false otherwise.
+	 *
+	 * We will update the VM after collecting LP_DEAD items and freezing
+	 * tuples. Pruning will have determined whether or not the page is
+	 * all-visible.
 	 */
 	heap_page_prune_and_freeze(rel, buf, vacrel->vistest, vacrel->nindexes == 0,
 							   &pagefrz, &presult, PRUNE_VACUUM_SCAN, &vacrel->offnum);
 
-	/*
-	 * Now scan the page to collect LP_DEAD items and update the variables set
-	 * just above.
-	 */
-	for (offnum = FirstOffsetNumber;
-		 offnum <= maxoff;
-		 offnum = OffsetNumberNext(offnum))
-	{
-		/*
-		 * Set the offset number so that we can display it along with any
-		 * error that occurred while processing this tuple.
-		 */
-		vacrel->offnum = offnum;
-		itemid = PageGetItemId(page, offnum);
-
-
-		if (ItemIdIsDead(itemid))
-		{
-			deadoffsets[lpdead_items++] = offnum;
-			continue;
-		}
-
-	}
-
 	vacrel->offnum = InvalidOffsetNumber;
 
 	Assert(MultiXactIdIsValid(presult.new_relminmxid));
@@ -1492,7 +1460,7 @@ lazy_scan_prune(LVRelState *vacrel,
 		TransactionId debug_cutoff;
 		bool		debug_all_frozen;
 
-		Assert(lpdead_items == 0);
+		Assert(presult.lpdead_items == 0);
 
 		if (!heap_page_is_all_visible(vacrel, buf,
 									  &debug_cutoff, &debug_all_frozen))
@@ -1508,7 +1476,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	/*
 	 * Now save details of the LP_DEAD items from the page in vacrel
 	 */
-	if (lpdead_items > 0)
+	if (presult.lpdead_items > 0)
 	{
 		VacDeadItems *dead_items = vacrel->dead_items;
 		ItemPointerData tmp;
@@ -1517,9 +1485,9 @@ lazy_scan_prune(LVRelState *vacrel,
 
 		ItemPointerSetBlockNumber(&tmp, blkno);
 
-		for (int i = 0; i < lpdead_items; i++)
+		for (int i = 0; i < presult.lpdead_items; i++)
 		{
-			ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
+			ItemPointerSetOffsetNumber(&tmp, presult.deadoffsets[i]);
 			dead_items->items[dead_items->num_items++] = tmp;
 		}
 
@@ -1531,7 +1499,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	/* Finally, add page-local counts to whole-VACUUM counts */
 	vacrel->tuples_deleted += presult.ndeleted;
 	vacrel->tuples_frozen += presult.nfrozen;
-	vacrel->lpdead_items += lpdead_items;
+	vacrel->lpdead_items += presult.lpdead_items;
 	vacrel->live_tuples += presult.live_tuples;
 	vacrel->recently_dead_tuples += presult.recently_dead_tuples;
 
@@ -1540,7 +1508,7 @@ lazy_scan_prune(LVRelState *vacrel,
 		vacrel->nonempty_pages = blkno + 1;
 
 	/* Did we find LP_DEAD items? */
-	*has_lpdead_items = (lpdead_items > 0);
+	*has_lpdead_items = (presult.lpdead_items > 0);
 
 	Assert(!presult.all_visible || !(*has_lpdead_items));
 
@@ -1608,7 +1576,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * There should never be LP_DEAD items on a page with PD_ALL_VISIBLE set,
 	 * however.
 	 */
-	else if (lpdead_items > 0 && PageIsAllVisible(page))
+	else if (presult.lpdead_items > 0 && PageIsAllVisible(page))
 	{
 		elog(WARNING, "page containing LP_DEAD items is marked as all-visible in relation \"%s\" page %u",
 			 vacrel->relname, blkno);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 79ec4049f12..68b4d5b859c 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -241,6 +241,8 @@ typedef struct PruneFreezeResult
 
 	/* New value of relminmxid found by heap_page_prune_and_freeze() */
 	MultiXactId new_relminmxid;
+	int			lpdead_items;	/* includes existing LP_DEAD items */
+	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
 } PruneFreezeResult;
 
 /* 'reason' codes for heap_page_prune_and_freeze() */
-- 
2.40.1

