From 45d3d96ab236f1621c79ce05556acf7aace14a7b Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Tue, 23 Feb 2016 15:59:37 +0800
Subject: [PATCH 2/8] Allow replication slots to follow failover

Originally replication slots were unique to a single node and weren't
recorded in WAL or replicated. A logical decoding client couldn't follow
a physical standby failover and promotion because the promoted replica
didn't have the original master's slots. The replica may not have
retained all required WAL and there was no way to create a new logical
slot and rewind it back to the point the logical client had replayed to.

Failover slots lift this limitation by replicating slots consistently to
physical standbys, keeping them up to date and using them in WAL
retention calculations. This allows a logical decoding client to follow
a physical failover and promotion without losing its place in the change
stream.

A failover slot may only be created on a master server, as it must be
able to write WAL. This limitation may be lifted later.

Base backups (as taken by pg_basebackup) now also copy the contents of
pg_replslot. Non-failover slots are instead removed when the copy starts
up in archive recovery, rather than being omitted from the backup.

This patch does not add any user interface for failover slots. There's
no way to create them from SQL or from the walsender. That and the
documentation for failover slots are in the next patch in the series
so that this patch is entirely focused on the implementation.

Craig Ringer, based on a prototype by Simon Riggs
---
 src/backend/access/rmgrdesc/Makefile       |   2 +-
 src/backend/access/rmgrdesc/replslotdesc.c |  65 ++++
 src/backend/access/transam/rmgr.c          |   1 +
 src/backend/access/transam/xlog.c          |   5 +-
 src/backend/commands/dbcommands.c          |   3 +
 src/backend/replication/basebackup.c       |  12 -
 src/backend/replication/logical/decode.c   |   1 +
 src/backend/replication/logical/logical.c  |  25 +-
 src/backend/replication/slot.c             | 586 +++++++++++++++++++++++++++--
 src/backend/replication/slotfuncs.c        |   4 +-
 src/backend/replication/walsender.c        |   8 +-
 src/bin/pg_xlogdump/replslotdesc.c         |   1 +
 src/bin/pg_xlogdump/rmgrdesc.c             |   1 +
 src/include/access/rmgrlist.h              |   1 +
 src/include/replication/slot.h             |  69 +---
 src/include/replication/slot_xlog.h        | 100 +++++
 16 files changed, 755 insertions(+), 129 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/replslotdesc.c
 create mode 120000 src/bin/pg_xlogdump/replslotdesc.c
 create mode 100644 src/include/replication/slot_xlog.h

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index c72a1f2..600b544 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -10,7 +10,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
 	   hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
-	   replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
+	   replorigindesc.o replslotdesc.o seqdesc.o smgrdesc.o spgdesc.o \
 	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/replslotdesc.c b/src/backend/access/rmgrdesc/replslotdesc.c
new file mode 100644
index 0000000..5829e8d
--- /dev/null
+++ b/src/backend/access/rmgrdesc/replslotdesc.c
@@ -0,0 +1,65 @@
+/*-------------------------------------------------------------------------
+ *
+ * replslotdesc.c
+ *	  rmgr descriptor routines for replication/slot.c
+ *
+ * Portions Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/replslotdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/slot_xlog.h"
+
+/*
+ * Append a human-readable description of a replication slot WAL record
+ * to buf.  Nothing is appended for record types not handled below.
+ */
+void
+replslot_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *payload = XLogRecGetData(record);
+
+	/* dispatch on the info bits left over once XLR_INFO_MASK is cleared */
+	switch (XLogRecGetInfo(record) & ~XLR_INFO_MASK)
+	{
+		case XLOG_REPLSLOT_UPDATE:
+			{
+				/* payload is read through the ReplicationSlotInWAL pointer */
+				ReplicationSlotInWAL slotrec = (ReplicationSlotInWAL) payload;
+
+				appendStringInfo(buf, "of slot %s with restart %X/%X and xid %u confirmed to %X/%X",
+						NameStr(slotrec->name),
+						(uint32)(slotrec->restart_lsn>>32), (uint32)(slotrec->restart_lsn),
+						slotrec->xmin,
+						(uint32)(slotrec->confirmed_flush>>32), (uint32)(slotrec->confirmed_flush));
+				break;
+			}
+		case XLOG_REPLSLOT_DROP:
+			{
+				/* a drop record identifies its slot by name */
+				xl_replslot_drop *droprec = (xl_replslot_drop *) payload;
+
+				appendStringInfo(buf, "of slot %s", NameStr(droprec->name));
+				break;
+			}
+	}
+}
+
+const char *
+replslot_identify(uint8 info)
+{
+	/* Compare only the bits outside XLR_INFO_MASK, as replslot_desc() does */
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_REPLSLOT_UPDATE:
+			return "UPDATE";
+		case XLOG_REPLSLOT_DROP:
+			return "DROP";
+	}
+	return NULL;				/* not a record type this rmgr knows */
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7c4d773..0bd5796 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,6 +24,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/origin.h"
+#include "replication/slot_xlog.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 94b79ac..a92f09d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6366,8 +6366,11 @@ StartupXLOG(void)
 	/*
 	 * Initialize replication slots, before there's a chance to remove
 	 * required resources.
+	 *
+	 * If we're in archive recovery then non-failover slots are no
+	 * longer of any use and should be dropped during startup.
 	 */
