From 29fe16d08d3da4bdb6d950f02ba71ae784562663 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 7 Apr 2020 22:56:27 +1200
Subject: [PATCH v6 8/8] Prefetch referenced blocks during recovery.

Introduce a new GUC max_recovery_prefetch_distance.  If it is set to a
positive number of bytes, then read ahead in the WAL at most that
distance, and initiate asynchronous reading of referenced blocks.  The
goal is to avoid I/O stalls and benefit from concurrent I/O.  The number
of concurrent asynchronous reads is capped by the existing
maintenance_io_concurrency GUC.  The feature is enabled by default for
now, but we might reconsider that before release.

Reviewed-by: Tomas Vondra <tomas.vondra@2ndquadrant.com>
Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  45 +
 doc/src/sgml/monitoring.sgml                  |  71 ++
 doc/src/sgml/wal.sgml                         |  13 +
 src/backend/access/transam/Makefile           |   1 +
 src/backend/access/transam/xlog.c             |  11 +
 src/backend/access/transam/xlogprefetch.c     | 900 ++++++++++++++++++
 src/backend/catalog/system_views.sql          |  14 +
 src/backend/postmaster/pgstat.c               |  96 +-
 src/backend/replication/logical/logical.c     |   2 +-
 src/backend/storage/ipc/ipci.c                |   3 +
 src/backend/utils/misc/guc.c                  |  45 +-
 src/backend/utils/misc/postgresql.conf.sample |   5 +
 src/include/access/xlogprefetch.h             |  81 ++
 src/include/catalog/pg_proc.dat               |   8 +
 src/include/pgstat.h                          |  28 +-
 src/include/utils/guc.h                       |   4 +
 src/test/regress/expected/rules.out           |  11 +
 17 files changed, 1334 insertions(+), 4 deletions(-)
 create mode 100644 src/backend/access/transam/xlogprefetch.c
 create mode 100644 src/include/access/xlogprefetch.h

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index f68c992213..3e60f306ff 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3121,6 +3121,51 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-recovery-prefetch-distance" xreflabel="max_recovery_prefetch_distance">
+      <term><varname>max_recovery_prefetch_distance</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_recovery_prefetch_distance</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        The maximum distance to look ahead in the WAL during recovery, to find
+        blocks to prefetch.  Prefetching blocks that will soon be needed can
+        reduce I/O wait times.  The number of concurrent prefetches is limited
+        by this setting as well as
+        <xref linkend="guc-maintenance-io-concurrency"/>.  Setting it too high
+        might be counterproductive, if it means that data falls out of the
+        kernel cache before it is needed.  If this value is specified without
+        units, it is taken as bytes.  A setting of -1 disables prefetching
+        during recovery.
+        The default is 256kB on systems that support
+        <function>posix_fadvise</function>, and otherwise -1.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-recovery-prefetch-fpw" xreflabel="recovery_prefetch_fpw">
+      <term><varname>recovery_prefetch_fpw</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>recovery_prefetch_fpw</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Whether to prefetch blocks that were logged with full page images,
+        during recovery.  Often this doesn't help, since such blocks will not
+        be read the first time they are needed and might remain in the buffer
+        pool after that.  However, on file systems with a block size larger
+        than
+        <productname>PostgreSQL</productname>'s, prefetching can avoid a
+        costly read-before-write when blocks are later written.  This
+        setting has no effect unless
+        <xref linkend="guc-max-recovery-prefetch-distance"/> is set to a positive
+        number.  The default is off.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
      </sect2>
      <sect2 id="runtime-config-wal-archiving">
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index c50b72137f..1229a28675 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -320,6 +320,13 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_prefetch_recovery</structname><indexterm><primary>pg_stat_prefetch_recovery</primary></indexterm></entry>
+      <entry>Only one row, showing statistics about blocks prefetched during recovery.
+       See <xref linkend="pg-stat-prefetch-recovery-view"/> for details.
+      </entry>
+     </row>
+
      <row>
       <entry><structname>pg_stat_subscription</structname><indexterm><primary>pg_stat_subscription</primary></indexterm></entry>
       <entry>At least one row per subscription, showing information about
@@ -2223,6 +2230,68 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
    connected server.
   </para>
 
+  <table id="pg-stat-prefetch-recovery-view" xreflabel="pg_stat_prefetch_recovery">
+   <title><structname>pg_stat_prefetch_recovery</structname> View</title>
+   <tgroup cols="3">
+    <thead>
+    <row>
+      <entry>Column</entry>
+      <entry>Type</entry>
+      <entry>Description</entry>
+     </row>
+    </thead>
+
+   <tbody>
+    <row>
+     <entry><structfield>prefetch</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks prefetched because they were not in the buffer pool</entry>
+    </row>
+    <row>
+     <entry><structfield>skip_hit</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because they were already in the buffer pool</entry>
+    </row>
+    <row>
+     <entry><structfield>skip_new</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because they were new (usually relation extension)</entry>
+    </row>
+    <row>
+     <entry><structfield>skip_fpw</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because a full page image was included in the WAL and <xref linkend="guc-recovery-prefetch-fpw"/> was set to <literal>off</literal></entry>
+    </row>
+    <row>
+     <entry><structfield>skip_seq</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because of repeated or sequential access</entry>
+    </row>
+    <row>
+     <entry><structfield>distance</structfield></entry>
+     <entry><type>integer</type></entry>
+     <entry>How far ahead of recovery the WAL prefetcher is currently reading, in bytes</entry>
+    </row>
+    <row>
+     <entry><structfield>queue_depth</structfield></entry>
+     <entry><type>integer</type></entry>
+     <entry>How many prefetches have been initiated but are not yet known to have completed</entry>
+    </row>
+   </tbody>
+   </tgroup>
+  </table>
+
+  <para>
+   The <structname>pg_stat_prefetch_recovery</structname> view will contain only
+   one row.  It is filled with nulls if recovery is not running or WAL
+   prefetching is not enabled.  See <xref linkend="guc-max-recovery-prefetch-distance"/>
+   for more information.  The counters in this view are reset whenever the
+   <xref linkend="guc-max-recovery-prefetch-distance"/>,
+   <xref linkend="guc-recovery-prefetch-fpw"/> or
+   <xref linkend="guc-maintenance-io-concurrency"/> setting is changed and
+   the server configuration is reloaded.
+  </para>
+
   <table id="pg-stat-subscription" xreflabel="pg_stat_subscription">
    <title><structname>pg_stat_subscription</structname> View</title>
    <tgroup cols="3">
@@ -3446,6 +3515,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        counters shown in the <structname>pg_stat_bgwriter</structname> view.
        Calling <literal>pg_stat_reset_shared('archiver')</literal> will zero all the
        counters shown in the <structname>pg_stat_archiver</structname> view.
+       Calling <literal>pg_stat_reset_shared('prefetch_recovery')</literal> will zero all the
+       counters shown in the <structname>pg_stat_prefetch_recovery</structname> view.
       </entry>
      </row>
 
diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml
index bd9fae544c..38fc8149a8 100644
--- a/doc/src/sgml/wal.sgml
+++ b/doc/src/sgml/wal.sgml
@@ -719,6 +719,19 @@
    <acronym>WAL</acronym> call being logged to the server log. This
    option might be replaced by a more general mechanism in the future.
   </para>
