From 56c2033318efd0062d4d0690f12a59654112cb37 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Mon, 26 Feb 2024 15:41:32 -0500
Subject: [PATCH v2 5/5] Sequential scans support streaming read

Implementing streaming read support for heap sequential scans includes
three parts:

Allocate the streaming read object in heap_beginscan(), after
initscan(). On rescan, reset the existing streaming read object.

Implement a callback that supplies the streaming read API with the
next block to prefetch.

Invoke the streaming read API when a new page is needed and the scan
still has a streaming read object. If the scan direction is not
forward, the streaming read object is freed so that the synchronous
fallback method is used instead. Streaming is not re-enabled for that
scan, even if it later changes back to the forward direction.

ci-os-only:
---
 src/backend/access/heap/heapam.c | 96 +++++++++++++++++++++++++++++---
 src/include/access/heapam.h      | 10 ++++
 2 files changed, 97 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e0fe3d9c326..c88b4221864 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -65,6 +65,7 @@
 #include "storage/smgr.h"
 #include "storage/spin.h"
 #include "storage/standby.h"
+#include "storage/streaming_read.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -228,6 +229,41 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+/*
+ * heap_pgsr_next_single - streaming read callback for sequential scans
+ *
+ * Returns the next block number for the streaming read object to read on
+ * behalf of a forward sequential scan.  pgsr_private is the HeapScanDesc
+ * passed to pg_streaming_read_buffer_alloc() in heap_beginscan();
+ * per_buffer_data is not used.  The first call picks the scan's initial
+ * block and sets rs_inited; later calls advance from rs_prefetch_block,
+ * which may be ahead of rs_cblock, the block the scan is processing.  The
+ * result comes straight from heapgettup_initial_block() or
+ * heapgettup_advance_block(), so InvalidBlockNumber means no blocks remain.
+ */
+static BlockNumber
+heap_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private,
+					  void *per_buffer_data)
+{
+	HeapScanDesc scan = (HeapScanDesc) pgsr_private;
+
+	/*
+	 * Hard-code ScanDirection to ForwardScanDirection since only forward
+	 * scans support streaming reads.
+	 */
+	if (!scan->rs_inited)
+	{
+		scan->rs_prefetch_block = heapgettup_initial_block(scan, ForwardScanDirection);
+		scan->rs_inited = true;
+	}
+	else
+		scan->rs_prefetch_block = heapgettup_advance_block(scan,
+														   scan->rs_prefetch_block,
+														   ForwardScanDirection);
+
+	return scan->rs_prefetch_block;
+}
+
 /* ----------------
  *		initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -329,6 +365,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	ItemPointerSetInvalid(&scan->rs_ctup.t_self);
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
+	scan->rs_prefetch_block = InvalidBlockNumber;
 
 	/* page-at-a-time fields are always invalid when not rs_inited */
 
@@ -486,19 +511,44 @@ heapfetchbuf(HeapScanDesc scan, ScanDirection dir)
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	if (!scan->rs_inited)
+	/*
+	 * Only forward scans stream reads. The scan direction is not known when
+	 * the streaming read object is allocated, so every sequential scan starts
+	 * out with one. The first time blocks are requested in any other
+	 * direction, the object is freed and this scan never streams again, even
+	 * if it later turns forward. If heap_pgsr_next_single() already set
+	 * rs_inited, the synchronous path below resumes from rs_cblock, the last
+	 * block the stream returned to the scan.
+	 */
+	if (!ScanDirectionIsForward(dir) && scan->rs_pgsr)
 	{
-		scan->rs_cblock = heapgettup_initial_block(scan, dir);
-		Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
-		scan->rs_inited = true;
+		pg_streaming_read_free(scan->rs_pgsr);
+		scan->rs_pgsr = NULL;
+		scan->rs_prefetch_block = InvalidBlockNumber;
+	}
+
+	if (scan->rs_pgsr)
+	{
+		scan->rs_cbuf = pg_streaming_read_buffer_get_next(scan->rs_pgsr, NULL);
+		if (BufferIsValid(scan->rs_cbuf))
+			scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 	}
 	else
