diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index fcb0872..7b60b8f 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -10,9 +10,11 @@
  *
  * NOTES
  *		See xlogreader.h for more notes on this facility.
+ *
+ *		This file is compiled as both front-end and backend code, so it
+ *		may not use ereport, server-defined static variables, etc.
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"
 
 #include "access/transam.h"
@@ -116,6 +118,11 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
 		return NULL;
 	}
 
+#ifndef FRONTEND
+	/* Will be loaded on first read */
+	state->timelineHistory = NIL;
+#endif
+
 	return state;
 }
 
@@ -135,6 +142,10 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+#ifndef FRONTEND
+	if (state->timelineHistory)
+		list_free_deep(state->timelineHistory);
+#endif
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -208,10 +219,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 
 	if (RecPtr == InvalidXLogRecPtr)
 	{
+		/* No explicit start point; read the record after the one we just read */
 		RecPtr = state->EndRecPtr;
 
 		if (state->ReadRecPtr == InvalidXLogRecPtr)
-			randAccess = true;
+			randAccess = true;	/* allow readPageTLI to go backwards */
 
 		/*
 		 * RecPtr is pointing to end+1 of the previous WAL record.  If we're
@@ -223,6 +235,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	else
 	{
 		/*
+		 * Caller supplied a position to start at.
+		 *
 		 * In this case, the passed-in record pointer should already be
 		 * pointing to a valid record starting position.
 		 */
@@ -309,8 +323,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 		/* XXX: more validation should be done here */
 		if (total_len < SizeOfXLogRecord)
 		{
-			report_invalid_record(state, "invalid record length at %X/%X",
-								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+			report_invalid_record(state,
+						"invalid record length at %X/%X: wanted %u, got %u",
+								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
+								  (uint32) SizeOfXLogRecord, total_len);
 			goto err;
 		}
 		gotheader = false;
@@ -466,9 +482,7 @@ err:
 	 * Invalidate the xlog page we've cached. We might read from a different
 	 * source after failure.
 	 */
-	state->readSegNo = 0;
-	state->readOff = 0;
-	state->readLen = 0;
+	XLogReaderInvalCache(state);
 
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
@@ -580,10 +594,19 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	return readLen;
 
 err:
+	XLogReaderInvalCache(state);
+	return -1;
+}
+
+/*
+ * Invalidate the xlogreader's cached page to force a re-read.
+ */
+void
+XLogReaderInvalCache(XLogReaderState *state)
+{
 	state->readSegNo = 0;
 	state->readOff = 0;
 	state->readLen = 0;
-	return -1;
 }
 
 /*
@@ -600,8 +623,9 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 	if (record->xl_tot_len < SizeOfXLogRecord)
 	{
 		report_invalid_record(state,
-							  "invalid record length at %X/%X",
-							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+						"invalid record length at %X/%X: wanted %u, got %u",
+							  (uint32) (RecPtr >> 32), (uint32) RecPtr,
+							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
 	if (record->xl_rmid > RM_MAX_ID)
@@ -907,11 +931,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 err:
 out:
 	/* Reset state to what we had before finding the record */
-	state->readSegNo = 0;
-	state->readOff = 0;
-	state->readLen = 0;
 	state->ReadRecPtr = saved_state.ReadRecPtr;
 	state->EndRecPtr = saved_state.EndRecPtr;
+	XLogReaderInvalCache(state);
 
 	return found;
 }
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 444e218..67318d4 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -19,12 +19,12 @@
 
 #include <unistd.h>
 
-#include "miscadmin.h"
-
+#include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "miscadmin.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -638,8 +638,17 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 }
 
 /*
- * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
- * we currently don't have the infrastructure (elog!) to share it.
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * in timeline 'tli'.
+ *
+ * Will open, and keep open, one WAL segment stored in the static file
+ * descriptor 'sendFile'. This means if XLogRead is used once, there will
+ * always be one descriptor left open until the process ends, but never
+ * more than one.
+ *
+ * XXX This is very similar to pg_xlogdump's XLogDumpXLogRead and to XLogRead
+ * in walsender.c but for small differences (such as lack of elog() in
+ * frontend).  Probably these should be merged at some point.
  */
 static void
 XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
@@ -648,8 +657,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 	XLogRecPtr	recptr;
 	Size		nbytes;
 