+
+  <para>
+   The <xref linkend="guc-max-recovery-prefetch-distance"/> parameter can
+   be used to improve I/O performance during recovery by instructing
+   <productname>PostgreSQL</productname> to initiate reads
+   of disk blocks that will soon be needed, in combination with the
+   <xref linkend="guc-maintenance-io-concurrency"/> parameter.  The
+   prefetching mechanism is most likely to be effective on systems
+   with <varname>full_page_writes</varname> set to
+   <varname>off</varname> (where that is safe), and where the working
+   set is larger than RAM.  By default, prefetching in recovery is enabled,
+   but it can be disabled by setting the distance to -1.
+  </para>
  </sect1>
 
  <sect1 id="wal-internals">
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de72..39f9d4e77d 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -31,6 +31,7 @@ OBJS = \
 	xlogarchive.o \
 	xlogfuncs.o \
 	xloginsert.o \
+	xlogprefetch.o \
 	xlogreader.o \
 	xlogutils.o
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 658af40816..4b7f902462 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -35,6 +35,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xloginsert.h"
+#include "access/xlogprefetch.h"
 #include "access/xlogreader.h"
 #include "access/xlogutils.h"
 #include "catalog/catversion.h"
@@ -7116,6 +7117,7 @@ StartupXLOG(void)
 		{
 			ErrorContextCallback errcallback;
 			TimestampTz xtime;
+			XLogPrefetchState prefetch;
 
 			InRedo = true;
 
@@ -7123,6 +7125,9 @@ StartupXLOG(void)
 					(errmsg("redo starts at %X/%X",
 							(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
 
+			/* Prepare to prefetch, if configured. */
+			XLogPrefetchBegin(&prefetch);
+
 			/*
 			 * main redo apply loop
 			 */
@@ -7152,6 +7157,10 @@ StartupXLOG(void)
 				/* Handle interrupt signals of startup process */
 				HandleStartupProcInterrupts();
 
+				/* Perform WAL prefetching, if enabled. */
+				XLogPrefetch(&prefetch, xlogreader->ReadRecPtr,
+							 currentSource == XLOG_FROM_STREAM);
+
 				/*
 				 * Pause WAL replay, if requested by a hot-standby session via
 				 * SetRecoveryPause().
@@ -7339,6 +7348,7 @@ StartupXLOG(void)
 			/*
 			 * end of main redo apply loop
 			 */
+			XLogPrefetchEnd(&prefetch);
 
 			if (reachedRecoveryTarget)
 			{
@@ -11970,6 +11980,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 */
 					currentSource = XLOG_FROM_STREAM;
 					startWalReceiver = true;
+					XLogPrefetchReconfigure();
 					break;
 
 				case XLOG_FROM_STREAM:
diff --git a/src/backend/access/transam/xlogprefetch.c b/src/backend/access/transam/xlogprefetch.c
new file mode 100644
index 0000000000..c190ffb6bd
--- /dev/null
+++ b/src/backend/access/transam/xlogprefetch.c
@@ -0,0 +1,900 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetch.c
+ *		Prefetching support for recovery.
+ *
+ * Portions Copyright (c) 2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *		src/backend/access/transam/xlogprefetch.c
+ *
+ * The goal of this module is to read future WAL records and issue
+ * PrefetchSharedBuffer() calls for referenced blocks, so that we avoid I/O
+ * stalls in the main recovery loop.  Currently, this is achieved by using a
+ * separate XLogReader to read ahead.  In future, we should find a way to
+ * avoid reading and decoding each record twice.
+ *
+ * When examining a WAL record from the future, we need to consider that a
+ * referenced block or segment file might not exist on disk until this record
+ * or some earlier record has been replayed.  After a crash, a file might also
+ * be missing because it was dropped by a later WAL record; in that case, it
+ * will be recreated when this record is replayed.  These cases are handled by
+ * recognizing them and adding a "filter" that prevents all prefetching of a
+ * certain block range until the present WAL record has been replayed.  Blocks
+ * skipped for these reasons are counted as "skip_new" (that is, cases where we
+ * didn't try to prefetch "new" blocks).
+ *
+ * There is some evidence that it's better to let the operating system detect
+ * sequential access and do its own prefetching.  Explicit prefetching is
+ * therefore skipped for sequential blocks, counted with "skip_seq".
+ *
+ * The only way we currently have to know that an I/O initiated with
+ * PrefetchSharedBuffer() has completed is to call ReadBuffer().  Therefore,
+ * we track the number of potentially in-flight I/Os by using a circular
+ * buffer of LSNs.  When it's full, we have to wait for recovery to replay
+ * records so that the queue depth can be reduced, before we can do any more
+ * prefetching.  Ideally, this keeps us the right distance ahead to respect
+ * maintenance_io_concurrency.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlogprefetch.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "catalog/storage_xlog.h"
+#include "utils/fmgrprotos.h"
+#include "utils/timestamp.h"
+#include "funcapi.h"
+#include "pgstat.h"
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/smgr.h"
+#include "utils/guc.h"
+#include "utils/hsearch.h"
+
+/*
+ * Sample the queue depth and distance every time we replay this much WAL.
+ * This is used to compute avg_queue_depth and avg_distance for the log
+ * message that appears at the end of crash recovery.  It's also used to send
+ * messages periodically to the stats collector, to save the counters on disk.
+ */
+#define XLOGPREFETCHER_SAMPLE_DISTANCE 0x40000
+
+/* GUCs */
+int			max_recovery_prefetch_distance = -1;
+bool		recovery_prefetch_fpw = false;
+
+int			XLogPrefetchReconfigureCount;
+
+/*
+ * A prefetcher object.  There is at most one of these in existence at a time,
+ * recreated whenever there is a configuration change.
+ */
+struct XLogPrefetcher
+{
+	/* Reader and current reading state. */
+	XLogReaderState *reader;
+	XLogReadLocalOptions options;
+	bool			have_record;
+	bool			shutdown;
+	int				next_block_id;
+
+	/* Details of last prefetch to skip repeats and seq scans. */
+	SMgrRelation	last_reln;
+	RelFileNode		last_rnode;
+	BlockNumber		last_blkno;
+
+	/* Online averages. */
+	uint64			samples;
+	double			avg_queue_depth;
+	double			avg_distance;
+	XLogRecPtr		next_sample_lsn;
+
+	/* Book-keeping required to avoid accessing non-existing blocks. */
+	HTAB		   *filter_table;
+	dlist_head		filter_queue;
+
+	/* Book-keeping required to limit concurrent prefetches. */
+	int				prefetch_head;
+	int				prefetch_tail;
+	int				prefetch_queue_size;
+	XLogRecPtr		prefetch_queue[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/*
+ * A temporary filter used to track block ranges that haven't been created
+ * yet, whole relations that haven't been created yet, and whole relations
+ * that we must assume have already been dropped.
+ */
+typedef struct XLogPrefetcherFilter
+{
+	RelFileNode		rnode;
+	XLogRecPtr		filter_until_replayed;
+	BlockNumber		filter_from_block;
+	dlist_node		link;
+} XLogPrefetcherFilter;
+
+/*
+ * Counters exposed in shared memory for pg_stat_prefetch_recovery.
+ */
+typedef struct XLogPrefetchStats
+{
+	pg_atomic_uint64 reset_time; /* Time of last reset. */
+	pg_atomic_uint64 prefetch;	/* Prefetches initiated. */
+	pg_atomic_uint64 skip_hit;	/* Blocks already buffered. */
+	pg_atomic_uint64 skip_new;	/* New/missing blocks filtered. */
+	pg_atomic_uint64 skip_fpw;	/* FPWs skipped. */
+	pg_atomic_uint64 skip_seq;	/* Sequential/repeat blocks skipped. */
+	float			avg_distance;
+	float			avg_queue_depth;
+
+	/* Reset counters */
+	pg_atomic_uint32 reset_request;
+	uint32			reset_handled;
+
+	/* Dynamic values */
+	int				distance;	/* Number of bytes ahead in the WAL. */
+	int				queue_depth; /* Number of I/Os possibly in progress. */
+} XLogPrefetchStats;
+
+static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
+										   RelFileNode rnode,
+										   BlockNumber blockno,
+										   XLogRecPtr lsn);
+static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
+											RelFileNode rnode,
+											BlockNumber blockno);
+static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
+												 XLogRecPtr replaying_lsn);
+static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+											 XLogRecPtr prefetching_lsn);
+static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher,
+											 XLogRecPtr replaying_lsn);
+static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher);
+static void XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher,
+									  XLogRecPtr replaying_lsn);
+static bool XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher);
+static void XLogPrefetchSaveStats(void);
+static void XLogPrefetchRestoreStats(void);
+
+static XLogPrefetchStats *Stats;
+
+size_t
+XLogPrefetchShmemSize(void)
+{
+	return sizeof(XLogPrefetchStats);
+}
+
+static void
+XLogPrefetchResetStats(void)
+{
+	pg_atomic_write_u64(&Stats->reset_time, GetCurrentTimestamp());
+	pg_atomic_write_u64(&Stats->prefetch, 0);
+	pg_atomic_write_u64(&Stats->skip_hit, 0);
+	pg_atomic_write_u64(&Stats->skip_new, 0);
+	pg_atomic_write_u64(&Stats->skip_fpw, 0);
+	pg_atomic_write_u64(&Stats->skip_seq, 0);
+	Stats->avg_distance = 0;
+	Stats->avg_queue_depth = 0;
+}
+
+void
+XLogPrefetchShmemInit(void)
+{
+	bool		found;
+
+	Stats = (XLogPrefetchStats *)
+		ShmemInitStruct("XLogPrefetchStats",
+						sizeof(XLogPrefetchStats),
+						&found);
+	if (!found)
+	{
+		pg_atomic_init_u32(&Stats->reset_request, 0);
+		Stats->reset_handled = 0;
+		pg_atomic_init_u64(&Stats->reset_time, GetCurrentTimestamp());
+		pg_atomic_init_u64(&Stats->prefetch, 0);
+		pg_atomic_init_u64(&Stats->skip_hit, 0);
+		pg_atomic_init_u64(&Stats->skip_new, 0);
+		pg_atomic_init_u64(&Stats->skip_fpw, 0);
+		pg_atomic_init_u64(&Stats->skip_seq, 0);
+		Stats->avg_distance = 0;
+		Stats->avg_queue_depth = 0;
+		Stats->distance = 0;
+		Stats->queue_depth = 0;
+	}
+}
+
+/*
+ * Called when any GUC is changed that affects prefetching.
+ */
+void
+XLogPrefetchReconfigure(void)
+{
+	XLogPrefetchReconfigureCount++;
+}
+
+/*
+ * Called by any backend to request that the stats be reset.
+ */
+void
+XLogPrefetchRequestResetStats(void)
+{
+	pg_atomic_fetch_add_u32(&Stats->reset_request, 1);
+}
+
+/*
+ * Tell the stats collector to serialize the shared memory counters into the
+ * stats file.
+ */
+static void
+XLogPrefetchSaveStats(void)
+{
+	PgStat_RecoveryPrefetchStats serialized = {
+		.prefetch = pg_atomic_read_u64(&Stats->prefetch),
+		.skip_hit = pg_atomic_read_u64(&Stats->skip_hit),
+		.skip_new = pg_atomic_read_u64(&Stats->skip_new),
+		.skip_fpw = pg_atomic_read_u64(&Stats->skip_fpw),
+		.skip_seq = pg_atomic_read_u64(&Stats->skip_seq),
+		.stat_reset_timestamp = pg_atomic_read_u64(&Stats->reset_time)
+	};
+
+	pgstat_send_recoveryprefetch(&serialized);
+}
+
+/*
+ * Try to restore the shared memory counters from the stats file.
+ */
+static void
+XLogPrefetchRestoreStats(void)
+{
+	PgStat_RecoveryPrefetchStats *serialized = pgstat_fetch_recoveryprefetch();
+
+	if (serialized->stat_reset_timestamp != 0)
+	{
+		pg_atomic_write_u64(&Stats->prefetch, serialized->prefetch);
+		pg_atomic_write_u64(&Stats->skip_hit, serialized->skip_hit);
+		pg_atomic_write_u64(&Stats->skip_new, serialized->skip_new);
+		pg_atomic_write_u64(&Stats->skip_fpw, serialized->skip_fpw);
+		pg_atomic_write_u64(&Stats->skip_seq, serialized->skip_seq);
+		pg_atomic_write_u64(&Stats->reset_time, serialized->stat_reset_timestamp);
+	}
+}
+
+/*
+ * Initialize an XLogPrefetchState object and restore the last saved
+ * statistics from disk.
+ */
+void
+XLogPrefetchBegin(XLogPrefetchState *state)
+{
+	XLogPrefetchRestoreStats();
+
+	/* We'll reconfigure on the first call to XLogPrefetch(). */
+	state->prefetcher = NULL;
+	state->reconfigure_count = XLogPrefetchReconfigureCount - 1;
+}
+
+/*
+ * Shut down the prefetching infrastructure, if configured.
+ */
+void
+XLogPrefetchEnd(XLogPrefetchState *state)
+{
+	XLogPrefetchSaveStats();
+
+	if (state->prefetcher)
+		XLogPrefetcherFree(state->prefetcher);
+	state->prefetcher = NULL;
+
+	Stats->queue_depth = 0;
+	Stats->distance = 0;
+}
+
+/*
+ * Create a prefetcher that is ready to begin prefetching blocks referenced by
+ * WAL that is ahead of the given lsn.
+ */
+XLogPrefetcher *
+XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming)
+{
+	XLogPrefetcher *prefetcher;
+	static HASHCTL hash_table_ctl = {
+		.keysize = sizeof(RelFileNode),
+		.entrysize = sizeof(XLogPrefetcherFilter)
+	};
+
+	/*
+	 * The size of the queue is based on the maintenance_io_concurrency
+	 * setting.  In theory we might have a separate queue for each tablespace,
+	 * but it's not clear how that should work, so for now we'll just use the
+	 * general GUC to rate-limit all prefetching.  We add one to the size
+	 * because our circular buffer has a gap between head and tail when full.
+	 */
+	prefetcher = palloc0(offsetof(XLogPrefetcher, prefetch_queue) +
+						 sizeof(XLogRecPtr) * (maintenance_io_concurrency + 1));
+	prefetcher->prefetch_queue_size = maintenance_io_concurrency + 1;
+	prefetcher->options.nowait = true;
+	if (streaming)
+	{
+		/*
+		 * We're only allowed to read as far as the WAL receiver has written.
+		 * We don't have to wait for it to be flushed, though, as recovery
+		 * does, so that gives us a chance to get a bit further ahead.
+		 */
+		prefetcher->options.read_upto_policy = XLRO_WALRCV_WRITTEN;
+	}
+	else
+	{
+		/* Read as far as we can. */
+		prefetcher->options.read_upto_policy = XLRO_END;
+	}
+	prefetcher->reader = XLogReaderAllocate(wal_segment_size,
+											NULL,
+											read_local_xlog_page,
+											&prefetcher->options);
+	prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
+										   &hash_table_ctl,
+										   HASH_ELEM | HASH_BLOBS);
+	dlist_init(&prefetcher->filter_queue);
+
+	/* Prepare to read at the given LSN. */
+	ereport(LOG,
+			(errmsg("recovery started prefetching at %X/%X",
+					(uint32) (lsn >> 32), (uint32) lsn)));
+	XLogBeginRead(prefetcher->reader, lsn);
+
+	Stats->queue_depth = 0;
+	Stats->distance = 0;
+
+	return prefetcher;
+}
+
+/*
+ * Destroy a prefetcher and release all resources.
+ */
+void
+XLogPrefetcherFree(XLogPrefetcher *prefetcher)
+{
+	/* Log final statistics. */
+	ereport(LOG,
+			(errmsg("recovery finished prefetching at %X/%X; "
+					"prefetch = " UINT64_FORMAT ", "
+					"skip_hit = " UINT64_FORMAT ", "
+					"skip_new = " UINT64_FORMAT ", "
+					"skip_fpw = " UINT64_FORMAT ", "
+					"skip_seq = " UINT64_FORMAT ", "
+					"avg_distance = %f, "
+					"avg_queue_depth = %f",
+			 (uint32) (prefetcher->reader->EndRecPtr >> 32),
+			 (uint32) (prefetcher->reader->EndRecPtr),
+			 pg_atomic_read_u64(&Stats->prefetch),
+			 pg_atomic_read_u64(&Stats->skip_hit),
+			 pg_atomic_read_u64(&Stats->skip_new),
+			 pg_atomic_read_u64(&Stats->skip_fpw),
+			 pg_atomic_read_u64(&Stats->skip_seq),
+			 Stats->avg_distance,
+			 Stats->avg_queue_depth)));
+	XLogReaderFree(prefetcher->reader);
+	hash_destroy(prefetcher->filter_table);
+	pfree(prefetcher);
+}
+
+/*
+ * Called when recovery is replaying a new LSN, to check if we can read ahead.
+ */
+void
+XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	uint32		reset_request;
+
+	/* If an error has occurred or we've hit the end of the WAL, do nothing. */
+	if (prefetcher->shutdown)
+		return;
+
+	/*
+	 * Have any in-flight prefetches definitely completed, judging by the LSN
+	 * that is currently being replayed?
+	 */
+	XLogPrefetcherCompletedIO(prefetcher, replaying_lsn);
+
+	/*
+	 * Do we already have the maximum permitted number of I/Os running
+	 * (according to the information we have)?  If so, we have to wait for at
+	 * least one to complete, so give up early and let recovery catch up.
+	 */
+	if (XLogPrefetcherSaturated(prefetcher))
+		return;
+
+	/*
+	 * Can we drop any filters yet?  This happens when the LSN that is
+	 * currently being replayed has moved past a record that prevents
+	 * prefetching of a block range, such as relation extension.
+	 */
+	XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn);
+
+	/*
+	 * Have we been asked to reset our stats counters?  This is checked with
+	 * an unsynchronized memory read, but we'll see it eventually and we'll be
+	 * accessing that cache line anyway.
+	 */
+	reset_request = pg_atomic_read_u32(&Stats->reset_request);
+	if (reset_request != Stats->reset_handled)
+	{
+		XLogPrefetchResetStats();
+		Stats->reset_handled = reset_request;
+		prefetcher->avg_distance = 0;
+		prefetcher->avg_queue_depth = 0;
+		prefetcher->samples = 0;
+	}
+
+	/* OK, we can now try reading ahead. */
+	XLogPrefetcherScanRecords(prefetcher, replaying_lsn);
+}
+
+/*
+ * Read ahead as far as we are allowed to, considering the LSN that recovery
+ * is currently replaying.
+ */
+static void
+XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	XLogReaderState *reader = prefetcher->reader;
+
+	Assert(!XLogPrefetcherSaturated(prefetcher));
+
+	for (;;)
+	{
+		char *error;
+		int64		distance;
+
+		/* If we don't already have a record, then try to read one. */
+		if (!prefetcher->have_record)
+		{
+			if (!XLogReadRecord(reader, &error))
+			{
+				/* If we got an error, log it and give up. */
+				if (error)
+				{
+					ereport(LOG, (errmsg("recovery no longer prefetching: %s", error)));
+					prefetcher->shutdown = true;
+					Stats->queue_depth = 0;
+					Stats->distance = 0;
+				}
+				/* Otherwise, we'll try again later when more data is here. */
+				return;
+			}
+			prefetcher->have_record = true;
+			prefetcher->next_block_id = 0;
+		}
+
+		/* How far ahead of replay are we now? */
+		distance = prefetcher->reader->ReadRecPtr - replaying_lsn;
+
+		/* Update distance shown in shm. */
+		Stats->distance = distance;
+
+		/* Periodically recompute some statistics. */
+		if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn))
+		{
+			/* Compute online averages. */
+			prefetcher->samples++;
+			if (prefetcher->samples == 1)
+			{
+				prefetcher->avg_distance = Stats->distance;
+				prefetcher->avg_queue_depth = Stats->queue_depth;
+			}
+			else
+			{
+				prefetcher->avg_distance +=
+					(Stats->distance - prefetcher->avg_distance) /
+					prefetcher->samples;
+				prefetcher->avg_queue_depth +=
+					(Stats->queue_depth - prefetcher->avg_queue_depth) /
+					prefetcher->samples;
+			}
+
+			/* Expose it in shared memory. */
+			Stats->avg_distance = prefetcher->avg_distance;
+			Stats->avg_queue_depth = prefetcher->avg_queue_depth;
+
+			/* Also periodically save the simple counters. */
+			XLogPrefetchSaveStats();
+
+			prefetcher->next_sample_lsn =
+				replaying_lsn + XLOGPREFETCHER_SAMPLE_DISTANCE;
+		}
+
+		/* Are we too far ahead of replay? */
+		if (distance >= max_recovery_prefetch_distance)
+			break;
+
+		/* Are we not far enough ahead? */
+		if (distance <= 0)
+		{
+			prefetcher->have_record = false;	/* skip this record */
+			continue;
+		}
+
+		/*
+		 * If this is a record that creates a new SMGR relation, we'll avoid
+		 * prefetching anything from that rnode until it has been replayed.
+		 */
+		if (replaying_lsn < reader->ReadRecPtr &&
+			XLogRecGetRmid(reader) == RM_SMGR_ID &&
+			(XLogRecGetInfo(reader) & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE)
+		{
+			xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(reader);
+
+			XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0,
+									reader->ReadRecPtr);
+		}
+
+		/* Scan the record's block references. */
+		if (!XLogPrefetcherScanBlocks(prefetcher))
+			return;
+
+		/* Advance to the next record. */
+		prefetcher->have_record = false;
+	}
+}
+
+/*
+ * Scan the current record for block references, and consider prefetching.
+ *
+ * Return true if we processed the current record to completion and still have
+ * queue space to process a new record, and false if we saturated the I/O
+ * queue and need to wait for recovery to advance before we continue.
+ */
+static bool
+XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher)
+{
+	XLogReaderState *reader = prefetcher->reader;
+
+	Assert(!XLogPrefetcherSaturated(prefetcher));
+
+	/*
+	 * We might already have been partway through processing this record when
+	 * our queue became saturated, so we need to start where we left off.
+	 */
+	for (int block_id = prefetcher->next_block_id;
+		 block_id <= reader->max_block_id;
+		 ++block_id)
+	{
+		PrefetchBufferResult prefetch;
+		DecodedBkpBlock *block = &reader->blocks[block_id];
+		SMgrRelation reln;
+
+		/* Ignore everything but the main fork for now. */
+		if (block->forknum != MAIN_FORKNUM)
+			continue;
+
+		/*
+		 * If there is a full page image attached, we won't be reading the
+		 * page, so you might think we should skip it.  However, if the
+		 * underlying filesystem uses larger logical blocks than us, it
+		 * might still need to perform a read-before-write some time later.
+		 * Therefore, only prefetch if configured to do so.
+		 */
+		if (block->has_image && !recovery_prefetch_fpw)
+		{
+			pg_atomic_unlocked_add_fetch_u64(&Stats->skip_fpw, 1);
+			continue;
+		}
+
+		/*
+		 * If this block will initialize a new page then it's probably an
+		 * extension.  Since it might create a new segment, we can't try
+		 * to prefetch this block until the record has been replayed, or we
+		 * might try to open a file that doesn't exist yet.
+		 */
+		if (block->flags & BKPBLOCK_WILL_INIT)
+		{
+			XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
+									reader->ReadRecPtr);
+			pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1);
+			continue;
+		}
+
+		/* Should we skip this block due to a filter? */
+		if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno))
+		{
+			pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1);
+			continue;
+		}
+
+		/* Fast path for repeated references to the same relation. */
+		if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode))
+		{
+			/*
+			 * If this is a repeat or sequential access, then skip it.  We
+			 * expect the kernel to detect sequential access on its own and do
+			 * a better job than we could.
+			 */
+			if (block->blkno == prefetcher->last_blkno ||
+				block->blkno == prefetcher->last_blkno + 1)
+			{
+				prefetcher->last_blkno = block->blkno;
+				pg_atomic_unlocked_add_fetch_u64(&Stats->skip_seq, 1);
+				continue;
+			}
+
+			/* We can avoid calling smgropen(). */
+			reln = prefetcher->last_reln;
+		}
+		else
+		{
+			/* Otherwise we have to open it. */
+			reln = smgropen(block->rnode, InvalidBackendId);
+			prefetcher->last_rnode = block->rnode;
+			prefetcher->last_reln = reln;
+		}
+		prefetcher->last_blkno = block->blkno;
+
+		/* Try to prefetch this block! */
+		prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
+		if (BufferIsValid(prefetch.recent_buffer))
+		{
+			/*
+			 * It was already cached, so do nothing.  Perhaps in future we
+			 * could remember the buffer so that recovery doesn't have to look
+			 * it up again.
+			 */
+			pg_atomic_unlocked_add_fetch_u64(&Stats->skip_hit, 1);
+		}
+		else if (prefetch.initiated_io)
+		{
+			/*
+			 * I/O has possibly been initiated (though we don't know if it
+			 * was already cached by the kernel, so we just have to assume
+			 * that it has due to lack of better information).  Record
+			 * this as an I/O in progress until eventually we replay this
+			 * LSN.
+			 */
+			pg_atomic_unlocked_add_fetch_u64(&Stats->prefetch, 1);
+			XLogPrefetcherInitiatedIO(prefetcher, reader->ReadRecPtr);
+			/*
+			 * If the queue is now full, we'll have to wait before processing
+			 * any more blocks from this record, or move to a new record if
+			 * that was the last block.
+			 */
+			if (XLogPrefetcherSaturated(prefetcher))
+			{
+				prefetcher->next_block_id = block_id + 1;
+				return false;
+			}
+		}
+		else
+		{
+			/*
+			 * Neither cached nor initiated.  The underlying segment file
+			 * doesn't exist.  Presumably it will be unlinked by a later WAL
+			 * record.  When recovery reads this block, it will use the
+			 * EXTENSION_CREATE_RECOVERY flag.  We certainly don't want to do
+			 * that sort of thing while merely prefetching, so let's just
+			 * ignore references to this relation until this record is
+			 * replayed, and let recovery create the dummy file or complain if
+			 * something is wrong.
+			 */
+			XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
+									reader->ReadRecPtr);
+			pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1);
+		}
+	}
+
+	return true;
+}
+
+/*
+ * Expose statistics about WAL prefetching.
+ */
+Datum
+pg_stat_get_prefetch_recovery(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_PREFETCH_RECOVERY_COLS 10
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	Datum		values[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
+	bool		nulls[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
+
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	if (pg_atomic_read_u32(&Stats->reset_request) != Stats->reset_handled)
+	{
+		/* There's an unhandled reset request, so just show NULLs */
+		for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
+			nulls[i] = true;
+	}
+	else
+	{
+		for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
+			nulls[i] = false;
+	}
+
+	values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&Stats->reset_time));
+	values[1] = Int64GetDatum(pg_atomic_read_u64(&Stats->prefetch));
+	values[2] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_hit));
+	values[3] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_new));
+	values[4] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_fpw));
+	values[5] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_seq));
+	values[6] = Int32GetDatum(Stats->distance);
+	values[7] = Int32GetDatum(Stats->queue_depth);
+	values[8] = Float4GetDatum(Stats->avg_distance);
+	values[9] = Float4GetDatum(Stats->avg_queue_depth);
+	tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	tuplestore_donestoring(tupstore);
+
+	return (Datum) 0;
+}
+
+/*
+ * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn'
+ * has been replayed.
+ */
+static inline void
+XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode,
+						BlockNumber blockno, XLogRecPtr lsn)
+{
+	XLogPrefetcherFilter *filter;
+	bool		found;
+
+	filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
+	if (!found)
+	{
+		/*
+		 * Don't allow any prefetching of this block or higher until replayed.
+		 */
+		filter->filter_until_replayed = lsn;
+		filter->filter_from_block = blockno;
+		dlist_push_head(&prefetcher->filter_queue, &filter->link);
+	}
+	else
+	{
+		/*
+		 * We were already filtering this rnode.  Extend the filter's lifetime
+		 * to cover this WAL record, but leave the (presumably lower) block
+		 * number there because we don't want to have to track individual
+		 * blocks.
+		 */
+		filter->filter_until_replayed = lsn;
+		dlist_delete(&filter->link);
+		dlist_push_head(&prefetcher->filter_queue, &filter->link);
+	}
+}
+
+/*
+ * Have we replayed the records that caused us to begin filtering a block
+ * range?  That means that relations should have been created, extended or
+ * dropped as required, so we can drop relevant filters.
+ */
+static inline void
+XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+	{
+		XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
+														  link,
+														  &prefetcher->filter_queue);
+
+		if (filter->filter_until_replayed >= replaying_lsn)
+			break;
+		dlist_delete(&filter->link);
+		hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
+	}
+}
+
+/*
+ * Check if a given block should be skipped due to a filter.
+ */
+static inline bool
+XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode,
+						 BlockNumber blockno)
+{
+	/*
+	 * Test for empty queue first, because we expect it to be empty most of the
+	 * time and we can avoid the hash table lookup in that case.
+	 */
+	if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+	{
+		XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode,
+												   HASH_FIND, NULL);
+
+		if (filter && filter->filter_from_block <= blockno)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * Insert an LSN into the queue.  The queue must not be full already.  This
+ * tracks the fact that we have (to the best of our knowledge) initiated an
+ * I/O, so that we can impose a cap on concurrent prefetching.
+ */
+static inline void
+XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+						  XLogRecPtr prefetching_lsn)
+{
+	Assert(!XLogPrefetcherSaturated(prefetcher));
+	prefetcher->prefetch_queue[prefetcher->prefetch_head++] = prefetching_lsn;
+	prefetcher->prefetch_head %= prefetcher->prefetch_queue_size;
+	Stats->queue_depth++;
+	Assert(Stats->queue_depth <= prefetcher->prefetch_queue_size);
+}
+
+/*
+ * Have we replayed the records that caused us to initiate the oldest
+ * prefetches yet?  That means that they're definitely finished, so we can
+ * forget about them and allow ourselves to initiate more prefetches.  For now
+ * we don't have any awareness of when I/O really completes.
+ */
+static inline void
+XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	while (prefetcher->prefetch_head != prefetcher->prefetch_tail &&
+		   prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn)
+	{
+		prefetcher->prefetch_tail++;
+		prefetcher->prefetch_tail %= prefetcher->prefetch_queue_size;
+		Stats->queue_depth--;
+		Assert(Stats->queue_depth >= 0);
+	}
+}
+
+/*
+ * Check if the maximum allowed number of I/Os is already in flight.
+ */
+static inline bool
+XLogPrefetcherSaturated(XLogPrefetcher *prefetcher)
+{
+	return (prefetcher->prefetch_head + 1) % prefetcher->prefetch_queue_size ==
+		prefetcher->prefetch_tail;
+}
+
+void
+assign_max_recovery_prefetch_distance(int new_value, void *extra)
+{
+	/* Reconfigure prefetching, because a setting it depends on changed. */
+	max_recovery_prefetch_distance = new_value;
+	if (AmStartupProcess())
+		XLogPrefetchReconfigure();
+}
+
+void
+assign_recovery_prefetch_fpw(bool new_value, void *extra)
+{
+	/* Reconfigure prefetching, because a setting it depends on changed. */
+	recovery_prefetch_fpw = new_value;
+	if (AmStartupProcess())
+		XLogPrefetchReconfigure();
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 813ea8bfc3..3d5afb633e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -825,6 +825,20 @@ CREATE VIEW pg_stat_wal_receiver AS
     FROM pg_stat_get_wal_receiver() s
     WHERE s.pid IS NOT NULL;
 
+CREATE VIEW pg_stat_prefetch_recovery AS
+    SELECT
+            s.stats_reset,
+            s.prefetch,
+            s.skip_hit,
+            s.skip_new,
+            s.skip_fpw,
+            s.skip_seq,
+            s.distance,
+            s.queue_depth,
+            s.avg_distance,
+            s.avg_queue_depth
+     FROM pg_stat_get_prefetch_recovery() s;
+
 CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 9ebde47dea..c0f7333808 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -38,6 +38,7 @@
 #include "access/transam.h"
 #include "access/twophase_rmgr.h"
 #include "access/xact.h"
+#include "access/xlogprefetch.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_proc.h"
 #include "common/ip.h"
@@ -276,6 +277,7 @@ static int	localNumBackends = 0;
 static PgStat_ArchiverStats archiverStats;
 static PgStat_GlobalStats globalStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
+static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -348,6 +350,7 @@ static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
 static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
 static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
 static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len);
