From 37bd2e654345af65749ccff6ca73d3afebf67072 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 11 Feb 2016 10:44:14 +0800
Subject: [PATCH 1/2] Allow logical slots to follow timeline switches

Make logical replication slots timeline-aware, so replay can
continue from a historical timeline onto the server's current
timeline.

This is required to make failover slots possible and may also
be used by extensions that call CreateReplicationSlot on a standby
and replay from that slot once the replica is promoted.

This does NOT add support for replaying from a logical slot on
a standby or for syncing slots to replicas.
---
 src/backend/access/transam/xlogreader.c        |  43 ++++-
 src/backend/access/transam/xlogutils.c         | 240 +++++++++++++++++++++++--
 src/backend/replication/logical/logicalfuncs.c |  38 +++-
 src/include/access/xlogreader.h                |  35 +++-
 src/include/access/xlogutils.h                 |   2 +
 5 files changed, 323 insertions(+), 35 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index fcb0872..5899f44 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -10,6 +10,9 @@
  *
  * NOTES
  *		See xlogreader.h for more notes on this facility.
+ *
+ * 		The xlogreader is compiled as both front-end and backend code so
+ * 		it may not use elog, server-defined static variables, etc.
  *-------------------------------------------------------------------------
  */
 
@@ -116,6 +119,9 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
 		return NULL;
 	}
 
+	/* Will be loaded on first read */
+	state->timelineHistory = NULL;
+
 	return state;
 }
 
@@ -135,6 +141,13 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+#ifdef FRONTEND
+	/* FE code doesn't use this and we can't list_free_deep on FE */
+	Assert(state->timelineHistory == NULL);
+#else
+	if (state->timelineHistory)
+		list_free_deep(state->timelineHistory);
+#endif
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -208,9 +221,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 
 	if (RecPtr == InvalidXLogRecPtr)
 	{
+		/* No explicit start point, read the record after the one we just read */
 		RecPtr = state->EndRecPtr;
 
 		if (state->ReadRecPtr == InvalidXLogRecPtr)
+			/* allow readPageTLI to go backward */
 			randAccess = true;
 
 		/*
@@ -223,6 +238,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	else
 	{
 		/*
+		 * Caller supplied a position to start at.
+		 *
 		 * In this case, the passed-in record pointer should already be
 		 * pointing to a valid record starting position.
 		 */
@@ -309,8 +326,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 		/* XXX: more validation should be done here */
 		if (total_len < SizeOfXLogRecord)
 		{
-			report_invalid_record(state, "invalid record length at %X/%X",
-								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+			report_invalid_record(state, "invalid record length at %X/%X: wanted %u, got %u",
+								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
+								  (uint32) SizeOfXLogRecord, total_len);
 			goto err;
 		}
 		gotheader = false;
@@ -466,9 +484,7 @@ err:
 	 * Invalidate the xlog page we've cached. We might read from a different
 	 * source after failure.
 	 */
-	state->readSegNo = 0;
-	state->readOff = 0;
-	state->readLen = 0;
+	XLogReaderInvalCache(state);
 
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
@@ -599,9 +615,9 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 {
 	if (record->xl_tot_len < SizeOfXLogRecord)
 	{
-		report_invalid_record(state,
-							  "invalid record length at %X/%X",
-							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+		report_invalid_record(state, "invalid record length at %X/%X: wanted %u, got %u",
+							  (uint32) (RecPtr >> 32), (uint32) RecPtr,
+							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
 	if (record->xl_rmid > RM_MAX_ID)
@@ -1337,3 +1353,14 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 
 	return true;
 }
+
+/*
+ * Zero readSegNo/readOff/readLen so the page cached in readBuf gets re-read.
+ */
+void
+XLogReaderInvalCache(XLogReaderState *state)
+{
+	state->readSegNo = 0;
+	state->readOff = 0;
+	state->readLen = 0;
+}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 444e218..21f2030 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -7,6 +7,9 @@
  * This file contains support routines that are used by XLOG replay functions.
  * None of this code is used during normal system operation.
  *
+ * Unlike xlogreader.c this is only compiled for the backend so it may use
+ * elog, etc.
+ *
  *
  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -21,6 +24,7 @@
 
 #include "miscadmin.h"
 
+#include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
@@ -651,6 +655,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 	static int	sendFile = -1;
 	static XLogSegNo sendSegNo = 0;
 	static uint32 sendOff = 0;
+	/* So we notice if asked for the same seg on a new tli: */
+	static TimeLineID lastTLI = 0;
 
 	p = buf;
 	recptr = startptr;
@@ -664,11 +670,11 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 
 		startoff = recptr % XLogSegSize;
 
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+		/* Do we need to switch to a new xlog segment? */
+		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || lastTLI != tli)
 		{
 			char		path[MAXPGPATH];
 
-			/* Switch to another logfile segment */
 			if (sendFile >= 0)
 				close(sendFile);
 
@@ -692,6 +698,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 									path)));
 			}
 			sendOff = 0;
+			lastTLI = tli;
 		}
 
 		/* Need to seek in the file? */
