From 59fdb0423ddc9032380247f987b682944a52d476 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 13 Feb 2024 10:57:07 -0500
Subject: [PATCH v2 09/13] Make table_scan_bitmap_next_block() async friendly

table_scan_bitmap_next_block() previously returned false if we did not
wish to call table_scan_bitmap_next_tuple() on the tuples on the page.
This could happen when there were no visible tuples on the page or, due
to concurrent activity on the table, the block returned by the iterator
is past the known end of the table when the scan started.

This forced the caller to be responsible for determining if additional
blocks should be fetched and then for invoking
table_scan_bitmap_next_block() for these blocks.

It makes more sense for table_scan_bitmap_next_block() to be responsible
for finding a block that is not past the end of the table (as of the
time that the scan began) and for table_scan_bitmap_next_tuple() to
return false if there are no visible tuples on the page.

This also allows us to move responsibility for the iterator to table AM
specific code. This means handling invalid blocks is entirely up to
the table AM.

These changes will enable bitmapheapscan to use the future streaming
read API. The table AMs will implement a streaming read API callback
that returns the next block that needs to be fetched. In heap AM's case,
the callback will use the iterator to find the next block to be fetched.
Since choosing the next block will no longer the responsibility of
BitmapHeapNext(), the streaming read control flow requires these changes
to table_scan_bitmap_next_block().
---
 src/backend/access/heap/heapam_handler.c  |  58 +++++++--
 src/backend/executor/nodeBitmapHeapscan.c | 148 ++++++++--------------
 src/include/access/relscan.h              |   5 +
 src/include/access/tableam.h              |  47 +++++--
 src/include/nodes/execnodes.h             |  11 +-
 5 files changed, 145 insertions(+), 124 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 3af9466b9ca..c8da3def645 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2114,17 +2114,51 @@ heapam_estimate_rel_size(Relation rel, int32 *attr_widths,
 
 static bool
 heapam_scan_bitmap_next_block(TableScanDesc scan,
-							  TBMIterateResult *tbmres)
+							  bool *recheck, BlockNumber *blockno)
 {
 	HeapScanDesc hscan = (HeapScanDesc) scan;
-	BlockNumber block = tbmres->blockno;
+	BlockNumber block;
 	Buffer		buffer;
 	Snapshot	snapshot;
 	int			ntup;
+	TBMIterateResult *tbmres;
 
 	hscan->rs_cindex = 0;
 	hscan->rs_ntuples = 0;
 
+	*blockno = InvalidBlockNumber;
+	*recheck = true;
+
+	do
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		if (scan->shared_tbmiterator)
+			tbmres = tbm_shared_iterate(scan->shared_tbmiterator);
+		else
+			tbmres = tbm_iterate(scan->tbmiterator);
+
+		if (tbmres == NULL)
+		{
+			/* no more entries in the bitmap */
+			Assert(hscan->rs_empty_tuples_pending == 0);
+			return false;
+		}
+
+		/*
+		 * Ignore any claimed entries past what we think is the end of the
+		 * relation. It may have been extended after the start of our scan (we
+		 * only hold an AccessShareLock, and it could be inserts from this
+		 * backend).  We don't take this optimization in SERIALIZABLE
+		 * isolation though, as we need to examine all invisible tuples
+		 * reachable by the index.
+		 */
+	} while (!IsolationIsSerializable() && tbmres->blockno >= hscan->rs_nblocks);
+
+	/* Got a valid block */
+	*blockno = tbmres->blockno;
+	*recheck = tbmres->recheck;
+
 	/*
 	 * We can skip fetching the heap page if we don't need any fields from the
 	 * heap, and the bitmap entries don't need rechecking, and all tuples on
@@ -2143,16 +2177,7 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
 		return true;
 	}
 
-	/*
-	 * Ignore any claimed entries past what we think is the end of the
-	 * relation. It may have been extended after the start of our scan (we
-	 * only hold an AccessShareLock, and it could be inserts from this
-	 * backend).  We don't take this optimization in SERIALIZABLE isolation
-	 * though, as we need to examine all invisible tuples reachable by the
-	 * index.
-	 */
-	if (!IsolationIsSerializable() && block >= hscan->rs_nblocks)
-		return false;
+	block = tbmres->blockno;
 
 	/*
 	 * Acquire pin on the target heap page, trading in any pin we held before.
@@ -2251,7 +2276,14 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
 			scan->lossy_pages++;
 	}
 
-	return ntup > 0;
+	/*
+	 * Return true to indicate that a valid block was found and the bitmap is
+	 * not exhausted. If there are no visible tuples on this page,
+	 * hscan->rs_ntuples will be 0 and heapam_scan_bitmap_next_tuple() will
+	 * return false returning control to this function to advance to the next
+	 * block in the bitmap.
+	 */
+	return true;
 }
 
 static bool
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index c62f978f5d7..ae837785116 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -76,7 +76,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 	ExprContext *econtext;
 	TableScanDesc scan;
 	TIDBitmap  *tbm;