+	/*
+	 * Cached state across calls.
+	 */
 	static int	sendFile = -1;
 	static XLogSegNo sendSegNo = 0;
+	static TimeLineID sendTLI = 0;
 	static uint32 sendOff = 0;
 
 	p = buf;
@@ -664,11 +677,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 
 		startoff = recptr % XLogSegSize;
 
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+		/* Do we need to open a new xlog segment? */
+		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+			sendTLI != tli)
 		{
 			char		path[MAXPGPATH];
 
-			/* Switch to another logfile segment */
 			if (sendFile >= 0)
 				close(sendFile);
 
@@ -692,22 +706,17 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 									path)));
 			}
 			sendOff = 0;
+			sendTLI = tli;
 		}
 
 		/* Need to seek in the file? */
 		if (sendOff != startoff)
 		{
 			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				char		path[MAXPGPATH];
-
-				XLogFilePath(path, tli, sendSegNo);
-
 				ereport(ERROR,
 						(errcode_for_file_access(),
 				  errmsg("could not seek in log segment %s to offset %u: %m",
-						 path, startoff)));
-			}
+						 XLogFileNameP(tli, sendSegNo), startoff)));
 			sendOff = startoff;
 		}
 
@@ -719,16 +728,11 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 
 		readbytes = read(sendFile, p, segbytes);
 		if (readbytes <= 0)
-		{
-			char		path[MAXPGPATH];
-
-			XLogFilePath(path, tli, sendSegNo);
-
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-							path, sendOff, (unsigned long) segbytes)));
-		}
+							XLogFileNameP(tli, sendSegNo), sendOff,
+							(unsigned long) segbytes)));
 
 		/* Update state for read */
 		recptr += readbytes;
