From 42475042baa09b3fd3ddb8f39f7a6304c49449de Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Thu, 15 Feb 2024 21:04:18 -0500
Subject: [PATCH v6 14/14] BitmapHeapScan uses streaming read API

Remove all of the prefetching code from BitmapHeapScan and rely on the
streaming read API's prefetching instead. The heap table AM implements
a streaming read callback which uses the bitmap iterator to return the
next valid block for the streaming read API to fetch.
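
In outline (condensed from the code in this patch): the callback hands
block numbers to the streaming read machinery, which reads ahead and
pins buffers, and heapam_scan_bitmap_next_block() consumes them, with
each block's TBMIterateResult passed through as per-buffer data:

    /* callback: return the next block to read, or InvalidBlockNumber */
    static BlockNumber bitmapheap_pgsr_next(PgStreamingRead *pgsr,
                                            void *pgsr_private,
                                            void *per_buffer_data);

    /* consumer: next pinned buffer, plus that block's TBMIterateResult */
    hscan->rs_cbuf = pg_streaming_read_buffer_get_next(hscan->rs_pgsr,
                                                       &io_private);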
---
 src/backend/access/heap/heapam.c          |  90 ++++++
 src/backend/access/heap/heapam_handler.c  |  92 +++---
 src/backend/executor/nodeBitmapHeapscan.c | 352 +---------------------
 src/include/access/heapam.h               |   4 +
 src/include/access/tableam.h              |  22 +-
 src/include/nodes/execnodes.h             |  25 +--
 6 files changed, 143 insertions(+), 442 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e7bed84f75b..0f370bfec3e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -111,6 +111,8 @@ static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
 
+static BlockNumber bitmapheap_pgsr_next(PgStreamingRead *pgsr, void *pgsr_private,
+										void *per_buffer_data);
 
 /*
  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -331,6 +333,26 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	if (key != NULL && scan->rs_base.rs_nkeys > 0)
 		memcpy(scan->rs_base.rs_key, key, scan->rs_base.rs_nkeys * sizeof(ScanKeyData));
 
+	if (scan->rs_base.rs_flags & SO_TYPE_BITMAPSCAN)
+	{
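+		/*
+		 * Set up the streaming read object, freeing any old one left over
+		 * from a previous scan.  The callback bitmapheap_pgsr_next returns
+		 * the next block to read and fills the per-buffer data with that
+		 * block's TBMIterateResult.
+		 */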
+		if (scan->rs_pgsr)
+			pg_streaming_read_free(scan->rs_pgsr);
+
+		scan->rs_pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+													   scan,
+													   sizeof(TBMIterateResult),
+													   scan->rs_strategy,
+													   BMR_REL(scan->rs_base.rs_rd),
+													   MAIN_FORKNUM,
+													   bitmapheap_pgsr_next);
+	}
+
 	/*
 	 * Currently, we only have a stats counter for sequential heap scans (but
 	 * e.g for bitmap scans the underlying bitmap index scans will be counted,
@@ -951,6 +969,7 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	scan->rs_base.rs_flags = flags;
 	scan->rs_base.rs_parallel = parallel_scan;
 	scan->rs_strategy = NULL;	/* set in initscan */
+	scan->rs_pgsr = NULL;		/* set in initscan for bitmap scans */
 	scan->rs_vmbuffer = InvalidBuffer;
 	scan->rs_empty_tuples_pending = 0;
 
@@ -1089,6 +1108,10 @@ heap_endscan(TableScanDesc sscan)
 	if (scan->rs_base.rs_flags & SO_TEMP_SNAPSHOT)
 		UnregisterSnapshot(scan->rs_base.rs_snapshot);
 
+	/* For bitmap scans, release the streaming read object */
+	if (scan->rs_base.rs_flags & SO_TYPE_BITMAPSCAN && scan->rs_pgsr)
+		pg_streaming_read_free(scan->rs_pgsr);
+
 	pfree(scan);
 }
 
@@ -10246,3 +10268,66 @@ HeapCheckForSerializableConflictOut(bool visible, Relation relation,
 
 	CheckForSerializableConflictOut(relation, xid, snapshot);
 }
+
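+/*
+ * Streaming read callback for bitmap heap scans.
+ *
+ * Advances the (shared or private) bitmap iterator and returns the next
+ * block number to read, or InvalidBlockNumber once the bitmap is exhausted.
+ * The block's TBMIterateResult is stored in the per-buffer data, where
+ * heapam_scan_bitmap_next_block() finds it once the buffer has been read.
+ */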
+static BlockNumber
+bitmapheap_pgsr_next(PgStreamingRead *pgsr, void *pgsr_private,
+					 void *per_buffer_data)
+{
+	TBMIterateResult *tbmres = per_buffer_data;
+	HeapScanDesc hdesc = (HeapScanDesc) pgsr_private;
+
+	for (;;)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		if (hdesc->rs_base.shared_tbmiterator)
+			tbm_shared_iterate(hdesc->rs_base.shared_tbmiterator, tbmres);
+		else
+			tbm_iterate(hdesc->rs_base.tbmiterator, tbmres);
+
+		/* no more entries in the bitmap */
+		if (!BlockNumberIsValid(tbmres->blockno))
+			return InvalidBlockNumber;
+
+		/*
+		 * Ignore any claimed entries past what we think is the end of the
+		 * relation. It may have been extended after the start of our scan (we
+		 * only hold an AccessShareLock, and it could be inserts from this
+		 * backend).  We don't take this optimization in SERIALIZABLE
+		 * isolation though, as we need to examine all invisible tuples
+		 * reachable by the index.
+		 */
+		if (!IsolationIsSerializable() && tbmres->blockno >= hdesc->rs_nblocks)
+			continue;
+
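+		/*
+		 * We can skip fetching the heap page if we don't need any fields
+		 * from the heap, the bitmap entries don't need rechecking, and all
+		 * tuples on the page are visible to our transaction.
+		 */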
+		if (hdesc->rs_base.rs_flags & SO_CAN_SKIP_FETCH &&
+			!tbmres->recheck &&
+			VM_ALL_VISIBLE(hdesc->rs_base.rs_rd, tbmres->blockno, &hdesc->rs_vmbuffer))
+		{
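+			/* can't be lossy in the skip_fetch case */
+			Assert(tbmres->ntuples >= 0);
+			Assert(hdesc->rs_empty_tuples_pending >= 0);
+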
+			hdesc->rs_empty_tuples_pending += tbmres->ntuples;
+			continue;
+		}
+
+		return tbmres->blockno;
+	}
+
+	/* not reachable */
+	Assert(false);
+}
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 022753e203a..9727613e87f 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2111,79 +2111,69 @@ heapam_estimate_rel_size(Relation rel, int32 *attr_widths,
  */
 
 static bool
-heapam_scan_bitmap_next_block(TableScanDesc scan,
-							  bool *recheck, bool *lossy, BlockNumber *blockno)
+heapam_scan_bitmap_next_block(TableScanDesc scan, bool *recheck, bool *lossy)
 {
 	HeapScanDesc hscan = (HeapScanDesc) scan;
+	void	   *io_private;
 	BlockNumber block;
 	Buffer		buffer;
 	Snapshot	snapshot;
 	int			ntup;
-	TBMIterateResult tbmres;
+	TBMIterateResult *tbmres;
+
+	Assert(hscan->rs_pgsr);
 
 	hscan->rs_cindex = 0;
 	hscan->rs_ntuples = 0;
 
-	*blockno = InvalidBlockNumber;
 	*recheck = true;
 
-	do
+	/* Release buffer containing previous block. */
+	if (BufferIsValid(hscan->rs_cbuf))
 	{
-		CHECK_FOR_INTERRUPTS();
+		ReleaseBuffer(hscan->rs_cbuf);
+		hscan->rs_cbuf = InvalidBuffer;
+	}
 
-		if (scan->shared_tbmiterator)
-			tbm_shared_iterate(scan->shared_tbmiterator, &tbmres);
-		else
-			tbm_iterate(scan->tbmiterator, &tbmres);
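+	/*
+	 * Get the next buffer from the streaming read API.  io_private points
+	 * to the TBMIterateResult the callback filled in for this block.
+	 */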
+	hscan->rs_cbuf = pg_streaming_read_buffer_get_next(hscan->rs_pgsr, &io_private);
 
-		if (!BlockNumberIsValid(tbmres.blockno))
+	if (BufferIsInvalid(hscan->rs_cbuf))
+	{
+		if (BufferIsValid(hscan->rs_vmbuffer))
 		{
-			/* no more entries in the bitmap */
-			Assert(hscan->rs_empty_tuples_pending == 0);
-			return false;
+			ReleaseBuffer(hscan->rs_vmbuffer);
+			hscan->rs_vmbuffer = InvalidBuffer;
 		}
 
 		/*
-		 * Ignore any claimed entries past what we think is the end of the
-		 * relation. It may have been extended after the start of our scan (we
-		 * only hold an AccessShareLock, and it could be inserts from this
-		 * backend).  We don't take this optimization in SERIALIZABLE
-		 * isolation though, as we need to examine all invisible tuples
-		 * reachable by the index.
+		 * The bitmap is exhausted.  Emit any "empty" tuples we still owe:
+		 * when the skip-fetch optimization lets us skip reading a block, we
+		 * count its tuples in rs_empty_tuples_pending rather than emitting
+		 * them per skipped block, because the streaming read API only
+		 * returns TBMIterateResults for blocks it actually reads.  So we
+		 * emit all of them here, at the end of the scan.  These all-NULL
+		 * tuples never need to be rechecked.
 		 */
-	} while (!IsolationIsSerializable() && tbmres.blockno >= hscan->rs_nblocks);
+		*recheck = false;
+		return hscan->rs_empty_tuples_pending > 0;
+	}
 
-	/* Got a valid block */
-	*blockno = tbmres.blockno;
-	*recheck = tbmres.recheck;
+	Assert(io_private);
 
-	/*
-	 * We can skip fetching the heap page if we don't need any fields from the
-	 * heap, and the bitmap entries don't need rechecking, and all tuples on
-	 * the page are visible to our transaction.
-	 */
-	if (scan->rs_flags & SO_CAN_SKIP_FETCH &&
-		!tbmres.recheck &&
-		VM_ALL_VISIBLE(scan->rs_rd, tbmres.blockno, &hscan->rs_vmbuffer))
-	{
-		/* can't be lossy in the skip_fetch case */
-		Assert(tbmres.ntuples >= 0);
-		Assert(hscan->rs_empty_tuples_pending >= 0);
+	tbmres = io_private;
 
-		hscan->rs_empty_tuples_pending += tbmres.ntuples;
+	Assert(BufferGetBlockNumber(hscan->rs_cbuf) == tbmres->blockno);
 
-		return true;
-	}
+	*recheck = tbmres->recheck;
 
-	block = tbmres.blockno;
+	hscan->rs_cblock = tbmres->blockno;
+	hscan->rs_ntuples = tbmres->ntuples;
 
-	/*
-	 * Acquire pin on the target heap page, trading in any pin we held before.
-	 */
-	hscan->rs_cbuf = ReleaseAndReadBuffer(hscan->rs_cbuf,
-										  scan->rs_rd,
-										  block);
-	hscan->rs_cblock = block;
+	block = tbmres->blockno;
 	buffer = hscan->rs_cbuf;
 	snapshot = scan->rs_snapshot;
 
@@ -2204,7 +2190,7 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
 	/*
 	 * We need two separate strategies for lossy and non-lossy cases.
 	 */
-	if (tbmres.ntuples >= 0)
+	if (tbmres->ntuples >= 0)
 	{
 		/*
 		 * Bitmap is non-lossy, so we just look through the offsets listed in
@@ -2213,9 +2199,9 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
 		 */
 		int			curslot;
 
-		for (curslot = 0; curslot < tbmres.ntuples; curslot++)
+		for (curslot = 0; curslot < tbmres->ntuples; curslot++)
 		{
-			OffsetNumber offnum = tbmres.offsets[curslot];
+			OffsetNumber offnum = tbmres->offsets[curslot];
 			ItemPointerData tid;
 			HeapTupleData heapTuple;
 
@@ -2265,7 +2251,7 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
 	Assert(ntup <= MaxHeapTuplesPerPage);
 	hscan->rs_ntuples = ntup;
 
-	*lossy = tbmres.ntuples < 0;
+	*lossy = tbmres->ntuples < 0;
 
 	/*
 	 * Return true to indicate that a valid block was found and the bitmap is
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index b3b9448627b..c437718f980 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -51,10 +51,6 @@
 
 static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node);
 static inline void BitmapDoneInitializingSharedState(ParallelBitmapHeapState *pstate);
-static inline void BitmapAdjustPrefetchIterator(BitmapHeapScanState *node);
-static inline void BitmapAdjustPrefetchTarget(BitmapHeapScanState *node);
-static inline void BitmapPrefetch(BitmapHeapScanState *node,
-								  TableScanDesc scan);
 static bool BitmapShouldInitializeSharedState(ParallelBitmapHeapState *pstate);
 
 
@@ -86,14 +82,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 	/*
 	 * If we haven't yet performed the underlying index scan, do it, and begin
 	 * the iteration over the bitmap.
-	 *
-	 * For prefetching, we use *two* iterators, one for the pages we are
-	 * actually scanning and another that runs ahead of the first for
-	 * prefetching.  node->prefetch_pages tracks exactly how many pages ahead
-	 * the prefetch iterator is.  Also, node->prefetch_target tracks the
-	 * desired prefetch distance, which starts small and increases up to the
-	 * node->prefetch_maximum.  This is to avoid doing a lot of prefetching in
-	 * a scan that stops after a few tuples because of a LIMIT.
 	 */
 	if (!node->initialized)
 	{
@@ -109,15 +97,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 
 			node->tbm = tbm;
 			tbmiterator = tbm_begin_iterate(tbm);
-
-#ifdef USE_PREFETCH
-			if (node->prefetch_maximum > 0)
-			{
-				node->prefetch_iterator = tbm_begin_iterate(tbm);
-				node->prefetch_pages = 0;
-				node->prefetch_target = -1;
-			}
-#endif							/* USE_PREFETCH */
 		}
 		else
 		{
@@ -140,20 +119,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 				 * multiple processes to iterate jointly.
 				 */
 				pstate->tbmiterator = tbm_prepare_shared_iterate(tbm);
-#ifdef USE_PREFETCH
-				if (node->prefetch_maximum > 0)
-				{
-					pstate->prefetch_iterator =
-						tbm_prepare_shared_iterate(tbm);
-
-					/*
-					 * We don't need the mutex here as we haven't yet woke up
-					 * others.
-					 */
-					pstate->prefetch_pages = 0;
-					pstate->prefetch_target = -1;
-				}
-#endif
 
 				/* We have initialized the shared state so wake up others. */
 				BitmapDoneInitializingSharedState(pstate);
@@ -161,14 +126,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 
 			/* Allocate a private iterator and attach the shared state to it */
 			shared_tbmiterator = tbm_attach_shared_iterate(dsa, pstate->tbmiterator);
-
-#ifdef USE_PREFETCH
-			if (node->prefetch_maximum > 0)
-			{
-				node->shared_prefetch_iterator =
-					tbm_attach_shared_iterate(dsa, pstate->prefetch_iterator);
-			}
-#endif							/* USE_PREFETCH */
 		}
 
 		/*
@@ -212,37 +169,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 		{
 			CHECK_FOR_INTERRUPTS();
 
-#ifdef USE_PREFETCH
-
-			/*
-			 * Try to prefetch at least a few pages even before we get to the
-			 * second page if we don't stop reading after the first tuple.
-			 */
-			if (!pstate)
-			{
-				if (node->prefetch_target < node->prefetch_maximum)
-					node->prefetch_target++;
-			}
-			else if (pstate->prefetch_target < node->prefetch_maximum)
-			{
-				/* take spinlock while updating shared state */
-				SpinLockAcquire(&pstate->mutex);
-				if (pstate->prefetch_target < node->prefetch_maximum)
-					pstate->prefetch_target++;
-				SpinLockRelease(&pstate->mutex);
-			}
-#endif							/* USE_PREFETCH */
-
-			/*
-			 * We issue prefetch requests *after* fetching the current page to
-			 * try to avoid having prefetching interfere with the main I/O.
-			 * Also, this should happen only when we have determined there is
-			 * still something to do on the current page, else we may
-			 * uselessly prefetch the same page we are just about to request
-			 * for real.
-			 */
-			BitmapPrefetch(node, scan);
-
 			/*
 			 * If we are using lossy info, we have to recheck the qual
 			 * conditions at every tuple.
@@ -264,27 +190,13 @@ BitmapHeapNext(BitmapHeapScanState *node)
 		}
 
 new_page:
-
-		BitmapAdjustPrefetchIterator(node);
-
-		if (!table_scan_bitmap_next_block(scan, &node->recheck, &lossy, &node->blockno))
+		if (!table_scan_bitmap_next_block(scan, &node->recheck, &lossy))
 			break;
 
 		if (lossy)
 			node->lossy_pages++;
 		else
 			node->exact_pages++;
-
-		/*
-		 * If serial, we can validate that the prefetch block stays ahead of
-		 * the current block.
-		 */
-		Assert(node->pstate == NULL ||
-			   node->prefetch_iterator == NULL ||
-			   node->pfblockno > node->blockno);
-
-		/* Adjust the prefetch target */
-		BitmapAdjustPrefetchTarget(node);
 	}
 
 	/*
@@ -308,225 +220,6 @@ BitmapDoneInitializingSharedState(ParallelBitmapHeapState *pstate)
 	ConditionVariableBroadcast(&pstate->cv);
 }
 
-/*
- *	BitmapAdjustPrefetchIterator - Adjust the prefetch iterator
- *
- *	We keep track of how far the prefetch iterator is ahead of the main
- *	iterator in prefetch_pages. For each block the main iterator returns, we
- *	decrement prefetch_pages.
- */
-static inline void
-BitmapAdjustPrefetchIterator(BitmapHeapScanState *node)
-{
-#ifdef USE_PREFETCH
-	ParallelBitmapHeapState *pstate = node->pstate;
-	TBMIterateResult tbmpre;
-
-	if (pstate == NULL)
-	{
-		TBMIterator *prefetch_iterator = node->prefetch_iterator;
-
-		if (node->prefetch_pages > 0)
-		{
-			/* The main iterator has closed the distance by one page */
-			node->prefetch_pages--;
-		}
-		else if (prefetch_iterator)
-		{
-			/* Do not let the prefetch iterator get behind the main one */
-			tbm_iterate(prefetch_iterator, &tbmpre);
-			node->pfblockno = tbmpre.blockno;
-		}
-		return;
-	}
-
-	/*
-	 * Adjusting the prefetch iterator before invoking
-	 * table_scan_bitmap_next_block() keeps prefetch distance higher across
-	 * the parallel workers.
-	 */
-	if (node->prefetch_maximum > 0)
-	{
-		TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
-
-		SpinLockAcquire(&pstate->mutex);
-		if (pstate->prefetch_pages > 0)
-		{
-			pstate->prefetch_pages--;
-			SpinLockRelease(&pstate->mutex);
-		}
-		else
-		{
-			/* Release the mutex before iterating */
-			SpinLockRelease(&pstate->mutex);
-
-			/*
-			 * In case of shared mode, we can not ensure that the current
-			 * blockno of the main iterator and that of the prefetch iterator
-			 * are same.  It's possible that whatever blockno we are
-			 * prefetching will be processed by another process.  Therefore,
-			 * we don't validate the blockno here as we do in non-parallel
-			 * case.
-			 */
-			if (prefetch_iterator)
-			{
-				tbm_shared_iterate(prefetch_iterator, &tbmpre);
-				node->pfblockno = tbmpre.blockno;
-			}
-		}
-	}
-#endif							/* USE_PREFETCH */
-}
-
-/*
- * BitmapAdjustPrefetchTarget - Adjust the prefetch target
- *
- * Increase prefetch target if it's not yet at the max.  Note that
- * we will increase it to zero after fetching the very first
- * page/tuple, then to one after the second tuple is fetched, then
- * it doubles as later pages are fetched.
- */
-static inline void
-BitmapAdjustPrefetchTarget(BitmapHeapScanState *node)
-{
-#ifdef USE_PREFETCH
-	ParallelBitmapHeapState *pstate = node->pstate;
-
-	if (pstate == NULL)
-	{
-		if (node->prefetch_target >= node->prefetch_maximum)
-			 /* don't increase any further */ ;
-		else if (node->prefetch_target >= node->prefetch_maximum / 2)
-			node->prefetch_target = node->prefetch_maximum;
-		else if (node->prefetch_target > 0)
-			node->prefetch_target *= 2;
-		else
-			node->prefetch_target++;
-		return;
-	}
-
-	/* Do an unlocked check first to save spinlock acquisitions. */
-	if (pstate->prefetch_target < node->prefetch_maximum)
-	{
-		SpinLockAcquire(&pstate->mutex);
-		if (pstate->prefetch_target >= node->prefetch_maximum)
-			 /* don't increase any further */ ;
-		else if (pstate->prefetch_target >= node->prefetch_maximum / 2)
-			pstate->prefetch_target = node->prefetch_maximum;
-		else if (pstate->prefetch_target > 0)
-			pstate->prefetch_target *= 2;
-		else
-			pstate->prefetch_target++;
-		SpinLockRelease(&pstate->mutex);
-	}
-#endif							/* USE_PREFETCH */
-}
-
-/*
- * BitmapPrefetch - Prefetch, if prefetch_pages are behind prefetch_target
- */
-static inline void
-BitmapPrefetch(BitmapHeapScanState *node, TableScanDesc scan)
-{
-#ifdef USE_PREFETCH
-	ParallelBitmapHeapState *pstate = node->pstate;
-
-	if (pstate == NULL)
-	{
-		TBMIterator *prefetch_iterator = node->prefetch_iterator;
-
-		if (prefetch_iterator)
-		{
-			while (node->prefetch_pages < node->prefetch_target)
-			{
-				TBMIterateResult tbmpre;
-				bool		skip_fetch;
-
-				tbm_iterate(prefetch_iterator, &tbmpre);
-
-				if (!BlockNumberIsValid(tbmpre.blockno))
-				{
-					/* No more pages to prefetch */
-					tbm_end_iterate(prefetch_iterator);
-					node->prefetch_iterator = NULL;
-					break;
-				}
-				node->prefetch_pages++;
-				node->pfblockno = tbmpre.blockno;
-
-				/*
-				 * If we expect not to have to actually read this heap page,
-				 * skip this prefetch call, but continue to run the prefetch
-				 * logic normally.  (Would it be better not to increment
-				 * prefetch_pages?)
-				 */
-				skip_fetch = (scan->rs_flags & SO_CAN_SKIP_FETCH &&
-							  !tbmpre.recheck &&
-							  VM_ALL_VISIBLE(node->ss.ss_currentRelation,
-											 tbmpre.blockno,
-											 &node->pvmbuffer));
-
-				if (!skip_fetch)
-					PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre.blockno);
-			}
-		}
-
-		return;
-	}
-
-	if (pstate->prefetch_pages < pstate->prefetch_target)
-	{
-		TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
-
-		if (prefetch_iterator)
-		{
-			while (1)
-			{
-				TBMIterateResult tbmpre;
-				bool		do_prefetch = false;
-				bool		skip_fetch;
-
-				/*
-				 * Recheck under the mutex. If some other process has already
-				 * done enough prefetching then we need not to do anything.
-				 */
-				SpinLockAcquire(&pstate->mutex);
-				if (pstate->prefetch_pages < pstate->prefetch_target)
-				{
-					pstate->prefetch_pages++;
-					do_prefetch = true;
-				}
-				SpinLockRelease(&pstate->mutex);
-
-				if (!do_prefetch)
-					return;
-
-				tbm_shared_iterate(prefetch_iterator, &tbmpre);
-				if (!BlockNumberIsValid(tbmpre.blockno))
-				{
-					/* No more pages to prefetch */
-					tbm_end_shared_iterate(prefetch_iterator);
-					node->shared_prefetch_iterator = NULL;
-					break;
-				}
-
-				node->pfblockno = tbmpre.blockno;
-
-				/* As above, skip prefetch if we expect not to need page */
-				skip_fetch = (scan->rs_flags & SO_CAN_SKIP_FETCH &&
-							  !tbmpre.recheck &&
-							  VM_ALL_VISIBLE(node->ss.ss_currentRelation,
-											 tbmpre.blockno,
-											 &node->pvmbuffer));
-
-				if (!skip_fetch)
-					PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre.blockno);
-			}
-		}
-	}
-#endif							/* USE_PREFETCH */
-}
-
 /*
  * BitmapHeapRecheck -- access method routine to recheck a tuple in EvalPlanQual
  */
@@ -572,23 +265,12 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
 	if (node->ss.ss_currentScanDesc)
 		table_rescan(node->ss.ss_currentScanDesc, NULL);
 
-	/* release bitmaps and buffers if any */
-	if (node->prefetch_iterator)
-		tbm_end_iterate(node->prefetch_iterator);
-	if (node->shared_prefetch_iterator)
-		tbm_end_shared_iterate(node->shared_prefetch_iterator);
+	/* release bitmaps if any */
 	if (node->tbm)
 		tbm_free(node->tbm);
-	if (node->pvmbuffer != InvalidBuffer)
-		ReleaseBuffer(node->pvmbuffer);
 	node->tbm = NULL;
-	node->prefetch_iterator = NULL;
 	node->initialized = false;
-	node->shared_prefetch_iterator = NULL;
-	node->pvmbuffer = InvalidBuffer;
 	node->recheck = true;
-	node->blockno = InvalidBlockNumber;
-	node->pfblockno = InvalidBlockNumber;
 
 	ExecScanReScan(&node->ss);
 
@@ -627,16 +309,10 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
 		table_endscan(scanDesc);
 
 	/*
-	 * release bitmaps and buffers if any
+	 * release bitmaps if any
 	 */
-	if (node->prefetch_iterator)
-		tbm_end_iterate(node->prefetch_iterator);
 	if (node->tbm)
 		tbm_free(node->tbm);
-	if (node->shared_prefetch_iterator)
-		tbm_end_shared_iterate(node->shared_prefetch_iterator);
-	if (node->pvmbuffer != InvalidBuffer)
-		ReleaseBuffer(node->pvmbuffer);
 }
 
 /* ----------------------------------------------------------------
@@ -669,18 +345,11 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
 	scanstate->ss.ps.ExecProcNode = ExecBitmapHeapScan;
 
 	scanstate->tbm = NULL;
-	scanstate->pvmbuffer = InvalidBuffer;
 	scanstate->exact_pages = 0;
 	scanstate->lossy_pages = 0;
-	scanstate->prefetch_iterator = NULL;
-	scanstate->prefetch_pages = 0;
-	scanstate->prefetch_target = 0;
 	scanstate->initialized = false;
-	scanstate->shared_prefetch_iterator = NULL;
 	scanstate->pstate = NULL;
 	scanstate->recheck = true;
-	scanstate->blockno = InvalidBlockNumber;
-	scanstate->pfblockno = InvalidBlockNumber;
 
 	/*
 	 * Miscellaneous initialization
@@ -720,13 +389,6 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
 	scanstate->bitmapqualorig =
 		ExecInitQual(node->bitmapqualorig, (PlanState *) scanstate);
 
-	/*
-	 * Maximum number of prefetches for the tablespace if configured,
-	 * otherwise the current value of the effective_io_concurrency GUC.
-	 */
-	scanstate->prefetch_maximum =
-		get_tablespace_io_concurrency(currentRelation->rd_rel->reltablespace);
-
 	scanstate->ss.ss_currentRelation = currentRelation;
 
 	/*
@@ -803,14 +465,10 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
 		return;
 
 	pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelBitmapHeapState));
-
 	pstate->tbmiterator = 0;
-	pstate->prefetch_iterator = 0;
 
 	/* Initialize the mutex */
 	SpinLockInit(&pstate->mutex);
-	pstate->prefetch_pages = 0;
-	pstate->prefetch_target = 0;
 	pstate->state = BM_INITIAL;
 
 	ConditionVariableInit(&pstate->cv);
@@ -841,11 +499,7 @@ ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
 	if (DsaPointerIsValid(pstate->tbmiterator))
 		tbm_free_shared_area(dsa, pstate->tbmiterator);
 
-	if (DsaPointerIsValid(pstate->prefetch_iterator))
-		tbm_free_shared_area(dsa, pstate->prefetch_iterator);
-
 	pstate->tbmiterator = InvalidDsaPointer;
-	pstate->prefetch_iterator = InvalidDsaPointer;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 3dfb19ec7d5..1cad9c04f01 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -26,6 +26,7 @@
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
 #include "storage/shm_toc.h"
+#include "storage/streaming_read.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
 
@@ -72,6 +73,9 @@ typedef struct HeapScanDescData
 	 */
 	ParallelBlockTableScanWorkerData *rs_parallelworkerdata;
 
+	/* Streaming read control object for scans supporting it */
+	PgStreamingRead *rs_pgsr;
+
 	/*
 	 * These fields are only used for bitmap scans for the "skip fetch"
 	 * optimization. Bitmap scans needing no fields from the heap may skip
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index d214abeb201..5a963f9293f 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -792,23 +792,11 @@ typedef struct TableAmRoutine
 	 * lossy indicates whether or not the block's representation in the bitmap
 	 * is lossy or exact.
 	 *
-	 * XXX: Currently this may only be implemented if the AM uses md.c as its
-	 * storage manager, and uses ItemPointer->ip_blkid in a manner that maps
-	 * blockids directly to the underlying storage. nodeBitmapHeapscan.c
-	 * performs prefetching directly using that interface.  This probably
-	 * needs to be rectified at a later point.
-	 *
-	 * XXX: Currently this may only be implemented if the AM uses the
-	 * visibilitymap, as nodeBitmapHeapscan.c unconditionally accesses it to
-	 * perform prefetching.  This probably needs to be rectified at a later
-	 * point.
-	 *
 	 * Optional callback, but either both scan_bitmap_next_block and
 	 * scan_bitmap_next_tuple need to exist, or neither.
 	 */
-	bool		(*scan_bitmap_next_block) (TableScanDesc scan,
-										   bool *recheck, bool *lossy,
-										   BlockNumber *blockno);
+	bool		(*scan_bitmap_next_block) (TableScanDesc scan, bool *recheck,
+										   bool *lossy);
 
 	/*
 	 * Fetch the next tuple of a bitmap table scan into `slot` and return true
@@ -1983,8 +1971,7 @@ table_relation_estimate_size(Relation rel, int32 *attr_widths,
  * used after verifying the presence (at plan time or such).
  */
 static inline bool
-table_scan_bitmap_next_block(TableScanDesc scan,
-							 bool *recheck, bool *lossy, BlockNumber *blockno)
+table_scan_bitmap_next_block(TableScanDesc scan, bool *recheck, bool *lossy)
 {
 	/*
 	 * We don't expect direct calls to table_scan_bitmap_next_block with valid
@@ -1994,8 +1981,7 @@ table_scan_bitmap_next_block(TableScanDesc scan,
 	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
 		elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding");
 
-	return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan, recheck,
-														   lossy, blockno);
+	return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan, recheck, lossy);
 }
 
 /*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9ec7eaceeb7..6a48f583d7c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1682,11 +1682,7 @@ typedef enum
 /* ----------------
  *	 ParallelBitmapHeapState information
  *		tbmiterator				iterator for scanning current pages
- *		prefetch_iterator		iterator for prefetching ahead of current page
- *		mutex					mutual exclusion for the prefetching variable
- *								and state
+ *		mutex					mutual exclusion for state
- *		prefetch_pages			# pages prefetch iterator is ahead of current
- *		prefetch_target			current target prefetch distance
  *		state					current state of the TIDBitmap
  *		cv						conditional wait variable
  * ----------------
@@ -1694,10 +1691,7 @@ typedef enum
 typedef struct ParallelBitmapHeapState
 {
 	dsa_pointer tbmiterator;
-	dsa_pointer prefetch_iterator;
 	slock_t		mutex;
-	int			prefetch_pages;
-	int			prefetch_target;
 	SharedBitmapState state;
 	ConditionVariable cv;
 } ParallelBitmapHeapState;
@@ -1707,19 +1701,11 @@ typedef struct ParallelBitmapHeapState
  *
  *		bitmapqualorig	   execution state for bitmapqualorig expressions
  *		tbm				   bitmap obtained from child index scan(s)
- *		pvmbuffer		   buffer for visibility-map lookups of prefetched pages
  *		exact_pages		   total number of exact pages retrieved
  *		lossy_pages		   total number of lossy pages retrieved
- *		prefetch_iterator  iterator for prefetching ahead of current page
- *		prefetch_pages	   # pages prefetch iterator is ahead of current
- *		prefetch_target    current target prefetch distance
- *		prefetch_maximum   maximum value for prefetch_target
  *		initialized		   is node is ready to iterate
- *		shared_prefetch_iterator shared iterator for prefetching
  *		pstate			   shared state for parallel bitmap scan
  *		recheck			   do current page's tuples need recheck
- *		blockno			   used to validate pf and current block in sync
- *		pfblockno		   used to validate pf stays ahead of current block
  * ----------------
  */
 typedef struct BitmapHeapScanState
@@ -1727,19 +1713,11 @@ typedef struct BitmapHeapScanState
 	ScanState	ss;				/* its first field is NodeTag */
 	ExprState  *bitmapqualorig;
 	TIDBitmap  *tbm;
-	Buffer		pvmbuffer;
 	long		exact_pages;
 	long		lossy_pages;
-	TBMIterator *prefetch_iterator;
-	int			prefetch_pages;
-	int			prefetch_target;
-	int			prefetch_maximum;
 	bool		initialized;
-	TBMSharedIterator *shared_prefetch_iterator;
 	ParallelBitmapHeapState *pstate;
 	bool		recheck;
-	BlockNumber blockno;
-	BlockNumber pfblockno;
 } BitmapHeapScanState;
 
 /* ----------------
-- 
2.40.1

