From ca7f60419aecdc07c7145d62e9d5033087598fbb Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Mon, 29 Jan 2024 11:50:01 -0500
Subject: [PATCH v7 2/4] Replace blocks with buffers in heapgettup control flow

Future commits will introduce the streaming read API and the sequential
scan streaming read API user. Streaming read API users implement a
callback which returns the next block to read. Sequential scans
previously looped through the blocks in the relation, synchronously
reading in a block and then processing it. An InvalidBlockNumber
returned by heapgettup_advance_block() meant that the relation was
exhausted and all blocks had been processed.

The streaming read API may exhaust the blocks in a relation (having read
all of them into buffers) before they have all been processed by the
sequential scan. As such, the sequential scan should continue processing
blocks until heapfetchbuf() leaves no valid buffer in scan->rs_cbuf.

Note that this commit does not implement the streaming read API user. It
simply restructures heapgettup() and heapgettup_pagemode() to use
buffers instead of blocks for control flow.

Not all sequential scans will support streaming reads. As such, this
code will remain for compatibility even after sequential scans support
streaming reads.
---
 src/backend/access/heap/heapam.c | 79 ++++++++++++++------------------
 1 file changed, 35 insertions(+), 44 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 22ffa541ef7..46645dc971f 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -83,6 +83,9 @@ static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
 static bool heap_acquire_tuplock(Relation relation, ItemPointer tid,
 								 LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool *have_tuple_lock);
+static inline BlockNumber heapgettup_advance_block(HeapScanDesc scan,
+												   BlockNumber block, ScanDirection dir);
+static inline BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir);
 static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
 									  uint16 old_infomask2, TransactionId add_to_xmax,
 									  LockTupleMode mode, bool is_update,
@@ -456,14 +459,12 @@ heapbuildvis(TableScanDesc sscan)
 /*
  * heapfetchbuf - subroutine for heapgettup()
  *
- * This routine reads the specified block of the relation into a buffer and
- * returns with that pinned buffer saved in the scan descriptor.
+ * This routine reads the next block of the relation into a buffer and returns
+ * with that pinned buffer saved in the scan descriptor.
  */
 static pg_attribute_always_inline void
-heapfetchbuf(HeapScanDesc scan, BlockNumber block)
+heapfetchbuf(HeapScanDesc scan, ScanDirection dir)
 {
-	Assert(block < scan->rs_nblocks);
-
 	/* release previous scan buffer, if any */
 	if (BufferIsValid(scan->rs_cbuf))
 	{
@@ -478,10 +479,19 @@ heapfetchbuf(HeapScanDesc scan, BlockNumber block)
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	/* read page using selected strategy */
-	scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block,
-									   RBM_NORMAL, scan->rs_strategy);
-	scan->rs_cblock = block;
+	if (unlikely(!scan->rs_inited))
+	{
+		scan->rs_cblock = heapgettup_initial_block(scan, dir);
+		Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
+		scan->rs_inited = true;
+	}
+	else
+		scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir);
+
+	/* read block if valid */
+	if (BlockNumberIsValid(scan->rs_cblock))
+		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
+										   scan->rs_cblock, RBM_NORMAL, scan->rs_strategy);
 }
 
 /*
@@ -491,7 +501,7 @@ heapfetchbuf(HeapScanDesc scan, BlockNumber block)
  * occur with empty tables and in parallel scans when parallel workers get all
  * of the pages before we can get a chance to get our first page.
  */
-static BlockNumber
+BlockNumber
 heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
 {
 	Assert(!scan->rs_inited);
@@ -631,7 +641,7 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
  * This also adjusts rs_numblocks when a limit has been imposed by
  * heap_setscanlimits().
  */
-static inline BlockNumber
+BlockNumber
 heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
 {
 	if (ScanDirectionIsForward(dir))
@@ -729,23 +739,13 @@ heapgettup(HeapScanDesc scan,
 		   ScanKey key)
 {
 	HeapTuple	tuple = &(scan->rs_ctup);
-	BlockNumber block;
 	Page		page;
 	OffsetNumber lineoff;
 	int			linesleft;
 
-	if (unlikely(!scan->rs_inited))
-	{
-		block = heapgettup_initial_block(scan, dir);
-		/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
-		Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
-		scan->rs_inited = true;
-	}
-	else
+	if (likely(scan->rs_inited))
 	{
 		/* continue from previously returned page/tuple */
-		block = scan->rs_cblock;
-
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 		page = heapgettup_continue_page(scan, dir, &linesleft, &lineoff);
 		goto continue_page;
@@ -755,9 +755,12 @@ heapgettup(HeapScanDesc scan,
 	 * advance the scan until we find a qualifying tuple or run out of stuff
 	 * to scan
 	 */
-	while (block != InvalidBlockNumber)
+	while (true)
 	{
-		heapfetchbuf(scan, block);
+		heapfetchbuf(scan, dir);
+		if (!BufferIsValid(scan->rs_cbuf))
+			break;
+		Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock);
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 		page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
 continue_page:
@@ -779,7 +782,7 @@ continue_page:
 
 			tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
 			tuple->t_len = ItemIdGetLength(lpp);
-			ItemPointerSet(&(tuple->t_self), block, lineoff);
+			ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff);
 
 			visible = HeapTupleSatisfiesVisibility(tuple,
 												   scan->rs_base.rs_snapshot,
@@ -809,9 +812,6 @@ continue_page:
 		 * it's time to move to the next.
 		 */
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
-
-		/* get the BlockNumber to scan next */
-		block = heapgettup_advance_block(scan, block, dir);
 	}
 
 	/* end of scan */
@@ -844,22 +844,13 @@ heapgettup_pagemode(HeapScanDesc scan,
 					ScanKey key)
 {
 	HeapTuple	tuple = &(scan->rs_ctup);
-	BlockNumber block;
 	Page		page;
 	int			lineindex;
 	int			linesleft;
 
-	if (unlikely(!scan->rs_inited))
-	{
-		block = heapgettup_initial_block(scan, dir);
-		/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
-		Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
-		scan->rs_inited = true;
-	}
-	else
+	if (likely(scan->rs_inited))
 	{
 		/* continue from previously returned page/tuple */
-		block = scan->rs_cblock;	/* current page */
 		page = BufferGetPage(scan->rs_cbuf);
 
 		lineindex = scan->rs_cindex + dir;
@@ -876,9 +867,12 @@ heapgettup_pagemode(HeapScanDesc scan,
 	 * advance the scan until we find a qualifying tuple or run out of stuff
 	 * to scan
 	 */
-	while (block != InvalidBlockNumber)
+	while (true)
 	{
-		heapfetchbuf(scan, block);
+		heapfetchbuf(scan, dir);
+		if (!BufferIsValid(scan->rs_cbuf))
+			break;
+		Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock);
 		heapbuildvis((TableScanDesc) scan);
 		page = BufferGetPage(scan->rs_cbuf);
 		linesleft = scan->rs_ntuples;
@@ -898,7 +892,7 @@ continue_page:
 
 			tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
 			tuple->t_len = ItemIdGetLength(lpp);
-			ItemPointerSet(&(tuple->t_self), block, lineoff);
+			ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff);
 
 			/* skip any tuples that don't match the scan key */
 			if (key != NULL &&
@@ -909,9 +903,6 @@ continue_page:
 			scan->rs_cindex = lineindex;
 			return;
 		}
-
-		/* get the BlockNumber to scan next */
-		block = heapgettup_advance_block(scan, block, dir);
 	}
 
 	/* end of scan */
-- 
2.40.1