@@ -740,12 +744,151 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 }
 
 /*
- * read_page callback for reading local xlog files
+ * Determine XLogReaderState->currTLI and ->currTLIValidUntil;
+ * XLogReaderState->EndRecPtr, ->currRecPtr and ThisTimeLineID affect the
+ * decision.  This may later be used to determine which xlog segment file to
+ * open, etc.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch.  The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ */
+static void
+XLogReadDetermineTimeline(XLogReaderState *state)
+{
+	/* Read the history on first time through */
+	if (state->timelineHistory == NIL)
+		state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+	/*
+	 * Are we reading the record immediately following the one we read last
+	 * time?  If not, then don't use the cached timeline info.
+	 */
+	if (state->currRecPtr != state->EndRecPtr)
+	{
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	/*
+	 * Are we reading a timeline that used to be the latest one, but became
+	 * historical?	This can happen in a replica that gets promoted, and in a
+	 * cascading replica whose upstream gets promoted.  In either case,
+	 * re-read the timeline history data.  We cannot read past the timeline
+	 * switch point, because either the records in the old timeline might be
+	 * invalid, or worse, they may be valid but *different* from the ones we
+	 * should be reading.
+	 */
+	if (state->currTLIValidUntil == InvalidXLogRecPtr &&
+		state->currTLI != ThisTimeLineID &&
+		state->currTLI != 0)
+	{
+		/* re-read timeline history */
+		list_free_deep(state->timelineHistory);
+		state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+		elog(DEBUG2, "timeline %u became historical during decoding",
+			 state->currTLI);
+
+		/* then invalidate the cached timeline info */
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	/*
+	 * Are we reading a record immediately following a timeline switch?  If
+	 * so, we must follow the switch too.
+	 */
+	if (state->currRecPtr == state->EndRecPtr &&
+		state->currTLI != 0 &&
+		state->currTLIValidUntil != InvalidXLogRecPtr &&
+		state->currRecPtr >= state->currTLIValidUntil)
+	{
+		elog(DEBUG2,
+			 "requested record %X/%X is on segment containing end of TLI %u valid until %X/%X, switching to next timeline",
+			 (uint32) (state->currRecPtr >> 32),
+			 (uint32) state->currRecPtr,
+			 state->currTLI,
+			 (uint32) (state->currTLIValidUntil >> 32),
+			 (uint32) (state->currTLIValidUntil));
+
+		/* invalidate TLI info so we look up the next TLI */
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	if (state->currTLI == 0)
+	{
+		/*
+		 * Something changed; work out what timeline this record is on. We
+		 * might read it from the segment on this TLI or, if the segment is
+		 * also contained by newer timelines, the copy from a newer TLI.
+		 */
+		state->currTLI = tliOfPointInHistory(state->currRecPtr,
+											 state->timelineHistory);
+
+		/*
+		 * Look for the most recent timeline that's on the same xlog segment
+		 * as this record, since that's the only one we can assume is still
+		 * readable.
+		 */
+		while (state->currTLI != ThisTimeLineID &&
+			   state->currTLIValidUntil == InvalidXLogRecPtr)
+		{
+			XLogRecPtr	tliSwitch;
+			TimeLineID	nextTLI;
+
+			tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
+									   &nextTLI);
+
+			/* round ValidUntil down to start of seg containing the switch */
+			state->currTLIValidUntil =
+				((tliSwitch / XLogSegSize) * XLogSegSize);
+
+			if (state->currRecPtr >= state->currTLIValidUntil)
+			{
+				/*
+				 * The new currTLI ends on this WAL segment so check the next
+				 * TLI to see if it's the last one on the segment.
+				 *
+				 * If that's the current TLI we'll stop searching.
+				 */
+				state->currTLI = nextTLI;
+				state->currTLIValidUntil = InvalidXLogRecPtr;
+			}
+		}
+
+		/*
+		 * We're now either reading from the first xlog segment in the current
+		 * server's timeline or the most recent historical timeline that
+		 * exists on the target segment.
+		 */
+		elog(DEBUG2, "XLog read ptr %X/%X is on segment with TLI %u valid until %X/%X, server current TLI is %u",
+			 (uint32) (state->currRecPtr >> 32),
+			 (uint32) state->currRecPtr,
+			 state->currTLI,
+			 (uint32) (state->currTLIValidUntil >> 32),
+			 (uint32) (state->currTLIValidUntil),
+			 ThisTimeLineID);
+	}
+}
+
+/*
+ * XLogPageReadCB callback for reading local xlog files
  *
  * Public because it would likely be very helpful for someone writing another
  * output method outside walsender, e.g. in a bgworker.
  *
- * TODO: The walsender has it's own version of this, but it relies on the
+ * TODO: The walsender has its own version of this, but it relies on the
  * walsender's latch being set whenever WAL is flushed. No such infrastructure
  * exists for normal backends, so we have to do a check/sleep/repeat style of
  * loop for now.
@@ -754,46 +897,88 @@ int
 read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 {
-	XLogRecPtr	flushptr,
+	XLogRecPtr	read_upto,
 				loc;
 	int			count;
 
 	loc = targetPagePtr + reqLen;
+
+	/* Make sure enough xlog is available... */
 	while (1)
 	{
 		/*
-		 * TODO: we're going to have to do something more intelligent about
-		 * timelines on standbys. Use readTimeLineHistory() and
-		 * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-		 * that case earlier, but the code and TODO is left in here for when
-		 * that changes.
+		 * Check which timeline to get the record from.
+		 *
+		 * We have to do it each time through the loop because if we're in
+		 * recovery as a cascading standby, the current timeline might've
+		 * become historical.
 		 */
-		if (!RecoveryInProgress())
+		XLogReadDetermineTimeline(state);
+
+		if (state->currTLI == ThisTimeLineID)
 		{
-			*pageTLI = ThisTimeLineID;
-			flushptr = GetFlushRecPtr();
+			/*
+			 * We're reading from the current timeline so we might have to
+			 * wait for the desired record to be generated (or, for a standby,
+			 * received & replayed)
+			 */
+			if (!RecoveryInProgress())
+			{
+				*pageTLI = ThisTimeLineID;
+				read_upto = GetFlushRecPtr();
+			}
+			else
+				read_upto = GetXLogReplayRecPtr(pageTLI);
+
+			if (loc <= read_upto)
+				break;
+
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(1000L);
 		}
 		else
-			flushptr = GetXLogReplayRecPtr(pageTLI);
+		{
+			/*
+			 * We're on a historical timeline, so limit reading to the switch
+			 * point where we moved to the next timeline.
+			 */
+			read_upto = state->currTLIValidUntil;
 
-		if (loc <= flushptr)
+			/*
+			 * Setting pageTLI to our wanted record's TLI is slightly wrong;
+			 * the page might begin on an older timeline if it contains a
+			 * timeline switch, since its xlog segment will have been copied
+			 * from the prior timeline. This is pretty harmless though, as
+			 * nothing cares so long as the timeline doesn't go backwards.  We
+			 * should read the page header instead; FIXME someday.
+			 */
+			*pageTLI = state->currTLI;
+
+			/* No need to wait on a historical timeline */
 			break;
-
-		CHECK_FOR_INTERRUPTS();
-		pg_usleep(1000L);
+		}
 	}
 
-	/* more than one block available */
-	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+	{
+		/*
+		 * more than one block available; read only that block, have caller
+		 * come back if they need more.
+		 */
 		count = XLOG_BLCKSZ;
-	/* not enough data there */
-	else if (targetPagePtr + reqLen > flushptr)
+	}
+	else if (targetPagePtr + reqLen > read_upto)
+	{
+		/* not enough data there */
 		return -1;
-	/* part of the page available */
+	}
 	else
-		count = flushptr - targetPagePtr;
+	{
+		/* enough bytes available to satisfy the request */
+		count = read_upto - targetPagePtr;
+	}
 
-	XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, *pageTLI, targetPagePtr, count);
 
 	return count;
 }
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index f789fc1..dde130a 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -115,7 +115,7 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 {
 	return read_local_xlog_page(state, targetPagePtr, reqLen,
-						 targetRecPtr, cur_page, pageTLI);
+								targetRecPtr, cur_page, pageTLI);
 }
 
 /*
@@ -231,12 +231,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	rsinfo->setResult = p->tupstore;
 	rsinfo->setDesc = p->tupdesc;
 
-	/* compute the current end-of-wal */
-	if (!RecoveryInProgress())
-		end_of_wal = GetFlushRecPtr();
-	else
-		end_of_wal = GetXLogReplayRecPtr(NULL);
-
 	ReplicationSlotAcquire(NameStr(*name));
 
 	PG_TRY();
