From 1a87519763a0fa67433a0049dcb3f9f021bd5e11 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Mon, 29 Jan 2024 12:22:38 -0500
Subject: [PATCH v1 4/4] Sequential scans support streaming read

Add streaming read support for sequential scans by implementing a
streaming read callback that determines the next block to read and saves
it in the scan descriptor.

The PgStreamingRead object is allocated in initscan(). This means it
will be freed and reallocated on rescan. Implementing a streaming read
reset function is a TODO for that API.

Currently, the streaming read API only supports forward scans, so if a
scan switches from forward to backward, the PgStreamingRead object is
freed and the scan falls back to regular buffer reads. This also means
that if a scan switches from backward to forward, it will not use
streaming reads.

Distinguishing between a scan that has yet to be initialized, one that
doesn't support streaming reads and one that has switched scan
directions is one reason why it is difficult to wait until
heapfetchbuf() to allocate the PgStreamingRead object.
---
 src/backend/access/heap/heapam.c | 106 ++++++++++++++++++++++++++++---
 src/include/access/heapam.h      |   3 +
 2 files changed, 100 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 9e3e6d8b52b..cc20e0f972c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -65,6 +65,7 @@
 #include "storage/smgr.h"
 #include "storage/spin.h"
 #include "storage/standby.h"
+#include "storage/streaming_read.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -228,6 +229,27 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+static BlockNumber
+heap_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private,
+					  void *per_buffer_data)
+{
+	HeapScanDesc scan = (HeapScanDesc) pgsr_private;
+
+	/* Streaming reads are forward-only, so the direction is hard-coded */
+	if (!scan->rs_inited)
+	{
+		scan->rs_prefetch_block = heapgettup_initial_block(scan,
+														   ForwardScanDirection);
+		scan->rs_inited = true;	/* next call advances instead */
+	}
+	else
+		scan->rs_prefetch_block = heapgettup_advance_block(scan,
+														   scan->rs_prefetch_block,
+														   ForwardScanDirection);
+
+	return scan->rs_prefetch_block;
+}
+
 /* ----------------
  *		initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -345,6 +367,36 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	 */
 	if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN)
 		pgstat_count_heap_scan(scan->rs_base.rs_rd);
+
+	scan->rs_prefetch_block = InvalidBlockNumber;
+
+	/* pgsr is freed and reallocated on rescan */
+	if (scan->pgsr)
+		pg_streaming_read_free(scan->pgsr);
+	scan->pgsr = NULL;
+	scan->rs_prefetch_block = InvalidBlockNumber;
+
+	/*
+	 * The streaming read cannot be allocated in the per-tuple memory context,
+	 * which is the current memory context during heapgettup[_pagemode](), as
+	 * the per-tuple context is often reset before the end of the query. There
+	 * was discussion of allocating the pgsr when rs_inited is false. We could
+	 * switch into a memory context that doesn't get reset to allocate it
+	 * there, but 1) we probably want to reuse the pgsr across rescans and 2)
+	 * we have to free the pgsr if the scan changes from a forward to a
+	 * backward scan anyway, so we had better just allocate it here.
+	 */
+	if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) &&
+		(scan->rs_base.rs_flags & SO_TYPE_SEQSCAN))
+	{
+		scan->pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_SEQUENTIAL,
+													scan,
+													0,
+													scan->rs_strategy,
+													BMR_REL(scan->rs_base.rs_rd),
+													MAIN_FORKNUM,
+													heap_pgsr_next_single);
+	}
 }
 
 /*
@@ -488,19 +540,41 @@ heapfetchbuf(TableScanDesc sscan, ScanDirection dir)
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	if (!scan->rs_inited)
+	/*
+	 * Streaming reads don't support backward scans, and the scan direction is
+	 * not known when the pgsr is allocated, so free it here as soon as a
+	 * non-forward fetch is requested. As a result, a scan that later switches
+	 * back to the forward direction will not use streaming reads.
+	 */
+	if (!ScanDirectionIsForward(dir) && scan->pgsr)
 	{
-		scan->rs_cblock = heapgettup_initial_block(scan, dir);
-		Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
-		scan->rs_inited = true;
+		pg_streaming_read_free(scan->pgsr);
+		scan->pgsr = NULL;
+		scan->rs_prefetch_block = InvalidBlockNumber;
+	}
+
+	if (scan->pgsr)
+	{
+		scan->rs_cbuf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL);
+		if (BufferIsValid(scan->rs_cbuf))
+			scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 	}
 	else
-		scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir);
+	{
+		if (!scan->rs_inited)
+		{
+			scan->rs_cblock = heapgettup_initial_block(scan, dir);
+			Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
+			scan->rs_inited = true;
+		}
+		else
+			scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir);
 
-	/* read block if valid */
-	if (BlockNumberIsValid(scan->rs_cblock))
-		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-										   scan->rs_cblock, RBM_NORMAL, scan->rs_strategy);
+		/* read block if valid */
+		if (BlockNumberIsValid(scan->rs_cblock))
+			scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
+											   scan->rs_cblock, RBM_NORMAL, scan->rs_strategy);
+	}
 }
 
 /*
@@ -1001,6 +1075,15 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	else
 		scan->rs_parallelworkerdata = NULL;
 
+	/*
+	 * TODO: implement pg_streaming_read_reset(), then allocate the streaming
+	 * reads here. Currently, they are allocated in initscan() which will free
+	 * and reallocate the pgsr on each rescan. Fixing this is especially
+	 * important for nested loop join. For now, set this to NULL to ensure the
+	 * streaming read is allocated in initscan().
+	 */
+	scan->pgsr = NULL;
+
 	/*
 	 * we do this here instead of in initscan() because heap_rescan also calls
 	 * initscan() and we don't want to allocate memory again
@@ -1065,6 +1148,11 @@ heap_endscan(TableScanDesc sscan)
 	if (BufferIsValid(scan->rs_cbuf))
 		ReleaseBuffer(scan->rs_cbuf);
 
+	if (scan->pgsr)
+		pg_streaming_read_free(scan->pgsr);
+	scan->pgsr = NULL;
+	scan->rs_prefetch_block = InvalidBlockNumber;
+
 	/*
 	 * decrement relation reference count and free scan descriptor storage
 	 */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4a3a017c33a..8e702454367 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -59,6 +59,7 @@ typedef struct HeapScanDescData
 	bool		rs_inited;		/* false = scan not init'd yet */
 	OffsetNumber rs_coffset;	/* current offset # in non-page-at-a-time mode */
 	BlockNumber rs_cblock;		/* current block # in scan, if any */
+	BlockNumber rs_prefetch_block;	/* block being prefetched */
 	Buffer		rs_cbuf;		/* current buffer in scan, if any */
 	/* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
 
@@ -72,6 +73,8 @@ typedef struct HeapScanDescData
 	 */
 	ParallelBlockTableScanWorkerData *rs_parallelworkerdata;
 
+	struct PgStreamingRead *pgsr;
+
 	/* these fields only used in page-at-a-time mode and for bitmap scans */
 	int			rs_cindex;		/* current tuple's index in vistuples */
 	int			rs_ntuples;		/* number of visible tuples on page */
-- 
2.37.2

