From b10722996a4f27c44e5b6b07eac306a6cc017ca5 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 23 May 2022 10:49:17 +1200
Subject: [PATCH v1 12/14] WIP: Use streaming reads in recovery.

Replace xlogprefetcher.c's local I/O queue mechanism with the standard
'streaming buffer read' pattern.  Benefits:

* 8KB posix_fadvise() and pread() calls can be batched into larger
  vector I/O calls that transfer ranges of blocks up to 128KB at a time

* the prefetcher receives a stream of already-pinned buffers, which also
  means that buffers that are accessed multiple times within the
  prefetching window can stay pinned; this applies whether or not I/O
  is necessary, so it provides a performance benefit even to well cached
  recovery workloads

Adopting centralized infrastructure also means that future proposed
improvements (for example asynchronous I/O) will automatically apply
here too.

Previously, the GUC "recovery_prefetch" had a default setting "try",
for the benefit of systems with no posix_fadvise() system call, where
there was no point in looking ahead in the WAL.  Now that it is useful
on all systems to do so, to build vector I/Os and consolidate pins, the
option is available everywhere so there is no point in the "try"
setting.  It can still be turned off, though, and it is still limited by
maintenance_io_concurrency (0 on systems with no posix_fadvise()).

XXX TODO kill most of the pg_stat_recovery_prefetch hit/miss counters
and push data into the standard pgstatio infrastructure/views.

XXX TODO need to work on the code that handles changes to recovery
settings during recovery (ie buffers on already-decoded records might be
out of sync)

Author: Thomas Munro <thomas.munro@gmail.com>
---
 doc/src/sgml/config.sgml                    |  10 +-
 src/backend/access/transam/xlogprefetcher.c | 601 ++++++++++----------
 src/backend/access/transam/xlogreader.c     |  12 +
 src/backend/access/transam/xlogrecovery.c   |   5 +-
 src/backend/access/transam/xlogutils.c      |  74 ++-
 src/backend/storage/buffer/bufmgr.c         |   6 +-
 src/backend/storage/freespace/freespace.c   |   2 +-
 src/backend/utils/misc/guc_tables.c         |   6 +-
 src/include/access/xlogprefetcher.h         |   3 +-
 src/include/access/xlogreader.h             |   1 +
 src/include/access/xlogutils.h              |   4 +-
 11 files changed, 380 insertions(+), 344 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 694d667bf9..521d3981ec 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3737,13 +3737,9 @@ include_dir 'conf.d'
        <para>
         Whether to try to prefetch blocks that are referenced in the WAL that
         are not yet in the buffer pool, during recovery.  Valid values are
-        <literal>off</literal>, <literal>on</literal> and
-        <literal>try</literal> (the default).  The setting
-        <literal>try</literal> enables
-        prefetching only if the operating system provides the
-        <function>posix_fadvise</function> function, which is currently used
-        to implement prefetching.  Note that some operating systems provide the
-        function, but it doesn't do anything.
+        <literal>off</literal> and <literal>on</literal> (the default).
+        This allows <function>posix_fadvise</function> hints to be provided
+        to some operating systems, and I/O to be performed more efficiently.
        </para>
        <para>
         Prefetching blocks that will soon be needed can reduce I/O wait times
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 539928cb85..ec068cf1ea 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -27,6 +27,7 @@
 
 #include "postgres.h"
 
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
@@ -44,6 +45,7 @@
 #include "storage/bufmgr.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
+#include "storage/streaming_read.h"
 #include "utils/guc_hooks.h"
 #include "utils/hsearch.h"
 
@@ -53,12 +55,6 @@
  */
 #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
 
-/*
- * To detect repeated access to the same block and skip useless extra system
- * calls, we remember a small window of recently prefetched blocks.
- */
-#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
-
 /*
  * When maintenance_io_concurrency is not saturated, we're prepared to look
  * ahead up to N times that number of block references.
@@ -69,58 +65,13 @@
 /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
 
 /* GUCs */
-int			recovery_prefetch = RECOVERY_PREFETCH_TRY;
+int			recovery_prefetch = RECOVERY_PREFETCH_ON;
 
-#ifdef USE_PREFETCH
 #define RecoveryPrefetchEnabled() \
-		(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
-		 maintenance_io_concurrency > 0)
-#else
-#define RecoveryPrefetchEnabled() false
-#endif
+		(recovery_prefetch != RECOVERY_PREFETCH_OFF)
 
 static int	XLogPrefetchReconfigureCount = 0;
 