@@ -263,6 +257,15 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
 		ctx->output_writer_private = p;
 
+		/*
+		 * We start reading xlog from the restart lsn, even though in
+		 * CreateDecodingContext we set the snapshot builder up using the
+		 * slot's candidate_restart_lsn. This means we might read xlog we
+		 * don't actually decode rows from, but the snapshot builder might
+		 * need it to get to a consistent point. The point we start returning
+		 * data to *users* at is the candidate restart lsn from the decoding
+		 * context.
+		 */
 		startptr = MyReplicationSlot->data.restart_lsn;
 
 		CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding");
@@ -270,8 +273,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		/* invalidate non-timetravel entries */
 		InvalidateSystemCaches();
 
+		if (!RecoveryInProgress())
+			end_of_wal = GetFlushRecPtr();
+		else
+			end_of_wal = GetXLogReplayRecPtr(NULL);
+
+		/* Decode until we run out of records */
 		while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
-			 (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+			   (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
 		{
 			XLogRecord *record;
 			char	   *errm = NULL;
@@ -280,6 +289,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			if (errm)
 				elog(ERROR, "%s", errm);
 
+			/*
+			 * Now that we've set up the xlog reader state subsequent calls
+			 * pass InvalidXLogRecPtr to say "continue from last record"
+			 */
 			startptr = InvalidXLogRecPtr;
 
 			/*
@@ -299,6 +312,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			CHECK_FOR_INTERRUPTS();
 		}
 
+		/* Make sure timeline lookups use the start of the next record */
+		startptr = ctx->reader->EndRecPtr;
+
+		/*
+		 * The XLogReader will read a page past the valid end of WAL because
+		 * it doesn't know about timelines. When we switch timelines and ask
+		 * it for the first page on the new timeline it will think it has it
+		 * cached, but it'll have the old partial page and say it can't find
+		 * the next record. So flush the cache.
+		 */
+		XLogReaderInvalCache(ctx->reader);
+
 		tuplestore_donestoring(tupstore);
 
 		CurrentResourceOwner = old_resowner;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 7553cc4..31cafd3 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -27,6 +27,10 @@
 
 #include "access/xlogrecord.h"
 
+#ifndef FRONTEND
+#include "nodes/pg_list.h"
+#endif
+
 typedef struct XLogReaderState XLogReaderState;
 
 /* Function type definition for the read_page callback */
@@ -139,26 +143,50 @@ struct XLogReaderState
 	 * ----------------------------------------
 	 */
 
-	/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
+	/*
+	 * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
+	 * readLen bytes)
+	 */
 	char	   *readBuf;
 
-	/* last read segment, segment offset, read length, TLI */
+	/*
+	 * last read segment, segment offset, read length, TLI for data currently
+	 * in readBuf.
+	 */
 	XLogSegNo	readSegNo;
 	uint32		readOff;
 	uint32		readLen;
 	TimeLineID	readPageTLI;
 
-	/* beginning of last page read, and its TLI  */
+	/*
+	 * beginning of prior page read, and its TLI. Doesn't necessarily
+	 * correspond to what's in readBuf, used for timeline sanity checks.
+	 */
 	XLogRecPtr	latestPagePtr;
 	TimeLineID	latestPageTLI;
 
 	/* beginning of the WAL record being read. */
 	XLogRecPtr	currRecPtr;
+	/* timeline to read it from, 0 if a lookup is required */
+	TimeLineID	currTLI;
+
+	/*
+	 * Pointer to the end of the last whole segment on the timeline in currTLI
+	 * if it's historical or InvalidXLogRecPtr if currTLI is the current
+	 * timeline. This is *not* the tliSwitchPoint but it's guaranteed safe to
+	 * read up to this point from currTLI.
+	 */
+	XLogRecPtr	currTLIValidUntil;
 
 	/* Buffer for current ReadRecord result (expandable) */
 	char	   *readRecordBuf;
 	uint32		readRecordBufSize;
 
+#ifndef FRONTEND
+	/* cached timeline history, only available in backend */
+	List	   *timelineHistory;
+#endif
+
 	/* Buffer to hold error message */
 	char	   *errormsg_buf;
 };
@@ -174,6 +202,9 @@ extern void XLogReaderFree(XLogReaderState *state);
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
 			   XLogRecPtr recptr, char **errormsg);
 
+/* Flush any cached page */
+extern void XLogReaderInvalCache(XLogReaderState *state);
+
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif   /* FRONTEND */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 1b9abce..c9df35e 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -48,6 +48,6 @@ extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
 extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-	int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
+   int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
 
 #endif
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 6167ec1..bbaf94f 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -7,6 +7,7 @@ include $(top_builddir)/src/Makefile.global
 SUBDIRS = \
 		  brin \
 		  commit_ts \
+		  decoding_failover \
 		  dummy_seclabel \
 		  test_ddl_deparse \
 		  test_extensions \
diff --git a/src/test/modules/decoding_failover/.gitignore b/src/test/modules/decoding_failover/.gitignore
new file mode 100644
index 0000000..543c50d
--- /dev/null
+++ b/src/test/modules/decoding_failover/.gitignore
@@ -0,0 +1,3 @@
+results/
+tmp_check/
+log/
diff --git a/src/test/modules/decoding_failover/Makefile b/src/test/modules/decoding_failover/Makefile
new file mode 100644
index 0000000..6d1c034
--- /dev/null
+++ b/src/test/modules/decoding_failover/Makefile
@@ -0,0 +1,22 @@
+# src/test/modules/decoding_failover/Makefile
+
+MODULES = decoding_failover
+PGFILEDESC = "decoding_failover - test utility for logical decoding"
+
+EXTENSION = decoding_failover
+DATA = decoding_failover--1.0.sql
+
+EXTRA_INSTALL=contrib/test_decoding
+REGRESS=load_extension
+REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/decoding_failover/decoding_failover.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/decoding_failover
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/decoding_failover/README b/src/test/modules/decoding_failover/README
new file mode 100644
index 0000000..585f02f
--- /dev/null
+++ b/src/test/modules/decoding_failover/README
@@ -0,0 +1,19 @@
+A test module for logical decoding failover and timeline following.
+
+This module provides a minimal way to maintain logical slots on replicas
+that mirror the state on the master. It doesn't make decoding possible;
+it just tracks slot state so that a decoding client that's using the master
+can follow a physical failover to the standby. The master doesn't know
+about the slots on the standby; they're synced by a client that connects
+to both.
+
+This is intentionally not part of the test_decoding module because that's meant
+to serve as example code, whereas this module exercises internal server features
+by unsafely exposing internal state to SQL. It's not the right way to do
+failover; it's just a simple way to test it from the perl TAP framework to
+prove the feature works.
+
+In a practical implementation of this approach a bgworker on the master would
+monitor slot positions and relay them to a bgworker on the standby that applies
+the position updates without exposing slot internals to SQL. That's too complex
+for this test framework though.
diff --git a/src/test/modules/decoding_failover/decoding_failover--1.0.sql b/src/test/modules/decoding_failover/decoding_failover--1.0.sql
new file mode 100644
index 0000000..078b65e
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover--1.0.sql
@@ -0,0 +1,16 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION decoding_failover" to load this file. \quit
+
+CREATE OR REPLACE FUNCTION decoding_failover_create_logical_slot(slot_name text, plugin text)
+RETURNS void
+LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION decoding_failover_create_logical_slot(text, text)
+IS 'Create a logical slot directly, without using the snapshot builder. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.';
+
+CREATE OR REPLACE FUNCTION decoding_failover_advance_logical_slot(slot_name text, new_xmin bigint, new_catalog_xmin bigint, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn)
+RETURNS void
+LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION decoding_failover_advance_logical_slot(text, bigint, bigint, pg_lsn, pg_lsn)
+IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.';
diff --git a/src/test/modules/decoding_failover/decoding_failover.c b/src/test/modules/decoding_failover/decoding_failover.c
new file mode 100644
index 0000000..669e6c4
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover.c
@@ -0,0 +1,122 @@
+/*
+ * decoding_failover.c
+ *		Testing helper for replication slots
+ */
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "replication/slot.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(decoding_failover_create_logical_slot);
+PG_FUNCTION_INFO_V1(decoding_failover_advance_logical_slot);
+
+static void clear_slot_transient_state(void);
+
+/*
+ * Create a new logical slot, with invalid LSN and xid, directly. This does not
+ * use the snapshot builder or logical decoding machinery. It's only intended
+ * for creating a slot on a replica that mirrors the state of a slot on an
+ * upstream master.
+ *
+ * You should call decoding_failover_advance_logical_slot(...) on the slot
+ * immediately after creation.
+ */
+Datum
+decoding_failover_create_logical_slot(PG_FUNCTION_ARGS)
+{
+	char	   *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+	char	   *plugin = text_to_cstring(PG_GETARG_TEXT_P(1));
+
+	CheckSlotRequirements();
+
+	ReplicationSlotCreate(slotname, true, RS_PERSISTENT);
+
+	/* register the plugin name with the slot */
+	StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN);
+
+	/*
+	 * Initialize persistent state to placeholders, to be set later by
+	 * decoding_failover_advance_logical_slot().
+	 */
+	MyReplicationSlot->data.xmin = InvalidTransactionId;
+	MyReplicationSlot->data.catalog_xmin = InvalidTransactionId;
+	MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
+	MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr;
+
+	clear_slot_transient_state();
+
+	ReplicationSlotRelease();
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Set the persistent state of a slot from the supplied arguments.
+ *
+ * The non-persistent state is not maintained; it is simply reset, which
+ * is OK since the slot isn't in use.
+ *
+ * There's intentionally no check to prevent slots going backwards,
+ * because they really can go backwards: if the master crashes before
+ * flushing slot state to disk, we'll copy the older slot state after
+ * it recovers.
+ *
+ * There's no checking done for xmin or catalog xmin either, since
+ * we can't really do anything useful that accounts for xid wrap-around.
+ */
+Datum
+decoding_failover_advance_logical_slot(PG_FUNCTION_ARGS)
+{
+	char	   *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+	TransactionId new_xmin = (TransactionId) PG_GETARG_INT64(1);
+	TransactionId new_catalog_xmin = (TransactionId) PG_GETARG_INT64(2);
+	XLogRecPtr	restart_lsn = PG_GETARG_LSN(3);
+	XLogRecPtr	confirmed_lsn = PG_GETARG_LSN(4);
+
+	CheckSlotRequirements();
+
+	ReplicationSlotAcquire(slotname);
+
+	if (MyReplicationSlot->data.database != MyDatabaseId)
+		elog(ERROR, "Trying to update a slot on a different database");
+
+	MyReplicationSlot->data.xmin = new_xmin;
+	MyReplicationSlot->data.catalog_xmin = new_catalog_xmin;
+	MyReplicationSlot->data.restart_lsn = restart_lsn;
+	MyReplicationSlot->data.confirmed_flush = confirmed_lsn;
+
+	clear_slot_transient_state();
+
+	ReplicationSlotMarkDirty();
+	ReplicationSlotSave();
+	ReplicationSlotRelease();
+
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+
+	PG_RETURN_VOID();
+}
+
+/* Reset the transient (effective_* and candidate_*) fields of MyReplicationSlot. */
+static void
+clear_slot_transient_state(void)
+{
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
+
+	/* Same state as a slot newly loaded from disk on recovery. */
+	slot->effective_xmin = slot->data.xmin;
+	slot->effective_catalog_xmin = slot->data.catalog_xmin;
+
+	slot->candidate_catalog_xmin = InvalidTransactionId;
+	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
+	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	slot->candidate_restart_valid = InvalidXLogRecPtr;
+}
diff --git a/src/test/modules/decoding_failover/decoding_failover.conf b/src/test/modules/decoding_failover/decoding_failover.conf
new file mode 100644
index 0000000..56b46d7
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover.conf
@@ -0,0 +1,2 @@
+max_replication_slots=2
+wal_level=logical
diff --git a/src/test/modules/decoding_failover/decoding_failover.control b/src/test/modules/decoding_failover/decoding_failover.control
new file mode 100644
index 0000000..ef05ea2
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover.control
@@ -0,0 +1,5 @@
+# decoding_failover extension
+comment = 'Logical decoding failover tests'
+default_version = '1.0'
+module_pathname = '$libdir/decoding_failover'
+relocatable = true
diff --git a/src/test/modules/decoding_failover/expected/load_extension.out b/src/test/modules/decoding_failover/expected/load_extension.out
new file mode 100644
index 0000000..fe3c54d
--- /dev/null
+++ b/src/test/modules/decoding_failover/expected/load_extension.out
@@ -0,0 +1,19 @@
+CREATE EXTENSION decoding_failover;
+SELECT decoding_failover_create_logical_slot('test_slot', 'test_decoding');
+ decoding_failover_create_logical_slot 
+---------------------------------------
+ 
+(1 row)
+
+SELECT decoding_failover_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location());
+ decoding_failover_advance_logical_slot 
+----------------------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('test_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/src/test/modules/decoding_failover/sql/load_extension.sql b/src/test/modules/decoding_failover/sql/load_extension.sql
new file mode 100644
index 0000000..4ea9f77
--- /dev/null
+++ b/src/test/modules/decoding_failover/sql/load_extension.sql
@@ -0,0 +1,7 @@
+CREATE EXTENSION decoding_failover;
+
+SELECT decoding_failover_create_logical_slot('test_slot', 'test_decoding');
+
+SELECT decoding_failover_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location());
+
+SELECT pg_drop_replication_slot('test_slot');
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index 9290719..9710370 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,6 +9,8 @@
 #
 #-------------------------------------------------------------------------
 
