Refactor StartupXLOG?

Started by Thomas Munro over 9 years ago, 4 messages
#1 Thomas Munro
thomas.munro@enterprisedb.com
1 attachment(s)

Hi hackers

StartupXLOG is 1549 lines of code. Its unwieldy size came up in a
discussion in an IRC channel where some PG hackers lurk and I decided
to see how it might be chopped up into subroutines with a clear
purpose and reasonable size, as an exercise.

I came up with the following on a first pass, though obviously you
could go a lot further:

StartupXLOG -- reduced to 651 lines
-> ReportControlFileState() -- 40 lines to log recovery startup message
-> ReportRecoveryTarget() -- 32 lines to log recovery target message
-> ProcessBackupLabel() -- 90 lines to read backup label's checkpoint
-> CleanUpTablespaceMap() -- 33 lines of file cleanup

-> BeginRedo() -- 191 lines for redo setup
-> BeginHotStandby() -- 74 lines for hot standby initialisation
-> ReplayRedo() -- 260 lines for the main redo loop
-> FinishRedo() -- 63 lines for redo completion

-> AssignNewTimeline() -- 66 lines
-> CleanUpOldTimelineSegments() -- 74 lines of file cleanup
-> RecoveryCheckpoint() -- 71 lines
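
To illustrate the shape the redo part of the list gives the caller, here is a
small self-contained sketch. It is not taken from the patch: the three phase
functions are stubs that share only their names with the list above, and
PhaseLog, note_phase() and run_redo_phases() are hypothetical helpers that
exist just to make the call order observable.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical helper: records which phases ran, in order. */
typedef struct PhaseLog
{
	char		trace[128];
} PhaseLog;

static void
note_phase(PhaseLog *log, const char *name)
{
	assert(log != NULL);
	if (log->trace[0] != '\0')
		strncat(log->trace, ",", sizeof(log->trace) - strlen(log->trace) - 1);
	strncat(log->trace, name, sizeof(log->trace) - strlen(log->trace) - 1);
}

/* Stubs standing in for the real phases; they only record that they ran. */
static void
BeginRedo(PhaseLog *log)
{
	note_phase(log, "BeginRedo");
}

static bool
ReplayRedo(PhaseLog *log, bool stop_early)
{
	note_phase(log, "ReplayRedo");
	return stop_early;			/* plays the role of reachedStopPoint */
}

static void
FinishRedo(PhaseLog *log, bool reachedStopPoint)
{
	note_phase(log, reachedStopPoint ? "FinishRedo(stopped)" : "FinishRedo(end)");
}

/* The caller reduces to a short, readable sequence of phases. */
static void
run_redo_phases(PhaseLog *log, bool stop_early)
{
	bool		reachedStopPoint;

	BeginRedo(log);
	reachedStopPoint = ReplayRedo(log, stop_early);
	FinishRedo(log, reachedStopPoint);
}
```

The only value flowing between the phases here is the stop-point flag returned
by the replay loop, which is the same hand-off the sketch patch uses between
ReplayRedo() and FinishRedo().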

Unfortunately the attached sketch patch (not tested much) is totally
unreadable as a result of moving so many things around. It would
probably need to be done in a series of small, well-tested, readable
patches moving one thing out at a time. Perhaps with an extra
context/state object to cut down on wide argument lists.

What would the appetite be for that kind of refactoring work,
considering the increased burden on committers who have to backpatch
bug fixes? Is it a project goal to reduce the size of large
complicated functions like StartupXLOG and heap_update? It seems like
a good way for new players to learn how they work.

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

refactor-startupxlog-sketch.patch (application/octet-stream)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c1b9a97..5487bfe 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -870,6 +870,16 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+static void BeginRedo(CheckPoint checkPoint,
+					  XLogRecPtr checkPointLoc, XLogRecPtr RecPtr,
+					  bool haveBackupLabel, bool backupEndRequired,
+					  bool backupFromStandby, bool haveTblspcMap,
+					  bool wasShutdown);
+static bool ReplayRedo(XLogReaderState *xlogreader, CheckPoint checkPoint,
+					   XLogRecPtr RecPtr);
+static void FinishRedo(bool reachedStopPoint);
+static void BeginHotStandby(CheckPoint checkPoint, bool wasShutdown);
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -5975,41 +5985,9 @@ CheckRequiredParameterValues(void)
 	}
 }
 
-/*
- * This must be called ONCE during postmaster or standalone-backend startup
- */
-void
-StartupXLOG(void)
+static void
+ReportControlFileState(void)
 {
-	XLogCtlInsert *Insert;
-	CheckPoint	checkPoint;
-	bool		wasShutdown;
-	bool		reachedStopPoint = false;
-	bool		haveBackupLabel = false;
-	bool		haveTblspcMap = false;
-	XLogRecPtr	RecPtr,
-				checkPointLoc,
-				EndOfLog;
-	TimeLineID	EndOfLogTLI;
-	TimeLineID	PrevTimeLineID;
-	XLogRecord *record;
-	TransactionId oldestActiveXID;
-	bool		backupEndRequired = false;
-	bool		backupFromStandby = false;
-	DBState		dbstate_at_startup;
-	XLogReaderState *xlogreader;
-	XLogPageReadPrivate private;
-	bool		fast_promoted = false;
-	struct stat st;
-
-	/*
-	 * Read control file and check XLOG status looks valid.
-	 *
-	 * Note: in most control paths, *ControlFile is already valid and we need
-	 * not do ReadControlFile() here, but might as well do it to be sure.
-	 */
-	ReadControlFile();
-
 	if (ControlFile->state < DB_SHUTDOWNED ||
 		ControlFile->state > DB_IN_PRODUCTION ||
 		!XRecOffIsValid(ControlFile->checkPoint))
@@ -6047,56 +6025,11 @@ StartupXLOG(void)
 		ereport(LOG,
 			  (errmsg("database system was interrupted; last known up at %s",
 					  str_time(ControlFile->time))));
+}
 
-	/* This is just to allow attaching to startup process with a debugger */
-#ifdef XLOG_REPLAY_DELAY
-	if (ControlFile->state != DB_SHUTDOWNED)
-		pg_usleep(60000000L);
-#endif
-
-	/*
-	 * Verify that pg_xlog and pg_xlog/archive_status exist.  In cases where
-	 * someone has performed a copy for PITR, these directories may have been
-	 * excluded and need to be re-created.
-	 */
-	ValidateXLOGDirectoryStructure();
-
-	/*
-	 * If we previously crashed, there might be data which we had written,
-	 * intending to fsync it, but which we had not actually fsync'd yet.
-	 * Therefore, a power failure in the near future might cause earlier
-	 * unflushed writes to be lost, even though more recent data written to
-	 * disk from here on would be persisted.  To avoid that, fsync the entire
-	 * data directory.
-	 */
-	if (ControlFile->state != DB_SHUTDOWNED &&
-		ControlFile->state != DB_SHUTDOWNED_IN_RECOVERY)
-		SyncDataDirectory();
-
-	/*
-	 * Initialize on the assumption we want to recover to the latest timeline
-	 * that's active according to pg_control.
-	 */
-	if (ControlFile->minRecoveryPointTLI >
-		ControlFile->checkPointCopy.ThisTimeLineID)
-		recoveryTargetTLI = ControlFile->minRecoveryPointTLI;
-	else
-		recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
-
-	/*
-	 * Check for recovery control file, and if so set up state for offline
-	 * recovery
-	 */
-	readRecoveryCommandFile();
-
-	/*
-	 * Save archive_cleanup_command in shared memory so that other processes
-	 * can see it.
-	 */
-	strlcpy(XLogCtl->archiveCleanupCommand,
-			archiveCleanupCommand ? archiveCleanupCommand : "",
-			sizeof(XLogCtl->archiveCleanupCommand));
-
+static void
+ReportRecoveryTarget(void)
+{
 	if (ArchiveRecoveryRequested)
 	{
 		if (StandbyModeRequested)
@@ -6126,231 +6059,561 @@ StartupXLOG(void)
 			ereport(LOG,
 					(errmsg("starting archive recovery")));
 	}
+}
+
+static void
+AssignNewTimeline(void)
+{
+	TimeLineID	PrevTimeLineID;
 
 	/*
-	 * Take ownership of the wakeup latch if we're going to sleep during
-	 * recovery.
+	 * Consider whether we need to assign a new timeline ID.
+	 *
+	 * If we are doing an archive recovery, we always assign a new ID.  This
+	 * handles a couple of issues.  If we stopped short of the end of WAL
+	 * during recovery, then we are clearly generating a new timeline and must
+	 * assign it a unique new ID.  Even if we ran to the end, modifying the
+	 * current last segment is problematic because it may result in trying to
+	 * overwrite an already-archived copy of that segment, and we encourage
+	 * DBAs to make their archive_commands reject that.  We can dodge the
+	 * problem by making the new active segment have a new timeline ID.
+	 *
+	 * In a normal crash recovery, we can just extend the timeline we were in.
 	 */
-	if (StandbyModeRequested)
-		OwnLatch(&XLogCtl->recoveryWakeupLatch);
+	PrevTimeLineID = ThisTimeLineID;
+	if (ArchiveRecoveryRequested)
+	{
+		char		reason[200];
 
-	/* Set up XLOG reader facility */
-	MemSet(&private, 0, sizeof(XLogPageReadPrivate));
-	xlogreader = XLogReaderAllocate(&XLogPageRead, &private);
-	if (!xlogreader)
-		ereport(ERROR,
-				(errcode(ERRCODE_OUT_OF_MEMORY),
-				 errmsg("out of memory"),
-		   errdetail("Failed while allocating an XLog reading processor.")));
-	xlogreader->system_identifier = ControlFile->system_identifier;
+		Assert(InArchiveRecovery);
 
-	if (read_backup_label(&checkPointLoc, &backupEndRequired,
-						  &backupFromStandby))
-	{
-		List	   *tablespaces = NIL;
+		ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
+		ereport(LOG,
+				(errmsg("selected new timeline ID: %u", ThisTimeLineID)));
 
 		/*
-		 * Archive recovery was requested, and thanks to the backup label
-		 * file, we know how far we need to replay to reach consistency. Enter
-		 * archive recovery directly.
+		 * Create a comment for the history file to explain why and where
+		 * timeline changed.
 		 */
-		InArchiveRecovery = true;
-		if (StandbyModeRequested)
-			StandbyMode = true;
+		if (recoveryTarget == RECOVERY_TARGET_XID)
+			snprintf(reason, sizeof(reason),
+					 "%s transaction %u",
+					 recoveryStopAfter ? "after" : "before",
+					 recoveryStopXid);
+		else if (recoveryTarget == RECOVERY_TARGET_TIME)
+			snprintf(reason, sizeof(reason),
+					 "%s %s\n",
+					 recoveryStopAfter ? "after" : "before",
+					 timestamptz_to_str(recoveryStopTime));
+		else if (recoveryTarget == RECOVERY_TARGET_LSN)
+			snprintf(reason, sizeof(reason),
+					 "%s LSN %X/%X\n",
+					 recoveryStopAfter ? "after" : "before",
+					 (uint32) (recoveryStopLSN >> 32),
+					 (uint32) recoveryStopLSN);
+		else if (recoveryTarget == RECOVERY_TARGET_NAME)
+			snprintf(reason, sizeof(reason),
+					 "at restore point \"%s\"",
+					 recoveryStopName);
+		else if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE)
+			snprintf(reason, sizeof(reason), "reached consistency");
+		else
+			snprintf(reason, sizeof(reason), "no recovery target specified");
 
-		/*
-		 * When a backup_label file is present, we want to roll forward from
-		 * the checkpoint it identifies, rather than using pg_control.
-		 */
-		record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
-		if (record != NULL)
-		{
-			memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
-			wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
-			ereport(DEBUG1,
-					(errmsg("checkpoint record is at %X/%X",
-				   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
-			InRecovery = true;	/* force recovery even if SHUTDOWNED */
+		writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
+							 EndRecPtr, reason);
+	}
 
-			/*
-			 * Make sure that REDO location exists. This may not be the case
-			 * if there was a crash during an online backup, which left a
-			 * backup_label around that references a WAL segment that's
-			 * already been archived.
-			 */
-			if (checkPoint.redo < checkPointLoc)
-			{
-				if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
-					ereport(FATAL,
-							(errmsg("could not find redo location referenced by checkpoint record"),
-							 errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
-			}
-		}
-		else
-		{
-			ereport(FATAL,
-					(errmsg("could not locate required checkpoint record"),
-					 errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
-			wasShutdown = false;	/* keep compiler quiet */
-		}
+	/* Save the selected TimeLineID in shared memory, too */
+	XLogCtl->ThisTimeLineID = ThisTimeLineID;
+	XLogCtl->PrevTimeLineID = PrevTimeLineID;
+}
 
-		/* read the tablespace_map file if present and create symlinks. */
-		if (read_tablespace_map(&tablespaces))
-		{
-			ListCell   *lc;
+static XLogRecPtr
+RecoveryCheckpoint(XLogReaderState *xlogreader, XLogRecPtr checkPointLoc,
+				   bool fast_promoted)
+{
+	XLogRecord *record;
 
-			foreach(lc, tablespaces)
+	if (InRecovery)
+	{
+		/*
+		 * Perform a checkpoint to update all our recovery activity to disk.
+		 *
+		 * Note that we write a shutdown checkpoint rather than an on-line
+		 * one. This is not particularly critical, but since we may be
+		 * assigning a new TLI, using a shutdown checkpoint allows us to have
+		 * the rule that TLI only changes in shutdown checkpoints, which
+		 * allows some extra error checking in xlog_redo.
+		 *
+		 * In fast promotion, only create a lightweight end-of-recovery record
+		 * instead of a full checkpoint. A checkpoint is requested later,
+		 * after we're fully out of recovery mode and already accepting
+		 * queries.
+		 */
+		if (bgwriterLaunched)
+		{
+			if (fast_promote)
 			{
-				tablespaceinfo *ti = lfirst(lc);
-				char	   *linkloc;
-
-				linkloc = psprintf("pg_tblspc/%s", ti->oid);
+				checkPointLoc = ControlFile->prevCheckPoint;
 
 				/*
-				 * Remove the existing symlink if any and Create the symlink
-				 * under PGDATA.
+				 * Confirm the last checkpoint is available for us to recover
+				 * from if we fail. Note that we don't check for the secondary
+				 * checkpoint since that isn't available in most base backups.
 				 */
-				remove_tablespace_symlink(linkloc);
-
-				if (symlink(ti->path, linkloc) < 0)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-						  errmsg("could not create symbolic link \"%s\": %m",
-								 linkloc)));
-
-				pfree(ti->oid);
-				pfree(ti->path);
-				pfree(ti);
-			}
-
-			/* set flag to delete it later */
-			haveTblspcMap = true;
-		}
+				record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, false);
+				if (record != NULL)
+				{
+					fast_promoted = true;
 
-		/* set flag to delete it later */
-		haveBackupLabel = true;
+					/*
+					 * Insert a special WAL record to mark the end of
+					 * recovery, since we aren't doing a checkpoint. That
+					 * means that the checkpointer process may likely be in
+					 * the middle of a time-smoothed restartpoint and could
+					 * continue to be for minutes after this. That sounds
+					 * strange, but the effect is roughly the same and it
+					 * would be stranger to try to come out of the
+					 * restartpoint and then checkpoint. We request a
+					 * checkpoint later anyway, just for safety.
+					 */
+					CreateEndOfRecoveryRecord();
+				}
+			}
+
+			if (!fast_promoted)
+				RequestCheckpoint(CHECKPOINT_END_OF_RECOVERY |
+								  CHECKPOINT_IMMEDIATE |
+								  CHECKPOINT_WAIT);
+		}
+		else
+			CreateCheckPoint(CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_IMMEDIATE);
+
+		/*
+		 * And finally, execute the recovery_end_command, if any.
+		 */
+		if (recoveryEndCommand)
+			ExecuteRecoveryCommand(recoveryEndCommand,
+								   "recovery_end_command",
+								   true);
 	}
-	else
+
+	return checkPointLoc;
+}
+
+static void
+CleanUpOldTimelineSegments(XLogRecPtr EndOfLog, TimeLineID EndOfLogTLI)
+{
+	if (ArchiveRecoveryRequested)
 	{
 		/*
-		 * If tablespace_map file is present without backup_label file, there
-		 * is no use of such file.  There is no harm in retaining it, but it
-		 * is better to get rid of the map file so that we don't have any
-		 * redundant file in data directory and it will avoid any sort of
-		 * confusion.  It seems prudent though to just rename the file out of
-		 * the way rather than delete it completely, also we ignore any error
-		 * that occurs in rename operation as even if map file is present
-		 * without backup_label file, it is harmless.
+		 * We switched to a new timeline. Clean up segments on the old
+		 * timeline.
+		 *
+		 * If there are any higher-numbered segments on the old timeline,
+		 * remove them. They might contain valid WAL, but they might also be
+		 * pre-allocated files containing garbage. In any case, they are not
+		 * part of the new timeline's history so we don't need them.
 		 */
-		if (stat(TABLESPACE_MAP, &st) == 0)
-		{
-			unlink(TABLESPACE_MAP_OLD);
-			if (durable_rename(TABLESPACE_MAP, TABLESPACE_MAP_OLD, DEBUG1) == 0)
-				ereport(LOG,
-				(errmsg("ignoring file \"%s\" because no file \"%s\" exists",
-						TABLESPACE_MAP, BACKUP_LABEL_FILE),
-				 errdetail("File \"%s\" was renamed to \"%s\".",
-						   TABLESPACE_MAP, TABLESPACE_MAP_OLD)));
-			else
-				ereport(LOG,
-				(errmsg("ignoring file \"%s\" because no file \"%s\" exists",
-						TABLESPACE_MAP, BACKUP_LABEL_FILE),
-				 errdetail("Could not rename file \"%s\" to \"%s\": %m.",
-						   TABLESPACE_MAP, TABLESPACE_MAP_OLD)));
-		}
+		RemoveNonParentXlogFiles(EndOfLog, ThisTimeLineID);
 
 		/*
-		 * It's possible that archive recovery was requested, but we don't
-		 * know how far we need to replay the WAL before we reach consistency.
-		 * This can happen for example if a base backup is taken from a
-		 * running server using an atomic filesystem snapshot, without calling
-		 * pg_start/stop_backup. Or if you just kill a running master server
-		 * and put it into archive recovery by creating a recovery.conf file.
+		 * If the switch happened in the middle of a segment, what to do with
+		 * the last, partial segment on the old timeline? If we don't archive
+		 * it, and the server that created the WAL never archives it either
+		 * (e.g. because it was hit by a meteor), it will never make it to the
+		 * archive. That's OK from our point of view, because the new segment
+		 * that we created with the new TLI contains all the WAL from the old
+		 * timeline up to the switch point. But if you later try to do PITR to
+		 * the "missing" WAL on the old timeline, recovery won't find it in
+		 * the archive. It's physically present in the new file with new TLI,
+		 * but recovery won't look there when it's recovering to the older
+		 * timeline. On the other hand, if we archive the partial segment, and
+		 * the original server on that timeline is still running and archives
+		 * the completed version of the same segment later, it will fail. (We
+		 * used to do that in 9.4 and below, and it caused such problems).
 		 *
-		 * Our strategy in that case is to perform crash recovery first,
-		 * replaying all the WAL present in pg_xlog, and only enter archive
-		 * recovery after that.
+		 * As a compromise, we rename the last segment with the .partial
+		 * suffix, and archive it. Archive recovery will never try to read
+		 * .partial segments, so they will normally go unused. But in the odd
+		 * PITR case, the administrator can copy them manually to the pg_xlog
+		 * directory (removing the suffix). They can be useful in debugging,
+		 * too.
 		 *
-		 * But usually we already know how far we need to replay the WAL (up
-		 * to minRecoveryPoint, up to backupEndPoint, or until we see an
-		 * end-of-backup record), and we can enter archive recovery directly.
+		 * If a .done or .ready file already exists for the old timeline,
+		 * however, we had already determined that the segment is complete, so
+		 * we can let it be archived normally. (In particular, if it was
+		 * restored from the archive to begin with, it's expected to have a
+		 * .done file).
 		 */
-		if (ArchiveRecoveryRequested &&
-			(ControlFile->minRecoveryPoint != InvalidXLogRecPtr ||
-			 ControlFile->backupEndRequired ||
-			 ControlFile->backupEndPoint != InvalidXLogRecPtr ||
-			 ControlFile->state == DB_SHUTDOWNED))
+		if (EndOfLog % XLOG_SEG_SIZE != 0 && XLogArchivingActive())
 		{
-			InArchiveRecovery = true;
-			if (StandbyModeRequested)
-				StandbyMode = true;
+			char		origfname[MAXFNAMELEN];
+			XLogSegNo	endLogSegNo;
+
+			XLByteToPrevSeg(EndOfLog, endLogSegNo);
+			XLogFileName(origfname, EndOfLogTLI, endLogSegNo);
+
+			if (!XLogArchiveIsReadyOrDone(origfname))
+			{
+				char		origpath[MAXPGPATH];
+				char		partialfname[MAXFNAMELEN];
+				char		partialpath[MAXPGPATH];
+
+				XLogFilePath(origpath, EndOfLogTLI, endLogSegNo);
+				snprintf(partialfname, MAXFNAMELEN, "%s.partial", origfname);
+				snprintf(partialpath, MAXPGPATH, "%s.partial", origpath);
+
+				/*
+				 * Make sure there's no .done or .ready file for the .partial
+				 * file.
+				 */
+				XLogArchiveCleanup(partialfname);
+
+				durable_rename(origpath, partialpath, ERROR);
+				XLogArchiveNotify(partialfname);
+			}
 		}
+	}
+}
+
+static bool
+ProcessBackupLabel(XLogReaderState *xlogreader, CheckPoint *checkPoint,
+				   XLogRecPtr checkPointLoc, bool *haveTblspcMap)
+{
+	XLogRecord *record;
+	bool wasShutdown;
+
+	{
+		List	   *tablespaces = NIL;
 
 		/*
-		 * Get the last valid checkpoint record.  If the latest one according
-		 * to pg_control is broken, try the next-to-last one.
+		 * Archive recovery was requested, and thanks to the backup label
+		 * file, we know how far we need to replay to reach consistency. Enter
+		 * archive recovery directly.
 		 */
-		checkPointLoc = ControlFile->checkPoint;
-		RedoStartLSN = ControlFile->checkPointCopy.redo;
-		record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, true);
+		InArchiveRecovery = true;
+		if (StandbyModeRequested)
+			StandbyMode = true;
+
+		/*
+		 * When a backup_label file is present, we want to roll forward from
+		 * the checkpoint it identifies, rather than using pg_control.
+		 */
+		record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
 		if (record != NULL)
 		{
+			memcpy(checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
+			wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
 			ereport(DEBUG1,
 					(errmsg("checkpoint record is at %X/%X",
 				   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
-		}
-		else if (StandbyMode)
-		{
+			InRecovery = true;	/* force recovery even if SHUTDOWNED */
+
 			/*
-			 * The last valid checkpoint record required for a streaming
-			 * recovery exists in neither standby nor the primary.
+			 * Make sure that REDO location exists. This may not be the case
+			 * if there was a crash during an online backup, which left a
+			 * backup_label around that references a WAL segment that's
+			 * already been archived.
 			 */
-			ereport(PANIC,
-					(errmsg("could not locate a valid checkpoint record")));
+			if (checkPoint->redo < checkPointLoc)
+			{
+				if (!ReadRecord(xlogreader, checkPoint->redo, LOG, false))
+					ereport(FATAL,
+							(errmsg("could not find redo location referenced by checkpoint record"),
+							 errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
+			}
 		}
 		else
 		{
-			checkPointLoc = ControlFile->prevCheckPoint;
-			record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2, true);
-			if (record != NULL)
+			ereport(FATAL,
+					(errmsg("could not locate required checkpoint record"),
+					 errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
+			wasShutdown = false;	/* keep compiler quiet */
+		}
+
+		/* read the tablespace_map file if present and create symlinks. */
+		if (read_tablespace_map(&tablespaces))
+		{
+			ListCell   *lc;
+
+			foreach(lc, tablespaces)
 			{
-				ereport(LOG,
-						(errmsg("using previous checkpoint record at %X/%X",
-				   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
-				InRecovery = true;		/* force recovery even if SHUTDOWNED */
+				tablespaceinfo *ti = lfirst(lc);
+				char	   *linkloc;
+
+				linkloc = psprintf("pg_tblspc/%s", ti->oid);
+
+				/*
+				 * Remove the existing symlink if any and Create the symlink
+				 * under PGDATA.
+				 */
+				remove_tablespace_symlink(linkloc);
+
+				if (symlink(ti->path, linkloc) < 0)
+					ereport(ERROR,
+							(errcode_for_file_access(),
+						  errmsg("could not create symbolic link \"%s\": %m",
+								 linkloc)));
+
+				pfree(ti->oid);
+				pfree(ti->path);
+				pfree(ti);
 			}
-			else
-				ereport(PANIC,
-					 (errmsg("could not locate a valid checkpoint record")));
+
+			/* set flag to delete it later */
+			*haveTblspcMap = true;
 		}
-		memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
-		wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
 	}
 
-	/*
-	 * Clear out any old relcache cache files.  This is *necessary* if we do
-	 * any WAL replay, since that would probably result in the cache files
-	 * being out of sync with database reality.  In theory we could leave them
-	 * in place if the database had been cleanly shut down, but it seems
-	 * safest to just remove them always and let them be rebuilt during the
-	 * first backend startup.  These files needs to be removed from all
-	 * directories including pg_tblspc, however the symlinks are created only
-	 * after reading tablespace_map file in case of archive recovery from
-	 * backup, so needs to clear old relcache files here after creating
-	 * symlinks.
-	 */
-	RelationCacheInitFileRemove();
+	return wasShutdown;
+}
 
-	/*
-	 * If the location of the checkpoint record is not on the expected
-	 * timeline in the history of the requested timeline, we cannot proceed:
-	 * the backup is not part of the history of the requested timeline.
-	 */
-	Assert(expectedTLEs);		/* was initialized by reading checkpoint
-								 * record */
-	if (tliOfPointInHistory(checkPointLoc, expectedTLEs) !=
-		checkPoint.ThisTimeLineID)
-	{
+static void
+CleanUpTablespaceMap(void)
+{
+	struct stat st;
+
+	{
+		/*
+		 * If tablespace_map file is present without backup_label file, there
+		 * is no use of such file.  There is no harm in retaining it, but it
+		 * is better to get rid of the map file so that we don't have any
+		 * redundant file in data directory and it will avoid any sort of
+		 * confusion.  It seems prudent though to just rename the file out of
+		 * the way rather than delete it completely, also we ignore any error
+		 * that occurs in rename operation as even if map file is present
+		 * without backup_label file, it is harmless.
+		 */
+		if (stat(TABLESPACE_MAP, &st) == 0)
+		{
+			unlink(TABLESPACE_MAP_OLD);
+			if (durable_rename(TABLESPACE_MAP, TABLESPACE_MAP_OLD, DEBUG1) == 0)
+				ereport(LOG,
+				(errmsg("ignoring file \"%s\" because no file \"%s\" exists",
+						TABLESPACE_MAP, BACKUP_LABEL_FILE),
+				 errdetail("File \"%s\" was renamed to \"%s\".",
+						   TABLESPACE_MAP, TABLESPACE_MAP_OLD)));
+			else
+				ereport(LOG,
+				(errmsg("ignoring file \"%s\" because no file \"%s\" exists",
+						TABLESPACE_MAP, BACKUP_LABEL_FILE),
+				 errdetail("Could not rename file \"%s\" to \"%s\": %m.",
+						   TABLESPACE_MAP, TABLESPACE_MAP_OLD)));
+		}
+	}
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup
+ */
+void
+StartupXLOG(void)
+{
+	XLogCtlInsert *Insert;
+	CheckPoint	checkPoint;
+	bool		wasShutdown;
+	bool		haveBackupLabel = false;
+	bool		haveTblspcMap = false;
+	XLogRecPtr	RecPtr,
+				checkPointLoc,
+				EndOfLog;
+	TimeLineID	EndOfLogTLI;
+	XLogRecord *record;
+	TransactionId oldestActiveXID;
+	bool		backupEndRequired = false;
+	bool		backupFromStandby = false;
+	XLogReaderState *xlogreader;
+	XLogPageReadPrivate private;
+	bool		fast_promoted = false;
+	bool		reachedStopPoint;
+
+	/*
+	 * Read control file and check XLOG status looks valid.
+	 *
+	 * Note: in most control paths, *ControlFile is already valid and we need
+	 * not do ReadControlFile() here, but might as well do it to be sure.
+	 */
+	ReadControlFile();
+	ReportControlFileState();
+
+	/* This is just to allow attaching to startup process with a debugger */
+#ifdef XLOG_REPLAY_DELAY
+	if (ControlFile->state != DB_SHUTDOWNED)
+		pg_usleep(60000000L);
+#endif
+
+	/*
+	 * Verify that pg_xlog and pg_xlog/archive_status exist.  In cases where
+	 * someone has performed a copy for PITR, these directories may have been
+	 * excluded and need to be re-created.
+	 */
+	ValidateXLOGDirectoryStructure();
+
+	/*
+	 * If we previously crashed, there might be data which we had written,
+	 * intending to fsync it, but which we had not actually fsync'd yet.
+	 * Therefore, a power failure in the near future might cause earlier
+	 * unflushed writes to be lost, even though more recent data written to
+	 * disk from here on would be persisted.  To avoid that, fsync the entire
+	 * data directory.
+	 */
+	if (ControlFile->state != DB_SHUTDOWNED &&
+		ControlFile->state != DB_SHUTDOWNED_IN_RECOVERY)
+		SyncDataDirectory();
+
+	/*
+	 * Initialize on the assumption we want to recover to the latest timeline
+	 * that's active according to pg_control.
+	 */
+	if (ControlFile->minRecoveryPointTLI >
+		ControlFile->checkPointCopy.ThisTimeLineID)
+		recoveryTargetTLI = ControlFile->minRecoveryPointTLI;
+	else
+		recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
+
+	/*
+	 * Check for recovery control file, and if so set up state for offline
+	 * recovery
+	 */
+	readRecoveryCommandFile();
+
+	/*
+	 * Save archive_cleanup_command in shared memory so that other processes
+	 * can see it.
+	 */
+	strlcpy(XLogCtl->archiveCleanupCommand,
+			archiveCleanupCommand ? archiveCleanupCommand : "",
+			sizeof(XLogCtl->archiveCleanupCommand));
+
+	/* Log recovery target message. */
+	ReportRecoveryTarget();
+
+	/*
+	 * Take ownership of the wakeup latch if we're going to sleep during
+	 * recovery.
+	 */
+	if (StandbyModeRequested)
+		OwnLatch(&XLogCtl->recoveryWakeupLatch);
+
+	/* Set up XLOG reader facility */
+	MemSet(&private, 0, sizeof(XLogPageReadPrivate));
+	xlogreader = XLogReaderAllocate(&XLogPageRead, &private);
+	if (!xlogreader)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+		   errdetail("Failed while allocating an XLog reading processor.")));
+	xlogreader->system_identifier = ControlFile->system_identifier;
+
+	/* Try to find a backup label. */
+	if (read_backup_label(&checkPointLoc, &backupEndRequired,
+						  &backupFromStandby))
+	{
+		wasShutdown = ProcessBackupLabel(xlogreader, &checkPoint, checkPointLoc,
+										 &haveTblspcMap);
+
+		/* set flag to delete it later */
+		haveBackupLabel = true;
+	}
+	else
+	{
+		/* Clean up any orphaned tablespace map files with no backup label. */
+		CleanUpTablespaceMap();
+
+		/*
+		 * It's possible that archive recovery was requested, but we don't
+		 * know how far we need to replay the WAL before we reach consistency.
+		 * This can happen for example if a base backup is taken from a
+		 * running server using an atomic filesystem snapshot, without calling
+		 * pg_start/stop_backup. Or if you just kill a running master server
+		 * and put it into archive recovery by creating a recovery.conf file.
+		 *
+		 * Our strategy in that case is to perform crash recovery first,
+		 * replaying all the WAL present in pg_xlog, and only enter archive
+		 * recovery after that.
+		 *
+		 * But usually we already know how far we need to replay the WAL (up
+		 * to minRecoveryPoint, up to backupEndPoint, or until we see an
+		 * end-of-backup record), and we can enter archive recovery directly.
+		 */
+		if (ArchiveRecoveryRequested &&
+			(ControlFile->minRecoveryPoint != InvalidXLogRecPtr ||
+			 ControlFile->backupEndRequired ||
+			 ControlFile->backupEndPoint != InvalidXLogRecPtr ||
+			 ControlFile->state == DB_SHUTDOWNED))
+		{
+			InArchiveRecovery = true;
+			if (StandbyModeRequested)
+				StandbyMode = true;
+		}
+
+		/*
+		 * Get the last valid checkpoint record.  If the latest one according
+		 * to pg_control is broken, try the next-to-last one.
+		 */
+		checkPointLoc = ControlFile->checkPoint;
+		RedoStartLSN = ControlFile->checkPointCopy.redo;
+		record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, true);
+		if (record != NULL)
+		{
+			ereport(DEBUG1,
+					(errmsg("checkpoint record is at %X/%X",
+				   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+		}
+		else if (StandbyMode)
+		{
+			/*
+			 * The last valid checkpoint record required for a streaming
+			 * recovery exists in neither standby nor the primary.
+			 */
+			ereport(PANIC,
+					(errmsg("could not locate a valid checkpoint record")));
+		}
+		else
+		{
+			checkPointLoc = ControlFile->prevCheckPoint;
+			record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2, true);
+			if (record != NULL)
+			{
+				ereport(LOG,
+						(errmsg("using previous checkpoint record at %X/%X",
+				   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+				InRecovery = true;		/* force recovery even if SHUTDOWNED */
+			}
+			else
+				ereport(PANIC,
+					 (errmsg("could not locate a valid checkpoint record")));
+		}
+		memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
+		wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
+	}
+
+	/*
+	 * Clear out any old relcache cache files.  This is *necessary* if we do
+	 * any WAL replay, since that would probably result in the cache files
+	 * being out of sync with database reality.  In theory we could leave them
+	 * in place if the database had been cleanly shut down, but it seems
+	 * safest to just remove them always and let them be rebuilt during the
+	 * first backend startup.  These files need to be removed from all
+	 * directories including pg_tblspc; however, the symlinks are created
+	 * only after reading the tablespace_map file in case of archive recovery
+	 * from backup, so we need to clear old relcache files here, after
+	 * creating the symlinks.
+	 */
+	RelationCacheInitFileRemove();
+
+	/*
+	 * If the location of the checkpoint record is not on the expected
+	 * timeline in the history of the requested timeline, we cannot proceed:
+	 * the backup is not part of the history of the requested timeline.
+	 */
+	Assert(expectedTLEs);		/* was initialized by reading checkpoint
+								 * record */
+	if (tliOfPointInHistory(checkPointLoc, expectedTLEs) !=
+		checkPoint.ThisTimeLineID)
+	{
 		XLogRecPtr	switchpoint;
 
 		/*
@@ -6513,1021 +6776,885 @@ StartupXLOG(void)
 		InRecovery = true;
 	}
 
-	/* REDO */
+	/* Run REDO. */
+	BeginRedo(checkPoint, checkPointLoc, RecPtr,
+			  haveBackupLabel, backupEndRequired, backupFromStandby,
+			  haveTblspcMap, wasShutdown);
+	reachedStopPoint = ReplayRedo(xlogreader, checkPoint, RecPtr);
+	FinishRedo(reachedStopPoint);
+
+	/*
+	 * Kill WAL receiver, if it's still running, before we continue to write
+	 * the startup checkpoint record. It will trump over the checkpoint and
+	 * subsequent records if it's still alive when we start writing WAL.
+	 */
+	ShutdownWalRcv();
+
+	/*
+	 * Reset unlogged relations to the contents of their INIT fork. This is
+	 * done AFTER recovery is complete so as to include any unlogged relations
+	 * created during recovery, but BEFORE recovery is marked as having
+	 * completed successfully. Otherwise we'd not retry if any of the post
+	 * end-of-recovery steps fail.
+	 */
 	if (InRecovery)
-	{
-		int			rmid;
+		ResetUnloggedRelations(UNLOGGED_RELATION_INIT);
 
-		/*
-		 * Update pg_control to show that we are recovering and to show the
-		 * selected checkpoint as the place we are starting from. We also mark
-		 * pg_control with any minimum recovery stop point obtained from a
-		 * backup history file.
-		 */
-		dbstate_at_startup = ControlFile->state;
-		if (InArchiveRecovery)
-			ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
-		else
-		{
-			ereport(LOG,
-					(errmsg("database system was not properly shut down; "
-							"automatic recovery in progress")));
-			if (recoveryTargetTLI > ControlFile->checkPointCopy.ThisTimeLineID)
-				ereport(LOG,
-						(errmsg("crash recovery starts in timeline %u "
-								"and has target timeline %u",
-								ControlFile->checkPointCopy.ThisTimeLineID,
-								recoveryTargetTLI)));
-			ControlFile->state = DB_IN_CRASH_RECOVERY;
-		}
-		ControlFile->prevCheckPoint = ControlFile->checkPoint;
-		ControlFile->checkPoint = checkPointLoc;
-		ControlFile->checkPointCopy = checkPoint;
-		if (InArchiveRecovery)
-		{
-			/* initialize minRecoveryPoint if not set yet */
-			if (ControlFile->minRecoveryPoint < checkPoint.redo)
-			{
-				ControlFile->minRecoveryPoint = checkPoint.redo;
-				ControlFile->minRecoveryPointTLI = checkPoint.ThisTimeLineID;
-			}
-		}
-
-		/*
-		 * Set backupStartPoint if we're starting recovery from a base backup.
-		 *
-		 * Also set backupEndPoint and use minRecoveryPoint as the backup end
-		 * location if we're starting recovery from a base backup which was
-		 * taken from a standby. In this case, the database system status in
-		 * pg_control must indicate that the database was already in recovery.
-		 * Usually that will be DB_IN_ARCHIVE_RECOVERY but also can be
-		 * DB_SHUTDOWNED_IN_RECOVERY if recovery previously was interrupted
-		 * before reaching this point; e.g. because restore_command or
-		 * primary_conninfo were faulty.
-		 *
-		 * Any other state indicates that the backup somehow became corrupted
-		 * and we can't sensibly continue with recovery.
-		 */
-		if (haveBackupLabel)
-		{
-			ControlFile->backupStartPoint = checkPoint.redo;
-			ControlFile->backupEndRequired = backupEndRequired;
+	/*
+	 * We don't need the latch anymore. It's not strictly necessary to disown
+	 * it, but let's do it for the sake of tidiness.
+	 */
+	if (StandbyModeRequested)
+		DisownLatch(&XLogCtl->recoveryWakeupLatch);
 
-			if (backupFromStandby)
-			{
-				if (dbstate_at_startup != DB_IN_ARCHIVE_RECOVERY &&
-					dbstate_at_startup != DB_SHUTDOWNED_IN_RECOVERY)
-					ereport(FATAL,
-							(errmsg("backup_label contains data inconsistent with control file"),
-							 errhint("This means that the backup is corrupted and you will "
-							   "have to use another backup for recovery.")));
-				ControlFile->backupEndPoint = ControlFile->minRecoveryPoint;
-			}
-		}
-		ControlFile->time = (pg_time_t) time(NULL);
-		/* No need to hold ControlFileLock yet, we aren't up far enough */
-		UpdateControlFile();
+	/*
+	 * We are now done reading the xlog from stream. Turn off streaming
+	 * recovery to force fetching the files (which would be required at end of
+	 * recovery, e.g., timeline history file) from archive or pg_xlog.
+	 */
+	StandbyMode = false;
 
-		/* initialize our local copy of minRecoveryPoint */
-		minRecoveryPoint = ControlFile->minRecoveryPoint;
-		minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
+	/*
+	 * Re-fetch the last valid or last applied record, so we can identify the
+	 * exact endpoint of what we consider the valid portion of WAL.
+	 */
+	record = ReadRecord(xlogreader, LastRec, PANIC, false);
+	EndOfLog = EndRecPtr;
 
-		/*
-		 * Reset pgstat data, because it may be invalid after recovery.
-		 */
-		pgstat_reset_all();
+	/*
+	 * EndOfLogTLI is the TLI in the filename of the XLOG segment containing
+	 * the end-of-log. It could be different from the timeline that EndOfLog
+	 * nominally belongs to, if there was a timeline switch in that segment,
+	 * and we were reading the old WAL from a segment belonging to a higher
+	 * timeline.
+	 */
+	EndOfLogTLI = xlogreader->readPageTLI;
 
+	/*
+	 * Complain if we did not roll forward far enough to render the backup
+	 * dump consistent.  Note: it is indeed okay to look at the local variable
+	 * minRecoveryPoint here, even though ControlFile->minRecoveryPoint might
+	 * be further ahead --- ControlFile->minRecoveryPoint cannot have been
+	 * advanced beyond the WAL we processed.
+	 */
+	if (InRecovery &&
+		(EndOfLog < minRecoveryPoint ||
+		 !XLogRecPtrIsInvalid(ControlFile->backupStartPoint)))
+	{
 		/*
-		 * If there was a backup label file, it's done its job and the info
-		 * has now been propagated into pg_control.  We must get rid of the
-		 * label file so that if we crash during recovery, we'll pick up at
-		 * the latest recovery restartpoint instead of going all the way back
-		 * to the backup start point.  It seems prudent though to just rename
-		 * the file out of the way rather than delete it completely.
+		 * Ran off end of WAL before reaching end-of-backup WAL record, or
+		 * minRecoveryPoint. That's usually a bad sign, indicating that you
+		 * tried to recover from an online backup but never called
+		 * pg_stop_backup(), or you didn't archive all the WAL up to that
+		 * point. However, this also happens in crash recovery, if the system
+		 * crashes while an online backup is in progress. We must not treat
+		 * that as an error, or the database will refuse to start up.
 		 */
-		if (haveBackupLabel)
+		if (ArchiveRecoveryRequested || ControlFile->backupEndRequired)
 		{
-			unlink(BACKUP_LABEL_OLD);
-			durable_rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD, FATAL);
+			if (ControlFile->backupEndRequired)
+				ereport(FATAL,
+						(errmsg("WAL ends before end of online backup"),
+						 errhint("All WAL generated while online backup was taken must be available at recovery.")));
+			else if (!XLogRecPtrIsInvalid(ControlFile->backupStartPoint))
+				ereport(FATAL,
+						(errmsg("WAL ends before end of online backup"),
+						 errhint("Online backup started with pg_start_backup() must be ended with pg_stop_backup(), and all WAL up to that point must be available at recovery.")));
+			else
+				ereport(FATAL,
+					  (errmsg("WAL ends before consistent recovery point")));
 		}
+	}
 
-		/*
-		 * If there was a tablespace_map file, it's done its job and the
-		 * symlinks have been created.  We must get rid of the map file so
-		 * that if we crash during recovery, we don't create symlinks again.
-		 * It seems prudent though to just rename the file out of the way
-		 * rather than delete it completely.
-		 */
-		if (haveTblspcMap)
-		{
-			unlink(TABLESPACE_MAP_OLD);
-			durable_rename(TABLESPACE_MAP, TABLESPACE_MAP_OLD, FATAL);
-		}
+	/* Consider whether we need to assign a new timeline ID. */
+	AssignNewTimeline();
 
-		/* Check that the GUCs used to generate the WAL allow recovery */
-		CheckRequiredParameterValues();
+	/*
+	 * We are now done reading the old WAL.  Turn off archive fetching if it
+	 * was active, and make a writable copy of the last WAL segment. (Note
+	 * that we also have a copy of the last block of the old WAL in readBuf;
+	 * we will use that below.)
+	 */
+	if (ArchiveRecoveryRequested)
+		exitArchiveRecovery(EndOfLogTLI, EndOfLog);
 
-		/*
-		 * We're in recovery, so unlogged relations may be trashed and must be
-		 * reset.  This should be done BEFORE allowing Hot Standby
-		 * connections, so that read-only backends don't try to read whatever
-		 * garbage is left over from before.
-		 */
-		ResetUnloggedRelations(UNLOGGED_RELATION_CLEANUP);
+	/*
+	 * Prepare to write WAL starting at EndOfLog position, and init xlog
+	 * buffer cache using the block containing the last record from the
+	 * previous incarnation.
+	 */
+	Insert = &XLogCtl->Insert;
+	Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
+	Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
 
-		/*
-		 * Likewise, delete any saved transaction snapshot files that got left
-		 * behind by crashed backends.
-		 */
-		DeleteAllExportedSnapshotFiles();
+	/*
+	 * Tricky point here: readBuf contains the *last* block that the LastRec
+	 * record spans, not the one it starts in.  The last block is indeed the
+	 * one we want to use.
+	 */
+	if (EndOfLog % XLOG_BLCKSZ != 0)
+	{
+		char	   *page;
+		int			len;
+		int			firstIdx;
+		XLogRecPtr	pageBeginPtr;
 
-		/*
-		 * Initialize for Hot Standby, if enabled. We won't let backends in
-		 * yet, not until we've reached the min recovery point specified in
-		 * control file and we've established a recovery snapshot from a
-		 * running-xacts WAL record.
-		 */
-		if (ArchiveRecoveryRequested && EnableHotStandby)
-		{
-			TransactionId *xids;
-			int			nxids;
+		pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
+		Assert(readOff == pageBeginPtr % XLogSegSize);
 
-			ereport(DEBUG1,
-					(errmsg("initializing for hot standby")));
+		firstIdx = XLogRecPtrToBufIdx(EndOfLog);
 
-			InitRecoveryTransactionEnvironment();
+		/* Copy the valid part of the last block, and zero the rest */
+		page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ];
+		len = EndOfLog % XLOG_BLCKSZ;
+		memcpy(page, xlogreader->readBuf, len);
+		memset(page + len, 0, XLOG_BLCKSZ - len);
 
-			if (wasShutdown)
-				oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
-			else
-				oldestActiveXID = checkPoint.oldestActiveXid;
-			Assert(TransactionIdIsValid(oldestActiveXID));
+		XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
+		XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
+	}
+	else
+	{
+		/*
+		 * There is no partial block to copy. Just set InitializedUpTo, and
+		 * let the first attempt to insert a log record initialize the next
+		 * buffer.
+		 */
+		XLogCtl->InitializedUpTo = EndOfLog;
+	}
 
-			/* Tell procarray about the range of xids it has to deal with */
-			ProcArrayInitRecovery(ShmemVariableCache->nextXid);
+	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-			/*
-			 * Startup commit log and subtrans only.  MultiXact and commit
-			 * timestamp have already been started up and other SLRUs are not
-			 * maintained during recovery and need not be started yet.
-			 */
-			StartupCLOG();
-			StartupSUBTRANS(oldestActiveXID);
+	XLogCtl->LogwrtResult = LogwrtResult;
 
-			/*
-			 * If we're beginning at a shutdown checkpoint, we know that
-			 * nothing was running on the master at this point. So fake-up an
-			 * empty running-xacts record and use that here and now. Recover
-			 * additional standby state for prepared transactions.
-			 */
-			if (wasShutdown)
-			{
-				RunningTransactionsData running;
-				TransactionId latestCompletedXid;
+	XLogCtl->LogwrtRqst.Write = EndOfLog;
+	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
-				/*
-				 * Construct a RunningTransactions snapshot representing a
-				 * shut down server, with only prepared transactions still
-				 * alive. We're never overflowed at this point because all
-				 * subxids are listed with their parent prepared transactions.
-				 */
-				running.xcnt = nxids;
-				running.subxcnt = 0;
-				running.subxid_overflow = false;
-				running.nextXid = checkPoint.nextXid;
-				running.oldestRunningXid = oldestActiveXID;
-				latestCompletedXid = checkPoint.nextXid;
-				TransactionIdRetreat(latestCompletedXid);
-				Assert(TransactionIdIsNormal(latestCompletedXid));
-				running.latestCompletedXid = latestCompletedXid;
-				running.xids = xids;
-
-				ProcArrayApplyRecoveryInfo(&running);
-
-				StandbyRecoverPreparedTransactions(false);
-			}
-		}
-
-		/* Initialize resource managers */
-		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
-		}
-
-		/*
-		 * Initialize shared variables for tracking progress of WAL replay, as
-		 * if we had just replayed the record before the REDO location (or the
-		 * checkpoint record itself, if it's a shutdown checkpoint).
-		 */
-		SpinLockAcquire(&XLogCtl->info_lck);
-		if (checkPoint.redo < RecPtr)
-			XLogCtl->replayEndRecPtr = checkPoint.redo;
-		else
-			XLogCtl->replayEndRecPtr = EndRecPtr;
-		XLogCtl->replayEndTLI = ThisTimeLineID;
-		XLogCtl->lastReplayedEndRecPtr = XLogCtl->replayEndRecPtr;
-		XLogCtl->lastReplayedTLI = XLogCtl->replayEndTLI;
-		XLogCtl->recoveryLastXTime = 0;
-		XLogCtl->currentChunkStartTime = 0;
-		XLogCtl->recoveryPause = false;
-		SpinLockRelease(&XLogCtl->info_lck);
-
-		/* Also ensure XLogReceiptTime has a sane value */
-		XLogReceiptTime = GetCurrentTimestamp();
-
-		/*
-		 * Let postmaster know we've started redo now, so that it can launch
-		 * checkpointer to perform restartpoints.  We don't bother during
-		 * crash recovery as restartpoints can only be performed during
-		 * archive recovery.  And we'd like to keep crash recovery simple, to
-		 * avoid introducing bugs that could affect you when recovering after
-		 * crash.
-		 *
-		 * After this point, we can no longer assume that we're the only
-		 * process in addition to postmaster!  Also, fsync requests are
-		 * subsequently to be handled by the checkpointer, not locally.
-		 */
-		if (ArchiveRecoveryRequested && IsUnderPostmaster)
-		{
-			PublishStartupProcessInformation();
-			SetForwardFsyncRequests();
-			SendPostmasterSignal(PMSIGNAL_RECOVERY_STARTED);
-			bgwriterLaunched = true;
-		}
-
-		/*
-		 * Allow read-only connections immediately if we're consistent
-		 * already.
-		 */
-		CheckRecoveryConsistency();
-
-		/*
-		 * Find the first record that logically follows the checkpoint --- it
-		 * might physically precede it, though.
-		 */
-		if (checkPoint.redo < RecPtr)
-		{
-			/* back up to find the record */
-			record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
-		}
-		else
-		{
-			/* just have to read next record after CheckPoint */
-			record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
-		}
-
-		if (record != NULL)
-		{
-			ErrorContextCallback errcallback;
-			TimestampTz xtime;
-
-			InRedo = true;
-
-			ereport(LOG,
-					(errmsg("redo starts at %X/%X",
-						 (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
-
-			/*
-			 * main redo apply loop
-			 */
-			do
-			{
-				bool		switchedTLI = false;
-
-#ifdef WAL_DEBUG
-				if (XLOG_DEBUG ||
-				 (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
-					(rmid != RM_XACT_ID && trace_recovery_messages <= DEBUG3))
-				{
-					StringInfoData buf;
-
-					initStringInfo(&buf);
-					appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
-							(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
-							 (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
-					xlog_outrec(&buf, xlogreader);
-					appendStringInfoString(&buf, " - ");
-					xlog_outdesc(&buf, xlogreader);
-					elog(LOG, "%s", buf.data);
-					pfree(buf.data);
-				}
-#endif
-
-				/* Handle interrupt signals of startup process */
-				HandleStartupProcInterrupts();
-
-				/*
-				 * Pause WAL replay, if requested by a hot-standby session via
-				 * SetRecoveryPause().
-				 *
-				 * Note that we intentionally don't take the info_lck spinlock
-				 * here.  We might therefore read a slightly stale value of
-				 * the recoveryPause flag, but it can't be very stale (no
-				 * worse than the last spinlock we did acquire).  Since a
-				 * pause request is a pretty asynchronous thing anyway,
-				 * possibly responding to it one WAL record later than we
-				 * otherwise would is a minor issue, so it doesn't seem worth
-				 * adding another spinlock cycle to prevent that.
-				 */
-				if (((volatile XLogCtlData *) XLogCtl)->recoveryPause)
-					recoveryPausesHere();
-
-				/*
-				 * Have we reached our recovery target?
-				 */
-				if (recoveryStopsBefore(xlogreader))
-				{
-					reachedStopPoint = true;	/* see below */
-					break;
-				}
-
-				/*
-				 * If we've been asked to lag the master, wait on latch until
-				 * enough time has passed.
-				 */
-				if (recoveryApplyDelay(xlogreader))
-				{
-					/*
-					 * We test for paused recovery again here. If user sets
-					 * delayed apply, it may be because they expect to pause
-					 * recovery in case of problems, so we must test again
-					 * here otherwise pausing during the delay-wait wouldn't
-					 * work.
-					 */
-					if (((volatile XLogCtlData *) XLogCtl)->recoveryPause)
-						recoveryPausesHere();
-				}
-
-				/* Setup error traceback support for ereport() */
-				errcallback.callback = rm_redo_error_callback;
-				errcallback.arg = (void *) xlogreader;
-				errcallback.previous = error_context_stack;
-				error_context_stack = &errcallback;
-
-				/*
-				 * ShmemVariableCache->nextXid must be beyond record's xid.
-				 *
-				 * We don't expect anyone else to modify nextXid, hence we
-				 * don't need to hold a lock while examining it.  We still
-				 * acquire the lock to modify it, though.
-				 */
-				if (TransactionIdFollowsOrEquals(record->xl_xid,
-												 ShmemVariableCache->nextXid))
-				{
-					LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
-					ShmemVariableCache->nextXid = record->xl_xid;
-					TransactionIdAdvance(ShmemVariableCache->nextXid);
-					LWLockRelease(XidGenLock);
-				}
-
-				/*
-				 * Before replaying this record, check if this record causes
-				 * the current timeline to change. The record is already
-				 * considered to be part of the new timeline, so we update
-				 * ThisTimeLineID before replaying it. That's important so
-				 * that replayEndTLI, which is recorded as the minimum
-				 * recovery point's TLI if recovery stops after this record,
-				 * is set correctly.
-				 */
-				if (record->xl_rmid == RM_XLOG_ID)
-				{
-					TimeLineID	newTLI = ThisTimeLineID;
-					TimeLineID	prevTLI = ThisTimeLineID;
-					uint8		info = record->xl_info & ~XLR_INFO_MASK;
-
-					if (info == XLOG_CHECKPOINT_SHUTDOWN)
-					{
-						CheckPoint	checkPoint;
-
-						memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
-						newTLI = checkPoint.ThisTimeLineID;
-						prevTLI = checkPoint.PrevTimeLineID;
-					}
-					else if (info == XLOG_END_OF_RECOVERY)
-					{
-						xl_end_of_recovery xlrec;
-
-						memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
-						newTLI = xlrec.ThisTimeLineID;
-						prevTLI = xlrec.PrevTimeLineID;
-					}
-
-					if (newTLI != ThisTimeLineID)
-					{
-						/* Check that it's OK to switch to this TLI */
-						checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
-
-						/* Following WAL records should be run with new TLI */
-						ThisTimeLineID = newTLI;
-						switchedTLI = true;
-					}
-				}
-
-				/*
-				 * Update shared replayEndRecPtr before replaying this record,
-				 * so that XLogFlush will update minRecoveryPoint correctly.
-				 */
-				SpinLockAcquire(&XLogCtl->info_lck);
-				XLogCtl->replayEndRecPtr = EndRecPtr;
-				XLogCtl->replayEndTLI = ThisTimeLineID;
-				SpinLockRelease(&XLogCtl->info_lck);
-
-				/*
-				 * If we are attempting to enter Hot Standby mode, process
-				 * XIDs we see
-				 */
-				if (standbyState >= STANDBY_INITIALIZED &&
-					TransactionIdIsValid(record->xl_xid))
-					RecordKnownAssignedTransactionIds(record->xl_xid);
-
-				/* Now apply the WAL record itself */
-				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
-
-				/* Pop the error context stack */
-				error_context_stack = errcallback.previous;
-
-				/*
-				 * Update lastReplayedEndRecPtr after this record has been
-				 * successfully replayed.
-				 */
-				SpinLockAcquire(&XLogCtl->info_lck);
-				XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
-				XLogCtl->lastReplayedTLI = ThisTimeLineID;
-				SpinLockRelease(&XLogCtl->info_lck);
-
-				/*
-				 * If rm_redo called XLogRequestWalReceiverReply, then we wake
-				 * up the receiver so that it notices the updated
-				 * lastReplayedEndRecPtr and sends a reply to the master.
-				 */
-				if (doRequestWalReceiverReply)
-				{
-					doRequestWalReceiverReply = false;
-					WalRcvForceReply();
-				}
-
-				/* Remember this record as the last-applied one */
-				LastRec = ReadRecPtr;
-
-				/* Allow read-only connections if we're consistent now */
-				CheckRecoveryConsistency();
-
-				/* Is this a timeline switch? */
-				if (switchedTLI)
-				{
-					/*
-					 * Before we continue on the new timeline, clean up any
-					 * (possibly bogus) future WAL segments on the old
-					 * timeline.
-					 */
-					RemoveNonParentXlogFiles(EndRecPtr, ThisTimeLineID);
-
-					/*
-					 * Wake up any walsenders to notice that we are on a new
-					 * timeline.
-					 */
-					if (switchedTLI && AllowCascadeReplication())
-						WalSndWakeup();
-				}
-
-				/* Exit loop if we reached inclusive recovery target */
-				if (recoveryStopsAfter(xlogreader))
-				{
-					reachedStopPoint = true;
-					break;
-				}
-
-				/* Else, try to fetch the next WAL record */
-				record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
-			} while (record != NULL);
-
-			/*
-			 * end of main redo apply loop
-			 */
-
-			if (reachedStopPoint)
-			{
-				if (!reachedConsistency)
-					ereport(FATAL,
-							(errmsg("requested recovery stop point is before consistent recovery point")));
-
-				/*
-				 * This is the last point where we can restart recovery with a
-				 * new recovery target, if we shutdown and begin again. After
-				 * this, Resource Managers may choose to do permanent
-				 * corrective actions at end of recovery.
-				 */
-				switch (recoveryTargetAction)
-				{
-					case RECOVERY_TARGET_ACTION_SHUTDOWN:
+	/* Pre-scan prepared transactions to find out the range of XIDs present */
+	oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
 
-						/*
-						 * exit with special return code to request shutdown
-						 * of postmaster.  Log messages issued from
-						 * postmaster.
-						 */
-						proc_exit(3);
+	/*
+	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
+	 * record before resource manager writes cleanup WAL records or checkpoint
+	 * record is written.
+	 */
+	Insert->fullPageWrites = lastFullPageWrites;
+	LocalSetXLogInsertAllowed();
+	UpdateFullPageWrites();
+	LocalXLogInsertAllowed = -1;
 
-					case RECOVERY_TARGET_ACTION_PAUSE:
-						SetRecoveryPause(true);
-						recoveryPausesHere();
+	/*
+	 * Perform a checkpoint to update our recovery activity to disk if
+	 * appropriate.
+	 */
+	checkPointLoc = RecoveryCheckpoint(xlogreader, checkPointLoc, fast_promoted);
 
-						/* drop into promote */
+	/* If we switched to a new timeline, clean up old segments. */
+	CleanUpOldTimelineSegments(EndOfLog, EndOfLogTLI);
 
-					case RECOVERY_TARGET_ACTION_PROMOTE:
-						break;
-				}
-			}
+	/*
+	 * Preallocate additional log files, if wanted.
+	 */
+	PreallocXlogFiles(EndOfLog);
 
-			/* Allow resource managers to do any required cleanup. */
-			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-			{
-				if (RmgrTable[rmid].rm_cleanup != NULL)
-					RmgrTable[rmid].rm_cleanup();
-			}
+	/*
+	 * Okay, we're officially UP.
+	 */
+	InRecovery = false;
 
-			ereport(LOG,
-					(errmsg("redo done at %X/%X",
-						 (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
-			xtime = GetLatestXTime();
-			if (xtime)
-				ereport(LOG,
-					 (errmsg("last completed transaction was at log time %s",
-							 timestamptz_to_str(xtime))));
+	/* start the archive_timeout timer running */
+	XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 
-			InRedo = false;
-		}
-		else
-		{
-			/* there are no WAL records following the checkpoint */
-			ereport(LOG,
-					(errmsg("redo is not required")));
-		}
+	/* also initialize latestCompletedXid, to nextXid - 1 */
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
+	TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
+	LWLockRelease(ProcArrayLock);
+
+	/*
+	 * Start up the commit log and subtrans, if not already done for hot
+	 * standby.  (commit timestamps are started below, if necessary.)
+	 */
+	if (standbyState == STANDBY_DISABLED)
+	{
+		StartupCLOG();
+		StartupSUBTRANS(oldestActiveXID);
 	}
 
 	/*
-	 * Kill WAL receiver, if it's still running, before we continue to write
-	 * the startup checkpoint record. It will trump over the checkpoint and
-	 * subsequent records if it's still alive when we start writing WAL.
+	 * Perform end of recovery actions for any SLRUs that need it.
 	 */
-	ShutdownWalRcv();
+	TrimCLOG();
+	TrimMultiXact();
+
+	/* Reload shared-memory state for prepared transactions */
+	RecoverPreparedTransactions();
 
 	/*
-	 * Reset unlogged relations to the contents of their INIT fork. This is
-	 * done AFTER recovery is complete so as to include any unlogged relations
-	 * created during recovery, but BEFORE recovery is marked as having
-	 * completed successfully. Otherwise we'd not retry if any of the post
-	 * end-of-recovery steps fail.
+	 * Shut down the recovery environment.  This must occur after
+	 * RecoverPreparedTransactions(); see notes for lock_twophase_recover().
 	 */
-	if (InRecovery)
-		ResetUnloggedRelations(UNLOGGED_RELATION_INIT);
+	if (standbyState != STANDBY_DISABLED)
+		ShutdownRecoveryTransactionEnvironment();
+
+	/* Shut down xlogreader */
+	if (readFile >= 0)
+	{
+		close(readFile);
+		readFile = -1;
+	}
+	XLogReaderFree(xlogreader);
 
 	/*
-	 * We don't need the latch anymore. It's not strictly necessary to disown
-	 * it, but let's do it for the sake of tidiness.
+	 * If any of the critical GUCs have changed, log them before we allow
+	 * backends to write WAL.
 	 */
-	if (StandbyModeRequested)
-		DisownLatch(&XLogCtl->recoveryWakeupLatch);
+	LocalSetXLogInsertAllowed();
+	XLogReportParameters();
 
 	/*
-	 * We are now done reading the xlog from stream. Turn off streaming
-	 * recovery to force fetching the files (which would be required at end of
-	 * recovery, e.g., timeline history file) from archive or pg_xlog.
+	 * Local WAL inserts enabled, so it's time to finish initialization of
+	 * commit timestamp.
 	 */
-	StandbyMode = false;
+	CompleteCommitTsInitialization();
 
 	/*
-	 * Re-fetch the last valid or last applied record, so we can identify the
-	 * exact endpoint of what we consider the valid portion of WAL.
+	 * All done with end-of-recovery actions.
+	 *
+	 * Now allow backends to write WAL and update the control file status in
+	 * consequence.  The boolean flag allowing backends to write WAL is
+	 * updated while holding ControlFileLock to prevent other backends from
+	 * seeing an inconsistent state of the control file in shared memory.  There
+	 * is still a small window during which backends can write WAL and the
+	 * control file is still referring to a system not in DB_IN_PRODUCTION
+	 * state while looking at the on-disk control file.
+	 *
+	 * Also, although the boolean flag to allow WAL is probably atomic in
+	 * itself, we use the info_lck here to ensure that there are no race
+	 * conditions concerning visibility of other recent updates to shared
+	 * memory.
 	 */
-	record = ReadRecord(xlogreader, LastRec, PANIC, false);
-	EndOfLog = EndRecPtr;
+	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+	ControlFile->state = DB_IN_PRODUCTION;
+	ControlFile->time = (pg_time_t) time(NULL);
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->SharedRecoveryInProgress = false;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	UpdateControlFile();
+	LWLockRelease(ControlFileLock);
 
 	/*
-	 * EndOfLogTLI is the TLI in the filename of the XLOG segment containing
-	 * the end-of-log. It could be different from the timeline that EndOfLog
-	 * nominally belongs to, if there was a timeline switch in that segment,
-	 * and we were reading the old WAL from a segment belonging to a higher
-	 * timeline.
+	 * If there were cascading standby servers connected to us, nudge any wal
+	 * sender processes to notice that we've been promoted.
 	 */
-	EndOfLogTLI = xlogreader->readPageTLI;
+	WalSndWakeup();
+
+	/*
+	 * If this was a fast promotion, request an (online) checkpoint now. This
+	 * isn't required for consistency, but the last restartpoint might be far
+	 * back, and in case of a crash, recovering from it might take longer
+	 * than is appropriate now that we're not in standby mode anymore.
+	 */
+	if (fast_promoted)
+		RequestCheckpoint(CHECKPOINT_FORCE);
+}
+
+static void
+BeginRedo(CheckPoint checkPoint, XLogRecPtr checkPointLoc, XLogRecPtr RecPtr,
+		  bool haveBackupLabel, bool backupEndRequired, bool backupFromStandby,
+		  bool haveTblspcMap, bool wasShutdown)
+{
+	DBState		dbstate_at_startup;
+
+	if (InRecovery)
+	{
+		uint8		rmid;
+
+		/*
+		 * Update pg_control to show that we are recovering and to show the
+		 * selected checkpoint as the place we are starting from. We also mark
+		 * pg_control with any minimum recovery stop point obtained from a
+		 * backup history file.
+		 */
+		dbstate_at_startup = ControlFile->state;
+		if (InArchiveRecovery)
+			ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
+		else
+		{
+			ereport(LOG,
+					(errmsg("database system was not properly shut down; "
+							"automatic recovery in progress")));
+			if (recoveryTargetTLI > ControlFile->checkPointCopy.ThisTimeLineID)
+				ereport(LOG,
+						(errmsg("crash recovery starts in timeline %u "
+								"and has target timeline %u",
+								ControlFile->checkPointCopy.ThisTimeLineID,
+								recoveryTargetTLI)));
+			ControlFile->state = DB_IN_CRASH_RECOVERY;
+		}
+		ControlFile->prevCheckPoint = ControlFile->checkPoint;
+		ControlFile->checkPoint = checkPointLoc;
+		ControlFile->checkPointCopy = checkPoint;
+		if (InArchiveRecovery)
+		{
+			/* initialize minRecoveryPoint if not set yet */
+			if (ControlFile->minRecoveryPoint < checkPoint.redo)
+			{
+				ControlFile->minRecoveryPoint = checkPoint.redo;
+				ControlFile->minRecoveryPointTLI = checkPoint.ThisTimeLineID;
+			}
+		}
+
+		/*
+		 * Set backupStartPoint if we're starting recovery from a base backup.
+		 *
+		 * Also set backupEndPoint and use minRecoveryPoint as the backup end
+		 * location if we're starting recovery from a base backup which was
+		 * taken from a standby. In this case, the database system status in
+		 * pg_control must indicate that the database was already in recovery.
+		 * Usually that will be DB_IN_ARCHIVE_RECOVERY but also can be
+		 * DB_SHUTDOWNED_IN_RECOVERY if recovery previously was interrupted
+		 * before reaching this point; e.g. because restore_command or
+		 * primary_conninfo were faulty.
+		 *
+		 * Any other state indicates that the backup somehow became corrupted
+		 * and we can't sensibly continue with recovery.
+		 */
+		if (haveBackupLabel)
+		{
+			ControlFile->backupStartPoint = checkPoint.redo;
+			ControlFile->backupEndRequired = backupEndRequired;
+
+			if (backupFromStandby)
+			{
+				if (dbstate_at_startup != DB_IN_ARCHIVE_RECOVERY &&
+					dbstate_at_startup != DB_SHUTDOWNED_IN_RECOVERY)
+					ereport(FATAL,
+							(errmsg("backup_label contains data inconsistent with control file"),
+							 errhint("This means that the backup is corrupted and you will "
+							   "have to use another backup for recovery.")));
+				ControlFile->backupEndPoint = ControlFile->minRecoveryPoint;
+			}
+		}
+		ControlFile->time = (pg_time_t) time(NULL);
+		/* No need to hold ControlFileLock yet, we aren't up far enough */
+		UpdateControlFile();
+
+		/* initialize our local copy of minRecoveryPoint */
+		minRecoveryPoint = ControlFile->minRecoveryPoint;
+		minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
+
+		/*
+		 * Reset pgstat data, because it may be invalid after recovery.
+		 */
+		pgstat_reset_all();
+
+		/*
+		 * If there was a backup label file, it's done its job and the info
+		 * has now been propagated into pg_control.  We must get rid of the
+		 * label file so that if we crash during recovery, we'll pick up at
+		 * the latest recovery restartpoint instead of going all the way back
+		 * to the backup start point.  It seems prudent though to just rename
+		 * the file out of the way rather than delete it completely.
+		 */
+		if (haveBackupLabel)
+		{
+			unlink(BACKUP_LABEL_OLD);
+			durable_rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD, FATAL);
+		}
 
-	/*
-	 * Complain if we did not roll forward far enough to render the backup
-	 * dump consistent.  Note: it is indeed okay to look at the local variable
-	 * minRecoveryPoint here, even though ControlFile->minRecoveryPoint might
-	 * be further ahead --- ControlFile->minRecoveryPoint cannot have been
-	 * advanced beyond the WAL we processed.
-	 */
-	if (InRecovery &&
-		(EndOfLog < minRecoveryPoint ||
-		 !XLogRecPtrIsInvalid(ControlFile->backupStartPoint)))
-	{
 		/*
-		 * Ran off end of WAL before reaching end-of-backup WAL record, or
-		 * minRecoveryPoint. That's usually a bad sign, indicating that you
-		 * tried to recover from an online backup but never called
-		 * pg_stop_backup(), or you didn't archive all the WAL up to that
-		 * point. However, this also happens in crash recovery, if the system
-		 * crashes while an online backup is in progress. We must not treat
-		 * that as an error, or the database will refuse to start up.
+		 * If there was a tablespace_map file, it's done its job and the
+		 * symlinks have been created.  We must get rid of the map file so
+		 * that if we crash during recovery, we don't create symlinks again.
+		 * It seems prudent though to just rename the file out of the way
+		 * rather than delete it completely.
 		 */
-		if (ArchiveRecoveryRequested || ControlFile->backupEndRequired)
+		if (haveTblspcMap)
 		{
-			if (ControlFile->backupEndRequired)
-				ereport(FATAL,
-						(errmsg("WAL ends before end of online backup"),
-						 errhint("All WAL generated while online backup was taken must be available at recovery.")));
-			else if (!XLogRecPtrIsInvalid(ControlFile->backupStartPoint))
-				ereport(FATAL,
-						(errmsg("WAL ends before end of online backup"),
-						 errhint("Online backup started with pg_start_backup() must be ended with pg_stop_backup(), and all WAL up to that point must be available at recovery.")));
-			else
-				ereport(FATAL,
-					  (errmsg("WAL ends before consistent recovery point")));
+			unlink(TABLESPACE_MAP_OLD);
+			durable_rename(TABLESPACE_MAP, TABLESPACE_MAP_OLD, FATAL);
 		}
-	}
 
-	/*
-	 * Consider whether we need to assign a new timeline ID.
-	 *
-	 * If we are doing an archive recovery, we always assign a new ID.  This
-	 * handles a couple of issues.  If we stopped short of the end of WAL
-	 * during recovery, then we are clearly generating a new timeline and must
-	 * assign it a unique new ID.  Even if we ran to the end, modifying the
-	 * current last segment is problematic because it may result in trying to
-	 * overwrite an already-archived copy of that segment, and we encourage
-	 * DBAs to make their archive_commands reject that.  We can dodge the
-	 * problem by making the new active segment have a new timeline ID.
-	 *
-	 * In a normal crash recovery, we can just extend the timeline we were in.
-	 */
-	PrevTimeLineID = ThisTimeLineID;
-	if (ArchiveRecoveryRequested)
-	{
-		char		reason[200];
+		/* Check that the GUCs used to generate the WAL allow recovery */
+		CheckRequiredParameterValues();
 
-		Assert(InArchiveRecovery);
+		/*
+		 * We're in recovery, so unlogged relations may be trashed and must be
+		 * reset.  This should be done BEFORE allowing Hot Standby
+		 * connections, so that read-only backends don't try to read whatever
+		 * garbage is left over from before.
+		 */
+		ResetUnloggedRelations(UNLOGGED_RELATION_CLEANUP);
 
-		ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
-		ereport(LOG,
-				(errmsg("selected new timeline ID: %u", ThisTimeLineID)));
+		/*
+		 * Likewise, delete any saved transaction snapshot files that got left
+		 * behind by crashed backends.
+		 */
+		DeleteAllExportedSnapshotFiles();
+
+		/* Initialize for Hot Standby, if enabled. */
+		BeginHotStandby(checkPoint, wasShutdown);
+
+		/* Initialize resource managers */
+		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
+		{
+			if (RmgrTable[rmid].rm_startup != NULL)
+				RmgrTable[rmid].rm_startup();
+		}
 
 		/*
-		 * Create a comment for the history file to explain why and where
-		 * timeline changed.
+		 * Initialize shared variables for tracking progress of WAL replay, as
+		 * if we had just replayed the record before the REDO location (or the
+		 * checkpoint record itself, if it's a shutdown checkpoint).
 		 */
-		if (recoveryTarget == RECOVERY_TARGET_XID)
-			snprintf(reason, sizeof(reason),
-					 "%s transaction %u",
-					 recoveryStopAfter ? "after" : "before",
-					 recoveryStopXid);
-		else if (recoveryTarget == RECOVERY_TARGET_TIME)
-			snprintf(reason, sizeof(reason),
-					 "%s %s\n",
-					 recoveryStopAfter ? "after" : "before",
-					 timestamptz_to_str(recoveryStopTime));
-		else if (recoveryTarget == RECOVERY_TARGET_LSN)
-			snprintf(reason, sizeof(reason),
-					 "%s LSN %X/%X\n",
-					 recoveryStopAfter ? "after" : "before",
-					 (uint32 ) (recoveryStopLSN >> 32),
-					 (uint32) recoveryStopLSN);
-		else if (recoveryTarget == RECOVERY_TARGET_NAME)
-			snprintf(reason, sizeof(reason),
-					 "at restore point \"%s\"",
-					 recoveryStopName);
-		else if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE)
-			snprintf(reason, sizeof(reason), "reached consistency");
+		SpinLockAcquire(&XLogCtl->info_lck);
+		if (checkPoint.redo < RecPtr)
+			XLogCtl->replayEndRecPtr = checkPoint.redo;
 		else
-			snprintf(reason, sizeof(reason), "no recovery target specified");
+			XLogCtl->replayEndRecPtr = EndRecPtr;
+		XLogCtl->replayEndTLI = ThisTimeLineID;
+		XLogCtl->lastReplayedEndRecPtr = XLogCtl->replayEndRecPtr;
+		XLogCtl->lastReplayedTLI = XLogCtl->replayEndTLI;
+		XLogCtl->recoveryLastXTime = 0;
+		XLogCtl->currentChunkStartTime = 0;
+		XLogCtl->recoveryPause = false;
+		SpinLockRelease(&XLogCtl->info_lck);
 
-		writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
-							 EndRecPtr, reason);
-	}
+		/* Also ensure XLogReceiptTime has a sane value */
+		XLogReceiptTime = GetCurrentTimestamp();
 
-	/* Save the selected TimeLineID in shared memory, too */
-	XLogCtl->ThisTimeLineID = ThisTimeLineID;
-	XLogCtl->PrevTimeLineID = PrevTimeLineID;
+		/*
+		 * Let postmaster know we've started redo now, so that it can launch
+		 * checkpointer to perform restartpoints.  We don't bother during
+		 * crash recovery as restartpoints can only be performed during
+		 * archive recovery.  And we'd like to keep crash recovery simple, to
+		 * avoid introducing bugs that could affect you when recovering after
+		 * crash.
+		 *
+		 * After this point, we can no longer assume that we're the only
+		 * process in addition to postmaster!  Also, fsync requests are
+		 * subsequently to be handled by the checkpointer, not locally.
+		 */
+		if (ArchiveRecoveryRequested && IsUnderPostmaster)
+		{
+			PublishStartupProcessInformation();
+			SetForwardFsyncRequests();
+			SendPostmasterSignal(PMSIGNAL_RECOVERY_STARTED);
+			bgwriterLaunched = true;
+		}
 
-	/*
-	 * We are now done reading the old WAL.  Turn off archive fetching if it
-	 * was active, and make a writable copy of the last WAL segment. (Note
-	 * that we also have a copy of the last block of the old WAL in readBuf;
-	 * we will use that below.)
-	 */
-	if (ArchiveRecoveryRequested)
-		exitArchiveRecovery(EndOfLogTLI, EndOfLog);
+		/*
+		 * Allow read-only connections immediately if we're consistent
+		 * already.
+		 */
+		CheckRecoveryConsistency();
+	}
+}
 
-	/*
-	 * Prepare to write WAL starting at EndOfLog position, and init xlog
-	 * buffer cache using the block containing the last record from the
-	 * previous incarnation.
-	 */
-	Insert = &XLogCtl->Insert;
-	Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
-	Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
+static void
+BeginHotStandby(CheckPoint checkPoint, bool wasShutdown)
+{
+	TransactionId oldestActiveXID;
 
-	/*
-	 * Tricky point here: readBuf contains the *last* block that the LastRec
-	 * record spans, not the one it starts in.  The last block is indeed the
-	 * one we want to use.
-	 */
-	if (EndOfLog % XLOG_BLCKSZ != 0)
+	if (InRecovery)
 	{
-		char	   *page;
-		int			len;
-		int			firstIdx;
-		XLogRecPtr	pageBeginPtr;
+		/*
+		 * Initialize for Hot Standby, if enabled. We won't let backends in
+		 * yet, not until we've reached the min recovery point specified in
+		 * control file and we've established a recovery snapshot from a
+		 * running-xacts WAL record.
+		 */
+		if (ArchiveRecoveryRequested && EnableHotStandby)
+		{
+			TransactionId *xids;
+			int			nxids;
 
-		pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
-		Assert(readOff == pageBeginPtr % XLogSegSize);
+			ereport(DEBUG1,
+					(errmsg("initializing for hot standby")));
 
-		firstIdx = XLogRecPtrToBufIdx(EndOfLog);
+			InitRecoveryTransactionEnvironment();
 
-		/* Copy the valid part of the last block, and zero the rest */
-		page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ];
-		len = EndOfLog % XLOG_BLCKSZ;
-		memcpy(page, xlogreader->readBuf, len);
-		memset(page + len, 0, XLOG_BLCKSZ - len);
+			if (wasShutdown)
+				oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
+			else
+				oldestActiveXID = checkPoint.oldestActiveXid;
+			Assert(TransactionIdIsValid(oldestActiveXID));
+
+			/* Tell procarray about the range of xids it has to deal with */
+			ProcArrayInitRecovery(ShmemVariableCache->nextXid);
+
+			/*
+			 * Startup commit log and subtrans only.  MultiXact and commit
+			 * timestamp have already been started up and other SLRUs are not
+			 * maintained during recovery and need not be started yet.
+			 */
+			StartupCLOG();
+			StartupSUBTRANS(oldestActiveXID);
+
+			/*
+			 * If we're beginning at a shutdown checkpoint, we know that
+			 * nothing was running on the master at this point. So fake-up an
+			 * empty running-xacts record and use that here and now. Recover
+			 * additional standby state for prepared transactions.
+			 */
+			if (wasShutdown)
+			{
+				RunningTransactionsData running;
+				TransactionId latestCompletedXid;
+
+				/*
+				 * Construct a RunningTransactions snapshot representing a
+				 * shut down server, with only prepared transactions still
+				 * alive. We're never overflowed at this point because all
+				 * subxids are listed with their parent prepared transactions.
+				 */
+				running.xcnt = nxids;
+				running.subxcnt = 0;
+				running.subxid_overflow = false;
+				running.nextXid = checkPoint.nextXid;
+				running.oldestRunningXid = oldestActiveXID;
+				latestCompletedXid = checkPoint.nextXid;
+				TransactionIdRetreat(latestCompletedXid);
+				Assert(TransactionIdIsNormal(latestCompletedXid));
+				running.latestCompletedXid = latestCompletedXid;
+				running.xids = xids;
 
-		XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
-		XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
+				ProcArrayApplyRecoveryInfo(&running);
+
+				StandbyRecoverPreparedTransactions(false);
+			}
+		}
 	}
-	else
+}
+
+static bool
+ReplayRedo(XLogReaderState *xlogreader, CheckPoint checkPoint, XLogRecPtr RecPtr)
+{
+	XLogRecord *record;
+	bool reachedStopPoint = false;
+
+	if (InRecovery)
 	{
 		/*
-		 * There is no partial block to copy. Just set InitializedUpTo, and
-		 * let the first attempt to insert a log record to initialize the next
-		 * buffer.
+		 * Find the first record that logically follows the checkpoint --- it
+		 * might physically precede it, though.
 		 */
-		XLogCtl->InitializedUpTo = EndOfLog;
-	}
+		if (checkPoint.redo < RecPtr)
+		{
+			/* back up to find the record */
+			record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
+		}
+		else
+		{
+			/* just have to read next record after CheckPoint */
+			record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
+		}
 
-	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+		if (record != NULL)
+		{
+			ErrorContextCallback errcallback;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+			InRedo = true;
 
-	XLogCtl->LogwrtRqst.Write = EndOfLog;
-	XLogCtl->LogwrtRqst.Flush = EndOfLog;
+			ereport(LOG,
+					(errmsg("redo starts at %X/%X",
+						 (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
 
-	/* Pre-scan prepared transactions to find out the range of XIDs present */
-	oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
+			/*
+			 * main redo apply loop
+			 */
+			do
+			{
+				bool		switchedTLI = false;
 
-	/*
-	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
-	 * record before resource manager writes cleanup WAL records or checkpoint
-	 * record is written.
-	 */
-	Insert->fullPageWrites = lastFullPageWrites;
-	LocalSetXLogInsertAllowed();
-	UpdateFullPageWrites();
-	LocalXLogInsertAllowed = -1;
+#ifdef WAL_DEBUG
+				if (XLOG_DEBUG ||
+				 (record->xl_rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
+					(record->xl_rmid != RM_XACT_ID && trace_recovery_messages <= DEBUG3))
+				{
+					StringInfoData buf;
 
-	if (InRecovery)
-	{
-		/*
-		 * Perform a checkpoint to update all our recovery activity to disk.
-		 *
-		 * Note that we write a shutdown checkpoint rather than an on-line
-		 * one. This is not particularly critical, but since we may be
-		 * assigning a new TLI, using a shutdown checkpoint allows us to have
-		 * the rule that TLI only changes in shutdown checkpoints, which
-		 * allows some extra error checking in xlog_redo.
-		 *
-		 * In fast promotion, only create a lightweight end-of-recovery record
-		 * instead of a full checkpoint. A checkpoint is requested later,
-		 * after we're fully out of recovery mode and already accepting
-		 * queries.
-		 */
-		if (bgwriterLaunched)
-		{
-			if (fast_promote)
-			{
-				checkPointLoc = ControlFile->prevCheckPoint;
+					initStringInfo(&buf);
+					appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
+							(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
+							 (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
+					xlog_outrec(&buf, xlogreader);
+					appendStringInfoString(&buf, " - ");
+					xlog_outdesc(&buf, xlogreader);
+					elog(LOG, "%s", buf.data);
+					pfree(buf.data);
+				}
+#endif
+
+				/* Handle interrupt signals of startup process */
+				HandleStartupProcInterrupts();
 
 				/*
-				 * Confirm the last checkpoint is available for us to recover
-				 * from if we fail. Note that we don't check for the secondary
-				 * checkpoint since that isn't available in most base backups.
+				 * Pause WAL replay, if requested by a hot-standby session via
+				 * SetRecoveryPause().
+				 *
+				 * Note that we intentionally don't take the info_lck spinlock
+				 * here.  We might therefore read a slightly stale value of
+				 * the recoveryPause flag, but it can't be very stale (no
+				 * worse than the last spinlock we did acquire).  Since a
+				 * pause request is a pretty asynchronous thing anyway,
+				 * possibly responding to it one WAL record later than we
+				 * otherwise would is a minor issue, so it doesn't seem worth
+				 * adding another spinlock cycle to prevent that.
 				 */
-				record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, false);
-				if (record != NULL)
+				if (((volatile XLogCtlData *) XLogCtl)->recoveryPause)
+					recoveryPausesHere();
+
+				/*
+				 * Have we reached our recovery target?
+				 */
+				if (recoveryStopsBefore(xlogreader))
 				{
-					fast_promoted = true;
+					reachedStopPoint = true;	/* see below */
+					break;
+				}
 
+				/*
+				 * If we've been asked to lag the master, wait on latch until
+				 * enough time has passed.
+				 */
+				if (recoveryApplyDelay(xlogreader))
+				{
 					/*
-					 * Insert a special WAL record to mark the end of
-					 * recovery, since we aren't doing a checkpoint. That
-					 * means that the checkpointer process may likely be in
-					 * the middle of a time-smoothed restartpoint and could
-					 * continue to be for minutes after this. That sounds
-					 * strange, but the effect is roughly the same and it
-					 * would be stranger to try to come out of the
-					 * restartpoint and then checkpoint. We request a
-					 * checkpoint later anyway, just for safety.
+					 * We test for paused recovery again here. If user sets
+					 * delayed apply, it may be because they expect to pause
+					 * recovery in case of problems, so we must test again
+					 * here otherwise pausing during the delay-wait wouldn't
+					 * work.
 					 */
-					CreateEndOfRecoveryRecord();
+					if (((volatile XLogCtlData *) XLogCtl)->recoveryPause)
+						recoveryPausesHere();
 				}
-			}
 
-			if (!fast_promoted)
-				RequestCheckpoint(CHECKPOINT_END_OF_RECOVERY |
-								  CHECKPOINT_IMMEDIATE |
-								  CHECKPOINT_WAIT);
-		}
-		else
-			CreateCheckPoint(CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_IMMEDIATE);
+				/* Setup error traceback support for ereport() */
+				errcallback.callback = rm_redo_error_callback;
+				errcallback.arg = (void *) xlogreader;
+				errcallback.previous = error_context_stack;
+				error_context_stack = &errcallback;
 
-		/*
-		 * And finally, execute the recovery_end_command, if any.
-		 */
-		if (recoveryEndCommand)
-			ExecuteRecoveryCommand(recoveryEndCommand,
-								   "recovery_end_command",
-								   true);
-	}
+				/*
+				 * ShmemVariableCache->nextXid must be beyond record's xid.
+				 *
+				 * We don't expect anyone else to modify nextXid, hence we
+				 * don't need to hold a lock while examining it.  We still
+				 * acquire the lock to modify it, though.
+				 */
+				if (TransactionIdFollowsOrEquals(record->xl_xid,
+												 ShmemVariableCache->nextXid))
+				{
+					LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+					ShmemVariableCache->nextXid = record->xl_xid;
+					TransactionIdAdvance(ShmemVariableCache->nextXid);
+					LWLockRelease(XidGenLock);
+				}
 
-	if (ArchiveRecoveryRequested)
-	{
-		/*
-		 * We switched to a new timeline. Clean up segments on the old
-		 * timeline.
-		 *
-		 * If there are any higher-numbered segments on the old timeline,
-		 * remove them. They might contain valid WAL, but they might also be
-		 * pre-allocated files containing garbage. In any case, they are not
-		 * part of the new timeline's history so we don't need them.
-		 */
-		RemoveNonParentXlogFiles(EndOfLog, ThisTimeLineID);
+				/*
+				 * Before replaying this record, check if this record causes
+				 * the current timeline to change. The record is already
+				 * considered to be part of the new timeline, so we update
+				 * ThisTimeLineID before replaying it. That's important so
+				 * that replayEndTLI, which is recorded as the minimum
+				 * recovery point's TLI if recovery stops after this record,
+				 * is set correctly.
+				 */
+				if (record->xl_rmid == RM_XLOG_ID)
+				{
+					TimeLineID	newTLI = ThisTimeLineID;
+					TimeLineID	prevTLI = ThisTimeLineID;
+					uint8		info = record->xl_info & ~XLR_INFO_MASK;
 
-		/*
-		 * If the switch happened in the middle of a segment, what to do with
-		 * the last, partial segment on the old timeline? If we don't archive
-		 * it, and the server that created the WAL never archives it either
-		 * (e.g. because it was hit by a meteor), it will never make it to the
-		 * archive. That's OK from our point of view, because the new segment
-		 * that we created with the new TLI contains all the WAL from the old
-		 * timeline up to the switch point. But if you later try to do PITR to
-		 * the "missing" WAL on the old timeline, recovery won't find it in
-		 * the archive. It's physically present in the new file with new TLI,
-		 * but recovery won't look there when it's recovering to the older
-		 * timeline. On the other hand, if we archive the partial segment, and
-		 * the original server on that timeline is still running and archives
-		 * the completed version of the same segment later, it will fail. (We
-		 * used to do that in 9.4 and below, and it caused such problems).
-		 *
-		 * As a compromise, we rename the last segment with the .partial
-		 * suffix, and archive it. Archive recovery will never try to read
-		 * .partial segments, so they will normally go unused. But in the odd
-		 * PITR case, the administrator can copy them manually to the pg_xlog
-		 * directory (removing the suffix). They can be useful in debugging,
-		 * too.
-		 *
-		 * If a .done or .ready file already exists for the old timeline,
-		 * however, we had already determined that the segment is complete, so
-		 * we can let it be archived normally. (In particular, if it was
-		 * restored from the archive to begin with, it's expected to have a
-		 * .done file).
-		 */
-		if (EndOfLog % XLOG_SEG_SIZE != 0 && XLogArchivingActive())
-		{
-			char		origfname[MAXFNAMELEN];
-			XLogSegNo	endLogSegNo;
+					if (info == XLOG_CHECKPOINT_SHUTDOWN)
+					{
+						CheckPoint	checkPoint;
+
+						memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
+						newTLI = checkPoint.ThisTimeLineID;
+						prevTLI = checkPoint.PrevTimeLineID;
+					}
+					else if (info == XLOG_END_OF_RECOVERY)
+					{
+						xl_end_of_recovery xlrec;
 
-			XLByteToPrevSeg(EndOfLog, endLogSegNo);
-			XLogFileName(origfname, EndOfLogTLI, endLogSegNo);
+						memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
+						newTLI = xlrec.ThisTimeLineID;
+						prevTLI = xlrec.PrevTimeLineID;
+					}
 
-			if (!XLogArchiveIsReadyOrDone(origfname))
-			{
-				char		origpath[MAXPGPATH];
-				char		partialfname[MAXFNAMELEN];
-				char		partialpath[MAXPGPATH];
+					if (newTLI != ThisTimeLineID)
+					{
+						/* Check that it's OK to switch to this TLI */
+						checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
 
-				XLogFilePath(origpath, EndOfLogTLI, endLogSegNo);
-				snprintf(partialfname, MAXFNAMELEN, "%s.partial", origfname);
-				snprintf(partialpath, MAXPGPATH, "%s.partial", origpath);
+						/* Following WAL records should be run with new TLI */
+						ThisTimeLineID = newTLI;
+						switchedTLI = true;
+					}
+				}
 
 				/*
-				 * Make sure there's no .done or .ready file for the .partial
-				 * file.
+				 * Update shared replayEndRecPtr before replaying this record,
+				 * so that XLogFlush will update minRecoveryPoint correctly.
 				 */
-				XLogArchiveCleanup(partialfname);
+				SpinLockAcquire(&XLogCtl->info_lck);
+				XLogCtl->replayEndRecPtr = EndRecPtr;
+				XLogCtl->replayEndTLI = ThisTimeLineID;
+				SpinLockRelease(&XLogCtl->info_lck);
 
-				durable_rename(origpath, partialpath, ERROR);
-				XLogArchiveNotify(partialfname);
-			}
-		}
-	}
+				/*
+				 * If we are attempting to enter Hot Standby mode, process
+				 * XIDs we see
+				 */
+				if (standbyState >= STANDBY_INITIALIZED &&
+					TransactionIdIsValid(record->xl_xid))
+					RecordKnownAssignedTransactionIds(record->xl_xid);
 
-	/*
-	 * Preallocate additional log files, if wanted.
-	 */
-	PreallocXlogFiles(EndOfLog);
+				/* Now apply the WAL record itself */
+				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
 
-	/*
-	 * Okay, we're officially UP.
-	 */
-	InRecovery = false;
+				/* Pop the error context stack */
+				error_context_stack = errcallback.previous;
 
-	/* start the archive_timeout timer running */
-	XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
+				/*
+				 * Update lastReplayedEndRecPtr after this record has been
+				 * successfully replayed.
+				 */
+				SpinLockAcquire(&XLogCtl->info_lck);
+				XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
+				XLogCtl->lastReplayedTLI = ThisTimeLineID;
+				SpinLockRelease(&XLogCtl->info_lck);
 
-	/* also initialize latestCompletedXid, to nextXid - 1 */
-	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-	ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
-	TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
-	LWLockRelease(ProcArrayLock);
+				/*
+				 * If rm_redo called XLogRequestWalReceiverReply, then we wake
+				 * up the receiver so that it notices the updated
+				 * lastReplayedEndRecPtr and sends a reply to the master.
+				 */
+				if (doRequestWalReceiverReply)
+				{
+					doRequestWalReceiverReply = false;
+					WalRcvForceReply();
+				}
 
-	/*
-	 * Start up the commit log and subtrans, if not already done for hot
-	 * standby.  (commit timestamps are started below, if necessary.)
-	 */
-	if (standbyState == STANDBY_DISABLED)
-	{
-		StartupCLOG();
-		StartupSUBTRANS(oldestActiveXID);
-	}
+				/* Remember this record as the last-applied one */
+				LastRec = ReadRecPtr;
 
-	/*
-	 * Perform end of recovery actions for any SLRUs that need it.
-	 */
-	TrimCLOG();
-	TrimMultiXact();
+				/* Allow read-only connections if we're consistent now */
+				CheckRecoveryConsistency();
 
-	/* Reload shared-memory state for prepared transactions */
-	RecoverPreparedTransactions();
+				/* Is this a timeline switch? */
+				if (switchedTLI)
+				{
+					/*
+					 * Before we continue on the new timeline, clean up any
+					 * (possibly bogus) future WAL segments on the old
+					 * timeline.
+					 */
+					RemoveNonParentXlogFiles(EndRecPtr, ThisTimeLineID);
 
-	/*
-	 * Shutdown the recovery environment. This must occur after
-	 * RecoverPreparedTransactions(), see notes for lock_twophase_recover()
-	 */
-	if (standbyState != STANDBY_DISABLED)
-		ShutdownRecoveryTransactionEnvironment();
+					/*
+					 * Wake up any walsenders to notice that we are on a new
+					 * timeline.
+					 */
+					if (switchedTLI && AllowCascadeReplication())
+						WalSndWakeup();
+				}
 
-	/* Shut down xlogreader */
-	if (readFile >= 0)
-	{
-		close(readFile);
-		readFile = -1;
+				/* Exit loop if we reached inclusive recovery target */
+				if (recoveryStopsAfter(xlogreader))
+				{
+					reachedStopPoint = true;
+					break;
+				}
+
+				/* Else, try to fetch the next WAL record */
+				record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
+			} while (record != NULL);
+
+			/*
+			 * end of main redo apply loop
+			 */
+		}
+		else
+		{
+			/* there are no WAL records following the checkpoint */
+			ereport(LOG,
+					(errmsg("redo is not required")));
+		}
 	}
-	XLogReaderFree(xlogreader);
 
-	/*
-	 * If any of the critical GUCs have changed, log them before we allow
-	 * backends to write WAL.
-	 */
-	LocalSetXLogInsertAllowed();
-	XLogReportParameters();
+	return reachedStopPoint;
+}
 
-	/*
-	 * Local WAL inserts enabled, so it's time to finish initialization of
-	 * commit timestamp.
-	 */
-	CompleteCommitTsInitialization();
+static void
+FinishRedo(bool reachedStopPoint)
+{
+	TimestampTz xtime;
+	uint8		rmid;
 
-	/*
-	 * All done with end-of-recovery actions.
-	 *
-	 * Now allow backends to write WAL and update the control file status in
-	 * consequence.  The boolean flag allowing backends to write WAL is
-	 * updated while holding ControlFileLock to prevent other backends to look
-	 * at an inconsistent state of the control file in shared memory.  There
-	 * is still a small window during which backends can write WAL and the
-	 * control file is still referring to a system not in DB_IN_PRODUCTION
-	 * state while looking at the on-disk control file.
-	 *
-	 * Also, although the boolean flag to allow WAL is probably atomic in
-	 * itself, we use the info_lck here to ensure that there are no race
-	 * conditions concerning visibility of other recent updates to shared
-	 * memory.
-	 */
-	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
-	ControlFile->state = DB_IN_PRODUCTION;
-	ControlFile->time = (pg_time_t) time(NULL);
+	if (InRecovery)
+	{
+		if (InRedo)
+		{
+			if (reachedStopPoint)
+			{
+				if (!reachedConsistency)
+					ereport(FATAL,
+							(errmsg("requested recovery stop point is before consistent recovery point")));
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	XLogCtl->SharedRecoveryInProgress = false;
-	SpinLockRelease(&XLogCtl->info_lck);
+				/*
+				 * This is the last point where we can restart recovery with a
+				 * new recovery target, if we shutdown and begin again. After
+				 * this, Resource Managers may choose to do permanent
+				 * corrective actions at end of recovery.
+				 */
+				switch (recoveryTargetAction)
+				{
+					case RECOVERY_TARGET_ACTION_SHUTDOWN:
 
-	UpdateControlFile();
-	LWLockRelease(ControlFileLock);
+						/*
+						 * exit with special return code to request shutdown
+						 * of postmaster.  Log messages issued from
+						 * postmaster.
+						 */
+						proc_exit(3);
 
-	/*
-	 * If there were cascading standby servers connected to us, nudge any wal
-	 * sender processes to notice that we've been promoted.
-	 */
-	WalSndWakeup();
+					case RECOVERY_TARGET_ACTION_PAUSE:
+						SetRecoveryPause(true);
+						recoveryPausesHere();
 
-	/*
-	 * If this was a fast promotion, request an (online) checkpoint now. This
-	 * isn't required for consistency, but the last restartpoint might be far
-	 * back, and in case of a crash, recovering from it might take a longer
-	 * than is appropriate now that we're not in standby mode anymore.
-	 */
-	if (fast_promoted)
-		RequestCheckpoint(CHECKPOINT_FORCE);
+						/* drop into promote */
+
+					case RECOVERY_TARGET_ACTION_PROMOTE:
+						break;
+				}
+			}
+
+			/* Allow resource managers to do any required cleanup. */
+			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
+			{
+				if (RmgrTable[rmid].rm_cleanup != NULL)
+					RmgrTable[rmid].rm_cleanup();
+			}
+
+			ereport(LOG,
+					(errmsg("redo done at %X/%X",
+						 (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
+			xtime = GetLatestXTime();
+			if (xtime)
+				ereport(LOG,
+					 (errmsg("last completed transaction was at log time %s",
+							 timestamptz_to_str(xtime))));
+
+			InRedo = false;
+		}
+	}
 }
 
 /*
#2Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Thomas Munro (#1)
Re: Refactor StartupXLOG?

On 09/24/2016 05:01 AM, Thomas Munro wrote:

What would the appetite be for that kind of refactoring work,
considering the increased burden on committers who have to backpatch
bug fixes? Is it a project goal to reduce the size of large
complicated functions like StartupXLOG and heap_update? It seems like
a good way for new players to learn how they work.

+1. Yes, it does increase the burden of backpatching, but I think it'd
still be very much worth it.

A couple of little details that caught my eye at a quick read:

    /* Try to find a backup label. */
    if (read_backup_label(&checkPointLoc, &backupEndRequired,
                          &backupFromStandby))
    {
        wasShutdown = ProcessBackupLabel(xlogreader, &checkPoint, checkPointLoc,
                                         &haveTblspcMap);

        /* set flag to delete it later */
        haveBackupLabel = true;
    }
    else
    {
        /* Clean up any orphaned tablespace map files with no backup label. */
        CleanUpTablespaceMap();
        ...

This is a bit asymmetric: In the true-branch, ProcessBackupLabel() reads
the tablespace map, and sets InArchiveRecovery and StandbyMode, but in
the false-branch, StartupXLOG() calls CleanUpTablespaceMap() and sets
those variables directly.

For functions like BeginRedo, BeginHotStandby, ReplayRedo, etc., I think
it'd be better to have the "if (InRecovery)" checks in the caller,
rather than in the functions.
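
A minimal sketch of the caller-side arrangement, assuming stub versions of the functions named in the patch. The stubs take no arguments and only count calls; the real functions take more state, and StartupXLOGSketch and steps_run are hypothetical names used only for illustration:

```c
#include <stdbool.h>

/* Hypothetical stand-ins: the real functions do actual redo work. */
static int steps_run = 0;

static void BeginRedo(void)  { steps_run++; }
static void ReplayRedo(void) { steps_run++; }
static void FinishRedo(void) { steps_run++; }

/*
 * The recovery check lives in the caller, so a reader of the top-level
 * function can see which steps are conditional without opening each helper.
 * Returns the number of redo steps that ran.
 */
static int
StartupXLOGSketch(bool in_recovery)
{
    steps_run = 0;

    if (in_recovery)
    {
        BeginRedo();
        ReplayRedo();
        FinishRedo();
    }

    return steps_run;
}
```

With the check in the caller, the helpers can assume they are only reached during recovery, and none of them needs an early-return guard of its own.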

- Heikki

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Heikki Linnakangas (#2)
Re: Refactor StartupXLOG?

On Sat, Sep 24, 2016 at 8:24 PM, Heikki Linnakangas <hlinnaka@iki.fi> wrote:

On 09/24/2016 05:01 AM, Thomas Munro wrote:

What would the appetite be for that kind of refactoring work,
considering the increased burden on committers who have to backpatch
bug fixes? Is it a project goal to reduce the size of large
complicated functions like StartupXLOG and heap_update? It seems like
a good way for new players to learn how they work.

+1. Yes, it does increase the burden of backpatching, but I think it'd still
be very much worth it.

Cool.

A couple of little details that caught my eye at a quick read:

    /* Try to find a backup label. */
    if (read_backup_label(&checkPointLoc, &backupEndRequired,
                          &backupFromStandby))
    {
        wasShutdown = ProcessBackupLabel(xlogreader, &checkPoint, checkPointLoc,
                                         &haveTblspcMap);

        /* set flag to delete it later */
        haveBackupLabel = true;
    }
    else
    {
        /* Clean up any orphaned tablespace map files with no backup label. */
        CleanUpTablespaceMap();
        ...

This is a bit asymmetric: In the true-branch, ProcessBackupLabel() reads the
tablespace map, and sets InArchiveRecovery and StandbyMode, but in the
false-branch, StartupXLOG() calls CleanUpTablespaceMap() and sets those
variables directly.

Right. I need to move all or some of the other branch out to its own
function too.
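
One way the symmetric version could look, as a rough sketch only. StartupState, its fields, and ProcessNoBackupLabelSketch are hypothetical names invented here; the stubs record which branch ran rather than doing any real work, and nothing below is taken from the attached patch:

```c
#include <stdbool.h>

/* Hypothetical state object; field names are invented for this sketch. */
typedef struct StartupState
{
    bool haveBackupLabel;       /* set by the caller, as in the quoted code */
    bool recoveryModeDecided;   /* stands in for InArchiveRecovery/StandbyMode */
    bool tablespaceMapCleaned;  /* stands in for CleanUpTablespaceMap() */
} StartupState;

/* Stub for the branch taken when a backup label was found. */
static void
ProcessBackupLabelSketch(StartupState *state)
{
    state->recoveryModeDecided = true;
}

/* Hypothetical counterpart for the branch with no backup label. */
static void
ProcessNoBackupLabelSketch(StartupState *state)
{
    state->tablespaceMapCleaned = true;
    state->recoveryModeDecided = true;
}

/* Both branches now delegate; the caller sets no recovery flags itself. */
static StartupState
ChooseStartupPath(bool backup_label_found)
{
    StartupState state = {false, false, false};

    if (backup_label_found)
    {
        ProcessBackupLabelSketch(&state);
        state.haveBackupLabel = true;
    }
    else
        ProcessNoBackupLabelSketch(&state);

    return state;
}
```

The state struct also hints at the "extra context/state object" idea from the first message, which would keep the argument lists short.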

For functions like BeginRedo, BeginHotStandby, ReplayRedo, etc., I think
it'd be better to have the "if (InRecovery)" checks in the caller, rather
than in the functions.

Yeah. I was thinking that someone might value the preservation of
indentation level, since that might make small localised bug fixes
easier to backport to the monolithic StartupXLOG. Plain old
otherwise-redundant curly braces would achieve that. Or maybe it's
better not to worry about preserving that.
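
To illustrate the curly-brace idea, here is a small sketch assuming a hypothetical extracted function, ReplayRedoSketch, whose body sits inside an otherwise redundant block so that it keeps the indentation it had under "if (InRecovery)". The counter and loop are placeholders for the real redo loop:

```c
#include <stdbool.h>

static int records_replayed = 0;

/*
 * Hypothetical extracted function.  The caller has already checked
 * in_recovery, so no condition is needed here.
 */
static void
ReplayRedoSketch(int nrecords)
{
    /* Redundant block: keeps the body one level in, as it was before. */
    {
        int i;

        for (i = 0; i < nrecords; i++)
            records_replayed++;
    }
}

/* Caller performs the recovery check and returns the replay count. */
static int
RunStartupSketch(bool in_recovery, int nrecords)
{
    records_replayed = 0;

    if (in_recovery)
        ReplayRedoSketch(nrecords);

    return records_replayed;
}
```

The extra block has no effect on behaviour; its only purpose is to keep the body at the same indentation depth as the code it was moved from, so that a hunk written against one version is more likely to apply to the other.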

--
Thomas Munro
http://www.enterprisedb.com


#4Michael Paquier
michael.paquier@gmail.com
In reply to: Thomas Munro (#1)
Re: Refactor StartupXLOG?

On Sat, Sep 24, 2016 at 11:01 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

What would the appetite be for that kind of refactoring work,
considering the increased burden on committers who have to backpatch
bug fixes? Is it a project goal to reduce the size of large
complicated functions like StartupXLOG and heap_update? It seems like
a good way for new players to learn how they work.

A lot of appetite. The size of xlog.c is out of control, so something
that would be really cool to see is splitting the whole logic of
xlog.c into more independent files. For example, low-level file-only
operations could go into xlogfile.c, backup code paths into
xlogbackup.c, etc. This would make it necessary to expose some of the
shared-memory structures now at the top of xlog.c, like XLogCtl, but I
think that would be really worth it in the end, and closer to
things like xloginsert.c and xlogarchive.c that began such a move.
--
Michael
