From c3cf35fcb3110da791e9edc1b3325dc8d0080068 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 27 Feb 2024 14:35:36 -0500
Subject: [PATCH v7 7/7] Vacuum second pass uses Streaming Read interface

Now vacuum's second pass, which removes dead items referring to dead
tuples catalogued in the first pass, uses the streaming read API by
implementing a streaming read callback which returns the next block
containing previously catalogued dead items. A new struct,
VacReapBlkState, is introduced to provide the caller with the starting
and ending indexes of dead items to vacuum.
---
 src/backend/access/heap/vacuumlazy.c | 110 ++++++++++++++++++++-------
 src/tools/pgindent/typedefs.list     |   1 +
 2 files changed, 85 insertions(+), 26 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index d07a2a58b15..375b66a62c4 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -195,6 +195,12 @@ typedef struct LVRelState
 	BlockNumber missed_dead_pages;	/* # pages with missed dead tuples */
 	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
 
+	/*
+	 * The index of the next TID in dead_items to reap during the second
+	 * vacuum pass.
+	 */
+	int			idx_prefetch;
+
 	/* Statistics output by us, for table */
 	double		new_rel_tuples; /* new estimated total # of tuples */
 	double		new_live_tuples;	/* new estimated total # of live tuples */
@@ -243,6 +249,21 @@ typedef struct LVSavedErrInfo
 	VacErrPhase phase;
 } LVSavedErrInfo;
 
+/*
+ * State set up in streaming read callback during vacuum's second pass which
+ * removes dead items referring to dead tuples catalogued in the first pass
+ */
+typedef struct VacReapBlkState
+{
+	/*
+	 * The indexes of the TIDs of the first and last dead tuples in a single
+	 * block in the currently vacuumed relation. The callback will set these
+	 * up prior to adding this block to the stream.
+	 */
+	int			start_idx;
+	int			end_idx;
+} VacReapBlkState;
+
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
 static void heap_vac_scan_next_block(LVRelState *vacrel,
@@ -260,8 +281,9 @@ static bool lazy_scan_noprune(LVRelState *vacrel, Buffer buf,
 static void lazy_vacuum(LVRelState *vacrel);
 static bool lazy_vacuum_all_indexes(LVRelState *vacrel);
 static void lazy_vacuum_heap_rel(LVRelState *vacrel);
-static int	lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
-								  Buffer buffer, int index, Buffer vmbuffer);
+static void lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
+								  Buffer buffer, Buffer vmbuffer,
+								  VacReapBlkState *rbstate);
 static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
 static void lazy_cleanup_all_indexes(LVRelState *vacrel);
 static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
@@ -2426,6 +2448,37 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	return allindexes;
 }
 
+static BlockNumber
+vacuum_reap_lp_pgsr_next(PgStreamingRead *pgsr,
+						 void *pgsr_private,
+						 void *per_buffer_data)
+{
+	BlockNumber blkno;
+	LVRelState *vacrel = pgsr_private;
+	VacReapBlkState *rbstate = per_buffer_data;
+
+	VacDeadItems *dead_items = vacrel->dead_items;
+
+	if (vacrel->idx_prefetch == dead_items->num_items)
+		return InvalidBlockNumber;
+
+	blkno = ItemPointerGetBlockNumber(&dead_items->items[vacrel->idx_prefetch]);
+	rbstate->start_idx = vacrel->idx_prefetch;
+
+	for (; vacrel->idx_prefetch < dead_items->num_items; vacrel->idx_prefetch++)
+	{
+		BlockNumber curblkno =
+			ItemPointerGetBlockNumber(&dead_items->items[vacrel->idx_prefetch]);
+
+		if (blkno != curblkno)
+			break;				/* past end of tuples for this block */
+	}
+
+	rbstate->end_idx = vacrel->idx_prefetch;
+
+	return blkno;
+}
+
 /*
  *	lazy_vacuum_heap_rel() -- second pass over the heap for two pass strategy
  *
@@ -2447,7 +2500,9 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 static void
 lazy_vacuum_heap_rel(LVRelState *vacrel)
 {
-	int			index = 0;
+	Buffer		buf;
+	PgStreamingRead *pgsr;
+	VacReapBlkState *rbstate;
 	BlockNumber vacuumed_pages = 0;
 	Buffer		vmbuffer = InvalidBuffer;
 	LVSavedErrInfo saved_err_info;
@@ -2465,17 +2520,21 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 							 VACUUM_ERRCB_PHASE_VACUUM_HEAP,
 							 InvalidBlockNumber, InvalidOffsetNumber);
 
-	while (index < vacrel->dead_items->num_items)
+	pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_MAINTENANCE, vacrel,
+										  sizeof(VacReapBlkState), vacrel->bstrategy, BMR_REL(vacrel->rel),
+										  MAIN_FORKNUM, vacuum_reap_lp_pgsr_next);
+
+	while (BufferIsValid(buf =
+						 pg_streaming_read_buffer_get_next(pgsr,
+														   (void **) &rbstate)))
 	{
 		BlockNumber blkno;
-		Buffer		buf;
 		Page		page;
 		Size		freespace;
 
 		vacuum_delay_point();
 
-		blkno = ItemPointerGetBlockNumber(&vacrel->dead_items->items[index]);
-		vacrel->blkno = blkno;
+		vacrel->blkno = blkno = BufferGetBlockNumber(buf);
 
 		/*
 		 * Pin the visibility map page in case we need to mark the page
@@ -2485,10 +2544,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		visibilitymap_pin(vacrel->rel, blkno, &vmbuffer);
 
 		/* We need a non-cleanup exclusive lock to mark dead_items unused */