-	StartupReplicationSlots();
+	StartupReplicationSlots(ArchiveRecoveryRequested);
 
 	/*
 	 * Startup logical state, needs to be setup now so we have proper data
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index c1c0223..61fc45b 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -2114,6 +2114,9 @@ dbase_redo(XLogReaderState *record)
 		/* Clean out the xlog relcache too */
 		XLogDropDatabase(xlrec->db_id);
 
+		/* Drop any logical failover slots for this database */
+		ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 		/* And remove the physical files */
 		if (!rmtree(dst_path, true))
 			ereport(WARNING,
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index af0fb09..ab1f271 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -973,18 +973,6 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces,
 		}
 
 		/*
-		 * Skip pg_replslot, not useful to copy. But include it as an empty
-		 * directory anyway, so we get permissions right.
-		 */
-		if (strcmp(de->d_name, "pg_replslot") == 0)
-		{
-			if (!sizeonly)
-				_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
-			size += 512;		/* Size of the header just added */
-			continue;
-		}
-
-		/*
 		 * We can skip pg_xlog, the WAL segments need to be fetched from the
 		 * WAL archive anyway. But include it as an empty directory anyway, so
 		 * we get permissions right.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 88c3a49..76fc5c7 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -135,6 +135,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
 		case RM_REPLORIGIN_ID:
+		case RM_REPLSLOT_ID:
 			break;
 		case RM_NEXT_ID:
 			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2e6d3f9..4feb2ca 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -85,16 +85,19 @@ CheckLogicalDecodingRequirements(void)
 				 errmsg("logical decoding requires a database connection")));
 
 	/* ----
-	 * TODO: We got to change that someday soon...
+	 * TODO: Allow logical decoding from a standby
 	 *
-	 * There's basically three things missing to allow this:
+	 * There are some things missing to allow this:
 	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
+	 *    LSN belongs to
+	 * 2) To prevent needed rows from being removed we would need
+	 *    to enhance hot_standby_feedback so it sends both xmin and
+	 *    catalog_xmin to the master.  A standby slot can't write WAL, so we
+	 *    wouldn't be able to use it directly for failover, without some very
+	 *    complex state interactions via master.
+	 *
+	 * So this doesn't seem likely to change anytime soon.
+	 *
 	 * ----
 	 */
 	if (RecoveryInProgress())
@@ -272,7 +275,7 @@ CreateInitDecodingContext(char *plugin,
 	slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
 	slot->data.catalog_xmin = slot->effective_catalog_xmin;
 
-	ReplicationSlotsComputeRequiredXmin(true);
+	ReplicationSlotsUpdateRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
 
@@ -908,8 +911,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
-			ReplicationSlotsComputeRequiredXmin(false);
-			ReplicationSlotsComputeRequiredLSN();
+			ReplicationSlotsUpdateRequiredXmin(false);
+			ReplicationSlotsUpdateRequiredLSN();
 		}
 	}
 	else
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index affa9b9..915c0af 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -24,7 +24,18 @@
  * directory. Inside that directory the state file will contain the slot's
  * own data. Additional data can be stored alongside that file if required.
  * While the server is running, the state data is also cached in memory for
- * efficiency.
+ * efficiency. Non-failover slots are NOT subject to WAL logging and may
+ * be used on standbys (though that's only supported for physical slots at
+ * the moment). They use tempfile writes and swaps for crash safety.
+ *
+ * A failover slot created on a master node generates WAL records that
+ * maintain a copy of the slot on standby nodes. If a standby node is
+ * promoted the failover slot allows access to be restarted just as if
+ * the original master node was being accessed, allowing for the timeline
+ * change. The replica considers slot positions when removing WAL to make
+ * sure it can satisfy the needs of slots after promotion.  For logical
+ * decoding slots the slot's internal state is kept up to date so it's
+ * ready for use after promotion.
  *
  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
@@ -44,6 +55,7 @@
 #include "common/string.h"
 #include "miscadmin.h"
 #include "replication/slot.h"
+#include "replication/slot_xlog.h"
 #include "storage/fd.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -101,10 +113,14 @@ static LWLockTranche ReplSlotIOLWLockTranche;
 static void ReplicationSlotDropAcquired(void);
 
 /* internal persistency functions */
-static void RestoreSlotFromDisk(const char *name);
+static void RestoreSlotFromDisk(const char *name, bool drop_nonfailover_slots);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
 
+/* internal redo functions */
+static void ReplicationSlotRedoCreateOrUpdate(ReplicationSlotInWAL xlrec);
+static void ReplicationSlotRedoDrop(const char * slotname);
+
 /*
  * Report shared-memory space needed by ReplicationSlotShmemInit.
  */
@@ -220,7 +236,8 @@ ReplicationSlotValidateName(const char *name, int elevel)
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
-					  ReplicationSlotPersistency persistency)
+					  ReplicationSlotPersistency persistency,
+					  bool failover)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -278,6 +295,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
 	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
 	slot->data.restart_lsn = InvalidXLogRecPtr;
+	/* Slot timeline is unused and always zero */
+	slot->data.restart_tli = 0;
+
+	if (failover && RecoveryInProgress())
+		ereport(ERROR,
+				(errmsg("a failover slot may not be created on a replica"),
+				 errhint("Create the slot on the master server instead")));
+
+	slot->data.failover = failover;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -313,6 +339,10 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
 /*
  * Find a previously created slot and mark it as used by this backend.
+ *
+ * Sets active_pid and assigns MyReplicationSlot iff successfully acquired.
+ *
+ * ERRORs on an attempt to acquire a failover slot when in recovery.
  */
 void
 ReplicationSlotAcquire(const char *name)