+EXTRA_INSTALL=contrib/test_decoding src/test/modules/decoding_failover
+
 subdir = src/test/recovery
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/006_logical_decoding_timelines.pl b/src/test/recovery/t/006_logical_decoding_timelines.pl
new file mode 100644
index 0000000..c212bc0
--- /dev/null
+++ b/src/test/recovery/t/006_logical_decoding_timelines.pl
@@ -0,0 +1,303 @@
+# Demonstrate that logical decoding can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+#   excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+#   advancing it as we go using the low level APIs. It can't be done
+#   from SQL since logical decoding isn't allowed on replicas.
+#
+# This test exercises both approaches to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 20;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+	'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+	"SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+	$stderr,
+	qr/replication slot "after_basebackup" does not exist/,
+	'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
+
+
+# OK, time to try the same thing again, but this time we'll be using slot
+# mirroring on the standby and a pg_basebackup of the master.
+
+diag "Testing logical timeline following with decoding_failover module";
+
+$node_master->start();
+
+# Clean up after the last test
+$node_master->safe_psql('postgres', 'DELETE FROM decoding;');
+is( $node_master->psql(
+		'postgres',
+'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'),
+	0,
+	'dropping slots succeeds via pg_drop_replication_slot');
+
+# Same as before, we'll make one slot before basebackup, one after. This time
+# the basebackup will be with pg_basebackup so it'll omit both slots, then
+# we'll use SQL functions provided by the decoding_failover test module to sync
+# them to the replica, do some work, sync them and fail over then test again.
+# This time we should have both the before- and after-basebackup slots working.
+
+is( $node_master->psql(
+		'postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+	),
+	0,
+	'creating slot before_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+$backup_name = 'b2';
+$node_master->backup($backup_name);
+
+is( $node_master->psql(
+		'postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+	),
+	0,
+	'creating slot after_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('afterbb');");
+
+$node_replica = get_new_node('replica2');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+
+$node_replica->start;
+
+# Verify the slots are both absent on the replica
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, '', 'No slots exist on the replica');
+
+# Now do our magic to sync the slot states across. Normally
+# this would be done continuously by a bgworker but
+# we're just doing it by hand for this test. This is exposing
+# postgres innards to SQL so it's unsafe except for testing.
+$node_master->safe_psql('postgres', 'CREATE EXTENSION decoding_failover;');
+my $slotinfo = $node_master->safe_psql('postgres',
+'SELECT slot_name, plugin, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots ORDER BY slot_name'
+);
+diag "Copying slots to replica";
+open my $fh, '<', \$slotinfo or die $!;
+while (<$fh>)
+{
+	chomp $_;
+	print "# $_\n";
+	# Quote each column (empty => NULL); limit -1 keeps trailing empty columns.
+	my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn,
+		$confirmed_flush_lsn)
+	  = map {
+		if   ($_ ne '') { "'$_'" }
+		else            { 'NULL' }
+	  } split qr/\|/, $_, -1;
+	print
+"# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n";
+	$node_replica->safe_psql('postgres',
+		"SELECT decoding_failover_create_logical_slot($slot_name, $plugin);");
+	$node_replica->safe_psql('postgres',
+"SELECT decoding_failover_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);"
+	);
+}
+close $fh or die $!;
+
+# Now both slots are present on the replica and exactly match the master
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is( $stdout,
+	"after_basebackup\nbefore_basebackup",
+	'both slots now exist on replica');
+
+$stdout = $node_replica->safe_psql(
+	'postgres',
+	qq{SELECT slot_name, plugin, xmin, catalog_xmin,
+			  restart_lsn, confirmed_flush_lsn
+		 FROM pg_replication_slots
+	 ORDER BY slot_name});
+is($stdout, $slotinfo,
+	"slot data read back from replica matches slot data on master");
+
+# We now have to copy some extra WAL to satisfy the requirements of the oldest
+# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots
+# so we have to help out. We know the WAL is still retained on the master
+# because we haven't advanced the slots there.
+#
+# Figure out what the oldest segment we need is by looking at the restart_lsn
+# of the oldest slot.
+#
+# It only makes sense to do this once the slots are created on the replica,
+# otherwise it might just delete the segments again.
+
+my $oldest_needed_segment = $node_master->safe_psql(
+	'postgres',
+	qq{SELECT pg_xlogfile_name((
+      SELECT restart_lsn
+      FROM pg_replication_slots
+      ORDER BY restart_lsn ASC
+      LIMIT 1
+     ));}
+);
+
+diag "oldest needed xlog seg is $oldest_needed_segment ";
+
+# WAL segment names are fixed-width hex, so they sort lexically: grab every
+# segment at or after this one using a string (not numeric) comparison.
+opendir(my $pg_xlog, $node_master->data_dir . "/pg_xlog") or die $!;
+while (my $seg = readdir $pg_xlog)
+{
+	next unless $seg =~ /^[0-9A-F]{24}/ && $seg ge $oldest_needed_segment;
+	diag "copying xlog seg $seg";
+	copy(
+		$node_master->data_dir . "/pg_xlog/" . $seg,
+		$node_replica->data_dir . "/pg_xlog/" . $seg
+	) or die "copy of xlog seg $seg failed: $!";
+}
+closedir $pg_xlog;
+
+# Boom, crash the master
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+	"SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# This time we can read from both slots
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot after_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot after_basebackup');
+is($stderr, '', 'replay from slot after_basebackup produces no stderr');
+
+# Should be able to read from slot created before base backup
+#
+# This would fail with an error about missing WAL segments if we hadn't
+# copied extra WAL earlier.
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+	'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;');
+is($ret,    0,  'dropping slots succeeds via pg_drop_replication_slot');
+is($stderr, '', 'dropping slots produces no stderr output');
+
+1;
