Reduce the time required for a database recovery from archive.

Started by Dmitry Shulgaover 5 years ago14 messages
#1Dmitry Shulga
d.shulga@postgrespro.ru
1 attachment(s)

Hello hackers,

Currently, database recovery from archive is performed sequentially,
by reading archived WAL files and applying their records to the database.

Overall archive file processing is done one by one, and this might
create a performance bottleneck if archived WAL files are delivered slowly,
because the database server has to wait for arrival of the next
WAL segment before applying its records.

To address this issue it is proposed to receive archived WAL files in parallel
so that when the next WAL segment file is required for processing of redo log
records it would be already available.

Implementation of this approach assumes running several background processes (bgworkers)
each of which runs a shell command specified by the parameter restore_command
to deliver an archived WAL file. Number of running parallel processes is limited
by the new parameter max_restore_command_workers. If this parameter has value 0
then WAL files delivery is performed using the original algorithm, that is in
one-by-one manner. If this parameter has value greater than 0 then the database
server starts several bgworker processes up to the limit specified by
the parameter max_restore_command_workers and passes to every process
WAL file name to deliver. Active processes start prefetching of specified
WAL files and store received files in the directory pg_wal/pgsql_tmp. After
bgworker process finishes receiving a file it marks itself as a free process
and waits for a new request to receive a next WAL file. The main process
performing database recovery still handles WAL files in one-by-one manner,
but instead of waiting for a next required WAL file's availability it checks for
that file in the prefetched directory. If a new file is present there,
the main process starts its processing.

The patch implemeting the described approach is attached to this email.
The patch contains a test in the file src/test/recovery/t/021_xlogrestore.pl
Although the test result depends on real execution time and hardly could be
approved for including to the repository it was added in order to show
a positive effect from applying the new algorithm. In my environment restoring
from archive with parallel prefetching is twice as faster than in original
mode.

Regards,
Dmitry.

Attachments:

archive_recovery_speedup.patchapplication/octet-stream; name=archive_recovery_speedup.patch; x-unix-mode=0644Download
commit fba430be4f6f1d18630ffa13dc5d73a96ccaff01
Author: Dmitry Shulga <d.shulga@postgrespro.ru>
Date:   Wed Sep 2 15:59:01 2020 +0700

    Reduce time required to recover database from archive.
    
    Originally database recovering from archive was performed by
    sequential receiving of files with WAL records and applying them against
    the database. Delivering of files containing WAL records are performed
    by running a command specified by the GUC parameter restore_command.
    In case receiving of every file containing WAL records takes long time
    it results in standing idle most of time waiting until files be recevied.
    If time required to apply WAL records from an archive file is significantly
    lesser than time required to deliver the file from archive it leads
    to nonproductive standing idle after current WAL segment is appled and
    before next WAL segment be received.
    
    As a consequence a wall time required to recover a database from archive
    log can be unacceptably long.
    
    To reduce total time required to restore database from archive running
    of a command specified by the GUC parameter restore_command  has been
    isolated into a separate bgworker process and run in parallel with applying
    records contained in archived files. Number of started parallel bgworker
    processes for delivering WAL segment from archive is limited by the new
    GUC parameter max_restore_command_workers.
    
    Additionally, refactoring was done to extract duplicate code used
    in files xlog.c and xlogrecovery.c and move it in standalone
    functions.
    
    Author: Ivan Taranov
    Reviewed-by: Anna Akenteva, Marina Polyakova, Dmitry Shulga
    Tested-by: Roman Zharkov

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de722..ffbf8090f45 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -32,7 +32,8 @@ OBJS = \
 	xlogfuncs.o \
 	xloginsert.o \
 	xlogreader.o \
-	xlogutils.o
+	xlogutils.o \
+	xlogrestore.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 09c01ed4ae4..a513174f0f6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -37,6 +37,7 @@
 #include "access/xloginsert.h"
 #include "access/xlogreader.h"
 #include "access/xlogutils.h"
+#include "access/xlogrestore.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
@@ -3681,10 +3682,11 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 					 xlogfname);
 			set_ps_display(activitymsg);
 
-			restoredFromArchive = RestoreArchivedFile(path, xlogfname,
-													  "RECOVERYXLOG",
-													  wal_segment_size,
-													  InRedo);
+			restoredFromArchive = RestoreCommandXLog(path, xlogfname,
+													 "RECOVERYXLOG",
+													 wal_segment_size,
+													 InRedo);
+
 			if (!restoredFromArchive)
 				return -1;
 			break;
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index 8f8734dc1d4..86afd48ae5b 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -22,6 +22,7 @@
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
+#include "access/xlogutils.h"
 #include "common/archive.h"
 #include "miscadmin.h"
 #include "postmaster/startup.h"
@@ -55,13 +56,8 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 					bool cleanupEnabled)
 {
 	char		xlogpath[MAXPGPATH];
-	char	   *xlogRestoreCmd;
 	char		lastRestartPointFname[MAXPGPATH];
 	int			rc;
-	struct stat stat_buf;
-	XLogSegNo	restartSegNo;
-	XLogRecPtr	restartRedoPtr;
-	TimeLineID	restartTli;
 
 	/*
 	 * Ignore restore_command when not in archive recovery (meaning we are in
@@ -102,22 +98,7 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 	/*
 	 * Make sure there is no existing file named recovername.
 	 */
-	if (stat(xlogpath, &stat_buf) != 0)
-	{
-		if (errno != ENOENT)
-			ereport(FATAL,
-					(errcode_for_file_access(),
-					 errmsg("could not stat file \"%s\": %m",
-							xlogpath)));
-	}
-	else
-	{
-		if (unlink(xlogpath) != 0)
-			ereport(FATAL,
-					(errcode_for_file_access(),
-					 errmsg("could not remove file \"%s\": %m",
-							xlogpath)));
-	}
+	FileUnlink(xlogpath);
 
 	/*
 	 * Calculate the archive file cutoff point for use during log shipping
@@ -136,96 +117,24 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 	 * flags to signify the point when we can begin deleting WAL files from
 	 * the archive.
 	 */
-	if (cleanupEnabled)
-	{
-		GetOldestRestartPoint(&restartRedoPtr, &restartTli);
-		XLByteToSeg(restartRedoPtr, restartSegNo, wal_segment_size);
-		XLogFileName(lastRestartPointFname, restartTli, restartSegNo,
-					 wal_segment_size);
-		/* we shouldn't need anything earlier than last restart point */
-		Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
-	}
-	else
-		XLogFileName(lastRestartPointFname, 0, 0L, wal_segment_size);
+	XLogFileNameLastPoint(lastRestartPointFname, cleanupEnabled);
+	Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
 
-	/* Build the restore command to execute */
-	xlogRestoreCmd = BuildRestoreCommand(recoveryRestoreCommand,
-										 xlogpath, xlogfname,
-										 lastRestartPointFname);
-	if (xlogRestoreCmd == NULL)
-		elog(ERROR, "could not build restore command \"%s\"",
-			 recoveryRestoreCommand);
-
-	ereport(DEBUG3,
-			(errmsg_internal("executing restore command \"%s\"",
-							 xlogRestoreCmd)));
-
-	/*
-	 * Check signals before restore command and reset afterwards.
-	 */
-	PreRestoreCommand();
-
-	/*
-	 * Copy xlog from archival storage to XLOGDIR
-	 */
-	rc = system(xlogRestoreCmd);
-
-	PostRestoreCommand();
-	pfree(xlogRestoreCmd);
+	rc = DoRestore(xlogpath, xlogfname, lastRestartPointFname);
 
 	if (rc == 0)
 	{
-		/*
-		 * command apparently succeeded, but let's make sure the file is
-		 * really there now and has the correct size.
-		 */
-		if (stat(xlogpath, &stat_buf) == 0)
+		bool file_not_found;
+		bool ret = FileValidateSize(xlogpath, expectedSize, xlogfname,
+									&file_not_found);
+		if (ret)
 		{
-			if (expectedSize > 0 && stat_buf.st_size != expectedSize)
-			{
-				int			elevel;
+			strcpy(path, xlogpath);
+			return true;
+		}
 
-				/*
-				 * If we find a partial file in standby mode, we assume it's
-				 * because it's just being copied to the archive, and keep
-				 * trying.
-				 *
-				 * Otherwise treat a wrong-sized file as FATAL to ensure the
-				 * DBA would notice it, but is that too strong? We could try
-				 * to plow ahead with a local copy of the file ... but the
-				 * problem is that there probably isn't one, and we'd
-				 * incorrectly conclude we've reached the end of WAL and we're
-				 * done recovering ...
-				 */
-				if (StandbyMode && stat_buf.st_size < expectedSize)
-					elevel = DEBUG1;
-				else
-					elevel = FATAL;
-				ereport(elevel,
-						(errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
-								xlogfname,
-								(unsigned long) stat_buf.st_size,
-								(unsigned long) expectedSize)));
-				return false;
-			}
-			else
-			{
-				ereport(LOG,
-						(errmsg("restored log file \"%s\" from archive",
-								xlogfname)));
-				strcpy(path, xlogpath);
-				return true;
-			}
-		}
-		else
-		{
-			/* stat failed */
-			if (errno != ENOENT)
-				ereport(FATAL,
-						(errcode_for_file_access(),
-						 errmsg("could not stat file \"%s\": %m",
-								xlogpath)));
-		}
+		if (!file_not_found)
+			return false;
 	}
 
 	/*
diff --git a/src/backend/access/transam/xlogrestore.c b/src/backend/access/transam/xlogrestore.c
new file mode 100644
index 00000000000..f5c6a14cbe1
--- /dev/null
+++ b/src/backend/access/transam/xlogrestore.c
@@ -0,0 +1,830 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogrestore.c
+ *	  Infrastructure for parallel executing restore commands
+ *
+ * Copyright (c) 2014-2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/transam/xlogrestore.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "access/xlogrestore.h"
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogarchive.h"
+#include "access/xlogdefs.h"
+#include "access/xlogutils.h"
+#include "common/archive.h"
+#include "common/file_perm.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "port.h"
+#include "port/atomics.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/startup.h"
+#include "storage/ipc.h"
+#include "storage/spin.h"
+#include "storage/shmem.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/latch.h"
+#include "storage/lock.h"
+#include "tcop/tcopprot.h"
+#include "utils/timestamp.h"
+#include "utils/memutils.h"
+
+#define PREFETCH_DIR XLOGDIR "/" PG_TEMP_FILES_DIR
+#define PREFETCH_RATIO (16)
+
+int			max_restore_command_workers;
+
+/*
+ * Data for restore_command bgworker.
+ */
+typedef struct RestoreSlot
+{
+	slock_t		spin;
+
+	/*
+	 * The name of the archive file %f.
+	 */
+	char		xlogfname[MAXFNAMELEN];
+
+	/*
+	 * The name of the last recovery point file %r.
+	 */
+	char		pointfname[MAXFNAMELEN];
+
+	/*
+	 * The restore_command was called.
+	 */
+	bool		bgwdone;
+
+	/*
+	 * restore_command exit code
+	 */
+	int			bgwrc;
+
+	/*
+	 * Used to check that this slot is still linked with the bgworker.
+	 */
+	uint64		bgwid;
+
+	/*
+	 * bgworker start time.
+	 */
+	TimestampTz bgwtime;
+
+	/*
+	 * The pointer is valid for the launcher only.
+	 */
+	BackgroundWorkerHandle *bgwhandle;
+}			RestoreSlot;
+
+typedef struct RestoreDataStruct
+{
+	/*
+	 * Counter for bgworkers identification.
+	 */
+	uint64		counter;
+
+	/*
+	 * Number of prefetched files. Required to limit the number of prefetched
+	 * files (max_restore_command_workers*PREFETCH_RATIO). Replaces direct
+	 * counting of files in a PREFETCH_DIR.
+	 */
+	pg_atomic_uint32 nprefetched;
+
+	/*
+	 * Data for background workers.
+	 */
+	RestoreSlot slots[FLEXIBLE_ARRAY_MEMBER];
+}			RestoreDataStruct;
+
+RestoreDataStruct *RestoreData = NULL;
+
+static bool RestoreCommandPrefetch(char *, const char *, const char *);
+static void RestoreCommandPredict(const char *);
+static int	RestoreCommandGarbage(void);
+static bool RestoreSlotSpawn(RestoreSlot *);
+static void RestoreSlotWait(RestoreSlot *);
+static void RestoreSlotReset(RestoreSlot *);
+static void RestoreSlotSetup(RestoreSlot *, const char *, const char *);
+static RestoreSlot * RestoreSlotEmplace(bool);
+static RestoreSlot * RestoreSlotFind(const char *);
+static RestoreSlot * RestoreSlotPredict(const char *, const char *);
+static void XLogFileNamePredict(char *, const char *);
+static void XLogFilePathPrefetch(char *, const char *);
+static bool FilePathExists(const char *);
+
+/*
+ * Calculate a size of shared memory used for storing bgworker slots.
+ */
+Size
+RestoreCommandShmemSize(void)
+{
+	Size		size;
+
+	size = sizeof(RestoreDataStruct);
+	size = MAXALIGN(size);
+	size = add_size(size, mul_size(max_restore_command_workers,
+								   sizeof(RestoreSlot)));
+	return size;
+
+}
+
+/*
+ * Create a temporary directory to store prepfetched files
+ * and initialize a shared memory used for storing bgworker slots.
+ */
+void
+RestoreCommandShmemInit(void)
+{
+	bool		found;
+
+	RestoreData = (RestoreDataStruct *)
+		ShmemInitStruct("Restore Command Workers Data",
+						RestoreCommandShmemSize(),
+						&found);
+
+	if (!found)
+	{
+		int			i;
+
+		/* nprefetched is also set to 0 by this */
+		memset(RestoreData, 0, RestoreCommandShmemSize());
+
+		/*
+		 * Initialize memory and spin locks for each worker slot.
+		 */
+		for (i = 0; i < max_restore_command_workers; ++i)
+		{
+			RestoreSlot *slot = &RestoreData->slots[i];
+
+			memset(slot, 0, sizeof(RestoreSlot));
+			SpinLockInit(&slot->spin);
+
+		}
+
+		/* Create or clear temporary wals. */
+		PathNameCreateTemporaryDir(XLOGDIR, PREFETCH_DIR);
+		RemovePgTempFilesInDir(PREFETCH_DIR, true, true);
+		/* before_shmem_exit(pg_on_exit_callback function, Datum arg) */
+	}
+}
+
+/*
+ * This function is analogue of RestoreArchivedFile function with xlogs
+ * prefetching.
+ *
+ * If successful, subsequent WAL files will be predicted and
+ * bgworkers processes be run to restore the predicted files.
+ *
+ * The number of predicted files will be limited to the number of free slots.
+ *
+ * If not successful, then fallback RestoreArchivedFile will be called.
+ */
+bool
+RestoreCommandXLog(char *path, const char *xlogfname, const char *recovername,
+				   const off_t expectedSize, bool cleanupEnabled)
+{
+
+	char		pointfname[MAXFNAMELEN];
+	char		xlogpath[MAXPGPATH];
+#ifdef USE_ASSERT_CHECKING
+	uint32		new_val;
+#endif
+
+	/*
+	 * Synchronous mode.
+	 */
+	if (max_restore_command_workers < 1)
+		goto fallback;
+
+	/*
+	 * Ignore restore_command when not in archive recovery (meaning we are in
+	 * crash recovery).
+	 */
+	if (!ArchiveRecoveryRequested)
+		goto fallback;
+
+	/*
+	 * In standby mode, restore_command might not be supplied.
+	 */
+	if (recoveryRestoreCommand == NULL ||
+		strcmp(recoveryRestoreCommand, "") == 0)
+		goto fallback;
+
+	/*
+	 * Create the last restart point file name.
+	 */
+	XLogFileNameLastPoint(pointfname, cleanupEnabled);
+
+	/*
+	 * We shouldn't need anything earlier than the last restart point.
+	 */
+	Assert(strcmp(pointfname, xlogfname) <= 0);
+
+	/*
+	 * If the restore failed, try fallback result.
+	 */
+	if (!RestoreCommandPrefetch(xlogpath, xlogfname, pointfname))
+		goto fallback;
+
+	/*
+	 * Make sure the file is really there now and has the correct size.
+	 */
+	if (!FileValidateSize(xlogpath, expectedSize, xlogfname, NULL))
+	{
+		/* Remove artifacts. */
+		FileUnlink(xlogpath);
+		goto fallback;
+	}
+
+	/*
+	 * Move file to target path.
+	 */
+	snprintf(path, MAXPGPATH, XLOGDIR "/%s", recovername);
+	durable_rename(xlogpath, path, ERROR);
+
+#ifdef USE_ASSERT_CHECKING
+	new_val = pg_atomic_sub_fetch_u32(&RestoreData->nprefetched, 1);
+
+	/*
+	 * new_val expected to be >= 0. The assert below checks that
+	 * RestoreData->nprefetched is not wrapped around 0 after atomic decrement.
+	 */
+	Assert(new_val != UINT_MAX);
+#else
+	pg_atomic_sub_fetch_u32(&RestoreData->nprefetched, 1);
+#endif
+
+	/*
+	 * Log message like in RestoreArchivedFile.
+	 */
+	ereport(LOG,
+			(errmsg("restored log file \"%s\" from archive",
+					xlogfname)));
+
+	/*
+	 * Remove obsolete slots.
+	 */
+	if (RestoreCommandGarbage() > 0)
+	{
+		/*
+		 * Predict next logs and spawn bgworkers.
+		 */
+		RestoreCommandPredict(xlogfname);
+	}
+
+	return true;
+
+fallback:
+
+	/*
+	 * On any errors - try default implementation
+	 */
+	return RestoreArchivedFile(path, xlogfname, recovername, expectedSize,
+							   cleanupEnabled);
+}
+
+/*
+ * Attempt to retrieve the specified file from off-line archival storage
+ * to PREFDIR directory.
+ *
+ * Fill "path" with its complete path.
+ *
+ * Return true if command was executed successfully and file exists, or the
+ * file is found in the PREFDIR directory.
+ */
+static bool
+RestoreCommandPrefetch(char *xlogpath, const char *xlogfname,
+					   const char *pointfname)
+{
+	RestoreSlot *slot;
+	bool		bgwdone;
+	int			rc;
+
+	/*
+	 * Make prefetched path for file.
+	 */
+	XLogFilePathPrefetch(xlogpath, xlogfname);
+
+	/*
+	 * Check if file already on bgworker pool.
+	 */
+	if (!(slot = RestoreSlotFind(xlogfname)))
+	{
+		/*
+		 * Check if file already on prefetch dir.
+		 */
+		if (FilePathExists(xlogpath))
+			return true;
+
+		/*
+		 * Emplace a new slot and spawn bgworker.
+		 */
+		slot = RestoreSlotEmplace(true);
+		Assert(slot);
+
+		/*
+		 * When the function RestoreSlotEmplace is invoked with the 'force'
+		 * argument having true value this function calls the function
+		 * RestoreSlotReset(). The function RestoreSlotReset() terminates active
+		 * bgworker process if there is a bgwhandle associated with the slot.
+		 * In result, when RestoreSlotEmplace(true) returns a control flow,
+		 * the process that executes RestoreCommandWorkerMain() has been already
+		 * finished or being finished. Anyway, it is safe to reset slot's data
+		 * used from RestoreCommandWorkerMain() without first taking a lock
+		 * on the spin lock slot->spin.
+		 */
+		RestoreSlotSetup(slot, xlogfname, pointfname);
+
+		if (!RestoreSlotSpawn(slot))
+			ereport(FATAL,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg("out of background worker slots"),
+					 errhint("You might need to increase max_worker_processes.")
+					));
+
+	}
+
+	RestoreSlotWait(slot);
+
+	bgwdone = slot->bgwdone;
+	rc = slot->bgwrc;
+
+	RestoreSlotReset(slot);
+
+	/*
+	 * bgworker didn't execute restore_command
+	 */
+	if (!bgwdone)
+		return false;
+
+	/*
+	 * bgworker failed
+	 */
+	if (rc)
+	{
+		ereport(wait_result_is_any_signal(rc, true) ? FATAL : DEBUG2,
+				(errmsg("could not restore file \"%s\" from archive: %s",
+						xlogfname, wait_result_to_str(rc))));
+	}
+
+	return FilePathExists(xlogpath);
+}
+
+/*
+ * Predict next logs and spawn bgworkers while possible.
+ */
+static void
+RestoreCommandPredict(const char *xlogfname)
+{
+	char		pointfname[MAXFNAMELEN];
+	const char	*xlogfnext;
+	RestoreSlot	*slot;
+	int			spawn_limit;
+
+	XLogFileNameLastPoint(pointfname, false);
+	xlogfnext = xlogfname;
+
+	spawn_limit = PREFETCH_RATIO * max_restore_command_workers -
+		pg_atomic_read_u32(&RestoreData->nprefetched);
+
+	while (spawn_limit-- > 0)
+	{
+		if (!(slot = RestoreSlotPredict(xlogfnext, pointfname)))
+			break;
+
+		if (!RestoreSlotSpawn(slot))
+		{
+			ereport(WARNING,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg("out of background worker slots"),
+					 errhint("You might need to increase max_worker_processes.")
+					));
+			break;
+		}
+
+		ereport(DEBUG1,
+				(errmsg("predicted log file \"%s\"",
+						slot->xlogfname)));
+
+		xlogfnext = slot->xlogfname;
+	}
+}
+
+/*
+ * Cleaning garbage on pool.
+ *
+ * Reset the slots linked to completed bgworkers.
+ *
+ * Return free slots count.
+ */
+static int
+RestoreCommandGarbage(void)
+{
+	int			capacity;
+	int			i;
+
+	for (i = 0, capacity = 0; i < max_restore_command_workers; ++i)
+	{
+		RestoreSlot *slot = &RestoreData->slots[i];
+
+		SpinLockAcquire(&slot->spin);
+		if (slot->bgwdone)
+		{
+			int			bgwrc = slot->bgwrc;
+
+			/*
+			 * RestoreSlotReset() terminates a process if it does exist and
+			 * releases a memory pointed by slot->bgwhandle. RestoreSlotReset()
+			 * must be called before invoking of ereport() since the latter
+			 * can result in termination of current process. In this case,
+			 * bgworker process be orphaned and memory referenced by the
+			 * slot->bgwhandle data member wouldn't be released. Although this
+			 * memory will be released anyway on process termination, some
+			 * memory sanitizer tools could produce error report for such memory
+			 * chunks.
+			 */
+			RestoreSlotReset(slot);
+
+			if (bgwrc)
+			{
+				ereport(wait_result_is_any_signal(bgwrc, true) ? FATAL :
+																 DEBUG2,
+						(errmsg("restore_command error: %s",
+								wait_result_to_str(bgwrc))));
+			}
+
+			capacity++;
+		}
+		else if (slot->bgwhandle == NULL)
+			/*
+			 * Slots that are not assigned to any bgworker should be also
+			 * counted.
+			 */
+			capacity++;
+
+		SpinLockRelease(&slot->spin);
+	}
+
+	return capacity;
+}
+
+/*
+ * The main entry point for bgworker.
+ */
+void
+RestoreCommandWorkerMain(Datum main_arg)
+{
+	int			rc;
+	RestoreSlot *slot;
+	uint64		bgwid;
+	bool		linked;
+	char		xlogfname[MAXFNAMELEN];
+	char		pointfname[MAXFNAMELEN];
+	char		xlogpath[MAXPGPATH];
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, die);
+	/* We're now ready to receive signals. */
+	BackgroundWorkerUnblockSignals();
+
+	/* Get bgwid */
+	memcpy(&bgwid, MyBgworkerEntry->bgw_extra, sizeof(bgwid));
+
+	/* Get RestoreSlot */
+	slot = &RestoreData->slots[DatumGetInt16(main_arg)];
+	SpinLockAcquire(&slot->spin);
+	if ((linked = (slot->bgwid == bgwid)))
+	{
+		strlcpy(xlogfname, slot->xlogfname, sizeof(xlogfname));
+		strlcpy(pointfname, slot->pointfname, sizeof(pointfname));
+	}
+	SpinLockRelease(&slot->spin);
+
+	if (!linked)
+		ereport(FATAL,
+				(errmsg("slot " UINT64_FORMAT " is unlinked during a restore",
+						bgwid)));
+
+	/* Prepare path. */
+	XLogFilePathPrefetch(xlogpath, xlogfname);
+
+	/* Make sure there is no existing file named recovername. */
+	FileUnlink(xlogpath);
+
+	/* Prepare and execute the restore command. */
+	if ((rc = DoRestore(xlogpath, xlogfname, pointfname)))
+		FileUnlink(xlogpath);
+
+	CHECK_FOR_INTERRUPTS();
+
+	/* Keep the results. */
+	SpinLockAcquire(&slot->spin);
+	/*
+	 * Retesting of the condition 'slot->bgwid == bgwid' is required to
+	 * guard against reusing of a slot inside RestoreCommandPrefetch function
+	 * when RestoreSlotEmplace function called with argument value equals true.
+	*/
+	if ((linked = (slot->bgwid == bgwid)))
+	{
+		slot->bgwdone = true;
+		slot->bgwrc = rc;
+
+		if (FilePathExists(xlogpath))
+			pg_atomic_add_fetch_u32(&RestoreData->nprefetched, 1);
+
+	}
+	SpinLockRelease(&slot->spin);
+
+	/* If slot was unlinked - delete restored file. */
+	if (!linked)
+	{
+		FileUnlink(xlogpath);
+
+		ereport(FATAL,
+				(errmsg("slot %s is unlinked during a restore",
+						xlogfname)));
+	}
+	else
+		ereport(DEBUG2,
+				(errmsg_internal("slot %s done %d \"%s\"",
+								 xlogfname, rc, xlogpath)));
+
+	proc_exit(0);
+}
+
+
+/*
+ * Setup and spawn bgworker.
+ * Link it to the slot by bgwhandle.
+ */
+static bool
+RestoreSlotSpawn(RestoreSlot *slot)
+{
+	BackgroundWorker bgw;
+
+	memset(&bgw, 0, sizeof(bgw));
+	snprintf(bgw.bgw_name, sizeof(bgw.bgw_name), "restore %s", slot->xlogfname);
+
+	/*
+	 * Length of the string literal "Restore Command Worker" is less than
+	 * size of a buffer referenced by the data member bgw.bgw_type (the size is
+	 * limited by the constant BGW_MAXLEN that currently has value 96).
+	 * Therefore we can use function strcpy() instead of strncpy/strlcpy to copy
+	 * the string literal into the buffer bgw.bgw_type. The same is true for
+	 * other two string literals "postgres" and "RestoreCommandWorkerMain" and
+	 * their corresponding destination buffers referenced by the data members
+	 * bgw.bgw_library_name, bgw.bgw_function_name.
+	 * To guards against further possible change of limit represented by the
+	 * constant BGW_MAXLEN the asserts have been inserted before invoking of
+	 * the function strcpy() as a sanity check. In case some of these asserts be
+	 * fired it means that some really drastic change was done in the core
+	 * source code that should be carefully studied.
+	 */
+	Assert(sizeof(bgw.bgw_type) >= sizeof("Restore Command Worker"));
+	Assert(sizeof(bgw.bgw_library_name) >= sizeof("postgres"));
+	Assert(sizeof(bgw.bgw_function_name) >= sizeof("RestoreCommandWorkerMain"));
+
+	strcpy(bgw.bgw_type, "Restore Command Worker");
+	strcpy(bgw.bgw_library_name, "postgres");
+	strcpy(bgw.bgw_function_name, "RestoreCommandWorkerMain");
+
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
+
+	/*
+	 * BgWorkerStart_PostmasterStart for PM_RECOVERY, PM_STARTUP
+	 * BgWorkerStart_ConsistentState for PM_HOT_STANDBY
+	 */
+	bgw.bgw_start_time = HotStandbyActive() ? BgWorkerStart_ConsistentState :
+		BgWorkerStart_PostmasterStart;
+
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+	bgw.bgw_main_arg = Int16GetDatum(slot - RestoreData->slots);
+	memcpy(bgw.bgw_extra, &slot->bgwid, sizeof(slot->bgwid));
+	bgw.bgw_notify_pid = MyProcPid;
+	return RegisterDynamicBackgroundWorker(&bgw, &slot->bgwhandle);
+}
+
+/*
+ * Wait linked bgworker to shutdown.
+ */
+static void
+RestoreSlotWait(RestoreSlot *slot)
+{
+	BgwHandleStatus status;
+	pid_t		pid;
+
+	/* is linked slot by bgworker */
+	if (slot->bgwhandle == NULL)
+		return;
+
+	/* WaitForBackgroundWorkerShutdown  */
+	for (;;)
+	{
+		status = GetBackgroundWorkerPid(slot->bgwhandle, &pid);
+		if (status == BGWH_STOPPED)
+			break;
+
+		WaitLatch(MyLatch,
+				  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+				  8,
+				  PG_WAIT_EXTENSION);
+
+		ResetLatch(MyLatch);
+		HandleStartupProcInterrupts();
+	}
+
+	pfree(slot->bgwhandle);
+
+	slot->bgwhandle = NULL;
+}
+
+/*
+ * Reset slot params and terminate linked bgworker if exists.
+ */
+static void
+RestoreSlotReset(RestoreSlot *slot)
+{
+	slot->xlogfname[0] = '\0';
+	slot->pointfname[0] = '\0';
+	slot->bgwdone = false;
+	slot->bgwrc = 0;
+	slot->bgwid = 0;
+	slot->bgwtime = 0;
+	if (slot->bgwhandle)
+	{
+		/* If there's a bgworker tied up, stop it. */
+		TerminateBackgroundWorker(slot->bgwhandle);
+
+		/* Release memory. */
+		pfree(slot->bgwhandle);
+		slot->bgwhandle = NULL;
+	}
+}
+
+/*
+ *  Configure slot options.
+ */
+static void
+RestoreSlotSetup(RestoreSlot * slot, const char *xlogfname,
+				 const char *pointfname)
+{
+	strlcpy(slot->xlogfname, xlogfname, sizeof(slot->xlogfname));
+	strlcpy(slot->pointfname, pointfname, sizeof(slot->pointfname));
+	slot->bgwid = RestoreData->counter++;
+	slot->bgwtime = GetCurrentTimestamp();
+}
+
+/*
+ * Get a free slot.
+ *
+ * Return NULL if no free slots.
+ *
+ * If force param is TRUE and there are no free slots,
+ * then return the earliest slot.
+ */
+static RestoreSlot *
+RestoreSlotEmplace(bool force)
+{
+	int			i;
+	RestoreSlot *reslot;
+
+	reslot = NULL;
+	for (i = 0; i < max_restore_command_workers; ++i)
+	{
+		RestoreSlot *slot = &RestoreData->slots[i];
+
+		if (!slot->bgwhandle)
+		{
+			reslot = slot;
+			break;
+		}
+
+		if (reslot == NULL || slot->bgwtime < reslot->bgwtime)
+			reslot = slot;
+	}
+
+	if (!reslot)
+		return NULL;
+
+	/* Do not use linked slots in unforced mode. */
+	if (!force && reslot->bgwhandle)
+		return NULL;
+
+	/* Reset slot (unlink if linked).  */
+	SpinLockAcquire(&reslot->spin);
+	RestoreSlotReset(reslot);
+	SpinLockRelease(&reslot->spin);
+
+	return reslot;
+}
+
+/*
+ * Find a slot on pool by WAL name.
+ */
+static RestoreSlot *
+RestoreSlotFind(const char *xlogfname)
+{
+	int			i;
+
+	for (i = 0; i < max_restore_command_workers; ++i)
+	{
+		RestoreSlot *slot = &RestoreData->slots[i];
+
+		if (!strcmp(slot->xlogfname, xlogfname))
+			return slot;
+	}
+
+	return NULL;
+}
+
+/*
+ * Predict the next WAL name and allocate a slot for it.
+ * Return NULL if no slots are available.
+ */
+static RestoreSlot *
+RestoreSlotPredict(const char *xlogfname, const char *pointfname)
+{
+	char		xlogfnext[MAXFNAMELEN];
+	char		xlogpath[MAXPGPATH];
+
+	strlcpy(xlogfnext, xlogfname, sizeof(xlogfnext));
+
+	for (;;)
+	{
+		RestoreSlot *slot;
+
+		/* already in pool */
+		XLogFileNamePredict(xlogfnext, xlogfnext);
+
+		if (RestoreSlotFind(xlogfnext))
+			continue;
+
+		/* already restored */
+		XLogFilePathPrefetch(xlogpath, xlogfnext);
+
+		if (FilePathExists(xlogpath))
+			continue;
+
+		if (!(slot = RestoreSlotEmplace(false)))
+			break;
+
+		RestoreSlotSetup(slot, xlogfnext, pointfname);
+		return slot;
+	}
+
+	return NULL;
+}
+
+/*
+ * Predict the name of the next WAL file "xlognext",
+ * based on the specified "xlogfname".
+ */
+static void
+XLogFileNamePredict(char *xlogfnext, const char *xlogfname)
+{
+	uint32		tli;
+	XLogSegNo	segno;
+
+	XLogFromFileName(xlogfname, &tli, &segno, wal_segment_size);
+	XLogFileName(xlogfnext, tli, segno + 1, wal_segment_size);
+}
+
+static void
+XLogFilePathPrefetch(char *path, const char *xlogfname)
+{
+	snprintf(path, MAXPGPATH, PREFETCH_DIR "/%s", xlogfname);
+}
+
+/*
+ * Check that the path does exist.
+ */
+static bool
+FilePathExists(const char *xlogpath)
+{
+	struct stat statbuf;
+
+	if (stat(xlogpath, &statbuf) == 0)
+		return true;
+
+	if (errno != ENOENT)
+		ereport(FATAL,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						xlogpath)));
+
+	return false;
+}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 7e915bcadf1..78afaf31f6b 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -18,13 +18,16 @@
 #include "postgres.h"
 
 #include <unistd.h>