-/*
- * Enum used to report whether an IO should be started.
- */
-typedef enum
-{
-	LRQ_NEXT_NO_IO,
-	LRQ_NEXT_IO,
-	LRQ_NEXT_AGAIN
-} LsnReadQueueNextStatus;
-
-/*
- * Type of callback that can decide which block to prefetch next.  For now
- * there is only one.
- */
-typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
-													   XLogRecPtr *lsn);
-
-/*
- * A simple circular queue of LSNs, using to control the number of
- * (potentially) inflight IOs.  This stands in for a later more general IO
- * control mechanism, which is why it has the apparently unnecessary
- * indirection through a function pointer.
- */
-typedef struct LsnReadQueue
-{
-	LsnReadQueueNextFun next;
-	uintptr_t	lrq_private;
-	uint32		max_inflight;
-	uint32		inflight;
-	uint32		completed;
-	uint32		head;
-	uint32		tail;
-	uint32		size;
-	struct
-	{
-		bool		io;
-		XLogRecPtr	lsn;
-	}			queue[FLEXIBLE_ARRAY_MEMBER];
-} LsnReadQueue;
-
 /*
  * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
  * blocks that will be soon be referenced, to try to avoid IO stalls.
@@ -139,16 +90,11 @@ struct XLogPrefetcher
 	HTAB	   *filter_table;
 	dlist_head	filter_queue;
 
-	/* Book-keeping to avoid repeat prefetches. */
-	RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
-	BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
-	int			recent_idx;
-
 	/* Book-keeping to disable prefetching temporarily. */
 	XLogRecPtr	no_readahead_until;
 
 	/* IO depth manager. */
-	LsnReadQueue *streaming_read;
+	PgStreamingRead *streaming_read;
 
 	XLogRecPtr	begin_ptr;
 
@@ -197,103 +143,16 @@ static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
 											BlockNumber blockno);
 static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
 												 XLogRecPtr replaying_lsn);
-static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
-													  XLogRecPtr *lsn);
+static bool XLogPrefetcherNextBlock(PgStreamingRead *pgsr,
+									uintptr_t pgsr_private,
+									void *per_io_data,
+									BufferManagerRelation *bmr,
+									ForkNumber *forknum,
+									BlockNumber *blocknum,
+									ReadBufferMode *mode);
 
 static XLogPrefetchStats *SharedStats;
 