@@ -335,7 +365,11 @@ ReplicationSlotAcquire(const char *name)
 		{
 			SpinLockAcquire(&s->mutex);
 			active_pid = s->active_pid;
-			if (active_pid == 0)
+			/*
+			 * We can only claim a slot for our use if it's not claimed
+			 * by someone else AND it isn't a failover slot on a standby.
+			 */
+			if (active_pid == 0 && !(RecoveryInProgress() && s->data.failover))
 				s->active_pid = MyProcPid;
 			SpinLockRelease(&s->mutex);
 			slot = s;
@@ -349,12 +383,24 @@ ReplicationSlotAcquire(const char *name)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
+
 	if (active_pid != 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
 			   errmsg("replication slot \"%s\" is active for PID %d",
 					  name, active_pid)));
 
+	/*
+	 * An attempt to use a failover slot from a standby must fail since
+	 * we can't write WAL from a standby and there's no sensible way
+	 * to advance slot position from both replica and master anyway.
+	 */
+	if (RecoveryInProgress() && slot->data.failover)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_IN_USE),
+				 errmsg("replication slot \"%s\" is reserved for use after failover",
+					  name)));
+
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
 }
@@ -411,6 +457,9 @@ ReplicationSlotDrop(const char *name)
 /*
  * Permanently drop the currently acquired replication slot which will be
  * released by the point this function returns.
+ *
+ * Callers must NOT hold ReplicationSlotControlLock in SHARED mode.  EXCLUSIVE
+ * is OK, or not held at all.
  */
 static void
 ReplicationSlotDropAcquired(void)
@@ -418,9 +467,14 @@ ReplicationSlotDropAcquired(void)
 	char		path[MAXPGPATH];
 	char		tmppath[MAXPGPATH];
 	ReplicationSlot *slot = MyReplicationSlot;
+	bool slot_is_failover;
+	bool took_control_lock = false,
+		 took_allocation_lock = false;
 
 	Assert(MyReplicationSlot != NULL);
 
+	slot_is_failover = slot->data.failover;
+
 	/* slot isn't acquired anymore */
 	MyReplicationSlot = NULL;
 
@@ -428,8 +482,27 @@ ReplicationSlotDropAcquired(void)
 	 * If some other backend ran this code concurrently with us, we might try
 	 * to delete a slot with a certain name while someone else was trying to
 	 * create a slot with the same name.
+	 *
+	 * If called with the lock already held it MUST be held in
+	 * EXCLUSIVE mode.
 	 */
-	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+	if (!LWLockHeldByMe(ReplicationSlotAllocationLock))
+	{
+		took_allocation_lock = true;
+		LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+	}
+
+	/* Record the drop in XLOG if we aren't replaying WAL */
+	if (XLogInsertAllowed() && slot_is_failover)
+	{
+		xl_replslot_drop xlrec;
+
+		memcpy(&(xlrec.name), NameStr(slot->data.name), NAMEDATALEN);
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
+		(void) XLogInsert(RM_REPLSLOT_ID, XLOG_REPLSLOT_DROP);
+	}
 
 	/* Generate pathnames. */
 	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
@@ -459,7 +532,11 @@ ReplicationSlotDropAcquired(void)
 	}
 	else
 	{
-		bool		fail_softly = slot->data.persistency == RS_EPHEMERAL;
+		bool		fail_softly = false;
+
+		if (RecoveryInProgress() ||
+			slot->data.persistency == RS_EPHEMERAL)
+			fail_softly = true;
 
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
@@ -477,18 +554,27 @@ ReplicationSlotDropAcquired(void)
 	 * grabbing the mutex because nobody else can be scanning the array here,
 	 * and nobody can be attached to this slot and thus access it without
 	 * scanning the array.
+	 *
+	 * You must hold the lock in EXCLUSIVE mode or not at all.
 	 */
-	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+	if (!LWLockHeldByMe(ReplicationSlotControlLock))
+	{
+		took_control_lock = true;
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+	}
+
 	slot->active_pid = 0;
 	slot->in_use = false;
-	LWLockRelease(ReplicationSlotControlLock);
+
+	if (took_control_lock)
+		LWLockRelease(ReplicationSlotControlLock);
 
 	/*
 	 * Slot is dead and doesn't prevent resource removal anymore, recompute
 	 * limits.
 	 */
-	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
+	ReplicationSlotsUpdateRequiredXmin(false);
+	ReplicationSlotsUpdateRequiredLSN();
 
 	/*
 	 * If removing the directory fails, the worst thing that will happen is
@@ -504,7 +590,8 @@ ReplicationSlotDropAcquired(void)
 	 * We release this at the very end, so that nobody starts trying to create
 	 * a slot while we're still cleaning up the detritus of the old one.
 	 */
-	LWLockRelease(ReplicationSlotAllocationLock);
+	if (took_allocation_lock)
+		LWLockRelease(ReplicationSlotAllocationLock);
 }
 
 /*
@@ -544,6 +631,9 @@ ReplicationSlotMarkDirty(void)
 /*
  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
  * guaranteeing it will be there after an eventual crash.
+ *
+ * Failover slots emit their first xlog record (an update, which creates the
+ * slot downstream) at this time, having not been previously written to xlog.
  */
 void
 ReplicationSlotPersist(void)