+#include <sys/stat.h>
 
 #include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
+#include "common/archive.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/startup.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -979,3 +982,169 @@ WALReadRaiseError(WALReadError *errinfo)
 						(Size) errinfo->wre_req)));
 	}
 }
+
+/*
+ * Remove a file if it does exist.
+ */
+void
+FileUnlink(const char *file_path)
+{
+	struct stat statbuf;
+
+	if (stat(file_path, &statbuf))
+	{
+		if (errno != ENOENT)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							file_path)));
+	}
+	else
+	{
+		if (unlink(file_path) != 0)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m",
+							file_path)));
+	}
+}
+
+/*
+ * Get the last valid restart point file name.
+ *
+ * If cleanup is not enabled, initialise the last restart point file name
+ * with InvalidXLogRecPtr, which will prevent the deletion of any WAL files
+ * from the archive because of the alphabetic sorting property of WAL
+ * filenames.
+ */
+void
+XLogFileNameLastPoint(char *lastRestartPointFname, bool cleanupEnabled)
+{
+	XLogSegNo	restartSegNo;
+	XLogRecPtr	restartRedoPtr;
+	TimeLineID	restartTli;
+
+	if (cleanupEnabled)
+	{
+		GetOldestRestartPoint(&restartRedoPtr, &restartTli);
+		XLByteToSeg(restartRedoPtr, restartSegNo, wal_segment_size);
+		XLogFileName(lastRestartPointFname, restartTli, restartSegNo,
+					 wal_segment_size);
+	}
+	else
+		XLogFileName(lastRestartPointFname, 0, 0L, wal_segment_size);
+}
+
+/*
+ * Check a file is really there now and has correct size.
+ *
+ * Return true if the file does exist and has correct size,
+ * else return false.
+ *
+ * If the output variable file_not_found is not null it's assigned
+ * either true or false value depending on whether the file does exist
+ * or not.
+ */
+bool
+FileValidateSize(const char *xlogpath, off_t expectedSize,
+				 const char *xlogfname, bool *file_not_found)
+{
+	struct stat stat_buf;
+
+	if (stat(xlogpath, &stat_buf) == 0)
+	{
+		if (file_not_found)
+			*file_not_found = false;
+
+		if (expectedSize > 0 && stat_buf.st_size != expectedSize)
+		{
+			int			elevel;
+
+			/*
+			 * If we find a partial file in standby mode, we assume it's
+			 * because it's just being copied to the archive, and keep
+			 * trying.
+			 *
+			 * Otherwise treat a wrong-sized file as FATAL to ensure the
+			 * DBA would notice it, but is that too strong? We could try
+			 * to plow ahead with a local copy of the file ... but the
+			 * problem is that there probably isn't one, and we'd
+			 * incorrectly conclude we've reached the end of WAL and we're
+			 * done recovering ...
+			 */
+			if (StandbyMode && stat_buf.st_size < expectedSize)
+				elevel = DEBUG1;
+			else
+				elevel = FATAL;
+			ereport(elevel,
+					(errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
+							xlogfname,
+							(unsigned long) stat_buf.st_size,
+							(unsigned long) expectedSize)));
+			return false;
+		}
+		else
+		{
+			ereport(LOG,
+					(errmsg("restored log file \"%s\" from archive",
+							xlogfname)));
+			return true;
+		}
+	}
+	else
+	{
+		/* stat failed */
+		if (errno != ENOENT)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+							errmsg("could not stat file \"%s\": %m",
+									xlogpath)));
+		if (file_not_found)
+			*file_not_found = true;
+
+		return false;
+	}
+
+}
+
+/*
+ * Build and execute restore_command.
+ *
+ * Return the result of command execution (the exit status of the shell),
+ * or -1 if a system error occurred. A return value of 127 means
+ * the execution of the shell failed.
+ */
+int
+DoRestore(const char *xlogpath, const char *xlogfname, const char *pointfname)
+{
+	char	   *xlogRestoreCmd;
+	int			rc;
+
+	/* Build a restore command to execute */
+	xlogRestoreCmd = BuildRestoreCommand(recoveryRestoreCommand, xlogpath,
+										 xlogfname, pointfname);
+
+	if (xlogRestoreCmd == NULL)
+		elog(PANIC, "could not build restore command \"%s\"",
+			 recoveryRestoreCommand);
+
+	ereport(DEBUG3,
+			(errmsg_internal("executing restore command \"%s\"",
+							 xlogRestoreCmd)));
+
+	/*
+	 * Check signals before restore command and reset afterwards.
+	 */
+	PreRestoreCommand();
+
+	/*
+	 * Execute
+	 */
+	rc = system(xlogRestoreCmd);
+
+	PostRestoreCommand();
+
+	pfree(xlogRestoreCmd);
+
+	return rc;
+}
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index beb5e85434c..55c21490945 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/parallel.h"
+#include "access/xlogrestore.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -128,6 +129,9 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"RestoreCommandWorkerMain", RestoreCommandWorkerMain
 	}
 };
 
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 96c2aaabbd6..93e0167454f 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/syncscan.h"
 #include "access/twophase.h"
+#include "access/xlogrestore.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, RestoreCommandShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -259,6 +261,7 @@ CreateSharedMemoryAndSemaphores(void)
 	WalSndShmemInit();
 	WalRcvShmemInit();
 	ApplyLauncherShmemInit();
+	RestoreCommandShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index de87ad6ef70..d3e633dd12e 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -37,6 +37,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrestore.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
 #include "catalog/storage.h"
@@ -196,6 +197,7 @@ static const char *show_tcp_keepalives_count(void);
 static const char *show_tcp_user_timeout(void);
 static bool check_maxconnections(int *newval, void **extra, GucSource source);
 static bool check_max_worker_processes(int *newval, void **extra, GucSource source);
+static bool check_max_restore_command_workers(int *newval, void **extra, GucSource source);
 static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource source);
 static bool check_max_wal_senders(int *newval, void **extra, GucSource source);
 static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source);
@@ -2982,6 +2984,18 @@ static struct config_int ConfigureNamesInt[] =
 		check_max_worker_processes, NULL, NULL
 	},
 