-static inline LsnReadQueue *
-lrq_alloc(uint32 max_distance,
-		  uint32 max_inflight,
-		  uintptr_t lrq_private,
-		  LsnReadQueueNextFun next)
-{
-	LsnReadQueue *lrq;
-	uint32		size;
-
-	Assert(max_distance >= max_inflight);
-
-	size = max_distance + 1;	/* full ring buffer has a gap */
-	lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
-	lrq->lrq_private = lrq_private;
-	lrq->max_inflight = max_inflight;
-	lrq->size = size;
-	lrq->next = next;
-	lrq->head = 0;
-	lrq->tail = 0;
-	lrq->inflight = 0;
-	lrq->completed = 0;
-
-	return lrq;
-}
-
-static inline void
-lrq_free(LsnReadQueue *lrq)
-{
-	pfree(lrq);
-}
-
-static inline uint32
-lrq_inflight(LsnReadQueue *lrq)
-{
-	return lrq->inflight;
-}
-
-static inline uint32
-lrq_completed(LsnReadQueue *lrq)
-{
-	return lrq->completed;
-}
-
-static inline void
-lrq_prefetch(LsnReadQueue *lrq)
-{
-	/* Try to start as many IOs as we can within our limits. */
-	while (lrq->inflight < lrq->max_inflight &&
-		   lrq->inflight + lrq->completed < lrq->size - 1)
-	{
-		Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
-		switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
-		{
-			case LRQ_NEXT_AGAIN:
-				return;
-			case LRQ_NEXT_IO:
-				lrq->queue[lrq->head].io = true;
-				lrq->inflight++;
-				break;
-			case LRQ_NEXT_NO_IO:
-				lrq->queue[lrq->head].io = false;
-				lrq->completed++;
-				break;
-		}
-		lrq->head++;
-		if (lrq->head == lrq->size)
-			lrq->head = 0;
-	}
-}
-
-static inline void
-lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
-{
-	/*
-	 * We know that LSNs before 'lsn' have been replayed, so we can now assume
-	 * that any IOs that were started before then have finished.
-	 */
-	while (lrq->tail != lrq->head &&
-		   lrq->queue[lrq->tail].lsn < lsn)
-	{
-		if (lrq->queue[lrq->tail].io)
-			lrq->inflight--;
-		else
-			lrq->completed--;
-		lrq->tail++;
-		if (lrq->tail == lrq->size)
-			lrq->tail = 0;
-	}
-	if (RecoveryPrefetchEnabled())
-		lrq_prefetch(lrq);
-}
-
 size_t
 XLogPrefetchShmemSize(void)
 {
@@ -395,7 +254,23 @@ XLogPrefetcherAllocate(XLogReaderState *reader)
 void
 XLogPrefetcherFree(XLogPrefetcher *prefetcher)
 {
-	lrq_free(prefetcher->streaming_read);
+	/*
+	 * Redo records are normally expected to read and then release any buffers
+	 * referenced by a WAL record, but we may need to release buffers for the
+	 * final block if recovery ends without replaying a record.
+	 */
+	if (prefetcher->reader->record)
+	{
+		DecodedXLogRecord *last_record = prefetcher->reader->record;
+
+		for (int i = 0; i <= last_record->max_block_id; ++i)
+		{
+			if (last_record->blocks[i].in_use &&
+				BufferIsValid(last_record->blocks[i].prefetch_buffer))
+				ReleaseBuffer(last_record->blocks[i].prefetch_buffer);
+		}
+	}
+	pg_streaming_read_free(prefetcher->streaming_read);
 	hash_destroy(prefetcher->filter_table);
 	pfree(prefetcher);
 }
@@ -415,8 +290,8 @@ XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
 void
 XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
 {
-	uint32		io_depth;
-	uint32		completed;
+	int			ios;
+	int			pins;
 	int64		wal_distance;
 
 
@@ -432,13 +307,13 @@ XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
 		wal_distance = 0;
 	}
 
-	/* How many IOs are currently in flight and completed? */
-	io_depth = lrq_inflight(prefetcher->streaming_read);
-	completed = lrq_completed(prefetcher->streaming_read);
+	/* How many IOs are currently in progress, and how many pins do we have? */
+	ios = pg_streaming_read_ios(prefetcher->streaming_read);
+	pins = pg_streaming_read_pins(prefetcher->streaming_read);
 
 	/* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
-	SharedStats->io_depth = io_depth;
-	SharedStats->block_distance = io_depth + completed;
+	SharedStats->io_depth = ios;
+	SharedStats->block_distance = pins;
 	SharedStats->wal_distance = wal_distance;
 
 	prefetcher->next_stats_shm_lsn =
@@ -446,23 +321,18 @@ XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
 }
 
 /*
- * A callback that examines the next block reference in the WAL, and possibly
- * starts an IO so that a later read will be fast.
- *
- * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
- *
- * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
- * that isn't in the buffer pool, and the kernel has been asked to start
- * reading it to make a future read system call faster. An LSN is written to
- * *lsn, and the I/O will be considered to have completed once that LSN is
- * replayed.
- *
- * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
- * that it was already in the buffer pool, or we decided for various reasons
- * not to prefetch.
+ * A PgStreamingRead callback that generates a stream of block references by
+ * looking ahead in the WAL, which XLogPrefetcherReadRecord() will later
+ * retrieve.
  */
-static LsnReadQueueNextStatus
-XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
+static bool
+XLogPrefetcherNextBlock(PgStreamingRead *pgsr,
+						uintptr_t pgsr_private,
+						void *per_io_data,
+						BufferManagerRelation *bmr,
+						ForkNumber *forknum,
+						BlockNumber *blocknum,
+						ReadBufferMode *mode)
 {
 	XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
 	XLogReaderState *reader = prefetcher->reader;
@@ -475,6 +345,8 @@ XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
 	for (;;)
 	{
 		DecodedXLogRecord *record;
+		uint8		rmid;
+		uint8		record_type;
 
 		/* Try to read a new future record, if we don't already have one. */
 		if (prefetcher->record == NULL)
@@ -491,7 +363,7 @@ XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
 
 			/* Readahead is disabled until we replay past a certain point. */
 			if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
-				return LRQ_NEXT_AGAIN;
+				return false;
 
 			record = XLogReadAhead(prefetcher->reader, nonblocking);
 			if (record == NULL)
@@ -505,23 +377,20 @@ XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
 					prefetcher->no_readahead_until =
 						prefetcher->reader->decode_queue_tail->lsn;
 
-				return LRQ_NEXT_AGAIN;
-			}
-
-			/*
-			 * If prefetching is disabled, we don't need to analyze the record
-			 * or issue any prefetches.  We just need to cause one record to
-			 * be decoded.
-			 */
-			if (!RecoveryPrefetchEnabled())
-			{
-				*lsn = InvalidXLogRecPtr;
-				return LRQ_NEXT_NO_IO;
+				return false;
 			}
 
 			/* We have a new record to process. */
 			prefetcher->record = record;
 			prefetcher->next_block_id = 0;
+
+			/*
+			 * If prefetching is disabled, we don't need to analyze the
+			 * record.  We just needed to cause one record to be decoded, so
+			 * we can give up now.
+			 */
+			if (!RecoveryPrefetchEnabled())
+				return false;
 		}
 		else
 		{
@@ -529,15 +398,15 @@ XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
 			record = prefetcher->record;
 		}
 
+		rmid = record->header.xl_rmid;
+		record_type = record->header.xl_info & ~XLR_INFO_MASK;
+
 		/*
 		 * Check for operations that require us to filter out block ranges, or
 		 * pause readahead completely.
 		 */
 		if (replaying_lsn < record->lsn)
 		{
-			uint8		rmid = record->header.xl_rmid;
-			uint8		record_type = record->header.xl_info & ~XLR_INFO_MASK;
-
 			if (rmid == RM_XLOG_ID)
 			{
 				if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
@@ -589,6 +458,51 @@ XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
 						 rlocator.dbOid,
 						 LSN_FORMAT_ARGS(record->lsn));
 #endif
+
+					/*
+					 * Don't prefetch anything in the source database either,
+					 * because FlushDatabaseBuffers() doesn't like to see any
+					 * pinned buffers.
+					 */
+					rlocator.dbOid = xlrec->src_db_id;
+					XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
+
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+					elog(XLOGPREFETCHER_DEBUG_LEVEL,
+						 "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
+						 rlocator.dbOid,
+						 LSN_FORMAT_ARGS(record->lsn));
+#endif
+				}
+				else if (record_type == XLOG_DBASE_CREATE_WAL_LOG)
+				{
+					xl_dbase_create_wal_log_rec *xlrec =
+						(xl_dbase_create_wal_log_rec *) record->main_data;
+					RelFileLocator rlocator =
+					{InvalidOid, xlrec->db_id, InvalidRelFileNumber};
+
+					/*
+					 * As above, we don't want to pin buffers on the other
+					 * side of a DROP, CREATE DATABASE sequence that recycles
+					 * database OIDs.
+					 */
+					XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
+				}
+				else if (record_type == XLOG_DBASE_DROP)
+				{
+					xl_dbase_drop_rec *xlrec =
+						(xl_dbase_drop_rec *) record->main_data;
+					RelFileLocator rlocator =
+					{InvalidOid, xlrec->db_id, InvalidRelFileNumber};
+
+					/*
+					 * XLOG_DBASE_DROP can be used while moving between
+					 * tablespaces, and in that case it invalidates buffers
+					 * from all tablespaces.  That means that we also have to
+					 * wait for the drop before prefetching anything in this
+					 * whole DB.
+					 */
+					XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
 				}
 			}
 			else if (rmid == RM_SMGR_ID)