+static void pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len);
 static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
 static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
 static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
@@ -1364,11 +1367,20 @@ pgstat_reset_shared_counters(const char *target)
 		msg.m_resettarget = RESET_ARCHIVER;
 	else if (strcmp(target, "bgwriter") == 0)
 		msg.m_resettarget = RESET_BGWRITER;
+	else if (strcmp(target, "prefetch_recovery") == 0)
+	{
+		/*
+		 * We can't ask the stats collector to do this for us as it is not
+		 * attached to shared memory.
+		 */
+		XLogPrefetchRequestResetStats();
+		return;
+	}
 	else
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 				 errmsg("unrecognized reset target: \"%s\"", target),
-				 errhint("Target must be \"archiver\" or \"bgwriter\".")));
+				 errhint("Target must be \"archiver\", \"bgwriter\" or \"prefetch_recovery\".")));
 
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
 	pgstat_send(&msg, sizeof(msg));
@@ -2690,6 +2702,22 @@ pgstat_fetch_slru(void)
 }
 
 
+/*
+ * ---------
+ * pgstat_fetch_recoveryprefetch() -
+ *
+ *	Support function for restoring the counters managed by xlogprefetch.c.
+ * ---------
+ */
+PgStat_RecoveryPrefetchStats *
+pgstat_fetch_recoveryprefetch(void)
+{
+	backend_read_statsfile();
+
+	return &recoveryPrefetchStats;
+}
+
+
 /* ------------------------------------------------------------
  * Functions for management of the shared-memory PgBackendStatus array
  * ------------------------------------------------------------
@@ -4440,6 +4468,23 @@ pgstat_send_slru(void)
 }
 
 
+/* ----------
+ * pgstat_send_recoveryprefetch() -
+ *
+ *		Send recovery prefetch statistics to the collector
+ * ----------
+ */
+void
+pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats)
+{
+	PgStat_MsgRecoveryPrefetch msg;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYPREFETCH);
+	msg.m_stats = *stats;
+	pgstat_send(&msg, sizeof(msg));
+}
+
+
 /* ----------
  * PgstatCollectorMain() -
  *
@@ -4636,6 +4681,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_slru(&msg.msg_slru, len);
 					break;
 
+				case PGSTAT_MTYPE_RECOVERYPREFETCH:
+					pgstat_recv_recoveryprefetch(&msg.msg_recoveryprefetch, len);
+					break;
+
 				case PGSTAT_MTYPE_FUNCSTAT:
 					pgstat_recv_funcstat(&msg.msg_funcstat, len);
 					break;
@@ -4911,6 +4960,13 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	rc = fwrite(slruStats, sizeof(slruStats), 1, fpout);
 	(void) rc;					/* we'll check for error with ferror */
 