+	{
+		{"max_restore_command_workers",
+			PGC_POSTMASTER,
+			WAL_ARCHIVE_RECOVERY,
+			gettext_noop("Maximum number of restore_command worker processes."),
+			NULL,
+		},
+		&max_restore_command_workers,
+		0, 0, MAX_PARALLEL_WORKER_LIMIT,
+		check_max_restore_command_workers, NULL, NULL
+	},
+
 	{
 		{"max_logical_replication_workers",
 			PGC_POSTMASTER,
@@ -11568,6 +11582,19 @@ check_max_worker_processes(int *newval, void **extra, GucSource source)
 	return true;
 }
 
+static bool
+check_max_restore_command_workers(int *newval, void **extra, GucSource source)
+{
+	if (*newval > max_worker_processes)
+	{
+		GUC_check_errdetail("A value of max_restore_command_worker can't "
+			"exceed a value of max_worker_processes=%d", max_worker_processes);
+		return false;
+	}
+
+	return true;
+}
+
 static bool
 check_effective_io_concurrency(int *newval, void **extra, GucSource source)
 {
diff --git a/src/include/access/xlogrestore.h b/src/include/access/xlogrestore.h
new file mode 100644
index 00000000000..3e5cf867a56
--- /dev/null
+++ b/src/include/access/xlogrestore.h
@@ -0,0 +1,16 @@
+#ifndef XLOGRESTORE_H
+#define XLOGRESTORE_H
+
+#include "postgres.h"
+
+extern int	max_restore_command_workers;
+
+extern Size RestoreCommandShmemSize(void);
+extern void RestoreCommandShmemInit(void);
+extern bool RestoreCommandXLog(char *path, const char *xlogfname,
+							   const char *recovername,
+							   const off_t expectedSize,
+							   bool cleanupEnabled);
+extern void RestoreCommandWorkerMain(Datum main_arg) pg_attribute_noreturn();
+
+#endif							/* XLOGRESTORE_H */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index e59b6cf3a9f..c040a8ced70 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -60,4 +60,13 @@ extern void XLogReadDetermineTimeline(XLogReaderState *state,
 
 extern void WALReadRaiseError(WALReadError *errinfo);
 
+extern void FileUnlink(const char *xlogpath);
+extern void XLogFileNameLastPoint(char *lastRestartPointFname,
+								  bool cleanupEnabled);
+extern bool FileValidateSize(char const *xlogpath, off_t expectedSize,
+							 char const *xlogfname, bool *file_not_found);
+
+extern int DoRestore(char const *xlogpath, char const *xlogfname,
+					 char const *pointfname);
+
 #endif
diff --git a/src/test/recovery/t/021_xlogrestore.pl b/src/test/recovery/t/021_xlogrestore.pl
new file mode 100644
index 00000000000..970ae6778ca
--- /dev/null
+++ b/src/test/recovery/t/021_xlogrestore.pl
@@ -0,0 +1,143 @@
+#
+# Test for xlogrestore with max_restore_command_workers parameter
+#
+
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 9;
+
+sub measure_replica_restore_time
+{
+	my ( $replica_name, $node_primary, $backup_name, $last_lsn, $tab_int_count, $config ) = @_;
+	my $timer = time();
+
+	# Initialize replica node from backup, fetching WAL from archives
+	my $node_replica = get_new_node( $replica_name );
+	$node_replica->init_from_backup( $node_primary, $backup_name,
+		has_restoring => 1 );
+	$node_replica->append_conf( 'postgresql.conf', $config );
+	$node_replica->start();
+
+	# Wait until necessary replay has been done on replica
+	my $caughtup_query =
+	  "SELECT '$last_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
+	$node_replica->poll_query_until( 'postgres', $caughtup_query )
+	  or die "Timed out while waiting for replica to catch up";
+
+	# Check tab_int's rows count
+	my $replica_tab_int_count =
+	  $node_replica->safe_psql( 'postgres', "SELECT count(*) FROM tab_int" );
+	is( $replica_tab_int_count, $tab_int_count, 'tab_int sizes are equal' );
+
+	# Check the presence of temporary files specifically generated during
+	# archive recovery.
+	$node_replica->promote();
+
+	my $node_replica_data = $node_replica->data_dir;
+	ok( !-f "$node_replica_data/pg_wal/RECOVERYHISTORY",
+		"RECOVERYHISTORY removed after promotion");
+	ok( !-f "$node_replica_data/pg_wal/RECOVERYXLOG",
+			"RECOVERYXLOG removed after promotion");
+	ok( !-d "$node_replica_data/pg_wal/prefetch",
+		"pg_wal/prefetch dir removed after promotion");
+
+	my $res = time() - $timer;
+
+	$node_replica->stop();
+	return $res;
+}
+
+# WAL produced count
+my $wal_count = 64;
+
+# Size of data portion
+my $wal_data_portion = 128;
+
+# Restore bgworkers count
+my $max_restore_command_workers = 4;
+
+# Sleep to imitate restore delays
+my $restore_sleep = 0.256;
+
+# Minimum expected acceleration of the restore process.
+# Is this formula correct?
+my $handicap = ( $wal_count * $restore_sleep ) / $max_restore_command_workers;
+
+# Initialize primary node, doing archives
+my $node_primary = get_new_node( 'primary' );
+$node_primary->init(
+	has_archiving    => 1,
+	allows_streaming => 1
+);
+
+# Start it
+$node_primary->start;
+
+# Take backup for replica.
+my $backup_name = 'my_backup';
+$node_primary->backup( $backup_name );
+
+# Create some content on primary server that will be not present on replicas.
+for ( my $i = 0; $i < $wal_count; $i++ )
+{
+	if ( $i == 0 ) {
+		$node_primary->safe_psql('postgres',
+			"CREATE TABLE tab_int ( a SERIAL NOT NULL PRIMARY KEY );")
+	} else {
+		$node_primary->safe_psql('postgres',
+			"INSERT INTO tab_int SELECT FROM generate_series( 1, $wal_data_portion );")
+	}
+	$node_primary->safe_psql('postgres', "SELECT pg_switch_wal()");
+}
+
+my $last_lsn = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn();");
+my $tab_int_count = $node_primary->safe_psql('postgres', "SELECT count(*) FROM tab_int;");
+
+$node_primary->stop();
+
+#	Restore command
+my $restore_command;
+my $path = TestLib::perl2host( $node_primary->archive_dir );
+if ( $TestLib::windows_os ) {
+	$path =~ s{\\}{\\\\}g;
+	$restore_command = qq(perl -e "select( undef, undef, undef, $restore_sleep );" & copy "$path\\\\%f" "%p);
+} else {
+	$restore_command = qq(sleep ) . $restore_sleep . qq( && cp "$path/%f" "%p");
+}
+
+#diag( $restore_command );
+
+# Compare the replica restore times with different max_restore_command_workers values.
+#diag( 'multiple_workers_restore_time' );
+my $multiple_workers_restore_time = measure_replica_restore_time(
+	'fast_restored_replica',
+	$node_primary,
+	$backup_name,
+	$last_lsn,
+	$tab_int_count,
+qq(
+wal_retrieve_retry_interval = '100ms'
+max_restore_command_workers = $max_restore_command_workers
+restore_command = '$restore_command'
+));
+
+#diag( 'single_worker_restore_time' );
+my $single_worker_restore_time = measure_replica_restore_time(
+	'normal_restored_replica',
+	$node_primary,
+	$backup_name,
+	$last_lsn,
+	$tab_int_count,
+qq(
+wal_retrieve_retry_interval = '100ms'
+max_restore_command_workers = 0
+restore_command = '$restore_command'
+));
+
+#diag( $multiple_workers_restore_time );
+#diag( $single_worker_restore_time );
+#diag( $handicap );
+
+ok( $multiple_workers_restore_time + $handicap < $single_worker_restore_time, "Multiple workers are faster than a single worker" );
diff --git a/src/test/regress/expected/guc.out b/src/test/regress/expected/guc.out
index 811f80a0976..be35f346df6 100644
--- a/src/test/regress/expected/guc.out
+++ b/src/test/regress/expected/guc.out
@@ -776,3 +776,24 @@ set default_with_oids to f;
 -- Should not allow to set it to true.
 set default_with_oids to t;
 ERROR:  tables declared WITH OIDS are not supported
+--
+-- PGPRO-3691: Check that a value for the new configration parameter
+-- max_restore_command_workers is limited by a value of the parameter
+-- max_worker_processes
+SHOW max_worker_processes;
+ max_worker_processes 
+----------------------
+ 8
+(1 row)
+
+-- max_worker_processes has default value 8
+-- Check that an attempt to set the parameter max_restore_command_workers
+-- to a value execeeding this limit results in error
+ALTER SYSTEM SET max_restore_command_workers = 16; -- fails, it is expected behaviour
+ERROR:  invalid value for parameter "max_restore_command_workers": 16
+DETAIL:  A value of max_restore_command_worker can't exceed a value of max_worker_processes=8
+-- Check that a value lesser than max_worker_processes can be assigned
+-- to the parameter max_restore_command_workers
+ALTER SYSTEM SET max_restore_command_workers = 7; -- ok since 7 < max_worker_processes
+-- Reset to default
+ALTER SYSTEM RESET max_restore_command_workers;
diff --git a/src/test/regress/sql/guc.sql b/src/test/regress/sql/guc.sql
index 43dbba3775e..e7b8a4c0c5e 100644
--- a/src/test/regress/sql/guc.sql
+++ b/src/test/regress/sql/guc.sql
@@ -296,3 +296,22 @@ reset check_function_bodies;
 set default_with_oids to f;
 -- Should not allow to set it to true.
 set default_with_oids to t;
+
+--
+-- PGPRO-3691: Check that a value for the new configration parameter
+-- max_restore_command_workers is limited by a value of the parameter
+-- max_worker_processes
+SHOW max_worker_processes;
+
+-- max_worker_processes has default value 8
+-- Check that an attempt to set the parameter max_restore_command_workers
+-- to a value execeeding this limit results in error
+
+ALTER SYSTEM SET max_restore_command_workers = 16; -- fails, it is expected behaviour
+
+-- Check that a value lesser than max_worker_processes can be assigned
+-- to the parameter max_restore_command_workers
+ALTER SYSTEM SET max_restore_command_workers = 7; -- ok since 7 < max_worker_processes
+
+-- Reset to default
+ALTER SYSTEM RESET max_restore_command_workers;
#2Pavel Stehule
pavel.stehule@gmail.com
In reply to: Dmitry Shulga (#1)
Re: Reduce the time required for a database recovery from archive.

út 8. 9. 2020 v 6:51 odesílatel Dmitry Shulga <d.shulga@postgrespro.ru>
napsal:

Hello hackers,

Currently, database recovery from archive is performed sequentially,
by reading archived WAL files and applying their records to the database.

Overall archive file processing is done one by one, and this might
create a performance bottleneck if archived WAL files are delivered slowly,
because the database server has to wait for arrival of the next
WAL segment before applying its records.

To address this issue it is proposed to receive archived WAL files in
parallel
so that when the next WAL segment file is required for processing of redo
log
records it would be already available.

Implementation of this approach assumes running several background
processes (bgworkers)
each of which runs a shell command specified by the parameter
restore_command
to deliver an archived WAL file. Number of running parallel processes is
limited
by the new parameter max_restore_command_workers. If this parameter has
value 0
then WAL files delivery is performed using the original algorithm, that is
in
one-by-one manner. If this parameter has value greater than 0 then the
database
server starts several bgworker processes up to the limit specified by
the parameter max_restore_command_workers and passes to every process
WAL file name to deliver. Active processes start prefetching of specified
WAL files and store received files in the directory pg_wal/pgsql_tmp. After
bgworker process finishes receiving a file it marks itself as a free
process
and waits for a new request to receive a next WAL file. The main process
performing database recovery still handles WAL files in one-by-one manner,
but instead of waiting for a next required WAL file's availability it
checks for
that file in the prefetched directory. If a new file is present there,
the main process starts its processing.

The patch implemeting the described approach is attached to this email.
The patch contains a test in the file src/test/recovery/t/
021_xlogrestore.pl
Although the test result depends on real execution time and hardly could be
approved for including to the repository it was added in order to show
a positive effect from applying the new algorithm. In my environment
restoring
from archive with parallel prefetching is twice as faster than in original
mode.

+1

it is interesting feature

Regards

Pavel

Show quoted text

Regards,
Dmitry.

#3Stephen Frost
sfrost@snowman.net
In reply to: Dmitry Shulga (#1)
Re: Reduce the time required for a database recovery from archive.

Greetings,

* Dmitry Shulga (d.shulga@postgrespro.ru) wrote:

Overall archive file processing is done one by one, and this might
create a performance bottleneck if archived WAL files are delivered slowly,
because the database server has to wait for arrival of the next
WAL segment before applying its records.

To address this issue it is proposed to receive archived WAL files in parallel
so that when the next WAL segment file is required for processing of redo log
records it would be already available.

Yes, pgbackrest already does exactly this (if configured)- uses parallel
processes to fetch the WAL and have it be available ahead of time.

Implementation of this approach assumes running several background processes (bgworkers)
each of which runs a shell command specified by the parameter restore_command
to deliver an archived WAL file. Number of running parallel processes is limited
by the new parameter max_restore_command_workers. If this parameter has value 0
then WAL files delivery is performed using the original algorithm, that is in
one-by-one manner. If this parameter has value greater than 0 then the database
server starts several bgworker processes up to the limit specified by
the parameter max_restore_command_workers and passes to every process
WAL file name to deliver. Active processes start prefetching of specified
WAL files and store received files in the directory pg_wal/pgsql_tmp. After
bgworker process finishes receiving a file it marks itself as a free process
and waits for a new request to receive a next WAL file. The main process
performing database recovery still handles WAL files in one-by-one manner,
but instead of waiting for a next required WAL file's availability it checks for
that file in the prefetched directory. If a new file is present there,
the main process starts its processing.

I'm a bit confused about this description- surely it makes sense for the
parallel workers to continue to loop and fetch up to some specified
max..? Then to monitor and to fetch more when the amount pre-fetched so
far drops before that level? The description above makes it sound like
X WAL will be fetched ahead of time, and then the recovery process will
go through those until it runs out and then it'll have to wait for the
next X WAL to be fetched, which means it's still going to end up being
delayed even with these parallel processes, which isn't good.

Does this also properly handle timeline switches..?

Thanks,

Stephen

#4Dmitry Shulga
d.shulga@postgrespro.ru
In reply to: Stephen Frost (#3)
Re: Reduce the time required for a database recovery from archive.

Hello Stephen

On 9 Sep 2020, at 21:26, Stephen Frost <sfrost@snowman.net> wrote:

Greetings,

* Dmitry Shulga (d.shulga@postgrespro.ru) wrote:

Overall archive file processing is done one by one, and this might
create a performance bottleneck if archived WAL files are delivered slowly,
because the database server has to wait for arrival of the next
WAL segment before applying its records.

To address this issue it is proposed to receive archived WAL files in parallel
so that when the next WAL segment file is required for processing of redo log
records it would be already available.

Yes, pgbackrest already does exactly this (if configured)- uses parallel
processes to fetch the WAL and have it be available ahead of time.

pgbackrest is a third-party software that should be additionally installed on customer's premises.

On the other hand, built-in support of this optimization in PostgresSQL is a good argument to add
this feature and provide it to customers just out of the box.

Implementation of this approach assumes running several background processes (bgworkers)
each of which runs a shell command specified by the parameter restore_command
to deliver an archived WAL file. Number of running parallel processes is limited
by the new parameter max_restore_command_workers. If this parameter has value 0
then WAL files delivery is performed using the original algorithm, that is in
one-by-one manner. If this parameter has value greater than 0 then the database
server starts several bgworker processes up to the limit specified by
the parameter max_restore_command_workers and passes to every process
WAL file name to deliver. Active processes start prefetching of specified
WAL files and store received files in the directory pg_wal/pgsql_tmp. After
bgworker process finishes receiving a file it marks itself as a free process
and waits for a new request to receive a next WAL file. The main process
performing database recovery still handles WAL files in one-by-one manner,
but instead of waiting for a next required WAL file's availability it checks for
that file in the prefetched directory. If a new file is present there,
the main process starts its processing.

I'm a bit confused about this description- surely it makes sense for the

OK. The description I originally provided was probably pretty misleading so I will try to clarify it a bit.

So, as soon as a bgworker process finishes delivering a WAL file it marks itself as a free.

WAL records applier working in parallel and processing the WAL files in sequential manner.
Once it finishes handling of the current WAL file, it checks whether it is possible to run extra bgworker processes
to deliver WAL files which will be required a bit later. If there are free bgworker processes then applier requests
to start downloading of one or more extra WAL files. After that applier determines a name of next WAL file to handle
and checks whether it exist in the prefetching directory. If it does exist then applier starts handling it and
processing loop is repeated.

parallel workers to continue to loop and fetch up to some specified
max..? Then to monitor and to fetch more when the amount pre-fetched so
far drops before that level? The description above makes it sound like
X WAL will be fetched ahead of time, and then the recovery process will
go through those until it runs out and then it'll have to wait for the
next X WAL to be fetched, which means it's still going to end up being
delayed even with these parallel processes, which isn't good.

Does this also properly handle timeline switches..?

Thanks,

Stephen

Regards,
Dmitry

#5Stephen Frost
sfrost@snowman.net
In reply to: Dmitry Shulga (#4)
Re: Reduce the time required for a database recovery from archive.

Greetings,

* Dmitry Shulga (d.shulga@postgrespro.ru) wrote:

On 9 Sep 2020, at 21:26, Stephen Frost <sfrost@snowman.net> wrote:
* Dmitry Shulga (d.shulga@postgrespro.ru) wrote:

Overall archive file processing is done one by one, and this might
create a performance bottleneck if archived WAL files are delivered slowly,
because the database server has to wait for arrival of the next
WAL segment before applying its records.

To address this issue it is proposed to receive archived WAL files in parallel
so that when the next WAL segment file is required for processing of redo log
records it would be already available.

Yes, pgbackrest already does exactly this (if configured)- uses parallel
processes to fetch the WAL and have it be available ahead of time.

pgbackrest is a third-party software that should be additionally installed on customer's premises.

On the other hand, built-in support of this optimization in PostgresSQL is a good argument to add
this feature and provide it to customers just out of the box.

Sure, having core do pre-fetching could be useful, though there's the
downside that it, unfortunately, can't know how much WAL is actually
going to be needed as we play forward since we don't know where we will
end up finding the target we've been asked for. Unlikely that'll be too
much of an issue with the traditional 16 MB WAL files, but having a more
integrated backup/restore solution would be able to address that by
tracking the restore targets that are in each WAL (which is something
we've had on our todo list for pgbackrest for a while, and that would
also let someone ask "am I able to reach this restore target?").

Implementation of this approach assumes running several background processes (bgworkers)
each of which runs a shell command specified by the parameter restore_command
to deliver an archived WAL file. Number of running parallel processes is limited
by the new parameter max_restore_command_workers. If this parameter has value 0
then WAL files delivery is performed using the original algorithm, that is in
one-by-one manner. If this parameter has value greater than 0 then the database
server starts several bgworker processes up to the limit specified by
the parameter max_restore_command_workers and passes to every process
WAL file name to deliver. Active processes start prefetching of specified
WAL files and store received files in the directory pg_wal/pgsql_tmp. After
bgworker process finishes receiving a file it marks itself as a free process
and waits for a new request to receive a next WAL file. The main process
performing database recovery still handles WAL files in one-by-one manner,
but instead of waiting for a next required WAL file's availability it checks for
that file in the prefetched directory. If a new file is present there,
the main process starts its processing.

I'm a bit confused about this description- surely it makes sense for the

OK. The description I originally provided was probably pretty misleading so I will try to clarify it a bit.

So, as soon as a bgworker process finishes delivering a WAL file it marks itself as a free.

WAL records applier working in parallel and processing the WAL files in sequential manner.
Once it finishes handling of the current WAL file, it checks whether it is possible to run extra bgworker processes
to deliver WAL files which will be required a bit later. If there are free bgworker processes then applier requests
to start downloading of one or more extra WAL files. After that applier determines a name of next WAL file to handle
and checks whether it exist in the prefetching directory. If it does exist then applier starts handling it and
processing loop is repeated.

Ok- so the idea is that each time the applying process finishes with a
WAL file then it'll see if there's an available worker and, if so, will
give it the next file to go get (which would presumably be some number
in the future and the actual next file the applying process needs is
already available). That sounds better, at least, though I'm not sure
why we're making it the job of the applying process to push the workers
each time..? Also, I'm not sure about the interface- wouldn't it make
more sense to have a "pre-fetch this amount of WAL" kind of parameter
directly instead of tying that to the number of background workers? You
might only need one or two processes doing WAL fetching to be able to
fetch faster than the applying process is able to apply it, but you
probably want to pre-fetch more than just one or two 16 MB WAL files.

In other words, I would have thought we'd have:

wal_prefetch_amount = 1GB
max_restore_command_workers = 2

and then you'd have up to 2 worker processes running and they'd be
keeping 1GB of WAL pre-fetched at all times. If we have just
'max_restore_command_workers' and you want to pre-fetch 1GB of WAL then
you'd have to have a pretty high value there and you'd end up with
a bunch of threads that all spike to go do work each time the applying
process finishes a WAL file but then just sit around doing nothing while
waiting for the applying process to finish another segment.

Thanks,

Stephen

#6Dmitry Shulga
d.shulga@postgrespro.ru
In reply to: Stephen Frost (#5)
Re: Reduce the time required for a database recovery from archive.

Hello Stephen,

On 19 Oct 2020, at 23:25, Stephen Frost <sfrost@snowman.net> wrote:

Greetings,

Implementation of this approach assumes running several background processes (bgworkers)
each of which runs a shell command specified by the parameter restore_command
to deliver an archived WAL file. Number of running parallel processes is limited
by the new parameter max_restore_command_workers. If this parameter has value 0
then WAL files delivery is performed using the original algorithm, that is in
one-by-one manner. If this parameter has value greater than 0 then the database
server starts several bgworker processes up to the limit specified by
the parameter max_restore_command_workers and passes to every process
WAL file name to deliver. Active processes start prefetching of specified
WAL files and store received files in the directory pg_wal/pgsql_tmp. After
bgworker process finishes receiving a file it marks itself as a free process
and waits for a new request to receive a next WAL file. The main process
performing database recovery still handles WAL files in one-by-one manner,
but instead of waiting for a next required WAL file's availability it checks for
that file in the prefetched directory. If a new file is present there,
the main process starts its processing.

I'm a bit confused about this description- surely it makes sense for the

OK. The description I originally provided was probably pretty misleading so I will try to clarify it a bit.

So, as soon as a bgworker process finishes delivering a WAL file it marks itself as a free.

WAL records applier working in parallel and processing the WAL files in sequential manner.
Once it finishes handling of the current WAL file, it checks whether it is possible to run extra bgworker processes
to deliver WAL files which will be required a bit later. If there are free bgworker processes then applier requests
to start downloading of one or more extra WAL files. After that applier determines a name of next WAL file to handle
and checks whether it exist in the prefetching directory. If it does exist then applier starts handling it and
processing loop is repeated.

Ok- so the idea is that each time the applying process finishes with a
WAL file then it'll see if there's an available worker and, if so, will
give it the next file to go get (which would presumably be some number
in the future and the actual next file the applying process needs is
already available). That sounds better, at least, though I'm not sure
why we're making it the job of the applying process to push the workers
each time..?

Every bgwork serves as a task to deliver a WAL file. Considering a task as an active entity is well-known approach in software design.
So I don't see any issues with such implementation. Moreover, implementation of this approach is probably simpler than any other alternatives
and still providing positive performance impact in comparing with current (non optimized) implementation.

Also, I'm not sure about the interface- wouldn't it make
more sense to have a "pre-fetch this amount of WAL" kind of parameter
directly instead of tying that to the number of background workers?

This approach was originally considered and closely discussed.
Finally, it was decided that introducing an extra GUC parameter to control pre-fetch limit is not practical since it shifts responsibility for tuning prefetching
mechanism from postgres server to a user.
From my point of view the fewer parameters exist to set up some feature the better.

You
might only need one or two processes doing WAL fetching to be able to
fetch faster than the applying process is able to apply it, but you
probably want to pre-fetch more than just one or two 16 MB WAL files.

Every time when prefetching is started a number of potentially prefetched files is calculated by expression
PREFETCH_RATION * max_restore_command_workers - 'number of already prefetched files'
where PREFETCH_RATION is compiled-in constant and has value 16.

After that a task for delivering a next WAL file is placed to a current free bgworker process up until no more free bgworker processes.

In other words, I would have thought we'd have:

wal_prefetch_amount = 1GB
max_restore_command_workers = 2

and then you'd have up to 2 worker processes running and they'd be
keeping 1GB of WAL pre-fetched at all times. If we have just
'max_restore_command_workers' and you want to pre-fetch 1GB of WAL then
you'd have to have a pretty high value there and you'd end up with
a bunch of threads that all spike to go do work each time the applying

Sorry, I don't see how we can end up with a bunch of threads?
max_restore_command_workers has value 2 in your example meaning that no more than 2 bgworkers could be run concurrently for the sake of WAL files prefetching

process finishes a WAL file but then just sit around doing nothing while
waiting for the applying process to finish another segment.

I believe that for typical set-up the parameter max_restore_command_workers would have value 2 or 3 in order to supply
a delivered WAL file on time just before it be started processing.

This use case is for environment where time required for delivering WAL file from archive is greater than time required for applying records contained in the WAL file.
If time required for WAL file delivering lesser than than time required for handling records contained in it then max_restore_command_workers shouldn't be specified at all

Show quoted text

Thanks,

Stephen

#7Stephen Frost
sfrost@snowman.net
In reply to: Dmitry Shulga (#6)
Re: Reduce the time required for a database recovery from archive.

Greetings,

* Dmitry Shulga (d.shulga@postgrespro.ru) wrote:

On 19 Oct 2020, at 23:25, Stephen Frost <sfrost@snowman.net> wrote:

Implementation of this approach assumes running several background processes (bgworkers)
each of which runs a shell command specified by the parameter restore_command
to deliver an archived WAL file. Number of running parallel processes is limited
by the new parameter max_restore_command_workers. If this parameter has value 0
then WAL files delivery is performed using the original algorithm, that is in
one-by-one manner. If this parameter has value greater than 0 then the database
server starts several bgworker processes up to the limit specified by
the parameter max_restore_command_workers and passes to every process
WAL file name to deliver. Active processes start prefetching of specified
WAL files and store received files in the directory pg_wal/pgsql_tmp. After
bgworker process finishes receiving a file it marks itself as a free process
and waits for a new request to receive a next WAL file. The main process
performing database recovery still handles WAL files in one-by-one manner,
but instead of waiting for a next required WAL file's availability it checks for
that file in the prefetched directory. If a new file is present there,
the main process starts its processing.

I'm a bit confused about this description- surely it makes sense for the

OK. The description I originally provided was probably pretty misleading so I will try to clarify it a bit.

So, as soon as a bgworker process finishes delivering a WAL file it marks itself as a free.

WAL records applier working in parallel and processing the WAL files in sequential manner.
Once it finishes handling of the current WAL file, it checks whether it is possible to run extra bgworker processes
to deliver WAL files which will be required a bit later. If there are free bgworker processes then applier requests
to start downloading of one or more extra WAL files. After that applier determines a name of next WAL file to handle
and checks whether it exist in the prefetching directory. If it does exist then applier starts handling it and
processing loop is repeated.

Ok- so the idea is that each time the applying process finishes with a
WAL file then it'll see if there's an available worker and, if so, will
give it the next file to go get (which would presumably be some number
in the future and the actual next file the applying process needs is
already available). That sounds better, at least, though I'm not sure
why we're making it the job of the applying process to push the workers
each time..?

Every bgwork serves as a task to deliver a WAL file. Considering a task as an active entity is well-known approach in software design.
So I don't see any issues with such implementation. Moreover, implementation of this approach is probably simpler than any other alternatives
and still providing positive performance impact in comparing with current (non optimized) implementation.

I don't think we look only at if something is an improvement or not over
the current situation when we consider changes.

The relatively simple approach I was thinking was that a couple of
workers would be started and they'd have some prefetch amount that needs
to be kept out ahead of the applying process, which they could
potentially calculate themselves without needing to be pushed forward by
the applying process.

Also, I'm not sure about the interface- wouldn't it make
more sense to have a "pre-fetch this amount of WAL" kind of parameter
directly instead of tying that to the number of background workers?

This approach was originally considered and closely discussed.
Finally, it was decided that introducing an extra GUC parameter to control pre-fetch limit is not practical since it shifts responsibility for tuning prefetching
mechanism from postgres server to a user.
From my point of view the fewer parameters exist to set up some feature the better.

I agree in general that it's better to have fewer parameters, but I
disagree that this isn't an important option for users to be able to
tune- the rate of fetching WAL and of applying WAL varies quite a bit
from system to system. Being able to tune the pre-fetch seems like it'd
actually be more important to a user than the number of processes
required to keep up with that amount of pre-fetching, which is something
we could actually figure out on our own...

You
might only need one or two processes doing WAL fetching to be able to
fetch faster than the applying process is able to apply it, but you
probably want to pre-fetch more than just one or two 16 MB WAL files.

Every time when prefetching is started a number of potentially prefetched files is calculated by expression
PREFETCH_RATION * max_restore_command_workers - 'number of already prefetched files'
where PREFETCH_RATION is compiled-in constant and has value 16.

After that a task for delivering a next WAL file is placed to a current free bgworker process up until no more free bgworker processes.

Ah, it wasn't mentioned that we've got a multiplier in here, but it
still ends up meaning that if a user actually wants to tune the amount
of pre-fetching being done, they're going to end up having to tune the,
pretty much entirely unrelated, value of max_restore_command_workers.
That really seems entirely backwards to me from what I would think the
user would actually want to tune.

In other words, I would have thought we'd have:

wal_prefetch_amount = 1GB
max_restore_command_workers = 2

and then you'd have up to 2 worker processes running and they'd be
keeping 1GB of WAL pre-fetched at all times. If we have just
'max_restore_command_workers' and you want to pre-fetch 1GB of WAL then
you'd have to have a pretty high value there and you'd end up with
a bunch of threads that all spike to go do work each time the applying

Sorry, I don't see how we can end up with a bunch of threads?
max_restore_command_workers has value 2 in your example meaning that no more than 2 bgworkers could be run concurrently for the sake of WAL files prefetching

If you don't give the user the option to configure the prefetch amount,
except indirectly by changing the number of max restore workers, then to
get a higher prefetch amount they have to increase the number of
workers. That's what I'm referring to above, and previously, here.

process finishes a WAL file but then just sit around doing nothing while
waiting for the applying process to finish another segment.

I believe that for typical set-up the parameter max_restore_command_workers would have value 2 or 3 in order to supply
a delivered WAL file on time just before it be started processing.

This use case is for environment where time required for delivering WAL file from archive is greater than time required for applying records contained in the WAL file.
If time required for WAL file delivering lesser than than time required for handling records contained in it then max_restore_command_workers shouldn't be specified at all

That's certainly not correct at all- the two aren't really all that
related, because any time spent waiting for a WAL file to be delivered
is time that the applying process *could* be working to apply WAL
instead of waiting. At a minimum, I'd expect us to want to have, by
default, at least one worker process running out in front of the
applying process to hopefully eliminate most, if not all, time where the
applying process is waiting for a WAL to show up. In cases where the
applying process is faster than a single fetching process, a user might
want to have two or more restore workers, though ultimately I still
contend that what they really want is as many workers as needed to make
sure that the applying process doesn't ever need to wait- up to some
limit based on the amount of space that's available.

And back to the configuration side of this- have you considered the
challenge that a user who is using very large WAL files might run
into with the proposed approach that doesn't allow them to control the
amount of space used? If I'm using 1G WAL files, then I need to have
16G available to have *any* pre-fetching done with this proposed
approach, right? That doesn't seem great.

Thanks,

Stephen

#8Anastasia Lubennikova
a.lubennikova@postgrespro.ru
In reply to: Stephen Frost (#7)
Re: Reduce the time required for a database recovery from archive.

On 09.11.2020 19:31, Stephen Frost wrote:

Greetings,

* Dmitry Shulga (d.shulga@postgrespro.ru) wrote:

On 19 Oct 2020, at 23:25, Stephen Frost <sfrost@snowman.net> wrote:

process finishes a WAL file but then just sit around doing nothing while
waiting for the applying process to finish another segment.

I believe that for typical set-up the parameter max_restore_command_workers would have value 2 or 3 in order to supply
a delivered WAL file on time just before it be started processing.

This use case is for environment where time required for delivering WAL file from archive is greater than time required for applying records contained in the WAL file.
If time required for WAL file delivering lesser than than time required for handling records contained in it then max_restore_command_workers shouldn't be specified at all

That's certainly not correct at all- the two aren't really all that
related, because any time spent waiting for a WAL file to be delivered
is time that the applying process *could* be working to apply WAL
instead of waiting. At a minimum, I'd expect us to want to have, by
default, at least one worker process running out in front of the
applying process to hopefully eliminate most, if not all, time where the
applying process is waiting for a WAL to show up. In cases where the
applying process is faster than a single fetching process, a user might
want to have two or more restore workers, though ultimately I still
contend that what they really want is as many workers as needed to make
sure that the applying process doesn't ever need to wait- up to some
limit based on the amount of space that's available.

And back to the configuration side of this- have you considered the
challenge that a user who is using very large WAL files might run
into with the proposed approach that doesn't allow them to control the
amount of space used? If I'm using 1G WAL files, then I need to have
16G available to have *any* pre-fetching done with this proposed
approach, right? That doesn't seem great.

Thanks,

Stephen

Status update for a commitfest entry.

The commitfest is closed now. As this entry has been Waiting on Author
for a while, I've marked it as returned with feedback. Dmitry, feel free
to resubmit an updated version to a future commitfest.

--
Anastasia Lubennikova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

#9Dmitry Shulga
d.shulga@postgrespro.ru
In reply to: Stephen Frost (#7)
1 attachment(s)
Re: Reduce the time required for a database recovery from archive.

Hi Stephen

Based on our last discussion I redesigned the implementation of WAL archive recovery speed-up. The main idea of the new implementation was partly borrowed from your proposal, to be more accurate from the following one:

On 9 Nov 2020, at 23:31, Stephen Frost <sfrost@snowman.net> wrote:

The relatively simple approach I was thinking was that a couple of
workers would be started and they'd have some prefetch amount that needs
to be kept out ahead of the applying process, which they could
potentially calculate themselves without needing to be pushed forward by
the applying process.

In the new implementation, several workers are spawned on server start up for delivering WAL segements from archive. The number of workers to spawn is specfied by the GUC parameter wal_prefetch_workers; the max. number of files to preload from the archive is determined by the GUC parameter wal_max_prefetch_amount. The applier of WAL records still handles WAL files one-by-one, but since several prefetching processes are loading files from the archive, there is a high probability that when the next WAL file is requested by the applier of WAL records, it has already been delivered from the archive.

Every time any of the running workers is going to preload the next WAL file, it checks whether a limit imposed by the parameter wal_max_prefetch_amount was reached. If it was, then the process suspends preloading until the WAL applier process handles some of the already preloaded WAL files and the total number of already loaded but not yet processed WAL files drops below this limit.

At the moment I didn't implement a mechanism for dynamic calculation of the number of workers required for loading the WAL files in time. We can consider current (simplified) implementation as a base for further discussion and turn to this matter in the next iteration if it be needed.

Also I would like to ask your opinion about the issue I'm thinking about:
Parallel workers spawned for preloading WAL files from archive use the original mechanism for delivering files from archive - they run a command specified by the GUC parameter restore_command. One of the possible parameters accepted by the restore_command is %r, which specifies the filename of the last restart point. If several workers preload WAL files simultaneously with another process applying the preloaded WAL files, I’m not sure what is correct way to determine the last restart point value that WAL-preloading processes should use, because this value can be updated at any time by the process that applies WALs.

Another issue that I would like to ask your opinion about regards to choosing correct value for a max size of the hash table stored in shared memory. Currently, wal_max_prefetch_amount is passed as the value for max. hash table size that I'm not sure is the best decision.

Thanks in advance for feedback.

Regards,
Dmitry

Attachments:

0001-Reduce-time-required-to-recover-database-from-archiv.patchapplication/octet-stream; name=0001-Reduce-time-required-to-recover-database-from-archiv.patch; x-unix-mode=0644Download
From c071e8ee78aac811feaf54c4374c1a998409733e Mon Sep 17 00:00:00 2001
From: Dmitry Shulga <d.shulga@postgrespro.ru>
Date: Fri, 18 Dec 2020 12:38:58 +0700
Subject: [PATCH] Reduce time required to recover database from archive.

Originally database recovering from archive was performed by
sequential receiving of files with WAL records and applying them against
the database. Delivering of files containing WAL records are performed
by running a command specified by the GUC parameter restore_command.
In case receiving of every file containing WAL records takes long time
it results in standing idle most of time waiting until files be received.
If time required to apply WAL records from an archive file is significantly
lesser than time required to deliver the file from archive it leads
to nonproductive standing idle after current WAL segment is applied and
before next WAL segment be received.  As a consequence a wall time required
to recover a database from archive log can be unacceptably long.

To reduce total time required to restore database from archive the procedure
for delivering of WAL files was redesigned in order to allow concurrent
loading of WAL files. At postmaster start a few background processes are
spawned to load WAL files from archive in parallel. A number of processes
that started to perform preloading of WAL files is determined by
the new GUC parameter wal_prefetch_workers. A number of WAL files to prefetch
from archive is limited by the new GUC parameter wal_max_prefetch_amount.

Additionally, refactoring was done to extract duplicate code, used
in the files xlogarchive.c and xlogrestore.c, into stanalone functions and
move it to the file xlogutils.c

Author: Dmitry Shulga
Reviewed-by: Anna Akenteva
Tested-by: Roman Zharkov

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de722..ffbf8090f45 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -32,7 +32,8 @@ OBJS = \
 	xlogfuncs.o \
 	xloginsert.o \
 	xlogreader.o \
-	xlogutils.o
+	xlogutils.o \
+	xlogrestore.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 13f1d8c3dc7..f0a0c68725e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -37,6 +37,7 @@
 #include "access/xloginsert.h"
 #include "access/xlogreader.h"
 #include "access/xlogutils.h"
+#include "access/xlogrestore.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
@@ -3684,10 +3685,11 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 					 xlogfname);
 			set_ps_display(activitymsg);
 
-			restoredFromArchive = RestoreArchivedFile(path, xlogfname,
-													  "RECOVERYXLOG",
-													  wal_segment_size,
-													  InRedo);
+			restoredFromArchive = RestoreCommandXLog(path, xlogfname,
+													 "RECOVERYXLOG",
+													 wal_segment_size,
+													 InRedo);
+
 			if (!restoredFromArchive)
 				return -1;
 			break;
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index f39dc4ddf1a..fb4023f1cec 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -22,6 +22,7 @@
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
+#include "access/xlogutils.h"
 #include "common/archive.h"
 #include "miscadmin.h"
 #include "postmaster/startup.h"
@@ -55,13 +56,8 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 					bool cleanupEnabled)
 {
 	char		xlogpath[MAXPGPATH];
-	char	   *xlogRestoreCmd;
 	char		lastRestartPointFname[MAXPGPATH];
 	int			rc;
-	struct stat stat_buf;
-	XLogSegNo	restartSegNo;
-	XLogRecPtr	restartRedoPtr;
-	TimeLineID	restartTli;
 
 	/*
 	 * Ignore restore_command when not in archive recovery (meaning we are in
@@ -102,22 +98,7 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 	/*
 	 * Make sure there is no existing file named recovername.
 	 */
-	if (stat(xlogpath, &stat_buf) != 0)
-	{
-		if (errno != ENOENT)
-			ereport(FATAL,
-					(errcode_for_file_access(),
-					 errmsg("could not stat file \"%s\": %m",
-							xlogpath)));
-	}
-	else
-	{
-		if (unlink(xlogpath) != 0)
-			ereport(FATAL,
-					(errcode_for_file_access(),
-					 errmsg("could not remove file \"%s\": %m",
-							xlogpath)));
-	}
+	FileUnlink(xlogpath);
 
 	/*
 	 * Calculate the archive file cutoff point for use during log shipping
@@ -136,97 +117,28 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 	 * flags to signify the point when we can begin deleting WAL files from
 	 * the archive.
 	 */
-	if (cleanupEnabled)
-	{
-		GetOldestRestartPoint(&restartRedoPtr, &restartTli);
-		XLByteToSeg(restartRedoPtr, restartSegNo, wal_segment_size);
-		XLogFileName(lastRestartPointFname, restartTli, restartSegNo,
-					 wal_segment_size);
-		/* we shouldn't need anything earlier than last restart point */
-		Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
-	}
-	else
-		XLogFileName(lastRestartPointFname, 0, 0L, wal_segment_size);
+	XLogFileNameLastPoint(lastRestartPointFname, cleanupEnabled);
+	Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
 
-	/* Build the restore command to execute */
-	xlogRestoreCmd = BuildRestoreCommand(recoveryRestoreCommand,
-										 xlogpath, xlogfname,
-										 lastRestartPointFname);
-	if (xlogRestoreCmd == NULL)
-		elog(ERROR, "could not build restore command \"%s\"",
-			 recoveryRestoreCommand);
-
-	ereport(DEBUG3,
-			(errmsg_internal("executing restore command \"%s\"",
-							 xlogRestoreCmd)));
-
-	/*
-	 * Check signals before restore command and reset afterwards.
-	 */
-	PreRestoreCommand();
-
-	/*
-	 * Copy xlog from archival storage to XLOGDIR
-	 */
-	rc = system(xlogRestoreCmd);
-
-	PostRestoreCommand();
-	pfree(xlogRestoreCmd);
+	rc = DoRestore(xlogpath, xlogfname, lastRestartPointFname);
 
 	if (rc == 0)
 	{
-		/*
-		 * command apparently succeeded, but let's make sure the file is
-		 * really there now and has the correct size.
-		 */
-		if (stat(xlogpath, &stat_buf) == 0)
+		bool		file_not_found;
+		bool		ret = FileValidateSize(xlogpath, expectedSize, xlogfname,
+										   &file_not_found);
+
+		if (ret)
 		{
-			if (expectedSize > 0 && stat_buf.st_size != expectedSize)
-			{
-				int			elevel;
-
-				/*
-				 * If we find a partial file in standby mode, we assume it's
-				 * because it's just being copied to the archive, and keep
-				 * trying.
-				 *
-				 * Otherwise treat a wrong-sized file as FATAL to ensure the
-				 * DBA would notice it, but is that too strong? We could try
-				 * to plow ahead with a local copy of the file ... but the
-				 * problem is that there probably isn't one, and we'd
-				 * incorrectly conclude we've reached the end of WAL and we're
-				 * done recovering ...
-				 */
-				if (StandbyMode && stat_buf.st_size < expectedSize)
-					elevel = DEBUG1;
-				else
-					elevel = FATAL;
-				ereport(elevel,
-						(errmsg("archive file \"%s\" has wrong size: %lld instead of %lld",
-								xlogfname,
-								(long long int) stat_buf.st_size,
-								(long long int) expectedSize)));
-				return false;
-			}
-			else
-			{
-				ereport(LOG,
-						(errmsg("restored log file \"%s\" from archive",
-								xlogfname)));
-				strcpy(path, xlogpath);
-				return true;
-			}
+			ereport(LOG,
+					(errmsg("restored log file \"%s\" from archive",
+							xlogfname)));
+			strcpy(path, xlogpath);
+			return true;
 		}
-		else
-		{
-			/* stat failed */
-			int			elevel = (errno == ENOENT) ? LOG : FATAL;
 
-			ereport(elevel,
-					(errcode_for_file_access(),
-					 errmsg("could not stat file \"%s\": %m", xlogpath),
-					 errdetail("restore_command returned a zero exit status, but stat() failed.")));
-		}
+		if (!file_not_found)
+			return false;
 	}
 
 	/*
diff --git a/src/backend/access/transam/xlogrestore.c b/src/backend/access/transam/xlogrestore.c
new file mode 100644
index 00000000000..4cd87e296ef
--- /dev/null
+++ b/src/backend/access/transam/xlogrestore.c
@@ -0,0 +1,951 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogrestore.c
+ *	  Infrastructure for parallel restore commands execution
+ *
+ * Copyright (c) 2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/transam/xlogrestore.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "access/xlogrestore.h"
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogarchive.h"
+#include "access/xlogdefs.h"
+#include "access/xlogutils.h"
+#include "common/archive.h"
+#include "common/file_perm.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "port.h"
+#include "port/atomics.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/startup.h"
+#include "storage/ipc.h"
+#include "storage/spin.h"
+#include "storage/shmem.h"
+#include "storage/latch.h"
+#include "storage/lock.h"
+#include "tcop/tcopprot.h"
+#include "utils/timestamp.h"
+#include "utils/memutils.h"
+
+/*
+ * The max number of WAL files to prefetch from archive.
+ */
+int			wal_max_prefetch_amount;
+
+/*
+ * Number of background workers to run on postmaster startup for retrieving
+ * WAL files from archive. Zero value of this variable turns off prefetching of
+ * WAL files from archive.
+ */
+int			wal_prefetch_workers;
+
+/*
+ * Data for restore_command bgworker.
+ */
+typedef struct RestoreSlot
+{
+	/*
+	 * The handle corresponding to a running bgworker process.
+	 */
+	BackgroundWorkerHandle *bgwhandle;
+
+	/*
+	 * The latch used for signaling that bgworker can continue downloading of
+	 * WAL files since a number of already pre-fetched	WAL files dropped below
+	 * the limit imposed by the GUC parameter wal_max_prefetch_amount.
+	 */
+	Latch		continuePrefetching;
+
+	/*
+	 * The latch to notify an invoker that a bgworker process has been
+	 * successfully run.
+	 */
+	Latch		workerReady;
+
+	/*
+	 * This flag is set by bgworker process if it was started and run
+	 * successfully.
+	 */
+	bool		workerStarted;
+} RestoreSlot;
+
+typedef struct PrefetchedFile
+{
+	/*
+	 * The name of the archive file %f.
+	 */
+	char		xlogfname[MAXFNAMELEN];
+} PrefetchedFile;
+
+/*
+ * Type of values stored in hash table RestoreDataStruct->hashtab
+ */
+typedef struct PrefetchedFileEntry
+{
+	PrefetchedFile key;
+
+	/*
+	 * True if a file with a name equals to the key does exist on a file
+	 * system, else false.
+	 */
+	bool		file_exist;
+
+	/*
+	 * True if a file with a name equals to the key has been already processed
+	 * during recovery procedure.
+	 */
+	bool		file_was_processed;
+} PrefetchedFileEntry;
+
+typedef struct RestoreDataStruct
+{
+	/*
+	 * The lock to guard against concurrent modification of structure's
+	 * members from parallel running threads.
+	 */
+	slock_t		lock;
+
+	/*
+	 * The latch to support for producer/consumer pattern. Producers are
+	 * bgworker processes pre-fetching WAL files from archive, Consumer is a
+	 * recovery process who waiting until the next required WAL file be
+	 * downloaded from archive to continue database recovering. This latch is
+	 * used for notifying the consumer that a new file was retrieved by one of
+	 * running producers.
+	 */
+	Latch		fileAvailable;
+
+	/*
+	 * Hash table to trace what WAL files have been pre-fetched.
+	 */
+	HTAB	   *hashtab;
+
+	/*
+	 * The name of the last recovery point file %r.
+	 */
+	char		pointfname[MAXFNAMELEN];
+
+	/*
+	 * TLI and an initial segment number from which to start a database
+	 * recovery
+	 */
+	XLogSegNo	restartSegNo;
+	TimeLineID	restartTli;
+
+	/*
+	 * Number of pre-fetched WAL files.
+	 */
+	int			nprefetched;
+
+	/*
+	 * Data for background workers.
+	 */
+	RestoreSlot slots[FLEXIBLE_ARRAY_MEMBER];
+
+} RestoreDataStruct;
+
+static RestoreDataStruct *RestoreData = NULL;
+
+typedef enum WALPrefetchingState_e
+{
+	WALPrefetchingIsInactive,
+	WALPrefetchingIsActive,
+	WALPrefetchingShutdown
+} WALPrefetchingState_e;
+
+static WALPrefetchingState_e WALPrefetchingState = WALPrefetchingIsInactive;
+
+static void XLogFilePathPrefetch(char *path, const char *xlogfname);
+static bool FilePathExists(const char *xlogpath);
+static void StartWALPrefetchWorkers(const char *xlogfname);
+static void ShutdownWALPrefetchWorkers(int last_process_idx);
+static bool WaitUntilFileRetrieved(const char *xlogfname,
+								   bool *wal_file_processed);
+
+/*
+ * Calculate a size of shared memory used for storing bgworker slots.
+ */
+Size
+RestoreCommandShmemSize(void)
+{
+	Size		size;
+
+	size = sizeof(RestoreDataStruct);
+	size = MAXALIGN(size);
+	size = add_size(size, mul_size(wal_prefetch_workers, sizeof(RestoreSlot)));
+	return size;
+}
+
+#define PREFETCH_DIR XLOGDIR "/" PG_TEMP_FILES_DIR
+
+/*
+ * Create a temporary directory to store prepfetched files
+ * and initialize a shared memory used for storing bgworker slots.
+ */
+void
+RestoreCommandShmemInit(void)
+{
+	bool		found;
+
+	RestoreData = (RestoreDataStruct *)
+		ShmemInitStruct("Restore Command Workers Data",
+						RestoreCommandShmemSize(),
+						&found);
+
+	if (!found)
+	{
+		int			i;
+		HASHCTL		hash_ctl;
+
+		memset(RestoreData, 0, RestoreCommandShmemSize());
+
+		SpinLockInit(&RestoreData->lock);
+
+		InitSharedLatch(&RestoreData->fileAvailable);
+
+		/* Create the hash table */
+		memset(&hash_ctl, 0, sizeof(hash_ctl));
+
+		hash_ctl.keysize = sizeof(PrefetchedFile);
+		hash_ctl.entrysize = sizeof(PrefetchedFileEntry);
+
+		RestoreData->hashtab = ShmemInitHash("Pre-fetched WAL files",
+											 wal_max_prefetch_amount,
+											 wal_max_prefetch_amount,
+											 &hash_ctl,
+											 HASH_ELEM);
+
+		/*
+		 * Initialize memory for each worker slot.
+		 */
+		for (i = 0; i < wal_prefetch_workers; ++i)
+		{
+			RestoreSlot *slot = &RestoreData->slots[i];
+
+			memset(slot, 0, sizeof(RestoreSlot));
+			InitSharedLatch(&slot->continuePrefetching);
+			InitSharedLatch(&slot->workerReady);
+		}
+
+		/* Create or clear temporary wals. */
+		PathNameCreateTemporaryDir(XLOGDIR, PREFETCH_DIR);
+		RemovePgTempFilesInDir(PREFETCH_DIR, true, true);
+	}
+}
+
+/*
+ * Iterate along bgworkers slots and notify everyone bgworker process
+ * waiting on the continuePrefetching Latch to resume retrieving of WAL files
+ * from archive.
+ */
+static void
+ResumePrefetching()
+{
+	unsigned	i;
+
+	for (i = 0; i < wal_prefetch_workers; ++i)
+	{
+		SetLatch(&RestoreData->slots[i].continuePrefetching);
+	}
+}
+
+/*
+ * This function is counterpart of RestoreArchivedFile function with xlogs
+ * pre-fetching.
+ *
+ * On success the requested WAL file has been retrieved from archive.
+ * Invocation of this function also initiates loading of WAL files that
+ * will be required later. For this goal several brworker processes
+ * are started and perform loading of WAL files. A name of file to start
+ * loading is assigned to every background worker process together with
+ * a delta value that will be applied to a segment number of a WAL file just
+ * received in order to calculate a next file name to pre-load.
+ *
+ * A number of background workers started for WAL files loading is determined
+ * by the new GUC parameter wal_prefetch_workers. A number of WAL files to
+ * prefetch is limited by the new GUC parameter wal_max_prefetch_amount.
+ * If wal_max_prefetch_amount has value 0 no background worker processes
+ * are started and WAL files preloading is not performed. In this case regular
+ * (in one by one manner) loading of WAL files is performed.
+ *
+ * Input:
+ *		path - the path to a WAL file retrieved from archive
+ *		xlogfname - a name of WAL file to retrieve from archive
+ *		recovername - the directory name where a retrieved WAL file
+ *					  has to be placed
+ *		expectedSize - expected size of the requested WAL file
+ *		cleanupEnabled - true if start recovering from last restart point,
+ *						 false if start recovering from the very outset.
+ *	Return:
+ *		true on success, false on error
+ */
+bool
+RestoreCommandXLog(char *path, const char *xlogfname, const char *recovername,
+				   const off_t expectedSize, bool cleanupEnabled)
+{
+	char		xlogpath[MAXPGPATH];
+	bool		prefetchedFileNotFound,
+				wal_file_already_processed;
+	int			nprefetched;
+	PrefetchedFileEntry *foundEntry;
+
+	/*
+	 * Synchronous mode.
+	 */
+	if (wal_max_prefetch_amount < 1)
+		goto fallback;
+
+	/*
+	 * Ignore restore_command when not in archive recovery (meaning we are in
+	 * crash recovery).
+	 */
+	if (!ArchiveRecoveryRequested)
+		goto fallback;
+
+	/*
+	 * In standby mode, restore_command might not be supplied.
+	 */
+	if (recoveryRestoreCommand == NULL ||
+		strcmp(recoveryRestoreCommand, "") == 0)
+		goto fallback;
+
+	/*
+	 * Create the last restart point file name.
+	 */
+	XLogFileNameLastPoint(RestoreData->pointfname, cleanupEnabled);
+
+	/*
+	 * Run WAL pre-fetching processes if they haven't been started yet.
+	 */
+	StartWALPrefetchWorkers(xlogfname);
+
+	/*
+	 * We shouldn't need anything earlier than the last restart point.
+	 */
+	Assert(strcmp(RestoreData->pointfname, xlogfname) <= 0);
+
+	/*
+	 * Make prefetched path for file.
+	 */
+	XLogFilePathPrefetch(xlogpath, xlogfname);
+
+	/*
+	 * Wait until file be retrieved from archive.
+	 */
+	if (!WaitUntilFileRetrieved(xlogfname, &wal_file_already_processed))
+	{
+		/*
+		 * WaitUntilFileRetrieved() returns false in case there is no more WAL
+		 * files to retrieve.
+		 */
+		snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
+		return false;
+	}
+
+	if (wal_file_already_processed)
+		return false;
+
+	/*
+	 * Make sure the file is really there now and has the correct size.
+	 */
+	if (!FileValidateSize(xlogpath, expectedSize, xlogfname,
+						  &prefetchedFileNotFound))
+	{
+		if (prefetchedFileNotFound)
+			snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
+		else
+			/* Remove artifacts. */
+			FileUnlink(xlogpath);
+
+		return false;
+	}
+
+	/*
+	 * Move file to target path.
+	 */
+	snprintf(path, MAXPGPATH, XLOGDIR "/%s", recovername);
+	durable_rename(xlogpath, path, ERROR);
+
+	/*
+	 * Decrease by one a number of prefetched files and wake up any of
+	 * pre-fetching processes suspended on this latch.
+	 */
+	SpinLockAcquire(&RestoreData->lock);
+
+	Assert(RestoreData->nprefetched > 0);
+
+	nprefetched = RestoreData->nprefetched;
+	RestoreData->nprefetched = RestoreData->nprefetched - 1;
+
+	/*
+	 * Check whether a number of already prefetched files greater or equal the
+	 * limit wal_max_prefetch_amount and whether this number dropped below the
+	 * limit after its decrement.
+	 */
+	if (nprefetched >= wal_max_prefetch_amount &&
+		RestoreData->nprefetched < wal_max_prefetch_amount)
+
+		/*
+		 * The value of RestoreData->nprefetched dropped below the
+		 * wal_max_prefetch_amount limit, signal background processes to
+		 * continue prefetching of WAL files from archive.
+		 */
+		ResumePrefetching();
+
+	foundEntry =
+	(PrefetchedFileEntry *) hash_search(RestoreData->hashtab, xlogfname,
+										HASH_FIND, NULL);
+
+	foundEntry->file_was_processed = true;
+	SpinLockRelease(&RestoreData->lock);
+
+	/*
+	 * Log message like in RestoreArchivedFile.
+	 */
+	ereport(LOG,
+			(errmsg("restored log file \"%s\" from archive",
+					xlogfname)));
+	return true;
+
+fallback:
+
+	/*
+	 * On any errors - try default implementation
+	 */
+	return RestoreArchivedFile(path, xlogfname, recovername, expectedSize,
+							   cleanupEnabled);
+}
+
+/*
+ * Waiting until a file with the name specified by the parameter xlogfname
+ * be received from archive and written to file system.
+ *
+ * Input:
+ *		xlogfname - a name of file to wait for delivering from archive
+ * Return:
+ *		false in case there is no more file in archive to retrieve, else true
+ */
+static bool
+WaitUntilFileRetrieved(const char *xlogfname, bool *wal_file_processed)
+{
+	bool		found;
+
+	do
+	{
+		PrefetchedFileEntry *foundEntry;
+
+		SpinLockAcquire(&RestoreData->lock);
+
+		/*
+		 * Check whether the file name does exist in the hash table. If it
+		 * does then restore_command was executed on behalf of this file name
+		 * and a file was probably copied to a destination directory. The
+		 * actual presence of the file in the destination directory is
+		 * determined by the the data member file_exist of the structure
+		 * PrefetchedFileEntry.
+		 */
+		foundEntry =
+			(PrefetchedFileEntry *) hash_search(RestoreData->hashtab, xlogfname,
+												HASH_FIND, NULL);
+
+		if (foundEntry != NULL)
+		{
+			/*
+			 * The data member file_exist of the structure PrefetchedFileEntry
+			 * has the false value if restore_command was executed but the
+			 * file wasn't copied to a destination directory by some reason,
+			 * e.g. since no more file exist in archive.
+			 */
+			found = foundEntry->file_exist;
+			*wal_file_processed = foundEntry->file_was_processed;
+
+			SpinLockRelease(&RestoreData->lock);
+			break;
+		}
+		SpinLockRelease(&RestoreData->lock);
+
+		/*
+		 * There is no an entry in hash table corresponding to a name
+		 * specified by the parameter xlogfname. Wait on the latch
+		 * RestoreData->fileAvailable located in the shared memory until a
+		 * file be retrieved from archive. bgworker processes run for
+		 * delivering WAL files from archive will trigger this latch every
+		 * time a new WAL file be delivered.
+		 */
+		(void) WaitLatch(&RestoreData->fileAvailable,
+						 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+						 0, PG_WAIT_EXTENSION);
+		ResetLatch(&RestoreData->fileAvailable);
+		CHECK_FOR_INTERRUPTS();
+	}
+	while (true);
+
+	return found;
+}
+
+/*
+ * Insert a file name into the hash table and wake up a thread that is waiting
+ * until file retrieved from archive.
+ *
+ * Input:
+ *		xlogfname - name of pre-fetched file.
+ */
+static void
+SignalFileDelivered(const char *xlogfname)
+{
+	PrefetchedFileEntry newFileEntry;
+	PrefetchedFileEntry *insertedElement;
+
+#ifdef USE_ASSERT_CHECKING
+	bool		found = false;
+#endif
+
+	strcpy(newFileEntry.key.xlogfname, xlogfname);
+
+	SpinLockAcquire(&RestoreData->lock);
+
+	/*
+	 * Add the new file name to the hash of file names that have been already
+	 * delivered from archive. Out of memory error is reported by ereport, so
+	 * it is not required to check the return value of hash_search().
+	 */
+#ifdef USE_ASSERT_CHECKING
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, &found);
+
+	/*
+	 * An entry with such a key mustn't exist in the hash table at the tim of
+	 * insertion. If it does something really wrong happened.
+	 */
+	Assert(!found);
+#else
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, NULL);
+#endif
+
+	insertedElement->file_exist = true;
+	insertedElement->file_was_processed = false;
+
+	/*
+	 * Increase by one a number of pre-fetched files
+	 */
+	RestoreData->nprefetched = RestoreData->nprefetched + 1;
+
+	/*
+	 * Wake up the thread that executing RestoreCommandXLog() to continue
+	 * processing of WAL files.
+	 */
+	SetLatch(&RestoreData->fileAvailable);
+
+	SpinLockRelease(&RestoreData->lock);
+}
+
+/*
+ * Mark that the specified WAL file doesn't exist in archive and wake up
+ * a thread that is waiting in RestoreCommandXLog() to finish WAL file
+ * processing.
+ */
+static void
+SignalFileNotExist(const char *xlogfname)
+{
+	PrefetchedFileEntry *insertedElement;
+	PrefetchedFileEntry newFileEntry;
+#ifdef USE_ASSERT_CHECKING
+	bool		found = false;
+#endif
+
+	strcpy(newFileEntry.key.xlogfname, xlogfname);
+
+	SpinLockAcquire(&RestoreData->lock);
+
+	/*
+	 * Add the new file name to the hash of file names that have been already
+	 * delivered from archive. Out of memory error is reported by ereport, so
+	 * it is not required to check the return value of hash_search().
+	 */
+#ifdef USE_ASSERT_CHECKING
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, &found);
+
+	/*
+	 * We tried to add the new file name and discovered that such file does
+	 * already exist. It seems something wrong happens.
+	 */
+	Assert(!found);
+	insertedElement->file_exist = false;
+	insertedElement->file_was_processed = false;
+
+	Assert(insertedElement->file_exist == false &&
+		   strcmp(insertedElement->key.xlogfname, xlogfname) == 0);
+#else
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, NULL);
+	insertedElement->file_exist = false;
+	insertedElement->file_was_processed = false;
+#endif
+
+	/*
+	 * Wake up the thread executing RestoreCommandXLog() to finish WAL files
+	 * processing since no more files left in archive.
+	 */
+	SetLatch(&RestoreData->fileAvailable);
+
+	SpinLockRelease(&RestoreData->lock);
+}
+
+/*
+ * Check whether a limit imposed by the GUC parameter wal_max_prefetch_amount
+ * has been exceeded on prefetching of WAL files and suspend further
+ * downloading of WAL files until a notification be received to resume it.
+ * Input:
+ *			bgwid - an index of bgworker process that has to suspend prefetching of
+ *			WAL files.
+ */
+static void
+SuspendPrefetchingIfRequired(uint16 bgwid)
+{
+	while (RestoreData->nprefetched >= wal_max_prefetch_amount)
+	{
+		/*
+		 * If a number of already pre-fetched WAL files exceeds a limit
+		 * imposed by the GUC parameter 'wal_max_prefetch_amount', suspend
+		 * execution until some of already retrieved WAL files be processed
+		 * and a number of pre-fetched files dropped below this limit.
+		 */
+		(void) WaitLatch(&RestoreData->slots[bgwid].continuePrefetching,
+						 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+						 0, PG_WAIT_EXTENSION);
+		ResetLatch(&RestoreData->slots[bgwid].continuePrefetching);
+		CHECK_FOR_INTERRUPTS();
+	}
+}
+
+/*
+ * The main entry point for bgworker.
+ * Input:
+ *		main_arg -	id value assigned to each pre-fetching bgworker process.
+ *				This value is used both as an index in array of active bgworker
+ *				processes and for calculating the number of the first segment
+ *				from that to start WAL files pre-fetching by corresponding
+ *				bgworker process.
+ *
+ * This function returns the control flow if a file that currently
+ * being processed is not found, meaning that all files were already delivered
+ * from archive and the requested file is one that was never stored
+ * in the archive.
+ */
+void
+WALPrefetchWorkerMain(Datum main_arg)
+{
+	int			rc;
+	char		xlogpath[MAXPGPATH];
+	char		xlogfnext[MAXFNAMELEN];
+	unsigned	increment;
+	XLogSegNo	nextSegNo;
+	uint16		bgwid;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, die);
+	/* We're now ready to receive signals. */
+	BackgroundWorkerUnblockSignals();
+
+	/* Get RestoreSlot */
+	bgwid = DatumGetUInt16(main_arg);
+
+	nextSegNo = RestoreData->restartSegNo + bgwid;
+	increment = wal_prefetch_workers;
+
+	OwnLatch(&RestoreData->slots[bgwid].continuePrefetching);
+
+	/*
+	 * Notify invoker that the WAL files prefetching worker has just been
+	 * successfully started.
+	 */
+	RestoreData->slots[bgwid].workerStarted = true;
+	SetLatch(&RestoreData->slots[bgwid].workerReady);
+
+	while (true)
+	{
+		SuspendPrefetchingIfRequired(bgwid);
+
+		XLogFileName(xlogfnext, RestoreData->restartTli, nextSegNo,
+					 wal_segment_size);
+
+		/* Prepare path. */
+		XLogFilePathPrefetch(xlogpath, xlogfnext);
+
+		/*
+		 * Make sure there is no such file in a directory for prefetched
+		 * files.
+		 */
+		FileUnlink(xlogpath);
+
+		/* Prepare and execute the restore command. */
+		if ((rc = DoRestore(xlogpath, xlogfnext, RestoreData->pointfname)))
+		{
+			FileUnlink(xlogpath);
+
+			if (wait_result_is_any_signal(rc, true))
+				proc_exit(1);
+
+			if (!FilePathExists(xlogpath))
+				SignalFileNotExist(xlogfnext);
+			else
+
+				/*
+				 * Although execution of external program specified by the GUC
+				 * parameter 'restore_command' can failed since there is no
+				 * such file in archive, a file with this name can exist in
+				 * prefetch directory since it left from the last server start
+				 * up. If it's true put the file name into the hash table and
+				 * wake up the thread that is waiting in RestoreCommandXLog()
+				 * to continue WAL file processing.
+				 */
+				SignalFileDelivered(xlogfnext);
+
+			ereport(INFO,
+					(errmsg("could not restore file \"%s\" from archive: %s",
+							xlogfnext, wait_result_to_str(rc))));
+
+			break;
+		}
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Check that file has been really written to file system and if it
+		 * does then wake up the thread that is waiting in
+		 * RestoreCommandXLog() to continue WAL file processing.
+		 */
+		if (FilePathExists(xlogpath))
+		{
+			SignalFileDelivered(xlogfnext);
+
+			ereport(INFO, errmsg("The file %s was retrieved to \"%s\"",
+								 xlogfnext, xlogpath));
+		}
+		else
+		{
+			/*
+			 * DoRestore() finished with success that means invocation of
+			 * system() API function completed without error. On the other
+			 * hand, the requested file is not found. That means something
+			 * wrong happened with script run by the API function system(),
+			 * e.g. the script doens't do really something useful, or may be
+			 * it put a file to a wrong destination. Anyway, it is time to
+			 * exit and give a chance to system administrator to fix the
+			 * issue.
+			 */
+			SignalFileNotExist(xlogfnext);
+
+			ereport(INFO, errmsg("The file %s is not found", xlogfnext));
+			break;
+		}
+
+		nextSegNo = nextSegNo + increment;
+	}
+	proc_exit(0);
+}
+
+/*
+ * Setup and spawn bgworker to prefetch WAL files from archive.
+ *
+ * Input:
+ *		bgwid - sequence number of bgworker process we are going to spawn
+ *
+ * Returns true on success and false on failure.
+ */
+static bool
+SpawnWALPrefetchWorker(uint16 bgwid)
+{
+	BackgroundWorker bgw;
+	RestoreSlot *slot = &RestoreData->slots[bgwid];
+
+	memset(&bgw, 0, sizeof(bgw));
+	snprintf(bgw.bgw_name, sizeof(bgw.bgw_name), "WAL prefetching worker #%d",
+			 bgwid);
+
+	/*
+	 * Length of the string literal "Restore Command Worker" is less than size
+	 * of a buffer referenced by the data member bgw.bgw_type (the size is
+	 * limited by the constant BGW_MAXLEN that currently has value 96).
+	 * Therefore we can use function strcpy() instead of strncpy/strlcpy to
+	 * copy the string literal into the buffer bgw.bgw_type. The same is true
+	 * for other two string literals "postgres" and "RestoreCommandWorkerMain"
+	 * and their corresponding destination buffers referenced by the data
+	 * members bgw.bgw_library_name, bgw.bgw_function_name. To guards against
+	 * further possible change of limit represented by the constant BGW_MAXLEN
+	 * the asserts have been inserted before invoking of the function strcpy()
+	 * as a sanity check. In case some of these asserts be fired it means that
+	 * some really drastic change was done in the core source code that should
+	 * be carefully studied.
+	 */
+	Assert(sizeof(bgw.bgw_type) >= sizeof("WAL files pre-fetching Worker"));
+	Assert(sizeof(bgw.bgw_library_name) >= sizeof("postgres"));
+	Assert(sizeof(bgw.bgw_function_name) >= sizeof("WALPrefetchWorkerMain"));
+
+	strcpy(bgw.bgw_type, "WAL files pre-fetching Worker");
+	strcpy(bgw.bgw_library_name, "postgres");
+	strcpy(bgw.bgw_function_name, "WALPrefetchWorkerMain");
+
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
+
+	/*
+	 * BgWorkerStart_PostmasterStart for PM_RECOVERY, PM_STARTUP
+	 * BgWorkerStart_ConsistentState for PM_HOT_STANDBY
+	 */
+	bgw.bgw_start_time = HotStandbyActive() ? BgWorkerStart_ConsistentState :
+		BgWorkerStart_PostmasterStart;
+
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+
+	/*
+	 * The value of bgw.bgw_main_arg is passed as an argument to the function
+	 * WALPrefetchWorkerMain()
+	 */
+	bgw.bgw_main_arg = UInt16GetDatum(bgwid);
+	bgw.bgw_notify_pid = MyProcPid;
+
+	return RegisterDynamicBackgroundWorker(&bgw, &slot->bgwhandle);
+}
+
+/*
+ * Terminate bgworker process whose slot addressed by specified index and
+ * free memory allocated for the slot.
+ *
+ * Input:
+ *		slot_idx -	index of a slot in the array RestoreData->slot[] that
+ *					contains data about bgworker process.
+ */
+static void
+ShutdownWALPrefetchWorker(uint16 slot_idx)
+{
+	RestoreSlot *slot = &RestoreData->slots[slot_idx];
+
+	if (slot->bgwhandle != NULL)
+	{
+		TerminateBackgroundWorker(slot->bgwhandle);
+		pfree(slot->bgwhandle);
+		slot->bgwhandle = NULL;
+	}
+}
+
+/*
+ * Stop every WAL prefetching process started from the last spawned one
+ * specified by the parameter failed_process_idx. This function is called
+ * either on postmaster shutdown or on postmaster starting up in case some of
+ * WAL prefetching workers failed to start.
+ *
+ * Input:
+ *		failed_process_idx - sequence number (starting from 0) of a bgworker
+ *							 process that failed to start.
+ */
+static void
+ShutdownWALPrefetchWorkers(int last_process_idx)
+{
+	while (last_process_idx > 0)
+		ShutdownWALPrefetchWorker(--last_process_idx);
+
+	WALPrefetchingState = WALPrefetchingShutdown;
+}
+
+/*
+ * Start bgworker processes for retrieving WAL files from archive in
+ * pre-fetching mode. Wait until all spawned processes be run. A number of
+ * bgworker processes to spawn is determined by the GUC parameter
+ * wal_prefetch_workers.
+ *
+ * Input:
+ *		xlogfname - the name of a WAL file from which to start recovery
+ *
+ * Throw error if any of WAL files pre-fetching workers fail to start.
+ */
+static void
+StartWALPrefetchWorkers(const char *xlogfname)
+{
+	int			i;
+
+	if (WALPrefetchingState != WALPrefetchingIsInactive)
+		return;
+
+	XLogFromFileName(xlogfname, &RestoreData->restartTli,
+					 &RestoreData->restartSegNo, wal_segment_size);
+
+	for (i = 0; i < wal_prefetch_workers; ++i)
+		OwnLatch(&RestoreData->slots[i].workerReady);
+
+	OwnLatch(&RestoreData->fileAvailable);
+
+	for (i = 0; i < wal_prefetch_workers; ++i)
+	{
+		if (!SpawnWALPrefetchWorker(i))
+		{
+			ShutdownWALPrefetchWorkers(i);
+			ereport(FATAL,
+					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					 errmsg("could not run background process for WAL files "
+							"pre-fetching")));
+		}
+
+	}
+
+	/*
+	 * Wait until all spawned workers will be successfully started.
+	 */
+	for (i = 0; i < wal_prefetch_workers; ++i)
+	{
+		while (!RestoreData->slots[i].workerStarted)
+		{
+			(void) WaitLatch(&RestoreData->slots[i].workerReady,
+							 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+							 0, PG_WAIT_EXTENSION);
+			ResetLatch(&RestoreData->slots[i].workerReady);
+		}
+	}
+
+	WALPrefetchingState = WALPrefetchingIsActive;
+}
+
+/*
+ * Get a path to the WAL file pre-fetched from archive.
+ */
+static void
+XLogFilePathPrefetch(char *path, const char *xlogfname)
+{
+	snprintf(path, MAXPGPATH, PREFETCH_DIR "/%s", xlogfname);
+}
+
+/*
+ * Check that the path does exist.
+ * Return:
+ *		true if file does exist, else false.
+ * Throw error on failure.
+ */
+static bool
+FilePathExists(const char *xlogpath)
+{
+	struct stat statbuf;
+
+	if (stat(xlogpath, &statbuf) == 0)
+		return true;
+
+	if (errno != ENOENT)
+		ereport(FATAL,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						xlogpath)));
+
+	return false;
+}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 32a3099c1f4..1b826d0719a 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -18,13 +18,16 @@
 #include "postgres.h"
 
 #include <unistd.h>
+#include <sys/stat.h>
 
 #include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
+#include "common/archive.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/startup.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -979,3 +982,162 @@ WALReadRaiseError(WALReadError *errinfo)
 						(Size) errinfo->wre_req)));
 	}
 }