@@ -645,6 +559,53 @@ XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
 #endif
 				}
 			}
+			else if (rmid == RM_XACT_ID)
+			{
+				/*
+				 * If this is a COMMIT/ABORT that drops relations, the
+				 * SMgrRelation pointers to those relations that we have
+				 * already queued for streaming read will become dangling
+				 * pointers after this is replayed due to smgrclose(), so
+				 * pause prefetching here until that's done.
+				 */
+				if (unlikely(record->header.xl_info & XACT_XINFO_HAS_RELFILELOCATORS))
+				{
+					RelFileLocator *xlocators;
+					int			nrels = 0;
+
+					if (record_type == XLOG_XACT_COMMIT ||
+						record_type == XLOG_XACT_COMMIT_PREPARED)
+					{
+						xl_xact_parsed_commit parsed;
+
+						ParseCommitRecord(record->header.xl_info,
+										  (xl_xact_commit *) record->main_data,
+										  &parsed);
+
+						xlocators = parsed.xlocators;
+						nrels = parsed.nrels;
+					}
+					else if (record_type == XLOG_XACT_ABORT)
+					{
+						xl_xact_parsed_abort parsed;
+
+						ParseAbortRecord(record->header.xl_info,
+										 (xl_xact_abort *) record->main_data,
+										 &parsed);
+
+						xlocators = parsed.xlocators;
+						nrels = parsed.nrels;
+					}
+
+					/*
+					 * Filter out these relations until the record is
+					 * replayed.
+					 */
+					for (int i = 0; i < nrels; ++i)
+						XLogPrefetcherAddFilter(prefetcher, xlocators[i], 0,
+												record->lsn);
+				}
+			}
 		}
 
 		/* Scan the block references, starting where we left off last time. */
@@ -653,69 +614,39 @@ XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
 			int			block_id = prefetcher->next_block_id++;
 			DecodedBkpBlock *block = &record->blocks[block_id];
 			SMgrRelation reln;
-			PrefetchBufferResult result;
 
 			if (!block->in_use)
 				continue;
 
 			Assert(!BufferIsValid(block->prefetch_buffer));
+			Assert(!block->prefetch_buffer_streamed);
 
 			/*
-			 * Record the LSN of this record.  When it's replayed,
-			 * LsnReadQueue will consider any IOs submitted for earlier LSNs
-			 * to be finished.
+			 * We only stream the main fork, for now.  Some of the other
+			 * forks require RBM_ZERO_ON_ERROR and have unusual logging
+			 * protocols.
 			 */
-			*lsn = record->lsn;
-
-			/* We don't try to prefetch anything but the main fork for now. */
 			if (block->forknum != MAIN_FORKNUM)
-			{
-				return LRQ_NEXT_NO_IO;
-			}
+				continue;
 
 			/*
-			 * If there is a full page image attached, we won't be reading the
-			 * page, so don't bother trying to prefetch.
+			 * FPI_FOR_HINT is special, with full_page_writes = off and
+			 * wal_log_hints = true, the record is solely used to include
+			 * knowledge about modified blocks in the WAL.
 			 */