+	/*
+	 * Write recovery prefetch stats struct
+	 */
+	rc = fwrite(&recoveryPrefetchStats, sizeof(recoveryPrefetchStats), 1,
+				fpout);
+	(void) rc;					/* we'll check for error with ferror */
+
 	/*
 	 * Walk through the database table.
 	 */
@@ -5170,6 +5226,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	memset(&globalStats, 0, sizeof(globalStats));
 	memset(&archiverStats, 0, sizeof(archiverStats));
 	memset(&slruStats, 0, sizeof(slruStats));
+	memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
 
 	/*
 	 * Set the current timestamp (will be kept only in case we can't load an
@@ -5257,6 +5314,18 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 		goto done;
 	}
 
+	/*
+	 * Read recoveryPrefetchStats struct
+	 */
+	if (fread(&recoveryPrefetchStats, 1, sizeof(recoveryPrefetchStats),
+			  fpin) != sizeof(recoveryPrefetchStats))
+	{
+		ereport(pgStatRunningInCollector ? LOG : WARNING,
+				(errmsg("corrupted statistics file \"%s\"", statfile)));
+		memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
+		goto done;
+	}
+
 	/*
 	 * We found an existing collector stats file. Read it and put all the
 	 * hashtable entries into place.
@@ -5556,6 +5625,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 	PgStat_GlobalStats myGlobalStats;
 	PgStat_ArchiverStats myArchiverStats;
 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
+	PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
 	FILE	   *fpin;
 	int32		format_id;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -5621,6 +5691,18 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 		return false;
 	}
 
+	/*
+	 * Read recovery prefetch stats struct
+	 */
+	if (fread(&myRecoveryPrefetchStats, 1, sizeof(myRecoveryPrefetchStats),
+			  fpin) != sizeof(myRecoveryPrefetchStats))
+	{
+		ereport(pgStatRunningInCollector ? LOG : WARNING,
+				(errmsg("corrupted statistics file \"%s\"", statfile)));
+		FreeFile(fpin);
+		return false;
+	}
+
 	/* By default, we're going to return the timestamp of the global file. */
 	*ts = myGlobalStats.stats_timestamp;
 