+
+/*
+ * Remove a file if it does exist.
+ */
+void
+FileUnlink(const char *file_path)
+{
+	struct stat statbuf;
+
+	if (stat(file_path, &statbuf))
+	{
+		if (errno != ENOENT)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							file_path)));
+	}
+	else
+	{
+		if (unlink(file_path) != 0)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m",
+							file_path)));
+	}
+}
+
+/*
+ * Get the last valid restart point file name.
+ *
+ * If cleanup is not enabled, initialise the last restart point file name
+ * with InvalidXLogRecPtr, which will prevent the deletion of any WAL files
+ * from the archive because of the alphabetic sorting property of WAL
+ * filenames.
+ */
+void
+XLogFileNameLastPoint(char *lastRestartPointFname, bool cleanupEnabled)
+{
+	XLogSegNo	restartSegNo;
+	XLogRecPtr	restartRedoPtr;
+	TimeLineID	restartTli;
+
+	if (cleanupEnabled)
+	{
+		GetOldestRestartPoint(&restartRedoPtr, &restartTli);
+		XLByteToSeg(restartRedoPtr, restartSegNo, wal_segment_size);
+		XLogFileName(lastRestartPointFname, restartTli, restartSegNo,
+					 wal_segment_size);
+	}
+	else
+		XLogFileName(lastRestartPointFname, 0, 0L, wal_segment_size);
+}
+
+/*
+ * Check a file is really there now and has correct size.
+ *
+ * Return true if the file does exist and has correct size,
+ * else return false.
+ *
+ * If the output variable file_not_found is not null it's assigned
+ * either true or false value depending on whether the file does exist
+ * or not.
+ */
+bool
+FileValidateSize(const char *xlogpath, off_t expectedSize,
+				 const char *xlogfname, bool *file_not_found)
+{
+	struct stat stat_buf;
+
+	if (stat(xlogpath, &stat_buf) == 0)
+	{
+		if (file_not_found)
+			*file_not_found = false;
+
+		if (expectedSize > 0 && stat_buf.st_size != expectedSize)
+		{
+			int			elevel;
+
+			/*
+			 * If we find a partial file in standby mode, we assume it's
+			 * because it's just being copied to the archive, and keep trying.
+			 *
+			 * Otherwise treat a wrong-sized file as FATAL to ensure the DBA
+			 * would notice it, but is that too strong? We could try to plow
+			 * ahead with a local copy of the file ... but the problem is that
+			 * there probably isn't one, and we'd incorrectly conclude we've
+			 * reached the end of WAL and we're done recovering ...
+			 */
+			if (StandbyMode && stat_buf.st_size < expectedSize)
+				elevel = DEBUG1;
+			else
+				elevel = FATAL;
+			ereport(elevel,
+					(errmsg("archive file \"%s\" has wrong size: %lld instead of %lld",
+							xlogfname,
+							(long long int) stat_buf.st_size,
+							(long long int) expectedSize)));
+			return false;
+		}
+		else
+			return true;
+	}
+	else
+	{
+		/* stat failed */
+		if (errno != ENOENT)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							xlogpath)));
+		if (file_not_found)
+			*file_not_found = true;
+
+		return false;
+	}
+
+}
+
+/*
+ * Build and execute restore_command.
+ *
+ * Return the result of command execution (the exit status of the shell),
+ * or -1 if a system error occurred. A return value of 127 means
+ * the execution of the shell failed.
+ */
+int
+DoRestore(const char *xlogpath, const char *xlogfname, const char *pointfname)
+{
+	char	   *xlogRestoreCmd;
+	int			rc;
+
+	/* Build a restore command to execute */
+	xlogRestoreCmd = BuildRestoreCommand(recoveryRestoreCommand, xlogpath,
+										 xlogfname, pointfname);
+
+	if (xlogRestoreCmd == NULL)
+		elog(PANIC, "could not build restore command \"%s\"",
+			 recoveryRestoreCommand);
+
+	ereport(DEBUG3,
+			(errmsg_internal("executing restore command \"%s\"",
+							 xlogRestoreCmd)));
+
+	/*
+	 * Check signals before restore command and reset afterwards.
+	 */
+	PreRestoreCommand();
+
+	/*
+	 * Execute
+	 */
+	rc = system(xlogRestoreCmd);
+
+	PostRestoreCommand();
+
+	pfree(xlogRestoreCmd);
+
+	return rc;
+}
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 5a9a0e34353..7e2b2db6a27 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/parallel.h"
+#include "access/xlogrestore.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -128,6 +129,9 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"WALPrefetchWorkerMain", WALPrefetchWorkerMain
 	}
 };
 
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 96c2aaabbd6..93e0167454f 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/syncscan.h"
 #include "access/twophase.h"
