From 957cebd15f7f2c6d42c9b5dcdf28fd75f1001a75 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Sat, 30 Mar 2024 01:27:08 -0400
Subject: [PATCH v10 09/10] Save dead tuple offsets during heap_page_prune

After heap_page_prune() returned, lazy_scan_prune() looped through all
of the offsets of LP_DEAD items which it later added to
LVRelState->dead_items. Instead take care of this when marking a line
pointer or when an existing non-removable LP_DEAD item is encountered in
heap_prune_chain().

Because deadoffsets are expected to be in order in
LVRelState->dead_items, sort the deadoffsets before saving them there.
---
 src/backend/access/heap/pruneheap.c  | 17 +++++++++++++
 src/backend/access/heap/vacuumlazy.c | 38 +++++++++++++++++++---------
 src/include/access/heapam.h          |  7 +++++
 3 files changed, 50 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 73fcc0081c..7f55e9c839 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -70,6 +70,13 @@ typedef struct
 
 	/* Whether or not the page makes rel truncation unsafe */
 	bool		hastup;
+
+	/*
+	 * LP_DEAD items on the page after pruning.  Includes existing LP_DEAD
+	 * items
+	 */
+	int			lpdead_items;	/* includes existing LP_DEAD items */
+	OffsetNumber *deadoffsets;	/* points directly to PruneResult->deadoffsets */
 } PruneState;
 
 /* Local functions */
@@ -277,6 +284,8 @@ heap_page_prune(Relation relation, Buffer buffer,
 	prstate.nchain_members = 0;
 	prstate.nchain_candidates = 0;
 	prstate.hastup = false;
+	prstate.lpdead_items = 0;
+	prstate.deadoffsets = presult->deadoffsets;
 
 	/*
 	 * If we will prepare to freeze tuples, consider that it might be possible
@@ -570,6 +579,8 @@ heap_page_prune(Relation relation, Buffer buffer,
 	/* Copy data back to 'presult' */
 	presult->nnewlpdead = prstate.ndead;
 	presult->ndeleted = prstate.ndeleted;
+	presult->lpdead_items = prstate.lpdead_items;
+	/* the presult->deadoffsets array was already filled in */
 }
 
 
@@ -881,6 +892,9 @@ heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
 	prstate->nowdead[prstate->ndead] = offnum;
 	prstate->ndead++;
 
+	/* Record the dead offset for vacuum */
+	prstate->deadoffsets[prstate->lpdead_items++] = offnum;
+
 	/*
 	 * If the root entry had been a normal tuple, we are deleting it, so count
 	 * it in the result.  But changing a redirect (even to DEAD state) doesn't
@@ -1026,6 +1040,9 @@ heap_prune_record_unchanged_lp_dead(PruneState *prstate, OffsetNumber offnum)
 	 * hastup/nonempty_pages as provisional no matter how LP_DEAD items are
 	 * handled (handled here, or handled later on).
 	 */
+
+	/* Record the dead offset for vacuum */
+	prstate->deadoffsets[prstate->lpdead_items++] = offnum;
 }
 
 
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 212d76045e..7f1e4db55c 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1373,6 +1373,15 @@ lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf, BlockNumber blkno,
 	return false;
 }
 
+static int
+OffsetNumber_cmp(const void *a, const void *b)
+{
+	OffsetNumber na = *(const OffsetNumber *) a,
+				nb = *(const OffsetNumber *) b;
+
+	return na < nb ? -1 : na > nb;
+}
+
 /*
  *	lazy_scan_prune() -- lazy_scan_heap() pruning and freezing.
  *
@@ -1416,14 +1425,12 @@ lazy_scan_prune(LVRelState *vacrel,
 				maxoff;
 	ItemId		itemid;
 	PruneResult presult;
-	int			lpdead_items,
-				live_tuples,
+	int			live_tuples,
 				recently_dead_tuples;
 	bool		all_visible;
 	TransactionId visibility_cutoff_xid;
 	uint8		actions = 0;
 	int64		fpi_before = pgWalUsage.wal_fpi;
-	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
 
 	Assert(BufferGetBlockNumber(buf) == blkno);
 
@@ -1441,7 +1448,6 @@ lazy_scan_prune(LVRelState *vacrel,
 	presult.pagefrz.NoFreezePageRelfrozenXid = vacrel->NewRelfrozenXid;
 	presult.pagefrz.NoFreezePageRelminMxid = vacrel->NewRelminMxid;
 	presult.pagefrz.cutoffs = &vacrel->cutoffs;
-	lpdead_items = 0;
 	live_tuples = 0;
 	recently_dead_tuples = 0;
 
@@ -1509,7 +1515,6 @@ lazy_scan_prune(LVRelState *vacrel,
 			 * (This is another case where it's useful to anticipate that any
 			 * LP_DEAD items will become LP_UNUSED during the ongoing VACUUM.)
 			 */