@@ -6422,6 +6504,18 @@ pgstat_recv_slru(PgStat_MsgSLRU *msg, int len)
 	slruStats[msg->m_index].truncate += msg->m_truncate;
 }
 
+/* ----------
+ * pgstat_recv_recoveryprefetch() -
+ *
+ *	Process a recovery prefetch message.
+ * ----------
+ */
+static void
+pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len)
+{
+	recoveryPrefetchStats = msg->m_stats;
+}
+
 /* ----------
  * pgstat_recv_recoveryconflict() -
  *
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5adf253583..792d90ef4c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->slot = slot;
 
-	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
+	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, NULL);
 	if (!ctx->reader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..221081bddc 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -21,6 +21,7 @@
 #include "access/nbtree.h"
 #include "access/subtrans.h"
 #include "access/twophase.h"
+#include "access/xlogprefetch.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -124,6 +125,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, PredicateLockShmemSize());
 		size = add_size(size, ProcGlobalShmemSize());
 		size = add_size(size, XLOGShmemSize());
+		size = add_size(size, XLogPrefetchShmemSize());
 		size = add_size(size, CLOGShmemSize());
 		size = add_size(size, CommitTsShmemSize());
 		size = add_size(size, SUBTRANSShmemSize());
@@ -212,6 +214,7 @@ CreateSharedMemoryAndSemaphores(void)
 	 * Set up xlog, clog, and buffers
 	 */
 	XLOGShmemInit();