+#include "access/xlogrestore.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, RestoreCommandShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -259,6 +261,7 @@ CreateSharedMemoryAndSemaphores(void)
 	WalSndShmemInit();
 	WalRcvShmemInit();
 	ApplyLauncherShmemInit();
+	RestoreCommandShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 02d2d267b5c..8e62bbe3014 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -37,6 +37,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrestore.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
 #include "catalog/storage.h"
@@ -225,6 +226,8 @@ static bool check_recovery_target_lsn(char **newval, void **extra, GucSource sou
 static void assign_recovery_target_lsn(const char *newval, void *extra);
 static bool check_primary_slot_name(char **newval, void **extra, GucSource source);
 static bool check_default_with_oids(bool *newval, void **extra, GucSource source);
+static bool check_wal_prefetch_workers(int *newval, void **extra,
+									   GucSource source);
 
 /* Private functions in guc-file.l that need to be called from guc.c */
 static ConfigVariable *ProcessConfigFileInternal(GucContext context,
@@ -3399,13 +3402,38 @@ static struct config_int ConfigureNamesInt[] =
 		check_huge_page_size, NULL, NULL
 	},
 
+	{
+		{"wal_max_prefetch_amount",
+			PGC_POSTMASTER,
+			WAL_ARCHIVE_RECOVERY,
+			gettext_noop("Set a max number of WAL files to keep up prefetched "
+						 "from archive"),
+			NULL
+		},
+		&wal_max_prefetch_amount,
+		DEFAULT_WAL_MAX_PREFETCH_AMOUNT, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"wal_prefetch_workers",
+			PGC_POSTMASTER,
+			WAL_ARCHIVE_RECOVERY,
+			gettext_noop("Set a number of background workers to run for "
+						 "prefetching WAL files from archive"),
+			NULL
+		},
+		&wal_prefetch_workers,
+		DEFAULT_TOTAL_PREFETCH_WORKERS, 0, MAX_BACKENDS,
+		check_wal_prefetch_workers, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
 	}
 };
 