-	TBMIterateResult *tbmres;
 	TupleTableSlot *slot;
 	ParallelBitmapHeapState *pstate = node->pstate;
 	dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
@@ -88,7 +87,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 	slot = node->ss.ss_ScanTupleSlot;
 	scan = node->ss.ss_currentScanDesc;
 	tbm = node->tbm;
-	tbmres = node->tbmres;
 
 	/*
 	 * If we haven't yet performed the underlying index scan, do it, and begin
@@ -116,7 +114,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 
 			node->tbm = tbm;
 			tbmiterator = tbm_begin_iterate(tbm);
-			node->tbmres = tbmres = NULL;
 
 #ifdef USE_PREFETCH
 			if (node->prefetch_maximum > 0)
@@ -169,7 +166,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 
 			/* Allocate a private iterator and attach the shared state to it */
 			shared_tbmiterator = tbm_attach_shared_iterate(dsa, pstate->tbmiterator);
-			node->tbmres = tbmres = NULL;
 
 #ifdef USE_PREFETCH
 			if (node->prefetch_maximum > 0)
@@ -214,46 +210,24 @@ BitmapHeapNext(BitmapHeapScanState *node)
 																	extra_flags);
 		}
 
-		node->tbmiterator = tbmiterator;
-		node->shared_tbmiterator = shared_tbmiterator;
+		scan->tbmiterator = tbmiterator;
+		scan->shared_tbmiterator = shared_tbmiterator;
+
 		node->initialized = true;
+
+		/* Get the first block. if none, end of scan */
+		if (!table_scan_bitmap_next_block(scan, &node->recheck, &node->blockno))
+			return ExecClearTuple(slot);
+
+		BitmapAdjustPrefetchIterator(node, node->blockno);
+		BitmapAdjustPrefetchTarget(node);
 	}
 
 	for (;;)
 	{
-		CHECK_FOR_INTERRUPTS();
-
-		/*
-		 * Get next page of results if needed
-		 */
-		if (tbmres == NULL)
-		{
-			if (!pstate)
-				node->tbmres = tbmres = tbm_iterate(node->tbmiterator);
-			else
-				node->tbmres = tbmres = tbm_shared_iterate(node->shared_tbmiterator);
-			if (tbmres == NULL)
-			{
-				/* no more entries in the bitmap */
-				break;
-			}
-
-			BitmapAdjustPrefetchIterator(node, tbmres->blockno);
-
-			if (!table_scan_bitmap_next_block(scan, tbmres))
-			{
-				/* AM doesn't think this block is valid, skip */
-				continue;
-			}
-
-			/* Adjust the prefetch target */
-			BitmapAdjustPrefetchTarget(node);
-		}
-		else
+		while (table_scan_bitmap_next_tuple(scan, slot))
 		{
-			/*
-			 * Continuing in previously obtained page.
-			 */
+			CHECK_FOR_INTERRUPTS();
 
 #ifdef USE_PREFETCH
 
@@ -275,49 +249,41 @@ BitmapHeapNext(BitmapHeapScanState *node)
 				SpinLockRelease(&pstate->mutex);
 			}
 #endif							/* USE_PREFETCH */
-		}
 