+	XLogPrefetchShmemInit();
 	CLOGShmemInit();
 	CommitTsShmemInit();
 	SUBTRANSShmemInit();
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 03a22d71ac..6fc9ceb196 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -34,6 +34,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogprefetch.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
 #include "catalog/storage.h"
@@ -198,6 +199,7 @@ static bool check_max_wal_senders(int *newval, void **extra, GucSource source);
 static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source);
 static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source);
 static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source);
+static void assign_maintenance_io_concurrency(int newval, void *extra);
 static void assign_pgstat_temp_directory(const char *newval, void *extra);
 static bool check_application_name(char **newval, void **extra, GucSource source);
 static void assign_application_name(const char *newval, void *extra);
@@ -1272,6 +1274,18 @@ static struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"recovery_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS,
+			gettext_noop("Prefetch blocks that have full page images in the WAL."),
+			gettext_noop("On some systems, there is no benefit to prefetching pages that will be "
+						 "entirely overwritten, but if the logical page size of the filesystem is "
+						 "larger than PostgreSQL's, this can be beneficial.  This option has no "
+						 "effect unless max_recovery_prefetch_distance is set to a positive number.")
+		},
+		&recovery_prefetch_fpw,
+		false,
+		NULL, assign_recovery_prefetch_fpw, NULL
+	},
 
 	{
 		{"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS,
@@ -2649,6 +2663,22 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_recovery_prefetch_distance", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY,
+			gettext_noop("Maximum number of bytes to read ahead in the WAL to prefetch referenced blocks."),
+			gettext_noop("Set to -1 to disable prefetching during recovery."),
+			GUC_UNIT_BYTE
+		},
+		&max_recovery_prefetch_distance,
+#ifdef USE_PREFETCH
+		256 * 1024,
+#else
+		-1,
+#endif
+		-1, INT_MAX,
+		NULL, assign_max_recovery_prefetch_distance, NULL
+	},
+
 	{
 		{"wal_keep_segments", PGC_SIGHUP, REPLICATION_SENDING,
 			gettext_noop("Sets the number of WAL files held for standby servers."),
@@ -2955,7 +2985,8 @@ static struct config_int ConfigureNamesInt[] =
 		0,
 #endif
 		0, MAX_IO_CONCURRENCY,
-		check_maintenance_io_concurrency, NULL, NULL
+		check_maintenance_io_concurrency, assign_maintenance_io_concurrency,
+		NULL
 	},
 
 	{
@@ -11573,6 +11604,18 @@ check_maintenance_io_concurrency(int *newval, void **extra, GucSource source)
 	return true;
 }
 
+static void
+assign_maintenance_io_concurrency(int newval, void *extra)
+{
+#ifdef USE_PREFETCH
+	/* Reconfigure WAL prefetching, because a setting it depends on
+	 * changed. */
+	maintenance_io_concurrency = newval;
+	if (AmStartupProcess())
+		XLogPrefetchReconfigure();
+#endif
+}
+
 static void
 assign_pgstat_temp_directory(const char *newval, void *extra)
 {
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 1ae8b77306..fd7406b399 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -230,6 +230,11 @@
 #checkpoint_flush_after = 0		# measured in pages, 0 disables
 #checkpoint_warning = 30s		# 0 disables
 
+# - Prefetching during recovery -
+
+#max_recovery_prefetch_distance = 256kB	# -1 disables prefetching
+#recovery_prefetch_fpw = off	# whether to prefetch pages logged with FPW
+
 # - Archiving -
 
 #archive_mode = off		# enables archiving; off, on, or always
diff --git a/src/include/access/xlogprefetch.h b/src/include/access/xlogprefetch.h
new file mode 100644
index 0000000000..afd807c408
--- /dev/null
+++ b/src/include/access/xlogprefetch.h
@@ -0,0 +1,81 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetch.h
+ *		Declarations for the recovery prefetching module.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/access/xlogprefetch.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOGPREFETCH_H
+#define XLOGPREFETCH_H
+
+#include "access/xlogdefs.h"
+
+/* GUCs */
+extern int	max_recovery_prefetch_distance;
+extern bool recovery_prefetch_fpw;
+
+struct XLogPrefetcher;
+typedef struct XLogPrefetcher XLogPrefetcher;
+
+extern int XLogPrefetchReconfigureCount;
+
+typedef struct XLogPrefetchState
+{
+	XLogPrefetcher *prefetcher;
+	int			reconfigure_count;
+} XLogPrefetchState;
+
+/* Functions exposed only for use by the static inline wrappers below. */
+extern XLogPrefetcher *XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming);
+extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher);
+extern void XLogPrefetcherReadAhead(XLogPrefetcher *prefetch,
+									XLogRecPtr replaying_lsn);
+
+extern size_t XLogPrefetchShmemSize(void);
+extern void XLogPrefetchShmemInit(void);
+
+extern void XLogPrefetchReconfigure(void);
+extern void XLogPrefetchRequestResetStats(void);
+
+extern void XLogPrefetchBegin(XLogPrefetchState *state);
+extern void XLogPrefetchEnd(XLogPrefetchState *state);
+
+/*
+ * Tell the prefetching module that we are now replaying a given LSN, so that
+ * it can decide how far ahead to read in the WAL, if configured.
+ */
+static inline void
+XLogPrefetch(XLogPrefetchState *state,
+			 XLogRecPtr replaying_lsn,
+			 bool from_stream)
+{
+	/*
+	 * Handle any configuration changes.  Rather than trying to deal with
+	 * various parameter changes, we just tear down and set up a new
+	 * prefetcher if anything we depend on changes.
+	 */
+	if (unlikely(state->reconfigure_count != XLogPrefetchReconfigureCount))
+	{
+		/* If we had a prefetcher, tear it down. */
+		if (state->prefetcher)
+		{
+			XLogPrefetcherFree(state->prefetcher);
+			state->prefetcher = NULL;
+		}
+		/* If we want a prefetcher, set it up. */
+		if (max_recovery_prefetch_distance > 0)
+			state->prefetcher = XLogPrefetcherAllocate(replaying_lsn,
+													   from_stream);
+		state->reconfigure_count = XLogPrefetchReconfigureCount;
+	}
+
+	if (state->prefetcher)
+		XLogPrefetcherReadAhead(state->prefetcher, replaying_lsn);
+}
+
+#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 2d1862a9d8..a0dabe2d18 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6138,6 +6138,14 @@
   prorettype => 'bool', proargtypes => '',
   prosrc => 'pg_is_wal_replay_paused' },
 