-
 static struct config_real ConfigureNamesReal[] =
 {
 	{
@@ -11662,6 +11690,20 @@ check_max_worker_processes(int *newval, void **extra, GucSource source)
 	return true;
 }
 
+static bool
+check_wal_prefetch_workers(int *newval, void **extra, GucSource source)
+{
+	if (*newval > max_worker_processes)
+	{
+		GUC_check_errdetail("A value of wal_prefetch_workers can't exceed "
+							"a value of max_worker_processes=%d",
+							max_worker_processes);
+		return false;
+	}
+
+	return true;
+}
+
 static bool
 check_effective_io_concurrency(int *newval, void **extra, GucSource source)
 {
diff --git a/src/include/access/xlogrestore.h b/src/include/access/xlogrestore.h
new file mode 100644
index 00000000000..c657dd8c6e2
--- /dev/null
+++ b/src/include/access/xlogrestore.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogrestore.h
+ *		Prototypes and definitions for parallel restore commands execution
+ *
+ * Copyright (c) 2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/include/access/xlogrestore.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef XLOGRESTORE_H
+#define XLOGRESTORE_H
+
+#include "postgres.h"
+
+/* GUC variables */
+extern int	wal_max_prefetch_amount;
+extern int	wal_prefetch_workers;
+
+/*
+ * Default max number of WAL files for pre-fetching from archive.
+ * Zero value means that WAL files prefetching is turned off by default.
+ */
+#define DEFAULT_WAL_MAX_PREFETCH_AMOUNT	0
+
+/*
+ * Default value for a number of prefetch workers spawned by postmaster
+ * on server startup for database recovering from archive.
+ */
+#define DEFAULT_TOTAL_PREFETCH_WORKERS 2
+
+extern Size RestoreCommandShmemSize(void);
+extern void RestoreCommandShmemInit(void);
+extern bool RestoreCommandXLog(char *path, const char *xlogfname,
+							   const char *recovername,
+							   const off_t expectedSize,
+							   bool cleanupEnabled);
+extern void WALPrefetchWorkerMain(Datum main_arg) pg_attribute_noreturn();
+
+#endif							/* XLOGRESTORE_H */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index e59b6cf3a9f..98078518e88 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -60,4 +60,13 @@ extern void XLogReadDetermineTimeline(XLogReaderState *state,
 
 extern void WALReadRaiseError(WALReadError *errinfo);
 
+extern void FileUnlink(const char *xlogpath);
+extern void XLogFileNameLastPoint(char *lastRestartPointFname,
+								  bool cleanupEnabled);
+extern bool FileValidateSize(char const *xlogpath, off_t expectedSize,
+							 char const *xlogfname, bool *file_not_found);
+
+extern int	DoRestore(char const *xlogpath, char const *xlogfname,
+					  char const *pointfname);
+
 #endif
diff --git a/src/test/recovery/t/021_xlogrestore.pl b/src/test/recovery/t/021_xlogrestore.pl
new file mode 100644
index 00000000000..df5d573a4a7
--- /dev/null
+++ b/src/test/recovery/t/021_xlogrestore.pl
@@ -0,0 +1,138 @@
+#
+# Test for xlogrestore with wal_max_prefetch_amount parameter
+#
+
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 9;
+
+sub measure_replica_restore_time
+{
+	my ( $replica_name, $node_primary, $backup_name, $last_lsn, $tab_int_count, $config ) = @_;
+	my $timer = time();
+
+	# Initialize replica node from backup, fetching WAL from archives
+	my $node_replica = get_new_node( $replica_name );
+	$node_replica->init_from_backup( $node_primary, $backup_name,
+		has_restoring => 1 );
+	$node_replica->append_conf( 'postgresql.conf', $config );
+	$node_replica->start();
+
+	# Wait until necessary replay has been done on replica
+	my $caughtup_query =
+	  "SELECT '$last_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
+	$node_replica->poll_query_until( 'postgres', $caughtup_query )
+	  or die "Timed out while waiting for replica to catch up";
+
+	# Check tab_int's rows count
+	my $replica_tab_int_count =
+	  $node_replica->safe_psql( 'postgres', "SELECT count(*) FROM tab_int" );
+	is( $replica_tab_int_count, $tab_int_count, 'tab_int sizes are equal' );
+
+	# Check the presence of temporary files specifically generated during
+	# archive recovery.
+	$node_replica->promote();
+
+	my $node_replica_data = $node_replica->data_dir;
+	ok( !-f "$node_replica_data/pg_wal/RECOVERYHISTORY",
+		"RECOVERYHISTORY removed after promotion");
+	ok( !-f "$node_replica_data/pg_wal/RECOVERYXLOG",
+			"RECOVERYXLOG removed after promotion");
+	ok( !-d "$node_replica_data/pg_wal/prefetch",
+		"pg_wal/prefetch dir removed after promotion");
+
+	my $res = time() - $timer;
+
+	$node_replica->stop();
+	return $res;
+}
+
+# WAL produced count
+my $wal_count = 64;
+
+# Size of data portion
+my $wal_data_portion = 128;
+
+# Sleep to imitate restore delays
+my $restore_sleep = 0.256;
+
+# Initialize primary node, doing archives
+my $node_primary = get_new_node( 'primary' );
+$node_primary->init(
+	has_archiving    => 1,
+	allows_streaming => 1
+);
+
+# Start it
+$node_primary->start;
+
+# Take backup for replica.
+my $backup_name = 'my_backup';
+$node_primary->backup( $backup_name );
+
+# Create some content on primary server that won't be present on replicas.
+for ( my $i = 0; $i < $wal_count; $i++ )
+{
+	if ( $i == 0 ) {
+		$node_primary->safe_psql('postgres',
+			"CREATE TABLE tab_int ( a SERIAL NOT NULL PRIMARY KEY );")
+	} else {
+		$node_primary->safe_psql('postgres',
+			"INSERT INTO tab_int SELECT FROM generate_series( 1, $wal_data_portion );")
+	}
+	$node_primary->safe_psql('postgres', "SELECT pg_switch_wal()");
+}
+
+my $last_lsn = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn();");
+my $tab_int_count = $node_primary->safe_psql('postgres', "SELECT count(*) FROM tab_int;");
+
+$node_primary->stop();
+
+#	Restore command
+my $restore_command;
+my $path = TestLib::perl2host( $node_primary->archive_dir );
+if ( $TestLib::windows_os ) {
+	$path =~ s{\\}{\\\\}g;
+	$restore_command = qq(perl -e "select( undef, undef, undef, $restore_sleep );" & copy "$path\\\\%f" "%p);
+} else {
+	$restore_command = qq(sleep ) . $restore_sleep . qq( && cp "$path/%f" "%p");
+}
+
+# DEBUG: Don't forget remove it
+diag("restore_command=$restore_command");
+
+# Compare the replica restore times with different max_prefetch_workers value.
+diag('Run database restoring with prefetching of WAL files');
+my $multiple_workers_restore_time = measure_replica_restore_time(
+	'fast_restored_replica',
+	$node_primary,
+	$backup_name,
+	$last_lsn,
+	$tab_int_count,
+qq(
+wal_retrieve_retry_interval = '100ms'
+wal_max_prefetch_amount = 8
+wal_prefetch_workers = 4
+restore_command = '$restore_command'
+log_min_messages = INFO
+));
+
+diag('Run database restoring in regular way without prefetching of WAL files');
+my $single_worker_restore_time = measure_replica_restore_time(
+	'normal_restored_replica',
+	$node_primary,
+	$backup_name,
+	$last_lsn,
+	$tab_int_count,
+qq(
+wal_retrieve_retry_interval = '100ms'
+wal_max_prefetch_amount = 0
+restore_command = '$restore_command'
+));
+
+diag("multiple_workers_restore_time = $multiple_workers_restore_time");
+diag("single_worker_restore_time = $single_worker_restore_time");
+
+ok( $multiple_workers_restore_time < $single_worker_restore_time, "Multiple workers are faster than a single worker" );
diff --git a/src/test/regress/expected/guc.out b/src/test/regress/expected/guc.out
index 811f80a0976..8283816e3c8 100644
--- a/src/test/regress/expected/guc.out
+++ b/src/test/regress/expected/guc.out
@@ -776,3 +776,24 @@ set default_with_oids to f;
 -- Should not allow to set it to true.
 set default_with_oids to t;
 ERROR:  tables declared WITH OIDS are not supported
+--
+-- Check that a value for the new configration parameter
+-- wal_prefetch_workers is limited by a value of the parameter
+-- max_worker_processes
+SHOW max_worker_processes;
+ max_worker_processes 
+----------------------
+ 8
+(1 row)
+
+-- max_worker_processes has default value 8
+-- Check that an attempt to set the parameter wal_prefetch_workers
+-- to a value execeeding this limit results in error
+ALTER SYSTEM SET wal_prefetch_workers = 16; -- fails, it is expected behaviour
+ERROR:  invalid value for parameter "wal_prefetch_workers": 16
+DETAIL:  A value of wal_prefetch_workers can't exceed a value of max_worker_processes=8
+-- Check that a value lesser than max_worker_processes can be assigned
+-- to the parameter wal_prefetch_workers
+ALTER SYSTEM SET wal_prefetch_workers = 7; -- ok since 7 < max_worker_processes
+-- Reset to default
+ALTER SYSTEM RESET wal_prefetch_workers;
diff --git a/src/test/regress/sql/guc.sql b/src/test/regress/sql/guc.sql
index 43dbba3775e..d9215da9ee3 100644
--- a/src/test/regress/sql/guc.sql
+++ b/src/test/regress/sql/guc.sql
@@ -296,3 +296,22 @@ reset check_function_bodies;
 set default_with_oids to f;
 -- Should not allow to set it to true.
 set default_with_oids to t;
+
+--
+-- Check that a value for the new configration parameter
+-- wal_prefetch_workers is limited by a value of the parameter
+-- max_worker_processes
+SHOW max_worker_processes;
+
+-- max_worker_processes has default value 8
+-- Check that an attempt to set the parameter wal_prefetch_workers
+-- to a value execeeding this limit results in error
+
+ALTER SYSTEM SET wal_prefetch_workers = 16; -- fails, it is expected behaviour
+
+-- Check that a value lesser than max_worker_processes can be assigned
+-- to the parameter wal_prefetch_workers
+ALTER SYSTEM SET wal_prefetch_workers = 7; -- ok since 7 < max_worker_processes
+
+-- Reset to default
+ALTER SYSTEM RESET wal_prefetch_workers;
-- 
2.24.3 (Apple Git-128)

#10David Steele
david@pgmasters.net
In reply to: Dmitry Shulga (#9)
Re: Reduce the time required for a database recovery from archive.

Hi Dimtry,

On 1/11/21 2:51 AM, Dmitry Shulga wrote:

Hi Stephen

Based on our last discussion I redesigned the implementation of WAL
archive recovery speed-up.

Seems like there should have been a patch attached? In any case the
current patch no longer applies so marked Waiting on Author.

Personally, I'm not too keen on this patch as implemented. Several
third-party backup solutions support parallel archive get so it would be
nice to support an interface that simply says to the restore_command,
"go get 1gb of WAL and write the files here." This patch still assumes
that the user has written their own restore command, which is
third-party by definition, so I can't see how interfacing with
third-party software is an issue here.

Also, having multiple workers blindly asking for WAL can cause quite a
bit of traffic and cost because PG knows what WAL it wants but it
doesn't know what exists. On the other hand, a backup solution can
cheaply determine what is available to prevent hammering the archive
with requests for files that don't exist.

Regards,
--
-David
david@pgmasters.net

#11Andrey Borodin
x4mmm@yandex-team.ru
In reply to: David Steele (#10)
Re: Reduce the time required for a database recovery from archive.

18 марта 2021 г., в 20:04, David Steele <david@pgmasters.net> написал(а):
it would be nice to support an interface that simply says to the restore_command, "go get 1gb of WAL and write the files here."

+1 to redesigning restore_command and archive_command.

Best regards, Andrey Borodin.

#12David Steele
david@pgmasters.net
In reply to: Andrey Borodin (#11)
Re: Reduce the time required for a database recovery from archive.

On 3/18/21 11:37 AM, Andrey Borodin wrote:

18 марта 2021 г., в 20:04, David Steele <david@pgmasters.net> написал(а):
it would be nice to support an interface that simply says to the restore_command, "go get 1gb of WAL and write the files here."

+1 to redesigning restore_command and archive_command.

Indeed, archive_command would benefit from the same treatment. The need
to call archive_command for each WAL segment even when parallel
processing is going on behind the scenes is a bottleneck.

Larger WAL segments sizes can be used to mitigate this issue but an
improvement would be welcome.

Regards,
--
-David
david@pgmasters.net

#13Marina Polyakova
m.polyakova@postgrespro.ru
In reply to: David Steele (#10)
1 attachment(s)
Re: Reduce the time required for a database recovery from archive.

Hello everyone in this thread!

On 2021-03-18 18:04, David Steele wrote:

Seems like there should have been a patch attached?

IMO there's a technical problem with sending, receiving (or displaying
on the site) emails from the list pgsql-hackers. By subsribing to this
list I received the attached patch from the email [1]/messages/by-id/4047CC05-1AF5-454B-850B-ED37374A2AC0@postgrespro.ru. And my colleague
Roman Zharkov said that the button 'Resend email' from that link helped
him to receive the email with the attached patch. On the other hand
follwing this link in the browser I do not see the attached patch. Do
you think it is worth to write about this issue to
webmaster(dot)postgresql(dot)org?..

Just in case I'm lucky this email contains the lost patch.

[1]: /messages/by-id/4047CC05-1AF5-454B-850B-ED37374A2AC0@postgrespro.ru
/messages/by-id/4047CC05-1AF5-454B-850B-ED37374A2AC0@postgrespro.ru

--
Marina Polyakova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

Attachments:

0001-Reduce-time-required-to-recover-database-from-archiv.patchtext/x-diff; name=0001-Reduce-time-required-to-recover-database-from-archiv.patchDownload
From c071e8ee78aac811feaf54c4374c1a998409733e Mon Sep 17 00:00:00 2001
From: Dmitry Shulga <d.shulga@postgrespro.ru>
Date: Fri, 18 Dec 2020 12:38:58 +0700
Subject: [PATCH] Reduce time required to recover database from archive.

Originally database recovering from archive was performed by
sequential receiving of files with WAL records and applying them against
the database. Delivering of files containing WAL records are performed
by running a command specified by the GUC parameter restore_command.
In case receiving of every file containing WAL records takes long time
it results in standing idle most of time waiting until files be received.
If time required to apply WAL records from an archive file is significantly
lesser than time required to deliver the file from archive it leads
to nonproductive standing idle after current WAL segment is applied and
before next WAL segment be received.  As a consequence a wall time required
to recover a database from archive log can be unacceptably long.

To reduce total time required to restore database from archive the procedure
for delivering of WAL files was redesigned in order to allow concurrent
loading of WAL files. At postmaster start a few background processes are
spawned to load WAL files from archive in parallel. A number of processes
that started to perform preloading of WAL files is determined by
the new GUC parameter wal_prefetch_workers. A number of WAL files to prefetch
from archive is limited by the new GUC parameter wal_max_prefetch_amount.

Additionally, refactoring was done to extract duplicate code, used
in the files xlogarchive.c and xlogrestore.c, into stanalone functions and
move it to the file xlogutils.c

Author: Dmitry Shulga
Reviewed-by: Anna Akenteva
Tested-by: Roman Zharkov

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de722..ffbf8090f45 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -32,7 +32,8 @@ OBJS = \
 	xlogfuncs.o \
 	xloginsert.o \
 	xlogreader.o \
-	xlogutils.o
+	xlogutils.o \
+	xlogrestore.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 13f1d8c3dc7..f0a0c68725e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -37,6 +37,7 @@
 #include "access/xloginsert.h"
 #include "access/xlogreader.h"
 #include "access/xlogutils.h"
+#include "access/xlogrestore.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
@@ -3684,10 +3685,11 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 					 xlogfname);
 			set_ps_display(activitymsg);
 
-			restoredFromArchive = RestoreArchivedFile(path, xlogfname,
-													  "RECOVERYXLOG",
-													  wal_segment_size,
-													  InRedo);
+			restoredFromArchive = RestoreCommandXLog(path, xlogfname,
+													 "RECOVERYXLOG",
+													 wal_segment_size,
+													 InRedo);
+
 			if (!restoredFromArchive)
 				return -1;
 			break;
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index f39dc4ddf1a..fb4023f1cec 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -22,6 +22,7 @@
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
+#include "access/xlogutils.h"
 #include "common/archive.h"
 #include "miscadmin.h"
 #include "postmaster/startup.h"
@@ -55,13 +56,8 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 					bool cleanupEnabled)
 {
 	char		xlogpath[MAXPGPATH];
-	char	   *xlogRestoreCmd;
 	char		lastRestartPointFname[MAXPGPATH];
 	int			rc;
-	struct stat stat_buf;
-	XLogSegNo	restartSegNo;
-	XLogRecPtr	restartRedoPtr;
-	TimeLineID	restartTli;
 
 	/*
 	 * Ignore restore_command when not in archive recovery (meaning we are in
@@ -102,22 +98,7 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 	/*
 	 * Make sure there is no existing file named recovername.
 	 */
-	if (stat(xlogpath, &stat_buf) != 0)
-	{
-		if (errno != ENOENT)
-			ereport(FATAL,
-					(errcode_for_file_access(),
-					 errmsg("could not stat file \"%s\": %m",
-							xlogpath)));
-	}
-	else
-	{
-		if (unlink(xlogpath) != 0)
-			ereport(FATAL,
-					(errcode_for_file_access(),
-					 errmsg("could not remove file \"%s\": %m",
-							xlogpath)));
-	}
+	FileUnlink(xlogpath);
 
 	/*
 	 * Calculate the archive file cutoff point for use during log shipping
@@ -136,97 +117,28 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 	 * flags to signify the point when we can begin deleting WAL files from
 	 * the archive.
 	 */
-	if (cleanupEnabled)
-	{
-		GetOldestRestartPoint(&restartRedoPtr, &restartTli);
-		XLByteToSeg(restartRedoPtr, restartSegNo, wal_segment_size);
-		XLogFileName(lastRestartPointFname, restartTli, restartSegNo,
-					 wal_segment_size);
-		/* we shouldn't need anything earlier than last restart point */
-		Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
-	}
-	else
-		XLogFileName(lastRestartPointFname, 0, 0L, wal_segment_size);
+	XLogFileNameLastPoint(lastRestartPointFname, cleanupEnabled);
+	Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
 
-	/* Build the restore command to execute */
-	xlogRestoreCmd = BuildRestoreCommand(recoveryRestoreCommand,
-										 xlogpath, xlogfname,
-										 lastRestartPointFname);
-	if (xlogRestoreCmd == NULL)
-		elog(ERROR, "could not build restore command \"%s\"",
-			 recoveryRestoreCommand);
-
-	ereport(DEBUG3,
-			(errmsg_internal("executing restore command \"%s\"",
-							 xlogRestoreCmd)));
-
-	/*
-	 * Check signals before restore command and reset afterwards.
-	 */
-	PreRestoreCommand();
-
-	/*
-	 * Copy xlog from archival storage to XLOGDIR
-	 */
-	rc = system(xlogRestoreCmd);
-
-	PostRestoreCommand();
-	pfree(xlogRestoreCmd);
+	rc = DoRestore(xlogpath, xlogfname, lastRestartPointFname);
 
 	if (rc == 0)
 	{
-		/*
-		 * command apparently succeeded, but let's make sure the file is
-		 * really there now and has the correct size.
-		 */
-		if (stat(xlogpath, &stat_buf) == 0)
+		bool		file_not_found;
+		bool		ret = FileValidateSize(xlogpath, expectedSize, xlogfname,
+										   &file_not_found);
+
+		if (ret)
 		{
-			if (expectedSize > 0 && stat_buf.st_size != expectedSize)
-			{
-				int			elevel;
-
-				/*
-				 * If we find a partial file in standby mode, we assume it's
-				 * because it's just being copied to the archive, and keep
-				 * trying.
-				 *
-				 * Otherwise treat a wrong-sized file as FATAL to ensure the
-				 * DBA would notice it, but is that too strong? We could try
-				 * to plow ahead with a local copy of the file ... but the
-				 * problem is that there probably isn't one, and we'd
-				 * incorrectly conclude we've reached the end of WAL and we're
-				 * done recovering ...
-				 */
-				if (StandbyMode && stat_buf.st_size < expectedSize)
-					elevel = DEBUG1;
-				else
-					elevel = FATAL;
-				ereport(elevel,
-						(errmsg("archive file \"%s\" has wrong size: %lld instead of %lld",
-								xlogfname,
-								(long long int) stat_buf.st_size,
-								(long long int) expectedSize)));
-				return false;
-			}
-			else
-			{
-				ereport(LOG,
-						(errmsg("restored log file \"%s\" from archive",
-								xlogfname)));
-				strcpy(path, xlogpath);
-				return true;
-			}
+			ereport(LOG,
+					(errmsg("restored log file \"%s\" from archive",
+							xlogfname)));
+			strcpy(path, xlogpath);
+			return true;
 		}
-		else
-		{
-			/* stat failed */
-			int			elevel = (errno == ENOENT) ? LOG : FATAL;
 
-			ereport(elevel,
-					(errcode_for_file_access(),
-					 errmsg("could not stat file \"%s\": %m", xlogpath),
-					 errdetail("restore_command returned a zero exit status, but stat() failed.")));
-		}
+		if (!file_not_found)
+			return false;
 	}
 
 	/*
diff --git a/src/backend/access/transam/xlogrestore.c b/src/backend/access/transam/xlogrestore.c
new file mode 100644
index 00000000000..4cd87e296ef
--- /dev/null
+++ b/src/backend/access/transam/xlogrestore.c
@@ -0,0 +1,951 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogrestore.c
+ *	  Infrastructure for parallel restore commands execution
+ *
+ * Copyright (c) 2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/transam/xlogrestore.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "access/xlogrestore.h"
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogarchive.h"
+#include "access/xlogdefs.h"
+#include "access/xlogutils.h"
+#include "common/archive.h"
+#include "common/file_perm.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "port.h"
+#include "port/atomics.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/startup.h"
+#include "storage/ipc.h"
+#include "storage/spin.h"
+#include "storage/shmem.h"
+#include "storage/latch.h"
+#include "storage/lock.h"
+#include "tcop/tcopprot.h"
+#include "utils/timestamp.h"
+#include "utils/memutils.h"
+
+/*
+ * The max number of WAL files to prefetch from archive.
+ */
+int			wal_max_prefetch_amount;
+
+/*
+ * Number of background workers to run on postmaster startup for retrieving
+ * WAL files from archive. Zero value of this variable turns off prefetching of
+ * WAL files from archive.
+ */
+int			wal_prefetch_workers;
+
+/*
+ * Data for restore_command bgworker.
+ */
+typedef struct RestoreSlot
+{
+	/*
+	 * The handle corresponding to a running bgworker process.
+	 */
+	BackgroundWorkerHandle *bgwhandle;
+
+	/*
+	 * The latch used for signaling that bgworker can continue downloading of
+	 * WAL files since a number of already pre-fetched	WAL files dropped below
+	 * the limit imposed by the GUC parameter wal_max_prefetch_amount.
+	 */
+	Latch		continuePrefetching;
+
+	/*
+	 * The latch to notify an invoker that a bgworker process has been
+	 * successfully run.
+	 */
+	Latch		workerReady;
+
+	/*
+	 * This flag is set by bgworker process if it was started and run
+	 * successfully.
+	 */
+	bool		workerStarted;
+} RestoreSlot;
+
+typedef struct PrefetchedFile
+{
+	/*
+	 * The name of the archive file %f.
+	 */
+	char		xlogfname[MAXFNAMELEN];
+} PrefetchedFile;
+
+/*
+ * Type of values stored in hash table RestoreDataStruct->hashtab
+ */
+typedef struct PrefetchedFileEntry
+{
+	PrefetchedFile key;
+
+	/*
+	 * True if a file with a name equals to the key does exist on a file
+	 * system, else false.
+	 */
+	bool		file_exist;
+
+	/*
+	 * True if a file with a name equals to the key has been already processed
+	 * during recovery procedure.
+	 */
+	bool		file_was_processed;
+} PrefetchedFileEntry;
+
+typedef struct RestoreDataStruct
+{
+	/*
+	 * The lock to guard against concurrent modification of structure's
+	 * members from parallel running threads.
+	 */
+	slock_t		lock;
+
+	/*
+	 * The latch to support for producer/consumer pattern. Producers are
+	 * bgworker processes pre-fetching WAL files from archive, Consumer is a
+	 * recovery process who waiting until the next required WAL file be
+	 * downloaded from archive to continue database recovering. This latch is
+	 * used for notifying the consumer that a new file was retrieved by one of
+	 * running producers.
+	 */
+	Latch		fileAvailable;
+
+	/*
+	 * Hash table to trace what WAL files have been pre-fetched.
+	 */
+	HTAB	   *hashtab;
+
+	/*
+	 * The name of the last recovery point file %r.
+	 */
+	char		pointfname[MAXFNAMELEN];
+
+	/*
+	 * TLI and an initial segment number from which to start a database
+	 * recovery
+	 */
+	XLogSegNo	restartSegNo;
+	TimeLineID	restartTli;
+
+	/*
+	 * Number of pre-fetched WAL files.
+	 */
+	int			nprefetched;
+
+	/*
+	 * Data for background workers.
+	 */
+	RestoreSlot slots[FLEXIBLE_ARRAY_MEMBER];
+
+} RestoreDataStruct;
+
+static RestoreDataStruct *RestoreData = NULL;
+
+typedef enum WALPrefetchingState_e
+{
+	WALPrefetchingIsInactive,
+	WALPrefetchingIsActive,
+	WALPrefetchingShutdown
+} WALPrefetchingState_e;
+
+static WALPrefetchingState_e WALPrefetchingState = WALPrefetchingIsInactive;
+
+static void XLogFilePathPrefetch(char *path, const char *xlogfname);
+static bool FilePathExists(const char *xlogpath);
+static void StartWALPrefetchWorkers(const char *xlogfname);
+static void ShutdownWALPrefetchWorkers(int last_process_idx);
+static bool WaitUntilFileRetrieved(const char *xlogfname,
+								   bool *wal_file_processed);
+
+/*
+ * Calculate a size of shared memory used for storing bgworker slots.
+ */
+Size
+RestoreCommandShmemSize(void)
+{
+	Size		size;
+
+	size = sizeof(RestoreDataStruct);
+	size = MAXALIGN(size);
+	size = add_size(size, mul_size(wal_prefetch_workers, sizeof(RestoreSlot)));
+	return size;
+}
+
+#define PREFETCH_DIR XLOGDIR "/" PG_TEMP_FILES_DIR
+
+/*
+ * Create a temporary directory to store prepfetched files
+ * and initialize a shared memory used for storing bgworker slots.
+ */
+void
+RestoreCommandShmemInit(void)
+{
+	bool		found;
+
+	RestoreData = (RestoreDataStruct *)
+		ShmemInitStruct("Restore Command Workers Data",
+						RestoreCommandShmemSize(),
+						&found);
+
+	if (!found)
+	{
+		int			i;
+		HASHCTL		hash_ctl;
+
+		memset(RestoreData, 0, RestoreCommandShmemSize());
+
+		SpinLockInit(&RestoreData->lock);
+
+		InitSharedLatch(&RestoreData->fileAvailable);
+
+		/* Create the hash table */
+		memset(&hash_ctl, 0, sizeof(hash_ctl));
+
+		hash_ctl.keysize = sizeof(PrefetchedFile);
+		hash_ctl.entrysize = sizeof(PrefetchedFileEntry);
+
+		RestoreData->hashtab = ShmemInitHash("Pre-fetched WAL files",
+											 wal_max_prefetch_amount,
+											 wal_max_prefetch_amount,
+											 &hash_ctl,
+											 HASH_ELEM);
+
+		/*
+		 * Initialize memory for each worker slot.
+		 */
+		for (i = 0; i < wal_prefetch_workers; ++i)
+		{
+			RestoreSlot *slot = &RestoreData->slots[i];
+
+			memset(slot, 0, sizeof(RestoreSlot));
+			InitSharedLatch(&slot->continuePrefetching);
+			InitSharedLatch(&slot->workerReady);
+		}
+
+		/* Create or clear temporary wals. */
+		PathNameCreateTemporaryDir(XLOGDIR, PREFETCH_DIR);
+		RemovePgTempFilesInDir(PREFETCH_DIR, true, true);
+	}
+}
+
+/*
+ * Iterate along bgworkers slots and notify everyone bgworker process
+ * waiting on the continuePrefetching Latch to resume retrieving of WAL files
+ * from archive.
+ */
+static void
+ResumePrefetching()
+{
+	unsigned	i;
+
+	for (i = 0; i < wal_prefetch_workers; ++i)
+	{
+		SetLatch(&RestoreData->slots[i].continuePrefetching);
+	}
+}
+
+/*
+ * This function is counterpart of RestoreArchivedFile function with xlogs
+ * pre-fetching.
+ *
+ * On success the requested WAL file has been retrieved from archive.
+ * Invocation of this function also initiates loading of WAL files that
+ * will be required later. For this goal several brworker processes
+ * are started and perform loading of WAL files. A name of file to start
+ * loading is assigned to every background worker process together with
+ * a delta value that will be applied to a segment number of a WAL file just
+ * received in order to calculate a next file name to pre-load.
+ *
+ * A number of background workers started for WAL files loading is determined
+ * by the new GUC parameter wal_prefetch_workers. A number of WAL files to
+ * prefetch is limited by the new GUC parameter wal_max_prefetch_amount.
+ * If wal_max_prefetch_amount has value 0 no background worker processes
+ * are started and WAL files preloading is not performed. In this case regular
+ * (in one by one manner) loading of WAL files is performed.
+ *
+ * Input:
+ *		path - the path to a WAL file retrieved from archive
+ *		xlogfname - a name of WAL file to retrieve from archive
+ *		recovername - the directory name where a retrieved WAL file
+ *					  has to be placed
+ *		expectedSize - expected size of the requested WAL file
+ *		cleanupEnabled - true if start recovering from last restart point,
+ *						 false if start recovering from the very outset.
+ *	Return:
+ *		true on success, false on error
+ */
+bool
+RestoreCommandXLog(char *path, const char *xlogfname, const char *recovername,
+				   const off_t expectedSize, bool cleanupEnabled)
+{
+	char		xlogpath[MAXPGPATH];
+	bool		prefetchedFileNotFound,
+				wal_file_already_processed;
+	int			nprefetched;
+	PrefetchedFileEntry *foundEntry;
+
+	/*
+	 * Synchronous mode.
+	 */
+	if (wal_max_prefetch_amount < 1)
+		goto fallback;
+
+	/*
+	 * Ignore restore_command when not in archive recovery (meaning we are in
+	 * crash recovery).
+	 */
+	if (!ArchiveRecoveryRequested)
+		goto fallback;
+
+	/*
+	 * In standby mode, restore_command might not be supplied.
+	 */
+	if (recoveryRestoreCommand == NULL ||
+		strcmp(recoveryRestoreCommand, "") == 0)
+		goto fallback;
+
+	/*
+	 * Create the last restart point file name.
+	 */
+	XLogFileNameLastPoint(RestoreData->pointfname, cleanupEnabled);
+
+	/*
+	 * Run WAL pre-fetching processes if they haven't been started yet.
+	 */
+	StartWALPrefetchWorkers(xlogfname);
+
+	/*
+	 * We shouldn't need anything earlier than the last restart point.
+	 */
+	Assert(strcmp(RestoreData->pointfname, xlogfname) <= 0);
+
+	/*
+	 * Make prefetched path for file.
+	 */
+	XLogFilePathPrefetch(xlogpath, xlogfname);
+
+	/*
+	 * Wait until file be retrieved from archive.
+	 */
+	if (!WaitUntilFileRetrieved(xlogfname, &wal_file_already_processed))
+	{
+		/*
+		 * WaitUntilFileRetrieved() returns false in case there is no more WAL
+		 * files to retrieve.
+		 */
+		snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
+		return false;
+	}
+
+	if (wal_file_already_processed)
+		return false;
+
+	/*
+	 * Make sure the file is really there now and has the correct size.
+	 */
+	if (!FileValidateSize(xlogpath, expectedSize, xlogfname,
+						  &prefetchedFileNotFound))
+	{
+		if (prefetchedFileNotFound)
+			snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
+		else
+			/* Remove artifacts. */
+			FileUnlink(xlogpath);
+
+		return false;
+	}
+
+	/*
+	 * Move file to target path.
+	 */
+	snprintf(path, MAXPGPATH, XLOGDIR "/%s", recovername);
+	durable_rename(xlogpath, path, ERROR);
+
+	/*
+	 * Decrease by one a number of prefetched files and wake up any of
+	 * pre-fetching processes suspended on this latch.
+	 */
+	SpinLockAcquire(&RestoreData->lock);
+
+	Assert(RestoreData->nprefetched > 0);
+
+	nprefetched = RestoreData->nprefetched;
+	RestoreData->nprefetched = RestoreData->nprefetched - 1;
+
+	/*
+	 * Check whether a number of already prefetched files greater or equal the
+	 * limit wal_max_prefetch_amount and whether this number dropped below the
+	 * limit after its decrement.
+	 */
+	if (nprefetched >= wal_max_prefetch_amount &&
+		RestoreData->nprefetched < wal_max_prefetch_amount)
+
+		/*
+		 * The value of RestoreData->nprefetched dropped below the
+		 * wal_max_prefetch_amount limit, signal background processes to
+		 * continue prefetching of WAL files from archive.
+		 */
+		ResumePrefetching();
+
+	foundEntry =
+	(PrefetchedFileEntry *) hash_search(RestoreData->hashtab, xlogfname,
+										HASH_FIND, NULL);
+
+	foundEntry->file_was_processed = true;
+	SpinLockRelease(&RestoreData->lock);
+
+	/*
+	 * Log message like in RestoreArchivedFile.
+	 */
+	ereport(LOG,
+			(errmsg("restored log file \"%s\" from archive",
+					xlogfname)));
+	return true;
+
+fallback:
+
+	/*
+	 * On any errors - try default implementation
+	 */
+	return RestoreArchivedFile(path, xlogfname, recovername, expectedSize,
+							   cleanupEnabled);
+}
+
+/*
+ * Waiting until a file with the name specified by the parameter xlogfname
+ * be received from archive and written to file system.
+ *
+ * Input:
+ *		xlogfname - a name of file to wait for delivering from archive
+ * Return:
+ *		false in case there is no more file in archive to retrieve, else true
+ */
+static bool
+WaitUntilFileRetrieved(const char *xlogfname, bool *wal_file_processed)
+{
+	bool		found;
+
+	do
+	{
+		PrefetchedFileEntry *foundEntry;
+
+		SpinLockAcquire(&RestoreData->lock);
+
+		/*
+		 * Check whether the file name does exist in the hash table. If it
+		 * does then restore_command was executed on behalf of this file name
+		 * and a file was probably copied to a destination directory. The
+		 * actual presence of the file in the destination directory is
+		 * determined by the the data member file_exist of the structure
+		 * PrefetchedFileEntry.
+		 */
+		foundEntry =
+			(PrefetchedFileEntry *) hash_search(RestoreData->hashtab, xlogfname,
+												HASH_FIND, NULL);
+
+		if (foundEntry != NULL)
+		{
+			/*
+			 * The data member file_exist of the structure PrefetchedFileEntry
+			 * has the false value if restore_command was executed but the
+			 * file wasn't copied to a destination directory by some reason,
+			 * e.g. since no more file exist in archive.
+			 */
+			found = foundEntry->file_exist;
+			*wal_file_processed = foundEntry->file_was_processed;
+
+			SpinLockRelease(&RestoreData->lock);
+			break;
+		}
+		SpinLockRelease(&RestoreData->lock);
+
+		/*
+		 * There is no an entry in hash table corresponding to a name
+		 * specified by the parameter xlogfname. Wait on the latch
+		 * RestoreData->fileAvailable located in the shared memory until a
+		 * file be retrieved from archive. bgworker processes run for
+		 * delivering WAL files from archive will trigger this latch every
+		 * time a new WAL file be delivered.
+		 */
+		(void) WaitLatch(&RestoreData->fileAvailable,
+						 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+						 0, PG_WAIT_EXTENSION);
+		ResetLatch(&RestoreData->fileAvailable);
+		CHECK_FOR_INTERRUPTS();
+	}
+	while (true);
+
+	return found;
+}
+
+/*
+ * Insert a file name into the hash table and wake up a thread that is waiting
+ * until file retrieved from archive.
+ *
+ * Input:
+ *		xlogfname - name of pre-fetched file.
+ */
+static void
+SignalFileDelivered(const char *xlogfname)
+{
+	PrefetchedFileEntry newFileEntry;
+	PrefetchedFileEntry *insertedElement;
+
+#ifdef USE_ASSERT_CHECKING
+	bool		found = false;
+#endif
+
+	strcpy(newFileEntry.key.xlogfname, xlogfname);
+
+	SpinLockAcquire(&RestoreData->lock);
+
+	/*
+	 * Add the new file name to the hash of file names that have been already
+	 * delivered from archive. Out of memory error is reported by ereport, so
+	 * it is not required to check the return value of hash_search().
+	 */
+#ifdef USE_ASSERT_CHECKING
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, &found);
+
+	/*
+	 * An entry with such a key mustn't exist in the hash table at the tim of
+	 * insertion. If it does something really wrong happened.
+	 */
+	Assert(!found);
+#else
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, NULL);
+#endif
+
+	insertedElement->file_exist = true;
+	insertedElement->file_was_processed = false;
+
+	/*
+	 * Increase by one a number of pre-fetched files
+	 */
+	RestoreData->nprefetched = RestoreData->nprefetched + 1;
+
+	/*
+	 * Wake up the thread that executing RestoreCommandXLog() to continue
+	 * processing of WAL files.
+	 */
+	SetLatch(&RestoreData->fileAvailable);
+
+	SpinLockRelease(&RestoreData->lock);
+}
+
+/*
+ * Mark that the specified WAL file doesn't exist in archive and wake up
+ * a thread that is waiting in RestoreCommandXLog() to finish WAL file
+ * processing.
+ */
+static void
+SignalFileNotExist(const char *xlogfname)
+{
+	PrefetchedFileEntry *insertedElement;
+	PrefetchedFileEntry newFileEntry;
+#ifdef USE_ASSERT_CHECKING
+	bool		found = false;
+#endif
+
+	strcpy(newFileEntry.key.xlogfname, xlogfname);
+
+	SpinLockAcquire(&RestoreData->lock);
+
+	/*
+	 * Add the new file name to the hash of file names that have been already
+	 * delivered from archive. Out of memory error is reported by ereport, so
+	 * it is not required to check the return value of hash_search().
+	 */
+#ifdef USE_ASSERT_CHECKING
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, &found);
+
+	/*
+	 * We tried to add the new file name and discovered that such file does
+	 * already exist. It seems something wrong happens.
+	 */
+	Assert(!found);
+	insertedElement->file_exist = false;
+	insertedElement->file_was_processed = false;
+
+	Assert(insertedElement->file_exist == false &&
+		   strcmp(insertedElement->key.xlogfname, xlogfname) == 0);
+#else
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, NULL);
+	insertedElement->file_exist = false;
+	insertedElement->file_was_processed = false;
+#endif
+
+	/*
+	 * Wake up the thread executing RestoreCommandXLog() to finish WAL files
+	 * processing since no more files left in archive.
+	 */
+	SetLatch(&RestoreData->fileAvailable);
+
+	SpinLockRelease(&RestoreData->lock);
+}
+
+/*
+ * Check whether a limit imposed by the GUC parameter wal_max_prefetch_amount
+ * has been exceeded on prefetching of WAL files and suspend further
+ * downloading of WAL files until a notification be received to resume it.
+ * Input:
+ *			bgwid - an index of bgworker process that has to suspend prefetching of
+ *			WAL files.
+ */
+static void
+SuspendPrefetchingIfRequired(uint16 bgwid)
+{
+	while (RestoreData->nprefetched >= wal_max_prefetch_amount)
+	{
+		/*
+		 * If a number of already pre-fetched WAL files exceeds a limit
+		 * imposed by the GUC parameter 'wal_max_prefetch_amount', suspend
+		 * execution until some of already retrieved WAL files be processed
+		 * and a number of pre-fetched files dropped below this limit.
+		 */
+		(void) WaitLatch(&RestoreData->slots[bgwid].continuePrefetching,
+						 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+						 0, PG_WAIT_EXTENSION);
+		ResetLatch(&RestoreData->slots[bgwid].continuePrefetching);
+		CHECK_FOR_INTERRUPTS();
+	}
+}
+
+/*
+ * The main entry point for bgworker.
+ * Input:
+ *		main_arg -	id value assigned to each pre-fetching bgworker process.
+ *				This value is used both as an index in array of active bgworker
+ *				processes and for calculating the number of the first segment
+ *				from that to start WAL files pre-fetching by corresponding
+ *				bgworker process.
+ *
+ * This function returns the control flow if a file that currently
+ * being processed is not found, meaning that all files were already delivered
+ * from archive and the requested file is one that was never stored
+ * in the archive.
+ */
+void
+WALPrefetchWorkerMain(Datum main_arg)
+{
+	int			rc;
+	char		xlogpath[MAXPGPATH];
+	char		xlogfnext[MAXFNAMELEN];
+	unsigned	increment;
+	XLogSegNo	nextSegNo;
+	uint16		bgwid;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, die);
+	/* We're now ready to receive signals. */
+	BackgroundWorkerUnblockSignals();
+
+	/* Get RestoreSlot */
+	bgwid = DatumGetUInt16(main_arg);
+
+	nextSegNo = RestoreData->restartSegNo + bgwid;
+	increment = wal_prefetch_workers;
+
+	OwnLatch(&RestoreData->slots[bgwid].continuePrefetching);
+
+	/*
+	 * Notify invoker that the WAL files prefetching worker has just been
+	 * successfully started.
+	 */
+	RestoreData->slots[bgwid].workerStarted = true;
+	SetLatch(&RestoreData->slots[bgwid].workerReady);
+
+	while (true)
+	{
+		SuspendPrefetchingIfRequired(bgwid);
+
+		XLogFileName(xlogfnext, RestoreData->restartTli, nextSegNo,
+					 wal_segment_size);
+
+		/* Prepare path. */
+		XLogFilePathPrefetch(xlogpath, xlogfnext);
+
+		/*
+		 * Make sure there is no such file in a directory for prefetched
+		 * files.
+		 */
+		FileUnlink(xlogpath);
+
+		/* Prepare and execute the restore command. */
+		if ((rc = DoRestore(xlogpath, xlogfnext, RestoreData->pointfname)))
+		{
+			FileUnlink(xlogpath);
+
+			if (wait_result_is_any_signal(rc, true))
+				proc_exit(1);
+
+			if (!FilePathExists(xlogpath))
+				SignalFileNotExist(xlogfnext);
+			else
+
+				/*
+				 * Although execution of external program specified by the GUC
+				 * parameter 'restore_command' can failed since there is no
+				 * such file in archive, a file with this name can exist in
+				 * prefetch directory since it left from the last server start
+				 * up. If it's true put the file name into the hash table and
+				 * wake up the thread that is waiting in RestoreCommandXLog()
+				 * to continue WAL file processing.
+				 */
+				SignalFileDelivered(xlogfnext);
+
+			ereport(INFO,
+					(errmsg("could not restore file \"%s\" from archive: %s",
+							xlogfnext, wait_result_to_str(rc))));
+
+			break;
+		}
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Check that file has been really written to file system and if it
+		 * does then wake up the thread that is waiting in
+		 * RestoreCommandXLog() to continue WAL file processing.
+		 */
+		if (FilePathExists(xlogpath))
+		{
+			SignalFileDelivered(xlogfnext);
+
+			ereport(INFO, errmsg("The file %s was retrieved to \"%s\"",
+								 xlogfnext, xlogpath));
+		}
+		else
+		{
+			/*
+			 * DoRestore() finished with success that means invocation of
+			 * system() API function completed without error. On the other
+			 * hand, the requested file is not found. That means something
+			 * wrong happened with script run by the API function system(),
+			 * e.g. the script doens't do really something useful, or may be
+			 * it put a file to a wrong destination. Anyway, it is time to
+			 * exit and give a chance to system administrator to fix the
+			 * issue.
+			 */
+			SignalFileNotExist(xlogfnext);
+
+			ereport(INFO, errmsg("The file %s is not found", xlogfnext));
+			break;
+		}
+
+		nextSegNo = nextSegNo + increment;
+	}
+	proc_exit(0);
+}
+
+/*
+ * Setup and spawn bgworker to prefetch WAL files from archive.
+ *
+ * Input:
+ *		bgwid - sequence number of bgworker process we are going to spawn
+ *
+ * Returns true on success and false on failure.
+ */
+static bool
+SpawnWALPrefetchWorker(uint16 bgwid)
+{
+	BackgroundWorker bgw;
+	RestoreSlot *slot = &RestoreData->slots[bgwid];
+
+	memset(&bgw, 0, sizeof(bgw));
+	snprintf(bgw.bgw_name, sizeof(bgw.bgw_name), "WAL prefetching worker #%d",
+			 bgwid);
+
+	/*
+	 * Length of the string literal "Restore Command Worker" is less than size
+	 * of a buffer referenced by the data member bgw.bgw_type (the size is
+	 * limited by the constant BGW_MAXLEN that currently has value 96).
+	 * Therefore we can use function strcpy() instead of strncpy/strlcpy to
+	 * copy the string literal into the buffer bgw.bgw_type. The same is true
+	 * for other two string literals "postgres" and "RestoreCommandWorkerMain"
+	 * and their corresponding destination buffers referenced by the data
+	 * members bgw.bgw_library_name, bgw.bgw_function_name. To guards against
+	 * further possible change of limit represented by the constant BGW_MAXLEN
+	 * the asserts have been inserted before invoking of the function strcpy()
+	 * as a sanity check. In case some of these asserts be fired it means that
+	 * some really drastic change was done in the core source code that should
+	 * be carefully studied.
+	 */
+	Assert(sizeof(bgw.bgw_type) >= sizeof("WAL files pre-fetching Worker"));
+	Assert(sizeof(bgw.bgw_library_name) >= sizeof("postgres"));
+	Assert(sizeof(bgw.bgw_function_name) >= sizeof("WALPrefetchWorkerMain"));
+
+	strcpy(bgw.bgw_type, "WAL files pre-fetching Worker");
+	strcpy(bgw.bgw_library_name, "postgres");
+	strcpy(bgw.bgw_function_name, "WALPrefetchWorkerMain");
+
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
+
+	/*
+	 * BgWorkerStart_PostmasterStart for PM_RECOVERY, PM_STARTUP
+	 * BgWorkerStart_ConsistentState for PM_HOT_STANDBY
+	 */
+	bgw.bgw_start_time = HotStandbyActive() ? BgWorkerStart_ConsistentState :
+		BgWorkerStart_PostmasterStart;
+
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+
+	/*
+	 * The value of bgw.bgw_main_arg is passed as an argument to the function
+	 * WALPrefetchWorkerMain()
+	 */
+	bgw.bgw_main_arg = UInt16GetDatum(bgwid);
+	bgw.bgw_notify_pid = MyProcPid;
+
+	return RegisterDynamicBackgroundWorker(&bgw, &slot->bgwhandle);
+}
+
+/*
+ * Terminate bgworker process whose slot addressed by specified index and
+ * free memory allocated for the slot.
+ *
+ * Input:
+ *		slot_idx -	index of a slot in the array RestoreData->slot[] that
+ *					contains data about bgworker process.
+ */
+static void
+ShutdownWALPrefetchWorker(uint16 slot_idx)
+{
+	RestoreSlot *slot = &RestoreData->slots[slot_idx];
+
+	if (slot->bgwhandle != NULL)
+	{
+		TerminateBackgroundWorker(slot->bgwhandle);
+		pfree(slot->bgwhandle);
+		slot->bgwhandle = NULL;
+	}
+}
+
+/*
+ * Stop every WAL prefetching process started from the last spawned one
+ * specified by the parameter failed_process_idx. This function is called
+ * either on postmaster shutdown or on postmaster starting up in case some of
+ * WAL prefetching workers failed to start.
+ *
+ * Input:
+ *		failed_process_idx - sequence number (starting from 0) of a bgworker
+ *							 process that failed to start.
+ */
+static void
+ShutdownWALPrefetchWorkers(int last_process_idx)
+{
+	while (last_process_idx > 0)
+		ShutdownWALPrefetchWorker(--last_process_idx);
+
+	WALPrefetchingState = WALPrefetchingShutdown;
+}
+
+/*
+ * Start bgworker processes for retrieving WAL files from archive in
+ * pre-fetching mode. Wait until all spawned processes be run. A number of
+ * bgworker processes to spawn is determined by the GUC parameter
+ * wal_prefetch_workers.
+ *
+ * Input:
+ *		xlogfname - the name of a WAL file from which to start recovery
+ *
+ * Throw error if any of WAL files pre-fetching workers fail to start.
+ */
+static void
+StartWALPrefetchWorkers(const char *xlogfname)
+{
+	int			i;
+
+	if (WALPrefetchingState != WALPrefetchingIsInactive)
+		return;
+
+	XLogFromFileName(xlogfname, &RestoreData->restartTli,
+					 &RestoreData->restartSegNo, wal_segment_size);
+
+	for (i = 0; i < wal_prefetch_workers; ++i)
+		OwnLatch(&RestoreData->slots[i].workerReady);
+
+	OwnLatch(&RestoreData->fileAvailable);
+
+	for (i = 0; i < wal_prefetch_workers; ++i)
+	{
+		if (!SpawnWALPrefetchWorker(i))
+		{
+			ShutdownWALPrefetchWorkers(i);
+			ereport(FATAL,
+					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					 errmsg("could not run background process for WAL files "
+							"pre-fetching")));
+		}
+
+	}
+
+	/*
+	 * Wait until all spawned workers will be successfully started.
+	 */
+	for (i = 0; i < wal_prefetch_workers; ++i)
+	{
+		while (!RestoreData->slots[i].workerStarted)
+		{
+			(void) WaitLatch(&RestoreData->slots[i].workerReady,
+							 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+							 0, PG_WAIT_EXTENSION);
+			ResetLatch(&RestoreData->slots[i].workerReady);
+		}
+	}
+
+	WALPrefetchingState = WALPrefetchingIsActive;
+}
+
+/*
+ * Get a path to the WAL file pre-fetched from archive.
+ */
+static void
+XLogFilePathPrefetch(char *path, const char *xlogfname)
+{
+	snprintf(path, MAXPGPATH, PREFETCH_DIR "/%s", xlogfname);
+}
+
+/*
+ * Check that the path does exist.
+ * Return:
+ *		true if file does exist, else false.
+ * Throw error on failure.
+ */
+static bool
+FilePathExists(const char *xlogpath)
+{
+	struct stat statbuf;
+
+	if (stat(xlogpath, &statbuf) == 0)
+		return true;
+
+	if (errno != ENOENT)
+		ereport(FATAL,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						xlogpath)));
+
+	return false;
+}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 32a3099c1f4..1b826d0719a 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -18,13 +18,16 @@
 #include "postgres.h"
 
 #include <unistd.h>