@@ -565,7 +655,7 @@ ReplicationSlotPersist(void)
  * Compute the oldest xmin across all slots and store it in the ProcArray.
  */
 void
-ReplicationSlotsComputeRequiredXmin(bool already_locked)
+ReplicationSlotsUpdateRequiredXmin(bool already_locked)
 {
 	int			i;
 	TransactionId agg_xmin = InvalidTransactionId;
@@ -610,10 +700,20 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 }
 
 /*
- * Compute the oldest restart LSN across all slots and inform xlog module.
+ * Update the xlog module's copy of the minimum restart lsn across all slots
  */
 void
-ReplicationSlotsComputeRequiredLSN(void)
+ReplicationSlotsUpdateRequiredLSN(void)
+{
+	XLogSetReplicationSlotMinimumLSN(ReplicationSlotsComputeRequiredLSN(false));
+}
+
+/*
+ * Compute the oldest restart LSN across all slots (or optionally
+ * only failover slots) and return it.
+ */
+XLogRecPtr
+ReplicationSlotsComputeRequiredLSN(bool failover_only)
 {
 	int			i;
 	XLogRecPtr	min_required = InvalidXLogRecPtr;
@@ -625,14 +725,19 @@ ReplicationSlotsComputeRequiredLSN(void)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn;
+		bool		failover;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
 		restart_lsn = s->data.restart_lsn;
+		failover = s->data.failover;
 		SpinLockRelease(&s->mutex);
 
+		if (failover_only && !failover)
+			continue;
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -640,7 +745,7 @@ ReplicationSlotsComputeRequiredLSN(void)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
-	XLogSetReplicationSlotMinimumLSN(min_required);
+	return min_required;
 }
 
 /*
@@ -649,7 +754,7 @@ ReplicationSlotsComputeRequiredLSN(void)
  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
  * slots exist.
  *
- * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
+ * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(false), since it
  * ignores physical replication slots.
  *
  * The results aren't required frequently, so we don't maintain a precomputed
@@ -747,6 +852,45 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+/*
+ * Drop all replication slots of database dboid (used by dbase_redo).
+ *
+ * ReplicationSlotControlLock is held only while peeking at each entry:
+ * ReplicationSlotDropAcquired() recomputes the required LSN, which takes
+ * that lock itself, and LWLocks can't be acquired recursively.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+
+	Assert(MyReplicationSlot == NULL);
+
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		bool		doomed;
+
+		/* Freed entries keep their old contents, so check in_use too */
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		doomed = s->in_use && s->data.database == dboid;
+		LWLockRelease(ReplicationSlotControlLock);
+		if (!doomed)
+			continue;
+
+		/*
+		 * There should be no connections to this dbid, therefore all
+		 * of its slots should be logical and inactive.
+		 */
+		Assert(s->active_pid == 0);
+		Assert(SlotIsLogical(s));
+
+		/* "Acquire" the slot by hand; dropping it releases it again */
+		MyReplicationSlot = s;
+		ReplicationSlotDropAcquired();
+	}
+	MyReplicationSlot = NULL;
+}
 
 /*
  * Check whether the server's configuration supports using replication
@@ -779,12 +923,13 @@ ReplicationSlotReserveWal(void)
 
 	Assert(slot != NULL);
 	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+	Assert(slot->data.restart_tli == 0);
 
 	/*
 	 * The replication slot mechanism is used to prevent removal of required
 	 * WAL. As there is no interlock between this routine and checkpoints, WAL
 	 * segments could concurrently be removed when a now stale return value of
-	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
+	 * ReplicationSlotsUpdateRequiredLSN() is used. In the unlikely case that
 	 * this happens we'll just retry.
 	 */
 	while (true)
@@ -821,12 +966,12 @@ ReplicationSlotReserveWal(void)
 		}
 
 		/* prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
+		ReplicationSlotsUpdateRequiredLSN();
 
 		/*
 		 * If all required WAL is still there, great, otherwise retry. The
 		 * slot should prevent further removal of WAL, unless there's a
-		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
+		 * concurrent ReplicationSlotsUpdateRequiredLSN() after we've written
 		 * the new restart_lsn above, so normally we should never need to loop
 		 * more than twice.
 		 */
@@ -878,7 +1023,7 @@ CheckPointReplicationSlots(void)
  * needs to be run before we start crash recovery.
  */
 void
-StartupReplicationSlots(void)
+StartupReplicationSlots(bool drop_nonfailover_slots)
 {
 	DIR		   *replication_dir;
 	struct dirent *replication_de;
@@ -917,7 +1062,7 @@ StartupReplicationSlots(void)
 		}
 
 		/* looks like a slot in a normal state, restore */
-		RestoreSlotFromDisk(replication_de->d_name);
+		RestoreSlotFromDisk(replication_de->d_name, drop_nonfailover_slots);
 	}
 	FreeDir(replication_dir);
 
@@ -926,8 +1071,8 @@ StartupReplicationSlots(void)
 		return;
 
 	/* Now that we have recovered all the data, compute replication xmin */
-	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
+	ReplicationSlotsUpdateRequiredXmin(false);
+	ReplicationSlotsUpdateRequiredLSN();
 }
 
 /* ----
@@ -996,6 +1141,8 @@ CreateSlotOnDisk(ReplicationSlot *slot)
 
 /*
  * Shared functionality between saving and creating a replication slot.
+ *
+ * For failover slots this is where we emit xlog.
  */
 static void
 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