+{ oid => '9085', descr => 'statistics: information about WAL prefetching',
+  proname => 'pg_stat_get_prefetch_recovery', prorows => '1', provolatile => 'v',
+  proretset => 't', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{timestamptz,int8,int8,int8,int8,int8,int4,int4,float4,float4}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{stats_reset,prefetch,skip_hit,skip_new,skip_fpw,skip_seq,distance,queue_depth,avg_distance,avg_queue_depth}',
+  prosrc => 'pg_stat_get_prefetch_recovery' },
+
 { oid => '2621', descr => 'reload configuration files',
   proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool',
   proargtypes => '', prosrc => 'pg_reload_conf' },
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index b8041d9988..701eeaeb01 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -63,6 +63,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_ARCHIVER,
 	PGSTAT_MTYPE_BGWRITER,
 	PGSTAT_MTYPE_SLRU,
+	PGSTAT_MTYPE_RECOVERYPREFETCH,
 	PGSTAT_MTYPE_FUNCSTAT,
 	PGSTAT_MTYPE_FUNCPURGE,
 	PGSTAT_MTYPE_RECOVERYCONFLICT,
@@ -183,6 +184,19 @@ typedef struct PgStat_TableXactStatus
 	struct PgStat_TableXactStatus *next;	/* next of same subxact */
 } PgStat_TableXactStatus;
 