-		scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir);
+	{
+		if (!scan->rs_inited)
+		{
+			scan->rs_cblock = heapgettup_initial_block(scan, dir);
+			Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
+			scan->rs_inited = true;
+		}
+		else
+			scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir);
 
-	/* read block if valid */
-	if (BlockNumberIsValid(scan->rs_cblock))
-		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-										   scan->rs_cblock, RBM_NORMAL, scan->rs_strategy);
+		/* read block if valid */
+		if (BlockNumberIsValid(scan->rs_cblock))
+			scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
+											   scan->rs_cblock, RBM_NORMAL, scan->rs_strategy);
+	}
 }
 
 /*
@@ -999,6 +1049,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	else
 		scan->rs_parallelworkerdata = NULL;
 
+	scan->rs_pgsr = NULL;
+
 	/*
 	 * we do this here instead of in initscan() because heap_rescan also calls
 	 * initscan() and we don't want to allocate memory again
@@ -1010,6 +1062,21 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 
 	initscan(scan, key, false);
 
+	/*
+	 * Only sequential scans stream. The scan direction is not known yet, so
+	 * heapfetchbuf() frees this object if a non-forward scan is requested.
+	 */
+	if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN)
+	{
+		scan->rs_pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_SEQUENTIAL,
+													   scan,
+													   0,
+													   scan->rs_strategy,
+													   BMR_REL(scan->rs_base.rs_rd),
+													   MAIN_FORKNUM,
+													   heap_pgsr_next_single);
+	}
+
 	return (TableScanDesc) scan;
 }
 
@@ -1048,6 +1115,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 	 * reinitialize scan descriptor
 	 */
 	initscan(scan, key, true);
+
+	/*
+	 * Reset, rather than reallocate, the streaming read object; a scan whose
+	 * object was freed keeps reading synchronously. Must follow initscan(),
+	 * which reinitializes rs_prefetch_block used by heap_pgsr_next_single().
+	 */
+	if (scan->rs_pgsr)
+		pg_streaming_read_reset(scan->rs_pgsr);
 }
 
 void
@@ -1063,6 +1138,9 @@ heap_endscan(TableScanDesc sscan)
 	if (BufferIsValid(scan->rs_cbuf))
 		ReleaseBuffer(scan->rs_cbuf);
 
+	if (scan->rs_pgsr)
+		pg_streaming_read_free(scan->rs_pgsr);
+
 	/*
 	 * decrement relation reference count and free scan descriptor storage
 	 */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4a3a017c33a..fe4ab40e18d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -59,6 +59,7 @@ typedef struct HeapScanDescData
 	bool		rs_inited;		/* false = scan not init'd yet */
 	OffsetNumber rs_coffset;	/* current offset # in non-page-at-a-time mode */
 	BlockNumber rs_cblock;		/* current block # in scan, if any */
+	BlockNumber rs_prefetch_block;	/* last block given to the stream */
 	Buffer		rs_cbuf;		/* current buffer in scan, if any */
 	/* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
 
@@ -72,6 +73,15 @@ typedef struct HeapScanDescData
 	 */
 	ParallelBlockTableScanWorkerData *rs_parallelworkerdata;
 
+	/*
+	 * Streaming read object allocated for each sequential scan and reset on
+	 * rescan. If a sequential scan ends up requesting tuples in any direction
+	 * except ForwardScanDirection, the streaming read object is freed and the
+	 * scan falls back to a standard, synchronous block reading method for the
+	 * rest of its lifetime, including any rescans.
+	 */
+	struct PgStreamingRead *rs_pgsr;
+
 	/* these fields only used in page-at-a-time mode and for bitmap scans */
 	int			rs_cindex;		/* current tuple's index in vistuples */
 	int			rs_ntuples;		/* number of visible tuples on page */
-- 
2.37.2