-		/*
-		 * We issue prefetch requests *after* fetching the current page to try
-		 * to avoid having prefetching interfere with the main I/O. Also, this
-		 * should happen only when we have determined there is still something
-		 * to do on the current page, else we may uselessly prefetch the same
-		 * page we are just about to request for real.
-		 *
-		 * XXX: It's a layering violation that we do these checks above
-		 * tableam, they should probably moved below it at some point.
-		 */
-		BitmapPrefetch(node, scan);
-
-		/*
-		 * Attempt to fetch tuple from AM.
-		 */
-		if (!table_scan_bitmap_next_tuple(scan, slot))
-		{
-			/* nothing more to look at on this page */
-			node->tbmres = tbmres = NULL;
-			continue;
-		}
+			/*
+			 * We prefetch before fetching the current pages. We expect that a
+			 * future streaming read API will do this, so do it now for
+			 * consistency.
+			 */
+			BitmapPrefetch(node, scan);
 
-		/*
-		 * If we are using lossy info, we have to recheck the qual conditions
-		 * at every tuple.
-		 */
-		if (tbmres->recheck)
-		{
-			econtext->ecxt_scantuple = slot;
-			if (!ExecQualAndReset(node->bitmapqualorig, econtext))
+			/*
+			 * If we are using lossy info, we have to recheck the qual
+			 * conditions at every tuple.
+			 */
+			if (node->recheck)
 			{
-				/* Fails recheck, so drop it and loop back for another */
-				InstrCountFiltered2(node, 1);
-				ExecClearTuple(slot);
-				continue;
+				econtext->ecxt_scantuple = slot;
+				if (!ExecQualAndReset(node->bitmapqualorig, econtext))
+				{
+					/* Fails recheck, so drop it and loop back for another */
+					InstrCountFiltered2(node, 1);
+					ExecClearTuple(slot);
+					continue;
+				}
 			}
+
+			/* OK to return this tuple */
+			BitmapAccumCounters(node, scan);
+			return slot;
 		}
 
-		/* OK to return this tuple */
-		BitmapAccumCounters(node, scan);
-		return slot;
+		if (!table_scan_bitmap_next_block(scan, &node->recheck, &node->blockno))
+			break;
+
+		BitmapAdjustPrefetchIterator(node, node->blockno);
+		/* Adjust the prefetch target */
+		BitmapAdjustPrefetchTarget(node);
 	}
 
 	/*
@@ -608,12 +574,8 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
 		table_rescan(node->ss.ss_currentScanDesc, NULL);
 
 	/* release bitmaps and buffers if any */
-	if (node->tbmiterator)
-		tbm_end_iterate(node->tbmiterator);
 	if (node->prefetch_iterator)
 		tbm_end_iterate(node->prefetch_iterator);
-	if (node->shared_tbmiterator)
-		tbm_end_shared_iterate(node->shared_tbmiterator);
 	if (node->shared_prefetch_iterator)
 		tbm_end_shared_iterate(node->shared_prefetch_iterator);
 	if (node->tbm)
@@ -621,13 +583,12 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
 	if (node->pvmbuffer != InvalidBuffer)
 		ReleaseBuffer(node->pvmbuffer);
 	node->tbm = NULL;
-	node->tbmiterator = NULL;
-	node->tbmres = NULL;
 	node->prefetch_iterator = NULL;
 	node->initialized = false;
-	node->shared_tbmiterator = NULL;
 	node->shared_prefetch_iterator = NULL;
 	node->pvmbuffer = InvalidBuffer;
+	node->recheck = true;
+	node->blockno = InvalidBlockNumber;
 
 	ExecScanReScan(&node->ss);
 
@@ -658,28 +619,24 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
 	 */
 	ExecEndNode(outerPlanState(node));
 
+
+	/*
+	 * close heap scan
+	 */
+	if (scanDesc)
+		table_endscan(scanDesc);
+
 	/*
 	 * release bitmaps and buffers if any
 	 */
-	if (node->tbmiterator)
-		tbm_end_iterate(node->tbmiterator);
 	if (node->prefetch_iterator)
 		tbm_end_iterate(node->prefetch_iterator);
 	if (node->tbm)
 		tbm_free(node->tbm);
