From bc9d97de3729e65752ef6a6e9cbfc0808c4725ac Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Sun, 31 Dec 2023 11:29:02 -0500
Subject: [PATCH v7 6/7] Vacuum first pass uses Streaming Read interface

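Now vacuum's first pass, which HOT prunes and records the TIDs of
non-removable dead tuples, uses the streaming read API by implementing a
streaming read callback which invokes heap_vac_scan_next_block().

heap_vac_scan_next_block() now keeps the VM buffer it uses in LVRelState
instead of receiving it as a parameter, and each block's all-visible
status is handed to lazy_scan_heap() as the stream's per-buffer data.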
---
 src/backend/access/heap/vacuumlazy.c | 131 +++++++++++++++++++--------
 1 file changed, 92 insertions(+), 39 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index d2c8f27fc57..d07a2a58b15 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -54,6 +54,7 @@
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
+#include "storage/streaming_read.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
@@ -168,7 +169,12 @@ typedef struct LVRelState
 	char	   *relnamespace;
 	char	   *relname;
 	char	   *indname;		/* Current index name */
-	BlockNumber blkno;			/* used only for heap operations */
+
+	/*
+	 * The current block being processed by vacuum. Used only for heap
+	 * operations. Primarily for error reporting and logging.
+	 */
+	BlockNumber blkno;
 	OffsetNumber offnum;		/* used only for heap operations */
 	VacErrPhase phase;
 	bool		verbose;		/* VACUUM VERBOSE? */
@@ -220,6 +226,12 @@ typedef struct LVRelState
 		BlockNumber next_unskippable_block;
 		/* Next unskippable block's visibility status */
 		bool		next_unskippable_allvis;
+
+		/*
+		 * Buffer containing block of VM with visibility information for
+		 * next_unskippable_block.
+		 */
+		Buffer		next_unskippable_vmbuffer;
 	}			next_block_state;
 } LVRelState;
 
@@ -233,8 +245,7 @@ typedef struct LVSavedErrInfo
 
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
-static bool heap_vac_scan_next_block(LVRelState *vacrel, Buffer *vmbuffer,
-									 BlockNumber *blkno,
+static void heap_vac_scan_next_block(LVRelState *vacrel,
 									 bool *all_visible_according_to_vm);
 static bool lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf,
 								   BlockNumber blkno, Page page,
@@ -777,6 +788,77 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	}
 }
 
+/*
+ *	vacuum_scan_pgsr_next() -- streaming read callback for vacuum's first pass
+ *
+ * Advances the scan with heap_vac_scan_next_block() and returns the number of
+ * the next heap block to read, or InvalidBlockNumber once there are no
+ * further blocks to process.
+ *
+ * pgsr_private is the LVRelState for this vacuum. per_buffer_data points to
+ * the bool in which the block's visibility map status is stored, so that
+ * lazy_scan_heap() can retrieve it together with the buffer.
+ */
+static BlockNumber
+vacuum_scan_pgsr_next(PgStreamingRead *pgsr,
+					  void *pgsr_private, void *per_buffer_data)
+{
+	LVRelState *vacrel = pgsr_private;
+	bool	   *all_visible_according_to_vm = per_buffer_data;
+
+	heap_vac_scan_next_block(vacrel,
+							 all_visible_according_to_vm);
+
+	/*
+	 * If there are no further blocks to vacuum in the relation, release
+	 * next_unskippable_vmbuffer; nothing will look at it again.
+	 */
+	if (!BlockNumberIsValid(vacrel->next_block_state.current_block) &&
+		BufferIsValid(vacrel->next_block_state.next_unskippable_vmbuffer))
+	{
+		ReleaseBuffer(vacrel->next_block_state.next_unskippable_vmbuffer);
+		vacrel->next_block_state.next_unskippable_vmbuffer = InvalidBuffer;
+	}
+
+	return vacrel->next_block_state.current_block;
+}
+
+/*
+ *	vac_scan_pgsr_alloc() -- set up the streaming read for vacuum's first pass
+ *
+ * Allocates a streaming read over the main fork of vacrel->rel which obtains
+ * block numbers from next_block_cb, and resets the state in
+ * vacrel->next_block_state that heap_vac_scan_next_block() depends on.
+ *
+ * The caller is responsible for freeing the returned streaming read with
+ * pg_streaming_read_free().
+ */
+static inline PgStreamingRead *
+vac_scan_pgsr_alloc(LVRelState *vacrel, PgStreamingReadBufferCB next_block_cb)
+{
+	PgStreamingRead *result;
+
+	result = pg_streaming_read_buffer_alloc(PGSR_FLAG_MAINTENANCE, vacrel,
+											sizeof(bool), vacrel->bstrategy,
+											BMR_REL(vacrel->rel),
+											MAIN_FORKNUM, next_block_cb);
+
+	/*
+	 * Initialize for first heap_vac_scan_next_block() call. The block numbers
+	 * rely on InvalidBlockNumber + 1 == 0.
+	 */
+	vacrel->next_block_state.current_block = InvalidBlockNumber;
+	vacrel->next_block_state.next_unskippable_block = InvalidBlockNumber;
+
+	/*
+	 * No VM page is held yet. Set this explicitly so that the callback's
+	 * BufferIsValid() test never depends on how vacrel was allocated.
+	 */
+	vacrel->next_block_state.next_unskippable_vmbuffer = InvalidBuffer;
+
+	return result;
+}
+
 /*
  *	lazy_scan_heap() -- workhorse function for VACUUM
  *
@@ -816,10 +868,10 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 static void
 lazy_scan_heap(LVRelState *vacrel)
 {
+	Buffer		buf;
 	BlockNumber rel_pages = vacrel->rel_pages,
-				blkno,
 				next_fsm_block_to_vacuum = 0;
-	bool		all_visible_according_to_vm;
+	bool	   *all_visible_according_to_vm;
 
 	VacDeadItems *dead_items = vacrel->dead_items;
 	Buffer		vmbuffer = InvalidBuffer;
@@ -830,23 +882,27 @@ lazy_scan_heap(LVRelState *vacrel)
 	};
 	int64		initprog_val[3];
 
+	PgStreamingRead *pgsr = vac_scan_pgsr_alloc(vacrel, vacuum_scan_pgsr_next);
+
 	/* Report that we're scanning the heap, advertising total # of blocks */
 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
 	initprog_val[1] = rel_pages;
 	initprog_val[2] = dead_items->max_items;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
-	/* Initialize for first heap_vac_scan_next_block() call */
-	vacrel->next_block_state.current_block = InvalidBlockNumber;
-	vacrel->next_block_state.next_unskippable_block = InvalidBlockNumber;
-
-	while (heap_vac_scan_next_block(vacrel, &vmbuffer,
-									&blkno, &all_visible_according_to_vm))
+	while (BufferIsValid(buf = pg_streaming_read_buffer_get_next(pgsr,
+																 (void **) &all_visible_according_to_vm)))
 	{
-		Buffer		buf;
 		Page		page;
 		bool		has_lpdead_items;
 		bool		got_cleanup_lock = false;
+		BlockNumber blkno;
+
+		vacrel->blkno = blkno = BufferGetBlockNumber(buf);
+
+		CheckBufferIsPinnedOnce(buf);
+
+		page = BufferGetPage(buf);
 
 		vacrel->scanned_pages++;
 
@@ -914,9 +970,6 @@ lazy_scan_heap(LVRelState *vacrel)
 		 */
 		visibilitymap_pin(vacrel->rel, blkno, &vmbuffer);
 
-		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-								 vacrel->bstrategy);
-		page = BufferGetPage(buf);
 
 		/*
 		 * We need a buffer cleanup lock to prune HOT chains and defragment
@@ -973,7 +1026,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		 */
 		if (got_cleanup_lock)
 			lazy_scan_prune(vacrel, buf, blkno, page,
-							vmbuffer, all_visible_according_to_vm,
+							vmbuffer, *all_visible_according_to_vm,
 							&has_lpdead_items);
 
 		/*
@@ -1030,7 +1083,7 @@ lazy_scan_heap(LVRelState *vacrel)
 	}
 
 	/* report that everything is now scanned */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, vacrel->rel_pages);
 
 	/* now we can compute the new value for pg_class.reltuples */
 	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
@@ -1045,6 +1098,8 @@ lazy_scan_heap(LVRelState *vacrel)
 		Max(vacrel->new_live_tuples, 0) + vacrel->recently_dead_tuples +
 		vacrel->missed_dead_tuples;
 
+	pg_streaming_read_free(pgsr);
+
 	/*
 	 * Do index vacuuming (call each index's ambulkdelete routine), then do
 	 * related heap vacuuming
@@ -1056,11 +1111,11 @@ lazy_scan_heap(LVRelState *vacrel)
 	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
 	 * not there were indexes, and whether or not we bypassed index vacuuming.
 	 */
-	if (blkno > next_fsm_block_to_vacuum)
-		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, blkno);
+	if (vacrel->rel_pages > next_fsm_block_to_vacuum)
+		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, vacrel->rel_pages);
 
 	/* report all blocks vacuumed */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, vacrel->rel_pages);
 
 	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
 	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
@@ -1072,12 +1127,13 @@ lazy_scan_heap(LVRelState *vacrel)
  *
  * lazy_scan_heap() calls here every time it needs to get the next block to
  * prune and vacuum, using the visibility map, vacuum options, and various
- * thresholds to skip blocks which do not need to be processed and set blkno to
- * the next block that actually needs to be processed.
+ * thresholds to skip blocks which do not need to be processed and set
+ * current_block to the next block that actually needs to be processed.
  *
- * The block number and visibility status of the next block to process are set
- * in blkno and all_visible_according_to_vm. heap_vac_scan_next_block()
- * returns false if there are no further blocks to process.
+ * The number and visibility status of the next block to process are set in
+ * vacrel->next_block_state.current_block and all_visible_according_to_vm.
+ * vacrel->next_block_state.current_block is set to InvalidBlockNumber if
+ * there are no further blocks to process.
  *
  * vacrel is an in/out parameter here; vacuum options and information about the
  * relation are read, members of vacrel->next_block_state are read and set as
@@ -1085,12 +1141,10 @@ lazy_scan_heap(LVRelState *vacrel)
  * don't advance relfrozenxid when we have skipped vacuuming all-visible
  * blocks.
  *
- * vmbuffer is an output parameter which, upon return, will contain the block
- * from the VM containing visibility information for the next unskippable heap
- * block. If we decide not to skip this heap block, the caller is responsible
- * for fetching the correct VM block into vmbuffer before using it. This is
- * okay as providing it as an output parameter is an optimization, not a
- * requirement.
+ * vacrel->next_block_state.next_unskippable_vmbuffer will contain the block
+ * from the VM containing visibility information for the next unskippable
+ * heap block. If we decide not to skip this heap block, the caller is
+ * responsible for fetching the correct VM block into its own vmbuffer.
  *
  * Note: our opinion of which blocks can be skipped can go stale immediately.
  * It's okay if caller "misses" a page whose all-visible or all-frozen marking
@@ -1100,9 +1154,9 @@ lazy_scan_heap(LVRelState *vacrel)
  * older XIDs/MXIDs.  The vacrel->skippedallvis flag will be set here when the
  * choice to skip such a range is actually made, making everything safe.)
  */
-static bool
-heap_vac_scan_next_block(LVRelState *vacrel, Buffer *vmbuffer,
-						 BlockNumber *blkno, bool *all_visible_according_to_vm)
+static void
+heap_vac_scan_next_block(LVRelState *vacrel,
+						 bool *all_visible_according_to_vm)
 {
 	/* Relies on InvalidBlockNumber + 1 == 0 */
 	BlockNumber next_block = vacrel->next_block_state.current_block + 1;
@@ -1129,8 +1183,8 @@ heap_vac_scan_next_block(LVRelState *vacrel, Buffer *vmbuffer,
 	 */
 	if (next_block >= vacrel->rel_pages)
 	{
-		vacrel->next_block_state.current_block = *blkno = InvalidBlockNumber;
-		return false;
+		vacrel->next_block_state.current_block = InvalidBlockNumber;
+		return;
 	}
 
 	if (vacrel->next_block_state.next_unskippable_block == InvalidBlockNumber ||
@@ -1144,7 +1198,7 @@ heap_vac_scan_next_block(LVRelState *vacrel, Buffer *vmbuffer,
 		{
 			uint8		mapbits = visibilitymap_get_status(vacrel->rel,
 														   next_unskippable_block,
-														   vmbuffer);
+														   &vacrel->next_block_state.next_unskippable_vmbuffer);
 
 			vacrel->next_block_state.next_unskippable_allvis = mapbits & VISIBILITYMAP_ALL_VISIBLE;
 
@@ -1224,8 +1278,7 @@ heap_vac_scan_next_block(LVRelState *vacrel, Buffer *vmbuffer,
 	else
 		*all_visible_according_to_vm = true;
 
-	vacrel->next_block_state.current_block = *blkno = next_block;
-	return true;
+	vacrel->next_block_state.current_block = next_block;
 }
 
 /*
-- 
2.40.1