@@ -1006,15 +1153,18 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 	ReplicationSlotOnDisk cp;
 	bool		was_dirty;
 
-	/* first check whether there's something to write out */
-	SpinLockAcquire(&slot->mutex);
-	was_dirty = slot->dirty;
-	slot->just_dirtied = false;
-	SpinLockRelease(&slot->mutex);
+	if (!RecoveryInProgress())
+	{
+		/* first check whether there's something to write out */
+		SpinLockAcquire(&slot->mutex);
+		was_dirty = slot->dirty;
+		slot->just_dirtied = false;
+		SpinLockRelease(&slot->mutex);
 
-	/* and don't do anything if there's nothing to write */
-	if (!was_dirty)
-		return;
+		/* and don't do anything if there's nothing to write */
+		if (!was_dirty)
+			return;
+	}
 
 	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
 
@@ -1047,6 +1197,25 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 
 	SpinLockRelease(&slot->mutex);
 
+	/*
+	 * If needed, record this action in WAL
+	 */
+	if (slot->data.failover &&
+		slot->data.persistency == RS_PERSISTENT &&
+		!RecoveryInProgress())
+	{
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&cp.slotdata), sizeof(ReplicationSlotPersistentData));
+		/*
+		 * Note that slot creation on the downstream is also an "update".
+		 *
+		 * Slots can start off ephemeral and be updated to persistent. We just
+		 * log the update and the downstream creates the new slot if it doesn't
+		 * exist yet.
+		 */
+		(void) XLogInsert(RM_REPLSLOT_ID, XLOG_REPLSLOT_UPDATE);
+	}
+
 	COMP_CRC32C(cp.checksum,
 				(char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
 				SnapBuildOnDiskChecksummedSize);
@@ -1116,7 +1285,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
  * Load a single slot from disk into memory.
  */
 static void
-RestoreSlotFromDisk(const char *name)
+RestoreSlotFromDisk(const char *name, bool drop_nonfailover_slots)
 {
 	ReplicationSlotOnDisk cp;
 	int			i;
@@ -1235,10 +1404,21 @@ RestoreSlotFromDisk(const char *name)
 						path, checksum, cp.checksum)));
 
 	/*
-	 * If we crashed with an ephemeral slot active, don't restore but delete
-	 * it.
+	 * If we crashed with an ephemeral slot active, don't restore but
+	 * delete it.
+	 *
+	 * Similarly, if we're in archive recovery and will be running as
+	 * a standby (when drop_nonfailover_slots is set), non-failover
+	 * slots can't be relied upon. Logical slots might have a catalog
+	 * xmin lower than reality because the original slot on the master
+	 * advanced past the point the stale slot on the replica is stuck
+	 * at. Additionally slots might have been copied while being
+	 * written to if the basebackup copy method was not atomic.
+	 * Failover slots are safe since they're WAL-logged and follow the
+	 * master's slot position.
 	 */
-	if (cp.slotdata.persistency != RS_PERSISTENT)
+	if (cp.slotdata.persistency != RS_PERSISTENT
+			|| (drop_nonfailover_slots && !cp.slotdata.failover))
 	{
 		sprintf(path, "pg_replslot/%s", name);
 
@@ -1249,6 +1429,14 @@ RestoreSlotFromDisk(const char *name)
 					 errmsg("could not remove directory \"%s\"", path)));
 		}
 		fsync_fname("pg_replslot", true);
+
+		if (cp.slotdata.persistency == RS_PERSISTENT)
+		{
+			ereport(LOG,
+					(errmsg("dropped non-failover slot %s during archive recovery",
+							 NameStr(cp.slotdata.name))));
+		}
+
 		return;
 	}
 
