From 2e3c2890ff7aa13c2b33f66ed1cfb8c3b58ea21c Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Mon, 26 Feb 2024 15:41:32 -0500
Subject: [PATCH v3 5/5] Sequential scans and TID range scans stream reads

Implementing streaming read support for heap sequential scans and TID
range scans includes three parts:

Allocate the streaming read object in heap_beginscan(). On rescan, reset
the streaming read by releasing all pinned buffers and resetting the
prefetch block.

Implement a callback returning the next block to prefetch to the
streaming read API.

Invoke the streaming read API when a new page is needed and streaming
reads are enabled. When the scan direction changes, reset the streaming
read.
---
 src/backend/access/heap/heapam.c | 88 ++++++++++++++++++++++++++++----
 src/include/access/heapam.h      | 12 +++++
 2 files changed, 90 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e0fe3d9c326..7143a3efb9a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -65,6 +65,7 @@
 #include "storage/smgr.h"
 #include "storage/spin.h"
 #include "storage/standby.h"
+#include "storage/streaming_read.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -228,6 +229,29 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+static BlockNumber
+heap_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private,
+					  void *per_buffer_data)
+{
+	HeapScanDesc scan = (HeapScanDesc) pgsr_private;
+
+	/*
+	 * Pick the block using the direction saved in scan->rs_dir: the initial
+	 * block on the first call, otherwise the block after rs_prefetch_block.
+	 */
+	if (!scan->rs_inited)
+	{
+		scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
+		scan->rs_inited = true;
+	}
+	else
+		scan->rs_prefetch_block = heapgettup_advance_block(scan,
+														   scan->rs_prefetch_block,
+														   scan->rs_dir);
+
+	return scan->rs_prefetch_block;
+}
+
 /* ----------------
  *		initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -330,6 +354,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
 
+	/*
+	 * Initialize to ForwardScanDirection because it is most common and heap
+	 * scans usually must go forwards before going backward.
+	 */
+	scan->rs_dir = ForwardScanDirection;
+	scan->rs_prefetch_block = InvalidBlockNumber;
+
 	/* page-at-a-time fields are always invalid when not rs_inited */
 
 	/*
@@ -472,6 +503,8 @@ heapbuildvis(TableScanDesc sscan)
 static inline void
 heapfetchbuf(HeapScanDesc scan, ScanDirection dir)
 {
+	Assert(scan->rs_pgsr);
+
 	/* release previous scan buffer, if any */
 	if (BufferIsValid(scan->rs_cbuf))
 	{
@@ -486,19 +519,23 @@ heapfetchbuf(HeapScanDesc scan, ScanDirection dir)
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	if (!scan->rs_inited)
+	/*
+	 * If the scan direction is changing, reset the prefetch block to the
+	 * current block. Otherwise, we will incorrectly prefetch the blocks
+	 * between the prefetch block and the current block again before
+	 * prefetching blocks in the new, correct scan direction.
+	 */
+	if (scan->rs_dir != dir && scan->rs_pgsr)
 	{
-		scan->rs_cblock = heapgettup_initial_block(scan, dir);
-		Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
-		scan->rs_inited = true;
+		scan->rs_prefetch_block = scan->rs_cblock;
+		pg_streaming_read_reset(scan->rs_pgsr);
 	}
-	else
-		scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir);
 
-	/* read block if valid */
-	if (BlockNumberIsValid(scan->rs_cblock))
-		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-										   scan->rs_cblock, RBM_NORMAL, scan->rs_strategy);
+	scan->rs_dir = dir;
+
+	scan->rs_cbuf = pg_streaming_read_buffer_get_next(scan->rs_pgsr, NULL);
+	if (BufferIsValid(scan->rs_cbuf))
+		scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 }
 
 /*
@@ -827,6 +864,7 @@ continue_page:
 
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
+	scan->rs_prefetch_block = InvalidBlockNumber;
 	tuple->t_data = NULL;
 	scan->rs_inited = false;
 }
@@ -917,6 +955,7 @@ continue_page:
 		ReleaseBuffer(scan->rs_cbuf);
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
+	scan->rs_prefetch_block = InvalidBlockNumber;
 	tuple->t_data = NULL;
 	scan->rs_inited = false;
 }
@@ -999,6 +1038,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	else
 		scan->rs_parallelworkerdata = NULL;
 
+	scan->rs_pgsr = NULL;
+
 	/*
 	 * we do this here instead of in initscan() because heap_rescan also calls
 	 * initscan() and we don't want to allocate memory again
@@ -1010,6 +1051,22 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 
 	initscan(scan, key, false);
 
+	/*
+	 * We do not know the scan direction yet. If the direction later changes,
+	 * heapfetchbuf() resets the streaming read object rather than freeing it.
+	 */
+	if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
+		scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
+	{
+		scan->rs_pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_SEQUENTIAL,
+													   scan,
+													   0,
+													   scan->rs_strategy,
+													   BMR_REL(scan->rs_base.rs_rd),
+													   MAIN_FORKNUM,
+													   heap_pgsr_next_single);
+	}
+
 	return (TableScanDesc) scan;
 }
 
@@ -1048,6 +1105,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 	 * reinitialize scan descriptor
 	 */
 	initscan(scan, key, true);
+
+	/*
+	 * The streaming read object is reset on rescan. This must be done after
+	 * initscan(), as some state referred to by pg_streaming_read_reset() is
+	 * reset in initscan().
+	 */
+	if (scan->rs_pgsr)
+		pg_streaming_read_reset(scan->rs_pgsr);
 }
 
 void
@@ -1063,6 +1128,9 @@ heap_endscan(TableScanDesc sscan)
 	if (BufferIsValid(scan->rs_cbuf))
 		ReleaseBuffer(scan->rs_cbuf);
 
+	if (scan->rs_pgsr)
+		pg_streaming_read_free(scan->rs_pgsr);
+
 	/*
 	 * decrement relation reference count and free scan descriptor storage
 	 */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index e2b1b2a3ad9..0f8f51ce326 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -72,6 +72,18 @@ typedef struct HeapScanDescData
 	 */
 	ParallelBlockTableScanWorkerData *rs_parallelworkerdata;
 
+	/*
+	 * Fields used for streaming reads by sequential scans and TID range
+	 * scans. The streaming read object is allocated at the beginning of the
+	 * scan and reset on rescan or when the scan direction changes. The scan
+	 * direction is saved each time a new page is requested. If the scan
+	 * direction changes from one page to the next, the streaming read object
+	 * releases all previously pinned buffers and resets the prefetch block.
+	 */
+	ScanDirection rs_dir;
+	struct PgStreamingRead *rs_pgsr;
+	BlockNumber rs_prefetch_block;
+
 	/* these fields only used in page-at-a-time mode and for bitmap scans */
 	int			rs_cindex;		/* current tuple's index in vistuples */
 	int			rs_ntuples;		/* number of visible tuples on page */
-- 
2.37.2