@@ -759,28 +766,62 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int			count;
 
 	loc = targetPagePtr + reqLen;
+
+	/* Make sure enough xlog is available... */
 	while (1)
 	{
 		/*
-		 * TODO: we're going to have to do something more intelligent about
-		 * timelines on standbys. Use readTimeLineHistory() and
-		 * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-		 * that case earlier, but the code and TODO is left in here for when
-		 * that changes.
+		 * Check which timeline to get the record from.
+		 *
+		 * We have to do it after each loop because if we're in
+		 * recovery as a cascading standby the current timeline
+		 * might've become historical.
 		 */
-		if (!RecoveryInProgress())
+		XLogReadDetermineTimeline(state);
+
+		if (state->currTLI == ThisTimeLineID)
 		{
-			*pageTLI = ThisTimeLineID;
-			flushptr = GetFlushRecPtr();
+			/*
+			 * We're reading from the current timeline so we might
+			 * have to wait for the desired record to be generated
+			 * (or, for a standby, received & replayed)
+			 */
+			if (!RecoveryInProgress())
+			{
+				*pageTLI = ThisTimeLineID;
+				flushptr = GetFlushRecPtr();
+			}
+			else
+				flushptr = GetXLogReplayRecPtr(pageTLI);
+
+			if (loc <= flushptr)
+				break;
+
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(1000L);
 		}
 		else
-			flushptr = GetXLogReplayRecPtr(pageTLI);
-
-		if (loc <= flushptr)
+		{
+			/*
+			 * We're on a historical timeline, limit reading to the
+			 * switch point where we moved to the next timeline.
+			 */
+			flushptr = state->currTLIValidUntil;
+
+			/*
+			 * FIXME: Setting pageTLI to the TLI the *record* we
+			 * want is on can be slightly wrong; the page might
+			 * begin on an older timeline if it contains a timeline
+			 * switch, since its xlog segment will've been copied
+			 * from the prior timeline. We should really read the
+			 * page header. It's pretty harmless though as nothing
+			 * cares so long as the timeline doesn't go backwards.
+			 */
+			*pageTLI = state->currTLI;
+
+			/* No need to wait on a historical timeline */
 			break;
-
-		CHECK_FOR_INTERRUPTS();
-		pg_usleep(1000L);
+		}
 	}
 
 	/* more than one block available */
@@ -793,7 +834,192 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	else
 		count = flushptr - targetPagePtr;
 
-	XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, *pageTLI, targetPagePtr, count);
 
 	return count;
 }
