From 766703de5f03bf9d2bda73a0bb703f1c49bb2178 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 12 Apr 2023 18:16:06 -0700
Subject: [PATCH v5 07/10] WIP: Use streaming reads in heapam scans.

XXX Cherry-picked from https://github.com/anarazel/postgres/tree/aio and
lightly modified by TM, for demonstration purposes.

Author: Andres Freund <andres@anarazel.de>
---
 src/backend/access/heap/heapam.c         | 205 +++++++++++++++++++++--
 src/backend/access/heap/heapam_handler.c |   2 +-
 src/include/access/heapam.h              |   5 +-
 3 files changed, 192 insertions(+), 20 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 707460a536..76a53f68b6 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -65,6 +65,7 @@
 #include "storage/smgr.h"
 #include "storage/spin.h"
 #include "storage/standby.h"
+#include "storage/streaming_read.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -225,6 +226,128 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+/*
+ * heap_pgsr_next_single - streaming read callback for serial seqscans
+ *
+ * Returns the next block for the stream to read, in the same order as the
+ * forward serial path of heapgettup_advance_block(): start at rs_startblock,
+ * wrap around at rs_nblocks, and stop on arriving back at rs_startblock or
+ * once the heap_setscanlimits() limit in rs_numblocks is used up.  Returns
+ * InvalidBlockNumber when the scan is done.
+ *
+ * rs_prefetch_block holds the last block handed out, or InvalidBlockNumber
+ * before the first call.  NOTE(review): the end-of-scan checks are not
+ * repeatable (rs_numblocks keeps counting down), so this assumes the stream
+ * stops calling back after the first InvalidBlockNumber -- TODO confirm.
+ */
+static BlockNumber
+heap_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private,
+					  void *per_buffer_data)
+{
+	HeapScanDesc scan = (HeapScanDesc) pgsr_private;
+	BlockNumber blockno;
+
+	Assert(!scan->rs_base.rs_parallel);
+	Assert(scan->rs_nblocks > 0);
+
+	if (scan->rs_prefetch_block == InvalidBlockNumber)
+	{
+		/* first call: begin at the scan's start block */
+		scan->rs_prefetch_block = blockno = scan->rs_startblock;
+	}
+	else
+	{
+		blockno = ++scan->rs_prefetch_block;
+
+		/* wrap back to the start of the heap */
+		if (blockno >= scan->rs_nblocks)
+			scan->rs_prefetch_block = blockno = 0;
+
+		/*
+		 * Report our new scan position for synchronization purposes, as the
+		 * non-streaming forward serial path does.  This is done before the
+		 * end-of-scan check so that a scan that wraps all the way around
+		 * leaves the position hint at its start block.  The reported block
+		 * is the one being requested from the stream, which may be ahead of
+		 * the block currently being returned to the caller.
+		 */
+		if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
+			ss_report_location(scan->rs_base.rs_rd, blockno);
+
+		/* we're done if we're back at where we started */
+		if (blockno == scan->rs_startblock)
+			return InvalidBlockNumber;
+
+		/* check if the limit imposed by heap_setscanlimits() is met */
+		if (scan->rs_numblocks != InvalidBlockNumber)
+		{
+			if (--scan->rs_numblocks == 0)
+				return InvalidBlockNumber;
+		}
+	}
+
+	return blockno;
+}
+
+/*
+ * heap_pgsr_next_parallel - streaming read callback for parallel seqscans
+ *
+ * Returns the next block table_block_parallelscan_nextpage() gives this
+ * worker, or InvalidBlockNumber when there are no more blocks.  Note that
+ * other processes might have already finished the scan.
+ */
+static BlockNumber
+heap_pgsr_next_parallel(PgStreamingRead *pgsr, void *pgsr_private,
+						void *per_buffer_data)
+{
+	HeapScanDesc scan = (HeapScanDesc) pgsr_private;
+	ParallelBlockTableScanDesc pbscan =
+		(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+	ParallelBlockTableScanWorker pbscanwork =
+		scan->rs_parallelworkerdata;
+
+	Assert(scan->rs_base.rs_parallel);
+	Assert(scan->rs_nblocks > 0);
+
+	return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+											 pbscanwork, pbscan);
+}
+
+/*
+ * Allocate the streaming read for a serial seqscan.  Block numbers come from
+ * heap_pgsr_next_single(), and buffers are read using the scan's buffer
+ * access strategy.  The callbacks don't use per-buffer data.
+ */
+static PgStreamingRead *
+heap_pgsr_single_alloc(HeapScanDesc scan)
+{
+	return pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+										  scan,
+										  0,
+										  scan->rs_strategy,
+										  BMR_REL(scan->rs_base.rs_rd),
+										  MAIN_FORKNUM,
+										  heap_pgsr_next_single);
+}
+
+/*
+ * Allocate the streaming read for a parallel seqscan.  Block numbers come
+ * from heap_pgsr_next_parallel(), and buffers are read using the scan's
+ * buffer access strategy.
+ */
+static PgStreamingRead *
+heap_pgsr_parallel_alloc(HeapScanDesc scan)
+{
+	return pg_streaming_read_buffer_alloc(PGSR_FLAG_SEQUENTIAL,
+										  scan,
+										  0,
+										  scan->rs_strategy,
+										  BMR_REL(scan->rs_base.rs_rd),
+										  MAIN_FORKNUM,
+										  heap_pgsr_next_parallel);
+}
+
+
 /* ----------------
  *		initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -342,6 +426,32 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	 */
 	if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN)
 		pgstat_count_heap_scan(scan->rs_base.rs_rd);
