From 2e5ed537b9053ff3212177c1732d6afa2100fa0f Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Sun, 31 Dec 2023 11:29:02 -0500
Subject: [PATCH v5a 6/7] Vacuum first pass uses Streaming Read interface

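Vacuum's first pass, which HOT-prunes pages and records the TIDs of
non-removable dead tuples, now uses the streaming read API. A new
streaming read callback, vacuum_scan_pgsr_next(), invokes
heap_vac_scan_get_next_block() to choose each block to read, and
heap_vac_scan_get_next_block() now signals the end of the scan by setting
blkno to InvalidBlockNumber instead of returning false.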
---
 src/backend/access/heap/vacuumlazy.c | 79 +++++++++++++++++++++-------
 1 file changed, 59 insertions(+), 20 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 65d257aab83..fbbc87938e4 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -54,6 +54,7 @@
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
+#include "storage/streaming_read.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
@@ -168,7 +169,12 @@ typedef struct LVRelState
 	char	   *relnamespace;
 	char	   *relname;
 	char	   *indname;		/* Current index name */
-	BlockNumber blkno;			/* used only for heap operations */
+
+	/*
+	 * The current block being processed by vacuum. Used only for heap
+	 * operations. Primarily for error reporting and logging.
+	 */
+	BlockNumber blkno;
 	OffsetNumber offnum;		/* used only for heap operations */
 	VacErrPhase phase;
 	bool		verbose;		/* VACUUM VERBOSE? */
@@ -189,6 +195,12 @@ typedef struct LVRelState
 	BlockNumber missed_dead_pages;	/* # pages with missed dead tuples */
 	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
 
+	/*
+	 * Last block number returned by the first pass's streaming read callback;
+	 * may be ahead of blkno, the block currently being processed.
+	 */
+	BlockNumber blkno_prefetch;
+
 	/* Statistics output by us, for table */
 	double		new_rel_tuples; /* new estimated total # of tuples */
 	double		new_live_tuples;	/* new estimated total # of live tuples */
@@ -232,7 +244,7 @@ typedef struct LVSavedErrInfo
 
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
-static bool heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
+static void heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
 										 BlockNumber *blkno,
 										 bool *all_visible_according_to_vm);
 static bool lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf,
@@ -416,6 +428,9 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	vacrel->nonempty_pages = 0;
 	/* dead_items_alloc allocates vacrel->dead_items later on */
 
+	/* first increment in vacuum_scan_pgsr_next() wraps this around to 0 */
+	vacrel->blkno_prefetch = InvalidBlockNumber;
+
 	/* Allocate/initialize output statistics state */
 	vacrel->new_rel_tuples = 0;
 	vacrel->new_live_tuples = 0;
@@ -776,6 +791,34 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	}
 }
 
+/*
+ * vacuum_scan_pgsr_next() -- streaming read callback for vacuum's first pass
+ *
+ * Returns the next block for lazy_scan_heap() to process, as chosen by
+ * heap_vac_scan_get_next_block(), or InvalidBlockNumber once there are no
+ * further blocks.  The block's visibility status according to the visibility
+ * map (all_visible_according_to_vm) is stored in per_buffer_data, a bool,
+ * which lazy_scan_heap() reads back together with the buffer.
+ *
+ * vacrel->blkno_prefetch holds the last block returned.  It starts out as
+ * InvalidBlockNumber, so the first increment wraps it around to block 0.
+ */
+static BlockNumber
+vacuum_scan_pgsr_next(PgStreamingRead *pgsr,
+					  void *pgsr_private, void *per_buffer_data)
+{
+	LVRelState *vacrel = pgsr_private;
+	bool	   *all_visible_according_to_vm = per_buffer_data;
+
+	vacrel->blkno_prefetch++;
+
+	heap_vac_scan_get_next_block(vacrel,
+								 vacrel->blkno_prefetch, &vacrel->blkno_prefetch,
+								 all_visible_according_to_vm);
+
+	return vacrel->blkno_prefetch;
+}
+
 /*
  *	lazy_scan_heap() -- workhorse function for VACUUM
  *
@@ -815,12 +846,11 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 static void
 lazy_scan_heap(LVRelState *vacrel)
 {
+	Buffer		buf;
 	BlockNumber rel_pages = vacrel->rel_pages,
 				next_fsm_block_to_vacuum = 0;
-	bool		all_visible_according_to_vm;
+	bool	   *all_visible_according_to_vm;
 
-	/* relies on InvalidBlockNumber overflowing to 0 */
-	BlockNumber blkno = InvalidBlockNumber;
 	VacDeadItems *dead_items = vacrel->dead_items;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
@@ -828,6 +858,11 @@ lazy_scan_heap(LVRelState *vacrel)
 		PROGRESS_VACUUM_MAX_DEAD_TUPLES
 	};
 	int64		initprog_val[3];
+	PgStreamingRead *pgsr;
+
+	pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_MAINTENANCE, vacrel,
+										  sizeof(bool), vacrel->bstrategy, BMR_REL(vacrel->rel),
+										  MAIN_FORKNUM, vacuum_scan_pgsr_next);
 
 	/* Report that we're scanning the heap, advertising total # of blocks */
 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
@@ -838,13 +873,19 @@ lazy_scan_heap(LVRelState *vacrel)
 	vacrel->skip.next_unskippable_block = InvalidBlockNumber;
 	vacrel->skip.vmbuffer = InvalidBuffer;
 
-	while (heap_vac_scan_get_next_block(vacrel, blkno + 1,
-										&blkno, &all_visible_according_to_vm))
+	while (BufferIsValid(buf =
+						 pg_streaming_read_buffer_get_next(pgsr, (void **) &all_visible_according_to_vm)))
 	{
-		Buffer		buf;
 		Page		page;
 		bool		has_lpdead_items;
 		bool		got_cleanup_lock = false;
+		BlockNumber blkno;
+
+		vacrel->blkno = blkno = BufferGetBlockNumber(buf);
+
+		CheckBufferIsPinnedOnce(buf);
+
+		page = BufferGetPage(buf);
 
 		vacrel->scanned_pages++;
 
@@ -912,9 +953,5 @@ lazy_scan_heap(LVRelState *vacrel)
 		 */
 		visibilitymap_pin(vacrel->rel, blkno, &vacrel->skip.vmbuffer);
-
-		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-								 vacrel->bstrategy);
-		page = BufferGetPage(buf);
 
 		/*
 		 * We need a buffer cleanup lock to prune HOT chains and defragment
@@ -970,7 +1008,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		 */
 		if (got_cleanup_lock)
 			lazy_scan_prune(vacrel, buf, blkno, page,
-							all_visible_according_to_vm,
+							*all_visible_according_to_vm,
 							&has_lpdead_items);
 
 		/*
@@ -1027,7 +1065,7 @@ lazy_scan_heap(LVRelState *vacrel)
 	}
 
 	/* report that everything is now scanned */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, rel_pages);
 
 	/* now we can compute the new value for pg_class.reltuples */
 	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
@@ -1042,6 +1080,8 @@ lazy_scan_heap(LVRelState *vacrel)
 		Max(vacrel->new_live_tuples, 0) + vacrel->recently_dead_tuples +
 		vacrel->missed_dead_tuples;
 
+	pg_streaming_read_free(pgsr);
+
 	/*
 	 * Do index vacuuming (call each index's ambulkdelete routine), then do
 	 * related heap vacuuming
@@ -1053,11 +1093,11 @@ lazy_scan_heap(LVRelState *vacrel)
 	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
 	 * not there were indexes, and whether or not we bypassed index vacuuming.
 	 */
-	if (blkno > next_fsm_block_to_vacuum)
-		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, blkno);
+	if (rel_pages > next_fsm_block_to_vacuum)
+		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, rel_pages);
 
 	/* report all blocks vacuumed */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages);
 
 	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
 	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
@@ -1090,7 +1130,7 @@ lazy_scan_heap(LVRelState *vacrel)
  *
  * The block number and visibility status of the next block to process are set
  * in blkno and all_visible_according_to_vm. heap_vac_scan_get_next_block()
- * returns false if there are no further blocks to process.
+ * sets blkno to InvalidBlockNumber if there are no further blocks to process.
  *
  * vacrel is an in/out parameter here; vacuum options and information about the
  * relation are read and vacrel->skippedallvis is set to ensure we don't
@@ -1110,7 +1150,7 @@ lazy_scan_heap(LVRelState *vacrel)
  * older XIDs/MXIDs.  The vacrel->skippedallvis flag will be set here when the
  * choice to skip such a range is actually made, making everything safe.)
  */
-static bool
+static void
 heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
 							 BlockNumber *blkno, bool *all_visible_according_to_vm)
 {
@@ -1119,7 +1159,7 @@ heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
 	if (next_block >= vacrel->rel_pages)
 	{
 		*blkno = InvalidBlockNumber;
-		return false;
+		return;
 	}
 
 	if (vacrel->skip.next_unskippable_block == InvalidBlockNumber ||
@@ -1214,7 +1254,6 @@ heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
 		*all_visible_according_to_vm = true;
 
 	*blkno = next_block;
-	return true;
 }
 
 /*
-- 
2.40.1

