From 84a5907b486d1f6c2ffe029a2e28fc557065739f Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 27 Feb 2024 14:35:36 -0500
Subject: [PATCH v5a 7/7] Vacuum second pass uses Streaming Read interface

Now vacuum's second heap pass, which removes the dead items referring to
dead tuples catalogued in the first pass, uses the streaming read API: a
streaming read callback returns the next block containing previously
catalogued dead items. A new struct, VacReapBlkState, passes the caller
each block's range of dead item indexes to vacuum (start inclusive, end
exclusive).

ci-os-only:
---
 src/backend/access/heap/vacuumlazy.c | 110 ++++++++++++++++++++-------
 src/tools/pgindent/typedefs.list     |   1 +
 2 files changed, 85 insertions(+), 26 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index fbbc87938e4..68c146984b1 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -201,6 +201,12 @@ typedef struct LVRelState
 	 */
 	BlockNumber blkno_prefetch;
 
+	/*
+	 * Index of the next dead_items entry for the second heap pass's read
+	 * stream callback to queue; earlier entries have already been queued.
+	 */
+	int			idx_prefetch;
+
 	/* Statistics output by us, for table */
 	double		new_rel_tuples; /* new estimated total # of tuples */
 	double		new_live_tuples;	/* new estimated total # of live tuples */
@@ -242,6 +248,21 @@ typedef struct LVSavedErrInfo
 	VacErrPhase phase;
 } LVSavedErrInfo;
 
+/*
+ * Per-buffer state filled in by the streaming read callback during vacuum's
+ * second heap pass, which reaps the dead items catalogued in the first pass.
+ */
+typedef struct VacReapBlkState
+{
+	/*
+	 * Half-open range [start_idx, end_idx) of the dead_items entries that
+	 * belong to this block: start_idx is the first, end_idx is one past the
+	 * last.  The callback sets both before adding the block to the stream.
+	 */
+	int			start_idx;
+	int			end_idx;
+} VacReapBlkState;
+
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
 static void heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
@@ -260,8 +281,9 @@ static bool lazy_scan_noprune(LVRelState *vacrel, Buffer buf,
 static void lazy_vacuum(LVRelState *vacrel);
 static bool lazy_vacuum_all_indexes(LVRelState *vacrel);
 static void lazy_vacuum_heap_rel(LVRelState *vacrel);
-static int	lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
-								  Buffer buffer, int index, Buffer vmbuffer);
+static void lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
+								  Buffer buffer, Buffer vmbuffer,
+								  VacReapBlkState *rbstate);
 static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
 static void lazy_cleanup_all_indexes(LVRelState *vacrel);
 static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
@@ -2401,6 +2423,53 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	return allindexes;
 }
 
+/*
+ * Streaming read callback for vacuum's second heap pass.
+ *
+ * Returns the block of the next unprocessed dead_items entry (the one at
+ * vacrel->idx_prefetch) and stores in the per-buffer VacReapBlkState the
+ * half-open range [start_idx, end_idx) of dead_items entries on that block,
+ * advancing idx_prefetch to end_idx.  Returns InvalidBlockNumber, which ends
+ * the stream, once every entry has been handed out.
+ *
+ * Entries for the same block must be adjacent in dead_items: only the
+ * contiguous run starting at idx_prefetch is collected.
+ */
+static BlockNumber
+vacuum_reap_lp_pgsr_next(PgStreamingRead *pgsr,
+						 void *pgsr_private,
+						 void *per_buffer_data)
+{
+	LVRelState *vacrel = pgsr_private;
+	VacReapBlkState *rbstate = per_buffer_data;
+	VacDeadItems *dead_items = vacrel->dead_items;
+	BlockNumber blkno;
+
+	/* idx_prefetch must not point past the current contents of dead_items */
+	Assert(vacrel->idx_prefetch <= dead_items->num_items);
+
+	/* Every dead item has already been queued: end the stream */
+	if (vacrel->idx_prefetch >= dead_items->num_items)
+		return InvalidBlockNumber;
+
+	blkno = ItemPointerGetBlockNumber(&dead_items->items[vacrel->idx_prefetch]);
+	rbstate->start_idx = vacrel->idx_prefetch;
+
+	/* Consume the run of dead items that share this block number */
+	for (; vacrel->idx_prefetch < dead_items->num_items; vacrel->idx_prefetch++)
+	{
+		BlockNumber curblkno =
+			ItemPointerGetBlockNumber(&dead_items->items[vacrel->idx_prefetch]);
+
+		if (blkno != curblkno)
+			break;				/* past end of tuples for this block */
+	}
+
+	rbstate->end_idx = vacrel->idx_prefetch;
+
+	return blkno;
+}
+
 /*
  *	lazy_vacuum_heap_rel() -- second pass over the heap for two pass strategy
  *
@@ -2422,7 +2475,9 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 static void
 lazy_vacuum_heap_rel(LVRelState *vacrel)
 {
-	int			index = 0;
+	Buffer		buf;
+	PgStreamingRead *pgsr;
+	VacReapBlkState *rbstate;
 	BlockNumber vacuumed_pages = 0;
 	Buffer		vmbuffer = InvalidBuffer;
 	LVSavedErrInfo saved_err_info;
@@ -2440,17 +2495,32 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 							 VACUUM_ERRCB_PHASE_VACUUM_HEAP,
 							 InvalidBlockNumber, InvalidOffsetNumber);
 
-	while (index < vacrel->dead_items->num_items)
+	/*
+	 * This function can run more than once per VACUUM (see the
+	 * num_index_scans > 1 case in the assertions below), and idx_prefetch
+	 * persists in vacrel between calls, so restart the read stream callback
+	 * at the first entry of dead_items.
+	 */
+	vacrel->idx_prefetch = 0;
+
+	pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_MAINTENANCE, vacrel,
+										  sizeof(VacReapBlkState),
+										  vacrel->bstrategy,
+										  BMR_REL(vacrel->rel),
+										  MAIN_FORKNUM,
+										  vacuum_reap_lp_pgsr_next);
+
+	while (BufferIsValid(buf =
+						 pg_streaming_read_buffer_get_next(pgsr,
+														   (void **) &rbstate)))
 	{
 		BlockNumber blkno;
-		Buffer		buf;
 		Page		page;
 		Size		freespace;
 
 		vacuum_delay_point();
 
-		blkno = ItemPointerGetBlockNumber(&vacrel->dead_items->items[index]);
-		vacrel->blkno = blkno;
+		vacrel->blkno = blkno = BufferGetBlockNumber(buf);
 
 		/*
 		 * Pin the visibility map page in case we need to mark the page
@@ -2460,10 +2530,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		visibilitymap_pin(vacrel->rel, blkno, &vmbuffer);
 
 		/* We need a non-cleanup exclusive lock to mark dead_items unused */
-		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-								 vacrel->bstrategy);
 		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-		index = lazy_vacuum_heap_page(vacrel, blkno, buf, index, vmbuffer);