-			deadoffsets[lpdead_items++] = offnum;
 			continue;
 		}
 
@@ -1713,7 +1718,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	 */
 #ifdef USE_ASSERT_CHECKING
 	/* Note that all_frozen value does not matter when !all_visible */
-	if (all_visible && lpdead_items == 0)
+	if (all_visible && presult.lpdead_items == 0)
 	{
 		TransactionId debug_cutoff;
 		bool		debug_all_frozen;
@@ -1730,7 +1735,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	/*
 	 * Now save details of the LP_DEAD items from the page in vacrel
 	 */
-	if (lpdead_items > 0)
+	if (presult.lpdead_items > 0)
 	{
 		VacDeadItems *dead_items = vacrel->dead_items;
 		ItemPointerData tmp;
@@ -1739,9 +1744,18 @@ lazy_scan_prune(LVRelState *vacrel,
 
 		ItemPointerSetBlockNumber(&tmp, blkno);
 
-		for (int i = 0; i < lpdead_items; i++)
+		/*
+		 * dead_items are expected to be in order. However, deadoffsets are
+		 * collected incrementally in heap_page_prune_and_freeze() as each
+		 * dead line pointer is recorded, with an indeterminate order. As
+		 * such, sort the deadoffsets before saving them in LVRelState.
+		 */
+		qsort(presult.deadoffsets, presult.lpdead_items, sizeof(OffsetNumber),
+			  OffsetNumber_cmp);
+
+		for (int i = 0; i < presult.lpdead_items; i++)
 		{
-			ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
+			ItemPointerSetOffsetNumber(&tmp, presult.deadoffsets[i]);
 			dead_items->items[dead_items->num_items++] = tmp;
 		}
 
@@ -1766,7 +1780,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	/* Finally, add page-local counts to whole-VACUUM counts */
 	vacrel->tuples_deleted += presult.ndeleted;
 	vacrel->tuples_frozen += presult.nfrozen;
-	vacrel->lpdead_items += lpdead_items;
+	vacrel->lpdead_items += presult.lpdead_items;
 	vacrel->live_tuples += live_tuples;
 	vacrel->recently_dead_tuples += recently_dead_tuples;
 
@@ -1775,7 +1789,7 @@ lazy_scan_prune(LVRelState *vacrel,
 		vacrel->nonempty_pages = blkno + 1;
 
 	/* Did we find LP_DEAD items? */
-	*has_lpdead_items = (lpdead_items > 0);
+	*has_lpdead_items = (presult.lpdead_items > 0);
 
 	Assert(!all_visible || !(*has_lpdead_items));
 
@@ -1843,7 +1857,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * There should never be LP_DEAD items on a page with PD_ALL_VISIBLE set,
 	 * however.
 	 */
-	else if (lpdead_items > 0 && PageIsAllVisible(page))
+	else if (presult.lpdead_items > 0 && PageIsAllVisible(page))
 	{
 		elog(WARNING, "page containing LP_DEAD items is marked as all-visible in relation \"%s\" page %u",
 			 vacrel->relname, blkno);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 2311d01998..e346312471 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -252,6 +252,13 @@ typedef struct PruneResult
 	bool		all_frozen;
 	int			nfrozen;
 	HeapTupleFreeze frozen[MaxHeapTuplesPerPage];
+
+	/*
+	 * LP_DEAD items on the page after pruning. Includes existing LP_DEAD
+	 * items
+	 */
+	int			lpdead_items;
+	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
 } PruneResult;
 
 /* 'reason' codes for heap_page_prune() */
-- 
2.40.1