-			if (block->has_image)
+			if (rmid == RM_XLOG_ID && record_type == XLOG_FPI_FOR_HINT &&
+				!block->has_image)
 			{
 				XLogPrefetchIncrement(&SharedStats->skip_fpw);
-				return LRQ_NEXT_NO_IO;
-			}
-
-			/* There is no point in reading a page that will be zeroed. */
-			if (block->flags & BKPBLOCK_WILL_INIT)
-			{
-				XLogPrefetchIncrement(&SharedStats->skip_init);
-				return LRQ_NEXT_NO_IO;
+				continue;
 			}
 
 			/* Should we skip prefetching this block due to a filter? */
 			if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
 			{
 				XLogPrefetchIncrement(&SharedStats->skip_new);
-				return LRQ_NEXT_NO_IO;
-			}
-
-			/* There is no point in repeatedly prefetching the same block. */
-			for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
-			{
-				if (block->blkno == prefetcher->recent_block[i] &&
-					RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
-				{
-					/*
-					 * XXX If we also remembered where it was, we could set
-					 * recent_buffer so that recovery could skip smgropen()
-					 * and a buffer table lookup.
-					 */
-					XLogPrefetchIncrement(&SharedStats->skip_rep);
-					return LRQ_NEXT_NO_IO;
-				}
+				continue;
 			}
-			prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
-			prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
-			prefetcher->recent_idx =
-				(prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
 
 			/*
 			 * We could try to have a fast path for repeated references to the
@@ -744,7 +675,7 @@ XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
 				XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
 										record->lsn);
 				XLogPrefetchIncrement(&SharedStats->skip_new);
-				return LRQ_NEXT_NO_IO;
+				continue;
 			}
 
 			/*
@@ -766,41 +697,33 @@ XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
 				XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
 										record->lsn);
 				XLogPrefetchIncrement(&SharedStats->skip_new);
-				return LRQ_NEXT_NO_IO;
+				continue;
 			}
 
-			/* Try to initiate prefetching. */
-			result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
-			if (BufferIsValid(result.recent_buffer))
-			{
-				/* Cache hit, nothing to do. */
-				XLogPrefetchIncrement(&SharedStats->hit);
-				block->prefetch_buffer = result.recent_buffer;
-				return LRQ_NEXT_NO_IO;
-			}
-			else if (result.initiated_io)
-			{
-				/* Cache miss, I/O (presumably) started. */
-				XLogPrefetchIncrement(&SharedStats->prefetch);
-				block->prefetch_buffer = InvalidBuffer;
-				return LRQ_NEXT_IO;
-			}
-			else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
-			{
-				/*
-				 * This shouldn't be possible, because we already determined
-				 * that the relation exists on disk and is big enough.
-				 * Something is wrong with the cache invalidation for
-				 * smgrexists(), smgrnblocks(), or the file was unlinked or
-				 * truncated beneath our feet?
-				 */
-				elog(ERROR,
-					 "could not prefetch relation %u/%u/%u block %u",
-					 reln->smgr_rlocator.locator.spcOid,
-					 reln->smgr_rlocator.locator.dbOid,
-					 reln->smgr_rlocator.locator.relNumber,
-					 block->blkno);
-			}
+			/*
+			 * Stream this block!  It will be looked up and pinned by the
+			 * streaming read (possibly combining I/Os with nearby blocks),
+			 * and XLogPrefetcherReadRecord() will consume it from the
+			 * streaming read object, and make it available for
+			 * XLogReadBufferForRedo() to provide to a redo routine.
+			 */
+			block->prefetch_buffer_streamed = true;
+
+			/*
+			 * If there's an FPI or redo is just going to zero it, we can skip
+			 * a useless I/O, and stream a pinned buffer that
+			 * XLogReadBufferForRedo() will pass to ZeroBuffer().
+			 */
+			if (block->apply_image ||
+				(block->flags & BKPBLOCK_WILL_INIT))
+				*mode = RBM_WILL_ZERO;
+			else
+				*mode = RBM_NORMAL;
+			*bmr = BMR_SMGR(reln, RELPERSISTENCE_PERMANENT);
+			*forknum = block->forknum;
+			*blocknum = block->blkno;
+
+			return true;
 		}
 
 		/*
@@ -815,7 +738,7 @@ XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
 		 */
 		if (prefetcher->reader->decode_queue_tail &&
 			prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
-			return LRQ_NEXT_AGAIN;
+			return false;
 
 		/* Advance to the next record. */
 		prefetcher->record = NULL;
@@ -989,34 +912,64 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	DecodedXLogRecord *record;
 	XLogRecPtr	replayed_up_to;
 
+#ifdef USE_ASSERT_CHECKING
+
+	/*
+	 * The recovery code (ie individual redo routines) should have called
+	 * XLogReadBufferForRedo() for all registered buffers.  Here we'll assert
+	 * that that's the case.
+	 */
+	if (prefetcher->reader->record)
+	{
+		DecodedXLogRecord *last_record = prefetcher->reader->record;
+
+		for (int i = 0; i <= last_record->max_block_id; ++i)
+		{
+			if (last_record->blocks[i].in_use &&
+				BufferIsValid(last_record->blocks[i].prefetch_buffer))
+				elog(ERROR,
+					 "redo routine did not read buffer pinned by prefetcher, LSN %X/%X",
+					 LSN_FORMAT_ARGS(last_record->lsn));
+		}
+	}
+#endif
+
 	/*
 	 * See if it's time to reset the prefetching machinery, because a relevant
 	 * GUC was changed.
 	 */
 	if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
 	{
-		uint32		max_distance;
-		uint32		max_inflight;
+		int			max_ios;
 
 		if (prefetcher->streaming_read)
-			lrq_free(prefetcher->streaming_read);
+			pg_streaming_read_free(prefetcher->streaming_read);
 
 		if (RecoveryPrefetchEnabled())
 		{
-			Assert(maintenance_io_concurrency > 0);
-			max_inflight = maintenance_io_concurrency;
-			max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
+			/*
+			 * maintenance_io_concurrency might be 0 on some systems, but we
+			 * need at least 1 to function.
+			 */
+			max_ios = Max(maintenance_io_concurrency, 1);
 		}
 		else
 		{
-			max_inflight = 1;
-			max_distance = 1;
+			/*
+			 * Non-zero values are needed, but the NextBlock callback will
+			 * never actually do any prefetching, so the streaming read will
+			 * not be used except as a way to force records to be decoded one
+			 * by one.
+			 */
+			max_ios = 1;
 		}
 
-		prefetcher->streaming_read = lrq_alloc(max_distance,
-											   max_inflight,
-											   (uintptr_t) prefetcher,
-											   XLogPrefetcherNextBlock);
+		prefetcher->streaming_read =
+			pg_streaming_read_buffer_alloc(max_ios,
+										   0,
+										   (uintptr_t) prefetcher,
+										   NULL,
+										   XLogPrefetcherNextBlock);
 
 		prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
 	}
@@ -1035,20 +988,15 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
 
 	/*
-	 * All IO initiated by earlier WAL is now completed.  This might trigger
-	 * further prefetching.
-	 */
-	lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
-
-	/*
-	 * If there's nothing queued yet, then start prefetching to cause at least
-	 * one record to be queued.
+	 * If there's nothing queued yet, then start prefetching.  Normally this
+	 * happens automatically when we call pg_streaming_read_buffer_get_next()
+	 * complete earlier IOs, but if we didn't have a special case for an empty
+	 * queue we'd never be able to get started.
 	 */
 	if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
 	{
-		Assert(lrq_inflight(prefetcher->streaming_read) == 0);
-		Assert(lrq_completed(prefetcher->streaming_read) == 0);
-		lrq_prefetch(prefetcher->streaming_read);
+		pg_streaming_read_reset(prefetcher->streaming_read);
+		pg_streaming_read_prefetch(prefetcher->streaming_read);
 	}
 
 	/* Read the next record. */
@@ -1078,23 +1026,64 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
 		XLogPrefetcherComputeStats(prefetcher);
 
-	Assert(record == prefetcher->reader->record);
+	/*
+	 * Make sure that any IOs we initiated earlier for this record are
+	 * completed, by pulling the buffers out of the StreamingRead.
+	 */
+	for (int block_id = 0; block_id <= record->max_block_id; ++block_id)
+	{
+		DecodedBkpBlock *block = &record->blocks[block_id];
+		Buffer		buffer;
 
-	return &record->header;
-}
+		if (!block->in_use)
+			continue;
 
-bool
-check_recovery_prefetch(int *new_value, void **extra, GucSource source)
-{
-#ifndef USE_PREFETCH
-	if (*new_value == RECOVERY_PREFETCH_ON)
-	{
-		GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
-		return false;
-	}
+		/*
+		 * If we haven't streamed this buffer, then
+		 * XLogReadBufferForRedoExtended() will just have to read it the
+		 * traditional way.
+		 */
+		if (!block->prefetch_buffer_streamed)
+			continue;
+
+		Assert(!BufferIsValid(block->prefetch_buffer));
+
+		/*
+		 * Otherwise we already have a pinned buffer waiting for us in the
+		 * streaming read.
+		 */
+		buffer = pg_streaming_read_buffer_get_next(prefetcher->streaming_read, NULL);
+
+		if (buffer == InvalidBuffer)
+			elog(PANIC, "unexpectedly ran out of buffers in streaming read");
+
+		block->prefetch_buffer = buffer;
+		block->prefetch_buffer_streamed = false;
+
+		/*
+		 * Assert that we're in sync with XLogPrefetcherNextBlock(), which is
+		 * feeding blocks into the far end of the pipe.  For every decoded
+		 * block that has the prefetch_buffer_streamed flag set, in order, we
+		 * expect the corresponding already-pinned buffer to be the next to
+		 * come out of streaming_read.
+		 */
+#ifdef USE_ASSERT_CHECKING
+		{
+			RelFileLocator rlocator;
+			ForkNumber	forknum;
+			BlockNumber blocknum;
+
+			BufferGetTag(buffer, &rlocator, &forknum, &blocknum);
+			Assert(RelFileLocatorEquals(rlocator, block->rlocator));
+			Assert(forknum == block->forknum);
+			Assert(blocknum == block->blkno);
+		}
 #endif
+	}
 
-	return true;
+	Assert(record == prefetcher->reader->record);
+
+	return &record->header;
 }
 
 void
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c9f9f6e98f..de6cd926ad 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1750,6 +1750,7 @@ DecodeXLogRecord(XLogReaderState *state,
 			blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
 
 			blk->prefetch_buffer = InvalidBuffer;
+			blk->prefetch_buffer_streamed = false;
 
 			COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
 			/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
@@ -1978,6 +1979,9 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
  * If the WAL record contains a block reference with the given ID, *rlocator,
  * *forknum, *blknum and *prefetch_buffer are filled in (if not NULL), and
  * returns true.  Otherwise returns false.
+ *
+ * If prefetch_buffer is not NULL, the buffer is already pinned, and ownership
+ * of the pin is transferred to the caller.
  */
 bool
 XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id,
@@ -1998,7 +2002,15 @@ XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id,
 	if (blknum)
 		*blknum = bkpb->blkno;
 	if (prefetch_buffer)
+	{
 		*prefetch_buffer = bkpb->prefetch_buffer;
+
+		/*
+		 * Clear this so that we can assert that redo routines take
+		 * ownership of all buffers pinned by xlogprefetcher.c.
+		 */
+		bkpb->prefetch_buffer = InvalidBuffer;
+	}
 	return true;
 }
 
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index becc2bda62..204ec48072 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1551,8 +1551,8 @@ ShutdownWalRecovery(void)
 		close(readFile);
 		readFile = -1;
 	}
