From 477ac4a1f280faf189da52e635cea15367a262a8 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 2 Mar 2020 15:33:51 +1300
Subject: [PATCH 5/5] Prefetch referenced blocks during recovery.

Introduce a new GUC max_wal_prefetch_distance.  If it is set to a
positive number of bytes, then read ahead in the WAL at most that
distance, and initiate asynchronous reading of referenced blocks.  The
goal is to avoid I/O stalls and benefit from concurrent I/O.  The number
of concurrent asynchronous reads is capped by the existing
maintenance_io_concurrency GUC.  The feature is disabled by default.
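
For example (illustrative values only; useful settings depend on the
workload and storage):

    max_wal_prefetch_distance = 256kB
    maintenance_io_concurrency = 10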

Reviewed-by: Tomas Vondra <tomas.vondra@2ndquadrant.com>
Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 doc/src/sgml/config.sgml                    |  38 ++
 doc/src/sgml/monitoring.sgml                |  69 +++
 doc/src/sgml/wal.sgml                       |  12 +
 src/backend/access/transam/Makefile         |   1 +
 src/backend/access/transam/xlog.c           |  64 ++
 src/backend/access/transam/xlogprefetcher.c | 654 ++++++++++++++++++++
 src/backend/access/transam/xlogutils.c      |  23 +-
 src/backend/catalog/system_views.sql        |  11 +
 src/backend/replication/logical/logical.c   |   2 +-
 src/backend/storage/ipc/ipci.c              |   3 +
 src/backend/utils/misc/guc.c                |  38 +-
 src/include/access/xlog.h                   |   4 +
 src/include/access/xlogprefetcher.h         |  28 +
 src/include/access/xlogutils.h              |  20 +
 src/include/catalog/pg_proc.dat             |   8 +
 src/include/storage/bufmgr.h                |   5 +
 src/include/utils/guc.h                     |   2 +
 src/test/regress/expected/rules.out         |   8 +
 18 files changed, 987 insertions(+), 3 deletions(-)
 create mode 100644 src/backend/access/transam/xlogprefetcher.c
 create mode 100644 src/include/access/xlogprefetcher.h

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 672bf6f1ee..8249ec0139 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3102,6 +3102,44 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-wal-prefetch-distance" xreflabel="max_wal_prefetch_distance">
+      <term><varname>max_wal_prefetch_distance</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_wal_prefetch_distance</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        The maximum distance to look ahead in the WAL during recovery, to find
+        blocks to prefetch.  Prefetching blocks that will soon be needed can
+        reduce I/O wait times.  The number of concurrent prefetches is limited
+        by this setting as well as <xref linkend="guc-maintenance-io-concurrency"/>.
+        If this value is specified without units, it is taken as bytes.
+        The default is -1, meaning that WAL prefetching is disabled.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-wal-prefetch-fpw" xreflabel="wal_prefetch_fpw">
+      <term><varname>wal_prefetch_fpw</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>wal_prefetch_fpw</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Whether to prefetch blocks with full page images during recovery.
+        Usually this doesn't help, since such blocks will not be read.  However,
+        on file systems with a block size larger than
+        <productname>PostgreSQL</productname>'s, prefetching can avoid a costly
+        read-before-write when blocks are later written.
+        This setting has no effect unless
+        <xref linkend="guc-max-wal-prefetch-distance"/> is set to a positive number.
+        The default is off.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
      </sect2>
      <sect2 id="runtime-config-wal-archiving">
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 987580d6df..df4291092b 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -320,6 +320,13 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_wal_prefetcher</structname><indexterm><primary>pg_stat_wal_prefetcher</primary></indexterm></entry>
+      <entry>Only one row, showing statistics about blocks prefetched during recovery.
+       See <xref linkend="pg-stat-wal-prefetcher-view"/> for details.
+      </entry>
+     </row>
+
      <row>
       <entry><structname>pg_stat_subscription</structname><indexterm><primary>pg_stat_subscription</primary></indexterm></entry>
       <entry>At least one row per subscription, showing information about
@@ -2192,6 +2199,68 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
    connected server.
   </para>
 
+  <table id="pg-stat-wal-prefetcher-view" xreflabel="pg_stat_wal_prefetcher">
+   <title><structname>pg_stat_wal_prefetcher</structname> View</title>
+   <tgroup cols="3">
+    <thead>
+    <row>
+      <entry>Column</entry>
+      <entry>Type</entry>
+      <entry>Description</entry>
+     </row>
+    </thead>
+
+   <tbody>
+    <row>
+     <entry><structfield>prefetch</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks prefetched because they were not in the buffer pool</entry>
+    </row>
+    <row>
+     <entry><structfield>skip_hit</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because they were already in the buffer pool</entry>
+    </row>
+    <row>
+     <entry><structfield>skip_new</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because they were new (usually relation extension)</entry>
+    </row>
+    <row>
+     <entry><structfield>skip_fpw</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because a full page image was included in the WAL and <xref linkend="guc-wal-prefetch-fpw"/> was set to <literal>off</literal></entry>
+    </row>
+    <row>
+     <entry><structfield>skip_seq</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because of repeated or sequential access</entry>
+    </row>
+    <row>
+     <entry><structfield>distance</structfield></entry>
+     <entry><type>integer</type></entry>
+     <entry>How far ahead of recovery the WAL prefetcher is currently reading, in bytes</entry>
+    </row>
+    <row>
+     <entry><structfield>queue_depth</structfield></entry>
+     <entry><type>integer</type></entry>
+     <entry>How many prefetches have been initiated but are not yet known to have completed</entry>
+    </row>
+   </tbody>
+   </tgroup>
+  </table>
+
+  <para>
+   The <structname>pg_stat_wal_prefetcher</structname> view will contain only
+   one row.  It is filled with nulls if recovery is not running or WAL
+   prefetching is not enabled.  See <xref linkend="guc-max-wal-prefetch-distance"/>
+   for more information.  The counters in this view are reset whenever the
+   <xref linkend="guc-max-wal-prefetch-distance"/>,
+   <xref linkend="guc-wal-prefetch-fpw"/> or
+   <xref linkend="guc-maintenance-io-concurrency"/> setting is changed and
+   the server configuration is reloaded.
+  </para>
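+
+  <para>
+   For example, while recovery is running, the current prefetching
+   activity can be inspected with:
+<programlisting>
+SELECT * FROM pg_stat_wal_prefetcher;
+</programlisting>
+  </para>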
+
   <table id="pg-stat-subscription" xreflabel="pg_stat_subscription">
    <title><structname>pg_stat_subscription</structname> View</title>
    <tgroup cols="3">
diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml
index bd9fae544c..9e956ad2a1 100644
--- a/doc/src/sgml/wal.sgml
+++ b/doc/src/sgml/wal.sgml
@@ -719,6 +719,18 @@
    <acronym>WAL</acronym> call being logged to the server log. This
    option might be replaced by a more general mechanism in the future.
   </para>
+
+  <para>
+   The <xref linkend="guc-max-wal-prefetch-distance"/> parameter can be
+   used to improve I/O performance during recovery by instructing
+   <productname>PostgreSQL</productname> to initiate reads
+   of disk blocks that will soon be needed, in combination with the
+   <xref linkend="guc-maintenance-io-concurrency"/> parameter.  The
+   prefetching mechanism is most likely to be effective on systems
+   with <xref linkend="guc-full-page-writes"/> set to
+   <literal>off</literal> (where that is safe), and where the working
+   set is larger than RAM.  By default, WAL prefetching is disabled.
+  </para>
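+
+  <para>
+   For example, the following settings allow reading up to 256kB ahead
+   of the current replay position, with the number of concurrent
+   prefetches rate-limited by <varname>maintenance_io_concurrency</varname>
+   (illustrative values only; useful settings are workload dependent):
+<programlisting>
+max_wal_prefetch_distance = 256kB
+maintenance_io_concurrency = 10
+</programlisting>
+  </para>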
  </sect1>
 
  <sect1 id="wal-internals">
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de72..20e044c7c8 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -31,6 +31,7 @@ OBJS = \
 	xlogarchive.o \
 	xlogfuncs.o \
 	xloginsert.o \
+	xlogprefetcher.o \
 	xlogreader.o \
 	xlogutils.o
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fd30e27425..f01a24f577 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -34,6 +34,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xloginsert.h"
+#include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
 #include "access/xlogutils.h"
 #include "catalog/catversion.h"
@@ -105,6 +106,8 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_wal_prefetch_distance = -1;
+bool		wal_prefetch_fpw = false;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -806,6 +809,7 @@ static XLogSource readSource = XLOG_FROM_ANY;
  */
 static XLogSource currentSource = XLOG_FROM_ANY;
 static bool lastSourceFailed = false;
+static bool reset_wal_prefetcher = false;
 
 typedef struct XLogPageReadPrivate
 {
@@ -6213,6 +6217,7 @@ CheckRequiredParameterValues(void)
 	}
 }
 
+
 /*
  * This must be called ONCE during postmaster or standalone-backend startup
  */
@@ -7069,6 +7074,7 @@ StartupXLOG(void)
 		{
 			ErrorContextCallback errcallback;
 			TimestampTz xtime;
+			XLogPrefetcher *prefetcher = NULL;
 
 			InRedo = true;
 
@@ -7076,6 +7082,9 @@ StartupXLOG(void)
 					(errmsg("redo starts at %X/%X",
 							(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
 
+			/* the first time through, see if we need to enable prefetching */
+			ResetWalPrefetcher();
+
 			/*
 			 * main redo apply loop
 			 */
@@ -7105,6 +7114,31 @@ StartupXLOG(void)
 				/* Handle interrupt signals of startup process */
 				HandleStartupProcInterrupts();
 
+				/*
+				 * The first time through, or if any relevant settings or the
+				 * WAL source changes, we'll restart the prefetching machinery
+				 * as appropriate.  This is simpler than trying to handle
+				 * various complicated state changes.
+				 */
+				if (unlikely(reset_wal_prefetcher))
+				{
+					/* If we had one already, destroy it. */
+					if (prefetcher)
+					{
+						XLogPrefetcherFree(prefetcher);
+						prefetcher = NULL;
+					}
+					/* If we want one, create it. */
+					if (max_wal_prefetch_distance > 0)
+						prefetcher = XLogPrefetcherAllocate(xlogreader->ReadRecPtr,
+															currentSource == XLOG_FROM_STREAM);
+					reset_wal_prefetcher = false;
+				}
+
+				/* Perform WAL prefetching, if enabled. */
+				if (prefetcher)
+					XLogPrefetcherReadAhead(prefetcher, xlogreader->ReadRecPtr);
+
 				/*
 				 * Pause WAL replay, if requested by a hot-standby session via
 				 * SetRecoveryPause().
@@ -7292,6 +7326,8 @@ StartupXLOG(void)
 			/*
 			 * end of main redo apply loop
 			 */
+			if (prefetcher)
+				XLogPrefetcherFree(prefetcher);
 
 			if (reachedRecoveryTarget)
 			{
@@ -10155,6 +10191,24 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
 	}
 }
 
+void
+assign_max_wal_prefetch_distance(int new_value, void *extra)
+{
+	/* Reset the WAL prefetcher, because a setting it depends on changed. */
+	max_wal_prefetch_distance = new_value;
+	if (AmStartupProcess())
+		ResetWalPrefetcher();
+}
+
+void
+assign_wal_prefetch_fpw(bool new_value, void *extra)
+{
+	/* Reset the WAL prefetcher, because a setting it depends on changed. */
+	wal_prefetch_fpw = new_value;
+	if (AmStartupProcess())
+		ResetWalPrefetcher();
+}
+
 
 /*
  * Issue appropriate kind of fsync (if any) for an XLOG output file.
@@ -11961,6 +12015,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 * and move on to the next state.
 					 */
 					currentSource = XLOG_FROM_STREAM;
+					ResetWalPrefetcher();
 					break;
 
 				case XLOG_FROM_STREAM:
@@ -12390,3 +12445,12 @@ XLogRequestWalReceiverReply(void)
 {
 	doRequestWalReceiverReply = true;
 }
+
+/*
+ * Schedule a WAL prefetcher reset, on change of relevant settings.
+ */
+void
+ResetWalPrefetcher(void)
+{
+	reset_wal_prefetcher = true;
+}
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
new file mode 100644
index 0000000000..1d0bce692a
--- /dev/null
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -0,0 +1,654 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetcher.c
+ *		Prefetching support for PostgreSQL write-ahead log manager
+ *
+ * Portions Copyright (c) 2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/backend/access/transam/xlogprefetcher.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlogprefetcher.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "catalog/storage_xlog.h"
+#include "utils/fmgrprotos.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/smgr.h"
+#include "utils/hsearch.h"
+
+/*
+ * Sample the queue depth and distance every time we replay this much WAL.
+ * This is used to compute avg_queue_depth and avg_distance for the log message
+ * that appears at the end of crash recovery.
+ */
+#define XLOGPREFETCHER_MONITORING_SAMPLE_STEP 32768
+
+/*
+ * Internal state used for book-keeping.
+ */
+struct XLogPrefetcher
+{
+	/* Reader and current reading state. */
+	XLogReaderState *reader;
+	XLogReadLocalOptions options;
+	bool			have_record;
+	bool			shutdown;
+	int				next_block_id;
+
+	/* Book-keeping required to avoid accessing non-existing blocks. */
+	HTAB		   *filter_table;
+	dlist_head		filter_queue;
+
+	/* Book-keeping required to limit concurrent prefetches. */
+	XLogRecPtr	   *prefetch_queue;
+	int				prefetch_queue_size;
+	int				prefetch_head;
+	int				prefetch_tail;
+
+	/* Details of last prefetch to skip repeats and seq scans. */
+	SMgrRelation	last_reln;
+	RelFileNode		last_rnode;
+	BlockNumber		last_blkno;
+
+	/* Counters used to compute avg_queue_depth and avg_distance. */
+	double			samples;
+	double			queue_depth_sum;
+	double			distance_sum;
+	XLogRecPtr		next_sample_lsn;
+};
+
+/*
+ * A temporary filter used to track block ranges that haven't been created
+ * yet, whole relations that haven't been created yet, and whole relations
+ * that we must assume have already been dropped.
+ */
+typedef struct XLogPrefetcherFilter
+{
+	RelFileNode		rnode;
+	XLogRecPtr		filter_until_replayed;
+	BlockNumber		filter_from_block;
+	dlist_node		link;
+} XLogPrefetcherFilter;
+
+/*
+ * Counters exposed in shared memory just for the benefit of monitoring
+ * functions.
+ */
+typedef struct XLogPrefetcherMonitoringStats
+{
+	pg_atomic_uint64 prefetch;	/* Prefetches initiated. */
+	pg_atomic_uint64 skip_hit;	/* Blocks already buffered. */
+	pg_atomic_uint64 skip_new;	/* New/missing blocks filtered. */
+	pg_atomic_uint64 skip_fpw;	/* FPWs skipped. */
+	pg_atomic_uint64 skip_seq;	/* Sequential/repeat blocks skipped. */
+	int				distance;	/* Number of bytes ahead in the WAL. */
+	int				queue_depth; /* Number of I/Os possibly in progress. */
+} XLogPrefetcherMonitoringStats;
+
+static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
+										   RelFileNode rnode,
+										   BlockNumber blockno,
+										   XLogRecPtr lsn);
+static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
+											RelFileNode rnode,
+											BlockNumber blockno);
+static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
+												 XLogRecPtr replaying_lsn);
+static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+											 XLogRecPtr prefetching_lsn);
+static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher,
+											 XLogRecPtr replaying_lsn);
+static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher);
+
+/*
+ * On modern systems this is really just (*counter)++.  On some older systems
+ * there might be more to it, due to the inability to read and write 64 bit
+ * values atomically.
+ */
+static inline void inc_counter(pg_atomic_uint64 *counter)
+{
+	pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
+}
+
+static XLogPrefetcherMonitoringStats *MonitoringStats;
+
+size_t
+XLogPrefetcherShmemSize(void)
+{
+	return sizeof(XLogPrefetcherMonitoringStats);
+}
+
+static void
+XLogPrefetcherResetMonitoringStats(void)
+{
+	pg_atomic_init_u64(&MonitoringStats->prefetch, 0);
+	pg_atomic_init_u64(&MonitoringStats->skip_hit, 0);
+	pg_atomic_init_u64(&MonitoringStats->skip_new, 0);
+	pg_atomic_init_u64(&MonitoringStats->skip_fpw, 0);
+	pg_atomic_init_u64(&MonitoringStats->skip_seq, 0);
+	MonitoringStats->distance = -1;
+	MonitoringStats->queue_depth = 0;
+}
+
+void
+XLogPrefetcherShmemInit(void)
+{
+	bool		found;
+
+	MonitoringStats = (XLogPrefetcherMonitoringStats *)
+		ShmemInitStruct("XLogPrefetcherMonitoringStats",
+						sizeof(XLogPrefetcherMonitoringStats),
+						&found);
+	if (!found)
+		XLogPrefetcherResetMonitoringStats();
+}
+
+/*
+ * Create a prefetcher that is ready to begin prefetching blocks referenced by
+ * WAL that is ahead of the given lsn.
+ */
+XLogPrefetcher *
+XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming)
+{
+	static HASHCTL hash_table_ctl = {
+		.keysize = sizeof(RelFileNode),
+		.entrysize = sizeof(XLogPrefetcherFilter)
+	};
+	XLogPrefetcher *prefetcher = palloc0(sizeof(*prefetcher));
+
+	prefetcher->options.nowait = true;
+	if (streaming)
+	{
+		/*
+		 * We're only allowed to read as far as the WAL receiver has written.
+		 * We don't have to wait for it to be flushed, though, as recovery
+		 * does, so that gives us a chance to get a bit further ahead.
+		 */
+		prefetcher->options.read_upto_policy = XLRO_WALRCV_WRITTEN;
+	}
+	else
+	{
+		/* We're allowed to read as far as we can. */
+		prefetcher->options.read_upto_policy = XLRO_LSN;
+		prefetcher->options.lsn = (XLogRecPtr) -1;
+	}
+	prefetcher->reader = XLogReaderAllocate(wal_segment_size,
+											NULL,
+											read_local_xlog_page,
+											&prefetcher->options);
+	prefetcher->filter_table = hash_create("PrefetchFilterTable", 1024,
+										   &hash_table_ctl,
+										   HASH_ELEM | HASH_BLOBS);
+	dlist_init(&prefetcher->filter_queue);
+
+	/*
+	 * The size of the queue is based on the maintenance_io_concurrency
+	 * setting.  In theory we might have a separate queue for each tablespace,
+	 * but it's not clear how that should work, so for now we'll just use the
+	 * general GUC to rate-limit all prefetching.
+	 */
+	prefetcher->prefetch_queue_size = maintenance_io_concurrency;
+	prefetcher->prefetch_queue = palloc0(sizeof(XLogRecPtr) * prefetcher->prefetch_queue_size);
+	prefetcher->prefetch_head = prefetcher->prefetch_tail = 0;
+
+	/* Prepare to read at the given LSN. */
+	elog(LOG, "WAL prefetch started at %X/%X",
+		 (uint32) (lsn >> 32), (uint32) lsn);
+	XLogBeginRead(prefetcher->reader, lsn);
+
+	XLogPrefetcherResetMonitoringStats();
+
+	return prefetcher;
+}
+
+/*
+ * Destroy a prefetcher and release all resources.
+ */
+void
+XLogPrefetcherFree(XLogPrefetcher *prefetcher)
+{
+	double		avg_distance = 0;
+	double		avg_queue_depth = 0;
+
+	/* Log final statistics. */
+	if (prefetcher->samples > 0)
+	{
+		avg_distance = prefetcher->distance_sum / prefetcher->samples;
+		avg_queue_depth = prefetcher->queue_depth_sum / prefetcher->samples;
+	}
+	elog(LOG,
+		 "WAL prefetch finished at %X/%X; "
+		 "prefetch = " UINT64_FORMAT ", "
+		 "skip_hit = " UINT64_FORMAT ", "
+		 "skip_new = " UINT64_FORMAT ", "
+		 "skip_fpw = " UINT64_FORMAT ", "
+		 "skip_seq = " UINT64_FORMAT ", "
+		 "avg_distance = %f, "
+		 "avg_queue_depth = %f",
+		 (uint32) (prefetcher->reader->EndRecPtr >> 32),
+		 (uint32) (prefetcher->reader->EndRecPtr),
+		 pg_atomic_read_u64(&MonitoringStats->prefetch),
+		 pg_atomic_read_u64(&MonitoringStats->skip_hit),
+		 pg_atomic_read_u64(&MonitoringStats->skip_new),
+		 pg_atomic_read_u64(&MonitoringStats->skip_fpw),
+		 pg_atomic_read_u64(&MonitoringStats->skip_seq),
+		 avg_distance,
+		 avg_queue_depth);
+	XLogReaderFree(prefetcher->reader);
+	hash_destroy(prefetcher->filter_table);
+	pfree(prefetcher->prefetch_queue);
+	pfree(prefetcher);
+
+	XLogPrefetcherResetMonitoringStats();
+}
+
+/*
+ * Read ahead in the WAL, as far as we can within the limits set by the user.
+ * Begin fetching any referenced blocks that are not already in the buffer
+ * pool.
+ */
+void
+XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	/* If an error has occurred or we've hit the end of the WAL, do nothing. */
+	if (prefetcher->shutdown)
+		return;
+
+	/*
+	 * Have any in-flight prefetches definitely completed, judging by the LSN
+	 * that is currently being replayed?
+	 */
+	XLogPrefetcherCompletedIO(prefetcher, replaying_lsn);
+
+	/*
+	 * Do we already have the maximum permitted number of I/Os running
+	 * (according to the information we have)?  If so, we have to wait for at
+	 * least one to complete, so give up early.
+	 */
+	if (XLogPrefetcherSaturated(prefetcher))
+		return;
+
+	/* Can we drop any filters yet, due to problem records being replayed? */
+	XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn);
+
+	/* Main prefetch loop. */
+	for (;;)
+	{
+		XLogReaderState *reader = prefetcher->reader;
+		char *error;
+		int64		distance;
+
+		/* If we don't already have a record, then try to read one. */
+		if (!prefetcher->have_record)
+		{
+			if (!XLogReadRecord(reader, &error))
+			{
+				/* If we got an error, log it and give up. */
+				if (error)
+				{
+					elog(LOG, "WAL prefetch error: %s", error);
+					prefetcher->shutdown = true;
+				}
+				/* Otherwise, we'll try again later when more data is here. */
+				return;
+			}
+			prefetcher->have_record = true;
+			prefetcher->next_block_id = 0;
+		}
+
+		/* How far ahead of replay are we now? */
+		distance = prefetcher->reader->ReadRecPtr - replaying_lsn;
+
+		/* Update distance shown in shm. */
+		MonitoringStats->distance = distance;
+
+		/* Sample the averages so we can log them at end of recovery. */
+		if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn))
+		{
+			prefetcher->distance_sum += MonitoringStats->distance;
+			prefetcher->queue_depth_sum += MonitoringStats->queue_depth;
+			prefetcher->samples += 1.0;
+			prefetcher->next_sample_lsn =
+				replaying_lsn + XLOGPREFETCHER_MONITORING_SAMPLE_STEP;
+		}
+
+		/* Are we too far ahead of replay? */
+		if (distance >= max_wal_prefetch_distance)
+			break;
+
+		/*
+		 * If this is a record that creates a new SMGR relation, we'll avoid
+		 * prefetching anything from that rnode until it has been replayed.
+		 */
+		if (replaying_lsn < reader->ReadRecPtr &&
+			XLogRecGetRmid(reader) == RM_SMGR_ID &&
+			(XLogRecGetInfo(reader) & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE)
+		{
+			xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(reader);
+
+			XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0,
+									reader->ReadRecPtr);
+		}
+
+		/*
+		 * Scan the record for block references.  We might already have been
+		 * partway through processing this record when we hit maximum I/O
+		 * concurrency, so start where we left off.
+		 */
+		for (int i = prefetcher->next_block_id; i <= reader->max_block_id; ++i)
+		{
+			DecodedBkpBlock *block = &reader->blocks[i];
+			SMgrRelation reln;
+
+			/* Ignore everything but the main fork for now. */
+			if (block->forknum != MAIN_FORKNUM)
+				continue;
+
+			/*
+			 * If there is a full page image attached, we won't be reading the
+			 * page, so you might think we should skip it.  However, if the
+			 * underlying filesystem uses larger logical blocks than us, it
+			 * might still need to perform a read-before-write some time later.
+			 * Therefore, only prefetch if configured to do so.
+			 */
+			if (block->has_image && !wal_prefetch_fpw)
+			{
+				inc_counter(&MonitoringStats->skip_fpw);
+				continue;
+			}
+
+			/*
+			 * If this block will initialize a new page then it's probably an
+			 * extension.  Since it might create a new segment, we can't try
+			 * to prefetch this block until the record has been replayed, or we
+			 * might try to open a file that doesn't exist yet.
+			 */
+			if (block->flags & BKPBLOCK_WILL_INIT)
+			{
+				XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
+										reader->ReadRecPtr);
+				inc_counter(&MonitoringStats->skip_new);
+				continue;
+			}
+
+			/* Should we skip this block due to a filter? */
+			if (XLogPrefetcherIsFiltered(prefetcher, block->rnode,
+										 block->blkno))
+			{
+				inc_counter(&MonitoringStats->skip_new);
+				continue;
+			}
+
+			/* Fast path for repeated references to the same relation. */
+			if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode))
+			{
+				/*
+				 * If this is a repeat or sequential access, then skip it.  We
+				 * expect the kernel to detect sequential access on its own
+				 * and do a better job than we could.
+				 */
+				if (block->blkno == prefetcher->last_blkno ||
+					block->blkno == prefetcher->last_blkno + 1)
+				{
+					prefetcher->last_blkno = block->blkno;
+					inc_counter(&MonitoringStats->skip_seq);
+					continue;
+				}
+
+				/* We can avoid calling smgropen(). */
+				reln = prefetcher->last_reln;
+			}
+			else
+			{
+				/* Otherwise we have to open it. */
+				reln = smgropen(block->rnode, InvalidBackendId);
+				prefetcher->last_rnode = block->rnode;
+				prefetcher->last_reln = reln;
+			}
+			prefetcher->last_blkno = block->blkno;
+
+			/* Try to prefetch this block! */
+			switch (PrefetchSharedBuffer(reln, block->forknum, block->blkno))
+			{
+			case PREFETCH_BUFFER_HIT:
+				/* It's already cached, so do nothing. */
+				inc_counter(&MonitoringStats->skip_hit);
+				break;
+			case PREFETCH_BUFFER_MISS:
+				/*
+				 * I/O has possibly been initiated (though we don't know
+				 * whether the kernel already had the block cached, so for
+				 * lack of better information we assume an I/O was started).
+				 * Record this as an I/O in progress until we eventually
+				 * replay this LSN.
+				 */
+				inc_counter(&MonitoringStats->prefetch);
+				XLogPrefetcherInitiatedIO(prefetcher, reader->ReadRecPtr);
+				/*
+				 * If the queue is now full, we'll have to wait before
+				 * processing any more blocks from this record.
+				 */
+				if (XLogPrefetcherSaturated(prefetcher))
+				{
+					prefetcher->next_block_id = i + 1;
+					return;
+				}
+				break;
+			case PREFETCH_BUFFER_NOREL:
+				/*
+				 * The underlying segment file doesn't exist.  Presumably it
+				 * will be unlinked by a later WAL record.  When recovery
+				 * reads this block, it will use the EXTENSION_CREATE_RECOVERY
+				 * flag.  We certainly don't want to do that sort of thing
+				 * while merely prefetching, so let's just ignore references
+				 * to this relation until this record is replayed, and let
+				 * recovery create the dummy file or complain if something is
+				 * wrong.
+				 */
+				XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
+										reader->ReadRecPtr);
+				inc_counter(&MonitoringStats->skip_new);
+				break;
+			}
+		}
+
+		/* Advance to the next record. */
+		prefetcher->have_record = false;
+	}
+}
+
+/*
+ * Expose statistics about WAL prefetching.
+ */
+Datum
+pg_stat_get_wal_prefetcher(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_WAL_PREFETCHER_COLS 7
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	Datum		values[PG_STAT_GET_WAL_PREFETCHER_COLS];
+	bool		nulls[PG_STAT_GET_WAL_PREFETCHER_COLS];
+
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	if (MonitoringStats->distance < 0)
+	{
+		for (int i = 0; i < PG_STAT_GET_WAL_PREFETCHER_COLS; ++i)
+			nulls[i] = true;
+	}
+	else
+	{
+		for (int i = 0; i < PG_STAT_GET_WAL_PREFETCHER_COLS; ++i)
+			nulls[i] = false;
+		values[0] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->prefetch));
+		values[1] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_hit));
+		values[2] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_new));
+		values[3] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_fpw));
+		values[4] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_seq));
+		values[5] = Int32GetDatum(MonitoringStats->distance);
+		values[6] = Int32GetDatum(MonitoringStats->queue_depth);
+	}
+	tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	tuplestore_donestoring(tupstore);
+
+	return (Datum) 0;
+}
+
+/*
+ * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn'
+ * has been replayed.
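+ *
+ * For example, when an XLOG_SMGR_CREATE record for 'rnode' is seen at 'lsn',
+ * this is called with blockno 0 so that no blocks of that relation are
+ * prefetched until replay has passed 'lsn'.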
+ */
+static inline void
+XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode,
+						BlockNumber blockno, XLogRecPtr lsn)
+{
+	XLogPrefetcherFilter *filter;
+	bool		found;
+
+	filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
+	if (!found)
+	{
+		/*
+		 * Don't allow any prefetching of this block or higher until replayed.
+		 */
+		filter->filter_until_replayed = lsn;
+		filter->filter_from_block = blockno;
+		dlist_push_head(&prefetcher->filter_queue, &filter->link);
+	}
+	else
+	{
+		/*
+		 * We were already filtering this rnode.  Extend the filter's lifetime
+		 * to cover this WAL record, but leave the (presumably lower) block
+		 * number there because we don't want to have to track individual
+		 * blocks.
+		 */
+		filter->filter_until_replayed = lsn;
+		dlist_delete(&filter->link);
+		dlist_push_head(&prefetcher->filter_queue, &filter->link);
+	}
+}
+
+/*
+ * Have we replayed the records that caused us to begin filtering a block
+ * range?  That means that relations should have been created, extended or
+ * dropped as required, so we can drop relevant filters.
+ */
+static inline void
+XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+	{
+		XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
+														  link,
+														  &prefetcher->filter_queue);
+
+		if (filter->filter_until_replayed >= replaying_lsn)
+			break;
+		dlist_delete(&filter->link);
+		hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
+	}
+}
+
+/*
+ * Check if a given block should be skipped due to a filter.
+ */
+static inline bool
+XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode,
+						 BlockNumber blockno)
+{
+	/*
+	 * Test for empty queue first, because we expect it to be empty most of the
+	 * time and we can avoid the hash table lookup in that case.
+	 */
+	if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+	{
+		XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode,
+												   HASH_FIND, NULL);
+
+		if (filter && filter->filter_from_block <= blockno)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * Insert an LSN into the queue.  The queue must not be full already.  This
+ * tracks the fact that we have (to the best of our knowledge) initiated an
+ * I/O, so that we can impose a cap on concurrent prefetching.
+ */
+static inline void
+XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+						  XLogRecPtr prefetching_lsn)
+{
+	Assert(!XLogPrefetcherSaturated(prefetcher));
+	prefetcher->prefetch_queue[prefetcher->prefetch_head++] = prefetching_lsn;
+	prefetcher->prefetch_head %= prefetcher->prefetch_queue_size;
+	MonitoringStats->queue_depth++;
+	Assert(MonitoringStats->queue_depth <= prefetcher->prefetch_queue_size);
+}
+
+/*
+ * Have we replayed the records that caused us to initiate the oldest
+ * prefetches yet?  That means that they're definitely finished, so we can
+ * forget about them and allow ourselves to initiate more prefetches.  For now
+ * we don't have any awareness of when I/O really completes.
+ */
+static inline void
+XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	while (prefetcher->prefetch_head != prefetcher->prefetch_tail &&
+		   prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn)
+	{
+		prefetcher->prefetch_tail++;
+		prefetcher->prefetch_tail %= prefetcher->prefetch_queue_size;
+		MonitoringStats->queue_depth--;
+		Assert(MonitoringStats->queue_depth >= 0);
+	}
+}
+
+/*
+ * Check if the maximum allowed number of I/Os is already in flight.
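+ *
+ * The prefetch queue is a circular buffer that keeps one slot free, so that
+ * an empty queue (head == tail) can be distinguished from a full one: it is
+ * saturated when advancing the head would make it collide with the tail.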
+ */
+static inline bool
+XLogPrefetcherSaturated(XLogPrefetcher *prefetcher)
+{
+	return (prefetcher->prefetch_head + 1) % prefetcher->prefetch_queue_size ==
+		prefetcher->prefetch_tail;
+}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index b217ffa52f..fad2acb514 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -25,6 +25,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walreceiver.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -827,6 +828,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	TimeLineID	tli;
 	int			count;
 	WALReadError errinfo;
+	XLogReadLocalOptions *options = (XLogReadLocalOptions *) state->private_data;
 
 	loc = targetPagePtr + reqLen;
 
@@ -841,7 +843,23 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		 * notices recovery finishes, so we only have to maintain it for the
 		 * local process until recovery ends.
 		 */
-		if (!RecoveryInProgress())
+		if (options)
+		{
+			switch (options->read_upto_policy)
+			{
+			case XLRO_WALRCV_WRITTEN:
+				read_upto = GetWalRcvWriteRecPtr();
+				break;
+			case XLRO_LSN:
+				read_upto = options->lsn;
+				break;
+			default:
+				read_upto = 0;
+				elog(ERROR, "unknown read_upto_policy value");
+				break;
+			}
+		}
+		else if (!RecoveryInProgress())
 			read_upto = GetFlushRecPtr();
 		else
 			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
@@ -879,6 +897,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			if (loc <= read_upto)
 				break;
 
+			if (options && options->nowait)
+				break;
+
 			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 		}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b8a3f46912..7b27ac4805 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -811,6 +811,17 @@ CREATE VIEW pg_stat_wal_receiver AS
     FROM pg_stat_get_wal_receiver() s
     WHERE s.pid IS NOT NULL;
 
+CREATE VIEW pg_stat_wal_prefetcher AS
+    SELECT
+            s.prefetch,
+            s.skip_hit,
+            s.skip_new,
+            s.skip_fpw,
+            s.skip_seq,
+            s.distance,
+            s.queue_depth
+     FROM pg_stat_get_wal_prefetcher() s;
+
 CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index e3da7d3625..34f3017871 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->slot = slot;
 
-	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
+	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, NULL);
 	if (!ctx->reader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..5ca98b8886 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -21,6 +21,7 @@
 #include "access/nbtree.h"
 #include "access/subtrans.h"
 #include "access/twophase.h"
+#include "access/xlogprefetcher.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -124,6 +125,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, PredicateLockShmemSize());
 		size = add_size(size, ProcGlobalShmemSize());
 		size = add_size(size, XLOGShmemSize());
+		size = add_size(size, XLogPrefetcherShmemSize());
 		size = add_size(size, CLOGShmemSize());
 		size = add_size(size, CommitTsShmemSize());
 		size = add_size(size, SUBTRANSShmemSize());
@@ -212,6 +214,7 @@ CreateSharedMemoryAndSemaphores(void)
 	 * Set up xlog, clog, and buffers
 	 */
 	XLOGShmemInit();
+	XLogPrefetcherShmemInit();
 	CLOGShmemInit();
 	CommitTsShmemInit();
 	SUBTRANSShmemInit();
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 68082315ac..a2a9f62160 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -197,6 +197,7 @@ static bool check_max_wal_senders(int *newval, void **extra, GucSource source);
 static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source);
 static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source);
 static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source);
+static void assign_maintenance_io_concurrency(int newval, void *extra);
 static void assign_pgstat_temp_directory(const char *newval, void *extra);
 static bool check_application_name(char **newval, void **extra, GucSource source);
 static void assign_application_name(const char *newval, void *extra);
@@ -1241,6 +1242,18 @@ static struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"wal_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS,
+			gettext_noop("Prefetch blocks that have full page images in the WAL."),
+			gettext_noop("On some systems, there is no benefit to prefetching pages that will be "
+						 "entirely overwritten, but if the logical page size of the filesystem is "
+						 "larger than PostgreSQL's, this can be beneficial.  This option has no "
+						 "effect unless max_wal_prefetch_distance is set to a positive number.")
+		},
+		&wal_prefetch_fpw,
+		false,
+		NULL, assign_wal_prefetch_fpw, NULL
+	},
 
 	{
 		{"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS,
@@ -2627,6 +2640,17 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_wal_prefetch_distance", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY,
+			gettext_noop("Maximum number of bytes to read ahead in the WAL to prefetch referenced blocks."),
+			gettext_noop("Set to -1 to disable WAL prefetching."),
+			GUC_UNIT_BYTE
+		},
+		&max_wal_prefetch_distance,
+		-1, -1, INT_MAX,
+		NULL, assign_max_wal_prefetch_distance, NULL
+	},
+
 	{
 		{"wal_keep_segments", PGC_SIGHUP, REPLICATION_SENDING,
 			gettext_noop("Sets the number of WAL files held for standby servers."),
@@ -2900,7 +2924,8 @@ static struct config_int ConfigureNamesInt[] =
 		0,
 #endif
 		0, MAX_IO_CONCURRENCY,
-		check_maintenance_io_concurrency, NULL, NULL
+		check_maintenance_io_concurrency, assign_maintenance_io_concurrency,
+		NULL
 	},
 
 	{
@@ -11498,6 +11523,17 @@ check_maintenance_io_concurrency(int *newval, void **extra, GucSource source)
 	return true;
 }
 
+static void
+assign_maintenance_io_concurrency(int newval, void *extra)
+{
+#ifdef USE_PREFETCH
+	/* Reset the WAL prefetcher, because a setting it depends on changed. */
+	maintenance_io_concurrency = newval;
+	if (AmStartupProcess())
+		ResetWalPrefetcher();
+#endif
+}
+
 static void
 assign_pgstat_temp_directory(const char *newval, void *extra)
 {
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 98b033fc20..82829d7854 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -111,6 +111,8 @@ extern int	wal_keep_segments;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
+extern int	max_wal_prefetch_distance;
+extern bool wal_prefetch_fpw;
 extern char *XLogArchiveCommand;
 extern bool EnableHotStandby;
 extern bool fullPageWrites;
@@ -319,6 +321,8 @@ extern void SetWalWriterSleeping(bool sleeping);
 
 extern void XLogRequestWalReceiverReply(void);
 
+extern void ResetWalPrefetcher(void);
+
 extern void assign_max_wal_size(int newval, void *extra);
 extern void assign_checkpoint_completion_target(double newval, void *extra);
 
diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h
new file mode 100644
index 0000000000..585f5564a3
--- /dev/null
+++ b/src/include/access/xlogprefetcher.h
@@ -0,0 +1,28 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetcher.h
+ *		Declarations for the XLog prefetching facility
+ *
+ * Portions Copyright (c) 2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/access/xlogprefetcher.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOGPREFETCHER_H
+#define XLOGPREFETCHER_H
+
+#include "access/xlogdefs.h"
+
+struct XLogPrefetcher;
+typedef struct XLogPrefetcher XLogPrefetcher;
+
+extern XLogPrefetcher *XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming);
+extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher);
+extern void XLogPrefetcherReadAhead(XLogPrefetcher *prefetch, XLogRecPtr replaying_lsn);
+
+extern size_t XLogPrefetcherShmemSize(void);
+extern void XLogPrefetcherShmemInit(void);
+
+#endif
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5181a077d9..1c8e67d74a 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -47,6 +47,26 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
+/*
+ * A pointer to an XLogReadLocalOptions struct can be supplied as the private
+ * data for an xlog reader, causing read_local_xlog_page to modify its
+ * behavior.
+ */
+typedef struct XLogReadLocalOptions
+{
+	/* Don't block waiting for new WAL to arrive. */
+	bool		nowait;
+
+	/* How far to read. */
+	enum {
+		XLRO_WALRCV_WRITTEN,
+		XLRO_LSN
+	} read_upto_policy;
+
+	/* If read_upto_policy is XLRO_LSN, the LSN. */
+	XLogRecPtr lsn;
+} XLogReadLocalOptions;
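+
+/*
+ * For example (a sketch of how the WAL prefetcher uses this), a caller that
+ * must not block, and that only wants WAL that the WAL receiver has already
+ * written, might do:
+ *
+ *		XLogReadLocalOptions options = {0};
+ *
+ *		options.nowait = true;
+ *		options.read_upto_policy = XLRO_WALRCV_WRITTEN;
+ *		reader = XLogReaderAllocate(wal_segment_size, NULL,
+ *									read_local_xlog_page, &options);
+ *
+ * The options struct must remain valid for as long as the reader is used.
+ */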
+
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7fb574f9dc..742741afa1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6082,6 +6082,14 @@
   prorettype => 'bool', proargtypes => '',
   prosrc => 'pg_is_wal_replay_paused' },
 
+{ oid => '9085', descr => 'statistics: information about WAL prefetching',
+  proname => 'pg_stat_get_wal_prefetcher', prorows => '1', provolatile => 'v',
+  proretset => 't', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int8,int8,int8,int8,int8,int4,int4}',
+  proargmodes => '{o,o,o,o,o,o,o}',
+  proargnames => '{prefetch,skip_hit,skip_new,skip_fpw,skip_seq,distance,queue_depth}',
+  prosrc => 'pg_stat_get_wal_prefetcher' },
+
 { oid => '2621', descr => 'reload configuration files',
   proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool',
   proargtypes => '', prosrc => 'pg_reload_conf' },
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 1210d1e7e8..3ca171adb8 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -159,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount;
  */
 #define BufferGetPage(buffer) ((Page)BufferGetBlock(buffer))
 
+/*
+ * When you try to prefetch a buffer, there are three possibilities: it's
+ * already cached in our buffer pool, it's not cached but we can tell the
+ * kernel that we'll be loading it soon, or the relation file doesn't exist.
+ */
 typedef enum PrefetchBufferResult
 {
 	PREFETCH_BUFFER_HIT,
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index ce93ace76c..7d076a9743 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -438,5 +438,7 @@ extern void assign_search_path(const char *newval, void *extra);
 /* in access/transam/xlog.c */
 extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
 extern void assign_xlog_sync_method(int new_sync_method, void *extra);
+extern void assign_max_wal_prefetch_distance(int new_value, void *extra);
+extern void assign_wal_prefetch_fpw(bool new_value, void *extra);
 
 #endif							/* GUC_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index c7304611c3..63bbb796fc 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2102,6 +2102,14 @@ pg_stat_user_tables| SELECT pg_stat_all_tables.relid,
     pg_stat_all_tables.autoanalyze_count
    FROM pg_stat_all_tables
   WHERE ((pg_stat_all_tables.schemaname <> ALL (ARRAY['pg_catalog'::name, 'information_schema'::name])) AND (pg_stat_all_tables.schemaname !~ '^pg_toast'::text));
+pg_stat_wal_prefetcher| SELECT s.prefetch,
+    s.skip_hit,
+    s.skip_new,
+    s.skip_fpw,
+    s.skip_seq,
+    s.distance,
+    s.queue_depth
+   FROM pg_stat_get_wal_prefetcher() s(prefetch, skip_hit, skip_new, skip_fpw, skip_seq, distance, queue_depth);
 pg_stat_wal_receiver| SELECT s.pid,
     s.status,
     s.receive_start_lsn,
-- 
2.20.1