-	if (node->shared_tbmiterator)
-		tbm_end_shared_iterate(node->shared_tbmiterator);
 	if (node->shared_prefetch_iterator)
 		tbm_end_shared_iterate(node->shared_prefetch_iterator);
 	if (node->pvmbuffer != InvalidBuffer)
 		ReleaseBuffer(node->pvmbuffer);
-
-	/*
-	 * close heap scan
-	 */
-	if (scanDesc)
-		table_endscan(scanDesc);
-
 }
 
 /* ----------------------------------------------------------------
@@ -712,8 +669,6 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
 	scanstate->ss.ps.ExecProcNode = ExecBitmapHeapScan;
 
 	scanstate->tbm = NULL;
-	scanstate->tbmiterator = NULL;
-	scanstate->tbmres = NULL;
 	scanstate->pvmbuffer = InvalidBuffer;
 	scanstate->exact_pages = 0;
 	scanstate->lossy_pages = 0;
@@ -722,10 +677,11 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
 	scanstate->prefetch_target = 0;
 	scanstate->pscan_len = 0;
 	scanstate->initialized = false;
-	scanstate->shared_tbmiterator = NULL;
 	scanstate->shared_prefetch_iterator = NULL;
 	scanstate->pstate = NULL;
 	scanstate->worker_snapshot = NULL;
+	scanstate->recheck = true;
+	scanstate->blockno = InvalidBlockNumber;
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index b74e08dd745..5dea9c7a03d 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -24,6 +24,9 @@
 
 struct ParallelTableScanDescData;
 
+struct TBMIterator;
+struct TBMSharedIterator;
+
 /*
  * Generic descriptor for table scans. This is the base-class for table scans,
  * which needs to be embedded in the scans of individual AMs.
@@ -41,6 +44,8 @@ typedef struct TableScanDescData
 	ItemPointerData rs_maxtid;
 
 	/* Only used for Bitmap table scans */
+	struct TBMIterator *tbmiterator;
+	struct TBMSharedIterator *shared_tbmiterator;
 	long		exact_pages;
 	long		lossy_pages;
 
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index a3e30c4eda7..ef1fcc02b1a 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -21,6 +21,7 @@
 #include "access/sdir.h"
 #include "access/xact.h"
 #include "executor/tuptable.h"
+#include "nodes/tidbitmap.h"
 #include "utils/rel.h"
 #include "utils/snapshot.h"
 