+
+	/*
+	 * Set up the streaming read for this (re)scan.  Any stream left over
+	 * from an earlier scan is freed first, and the serial callback's
+	 * position (rs_prefetch_block) is reset so it starts again at
+	 * rs_startblock.  Only sequential scans of relations that don't use
+	 * local buffers get a stream; otherwise scan->pgsr stays NULL and
+	 * heapgetpage() reads each page with ReadBufferExtended().
+	 *
+	 * FIXME: This probably should be done in the !rs_inited blocks instead.
+	 */
+	scan->rs_prefetch_block = InvalidBlockNumber;
+	if (scan->pgsr)
+	{
+		pg_streaming_read_free(scan->pgsr);
+		scan->pgsr = NULL;
+	}
+
+	if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) &&
+		(scan->rs_base.rs_flags & SO_TYPE_SEQSCAN))
+	{
+		if (scan->rs_base.rs_parallel)
+			scan->pgsr = heap_pgsr_parallel_alloc(scan);
+		else
+			scan->pgsr = heap_pgsr_single_alloc(scan);
+	}
 }
 
 /*
@@ -374,7 +478,7 @@ heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlk
  * which tuples on the page are visible.
  */
 void
-heapgetpage(TableScanDesc sscan, BlockNumber block)
+heapgetpage(TableScanDesc sscan, BlockNumber block, Buffer pgsr_buffer)
 {
 	HeapScanDesc scan = (HeapScanDesc) sscan;
 	Buffer		buffer;
@@ -401,9 +505,25 @@ heapgetpage(TableScanDesc sscan, BlockNumber block)
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	/* read page using selected strategy */
-	scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block,
-									   RBM_NORMAL, scan->rs_strategy);
+	/*
+	 * If the caller got this block's buffer from the scan's streaming read,
+	 * make it the current buffer instead of reading the block again.  Callers
+	 * pass InvalidBuffer when the scan has no streaming read.
+	 */
+	if (BufferIsValid(pgsr_buffer))
+	{
+		Assert(scan->pgsr);
+		Assert(BufferGetBlockNumber(pgsr_buffer) == block);
+		scan->rs_cbuf = pgsr_buffer;
+	}
+	else
+	{
+		Assert(!scan->pgsr);
+
+		/* read page using selected strategy */
+		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block,
+										   RBM_NORMAL, scan->rs_strategy);
+	}
 	scan->rs_cblock = block;
 
 	if (!(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE))
@@ -490,7 +605,7 @@ heapgetpage(TableScanDesc sscan, BlockNumber block)
  * of the pages before we can get a chance to get our first page.
  */
 static BlockNumber
-heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
+heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir, Buffer *pgsr_buf)
 {
 	Assert(!scan->rs_inited);
 
@@ -500,16 +615,35 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
 
 	if (ScanDirectionIsForward(dir))
 	{
+		/*
+		 * Initialize a parallel scan's start block before the first call to
+		 * table_block_parallelscan_nextpage(), which is made either by
+		 * heap_pgsr_next_parallel() or by the non-streaming path below.
+		 */
+		if (scan->rs_base.rs_parallel != NULL)
+			table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+													 scan->rs_parallelworkerdata,
+													 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+
+		/*
+		 * With a streaming read, the first block is whichever one the stream
+		 * returns; its buffer is passed back in *pgsr_buf for heapgetpage().
+		 *
+		 * FIXME: Integrate more neatly
+		 */
+		if (scan->pgsr)
+		{
+			*pgsr_buf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL);
+			if (*pgsr_buf == InvalidBuffer)
+				return InvalidBlockNumber;
+			return BufferGetBlockNumber(*pgsr_buf);
+		}
+
 		/* serial scan */
 		if (scan->rs_base.rs_parallel == NULL)
 			return scan->rs_startblock;
 		else
 		{
-			/* parallel scan */
-			table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-													 scan->rs_parallelworkerdata,
-													 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
-
 			/* may return InvalidBlockNumber if there are no more blocks */
 			return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
 													 scan->rs_parallelworkerdata,
@@ -529,6 +653,16 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
 		 */
 		scan->rs_base.rs_flags &= ~SO_ALLOW_SYNC;
 
+		/*
+		 * heap_pgsr_next_single() only hands out blocks in forward order, so
+		 * drop the streaming read and let heapgetpage() read pages itself.
+		 */
+		if (scan->pgsr)
+		{
+			pg_streaming_read_free(scan->pgsr);
+			scan->pgsr = NULL;
+		}
+
 		/*
 		 * Start from last page of the scan.  Ensure we take into account
 		 * rs_numblocks if it's been adjusted by heap_setscanlimits().
@@ -630,11 +760,39 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
  * heap_setscanlimits().
  */
 static inline BlockNumber
-heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
+heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir,
+						 Buffer *pgsr_buf)
 {
 	if (ScanDirectionIsForward(dir))
 	{
-		if (scan->rs_base.rs_parallel == NULL)
+		if (scan->pgsr)
+		{
+			/*
+			 * The streaming read's callback picks the next block, so just take
+			 * the stream's next buffer and hand it back via *pgsr_buf.  In
+			 * assert-enabled builds, first compute the block the serial logic
+			 * would pick, to cross-check it for non-parallel scans.
+			 */
+#ifdef USE_ASSERT_CHECKING
+			block++;
+
+			/* wrap back to the start of the heap */
+			if (block >= scan->rs_nblocks)
+				block = 0;
+
+			/* we're done if we're back at where we started */
+			if (block == scan->rs_startblock)
+				block = InvalidBlockNumber;
+#endif
+			*pgsr_buf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL);
+			if (*pgsr_buf == InvalidBuffer)
+				return InvalidBlockNumber;
+
+			Assert(scan->rs_base.rs_parallel ||
+				   block == BufferGetBlockNumber(*pgsr_buf));
+			return BufferGetBlockNumber(*pgsr_buf);
+		}
+		else if (scan->rs_base.rs_parallel == NULL)
 		{
 			block++;
 
@@ -679,6 +831,13 @@ heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir
 	}
 	else
 	{
+		/* streaming reads only go forward; read pages directly from now on */
+		if (scan->pgsr)
+		{
+			pg_streaming_read_free(scan->pgsr);
+			scan->pgsr = NULL;
+		}
+
 		/* we're done if the last block is the start position */
 		if (block == scan->rs_startblock)
 			return InvalidBlockNumber;
@@ -728,13 +886,14 @@ heapgettup(HeapScanDesc scan,
 {
 	HeapTuple	tuple = &(scan->rs_ctup);
 	BlockNumber block;
+	Buffer		pgsr_buf = InvalidBuffer;	/* next buffer from scan->pgsr, if any */
 	Page		page;
 	OffsetNumber lineoff;
 	int			linesleft;
 
 	if (unlikely(!scan->rs_inited))
 	{
-		block = heapgettup_initial_block(scan, dir);
+		block = heapgettup_initial_block(scan, dir, &pgsr_buf);
 		/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
 		Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
 		scan->rs_inited = true;
@@ -755,7 +914,7 @@ heapgettup(HeapScanDesc scan,
 	 */
 	while (block != InvalidBlockNumber)
 	{
-		heapgetpage((TableScanDesc) scan, block);
+		heapgetpage((TableScanDesc) scan, block, pgsr_buf);
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 		page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
 continue_page:
@@ -807,9 +966,11 @@ continue_page:
 		 * it's time to move to the next.
 		 */
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+		/* heapgetpage() made pgsr_buf rs_cbuf; never hand it over twice */
+		pgsr_buf = InvalidBuffer;
 
 		/* get the BlockNumber to scan next */
-		block = heapgettup_advance_block(scan, block, dir);
+		block = heapgettup_advance_block(scan, block, dir, &pgsr_buf);
 	}
 
 	/* end of scan */
@@ -843,13 +1003,14 @@ heapgettup_pagemode(HeapScanDesc scan,
 {
 	HeapTuple	tuple = &(scan->rs_ctup);
 	BlockNumber block;
+	Buffer		pgsr_buf = InvalidBuffer;	/* next buffer from scan->pgsr, if any */
 	Page		page;
 	int			lineindex;
 	int			linesleft;
 
 	if (unlikely(!scan->rs_inited))
 	{
-		block = heapgettup_initial_block(scan, dir);
+		block = heapgettup_initial_block(scan, dir, &pgsr_buf);
 		/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
 		Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
 		scan->rs_inited = true;
@@ -876,7 +1037,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 	 */
 	while (block != InvalidBlockNumber)
 	{
-		heapgetpage((TableScanDesc) scan, block);
+		heapgetpage((TableScanDesc) scan, block, pgsr_buf);
 		page = BufferGetPage(scan->rs_cbuf);
 		linesleft = scan->rs_ntuples;
 		lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
@@ -908,7 +1069,10 @@ continue_page:
 		}
 
+		/* heapgetpage() made pgsr_buf rs_cbuf; never hand it over twice */
+		pgsr_buf = InvalidBuffer;
+
 		/* get the BlockNumber to scan next */
-		block = heapgettup_advance_block(scan, block, dir);
+		block = heapgettup_advance_block(scan, block, dir, &pgsr_buf);
 	}
 
 	/* end of scan */
@@ -956,6 +1117,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	scan->rs_base.rs_parallel = parallel_scan;
 	scan->rs_strategy = NULL;	/* set in initscan */
 
+	scan->pgsr = NULL;			/* must be set before initscan() reads it */
+
 	/*
 	 * Disable page-at-a-time mode if it's not a MVCC-safe snapshot.
 	 */
@@ -1062,6 +1225,13 @@ heap_endscan(TableScanDesc sscan)
 	if (BufferIsValid(scan->rs_cbuf))
 		ReleaseBuffer(scan->rs_cbuf);
 
+	/* free the scan's streaming read, if it still has one */
+	if (scan->pgsr)
+	{
+		pg_streaming_read_free(scan->pgsr);
+		scan->pgsr = NULL;
+	}
+
 	/*
 	 * decrement relation reference count and free scan descriptor storage
 	 */
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index d15a02b2be..6127f9d75a 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2335,7 +2335,8 @@ heapam_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate)
 		return false;
 	}
 
-	heapgetpage(scan, blockno);
+	/* no streaming read here; heapgetpage() reads the block itself */
+	heapgetpage(scan, blockno, InvalidBuffer);
 	hscan->rs_inited = true;
 
 	return true;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 932ec0d6f2..bc53f9f4d2 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -59,6 +59,7 @@ typedef struct HeapScanDescData
 	bool		rs_inited;		/* false = scan not init'd yet */
 	OffsetNumber rs_coffset;	/* current offset # in non-page-at-a-time mode */
 	BlockNumber rs_cblock;		/* current block # in scan, if any */
+	BlockNumber rs_prefetch_block;	/* last block handed to the stream */
 	Buffer		rs_cbuf;		/* current buffer in scan, if any */
 	/* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
 
@@ -72,6 +73,8 @@ typedef struct HeapScanDescData
 	 */
 	ParallelBlockTableScanWorkerData *rs_parallelworkerdata;
 
+	struct PgStreamingRead *pgsr;	/* streaming read, or NULL if none */
+
 	/* these fields only used in page-at-a-time mode and for bitmap scans */
 	int			rs_cindex;		/* current tuple's index in vistuples */
 	int			rs_ntuples;		/* number of visible tuples on page */
@@ -246,7 +249,7 @@ extern TableScanDesc heap_beginscan(Relation relation, Snapshot snapshot,
 									uint32 flags);
 extern void heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk,
 							   BlockNumber numBlks);
-extern void heapgetpage(TableScanDesc sscan, BlockNumber block);
+extern void heapgetpage(TableScanDesc sscan, BlockNumber block, Buffer pgsr_buffer);
 extern void heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 						bool allow_strat, bool allow_sync, bool allow_pagemode);
 extern void heap_endscan(TableScanDesc sscan);
-- 
2.39.2

