From 8ad10261b84775fde97f71cd273041af5160fc28 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Sun, 7 Apr 2024 15:28:32 -0400
Subject: [PATCH v9 2/3] Use streaming read API in ANALYZE

The ANALYZE command prefetches and reads sample blocks chosen by a
BlockSampler algorithm. Instead of calling PrefetchBuffer() and
ReadBufferExtended() for each block, ANALYZE now uses the streaming
read API introduced in b5a9b18cd0.

Author: Nazir Bilal Yavuz
Reviewed-by: Melanie Plageman
Discussion: https://postgr.es/m/flat/CAN55FZ0UhXqk9v3y-zW_fp4-WCp43V8y0A72xPmLkOM%2B6M%2BmJg%40mail.gmail.com
---
 src/backend/commands/analyze.c | 89 ++++++++++------------------------
 1 file changed, 26 insertions(+), 63 deletions(-)

diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 335ffb24302..3cfad92390d 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1253,6 +1253,20 @@ heap_scan_analyze_next_tuple(HeapScanDesc scan, TransactionId OldestXmin,
 	return false;
 }
 
+/*
+ * Streaming read callback: return the next block chosen by the BlockSampler
+ * passed as user_data, or InvalidBlockNumber once the sampler is exhausted.
+ */
+static BlockNumber
+block_sampling_streaming_read_next(ReadStream *stream,
+								   void *user_data,
+								   void *per_buffer_data)
+{
+	BlockSamplerData *bs = user_data;
+
+	return BlockSampler_HasMore(bs) ? BlockSampler_Next(bs) : InvalidBlockNumber;
+}
+
 /*
  * acquire_sample_rows -- acquire a random sample of rows from the heap
  *
@@ -1305,10 +1319,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	HeapScanDesc scan;
 	BlockNumber nblocks;
 	BlockNumber blksdone = 0;
-#ifdef USE_PREFETCH
-	int			prefetch_maximum = 0;	/* blocks to prefetch if enabled */
-	BlockSamplerData prefetch_bs;
-#endif
+	ReadStream *stream;
 
 	Assert(targrows > 0);
 
@@ -1321,13 +1332,6 @@ acquire_sample_rows(Relation onerel, int elevel,
 	randseed = pg_prng_uint32(&pg_global_prng_state);
 	nblocks = BlockSampler_Init(&bs, totalblocks, targrows, randseed);
 
-#ifdef USE_PREFETCH
-	prefetch_maximum = get_tablespace_maintenance_io_concurrency(onerel->rd_rel->reltablespace);
-	/* Create another BlockSampler, using the same seed, for prefetching */
-	if (prefetch_maximum)
-		(void) BlockSampler_Init(&prefetch_bs, totalblocks, targrows, randseed);
-#endif
-
 	/* Report sampling block numbers */
 	pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_TOTAL,
 								 nblocks);
@@ -1337,54 +1341,21 @@ acquire_sample_rows(Relation onerel, int elevel,
 
 	scan = (HeapScanDesc) heap_beginscan(onerel, NULL, 0, NULL, NULL, SO_TYPE_ANALYZE);
 	slot = table_slot_create(onerel, NULL);
-
-#ifdef USE_PREFETCH
-
-	/*
-	 * If we are doing prefetching, then go ahead and tell the kernel about
-	 * the first set of pages we are going to want.  This also moves our
-	 * iterator out ahead of the main one being used, where we will keep it so
-	 * that we're always pre-fetching out prefetch_maximum number of blocks
-	 * ahead.
-	 */
-	if (prefetch_maximum)
-	{
-		for (int i = 0; i < prefetch_maximum; i++)
-		{
-			BlockNumber prefetch_block;
-
-			if (!BlockSampler_HasMore(&prefetch_bs))
-				break;
-
-			prefetch_block = BlockSampler_Next(&prefetch_bs);
-			PrefetchBuffer(scan->rs_base.rs_rd, MAIN_FORKNUM, prefetch_block);
-		}
-	}
-#endif
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
+										vac_strategy,
+										scan->rs_base.rs_rd,
+										MAIN_FORKNUM,
+										block_sampling_streaming_read_next,
+										&bs,
+										0);
 
 	scan->rs_cbuf = InvalidBuffer;
 
 	/* Outer loop over blocks to sample */
-	while (BlockSampler_HasMore(&bs))
+	while (BufferIsValid(scan->rs_cbuf = read_stream_next_buffer(stream, NULL)))
 	{
-		BlockNumber targblock = BlockSampler_Next(&bs);
-#ifdef USE_PREFETCH
-		BlockNumber prefetch_targblock = InvalidBlockNumber;
-
-		/*
-		 * Make sure that every time the main BlockSampler is moved forward
-		 * that our prefetch BlockSampler also gets moved forward, so that we
-		 * always stay out ahead.
-		 */
-		if (prefetch_maximum && BlockSampler_HasMore(&prefetch_bs))
-			prefetch_targblock = BlockSampler_Next(&prefetch_bs);
-#endif
-
 		vacuum_delay_point();
 
-		scan->rs_cblock = targblock;
-		scan->rs_cindex = FirstOffsetNumber;
-
 		/*
 		 * We must maintain a pin on the target page's buffer to ensure that
 		 * concurrent activity - e.g. HOT pruning - doesn't delete tuples out
@@ -1394,19 +1365,10 @@ acquire_sample_rows(Relation onerel, int elevel,
 		 * we aren't doing much work per tuple, the extra lock traffic is
 		 * probably better avoided.
 		 */
-		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-										   targblock, RBM_NORMAL, vac_strategy);
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-#ifdef USE_PREFETCH
-
-		/*
-		 * When pre-fetching, after we get a block, tell the kernel about the
-		 * next one we will want, if there's any left.
-		 */
-		if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber)
-			PrefetchBuffer(scan->rs_base.rs_rd, MAIN_FORKNUM, prefetch_targblock);
-#endif
+		scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
+		scan->rs_cindex = FirstOffsetNumber;
 
 		while (heap_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot))
 		{
@@ -1456,6 +1418,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 		pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_DONE,
 									 ++blksdone);
 	}
+	read_stream_end(stream);
 
 	ExecDropSingleTupleTableSlot(slot);
 	heap_endscan((TableScanDesc) scan);
-- 
2.40.1