+		lazy_vacuum_heap_page(vacrel, blkno, buf, vmbuffer, rbstate);
 
 		/* Now that we've vacuumed the page, record its available space */
 		page = BufferGetPage(buf);
@@ -2482,14 +2550,23 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 	 * We set all LP_DEAD items from the first heap pass to LP_UNUSED during
 	 * the second heap pass.  No more, no less.
 	 */
-	Assert(index > 0);
+	Assert(vacrel->idx_prefetch > 0);
 	Assert(vacrel->num_index_scans > 1 ||
-		   (index == vacrel->lpdead_items &&
+		   (vacrel->idx_prefetch == vacrel->lpdead_items &&
 			vacuumed_pages == vacrel->lpdead_item_pages));
 
+	/*
+	 * idx_prefetch, not rbstate, supplies the item count above and below:
+	 * the stream was read to the end, so its callback has handed out every
+	 * dead item, whereas rbstate points into the stream's per-buffer data,
+	 * which does not outlive the stream.
+	 */
+	pg_streaming_read_free(pgsr);
+
 	ereport(DEBUG2,
 			(errmsg("table \"%s\": removed %lld dead item identifiers in %u pages",
-					vacrel->relname, (long long) index, vacuumed_pages)));
+					vacrel->relname, (long long) vacrel->idx_prefetch,
+					vacuumed_pages)));
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
@@ -2503,13 +2562,14 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
  * cleanup lock is also acceptable).  vmbuffer must be valid and already have
  * a pin on blkno's visibility map page.
  *
- * index is an offset into the vacrel->dead_items array for the first listed
- * LP_DEAD item on the page.  The return value is the first index immediately
- * after all LP_DEAD items for the same page in the array.
+ * rbstate holds the half-open range [start_idx, end_idx) of entries in
+ * vacrel->dead_items that belong to blkno, as recorded during the first
+ * pass.  Set each of those LP_DEAD items LP_UNUSED, truncate the line
+ * pointer array, and update the VM as appropriate.
  */
-static int
-lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
-					  int index, Buffer vmbuffer)
+static void
+lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
+					  Buffer buffer, Buffer vmbuffer, VacReapBlkState *rbstate)
 {
 	VacDeadItems *dead_items = vacrel->dead_items;
 	Page		page = BufferGetPage(buffer);
@@ -2530,16 +2590,17 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 
 	START_CRIT_SECTION();
 
-	for (; index < dead_items->num_items; index++)
+	for (int i = rbstate->start_idx; i < rbstate->end_idx; i++)
 	{
-		BlockNumber tblk;
 		OffsetNumber toff;
+		ItemPointer dead_item;
 		ItemId		itemid;
 
-		tblk = ItemPointerGetBlockNumber(&dead_items->items[index]);
-		if (tblk != blkno)
-			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&dead_items->items[index]);
+		dead_item = &dead_items->items[i];
+
+		Assert(ItemPointerGetBlockNumber(dead_item) == blkno);
+
+		toff = ItemPointerGetOffsetNumber(dead_item);
 		itemid = PageGetItemId(page, toff);
 
 		Assert(ItemIdIsDead(itemid) && !ItemIdHasStorage(itemid));
@@ -2609,7 +2670,6 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
-	return index;
 }
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index aea8babd71a..a8f0b5f091d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2971,4 +2971,5 @@
 VacOptValue
+VacReapBlkState
 VacuumParams
 VacuumRelation
 VacuumStmt
-- 
2.40.1

