From dda58b9b556001a336b2ddaa0d516abce4b10f3f Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 27 Mar 2024 20:25:06 -0400
Subject: [PATCH v7 4/4] Sequential scans and TID range scans stream reads

Implementing streaming read support for heap sequential scans and TID
range scans includes three parts:

Allocate the read stream object in heap_beginscan(). On rescan, reset
the stream by releasing all pinned buffers and resetting the prefetch
block.

Implement a callback returning the next block to prefetch to the
read stream infrastructure.

Invoke the read stream API when a new page is needed. When the scan
direction changes, reset the stream.
---
 src/backend/access/heap/heapam.c | 90 ++++++++++++++++++++++++++++----
 src/include/access/heapam.h      | 15 ++++++
 2 files changed, 95 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 46645dc971f..6b550308e58 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -221,6 +221,25 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+static BlockNumber
+heap_scan_stream_read_next(ReadStream *pgsr, void *private_data,
+						   void *per_buffer_data)
+{
+	HeapScanDesc scan = (HeapScanDesc) private_data;
+
+	if (unlikely(!scan->rs_inited))
+	{
+		scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
+		scan->rs_inited = true;
+	}
+	else
+		scan->rs_prefetch_block = heapgettup_advance_block(scan,
+														   scan->rs_prefetch_block,
+														   scan->rs_dir);
+
+	return scan->rs_prefetch_block;
+}
+
 /* ----------------
  *		initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -323,6 +342,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
 
+	/*
+	 * Initialize to ForwardScanDirection because it is most common and heap
+	 * scans usually must go forwards before going backward.
+	 */
+	scan->rs_dir = ForwardScanDirection;
+	scan->rs_prefetch_block = InvalidBlockNumber;
+
 	/* page-at-a-time fields are always invalid when not rs_inited */
 
 	/*
@@ -465,6 +491,8 @@ heapbuildvis(TableScanDesc sscan)
 static pg_attribute_always_inline void
 heapfetchbuf(HeapScanDesc scan, ScanDirection dir)
 {
+	Assert(scan->rs_read_stream);
+
 	/* release previous scan buffer, if any */
 	if (BufferIsValid(scan->rs_cbuf))
 	{
@@ -479,19 +507,23 @@ heapfetchbuf(HeapScanDesc scan, ScanDirection dir)
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	if (unlikely(!scan->rs_inited))
+	/*
+	 * If the scan direction is changing, reset the prefetch block to the
+	 * current block. Otherwise, we will incorrectly prefetch the blocks
+	 * between the prefetch block and the current block again before
+	 * prefetching blocks in the new, correct scan direction.
+	 */
+	if (unlikely(scan->rs_dir != dir))
 	{
-		scan->rs_cblock = heapgettup_initial_block(scan, dir);
-		Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
-		scan->rs_inited = true;
+		scan->rs_prefetch_block = scan->rs_cblock;
+		read_stream_reset(scan->rs_read_stream);
 	}
-	else
-		scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir);
 
-	/* read block if valid */
-	if (BlockNumberIsValid(scan->rs_cblock))
-		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-										   scan->rs_cblock, RBM_NORMAL, scan->rs_strategy);
+	scan->rs_dir = dir;
+
+	scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+	if (BufferIsValid(scan->rs_cbuf))
+		scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 }
 
 /*
@@ -820,6 +852,7 @@ continue_page:
 
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
+	scan->rs_prefetch_block = InvalidBlockNumber;
 	tuple->t_data = NULL;
 	scan->rs_inited = false;
 }
@@ -910,6 +943,7 @@ continue_page:
 		ReleaseBuffer(scan->rs_cbuf);
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
+	scan->rs_prefetch_block = InvalidBlockNumber;
 	tuple->t_data = NULL;
 	scan->rs_inited = false;
 }
@@ -1003,6 +1037,28 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 
 	initscan(scan, key, false);
 
+	scan->rs_read_stream = NULL;
+
+	/*
+	 * For sequential scans and TID range scans, we will set up a read stream.
+	 * We do not know the scan direction yet. If the scan does not end up
+	 * being a forward scan, the read stream will be freed. This should be
+	 * done after initscan() because initscan() allocates the
+	 * BufferAccessStrategy object.
+	 */
+	if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
+		scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
+	{
+		scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+														  scan->rs_strategy,
+														  BMR_REL(scan->rs_base.rs_rd),
+														  MAIN_FORKNUM,
+														  heap_scan_stream_read_next,
+														  scan,
+														  0);
+	}
+
+
 	return (TableScanDesc) scan;
 }
 
@@ -1037,6 +1093,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 	if (BufferIsValid(scan->rs_cbuf))
 		ReleaseBuffer(scan->rs_cbuf);
 
+	/*
+	 * The read stream is reset on rescan. This must be done before
+	 * initscan(), as some state referred to by read_stream_reset() is reset
+	 * in initscan().
+	 */
+	if (scan->rs_read_stream)
+		read_stream_reset(scan->rs_read_stream);
+
 	/*
 	 * reinitialize scan descriptor
 	 */
@@ -1056,6 +1120,12 @@ heap_endscan(TableScanDesc sscan)
 	if (BufferIsValid(scan->rs_cbuf))
 		ReleaseBuffer(scan->rs_cbuf);
 
+	/*
+	 * Must free the read stream before freeing the BufferAccessStrategy.
+	 */
+	if (scan->rs_read_stream)
+		read_stream_end(scan->rs_read_stream);
+
 	/*
 	 * decrement relation reference count and free scan descriptor storage
 	 */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 5a6d22a4732..3ec78672587 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,6 +25,7 @@
 #include "storage/bufpage.h"
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
+#include "storage/read_stream.h"
 #include "storage/shm_toc.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -66,6 +67,20 @@ typedef struct HeapScanDescData
 
 	HeapTupleData rs_ctup;		/* current tuple in scan, if any */
 
+	/* For scans that stream reads */
+	ReadStream *rs_read_stream;
+
+	/*
+	 * For sequential scans and TID range scans to stream reads. The read
+	 * stream is allocated at the beginning of the scan and reset on rescan or
+	 * when the scan direction changes. The scan direction is saved each time
+	 * a new page is requested. If the scan direction changes from one page to
+	 * the next, the read stream releases all previously pinned buffers and
+	 * resets the prefetch block.
+	 */
+	ScanDirection rs_dir;
+	BlockNumber rs_prefetch_block;
+
 	/*
 	 * For parallel scans to store page allocation data.  NULL when not
 	 * performing a parallel scan.
-- 
2.40.1