@@ -1285,5 +1473,331 @@ RestoreSlotFromDisk(const char *name)
 	if (!restored)
 		ereport(PANIC,
 				(errmsg("too many replication slots active before shutdown"),
-				 errhint("Increase max_replication_slots and try again.")));
+				 errhint("Increase max_replication_slots (currently %u) and try again.",
+					 max_replication_slots)));
+}
+
+/*
+ * Redo XLOG_REPLSLOT_UPDATE: apply xlrec, a failover slot's persistent data.
+ * Usually this updates an existing slot, but it creates the slot if no slot
+ * with that name is in use yet, e.g. when an ephemeral slot first became
+ * persistent upstream. A same-named local non-failover slot is dropped.
+ */
+static void
+ReplicationSlotRedoCreateOrUpdate(ReplicationSlotInWAL xlrec)
+{
+	ReplicationSlot *slot;
+	bool	found_available = false;
+	bool	found_duplicate = false;
+	int		use_slotid = 0;
+	int		i;
+
+	/*
+	 * We're in redo, but someone could still create a local
+	 * non-failover slot and race with us unless we take the
+	 * allocation lock.
+	 */
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+
+rescan:
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		slot = &ReplicationSlotCtl->replication_slots[i];
+
+		/*
+		 * Find first unused position in the slots array, but keep on
+		 * scanning in case there's an existing slot with the same
+		 * name.
+		 */
+		if (!slot->in_use && !found_available)
+		{
+			use_slotid = i;
+			found_available = true;
+		}
+
+		/*
+		 * Existing in-use slot with same name? It could be our failover
+		 * slot to update or a non-failover slot with a conflicting name.
+		 */
+		if (slot->in_use && strcmp(NameStr(xlrec->name), NameStr(slot->data.name)) == 0)
+		{
+			use_slotid = i;
+			found_available = true;
+			found_duplicate = true;
+			break;
+		}
+	}
+
+	if (found_duplicate && !slot->data.failover)
+	{
+		/*
+		 * A local non-failover slot exists with the same name as
+		 * the failover slot we're creating.
+		 *
+		 * Clobber the client, drop its slot, and carry on with
+		 * our business.
+		 *
+		 * First we must temporarily release the allocation lock while
+		 * we try to terminate the process that holds the slot, since
+		 * we don't want to hold the LWlock for ages. We'll reacquire
+		 * it later.
+		 */
+		LWLockRelease(ReplicationSlotAllocationLock);
+
+		/* We might race with other clients, so retry-loop */
+		do
+		{
+			int active_pid = slot->active_pid;
+			int max_sleep_millis = 120 * 1000;
+			int millis_per_sleep = 1000;
+
+			if (active_pid != 0)
+			{
+				ereport(INFO,
+						(errmsg("terminating active connection by pid %d to local slot %s because of conflict with recovery",
+							active_pid, NameStr(slot->data.name))));
+
+				if (kill(active_pid, SIGTERM))
+					elog(DEBUG1, "failed to signal pid %d to terminate on slot conflict: %m",
+							active_pid);
+
+				/*
+				 * No way to wait for the process since it's not a child
+				 * of ours and there's no latch to set, so poll.
+				 *
+				 * We're checking this without any locks held, but
+				 * we'll recheck when we attempt to drop the slot.
+				 */
+				while (slot->in_use && slot->active_pid == active_pid
+						&& max_sleep_millis > 0)
+				{
+					int rc;
+
+					rc = WaitLatch(MyLatch,
+							WL_TIMEOUT | WL_LATCH_SET | WL_POSTMASTER_DEATH,
+							millis_per_sleep);
+
+					if (rc & WL_POSTMASTER_DEATH)
+						elog(FATAL, "exiting after postmaster termination");
+
+					/*
+					 * Might be shorter if something sets our latch, but
+					 * we don't care much.
+					 */
+					max_sleep_millis -= millis_per_sleep;
+				}
+
+				if (max_sleep_millis <= 0)
+					elog(WARNING, "process %d is taking too long to terminate after SIGTERM",
+							active_pid);
+			}
+
+			if (slot->active_pid == 0)
+			{
+				/* Try to acquire and drop the slot */
+				SpinLockAcquire(&slot->mutex);
+				/* Claim the slot unless we lost the race; then go around */
+				if (slot->active_pid == 0)
+				{
+					slot->active_pid = MyProcPid;
+					MyReplicationSlot = slot;
+				}
+				SpinLockRelease(&slot->mutex);
+			}
+
+			if (slot->active_pid == MyProcPid)
+			{
+				NameData slotname;
+				strncpy(NameStr(slotname), NameStr(slot->data.name), NAMEDATALEN);
+				(NameStr(slotname))[NAMEDATALEN-1] = '\0';
+
+				/*
+				 * Reclaim the allocation lock and THEN drop the slot,
+				 * so nobody else can grab the name until we've
+				 * finished redo.
+				 */
+				LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+				ReplicationSlotDropAcquired();
+
+				ereport(WARNING,
+						(errmsg("dropped local replication slot %s because of conflict with recovery",
+								NameStr(slotname)),
+						 errdetail("A failover slot with the same name was created on the master server")));
+			}
+		}
+		while (slot->in_use);
+
+		/* Slot is gone (maybe not by us): ensure lock held, rescan array */
+		if (!LWLockHeldByMe(ReplicationSlotAllocationLock))
+			LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+		found_available = found_duplicate = false;
+		goto rescan;
+	}
+
+	Assert(LWLockHeldByMe(ReplicationSlotAllocationLock));
+
+	/*
+	 * This is either an empty slot control position to make a new slot or it's
+	 * an existing entry for this failover slot that we need to update.
+	 */
+	if (found_available)
+	{
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+
+		slot = &ReplicationSlotCtl->replication_slots[use_slotid];
+
+		/* restore the entire set of persistent data */
+		memcpy(&slot->data, xlrec,
+			   sizeof(ReplicationSlotPersistentData));
+
+		Assert(strcmp(NameStr(xlrec->name), NameStr(slot->data.name)) == 0);
+		Assert(slot->data.failover && slot->data.persistency == RS_PERSISTENT);
+
+		/* Update the non-persistent in-memory state */
+		slot->effective_xmin = xlrec->xmin;
+		slot->effective_catalog_xmin = xlrec->catalog_xmin;
+
+		if (found_duplicate)
+		{
+			char		path[MAXPGPATH];
+
+			/* Write an existing slot to disk */
+			Assert(slot->in_use);
+			Assert(slot->active_pid == 0); /* can't be replaying from failover slot */
+
+			sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+			slot->dirty = true;
+			SaveSlotToPath(slot, path, ERROR);
+		}
+		else
+		{
+			Assert(!slot->in_use);
+			/* In-memory state that's only set on create, not update */
+			slot->active_pid = 0;
+			slot->in_use = true;
+			slot->candidate_catalog_xmin = InvalidTransactionId;
+			slot->candidate_xmin_lsn = InvalidXLogRecPtr;
+			slot->candidate_restart_lsn = InvalidXLogRecPtr;
+			slot->candidate_restart_valid = InvalidXLogRecPtr;
+
+			CreateSlotOnDisk(slot);
+		}
+
+		LWLockRelease(ReplicationSlotControlLock);
+
+		ReplicationSlotsUpdateRequiredXmin(false);
+		ReplicationSlotsUpdateRequiredLSN();
+	}
+
+	LWLockRelease(ReplicationSlotAllocationLock);
+
+	if (!found_available)
+	{
+		/*
+		 * Because the standby should have the same or greater max_replication_slots
+		 * as the master this shouldn't happen, but just in case...
+		 */
+		ereport(ERROR,
+				(errmsg("max_replication_slots exceeded, cannot replay failover slot creation"),
+				 errhint("Increase max_replication_slots")));
+	}
+}
+
+/*
+ * Redo the drop of the failover slot called slotname, either during crash
+ * recovery on the master or replay on a standby. A missing slot only warns.
+ */
+static void
+ReplicationSlotRedoDrop(const char * slotname)
+{
+	/*
+	 * Acquire the failover slot that's to be dropped.
+	 *
+	 * We can't ReplicationSlotAcquire here because we want to acquire
+	 * a replication slot during replay, which isn't usually allowed.
+	 * Also, because we might crash midway through a drop we can't
+	 * assume we'll actually find the slot so it's not an error for
+	 * the slot to be missing.
+	 */
+	int			i;
+
+	Assert(MyReplicationSlot == NULL);
+
+	ReplicationSlotValidateName(slotname, ERROR);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->in_use && strcmp(slotname, NameStr(s->data.name)) == 0)
+		{
+			if (s->data.persistency != RS_PERSISTENT)
+			{
+				/* shouldn't happen */
+				elog(WARNING, "found conflicting non-persistent slot during failover slot drop");
+				break;
+			}
+
+			if (!s->data.failover)
+			{
+				/* shouldn't happen */
+				elog(WARNING, "found non-failover slot during redo of slot drop");
+				break;
+			}
+
+			/* A failover slot can't be active during recovery */
+			Assert(s->active_pid == 0);
+
+			/* Claim the slot */
+			s->active_pid = MyProcPid;
+			MyReplicationSlot = s;
+
+			break;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	if (MyReplicationSlot != NULL)
+	{
+		ReplicationSlotDropAcquired();
+	}
+	else
+	{
+		elog(WARNING, "failover slot %s not found during redo of drop",
+				slotname);
+	}
+}
+
+void
+replslot_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		/*
+		 * Update an existing failover slot or, when a slot is first logged
+		 * as persistent, create it. Record data is the slot's persistent data.
+		 */
+		case XLOG_REPLSLOT_UPDATE:
+			ReplicationSlotRedoCreateOrUpdate((ReplicationSlotInWAL) XLogRecGetData(record));
+			break;
+
+		/*
+		 * Drop an existing failover slot, named by the xl_replslot_drop data.
+		 */
+		case XLOG_REPLSLOT_DROP:
+			{
+				xl_replslot_drop *xlrec =
+				(xl_replslot_drop *) XLogRecGetData(record);
+
+				ReplicationSlotRedoDrop(NameStr(xlrec->name));
+
+				break;
+			}
+
+		default:
+			elog(PANIC, "replslot_redo: unknown op code %u", info);
+	}
 }
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 9cc24ea..f430714 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -57,7 +57,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	CheckSlotRequirements();
 
 	/* acquire replication slot, this will check for conflicting names */