+#include <sys/stat.h>
 
 #include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
+#include "common/archive.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/startup.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -979,3 +982,162 @@ WALReadRaiseError(WALReadError *errinfo)
 						(Size) errinfo->wre_req)));
 	}
 }
+
+/*
+ * Remove a file if it does exist.
+ */
+void
+FileUnlink(const char *file_path)
+{
+	struct stat statbuf;
+
+	if (stat(file_path, &statbuf))
+	{
+		if (errno != ENOENT)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							file_path)));
+	}
+	else
+	{
+		if (unlink(file_path) != 0)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m",
+							file_path)));
+	}
+}
+
+/*
+ * Get the last valid restart point file name.
+ *
+ * If cleanup is not enabled, initialise the last restart point file name
+ * with InvalidXLogRecPtr, which will prevent the deletion of any WAL files
+ * from the archive because of the alphabetic sorting property of WAL
+ * filenames.
+ */
+void
+XLogFileNameLastPoint(char *lastRestartPointFname, bool cleanupEnabled)
+{
+	XLogSegNo	restartSegNo;
+	XLogRecPtr	restartRedoPtr;
+	TimeLineID	restartTli;
+
+	if (cleanupEnabled)
+	{
+		GetOldestRestartPoint(&restartRedoPtr, &restartTli);
+		XLByteToSeg(restartRedoPtr, restartSegNo, wal_segment_size);
+		XLogFileName(lastRestartPointFname, restartTli, restartSegNo,
+					 wal_segment_size);
+	}
+	else
+		XLogFileName(lastRestartPointFname, 0, 0L, wal_segment_size);
+}
+
+/*
+ * Check a file is really there now and has correct size.
+ *
+ * Return true if the file does exist and has correct size,
+ * else return false.
+ *
+ * If the output variable file_not_found is not null it's assigned
+ * either true or false value depending on whether the file does exist
+ * or not.
+ */
+bool
+FileValidateSize(const char *xlogpath, off_t expectedSize,
+				 const char *xlogfname, bool *file_not_found)
+{
+	struct stat stat_buf;
+
+	if (stat(xlogpath, &stat_buf) == 0)
+	{
+		if (file_not_found)
+			*file_not_found = false;
+
+		if (expectedSize > 0 && stat_buf.st_size != expectedSize)
+		{
+			int			elevel;
+
+			/*
+			 * If we find a partial file in standby mode, we assume it's
+			 * because it's just being copied to the archive, and keep trying.
+			 *
+			 * Otherwise treat a wrong-sized file as FATAL to ensure the DBA
+			 * would notice it, but is that too strong? We could try to plow
+			 * ahead with a local copy of the file ... but the problem is that
+			 * there probably isn't one, and we'd incorrectly conclude we've
+			 * reached the end of WAL and we're done recovering ...
+			 */
+			if (StandbyMode && stat_buf.st_size < expectedSize)
+				elevel = DEBUG1;
+			else
+				elevel = FATAL;
+			ereport(elevel,
+					(errmsg("archive file \"%s\" has wrong size: %lld instead of %lld",
+							xlogfname,
+							(long long int) stat_buf.st_size,
+							(long long int) expectedSize)));
+			return false;
+		}
+		else
+			return true;
+	}
+	else
+	{
+		/* stat failed */
+		if (errno != ENOENT)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							xlogpath)));
+		if (file_not_found)
+			*file_not_found = true;
+
+		return false;
+	}
+
+}
+
+/*
+ * Build and execute restore_command.
+ *
+ * Return the result of command execution (the exit status of the shell),
+ * or -1 if a system error occurred. A return value of 127 means
+ * the execution of the shell failed.
+ */
+int
+DoRestore(const char *xlogpath, const char *xlogfname, const char *pointfname)
+{
+	char	   *xlogRestoreCmd;
+	int			rc;
+
+	/* Build a restore command to execute */
+	xlogRestoreCmd = BuildRestoreCommand(recoveryRestoreCommand, xlogpath,
+										 xlogfname, pointfname);
+
+	if (xlogRestoreCmd == NULL)
+		elog(PANIC, "could not build restore command \"%s\"",
+			 recoveryRestoreCommand);
+
+	ereport(DEBUG3,
+			(errmsg_internal("executing restore command \"%s\"",
+							 xlogRestoreCmd)));
+
+	/*
+	 * Check signals before restore command and reset afterwards.
+	 */
+	PreRestoreCommand();
+
+	/*
+	 * Execute
+	 */
+	rc = system(xlogRestoreCmd);
+
+	PostRestoreCommand();
+
+	pfree(xlogRestoreCmd);
+
+	return rc;
+}
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 5a9a0e34353..7e2b2db6a27 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/parallel.h"
+#include "access/xlogrestore.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -128,6 +129,9 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"WALPrefetchWorkerMain", WALPrefetchWorkerMain
 	}
 };
 
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 96c2aaabbd6..93e0167454f 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/syncscan.h"
 #include "access/twophase.h"