+
+/*
+ * Figure out what timeline to look on for the record the xlogreader
+ * is being asked to read, in currRecPtr. This may be used
+ * to determine which xlog segment file to open, etc.
+ *
+ * It depends on:
+ *
+ * - Whether we're reading a record immediately following one we read
+ *   before or doing a random read. We can only use the cached
+ *   timeline info if we're reading sequentially.
+ *
+ * - Whether the timeline of the prior record read was historical or
+ *   the current timeline and, if historical, where it's valid up
+ *   to. On a historical timeline we need to avoid reading past the
+ *   timeline switch point. The records after it are probably invalid,
+ *   but worse, they might be valid but *different*.
+ *
+ * - Whether the current timeline became historical since the last
+ *   record we read. We need to make sure we don't read past the
+ *   switch point.
+ *
+ * - Whether the server moved to a new timeline since the cached
+ *   timeline history was loaded. If so the cached history is thrown
+ *   away and re-read, since it cannot contain the new ThisTimeLineID.
+ *
+ * None of this has any effect unless callbacks use currTLI to
+ * determine which timeline to read from and optionally use the
+ * validity limit to avoid reading past the valid end of a page.
+ *
+ * We need to switch to an xlog segment from the new timeline
+ * eagerly when on a historical timeline, as soon as we reach the
+ * start of the xlog segment containing the timeline switch.  The
+ * server copied the segment to the new timeline so all the data up
+ * to the switch point is the same but there's no guarantee the old
+ * segment will still exist. It may have been deleted or renamed
+ * with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * An xlog segment may contain data from an older timeline
+ * if it was copied during a timeline switch. Callers may NOT assume
+ * that currTLI is the timeline that will be in a given page's
+ * xlp_tli; the page may begin on an older timeline or we might be
+ * reading from historical timeline data on a segment that's
+ * been copied to a new timeline.
+ *
+ * On return state->currTLI and state->currTLIValidUntil describe the
+ * timeline to read currRecPtr from; state->timelineHistory holds the
+ * history for the current ThisTimeLineID.
+ */
+void
+XLogReadDetermineTimeline(XLogReaderState *state)
+{
+	/*
+	 * readTimeLineHistory() puts the requested timeline first. If the head of
+	 * the cached history is no longer ThisTimeLineID the server has moved to a
+	 * new timeline since we loaded it (promotion, here or upstream), and the
+	 * search loop below could never reach ThisTimeLineID, so re-read it.
+	 */
+	if (state->timelineHistory != NULL &&
+		((TimeLineHistoryEntry *)
+		 linitial(state->timelineHistory))->tli != ThisTimeLineID)
+	{
+		list_free_deep(state->timelineHistory);
+		state->timelineHistory = NULL;
+	}
+
+	if (state->timelineHistory == NULL)
+		state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+	if (state->currRecPtr != state->EndRecPtr)
+	{
+		/*
+		 * Not reading the immediately following record so
+		 * invalidate cached timeline info.
+		 */
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	if (state->currTLIValidUntil == InvalidXLogRecPtr &&
+		state->currTLI != ThisTimeLineID &&
+		state->currTLI != 0)
+	{
+		/*
+		 * We were reading what was the current timeline but it became
+		 * historical. Either we were replaying as a replica and got
+		 * promoted or we're replaying as a cascading replica from a
+		 * parent that got promoted.
+		 *
+		 * The timeline history has already been re-read above, because
+		 * its newest entry no longer matched ThisTimeLineID.
+		 */
+		elog(DEBUG2, "timeline %u became historical during decoding",
+				state->currTLI);
+
+		/* then invalidate the timeline info so we read again */
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	if (state->currRecPtr == state->EndRecPtr &&
+		state->currTLI != 0 &&
+		state->currTLIValidUntil != InvalidXLogRecPtr &&
+		state->currRecPtr >= state->currTLIValidUntil)
+	{
+		/*
+		 * We're reading the immediately following record but we're
+		 * at a timeline boundary (or on a segment containing one)
+		 * and must read the next record from the new TLI.
+		 */
+		elog(DEBUG2, "Requested record %X/%X is on segment containing end of TLI %u "
+				"valid until %X/%X, switching to next timeline",
+				(uint32)(state->currRecPtr >> 32),
+				(uint32)state->currRecPtr,
+				state->currTLI,
+				(uint32)(state->currTLIValidUntil >> 32),
+				(uint32)(state->currTLIValidUntil));
+
+		/* Invalidate TLI info so we look up the next TLI */
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	if (state->currTLI == 0)
+	{
+		/*
+		 * Something changed. We're not reading the record immediately
+		 * after the one we just read, the previous record was at a
+		 * timeline boundary or we didn't yet determine the timeline
+		 * to read from.
+		 *
+		 * Work out what timeline this record is on. We might read
+		 * it from the segment on this TLI or, if the segment
+		 * contains newer timelines, the copy from a newer TLI.
+		 */
+		state->currTLI = tliOfPointInHistory(state->currRecPtr,
+				state->timelineHistory);
+
+		/*
+		 * Look for the most recent timeline that's on the same xlog
+		 * segment as this record, since that's the only one we can
+		 * assume is still readable.
+		 */
+		while (state->currTLI != ThisTimeLineID &&
+			   state->currTLIValidUntil == InvalidXLogRecPtr)
+		{
+			XLogRecPtr	tliSwitch;
+			TimeLineID	nextTLI;
+
+			tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
+					&nextTLI);
+
+			state->currTLIValidUntil = ((tliSwitch / XLogSegSize) * XLogSegSize);
+
+			if (state->currRecPtr >= state->currTLIValidUntil)
+			{
+				/*
+				 * The new currTLI ends on this WAL segment so
+				 * check the next TLI to see if it's the last
+				 * one on the segment.
+				 *
+				 * If that's the current TLI we'll stop
+				 * searching.
+				 */
+				state->currTLI = nextTLI;
+				state->currTLIValidUntil = InvalidXLogRecPtr;
+			}
+		}
+
+		/*
+		 * We're now either reading from the first xlog seg in the
+		 * current server's timeline or the most recent historical
+		 * timeline that exists on the target segment.
+		 */
+		elog(DEBUG2, "XLog read ptr %X/%X is on seg with tli %u valid until %X/%X, server current tli is %u",
+				(uint32)(state->currRecPtr >> 32),
+				(uint32)state->currRecPtr,
+				state->currTLI,
+				(uint32)(state->currTLIValidUntil >> 32),
+				(uint32)(state->currTLIValidUntil),
+				ThisTimeLineID);
+	}
+}
+
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index f789fc1..f29fca3 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -231,12 +231,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	rsinfo->setResult = p->tupstore;
 	rsinfo->setDesc = p->tupdesc;
 
-	/* compute the current end-of-wal */
-	if (!RecoveryInProgress())
-		end_of_wal = GetFlushRecPtr();
-	else
-		end_of_wal = GetXLogReplayRecPtr(NULL);
-
 	ReplicationSlotAcquire(NameStr(*name));
 
 	PG_TRY();
@@ -263,6 +257,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
 		ctx->output_writer_private = p;
 
+		/*
+		 * Start reading xlog at the slot's restart_lsn rather than at the
+		 * point CreateDecodingContext() will start returning changes to
+		 * users from. We may therefore read xlog we don't decode rows from,
+		 * but the snapshot builder might need it to get to a consistent
+		 * point. NOTE(review): the output start point is presumably the
+		 * slot's confirmed_flush (not candidate_restart_lsn) -- confirm.
+		 */
 		startptr = MyReplicationSlot->data.restart_lsn;
 
 		CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding");
@@ -270,8 +272,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		/* invalidate non-timetravel entries */
 		InvalidateSystemCaches();
 
+		if (!RecoveryInProgress())
+			end_of_wal = GetFlushRecPtr();
+		else
+			end_of_wal = GetXLogReplayRecPtr(NULL);
+
+		/* Decode until we run out of records */
 		while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
-			 (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+			 (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
 		{
 			XLogRecord *record;
 			char	   *errm = NULL;
@@ -280,6 +288,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			if (errm)
 				elog(ERROR, "%s", errm);
 
+			/*
+			 * Now that we've set up the xlog reader state subsequent calls
+			 * pass InvalidXLogRecPtr to say "continue from last record"
+			 */
 			startptr = InvalidXLogRecPtr;
 
 			/*
@@ -299,6 +311,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			CHECK_FOR_INTERRUPTS();
 		}
 
+		/* Make sure timeline lookups use the start of the next record */
+		startptr = ctx->reader->EndRecPtr;
+
+		/*
+		 * The XLogReader will read a page past the valid end of WAL
+		 * because it doesn't know about timelines. When we switch
+		 * timelines and ask it for the first page on the new timeline it
+		 * will think it has it cached, but it'll have the old partial
+		 * page and say it can't find the next record. So flush the cache.
+		 */
+		XLogReaderInvalCache(ctx->reader);
+
 		tuplestore_donestoring(tupstore);
 
 		CurrentResourceOwner = old_resowner;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 7553cc4..20e4bca 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -20,12 +20,16 @@
  *		with the XLogRec* macros and functions. You can also decode a
  *		record that's already constructed in memory, without reading from
  *		disk, by calling the DecodeXLogRecord() function.
+ *
+ * 		The xlogreader is compiled as both front-end and backend code so
+ * 		it may not use elog, server-defined static variables, etc.
  *-------------------------------------------------------------------------
  */
 #ifndef XLOGREADER_H
 #define XLOGREADER_H
 
 #include "access/xlogrecord.h"
+#include "nodes/pg_list.h"
 
 typedef struct XLogReaderState XLogReaderState;
 
@@ -139,26 +143,48 @@ struct XLogReaderState
 	 * ----------------------------------------
 	 */
 
-	/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
+	/*
+	 * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to
+	 * at least readLen bytes)
+	 */
 	char	   *readBuf;
 
-	/* last read segment, segment offset, read length, TLI */
+	/*
+	 * last read segment, segment offset, read length, TLI for
+	 * data currently in readBuf.
+	 */
 	XLogSegNo	readSegNo;
 	uint32		readOff;
 	uint32		readLen;
 	TimeLineID	readPageTLI;
 
-	/* beginning of last page read, and its TLI  */
+	/*
+	 * beginning of prior page read, and its TLI. Doesn't
+	 * necessarily correspond to what's in readBuf, used for
+	 * timeline sanity checks.
+	 */
 	XLogRecPtr	latestPagePtr;
 	TimeLineID	latestPageTLI;
 
 	/* beginning of the WAL record being read. */
 	XLogRecPtr	currRecPtr;
+	/* timeline to read it from, 0 if a lookup is required */
+	TimeLineID  currTLI;
+	/*
+	 * Pointer to the end of the last whole segment on the timeline in currTLI
+	 * if it's historical or InvalidXLogRecPtr if currTLI is the current
+	 * timeline. This is *not* the tliSwitchPoint but it's guaranteed safe
+	 * to read up to this point from currTLI.
+	 */
+	XLogRecPtr	currTLIValidUntil;
 
 	/* Buffer for current ReadRecord result (expandable) */
 	char	   *readRecordBuf;
 	uint32		readRecordBufSize;
 
+	/* cached timeline history */
+	List	   *timelineHistory;
+
 	/* Buffer to hold error message */
 	char	   *errormsg_buf;
 };
@@ -174,6 +200,9 @@ extern void XLogReaderFree(XLogReaderState *state);
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
 			   XLogRecPtr recptr, char **errormsg);
 
+/* Flush any cached page */
+extern void XLogReaderInvalCache(XLogReaderState *state);
+
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif   /* FRONTEND */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 1b9abce..86df8cf 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,4 +50,6 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
 extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
 
+extern void XLogReadDetermineTimeline(XLogReaderState *state);
+
 #endif
-- 
2.1.0