@@ -804,7 +805,7 @@ typedef struct TableAmRoutine
 	 * scan_bitmap_next_tuple need to exist, or neither.
 	 */
 	bool		(*scan_bitmap_next_block) (TableScanDesc scan,
-										   struct TBMIterateResult *tbmres);
+										   bool *recheck, BlockNumber *blockno);
 
 	/*
 	 * Fetch the next tuple of a bitmap table scan into `slot` and return true
@@ -948,6 +949,8 @@ table_beginscan_bm(Relation rel, Snapshot snapshot,
 	result = rel->rd_tableam->scan_begin(rel, snapshot, nkeys, key, NULL, flags);
 	result->lossy_pages = 0;
 	result->exact_pages = 0;
+	result->shared_tbmiterator = NULL;
+	result->tbmiterator = NULL;
 	return result;
 }
 
@@ -1008,6 +1011,21 @@ table_beginscan_analyze(Relation rel)
 static inline void
 table_endscan(TableScanDesc scan)
 {
+	if (scan->rs_flags & SO_TYPE_BITMAPSCAN)
+	{
+		if (scan->shared_tbmiterator)
+		{
+			tbm_end_shared_iterate(scan->shared_tbmiterator);
+			scan->shared_tbmiterator = NULL;
+		}
+
+		if (scan->tbmiterator)
+		{
+			tbm_end_iterate(scan->tbmiterator);
+			scan->tbmiterator = NULL;
+		}
+	}
+
 	scan->rs_rd->rd_tableam->scan_end(scan);
 }
 
@@ -1018,6 +1036,21 @@ static inline void
 table_rescan(TableScanDesc scan,
 			 struct ScanKeyData *key)
 {
+	if (scan->rs_flags & SO_TYPE_BITMAPSCAN)
+	{
+		if (scan->shared_tbmiterator)
+		{
+			tbm_end_shared_iterate(scan->shared_tbmiterator);
+			scan->shared_tbmiterator = NULL;
+		}
+
+		if (scan->tbmiterator)
+		{
+			tbm_end_iterate(scan->tbmiterator);
+			scan->tbmiterator = NULL;
+		}
+	}
+
 	scan->rs_rd->rd_tableam->scan_rescan(scan, key, false, false, false, false);
 }
 
@@ -1941,17 +1974,16 @@ table_relation_estimate_size(Relation rel, int32 *attr_widths,
  */
 
 /*
- * Prepare to fetch / check / return tuples from `tbmres->blockno` as part of
- * a bitmap table scan. `scan` needs to have been started via
- * table_beginscan_bm(). Returns false if there are no tuples to be found on
- * the page, true otherwise.
+ * Prepare to fetch / check / return tuples as part of a bitmap table scan.
+ * `scan` needs to have been started via table_beginscan_bm(). Returns false if
+ * there are no more blocks in the bitmap, true otherwise.
  *
  * Note, this is an optionally implemented function, therefore should only be
  * used after verifying the presence (at plan time or such).
  */
 static inline bool
 table_scan_bitmap_next_block(TableScanDesc scan,
-							 struct TBMIterateResult *tbmres)
+							 bool *recheck, BlockNumber *blockno)
 {
 	/*
 	 * We don't expect direct calls to table_scan_bitmap_next_block with valid
@@ -1961,8 +1993,7 @@ table_scan_bitmap_next_block(TableScanDesc scan,
 	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
 		elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding");
 
-	return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan,
-														   tbmres);
+	return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan, recheck, blockno);
 }
 
 /*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9392923eb32..a59df51dd69 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1709,9 +1709,7 @@ typedef struct ParallelBitmapHeapState
  *
  *		bitmapqualorig	   execution state for bitmapqualorig expressions
  *		tbm				   bitmap obtained from child index scan(s)
- *		tbmiterator		   iterator for scanning current pages
- *		tbmres			   current-page data
- *		pvmbuffer		   ditto, for prefetched pages
+ *		pvmbuffer		   buffer for visibility-map lookups of prefetched pages
  *		exact_pages		   total number of exact pages retrieved
  *		lossy_pages		   total number of lossy pages retrieved
  *		prefetch_iterator  iterator for prefetching ahead of current page
@@ -1720,10 +1718,10 @@ typedef struct ParallelBitmapHeapState
  *		prefetch_maximum   maximum value for prefetch_target
  *		pscan_len		   size of the shared memory for parallel bitmap
  *		initialized		   is node is ready to iterate
- *		shared_tbmiterator	   shared iterator
  *		shared_prefetch_iterator shared iterator for prefetching
  *		pstate			   shared state for parallel bitmap scan
  *		worker_snapshot	   snapshot for parallel worker
+ *		recheck			   do current page's tuples need recheck
  * ----------------
  */
 typedef struct BitmapHeapScanState
@@ -1731,8 +1729,6 @@ typedef struct BitmapHeapScanState
 	ScanState	ss;				/* its first field is NodeTag */
 	ExprState  *bitmapqualorig;
 	TIDBitmap  *tbm;
-	TBMIterator *tbmiterator;
-	TBMIterateResult *tbmres;
 	Buffer		pvmbuffer;
 	long		exact_pages;
 	long		lossy_pages;
@@ -1742,10 +1738,11 @@ typedef struct BitmapHeapScanState
 	int			prefetch_maximum;
 	Size		pscan_len;
 	bool		initialized;
-	TBMSharedIterator *shared_tbmiterator;
 	TBMSharedIterator *shared_prefetch_iterator;
 	ParallelBitmapHeapState *pstate;
 	Snapshot	worker_snapshot;
+	bool		recheck;
+	BlockNumber blockno;
 } BitmapHeapScanState;
 
 /* ----------------
-- 
2.37.2