+#include "access/xlogrestore.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, RestoreCommandShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -259,6 +261,7 @@ CreateSharedMemoryAndSemaphores(void)
 	WalSndShmemInit();
 	WalRcvShmemInit();
 	ApplyLauncherShmemInit();
+	RestoreCommandShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 02d2d267b5c..8e62bbe3014 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -37,6 +37,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrestore.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
 #include "catalog/storage.h"
@@ -225,6 +226,8 @@ static bool check_recovery_target_lsn(char **newval, void **extra, GucSource sou
 static void assign_recovery_target_lsn(const char *newval, void *extra);
 static bool check_primary_slot_name(char **newval, void **extra, GucSource source);
 static bool check_default_with_oids(bool *newval, void **extra, GucSource source);
+static bool check_wal_prefetch_workers(int *newval, void **extra,
+									   GucSource source);
 
 /* Private functions in guc-file.l that need to be called from guc.c */
 static ConfigVariable *ProcessConfigFileInternal(GucContext context,
@@ -3399,13 +3402,38 @@ static struct config_int ConfigureNamesInt[] =
 		check_huge_page_size, NULL, NULL
 	},
 
+	{
+		{"wal_max_prefetch_amount",
+			PGC_POSTMASTER,
+			WAL_ARCHIVE_RECOVERY,
+			gettext_noop("Set a max number of WAL files to keep up prefetched "
+						 "from archive"),
+			NULL
+		},
+		&wal_max_prefetch_amount,
+		DEFAULT_WAL_MAX_PREFETCH_AMOUNT, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"wal_prefetch_workers",
+			PGC_POSTMASTER,
+			WAL_ARCHIVE_RECOVERY,
+			gettext_noop("Set a number of background workers to run for "
+						 "prefetching WAL files from archive"),
+			NULL
+		},
+		&wal_prefetch_workers,
+		DEFAULT_TOTAL_PREFETCH_WORKERS, 0, MAX_BACKENDS,
+		check_wal_prefetch_workers, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
 	}
 };
 
-
 static struct config_real ConfigureNamesReal[] =
 {
 	{
@@ -11662,6 +11690,20 @@ check_max_worker_processes(int *newval, void **extra, GucSource source)
 	return true;
 }
 
+static bool
+check_wal_prefetch_workers(int *newval, void **extra, GucSource source)
+{
+	if (*newval > max_worker_processes)
+	{
+		GUC_check_errdetail("A value of wal_prefetch_workers can't exceed "
+							"a value of max_worker_processes=%d",
+							max_worker_processes);
+		return false;
+	}
+
+	return true;
+}
+
 static bool
 check_effective_io_concurrency(int *newval, void **extra, GucSource source)
 {
diff --git a/src/include/access/xlogrestore.h b/src/include/access/xlogrestore.h
new file mode 100644
index 00000000000..c657dd8c6e2
--- /dev/null
+++ b/src/include/access/xlogrestore.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogrestore.h
+ *		Prototypes and definitions for parallel restore commands execution
+ *
+ * Copyright (c) 2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/include/access/xlogrestore.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef XLOGRESTORE_H
+#define XLOGRESTORE_H
+
+#include "postgres.h"
+
+/* GUC variables */
+extern int	wal_max_prefetch_amount;
+extern int	wal_prefetch_workers;
+
+/*
+ * Default max number of WAL files for pre-fetching from archive.
+ * Zero value means that WAL files prefetching is turned off by default.
+ */
+#define DEFAULT_WAL_MAX_PREFETCH_AMOUNT	0
+
+/*
+ * Default value for a number of prefetch workers spawned by postmaster
+ * on server startup for database recovering from archive.
+ */
+#define DEFAULT_TOTAL_PREFETCH_WORKERS 2
+
+extern Size RestoreCommandShmemSize(void);
+extern void RestoreCommandShmemInit(void);
+extern bool RestoreCommandXLog(char *path, const char *xlogfname,
+							   const char *recovername,
+							   const off_t expectedSize,
+							   bool cleanupEnabled);
+extern void WALPrefetchWorkerMain(Datum main_arg) pg_attribute_noreturn();
+
+#endif							/* XLOGRESTORE_H */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index e59b6cf3a9f..98078518e88 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -60,4 +60,13 @@ extern void XLogReadDetermineTimeline(XLogReaderState *state,
 
 extern void WALReadRaiseError(WALReadError *errinfo);
 
+extern void FileUnlink(const char *xlogpath);
+extern void XLogFileNameLastPoint(char *lastRestartPointFname,
+								  bool cleanupEnabled);
+extern bool FileValidateSize(char const *xlogpath, off_t expectedSize,
+							 char const *xlogfname, bool *file_not_found);
+
+extern int	DoRestore(char const *xlogpath, char const *xlogfname,
+					  char const *pointfname);
+
 #endif
diff --git a/src/test/recovery/t/021_xlogrestore.pl b/src/test/recovery/t/021_xlogrestore.pl
new file mode 100644
index 00000000000..df5d573a4a7
--- /dev/null
+++ b/src/test/recovery/t/021_xlogrestore.pl
@@ -0,0 +1,138 @@
+#
+# Test for xlogrestore with wal_max_prefetch_amount parameter
+#
+
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 9;
+
+sub measure_replica_restore_time
+{
+	my ( $replica_name, $node_primary, $backup_name, $last_lsn, $tab_int_count, $config ) = @_;
+	my $timer = time();
+
+	# Initialize replica node from backup, fetching WAL from archives
+	my $node_replica = get_new_node( $replica_name );
+	$node_replica->init_from_backup( $node_primary, $backup_name,
+		has_restoring => 1 );
+	$node_replica->append_conf( 'postgresql.conf', $config );
+	$node_replica->start();
+
+	# Wait until necessary replay has been done on replica
+	my $caughtup_query =
+	  "SELECT '$last_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
+	$node_replica->poll_query_until( 'postgres', $caughtup_query )
+	  or die "Timed out while waiting for replica to catch up";
+
+	# Check tab_int's rows count
+	my $replica_tab_int_count =
+	  $node_replica->safe_psql( 'postgres', "SELECT count(*) FROM tab_int" );
+	is( $replica_tab_int_count, $tab_int_count, 'tab_int sizes are equal' );
+
+	# Check the presence of temporary files specifically generated during
+	# archive recovery.
+	$node_replica->promote();
+
+	my $node_replica_data = $node_replica->data_dir;
+	ok( !-f "$node_replica_data/pg_wal/RECOVERYHISTORY",
+		"RECOVERYHISTORY removed after promotion");
+	ok( !-f "$node_replica_data/pg_wal/RECOVERYXLOG",
+			"RECOVERYXLOG removed after promotion");
+	ok( !-d "$node_replica_data/pg_wal/prefetch",
+		"pg_wal/prefetch dir removed after promotion");
+
+	my $res = time() - $timer;
+
+	$node_replica->stop();
+	return $res;
+}
+
+# WAL produced count
+my $wal_count = 64;
+
+# Size of data portion
+my $wal_data_portion = 128;
+
+# Sleep to imitate restore delays
+my $restore_sleep = 0.256;
+
+# Initialize primary node, doing archives
+my $node_primary = get_new_node( 'primary' );
+$node_primary->init(
+	has_archiving    => 1,
+	allows_streaming => 1
+);
+
+# Start it
+$node_primary->start;
+
+# Take backup for replica.
+my $backup_name = 'my_backup';
+$node_primary->backup( $backup_name );
+
+# Create some content on primary server that won't be present on replicas.
+for ( my $i = 0; $i < $wal_count; $i++ )
+{
+	if ( $i == 0 ) {
+		$node_primary->safe_psql('postgres',
+			"CREATE TABLE tab_int ( a SERIAL NOT NULL PRIMARY KEY );")
+	} else {
+		$node_primary->safe_psql('postgres',
+			"INSERT INTO tab_int SELECT FROM generate_series( 1, $wal_data_portion );")
+	}
+	$node_primary->safe_psql('postgres', "SELECT pg_switch_wal()");
+}
+
+my $last_lsn = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn();");
+my $tab_int_count = $node_primary->safe_psql('postgres', "SELECT count(*) FROM tab_int;");
+
+$node_primary->stop();
+
+#	Restore command
+my $restore_command;
+my $path = TestLib::perl2host( $node_primary->archive_dir );
+if ( $TestLib::windows_os ) {
+	$path =~ s{\\}{\\\\}g;
+	$restore_command = qq(perl -e "select( undef, undef, undef, $restore_sleep );" & copy "$path\\\\%f" "%p);
+} else {
+	$restore_command = qq(sleep ) . $restore_sleep . qq( && cp "$path/%f" "%p");
+}
+
+# DEBUG: Don't forget remove it
+diag("restore_command=$restore_command");
+
+# Compare the replica restore times with different max_prefetch_workers value.
+diag('Run database restoring with prefetching of WAL files');
+my $multiple_workers_restore_time = measure_replica_restore_time(
+	'fast_restored_replica',
+	$node_primary,
+	$backup_name,
+	$last_lsn,
+	$tab_int_count,
+qq(
+wal_retrieve_retry_interval = '100ms'
+wal_max_prefetch_amount = 8
+wal_prefetch_workers = 4
+restore_command = '$restore_command'
+log_min_messages = INFO
+));
+
+diag('Run database restoring in regular way without prefetching of WAL files');
+my $single_worker_restore_time = measure_replica_restore_time(
+	'normal_restored_replica',
+	$node_primary,
+	$backup_name,
+	$last_lsn,
+	$tab_int_count,
+qq(
+wal_retrieve_retry_interval = '100ms'
+wal_max_prefetch_amount = 0
+restore_command = '$restore_command'
+));
+
+diag("multiple_workers_restore_time = $multiple_workers_restore_time");
+diag("single_worker_restore_time = $single_worker_restore_time");
+
+ok( $multiple_workers_restore_time < $single_worker_restore_time, "Multiple workers are faster than a single worker" );
diff --git a/src/test/regress/expected/guc.out b/src/test/regress/expected/guc.out
index 811f80a0976..8283816e3c8 100644
--- a/src/test/regress/expected/guc.out
+++ b/src/test/regress/expected/guc.out
@@ -776,3 +776,24 @@ set default_with_oids to f;
 -- Should not allow to set it to true.
 set default_with_oids to t;
 ERROR:  tables declared WITH OIDS are not supported
+--
+-- Check that a value for the new configration parameter
+-- wal_prefetch_workers is limited by a value of the parameter
+-- max_worker_processes
+SHOW max_worker_processes;
+ max_worker_processes 
+----------------------
+ 8
+(1 row)
+
+-- max_worker_processes has default value 8
+-- Check that an attempt to set the parameter wal_prefetch_workers
+-- to a value execeeding this limit results in error
+ALTER SYSTEM SET wal_prefetch_workers = 16; -- fails, it is expected behaviour
+ERROR:  invalid value for parameter "wal_prefetch_workers": 16
+DETAIL:  A value of wal_prefetch_workers can't exceed a value of max_worker_processes=8
+-- Check that a value lesser than max_worker_processes can be assigned
+-- to the parameter wal_prefetch_workers
+ALTER SYSTEM SET wal_prefetch_workers = 7; -- ok since 7 < max_worker_processes
+-- Reset to default
+ALTER SYSTEM RESET wal_prefetch_workers;
diff --git a/src/test/regress/sql/guc.sql b/src/test/regress/sql/guc.sql
index 43dbba3775e..d9215da9ee3 100644
--- a/src/test/regress/sql/guc.sql
+++ b/src/test/regress/sql/guc.sql
@@ -296,3 +296,22 @@ reset check_function_bodies;
 set default_with_oids to f;
 -- Should not allow to set it to true.
 set default_with_oids to t;
+
+--
+-- Check that a value for the new configration parameter
+-- wal_prefetch_workers is limited by a value of the parameter
+-- max_worker_processes
+SHOW max_worker_processes;
+
+-- max_worker_processes has default value 8
+-- Check that an attempt to set the parameter wal_prefetch_workers
+-- to a value execeeding this limit results in error
+
+ALTER SYSTEM SET wal_prefetch_workers = 16; -- fails, it is expected behaviour
+
+-- Check that a value lesser than max_worker_processes can be assigned
+-- to the parameter wal_prefetch_workers
+ALTER SYSTEM SET wal_prefetch_workers = 7; -- ok since 7 < max_worker_processes
+
+-- Reset to default
+ALTER SYSTEM RESET wal_prefetch_workers;
-- 
2.24.3 (Apple Git-128)

#14David Steele
david@pgmasters.net
In reply to: Marina Polyakova (#13)
Re: Reduce the time required for a database recovery from archive.

On 3/19/21 4:32 AM, Marina Polyakova wrote:

On 2021-03-18 18:04, David Steele wrote:

Seems like there should have been a patch attached?

IMO there's a technical problem with sending, receiving (or displaying
on the site) emails from the list pgsql-hackers. By subsribing to this
list I received the attached patch from the email [1]. And my colleague
Roman Zharkov said that the button 'Resend email' from that link helped
him to receive the email with the attached patch. On the other hand
follwing this link in the browser I do not see the attached patch. Do
you think it is worth to write about this issue to
webmaster(dot)postgresql(dot)org?..

Just in case I'm lucky this email contains the lost patch.

You are correct -- if I send the email to myself I can see the patch (I
can also see it on the website in raw form). I usually read the thread
on the website before replying and I did not notice that the actual
email had an attachment.

My guess about why it is not showing up on the website is that the
encoding is a bit unusual (quoted-printable vs base64). Or it may have
to do with the inclusion of an HTML version of the message. You are
welcome to followup with the webmaster and see if this is a known issue.

Regards,
--
-David
david@pgmasters.net