-	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
+	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT, false);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
@@ -120,7 +120,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 * errors during initialization because it'll get dropped if this
 	 * transaction fails. We'll make it persistent at the end.
 	 */
-	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
+	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL, false);
 
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c03e045..1583862 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -792,7 +792,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
-		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT);
+		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT, false);
 	}
 	else
 	{
@@ -803,7 +803,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 * handle errors during initialization because it'll get dropped if
 		 * this transaction fails. We'll make it persistent at the end.
 		 */
-		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
+		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL, false);
 	}
 
 	initStringInfo(&output_message);
@@ -1523,7 +1523,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	if (changed)
 	{
 		ReplicationSlotMarkDirty();
-		ReplicationSlotsComputeRequiredLSN();
+		ReplicationSlotsUpdateRequiredLSN();
 	}
 
 	/*
@@ -1619,7 +1619,7 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 	if (changed)
 	{
 		ReplicationSlotMarkDirty();
-		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsUpdateRequiredXmin(false);
 	}
 }
 
diff --git a/src/bin/pg_xlogdump/replslotdesc.c b/src/bin/pg_xlogdump/replslotdesc.c
new file mode 120000
index 0000000..2e088d2
--- /dev/null
+++ b/src/bin/pg_xlogdump/replslotdesc.c
@@ -0,0 +1 @@
+../../../src/backend/access/rmgrdesc/replslotdesc.c
\ No newline at end of file
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index f9cd395..73ed7d4 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -26,6 +26,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/origin.h"
+#include "replication/slot_xlog.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index fab912d..124b7e5 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
+PG_RMGR(RM_REPLSLOT_ID, "ReplicationSlot", replslot_redo, replslot_desc, replslot_identify, NULL, NULL)
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8be8ab6..cdcbd37 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -11,69 +11,12 @@
 
 #include "fmgr.h"
 #include "access/xlog.h"
-#include "access/xlogreader.h"
+#include "replication/slot_xlog.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
 
 /*
- * Behaviour of replication slots, upon release or crash.
- *
- * Slots marked as PERSISTENT are crashsafe and will not be dropped when
- * released. Slots marked as EPHEMERAL will be dropped when released or after
- * restarts.
- *
- * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist().
- */
-typedef enum ReplicationSlotPersistency
-{
-	RS_PERSISTENT,
-	RS_EPHEMERAL
-} ReplicationSlotPersistency;
-
-/*
- * On-Disk data of a replication slot, preserved across restarts.
- */
-typedef struct ReplicationSlotPersistentData
-{
-	/* The slot's identifier */
-	NameData	name;
-
-	/* database the slot is active on */
-	Oid			database;
-
-	/*
-	 * The slot's behaviour when being dropped (or restored after a crash).
-	 */
-	ReplicationSlotPersistency persistency;
-
-	/*
-	 * xmin horizon for data
-	 *
-	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
-	 */
-	TransactionId xmin;
-
-	/*
-	 * xmin horizon for catalog tuples
-	 *
-	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
-	 */
-	TransactionId catalog_xmin;
-
-	/* oldest LSN that might be required by this replication slot */
-	XLogRecPtr	restart_lsn;
-
-	/* oldest LSN that the client has acked receipt for */
-	XLogRecPtr	confirmed_flush;
-
-	/* plugin name */
-	NameData	plugin;
-} ReplicationSlotPersistentData;
-
-/*
  * Shared memory state of a single replication slot.
  */
 typedef struct ReplicationSlot