-	XLogReaderFree(xlogreader);
 	XLogPrefetcherFree(xlogprefetcher);
+	XLogReaderFree(xlogreader);
 
 	if (ArchiveRecoveryRequested)
 	{
@@ -2420,8 +2420,7 @@ verifyBackupPageConsistency(XLogReaderState *record)
 		 * temporary page.
 		 */
 		buf = XLogReadBufferExtended(rlocator, forknum, blkno,
-									 RBM_NORMAL_NO_LOG,
-									 InvalidBuffer);
+									 RBM_NORMAL_NO_LOG);
 		if (!BufferIsValid(buf))
 			continue;
 
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index bdc0f7e1da..b3382f9534 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -360,13 +360,13 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
 	RelFileLocator rlocator;
 	ForkNumber	forknum;
 	BlockNumber blkno;
-	Buffer		prefetch_buffer;
+	Buffer		streamed_buffer;
 	Page		page;
 	bool		zeromode;
 	bool		willinit;
 
 	if (!XLogRecGetBlockTagExtended(record, block_id, &rlocator, &forknum, &blkno,
-									&prefetch_buffer))
+									&streamed_buffer))
 	{
 		/* Caller specified a bogus block_id */
 		elog(PANIC, "failed to locate backup block with ID %d in WAL record",
@@ -384,13 +384,47 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
 	if (!willinit && zeromode)
 		elog(PANIC, "block to be initialized in redo routine must be marked with WILL_INIT flag in the WAL record");
 
+	/* Has xlogprefetcher.c streamed a pinned buffer to us? */
+	if (BufferIsValid(streamed_buffer))
+	{
+#ifdef USE_ASSERT_CHECKING
+		RelFileLocator xrlocator;
+		ForkNumber	xforknum;
+		BlockNumber xblocknum;
+
+		/* It must match the block requested in the WAL. */
+		BufferGetTag(streamed_buffer, &xrlocator, &xforknum, &xblocknum);
+		Assert(RelFileLocatorEquals(xrlocator, rlocator));
+		Assert(xforknum == forknum);
+		Assert(xblocknum == blkno);
+#endif
+	}
+
 	/* If it has a full-page image and it should be restored, do it. */
 	if (XLogRecBlockImageApply(record, block_id))
 	{
 		Assert(XLogRecHasBlockImage(record, block_id));
-		*buf = XLogReadBufferExtended(rlocator, forknum, blkno,
-									  get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK,
-									  prefetch_buffer);
+		if (BufferIsValid(streamed_buffer))
+		{
+			/*
+			 * The prefetcher has pinned this buffer, but used RBM_WILL_ZERO
+			 * when it saw that an image would be restored, so the buffer
+			 * wasn't made BM_VALID.  The only thing we are allowed to do now
+			 * is call ZeroBuffer().  Doing this at the last moment means that
+			 * we only lock one block at a time, when the redo routine is
+			 * ready for it, and can tell us which kind of lock it wants.
+			 *
+			 * XXX There doesn't seem to be a good reason to zero a page
+			 * before copying an image over the top, but this matches the
+			 * traditional behavior for now.
+			 */
+			ZeroBuffer(streamed_buffer,
+					   get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK);
+			*buf = streamed_buffer;
+		}
+		else
+			*buf = XLogReadBufferExtended(rlocator, forknum, blkno,
+										  get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK);
 		page = BufferGetPage(*buf);
 		if (!RestoreBlockImage(record, block_id, page))
 			ereport(ERROR,
@@ -421,7 +455,22 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
 	}
 	else
 	{
-		*buf = XLogReadBufferExtended(rlocator, forknum, blkno, mode, prefetch_buffer);
+		if (BufferIsValid(streamed_buffer))
+		{
+			/*
+			 * If the redo routine wants a zeroed page, we do that here.  The
+			 * prefetcher streamed us a pinned but not necessarily BM_VALID
+			 * buffer, because it saw the BKPBLOCK_WILL_INIT flag and used
+			 * PGSR_NEXT_WILL_ZERO to avoid I/O for a page that will be
+			 * zeroed.  We only lock one page at a time, when the redo routine
+			 * is ready to tell us which type of lock it wants.
+			 */
+			if (zeromode)
+				ZeroBuffer(streamed_buffer, mode);
+			*buf = streamed_buffer;
+		}
+		else
+			*buf = XLogReadBufferExtended(rlocator, forknum, blkno, mode);
 		if (BufferIsValid(*buf))
 		{
 			if (mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK)
@@ -472,8 +521,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
  */
 Buffer
 XLogReadBufferExtended(RelFileLocator rlocator, ForkNumber forknum,
-					   BlockNumber blkno, ReadBufferMode mode,
-					   Buffer recent_buffer)
+					   BlockNumber blkno, ReadBufferMode mode)
 {
 	BlockNumber lastblock;
 	Buffer		buffer;
@@ -481,15 +529,6 @@ XLogReadBufferExtended(RelFileLocator rlocator, ForkNumber forknum,
 
 	Assert(blkno != P_NEW);
 
-	/* Do we have a clue where the buffer might be already? */
-	if (BufferIsValid(recent_buffer) &&
-		mode == RBM_NORMAL &&
-		ReadRecentBuffer(rlocator, forknum, blkno, recent_buffer))
-	{
-		buffer = recent_buffer;
-		goto recent_buffer_fast_path;
-	}
-
 	/* Open the relation at smgr level */
 	smgr = smgropen(rlocator, InvalidBackendId);
 
@@ -533,7 +572,6 @@ XLogReadBufferExtended(RelFileLocator rlocator, ForkNumber forknum,
 									 mode);
 	}
 
-recent_buffer_fast_path:
 	if (mode == RBM_NORMAL)
 	{
 		/* check that page has been initialized */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 678e3239a9..04dbede971 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -4977,7 +4977,11 @@ LockBufferForCleanup(Buffer buffer)
 	Assert(BufferIsPinned(buffer));
 	Assert(PinCountWaitBuf == NULL);
 
-	CheckBufferIsPinnedOnce(buffer);
+#if 0
+	/* Due to prefetching, recovery might hold more than one local pin. */
+	if (!InRecovery)
+		CheckBufferIsPinnedOnce(buffer);
+#endif
 
 	/* Nobody else to wait for */
 	if (BufferIsLocal(buffer))
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index fb9440ff72..dbc9d87d42 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -212,7 +212,7 @@ XLogRecordPageWithFreeSpace(RelFileLocator rlocator, BlockNumber heapBlk,
 
 	/* If the page doesn't exist already, extend */
 	buf = XLogReadBufferExtended(rlocator, FSM_FORKNUM, blkno,
-								 RBM_ZERO_ON_ERROR, InvalidBuffer);
+								 RBM_ZERO_ON_ERROR);
 	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
 	page = BufferGetPage(buf);
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index e565a3092f..08dca1d8e6 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -375,7 +375,7 @@ static const struct config_enum_entry huge_pages_status_options[] = {
 static const struct config_enum_entry recovery_prefetch_options[] = {
 	{"off", RECOVERY_PREFETCH_OFF, false},
 	{"on", RECOVERY_PREFETCH_ON, false},
-	{"try", RECOVERY_PREFETCH_TRY, false},
+	{"try", RECOVERY_PREFETCH_ON, true},
 	{"true", RECOVERY_PREFETCH_ON, true},
 	{"false", RECOVERY_PREFETCH_OFF, true},
 	{"yes", RECOVERY_PREFETCH_ON, true},
@@ -4895,8 +4895,8 @@ struct config_enum ConfigureNamesEnum[] =
 			gettext_noop("Look ahead in the WAL to find references to uncached data.")
 		},
 		&recovery_prefetch,
-		RECOVERY_PREFETCH_TRY, recovery_prefetch_options,
-		check_recovery_prefetch, assign_recovery_prefetch, NULL
+		RECOVERY_PREFETCH_ON, recovery_prefetch_options,
+		NULL, assign_recovery_prefetch, NULL
 	},
 
 	{
diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h
index 7dd7f20ad0..91c9b25050 100644
--- a/src/include/access/xlogprefetcher.h
+++ b/src/include/access/xlogprefetcher.h
@@ -24,8 +24,7 @@ extern PGDLLIMPORT int recovery_prefetch;
 typedef enum
 {
 	RECOVERY_PREFETCH_OFF,
-	RECOVERY_PREFETCH_ON,
-	RECOVERY_PREFETCH_TRY
+	RECOVERY_PREFETCH_ON
 }			RecoveryPrefetchValue;
 
 struct XLogPrefetcher;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index da32c7db77..c11c364cbf 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -128,6 +128,7 @@ typedef struct
 
 	/* Prefetching workspace. */
 	Buffer		prefetch_buffer;
+	bool		prefetch_buffer_streamed;
 
 	/* copy of the fork_flags field from the XLogRecordBlockHeader */
 	uint8		flags;
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5b77b11f50..daf064924e 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -88,10 +88,8 @@ extern XLogRedoAction XLogReadBufferForRedoExtended(XLogReaderState *record,
 													uint8 block_id,
 													ReadBufferMode mode, bool get_cleanup_lock,
 													Buffer *buf);
-
 extern Buffer XLogReadBufferExtended(RelFileLocator rlocator, ForkNumber forknum,
-									 BlockNumber blkno, ReadBufferMode mode,
-									 Buffer recent_buffer);
+									 BlockNumber blkno, ReadBufferMode mode);
 
 extern Relation CreateFakeRelcacheEntry(RelFileLocator rlocator);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
-- 
2.39.2