-		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-								 vacrel->bstrategy);
 		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-		index = lazy_vacuum_heap_page(vacrel, blkno, buf, index, vmbuffer);
+		lazy_vacuum_heap_page(vacrel, blkno, buf, vmbuffer, rbstate);
 
 		/* Now that we've vacuumed the page, record its available space */
 		page = BufferGetPage(buf);
@@ -2507,14 +2564,16 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 	 * We set all LP_DEAD items from the first heap pass to LP_UNUSED during
 	 * the second heap pass.  No more, no less.
 	 */
-	Assert(index > 0);
+	Assert(rbstate->end_idx > 0);
 	Assert(vacrel->num_index_scans > 1 ||
-		   (index == vacrel->lpdead_items &&
+		   (rbstate->end_idx == vacrel->lpdead_items &&
 			vacuumed_pages == vacrel->lpdead_item_pages));
 
+	pg_streaming_read_free(pgsr);
+
 	ereport(DEBUG2,
 			(errmsg("table \"%s\": removed %lld dead item identifiers in %u pages",
-					vacrel->relname, (long long) index, vacuumed_pages)));
+					vacrel->relname, (long long) rbstate->end_idx, vacuumed_pages)));
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
@@ -2528,13 +2587,12 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
  * cleanup lock is also acceptable).  vmbuffer must be valid and already have
  * a pin on blkno's visibility map page.
  *
- * index is an offset into the vacrel->dead_items array for the first listed
- * LP_DEAD item on the page.  The return value is the first index immediately
- * after all LP_DEAD items for the same page in the array.
+ * Given a block and dead items recorded during the first pass, set those items
+ * dead and truncate the line pointer array. Update the VM as appropriate.
  */
-static int
-lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
-					  int index, Buffer vmbuffer)
+static void
+lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
+					  Buffer buffer, Buffer vmbuffer, VacReapBlkState *rbstate)
 {
 	VacDeadItems *dead_items = vacrel->dead_items;
 	Page		page = BufferGetPage(buffer);
@@ -2555,16 +2613,17 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 
 	START_CRIT_SECTION();
 
-	for (; index < dead_items->num_items; index++)
+	for (int i = rbstate->start_idx; i < rbstate->end_idx; i++)
 	{
-		BlockNumber tblk;
 		OffsetNumber toff;
+		ItemPointer dead_item;
 		ItemId		itemid;
 
-		tblk = ItemPointerGetBlockNumber(&dead_items->items[index]);
-		if (tblk != blkno)
-			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&dead_items->items[index]);
+		dead_item = &dead_items->items[i];
+
+		Assert(ItemPointerGetBlockNumber(dead_item) == blkno);
+
+		toff = ItemPointerGetOffsetNumber(dead_item);
 		itemid = PageGetItemId(page, toff);
 
 		Assert(ItemIdIsDead(itemid) && !ItemIdHasStorage(itemid));
@@ -2634,7 +2693,6 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
-	return index;
 }
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5f637f07eeb..20b85a69f9d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2972,6 +2972,7 @@ VacOptValue
 VacuumParams
 VacuumRelation
 VacuumStmt
+VacReapBlkState
 ValidIOData
 ValidateIndexState
 ValuesScan
-- 
2.40.1