@@ -155,7 +98,7 @@ extern void ReplicationSlotsShmemInit(void);
 
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
-					  ReplicationSlotPersistency p);
+					  ReplicationSlotPersistency p, bool failover);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name);
 
@@ -167,12 +110,14 @@ extern void ReplicationSlotMarkDirty(void);
 /* misc stuff */
 extern bool ReplicationSlotValidateName(const char *name, int elevel);
 extern void ReplicationSlotReserveWal(void);
-extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
-extern void ReplicationSlotsComputeRequiredLSN(void);
+extern void ReplicationSlotsUpdateRequiredXmin(bool already_locked);
+extern void ReplicationSlotsUpdateRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
+extern XLogRecPtr ReplicationSlotsComputeRequiredLSN(bool failover_only);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
-extern void StartupReplicationSlots(void);
+extern void StartupReplicationSlots(bool drop_nonfailover_slots);
 extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
diff --git a/src/include/replication/slot_xlog.h b/src/include/replication/slot_xlog.h
new file mode 100644
index 0000000..e3211f5
--- /dev/null
+++ b/src/include/replication/slot_xlog.h
@@ -0,0 +1,100 @@
+/*-------------------------------------------------------------------------
+ * slot_xlog.h
+ *	   Replication slot persistent data and WAL record definitions.
+ *
+ * Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * src/include/replication/slot_xlog.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_XLOG_H
+#define SLOT_XLOG_H
+
+#include "fmgr.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Behaviour of replication slots, upon release or crash.
+ *
+ * Slots marked as PERSISTENT are crashsafe and will not be dropped when
+ * released. Slots marked as EPHEMERAL will be dropped when released or after
+ * restarts.
+ *
+ * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist().
+ */
+typedef enum ReplicationSlotPersistency
+{
+	RS_PERSISTENT,				/* crash-safe, kept when released */
+	RS_EPHEMERAL				/* dropped on release or after a restart */
+} ReplicationSlotPersistency;
+
+/*
+ * On-Disk data of a replication slot, preserved across restarts.
+ */
+typedef struct ReplicationSlotPersistentData
+{
+	/* The slot's identifier */
+	NameData	name;
+
+	/* database the slot is active on */
+	Oid			database;
+
+	/*
+	 * The slot's behaviour when being dropped (or restored after a crash).
+	 */
+	ReplicationSlotPersistency persistency;
+
+	/*
+	 * True for failover slots: created on the master, WAL-logged and
+	 * maintained on all standbys, but only assignable there after failover.
+	 */
+	bool		failover;
+
+	/*
+	 * xmin horizon for data
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin in struct ReplicationSlot (slot.h).
+	 */
+	TransactionId xmin;
+
+	/*
+	 * xmin horizon for catalog tuples
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin in struct ReplicationSlot (slot.h).
+	 */
+	TransactionId catalog_xmin;
+
+	/* oldest LSN that might be required by this replication slot */
+	XLogRecPtr	restart_lsn;
+	TimeLineID	restart_tli;	/* NOTE(review): presumably restart_lsn's timeline - confirm */
+
+	/* oldest LSN that the client has acked receipt for */
+	XLogRecPtr	confirmed_flush;
+
+	/* plugin name */
+	NameData	plugin;
+} ReplicationSlotPersistentData;
+
+typedef ReplicationSlotPersistentData *ReplicationSlotInWAL;
+
+/*
+ * WAL records for failover slots
+ */
+#define XLOG_REPLSLOT_UPDATE	0x10	/* data: ReplicationSlotPersistentData */
+#define XLOG_REPLSLOT_DROP		0x20	/* data: xl_replslot_drop */
+
+typedef struct xl_replslot_drop
+{
+	NameData	name;			/* name of the slot to drop */
+} xl_replslot_drop;
+
+/* WAL logging: rmgr callbacks registered in access/rmgrlist.h */
+extern void replslot_redo(XLogReaderState *record);
+extern void replslot_desc(StringInfo buf, XLogReaderState *record);
+extern const char *replslot_identify(uint8 info);
+
+#endif   /* SLOT_XLOG_H */
-- 
2.1.0