+/*
+ * Recovery prefetching statistics persisted on disk by pgstat.c, but kept in
+ * shared memory by xlogprefetch.c.
+ */
+typedef struct PgStat_RecoveryPrefetchStats
+{
+	PgStat_Counter prefetch;
+	PgStat_Counter skip_hit;
+	PgStat_Counter skip_new;
+	PgStat_Counter skip_fpw;
+	PgStat_Counter skip_seq;
+	TimestampTz stat_reset_timestamp;
+} PgStat_RecoveryPrefetchStats;
 
 /* ------------------------------------------------------------
  * Message formats follow
@@ -454,6 +468,16 @@ typedef struct PgStat_MsgSLRU
 	PgStat_Counter m_truncate;
 } PgStat_MsgSLRU;
 
+/* ----------
+ * PgStat_MsgRecoveryPrefetch			Sent by XLogPrefetch to save statistics.
+ * ----------
+ */
+typedef struct PgStat_MsgRecoveryPrefetch
+{
+	PgStat_MsgHdr m_hdr;
+	PgStat_RecoveryPrefetchStats m_stats;
+} PgStat_MsgRecoveryPrefetch;
+
 /* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
  * ----------
@@ -598,6 +622,7 @@ typedef union PgStat_Msg
 	PgStat_MsgArchiver msg_archiver;
 	PgStat_MsgBgWriter msg_bgwriter;
 	PgStat_MsgSLRU msg_slru;
+	PgStat_MsgRecoveryPrefetch msg_recoveryprefetch;
 	PgStat_MsgFuncstat msg_funcstat;
 	PgStat_MsgFuncpurge msg_funcpurge;
 	PgStat_MsgRecoveryConflict msg_recoveryconflict;
@@ -761,7 +786,6 @@ typedef struct PgStat_SLRUStats
 	TimestampTz stat_reset_timestamp;
 } PgStat_SLRUStats;
 
-
 /* ----------
  * Backend states
  * ----------
@@ -1464,6 +1488,7 @@ extern void pgstat_twophase_postabort(TransactionId xid, uint16 info,
 
 extern void pgstat_send_archiver(const char *xlog, bool failed);
 extern void pgstat_send_bgwriter(void);
+extern void pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats);
 
 /* ----------
  * Support functions for the SQL-callable functions to
@@ -1479,6 +1504,7 @@ extern int	pgstat_fetch_stat_numbackends(void);
 extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
+extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
 
 extern void pgstat_count_slru_page_zeroed(SlruCtl ctl);
 extern void pgstat_count_slru_page_hit(SlruCtl ctl);
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 2819282181..976cf8b116 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -440,4 +440,8 @@ extern void assign_search_path(const char *newval, void *extra);
 extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
 extern void assign_xlog_sync_method(int new_sync_method, void *extra);
 
+/* in access/transam/xlogprefetch.c */
+extern void assign_max_recovery_prefetch_distance(int new_value, void *extra);
+extern void assign_recovery_prefetch_fpw(bool new_value, void *extra);
+
 #endif							/* GUC_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6eec8ec568..9eda632b3c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1855,6 +1855,17 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_enc AS encrypted
    FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
   WHERE (s.client_port IS NOT NULL);
+pg_stat_prefetch_recovery| SELECT s.stats_reset,
+    s.prefetch,
+    s.skip_hit,
+    s.skip_new,
+    s.skip_fpw,
+    s.skip_seq,
+    s.distance,
+    s.queue_depth,
+    s.avg_distance,
+    s.avg_queue_depth
+   FROM pg_stat_get_prefetch_recovery() s(stats_reset, prefetch, skip_hit, skip_new, skip_fpw, skip_seq, distance, queue_depth, avg_distance, avg_queue_depth);
 pg_stat_progress_analyze| SELECT s.pid,
     s.datid,
     d.datname,
-- 
2.20.1

