From aed1a4e0d1357938e765758ea9695553f7e9647c Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:30:53 +0800
Subject: [PATCH 9/9] Logical decoding on standby

* Make walsender aware of ProcSignal and recovery conflicts, make walsender
  exit with recovery conflict on upstream drop database when it has an active
  logical slot on that database.
* Allow GetOldestXmin to omit catalog_xmin and to be called with the lock already held.
* Send catalog_xmin separately in hot_standby_feedback messages.
* Store catalog_xmin separately on a physical slot if received in hot_standby_feedback
* Separate the catalog_xmin used by vacuum from ProcArray's replication_slot_catalog_xmin,
  requiring that xlog be emitted before vacuum can remove no-longer-needed catalog tuples;
  store it in checkpoints, and make vacuum and bgwriter advance it.
* During decoding startup, check whether the catalog_xmin requirement can be
  satisfied and bail out if it cannot.
* Add a new recovery conflict type for conflict with catalog_xmin. Abort
  in-progress logical decoding sessions with a recovery conflict when their
  needed catalog_xmin is too old.
* Make extra efforts to reserve master's catalog_xmin during decoding startup
  on standby.
* Try to make sure hot_standby_feedback is active when starting
  logical decoding.
* Remove checks preventing starting logical decoding on standby
---
 contrib/pg_visibility/pg_visibility.c              |   4 +-
 contrib/pgstattuple/pgstatapprox.c                 |   2 +-
 doc/src/sgml/protocol.sgml                         |  33 +-
 src/backend/access/heap/heapam.c                   |   2 +-
 src/backend/access/heap/rewriteheap.c              |   3 +-
 src/backend/access/rmgrdesc/xactdesc.c             |   9 +
 src/backend/access/transam/varsup.c                |  15 +
 src/backend/access/transam/xact.c                  |  55 +++
 src/backend/access/transam/xlog.c                  |  26 +-
 src/backend/catalog/index.c                        |   2 +-
 src/backend/commands/analyze.c                     |   2 +-
 src/backend/commands/dbcommands.c                  |   6 +
 src/backend/commands/vacuum.c                      |  13 +-
 src/backend/postmaster/bgwriter.c                  |   9 +
 src/backend/postmaster/pgstat.c                    |   2 +
 src/backend/replication/logical/decode.c           |  11 +
 src/backend/replication/logical/logical.c          | 323 ++++++++++++++-
 src/backend/replication/slot.c                     |  91 ++++-
 src/backend/replication/walreceiver.c              |  52 ++-
 src/backend/replication/walsender.c                | 135 ++++--
 src/backend/storage/ipc/procarray.c                | 201 +++++++--
 src/backend/storage/ipc/procsignal.c               |   3 +
 src/backend/storage/ipc/standby.c                  | 147 ++++++-
 src/backend/tcop/postgres.c                        |  38 +-
 src/bin/pg_controldata/pg_controldata.c            |   2 +
 src/include/access/transam.h                       |   5 +
 src/include/access/xact.h                          |  12 +-
 src/include/catalog/pg_control.h                   |   1 +
 src/include/pgstat.h                               |   3 +-
 src/include/replication/slot.h                     |   1 +
 src/include/replication/walreceiver.h              |   3 +
 src/include/storage/procarray.h                    |   9 +-
 src/include/storage/procsignal.h                   |   1 +
 src/include/storage/standby.h                      |   2 +
 .../recovery/t/010_logical_decoding_on_replica.pl  | 453 +++++++++++++++++++++
 35 files changed, 1547 insertions(+), 129 deletions(-)
 create mode 100644 src/test/recovery/t/010_logical_decoding_on_replica.pl

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index 9985e3e..4fa3ad4 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -538,7 +538,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	if (all_visible)
 	{
 		/* Don't pass rel; that will fail in recovery. */
-		OldestXmin = GetOldestXmin(NULL, true);
+		OldestXmin = GetOldestXmin(NULL, true, false);
 	}
 
 	rel = relation_open(relid, AccessShareLock);
@@ -660,7 +660,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 				 * a buffer lock. And this shouldn't happen often, so it's
 				 * worth being careful so as to avoid false positives.
 				 */
-				RecomputedOldestXmin = GetOldestXmin(NULL, true);
+				RecomputedOldestXmin = GetOldestXmin(NULL, true, false);
 
 				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
 					record_corrupt_item(items, &tuple.t_self);
diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index f524fc4..5b33c97 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -70,7 +70,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	TransactionId OldestXmin;
 	uint64		misc_count = 0;
 
-	OldestXmin = GetOldestXmin(rel, true);
+	OldestXmin = GetOldestXmin(rel, true, false);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 50cf527..e0fd9aa 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1783,10 +1783,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0, this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1796,7 +1797,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slot on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled. New in 10.0.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby. New in 10.0.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b019bc1..12c1b36 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7300,7 +7300,7 @@ heap_tuple_needs_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid,
  * ratchet forwards latestRemovedXid to the greatest one found.
  * This is used as the basis for generating Hot Standby conflicts, so
  * if a tuple was never visible then removing it should not conflict
- * with queries.
+ * with queries or logical decoding output plugin callbacks.
  */
 void
 HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 17584ba..c514b7b 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -810,7 +810,8 @@ logical_begin_heap_rewrite(RewriteState state)
 	if (!state->rs_logical_rewrite)
 		return;
 
-	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
+	/* Use the catalog_xmin being retained by vacuum */
+	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);
 
 	/*
 	 * If there are no logical slots in progress we don't need to do anything,
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 91d27d0..f454d9d 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -297,6 +297,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
+	{
+		xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
+
+		appendStringInfo(buf, "catalog_xmin %u", xlrec->new_catalog_xmin);
+	}
 }
 
 const char *
@@ -324,6 +330,9 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_CATALOG_XMIN_ADV:
+			id = "CATALOG_XMIN";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 2f7e645..f786056 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -393,6 +393,21 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	}
 }
 
+/*
+ * Set the global oldest catalog_xmin that determines when tuples may be
+ * removed from catalogs and user-catalogs accessible to logical decoding.
+ *
+ * Only to be called during bootstrap, by the startup process, or by
+ * UpdateOldestCatalogXmin() while holding ProcArrayLock; the latter ensures
+ * the update is properly written to xlog first.
+ */
+void
+SetOldestCatalogXmin(TransactionId oldestCatalogXmin)
+{
+	Assert(InRecovery || !IsUnderPostmaster || AmStartupProcess() || LWLockHeldByMe(ProcArrayLock));
+	elog(DEBUG2, "changing oldestCatalogXmin from %u to %u", ShmemVariableCache->oldestCatalogXmin, oldestCatalogXmin);
+	ShmemVariableCache->oldestCatalogXmin = oldestCatalogXmin;
+}
 
 /*
  * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating?
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d643216..3377d3e 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5641,6 +5641,61 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
+	{
+		xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
+
+		/*
+		 * Unless logical decoding is possible on this node, we don't care about
+		 * this record.
+		 */
+		if (!XLogLogicalInfoActive() || max_replication_slots == 0)
+			return;
+
+		/*
+		 * Apply the new catalog_xmin limit immediately. New decoding sessions
+		 * will refuse to start if their slot is past it, and old ones will
+		 * notice when we signal them with a recovery conflict. There's no
+		 * effect on the catalogs themselves yet, so it's safe for backends
+		 * with older catalog_xmins to still exist.
+		 *
+		 * We don't have to take ProcArrayLock since only the startup process
+		 * is allowed to change oldestCatalogXmin when we're in recovery.
+		 */
+		SetOldestCatalogXmin(xlrec->new_catalog_xmin);
+
+		/*
+		 * Notify any active logical decoding sessions to terminate if they
+		 * need the catalogs we're going to be allowed to remove after
+		 * replaying this record.
+		 */
+		ResolveRecoveryConflictWithLogicalDecoding(xlrec->new_catalog_xmin);
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
+
+/*
+ * WAL-log a catalog_xmin advance so standbys learn of it before we remove
+ * catalog tuples they may need for logical decoding. Returns the record's
+ * LSN, or InvalidXLogRecPtr (nothing logged) if XLogInsertAllowed() is false.
+ */
+XLogRecPtr
+XactLogCatalogXminUpdate(TransactionId new_catalog_xmin)
+{
+	XLogRecPtr ptr = InvalidXLogRecPtr;
+
+	if (XLogInsertAllowed())
+	{
+		xl_xact_catalog_xmin_advance xlrec;
+
+		xlrec.new_catalog_xmin = new_catalog_xmin;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfXactCatalogXminAdvance);
+
+		ptr = XLogInsert(RM_XACT_ID, XLOG_XACT_CATALOG_XMIN_ADV);
+	}
+
+	return ptr;
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ce4f1fc..7fbc768 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4815,6 +4815,7 @@ BootStrapXLOG(void)
 	checkPoint.nextMultiOffset = 0;
 	checkPoint.oldestXid = FirstNormalTransactionId;
 	checkPoint.oldestXidDB = TemplateDbOid;
+	checkPoint.oldestCatalogXmin = InvalidTransactionId;
 	checkPoint.oldestMulti = FirstMultiXactId;
 	checkPoint.oldestMultiDB = TemplateDbOid;
 	checkPoint.oldestCommitTsXid = InvalidTransactionId;
@@ -4827,6 +4828,7 @@ BootStrapXLOG(void)
 	ShmemVariableCache->oidCount = 0;
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
@@ -6405,6 +6407,9 @@ StartupXLOG(void)
 	   (errmsg_internal("oldest unfrozen transaction ID: %u, in database %u",
 						checkPoint.oldestXid, checkPoint.oldestXidDB)));
 	ereport(DEBUG1,
+			(errmsg_internal("oldest catalog-only transaction ID: %u",
+							 checkPoint.oldestCatalogXmin)));
+	ereport(DEBUG1,
 			(errmsg_internal("oldest MultiXactId: %u, in database %u",
 						 checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
 	ereport(DEBUG1,
@@ -6421,6 +6426,7 @@ StartupXLOG(void)
 	ShmemVariableCache->oidCount = 0;
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
@@ -8450,6 +8456,7 @@ CreateCheckPoint(int flags)
 	checkPoint.nextXid = ShmemVariableCache->nextXid;
 	checkPoint.oldestXid = ShmemVariableCache->oldestXid;
 	checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
+	checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin;
 	LWLockRelease(XidGenLock);
 
 	LWLockAcquire(CommitTsLock, LW_SHARED);
@@ -8653,7 +8660,7 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -9016,7 +9023,7 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
@@ -9204,6 +9211,16 @@ XLogReportParameters(void)
 			XLogFlush(recptr);
 		}
 
+		/*
+		 * If wal_level was lowered from WAL_LEVEL_LOGICAL we no longer
+		 * require oldestCatalogXmin in checkpoints and it no longer
+		 * makes sense, so update shmem and xlog the change. This will
+		 * get written out in the next checkpoint.
+		 */
+		if (ControlFile->wal_level >= WAL_LEVEL_LOGICAL &&
+			wal_level < WAL_LEVEL_LOGICAL)
+			UpdateOldestCatalogXmin(true);
+
 		ControlFile->MaxConnections = MaxConnections;
 		ControlFile->max_worker_processes = max_worker_processes;
 		ControlFile->max_prepared_xacts = max_prepared_xacts;
@@ -9372,6 +9389,7 @@ xlog_redo(XLogReaderState *record)
 		MultiXactAdvanceOldest(checkPoint.oldestMulti,
 							   checkPoint.oldestMultiDB);
 		SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+		SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 
 		/*
 		 * If we see a shutdown checkpoint while waiting for an end-of-backup
@@ -9470,8 +9488,17 @@ xlog_redo(XLogReaderState *record)
 							   checkPoint.oldestMultiDB);
 		if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
 								  checkPoint.oldestXid))
 			SetTransactionIdLimit(checkPoint.oldestXid,
 								  checkPoint.oldestXidDB);
+		/*
+		 * An online checkpoint reads oldestCatalogXmin before its record is
+		 * inserted, so a catalog_xmin advance record we have already replayed
+		 * may carry a newer value.  As with oldestXid above, never move it
+		 * backwards.
+		 */
+		if (TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin,
+								  checkPoint.oldestCatalogXmin))
+			SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
 		ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 08b646d..03976a9 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2272,7 +2272,7 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	{
 		snapshot = SnapshotAny;
 		/* okay to ignore lazy VACUUMs here */
-		OldestXmin = GetOldestXmin(heapRelation, true);
+		OldestXmin = GetOldestXmin(heapRelation, true, false);
 	}
 
 	scan = heap_beginscan_strat(heapRelation,	/* relation */
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index c617abb..2170566 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -992,7 +992,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	totalblocks = RelationGetNumberOfBlocks(onerel);
 
 	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
-	OldestXmin = GetOldestXmin(onerel, true);
+	OldestXmin = GetOldestXmin(onerel, true, false);
 
 	/* Prepare for sampling block numbers */
 	BlockSampler_Init(&bs, totalblocks, targrows, random());
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 0919ad8..3efc833 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -2119,11 +2119,17 @@ dbase_redo(XLogReaderState *record)
 			 * InitPostgres() cannot fully re-execute concurrently. This
 			 * avoids backends re-connecting automatically to same database,
 			 * which can happen in some cases.
+			 *
+			 * This will lock out walsenders trying to connect to db-specific
+			 * slots for logical decoding too, so it's safe for us to drop slots.
 			 */
 			LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
 			ResolveRecoveryConflictWithDatabase(xlrec->db_id);
 		}
 
+		/* Drop any database-specific replication slots */
+		ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 		/* Drop pages for this database that are in the shared buffer cache */
 		DropDatabaseBuffers(xlrec->db_id);
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 58bbf55..7c257f5 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -488,6 +488,15 @@ vacuum_set_xid_limits(Relation rel,
 	MultiXactId safeMxactLimit;
 
 	/*
+	 * When logical decoding is enabled, we must write any advance of
+	 * catalog_xmin to xlog before we allow VACUUM to remove those tuples.
+	 * This ensures that any standbys doing logical decoding can cancel
+	 * decoding sessions and invalidate slots if we remove tuples they
+	 * still need.
+	 */
+	UpdateOldestCatalogXmin(false);
+
+	/*
 	 * We can always ignore processes running lazy vacuum.  This is because we
 	 * use these values only for deciding which tuples we must keep in the
 	 * tables.  Since lazy vacuum doesn't write its XID anywhere, it's safe to
@@ -497,7 +506,7 @@ vacuum_set_xid_limits(Relation rel,
 	 * always an independent transaction.
 	 */
 	*oldestXmin =
-		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
+		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true, false), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
@@ -909,7 +918,7 @@ vac_update_datfrozenxid(void)
 	 * committed pg_class entries for new tables; see AddNewRelationTuple().
 	 * So we cannot produce a wrong minimum by starting with this.
 	 */
-	newFrozenXid = GetOldestXmin(NULL, true);
+	newFrozenXid = GetOldestXmin(NULL, true, false);
 
 	/*
 	 * Similarly, initialize the MultiXact "min" with the value that would be
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index a31d44e..ba69ae9 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -51,6 +51,7 @@
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
 #include "storage/proc.h"
+#include "storage/procarray.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 #include "storage/spin.h"
@@ -295,6 +296,14 @@ BackgroundWriterMain(void)
 		}
 
 		/*
+		 * Eagerly advance the catalog_xmin used by vacuum if we're not
+		 * a standby. This ensures that standbys waiting for catalog_xmin
+		 * confirmation receive it promptly.
+		 */
+		if (!RecoveryInProgress())
+			UpdateOldestCatalogXmin(false);
+
+		/*
 		 * Log a new xl_running_xacts every now and then so replication can
 		 * get into a consistent state faster (think of suboverflowed
 		 * snapshots) and clean up resources (locks, KnownXids*) more
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index a392197..7127b9f 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3307,6 +3307,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
 		case WAIT_EVENT_WAL_WRITER_MAIN:
 			event_name = "WalWriterMain";
 			break;
+		case WAIT_EVENT_STANDBY_LOGICAL_SLOT_CREATE:
+			event_name = "StandbyLogicalSlotCreate";
+			break;
 		/* no default case, so that compiler will warn */
 	}
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 46cd5ba..5eaf42f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -288,6 +288,17 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 */
 			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
 			break;
+		case XLOG_XACT_CATALOG_XMIN_ADV:
+			/*
+			 * The global catalog_xmin has been advanced. By the time we see
+			 * this in logical decoding it no longer matters, since it's
+			 * guaranteed that all later records will be consistent with the
+			 * advanced catalog_xmin, so we ignore it here. If we were running
+			 * on a standby and it applied a catalog xmin advance past our
+			 * needed catalog_xmin we would've already been terminated with a
+			 * conflict with standby error.
+			 */
+			break;
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
 	}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1512be5..9912800 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
 #include "postgres.h"
 
 #include "miscadmin.h"
+#include "pgstat.h"
 
 #include "access/xact.h"
 #include "access/xlog_internal.h"
@@ -38,11 +39,14 @@
 #include "replication/reorderbuffer.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
+#include "replication/walreceiver.h"
 
+#include "storage/ipc.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 
 #include "utils/memutils.h"
+#include "utils/ps_status.h"
 
 /* data for errcontext callback */
 typedef struct LogicalErrorCallbackState
@@ -68,6 +72,10 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
+static void WaitForMasterCatalogXminReservation(ReplicationSlot *slot);
+
+static void EnsureActiveLogicalSlotValid(void);
+
 /*
  * Make sure the current settings & environment are capable of doing logical
  * decoding.
@@ -87,23 +95,53 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
-	/* ----
-	 * TODO: We got to change that someday soon...
-	 *
-	 * There's basically three things missing to allow this:
-	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
-	 * ----
-	 */
 	if (RecoveryInProgress())
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-			   errmsg("logical decoding cannot be used while in recovery")));
+	{
+		bool walrcv_running, walrcv_has_slot;
+
+		SpinLockAcquire(&WalRcv->mutex);
+		walrcv_running = WalRcv->pid != 0;
+		walrcv_has_slot = WalRcv->slotname[0] != '\0';
+		SpinLockRelease(&WalRcv->mutex);
+
+		/*
+		 * The walreceiver should be running when we try to create a slot. If
+		 * we're unlucky enough to catch the walreceiver just as it's
+		 * restarting after an error, well, the client can just retry. We don't
+		 * bother to sleep and re-check.
+		 */
+		if (!walrcv_running)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("streaming replication is not active"),
+					 errhint("Logical decoding on standby requires that streaming replication be configured and active. Ensure that primary_conninfo is correct in recovery.conf and check for streaming replication errors in the logs.")));
+
+		/*
+		 * When decoding on a standby we need a physical slot to be used by the
+		 * walreceiver so we can pin the upstream's catalog_xmin down even
+		 * over connection loss and restarts. This also gives us somewhere to
+		 * record our needed catalog xmin on the master.
+		 */
+		if (!walrcv_has_slot)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("no replication slot configured for connection to master"),
+					 errhint("Logical decoding on standby requires that a physical replication slot be used to connect the standby to the master.")));
+
+		/*
+		 * We need hot_standby_feedback to make sure the master doesn't vacuum
+		 * away tuples we need.
+		 *
+		 * This check doesn't stop the user disabling it afterwards, or dropping
+		 * and re-creating the physical replication slot without our noticing.
+		 * If they do, we'll notice and fail with a recovery conflict later.
+		 */
+		if (!hot_standby_feedback)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("hot_standby_feedback is not enabled"),
+					 errhint("Logical decoding on standby requires hot_standby_feedback to be enabled.")));
+	}
 }
 
 /*
@@ -126,6 +164,8 @@ StartupDecodingContext(List *output_plugin_options,
 	/* shorter lines... */
 	slot = MyReplicationSlot;
 
+	EnsureActiveLogicalSlotValid();
+
 	context = AllocSetContextCreate(CurrentMemoryContext,
 									"Logical decoding context",
 									ALLOCSET_DEFAULT_SIZES);
@@ -266,7 +306,9 @@ CreateInitDecodingContext(char *plugin,
 	 * xmin horizons by other backends, get the safe decoding xid, and inform
 	 * the slot machinery about the new limit. Once that's done the
 	 * ProcArrayLock can be released as the slot machinery now is
-	 * protecting against vacuum.
+	 * protecting against vacuum - if we're on the master. If we're running on
+	 * a replica, we must additionally wait until hot_standby_feedback locks in
+	 * our needed catalogs; see WaitForMasterCatalogXminReservation().
 	 * ----
 	 */
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -276,6 +318,12 @@ CreateInitDecodingContext(char *plugin,
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
+	if (RecoveryInProgress())
+		WaitForMasterCatalogXminReservation(slot);
+
+	Assert(TransactionIdPrecedesOrEquals(ShmemVariableCache->oldestCatalogXmin,
+										 slot->data.catalog_xmin));
+
 	LWLockRelease(ProcArrayLock);
 
 	/*
@@ -963,3 +1011,244 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
 }
+
+/*
+ * Wait until the master's catalog_xmin is set, advancing our catalog_xmin
+ * if needed. Caller must hold exclusive ProcArrayLock, which this function will
+ * temporarily release while sleeping but will re-acquire.
+ *
+ * We're pretty much just hoping that, if someone else already has a
+ * catalog_xmin reservation affecting the master, it stays where we want it
+ * until our own hot_standby_feedback can pin it down.
+ *
+ * When we're creating a slot on a standby we can't directly set the
+ * master's catalog_xmin; the catalog_xmin is set locally, then relayed
+ * over hot_standby_feedback. The master may remove the catalogs we
+ * asked to reserve between when we set a local catalog_xmin and when
+ * hs feedback makes that take effect on the master. We need a feedback
+ * reply mechanism here, where:
+ *
+ * - we tentatively reserve catalog_xmin locally
+ * - we wake the walreceiver by setting its latch
+ * - walreceiver sends hs_feedback
+ * - upstream walsender sends a new 'hs_feedback reply' message with
+ *   actual (xmin, catalog_xmin) reserved.
+ * - walreceiver sees reply and updates ShmemVariableCache or some other
+ *   handy bit of shmem with hs feedback reservations from reply
+ * - we poll the reservations while we wait
+ * - we set our catalog_xmin to that value, which might be later if
+ *   we missed our requested reservation, or might be earlier if
+ *   someone else is holding down catalog_xmin on master. We got a hs
+ *   feedback reply so we know it's reserved.
+ *
+ * For cascading, the actual reservation will need to cascade up the
+ * chain by walsender setting its own walreceiver's latch in turn, etc.
+ *
+ * For now, we just set the local slot catalog_xmin and sleep until
+ * oldestCatalogXmin equals or passes our reservation. This is fine if we're
+ * the only decoding session, but it is vulnerable to races if slots on the
+ * master or other decoding sessions on other standbys connected to the same
+ * master exist. They might advance their reservation before our hs_feedback
+ * locks it down, allowing vacuum to remove tuples we need. So we might start
+ * decoding on our slot then error with a conflict with recovery when we see
+ * catalog_xmin advance.
+ */
+static void
+WaitForMasterCatalogXminReservation(ReplicationSlot *slot)
+{
+	TimestampTz waitStart;
+	char	   *new_status;
+	XLogRecPtr firstWaitWalEnd, lastWaitWalEnd;
+
+	Assert(LWLockHeldByMe(ProcArrayLock));
+	Assert(TransactionIdIsValid(slot->effective_catalog_xmin));
+	Assert(slot->effective_catalog_xmin == slot->data.catalog_xmin);
+
+	waitStart = GetCurrentTimestamp();
+	new_status = NULL;			/* we haven't changed the ps display */
+
+	/*
+	 * The master doesn't explicitly reply to hot standby feedback, so we
+	 * can't identify which feedback message is the most recent one it has
+	 * applied, nor does it report the catalog_xmin it reserved.
+	 *
+	 * This leaves a potential race. If catalog_xmin is already pinned down by
+	 * some other slot on the master or another standby,
+	 * ShmemVariableCache->oldestCatalogXmin will be set by it. We don't know
+	 * if our hot standby feedback is in effect and pinning down catalog_xmin
+	 * yet. If we start at the current oldestCatalogXmin the other slot might
+	 * advance and allow vacuum to remove tuples we need before our hot standby
+	 * feedback can lock it in. This may result in a conflict with recovery at
+	 * some point after we create the slot and start decoding, when we see the
+	 * new xl_xact_catalog_xmin_advance record, unless our own catalog_xmin has
+	 * advanced enough by then that we no longer need the removed catalogs.
+	 * That can only happen if the xact holding down catalog_xmin has committed
+	 * by the time the needed catalogs are removed, so we can decode it,
+	 * advance confirmed_flush_lsn, and advance restart_lsn + catalog_xmin.
+	 *
+	 * To reduce the chances of triggering this race we force immediate
+	 * hot_standby_feedback, wait for a new latestWalEnd report from the
+	 * sender, and wait until we replay past that before we take the
+	 * catalog_xmin to start from. Without the ability to ask the walsender
+	 * to verify receipt of, and successful reservation of, a specific hot
+	 * standby feedback message this is the best we can do.
+	 *
+	 * If we lose the race, decoding will fail with a recovery conflict later.
+	 * The client will have to drop the slot and try again.
+	 *
+	 * Users can further mitigate this risk with a sufficiently high
+	 * vacuum_defer_cleanup_age.
+	 *
+	 * Users can completely prevent this problem by creating a temporary
+	 * logical slot on the master and waiting for the replica to catch up to
+	 * the master's xlog insert position before they create a slot on the
+	 * replica. Then wait until a catalog_xmin is reported on the replica's
+	 * physical slot before dropping the temporary slot on the master.
+	 *
+	 * TODO: get reply from server explicitly confirming that it has applied
+	 * our hs_feedback and what the lowest catalog_xmin it can honour is.
+	 * We'll need some kind of cookie so we can tell the server is replying
+	 * to us not someone else, especially in cascading setups.
+	 */
+
+	firstWaitWalEnd = lastWaitWalEnd = WalRcv->latestWalEnd;
+
+	WalRcvForceReply();
+
+	while (lastWaitWalEnd == firstWaitWalEnd ||
+		   GetXLogReplayRecPtr(NULL) < lastWaitWalEnd ||
+		   !TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin))
+	{
+		int ret;
+		XLogRecPtr ptr = GetXLogReplayRecPtr(NULL);
+
+		elog(DEBUG2, "waiting for catalog_xmin reservation: firstEnd %X/%X, lastEnd %X/%X, replay %X/%X, oldestCatalogXmin %u",
+			(uint32)(firstWaitWalEnd>>32), (uint32)(firstWaitWalEnd),
+			(uint32)(lastWaitWalEnd>>32), (uint32)(lastWaitWalEnd),
+			(uint32)(ptr>>32), (uint32)(ptr),
+			ShmemVariableCache->oldestCatalogXmin);
+
+		/*
+		 * We need to advance our slot's catalog_xmin to keep pace with the
+		 * latest reported position from the master. That way we won't get
+		 * canceled with a recovery conflict when the master sends catalog_xmin
+		 * updates while we're waiting for redo to catch up with the position
+		 * we saw when we started waiting.
+		 *
+		 * A problem arises here when the server sends an
+		 * xl_xact_catalog_xmin_advance with oldestCatalogXmin = 0, indicating
+		 * it is no longer reserving catalogs. Since we're creating a slot we
+		 * don't mind, but the redo code does not know that and will treat our
+		 * process as conflicting with recovery. To guard against that we'll
+		 * advance our slot's catalog_xmin to the new
+		 * GetOldestSafeDecodingTransactionId() and redo will ignore slots
+		 * whose catalog_xmin is >= nextXid. So long as we loop faster than the
+		 * maximum standby delay we'll keep ahead of recovery cancellations.
+		 * This means we must take XidGenLock once per loop, but it's not like
+		 * we spend a lot of time creating slots.
+		 *
+		 * It's fine for our catalog_xmin to go backwards when the server
+		 * reports it has nailed down catalog_xmin so we just unconditionally
+		 * reassign our catalog_xmin.
+		 */
+		slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
+		slot->data.catalog_xmin = slot->effective_catalog_xmin;
+		ReplicationSlotsComputeRequiredXmin(true);
+
+		LWLockRelease(ProcArrayLock);
+
+		ret = WaitLatch(&MyProc->procLatch,
+						WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+						500, WAIT_EVENT_STANDBY_LOGICAL_SLOT_CREATE);
+
+		if (ret & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+
+		if (ret & WL_LATCH_SET)
+			ResetLatch(&MyProc->procLatch);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Notice if the server has reported new WAL since we sent our feedback */
+		if (lastWaitWalEnd == firstWaitWalEnd)
+			lastWaitWalEnd = WalRcv->latestWalEnd;
+
+		/* Update process title if waiting long enough */
+		if (update_process_title && new_status == NULL &&
+			TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
+									   500))
+		{
+			const char *old_status;
+			int			len;
+
+			old_status = get_ps_display(&len);
+			new_status = (char *) palloc(len + 8 + 1);
+			memcpy(new_status, old_status, len);
+			strcpy(new_status + len, " waiting");
+			set_ps_display(new_status, false);
+			new_status[len] = '\0'; /* truncate off " waiting" */
+		}
+
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	}
+
+	if (TransactionIdPrecedes(slot->effective_catalog_xmin, ShmemVariableCache->oldestCatalogXmin))
+	{
+		/*
+		 * We didn't reserve the catalog_xmin we wanted; the master has already
+		 * removed it. We have to start decoding at a later point.
+		 */
+		slot->effective_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+		slot->data.catalog_xmin = slot->effective_catalog_xmin;
+	}
+
+	ReplicationSlotsComputeRequiredXmin(true);
+
+	/* Tell the master what catalog_xmin we settled on */
+	WalRcvForceReply();
+
+	/* Reset ps display if we changed it */
+	if (new_status)
+	{
+		set_ps_display(new_status, false);
+		pfree(new_status);
+	}
+
+	Assert(TransactionIdFollowsOrEquals(slot->effective_catalog_xmin, ShmemVariableCache->oldestCatalogXmin));
+	Assert(LWLockHeldByMe(ProcArrayLock));
+}
+
+/*
+ * Raise an ERROR if the active logical slot (MyReplicationSlot) is unusable.
+ */
+static void
+EnsureActiveLogicalSlotValid(void)
+{
+	Assert(MyReplicationSlot != NULL);
+
+	/*
+	 * Currently a logical slot can only become unusable if we're doing logical
+	 * decoding on standby and the master advanced its catalog_xmin past
+	 * the threshold we need, removing tuples that we'll require to start
+	 * decoding at our restart_lsn.
+	 */
+	if (RecoveryInProgress())
+	{
+		/*
+		 * Check if enough catalog is retained for this slot; an unset
+		 * oldestCatalogXmin means the master reserves none. No locking is needed
+		 * since oldestCatalogXmin can only advance, so once it's past what we
+		 * need that won't change. Our slot is marked active, so redo won't
+		 * replay past our catalog_xmin without first terminating our session.
+		 */
+		TransactionId shmem_catalog_xmin =
+			*(volatile TransactionId*)(&ShmemVariableCache->oldestCatalogXmin);
+
+		if (!TransactionIdIsValid(shmem_catalog_xmin) ||
+			TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("replication slot \"%s\" requires catalogs removed by master",
+							 NameStr(MyReplicationSlot->data.name))));
+	}
+}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0b2575e..35920cd 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -758,6 +758,93 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+/*
+ * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
+ * passed database oid. The caller should hold an exclusive lock on the database
+ * to ensure no replication slots on the database are in use.
+ *
+ * If we fail here we'll leave the in-memory state of replication slots
+ * inconsistent with its on-disk state, so we need to PANIC.
+ *
+ * This routine isn't as efficient as it could be - but we don't drop databases
+ * often, especially databases with lots of slots.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+
+	if (max_replication_slots <= 0)
+		return;
+
+	/*
+	 * We only need a shared lock here even though we activate slots,
+	 * because we have an exclusive lock on the database we're dropping
+	 * slots on and don't touch other databases' slots.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		NameData slotname;
+		int active_pid;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotControlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* only logical slots are database specific, skip */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* not our database, skip */
+		if (s->data.database != dboid)
+			continue;
+
+		/* Claim the slot, as ReplicationSlotAcquire() would */
+		SpinLockAcquire(&s->mutex);
+		strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
+		NameStr(slotname)[NAMEDATALEN-1] = '\0';
+		active_pid = s->active_pid;
+		if (active_pid == 0)
+		{
+			MyReplicationSlot = s;
+			s->active_pid = MyProcPid;
+		}
+		SpinLockRelease(&s->mutex);
+
+		/*
+		 * The caller should have an exclusive lock on the database so
+		 * we'll never see any in-use slots, but just in case...
+		 */
+		if (active_pid)
+			elog(PANIC, "replication slot %s is in use by pid %d",
+				 NameStr(slotname), active_pid);
+
+		/*
+		 * To avoid largely duplicating ReplicationSlotDropAcquired() or
+		 * complicating it with already_locked flags for ProcArrayLock,
+		 * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
+		 * just release our ReplicationSlotControlLock to drop the slot.
+		 *
+		 * There's no race here: we acquired this slot, and no slot "behind"
+		 * our scan can be created or become active with our target dboid due
+		 * to our exclusive lock on the DB.
+		 */
+		LWLockRelease(ReplicationSlotControlLock);
+		ReplicationSlotDropAcquired();
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* recompute limits once after all slots are dropped */
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+}
+
 
 /*
  * Check whether the server's configuration supports using replication
@@ -805,7 +892,9 @@ ReplicationSlotReserveWal(void)
 		/*
 		 * For logical slots log a standby snapshot and start logical decoding
 		 * at exactly that position. That allows the slot to start up more
-		 * quickly.
+		 * quickly. We can't do that on a standby; there we must wait for the
+		 * bgwriter to get around to logging its periodic standby snapshot.
+		 * (TODO: ask walreceiver to ask walsender to log it, or ask the bgwriter to.)
 		 *
 		 * That's not needed (or indeed helpful) for physical slots as they'll
 		 * start replay at the last logged checkpoint anyway. Instead return
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2bb3dce..c887523 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -499,9 +499,15 @@ WalReceiverMain(void)
 						 * feedback now.  Make sure the flag is really set to
 						 * false in shared memory before sending the reply, so
 						 * we don't miss a new request for a reply.
+						 *
+						 * If XLogLogicalInfoActive(), we also send immediate
+						 * hot standby feedback, to shorten the delay before
+						 * upstream locks in the catalogs our slots need.
 						 */
 						walrcv->force_reply = false;
 						pg_memory_barrier();
+						if (XLogLogicalInfoActive())
+							XLogWalRcvSendHSFeedback(true);
 						XLogWalRcvSendReply(true, false);
 					}
 				}
@@ -1164,8 +1170,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		xmin_epoch, catalog_xmin_epoch;
+	TransactionId xmin, catalog_xmin;
 	static TimestampTz sendTime = 0;
 	static bool master_has_standby_xmin = false;
 
@@ -1206,29 +1212,57 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false);
+	{
+		/*
+		 * Usually GetOldestXmin() would include the catalog_xmin in its
+		 * calculations, but we don't want to hold upstream back from vacuuming
+		 * normal user table tuples just because they're within the
+		 * catalog_xmin horizon of logical replication slots on this standby.
+		 * Instead we report the catalog_xmin to the upstream separately.
+		 */
+		xmin = GetOldestXmin(NULL,
+							 false, /* don't ignore vacuum */
+							 true /* ignore catalog xmin */);
+
+		/*
+		 * Don't send the effective catalog_xmin used by vacuum (which
+		 * GetOldestXmin() would otherwise fold in), as it is set by
+		 * xl_xact_catalog_xmin_advance records from the master. Sending it back
+		 * would be circular and prevent the master's catalog_xmin from ever
+		 * advancing once set. Send only the catalog_xmin our slots need.
+		 */
+		ProcArrayGetReplicationSlotXmin(NULL, NULL, &catalog_xmin);
+	}
 	else
+	{
 		xmin = InvalidTransactionId;
+		catalog_xmin = InvalidTransactionId;
+	}
 
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	GetNextXidAndEpoch(&nextXid, &xmin_epoch);
+	catalog_xmin_epoch = xmin_epoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch--;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch--;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
-	if (TransactionIdIsValid(xmin))
+	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 		master_has_standby_xmin = true;
 	else
 		master_has_standby_xmin = false;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 694e777..4b63af9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -188,7 +188,6 @@ static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
-static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
@@ -217,6 +216,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -1554,6 +1554,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst lost information can do here is give us
 	 * wrong information in a statistics view - we'll just potentially be more
 	 * conservative in removing files.
+	 *
+	 * Nor do we need the effective_xmin / effective_catalog_xmin testing
+	 * that LogicalConfirmReceivedLocation() does. If we received the xmin
+	 * and catalog_xmin from downstream replication slots we know they were
+	 * already confirmed there.
 	 */
 }
 
@@ -1616,7 +1621,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1637,6 +1642,22 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica, it's safe to act on catalog_xmin advances
+	 * immediately too: the replica only reports a new catalog_xmin once it has
+	 * advanced its effective_catalog_xmin, so it has already done the
+	 * delay-until-confirmed dance and won't need the protected data again.
+	 * An invalid feedbackCatalogXmin clears the slot's catalog_xmin.
+	 */
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1647,59 +1668,92 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 }
 
 /*
+ * Check that an xid/epoch pair reported by a standby is sane: it must be
+ * neither in the future nor so old that it has already wrapped around.
+ *
+ * The standby's epoch must equal the epoch of our nextXid, unless the xid
+ * is numerically above nextXid, in which case our counter has wrapped since
+ * and our epoch must be exactly one greater than the standby's.
+ *
+ * Whether clog still exists for the xid is not checked at all.
+ */
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
+{
+	TransactionId nextXid;
+	uint32		nextEpoch;
+	uint32		expectedEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	/*
+	 * An xid above nextXid can only come from the previous epoch (uint32
+	 * arithmetic, so epoch 0's predecessor wraps to the maximum value).
+	 */
+	if (xid <= nextXid)
+		expectedEpoch = nextEpoch;
+	else
+		expectedEpoch = nextEpoch - 1;
+
+	if (epoch != expectedEpoch)
+		return false;
+
+	/* Epoch is consistent; still reject an xid that wrapped past nextXid. */
+	return TransactionIdPrecedesOrEquals(xid, nextXid);
+}
+
+/*
  * Hot Standby feedback
  */
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-	TransactionId nextXid;
-	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
-	 * byte.
+	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
+	 * of this message.
 	 */
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
 		 feedbackXmin,
-		 feedbackEpoch);
+		 feedbackEpoch,
+		 feedbackCatalogXmin,
+		 feedbackCatalogEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-	if (feedbackXmin <= nextXid)
-	{
-		if (feedbackEpoch != nextEpoch)
-			return;
-	}
-	else
-	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
-	}
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
+		return;
 
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
+		return;
 
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
@@ -1724,15 +1778,23 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. Without a slot we can't track
+	 * catalog_xmin separately, so we store the older of the two valid xmins.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+			(!TransactionIdIsNormal(feedbackXmin) ||
+			 TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin)))
+			MyPgXact->xmin = feedbackCatalogXmin;
+		else
+			MyPgXact->xmin = feedbackXmin;
+	}
 }
 
 /*
@@ -2605,17 +2667,6 @@ WalSndSigHupHandler(SIGNAL_ARGS)
 	errno = save_errno;
 }
 
-/* SIGUSR1: set flag to send WAL records */
-static void
-WalSndXLogSendHandler(SIGNAL_ARGS)
-{
-	int			save_errno = errno;
-
-	latch_sigusr1_handler();
-
-	errno = save_errno;
-}
-
 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
 static void
 WalSndLastCycleHandler(SIGNAL_ARGS)
@@ -2649,7 +2700,7 @@ WalSndSignals(void)
 	pqsignal(SIGQUIT, quickdie);	/* hard crash time */
 	InitializeTimeouts();		/* establishes SIGALRM handler */
 	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGUSR1, WalSndXLogSendHandler);	/* request WAL sending */
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);	/* ProcSignals, incl. recovery conflicts */
 	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
 												 * shutdown */
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index e5d487d..ecde732 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1298,17 +1298,22 @@ TransactionIdIsActive(TransactionId xid)
  * process can set its xmin based on transactions that are no longer running
  * in the master but are still being replayed on the standby, thus possibly
  * making the GetOldestXmin reading go backwards.  In this case there is a
- * possibility that we lose data that the standby would like to have, but
- * there is little we can do about that --- data is only protected if the
- * walsender runs continuously while queries are executed on the standby.
- * (The Hot Standby code deals with such cases by failing standby queries
- * that needed to access already-removed data, so there's no integrity bug.)
+ * possibility that we lose data that the standby would like to have,
+ * unless the standby uses a replication slot to make its xmin persistent
+ * even when it isn't connected. The Hot Standby code deals with such cases by
+ * failing standby queries that needed to access already-removed data, so
+ * there's no integrity bug.
+ *
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * If ignoreCatalogXmin is true, the effective catalog_xmin honoured by
+ * vacuum (ShmemVariableCache->oldestCatalogXmin) is disregarded; the
+ * caller must then account for catalog_xmin separately.
  */
 TransactionId
-GetOldestXmin(Relation rel, bool ignoreVacuum)
+GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreCatalogXmin)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId result;
@@ -1382,9 +1387,13 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
 		}
 	}
 
-	/* fetch into volatile var while ProcArrayLock is held */
+	/*
+	 * Fetch slot xmins into volatile var while ProcArrayLock is held. Note that
+	 * we're using the effective catalog_xmin for vacuum's tuple removal here,
+	 * as copied over by UpdateOldestCatalogXmin().
+	 */
 	replication_slot_xmin = procArray->replication_slot_xmin;
-	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
 
 	if (RecoveryInProgress())
 	{
@@ -1433,19 +1442,93 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
 		NormalTransactionIdPrecedes(replication_slot_xmin, result))
 		result = replication_slot_xmin;
 
+	if (!ignoreCatalogXmin && (rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)))
+	{
+		/*
+		 * After locks have been released and defer_cleanup_age has been applied,
+		 * check whether we need to back up further to make logical decoding
+		 * safe. We need to do so if we're computing the global limit (rel =
+		 * NULL) or if the passed relation is a catalog relation of some kind.
+		 */
+		if (TransactionIdIsValid(replication_slot_catalog_xmin) &&
+			NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
+			result = replication_slot_catalog_xmin;
+	}
+
+	return result;
+}
+
+/*
+ * Decide whether the effective catalog_xmin honoured by vacuum
+ * (vacuum_catalog_xmin) must be replaced by the slots' requirement
+ * (slots_catalog_xmin): true when exactly one of them is set, or when the
+ * slots' value has moved past the effective one.
+ */
+static bool
+CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin, TransactionId slots_catalog_xmin)
+{
+	return TransactionIdIsValid(vacuum_catalog_xmin) != TransactionIdIsValid(slots_catalog_xmin) ||
+		TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin);
+}
+
+/*
+ * If necessary, copy the current catalog_xmin needed by replication slots to
+ * the effective catalog_xmin used for dead tuple removal.
+ *
+ * When logical decoding is enabled we write a WAL record before advancing the
+ * effective value so that standbys find out if catalog tuples they still need
+ * get removed, and can properly cancel decoding sessions and invalidate slots.
+ *
+ * The 'force' option is used when we're turning WAL_LEVEL_LOGICAL off
+ * and need to clear the shmem state, since we want to bypass the wal_level
+ * check and force xlog writing.
+ */
+void
+UpdateOldestCatalogXmin(bool force)
+{
+	TransactionId vacuum_catalog_xmin;
+	TransactionId slots_catalog_xmin;
+
+	/*
+	 * If we're not recording logical decoding information, catalog_xmin
+	 * should be unset, so the checks below normally find nothing to do.
+	 *
+	 * XXX TODO: make sure we zero the checkpointed value when logical decoding
+	 * is turned off, and check it during startup.
+	 */
+	if (!XLogLogicalInfoActive() && !force)
+	{
+		Assert(!TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin));
+		Assert(!TransactionIdIsValid(procArray->replication_slot_catalog_xmin));
+	}
+
+	Assert(XLogInsertAllowed());
+
 	/*
-	 * After locks have been released and defer_cleanup_age has been applied,
-	 * check whether we need to back up further to make logical decoding
-	 * possible. We need to do so if we're computing the global limit (rel =
-	 * NULL) or if the passed relation is a catalog relation of some kind.
+	 * Do an unlocked check first. This is obviously race-prone especially
+	 * since replication_slot_catalog_xmin could be updated after we read
+	 * oldestCatalogXmin. But it doesn't matter if we get wrong results here,
+	 * it'll either cause us to take an unnecessary ProcArrayLock to recheck,
+	 * or delay an update until the next vacuum run.
 	 */
-	if ((rel == NULL ||
-		 RelationIsAccessibleInLogicalDecoding(rel)) &&
-		TransactionIdIsValid(replication_slot_catalog_xmin) &&
-		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
-		result = replication_slot_catalog_xmin;
+	vacuum_catalog_xmin = *((volatile TransactionId*)&ShmemVariableCache->oldestCatalogXmin);
+	slots_catalog_xmin = *((volatile TransactionId*)&procArray->replication_slot_catalog_xmin);
 
-	return result;
+	if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin) || force)
+	{
+		XactLogCatalogXminUpdate(slots_catalog_xmin);
+
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+		/*
+		 * Re-check under ProcArrayLock, and install only the value we WAL-logged:
+		 * if the slots' requirement changed meanwhile, leave it for the next call.
+		 */
+		vacuum_catalog_xmin = *((volatile TransactionId*)&ShmemVariableCache->oldestCatalogXmin);
+		if (slots_catalog_xmin == *((volatile TransactionId*)&procArray->replication_slot_catalog_xmin) &&
+			CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
+			SetOldestCatalogXmin(slots_catalog_xmin);
+		LWLockRelease(ProcArrayLock);
+	}
 }
 
 /*
@@ -2173,14 +2256,20 @@ GetOldestSafeDecodingTransactionId(void)
 	oldestSafeXid = ShmemVariableCache->nextXid;
 
 	/*
-	 * If there's already a slot pegging the xmin horizon, we can start with
-	 * that value, it's guaranteed to be safe since it's computed by this
-	 * routine initially and has been enforced since.
+	 * If there's already an effectiveCatalogXmin held down by vacuum
+	 * it's definitely safe to start there, and it can't advance
+	 * while we hold ProcArrayLock.
 	 */
-	if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
-		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
+	if (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) &&
+		TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin,
 							  oldestSafeXid))
-		oldestSafeXid = procArray->replication_slot_catalog_xmin;
+		oldestSafeXid = ShmemVariableCache->oldestCatalogXmin;
+
+	/*
+	 * TODO: If we're on replica and using hot standby feedback to set catalog_xmin
+	 * we should be able to directly check the value reserved by feedback via shmem
+	 * from walreceiver, even if xlog replay hasn't passed that point yet.
+	 */
 
 	/*
 	 * If we're not in recovery, we walk over the procarray and collect the
@@ -2662,6 +2751,53 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
 }
 
 /*
+ * Ask the logical decoding session running as session_pid to exit with a
+ * catalog_xmin recovery conflict; it needs catalog rows the master removed.
+ */
+void
+CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid)
+{
+	ProcArrayStruct *arrayP = procArray;
+	int			index;
+
+	/*
+	 * We have to scan ProcArray to find the process and set a pending recovery
+	 * conflict even though we know the pid. At least we can get the BackendId
+	 * and avoid a ProcSignal scan later.
+	 *
+	 * The pid might've gone away, in which case we got the desired
+	 * outcome anyway.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+	for (index = 0; index < arrayP->numProcs; index++)
+	{
+		int			pgprocno = arrayP->pgprocnos[index];
+		volatile PGPROC *proc = &allProcs[pgprocno];
+
+		if (proc->pid == session_pid)
+		{
+			VirtualTransactionId procvxid;
+
+			GET_VXID_FROM_PGPROC(procvxid, *proc);
+
+			proc->recoveryConflictPending = true;
+
+			/*
+			 * Signal the pid if it's still here. If it already exited, that's
+			 * the outcome we wanted, so SendProcSignal's result is ignored.
+			 */
+			(void) SendProcSignal(session_pid,
+				PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, procvxid.backendId);
+
+			break;
+		}
+	}
+
+	LWLockRelease(ProcArrayLock);
+}
+
+/*
  * MinimumActiveBackends --- count backends (other than myself) that are
  *		in active transactions.  Return true if the count exceeds the
  *		minimum threshold passed.  This is used as a heuristic to decide if
@@ -2936,18 +3072,29 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
  *
  * Return the current slot xmin limits. That's useful to be able to remove
  * data that's older than those limits.
+ *
+ * For logical replication slots' catalog_xmin, we return both the effective
+ * catalog_xmin being used for tuple removal (retained catalog_xmin) and the
+ * catalog_xmin actually needed by replication slots (needed_catalog_xmin).
+ * retained_catalog_xmin is normally no newer than needed_catalog_xmin, but
+ * that is not guaranteed while replication slots on a replica are starting
+ * up and reserving catalogs. Pass NULL for any value not wanted.
  */
 void
 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin)
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin)
 {
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
 	if (xmin != NULL)
 		*xmin = procArray->replication_slot_xmin;
 
-	if (catalog_xmin != NULL)
-		*catalog_xmin = procArray->replication_slot_catalog_xmin;
+	if (retained_catalog_xmin != NULL)
+		*retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+	if (needed_catalog_xmin != NULL)
+		*needed_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
 	LWLockRelease(ProcArrayLock);
 }
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index a3d6ac5..d17dba1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -273,6 +273,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
 
+	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN))
+		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_TABLESPACE))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 875dcec..a0a051b 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -29,6 +29,7 @@
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/standby.h"
+#include "replication/slot.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
@@ -152,7 +153,9 @@ GetStandbyLimitTime(void)
 static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
 
 /*
- * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
+ * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs and
+ * ResolveRecoveryConflictWithLogicalDecoding.
+ *
  * We wait here for a while then return. If we decide we can't wait any
  * more then we return true, if we can wait some more return false.
  */
@@ -1105,3 +1108,156 @@ LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
 					 nmsgs * sizeof(SharedInvalidationMessage));
 	XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
 }
+
+/*
+ * Scan to see if any clients are using replication slots that are below the
+ * new catalog_xmin threshold and signal them to terminate with a recovery
+ * conflict.
+ *
+ * We already applied the new catalog_xmin record and updated the shmem
+ * catalog_xmin state, so new clients that try to use a replication slot
+ * whose on-disk catalog_xmin is below the new threshold will ERROR, and we
+ * don't have to guard against them here.
+ *
+ * Replay can only continue safely once every slot that needs the catalogs
+ * we're going to free for removal is gone. So if any conflicting sessions
+ * exist, wait for any standby conflict grace period then signal them to exit.
+ *
+ * The master might clear its reserved catalog_xmin if all upstream slots are
+ * removed or clear their feedback reservations, sending us
+ * InvalidTransactionId. If we're concurrently trying to create a new slot and
+ * reserve catalogs the InvalidXid reservation report might come in while we
+ * have a slot waiting for hs_feedback confirmation of its reservation. That
+ * would cause the waiting process to get canceled with a conflict with
+ * recovery here since its tentative reservation conflicts with the master's
+ * report of 'nothing reserved'. To allow it to continue to seek a startpoint
+ * we ignore slots whose catalog_xmin is >= nextXid, indicating that they're
+ * still looking for where to start. We'll sometimes notice a conflict but the
+ * slot will advance its catalog_xmin to a more recent nextXid and cease to
+ * conflict when we re-check. (The alternative is to track slots being created
+ * differently to slots actively decoding in shmem, which seems unnecessary. Or
+ * to separate the 'tentative catalog_xmin reservation' of a slot from its
+ * actual needed catalog_xmin.)
+ *
+ * We can't use ResolveRecoveryConflictWithVirtualXIDs() here because
+ * walsender-based logical decoding sessions won't have any virtualxid for much
+ * of their life and the end of their virtualxids doesn't mean the end of a
+ * potential conflict. It would also cancel too aggressively, since it cares
+ * about the backend's xmin and logical decoding only needs the catalog_xmin.
+ */
+void
+ResolveRecoveryConflictWithLogicalDecoding(TransactionId new_catalog_xmin)
+{
+	int i;
+
+	if (!InHotStandby)
+		/* nobody can be actively using logical slots */
+		return;
+
+	/* Already applied new limit, can't have replayed later one yet */
+	Assert(ShmemVariableCache->oldestCatalogXmin == new_catalog_xmin);
+
+	/*
+	 * Find the first conflicting active slot and wait for it to be free,
+	 * signalling it if necessary, then repeat until there are no more
+	 * conflicts.
+	 *
+	 * ReplicationSlotControlLock is held in shared mode whenever slot state
+	 * is examined below; it is only released around the sleep/signal step.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *slot;
+		pid_t active_pid;
+		TransactionId active_catalog_xmin;
+
+		/* Reset standby wait back-off delay for each slot waited for */
+		standbyWait_us = STANDBY_INITIAL_WAIT_US;
+
+		slot = &ReplicationSlotCtl->replication_slots[i];
+
+		/*
+		 * Physical slots can have a catalog_xmin, but if we're an intermediate
+		 * cascading standby all we do is pass the catalog_xmin up to our
+		 * master and relay WAL down to the cascaded replica. Conflicts are the
+		 * cascaded replica's problem.
+		 */
+		if (!(slot->in_use && SlotIsLogical(slot)))
+			continue;
+
+		/*
+		 * We only care about the effective_catalog_xmin of in-use logical slots.
+		 * Inactive slots have the same effective and actual catalog_xmin, and
+		 * we'll detect conflicts with those when an attempt is made to use
+		 * them. Active slots' catalog_xmin can't go backwards unless they
+		 * become inactive.
+		 *
+		 * We specifically ignore catalog_xmin reservations >= nextXid here to allow
+		 * for slots still being created; see function comment.
+		 */
+		while (slot->in_use && slot->active_pid != 0 &&
+			   TransactionIdIsValid(slot->effective_catalog_xmin) &&
+			   (!TransactionIdIsValid(new_catalog_xmin) ||
+				TransactionIdPrecedes(slot->effective_catalog_xmin, new_catalog_xmin)) &&
+			   TransactionIdPrecedes(slot->effective_catalog_xmin, ShmemVariableCache->nextXid))
+		{
+			/*
+			 * Wait for the conflicting session to exit, signalling it with
+			 * a conflict if necessary.
+			 *
+			 * We'll sleep here, so release the replication slot control lock. No
+			 * new conflicts can appear "behind" our scan of the replication_slots
+			 * array because sessions check the oldestCatalogXmin on decoding
+			 * startup. This lets the exiting backend clear the slot's
+			 * active_pid. Copy what we need for reporting while still locked.
+			 */
+			active_pid = slot->active_pid;
+			active_catalog_xmin = slot->effective_catalog_xmin;
+			LWLockRelease(ReplicationSlotControlLock);
+
+			/*
+			 * As a safeguard against signalling the wrong process in case of
+			 * pid reassignment, only signal if the slot's active_pid hasn't
+			 * been cleared or changed while we waited. Do an unlocked read
+			 * here since the worst wrong outcome even in the case of garbage
+			 * read is an extra sleep. If you get a new backend with the same
+			 * pid in the same slot array position you have terrible luck, and
+			 * it might get cancelled with a spurious conflict.
+			 *
+			 * Note that this must not bypass the lock re-acquisition at the
+			 * bottom of the loop: the while condition, the rest of the scan
+			 * and the final LWLockRelease all assume the lock is held.
+			 */
+			if (WaitExceedsMaxStandbyDelay() &&
+				active_pid == slot->active_pid)
+			{
+				/*
+				 * Use LOG rather than INFO: this is called while replaying WAL,
+				 * where there is normally no client, and INFO is not written to
+				 * the server log at the default log_min_messages setting.
+				 */
+				ereport(LOG,
+						(errmsg("terminating logical decoding session due to recovery conflict"),
+						 errdetail("Pid %d requires catalog_xmin %u for replication slot '%s' but the master has removed catalogs up to xid %u.",
+								   (int) active_pid, active_catalog_xmin,
+								   NameStr(slot->data.name), new_catalog_xmin)));
+
+				CancelLogicalDecodingSessionWithRecoveryConflict(active_pid);
+
+				/*
+				 * Wait a little bit for it to die so that we avoid flooding
+				 * an unresponsive backend when system is heavily loaded.
+				 */
+				pg_usleep(5000L);
+			}
+
+			/*
+			 * We need to re-acquire the lock before re-checking the slot or
+			 * continuing the scan.
+			 */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index cc84754..a6baa33 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2262,6 +2262,9 @@ errdetail_recovery_conflict(void)
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			errdetail("User transaction caused buffer deadlock with recovery.");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN:
+			errdetail("Logical replication slot requires catalog rows that will be removed.");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_DATABASE:
 			errdetail("User was connected to a database that must be dropped.");
 			break;
@@ -2698,8 +2701,12 @@ SigHupHandler(SIGNAL_ARGS)
 /*
  * RecoveryConflictInterrupt: out-of-line portion of recovery conflict
  * handling following receipt of SIGUSR1. Designed to be similar to die()
- * and StatementCancelHandler(). Called only by a normal user backend
- * that begins a transaction during recovery.
+ * and StatementCancelHandler().
+ *
+ * Called by normal user backends running during recovery. Also used by the
+ * walsender to handle recovery conflicts with logical decoding, and by
+ * background workers that call CHECK_FOR_INTERRUPTS() and respect recovery
+ * conflicts.
  */
 void
 RecoveryConflictInterrupt(ProcSignalReason reason)
@@ -2781,6 +2788,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 
 				/* Intentional drop through to session cancel */
 
+			case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN:
 			case PROCSIG_RECOVERY_CONFLICT_DATABASE:
 				RecoveryConflictPending = true;
 				ProcDiePending = true;
@@ -2795,12 +2803,13 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 		Assert(RecoveryConflictPending && (QueryCancelPending || ProcDiePending));
 
 		/*
-		 * All conflicts apart from database cause dynamic errors where the
-		 * command or transaction can be retried at a later point with some
-		 * potential for success. No need to reset this, since non-retryable
-		 * conflict errors are currently FATAL.
+		 * All conflicts apart from database and catalog_xmin cause dynamic
+		 * errors where the command or transaction can be retried at a later
+		 * point with some potential for success. No need to reset this, since
+		 * non-retryable conflict errors are currently FATAL.
 		 */
-		if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE)
+		if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE ||
+			reason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)
 			RecoveryConflictRetryable = false;
 	}
 
@@ -2855,11 +2864,20 @@ ProcessInterrupts(void)
 		}
 		else if (RecoveryConflictPending)
 		{
-			/* Currently there is only one non-retryable recovery conflict */
-			Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE);
+			int code;
+
+			Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE ||
+				   RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN);
+
+			if (RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)
+				/* XXX more appropriate error code? */
+				code = ERRCODE_PROGRAM_LIMIT_EXCEEDED;
+			else
+				code = ERRCODE_DATABASE_DROPPED;
+
 			pgstat_report_recovery_conflict(RecoveryConflictReason);
 			ereport(FATAL,
-					(errcode(ERRCODE_DATABASE_DROPPED),
+					(errcode(code),
 			  errmsg("terminating connection due to conflict with recovery"),
 					 errdetail_recovery_conflict()));
 		}
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 20077a6..3bad417 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -242,6 +242,8 @@ main(int argc, char *argv[])
 		   ControlFile->checkPointCopy.oldestCommitTsXid);
 	printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
 		   ControlFile->checkPointCopy.newestCommitTsXid);
+	printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"),
+		   ControlFile->checkPointCopy.oldestCatalogXmin);
 	printf(_("Time of latest checkpoint:            %s\n"),
 		   ckpttime_str);
 	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 969eff9..50f68e8 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -134,6 +134,10 @@ typedef struct VariableCacheData
 	 */
 	TransactionId latestCompletedXid;	/* newest XID that has committed or
 										 * aborted */
+
+	TransactionId oldestCatalogXmin; /* oldest xid where complete catalog state
+									  * is guaranteed to still exist */
+
 } VariableCacheData;
 
 typedef VariableCacheData *VariableCache;
@@ -173,6 +177,7 @@ extern TransactionId GetNewTransactionId(bool isSubXact);
 extern TransactionId ReadNewTransactionId(void);
 extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
 					  Oid oldest_datoid);
+extern void SetOldestCatalogXmin(TransactionId oldestCatalogXmin);
 extern bool ForceTransactionIdLimitUpdate(void);
 extern Oid	GetNewObjectId(void);
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index a123d2a..17e4306 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -118,7 +118,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_CATALOG_XMIN_ADV	0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -167,6 +167,13 @@ typedef struct xl_xact_assignment
 
 #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
 
+typedef struct xl_xact_catalog_xmin_advance
+{
+	TransactionId new_catalog_xmin;	/* new oldestCatalogXmin to apply */
+} xl_xact_catalog_xmin_advance;
+/* Size of the record payload, excluding any trailing struct padding */
+#define SizeOfXactCatalogXminAdvance (offsetof(xl_xact_catalog_xmin_advance, new_catalog_xmin) + sizeof(TransactionId))
+
 /*
  * Commit and abort records can contain a lot of information. But a large
  * portion of the records won't need all possible pieces of information. So we
@@ -370,6 +377,9 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
 				   TransactionId twophase_xid);
+
+extern XLogRecPtr XactLogCatalogXminUpdate(TransactionId new_catalog_xmin);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 0bc41ab..df19adc 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -43,6 +43,7 @@ typedef struct CheckPoint
 	MultiXactOffset nextMultiOffset;	/* next free MultiXact offset */
 	TransactionId oldestXid;	/* cluster-wide minimum datfrozenxid */
 	Oid			oldestXidDB;	/* database with minimum datfrozenxid */
+	TransactionId oldestCatalogXmin; /* catalog retained after this xid */
 	MultiXactId oldestMulti;	/* cluster-wide minimum datminmxid */
 	Oid			oldestMultiDB;	/* database with minimum datminmxid */
 	pg_time_t	time;			/* time stamp of checkpoint */
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0b85b7a..d7817d4 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -746,7 +746,8 @@ typedef enum
 	WAIT_EVENT_SYSLOGGER_MAIN,
 	WAIT_EVENT_WAL_RECEIVER_MAIN,
 	WAIT_EVENT_WAL_SENDER_MAIN,
-	WAIT_EVENT_WAL_WRITER_MAIN
+	WAIT_EVENT_WAL_WRITER_MAIN,
+	WAIT_EVENT_STANDBY_LOGICAL_SLOT_CREATE
 } WaitEventActivity;
 
 /* ----------
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index e00562d..4ad2bcf 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -175,6 +175,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index cd787c9..5ba4ae8 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -116,6 +116,9 @@ typedef struct
 	/*
 	 * force walreceiver reply?  This doesn't need to be locked; memory
 	 * barriers for ordering are sufficient.
+	 *
+	 * If hot standby feedback is enabled, a hot standby feedback message
+	 * will also be sent.
 	 */
 	bool		force_reply;
 
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index dd37c0c..0592aff 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -53,7 +53,7 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
-extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
+extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreCatalogXmin);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);
 
@@ -78,6 +78,8 @@ extern int	CountUserBackends(Oid roleid);
 extern bool CountOtherDBBackends(Oid databaseId,
 					 int *nbackends, int *nprepared);
 
+extern void CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid);
+
 extern void XidCacheRemoveRunningXids(TransactionId xid,
 						  int nxids, const TransactionId *xids,
 						  TransactionId latestXid);
@@ -86,6 +88,9 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 							TransactionId catalog_xmin, bool already_locked);
 
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin);
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin);
+
+extern void UpdateOldestCatalogXmin(bool force);
 
 #endif   /* PROCARRAY_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index f67b982..8e37e29 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -40,6 +40,7 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
+	PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN,
 
 	NUM_PROCSIGNALS				/* Must be last! */
 } ProcSignalReason;
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index dcebf72..cc04186 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -34,6 +34,8 @@ extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
 extern void ResolveRecoveryConflictWithLock(LOCKTAG locktag);
 extern void ResolveRecoveryConflictWithBufferPin(void);
+extern void ResolveRecoveryConflictWithLogicalDecoding(
+	TransactionId new_catalog_xmin);
 extern void CheckRecoveryConflictDeadlock(void);
 extern void StandbyDeadLockHandler(void);
 extern void StandbyTimeoutHandler(void);
diff --git a/src/test/recovery/t/010_logical_decoding_on_replica.pl b/src/test/recovery/t/010_logical_decoding_on_replica.pl
new file mode 100644
index 0000000..9082ddd
--- /dev/null
+++ b/src/test/recovery/t/010_logical_decoding_on_replica.pl
@@ -0,0 +1,453 @@
+# Test logical decoding on a standby.
+#
+# Also covers catalog_xmin feedback to the master and recovery conflicts.
+#
+use strict;
+use warnings;
+use 5.8.0;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 63;
+use RecursiveCopy;
+use File::Copy;
+use pg_lsn qw(parse_lsn);
+use Time::HiRes;
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $return);
+my $backup_name;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 4\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 4\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->append_conf('postgresql.conf', "log_error_verbosity = verbose\n");
+$node_master->append_conf('postgresql.conf', "hot_standby_feedback = on\n");
+# send status rapidly so we promptly advance xmin on master
+$node_master->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
+# very promptly terminate conflicting backends
+$node_master->append_conf('postgresql.conf', "max_standby_streaming_delay = '2s'\n");
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+$backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--xlog-method=stream', '--write-recovery-conf', '--slot=decoding_standby');
+
+open(my $fh, "<", $backup_dir . "/recovery.conf")
+  or die "can't open recovery.conf: $!";
+# pg_basebackup --write-recovery-conf --slot should have set primary_slot_name.
+my $found = 0;
+while (my $line = <$fh>)
+{
+	chomp($line);
+	next unless $line eq "primary_slot_name = 'decoding_standby'";
+	$found = 1;
+	last;
+}
+close($fh);
+
+ok($found, "using physical slot for standby");
+
+sub print_phys_xmin
+{	# (xmin, catalog_xmin) of the master's physical slot used by the standby
+	my $phys_slot = $node_master->slot('decoding_standby');
+	return map { $phys_slot->{$_} } ('xmin', 'catalog_xmin');
+}
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# without the catalog_xmin hot standby feedback patch, catalog_xmin is always null
+# and xmin is the min(xmin, catalog_xmin) of all slots on the standby + anything else
+# holding down xmin.
+ok(!$xmin, "xmin null");
+ok(!$catalog_xmin, "catalog_xmin null");
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+
+$node_replica->start;
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+ok($xmin, "xmin not null");
+ok(!$catalog_xmin, "catalog_xmin null");
+
+# Create a logical slot on the replica, ignoring the ones on the master completely.
+diag "creating slot standby_logical";
+my $start_time = [Time::HiRes::gettimeofday()];
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+   0, 'logical slot creation on standby succeeded');
+diag sprintf("Creation took %.2f seconds", Time::HiRes::tv_interval($start_time));
+
+sub print_logical_xmin
+{	# (xmin, catalog_xmin) of the replica's logical slot
+	my $logical_slot = $node_replica->slot('standby_logical');
+	return map { $logical_slot->{$_} } ('xmin', 'catalog_xmin');
+}
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay from slot succeeded');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+is($stderr, '', 'stderr is empty');
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]);
+for my $i (0 .. 2000)
+{
+    $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('testdb', 'VACUUM');
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, "logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+is($new_logical_xmin, '', "logical xmin null");
+isnt($new_logical_catalog_xmin, '', "logical slot catalog_xmin not null");
+cmp_ok($new_logical_catalog_xmin, ">", $logical_catalog_xmin, "logical slot catalog_xmin advanced after get_changes");
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+isnt($new_physical_xmin, '', "physical xmin not null");
+# hot standby feedback should advance phys catalog_xmin now the standby's slot
+# doesn't hold it down as far.
+isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
+cmp_ok($new_physical_catalog_xmin, ">", $physical_catalog_xmin, "physical catalog_xmin advanced");
+
+cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+#########################################################
+# Upstream catalog retention
+#########################################################
+
+sub test_catalog_xmin_retention()
+{	# Returns (oldestXid, oldestCatalogXmin) from the master's latest checkpoint.
+	# First burn some xids on the master in another DB, so we push the master's
+	# nextXid ahead.
+	foreach my $i (1 .. 100)
+	{
+		$node_master->safe_psql('postgres', 'SELECT txid_current()');
+	}
+
+	# VACUUM FREEZE every database on the master (temporarily allowing template0
+	# connections), then CHECKPOINT: pg_controldata is our only window into the
+	# resulting oldestXID/oldestCatalogXmin, which the caller checks against the standby.
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+	foreach my $dbname ('template1', 'postgres', 'testdb', 'template0')
+	{
+		$node_master->safe_psql($dbname, 'VACUUM FREEZE');
+	}
+	sleep(1);
+	$node_master->safe_psql('postgres', 'CHECKPOINT');
+	IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+		or die "pg_controldata failed with $?";
+	my @checkpoint = split('\n', $stdout);
+	my ($oldestXid, $oldestCatalogXmin, $nextXid) = ('', '', '');
+	foreach my $line (@checkpoint)
+	{
+		if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/)
+		{
+			$nextXid = $1;
+		}
+		if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+		{
+			$oldestXid = $1;
+		}
+		if ($line =~ qr/^Latest checkpoint's oldestCatalogXmin:\s*(\d+)/)
+		{
+			$oldestCatalogXmin = $1;
+		}
+	}
+	die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+	my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+	my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+
+	diag "upstream oldestXid $oldestXid, oldestCatalogXmin $oldestCatalogXmin, nextXid $nextXid, phys slot catalog_xmin $new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin";
+
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+	return ($oldestXid, $oldestCatalogXmin);	# oldestCatalogXmin stays '' if pg_controldata lacked that line
+}
+
+diag "Testing catalog_xmin retention with hs_feedback on";
+my ($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention();
+
+# With hs_feedback on, the standby's logical slot must hold the upstream back:
+# oldestXid <= oldestCatalogXmin <= downstream catalog_xmin.
+# NOTE(review): $new_logical_catalog_xmin is the value captured before this
+# section, not the one test_catalog_xmin_retention() printed.
+cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+cmp_ok($oldestCatalogXmin, ">=", $oldestXid, "oldestCatalogXmin >= oldestXid");
+cmp_ok($oldestCatalogXmin, "<=", $new_logical_catalog_xmin, "oldestCatalogXmin <= downstream catalog_xmin");
+
+#########################################################
+# Conflict with recovery: xmin cancels decoding session
+#########################################################
+#
+# Start a decoding session on the replica, then perform work on the master
+# that should cause a recovery conflict with it, and check that the client
+# gets terminated with a recovery conflict.
+#
+# hs_feedback is temporarily disabled so a conflict can occur. It's fine to
+# keep using the physical slot: its xmin and catalog_xmin should be cleared
+# (checked below). hot_standby_feedback is only checked when an initial
+# decoding session is established, so turning it off afterwards circumvents
+# that safeguard and forces a conflict.
+#
+# We also create an unrelated table so we can drop it later, making sure
+# there are catalog changes to replay.
+$node_master->safe_psql('testdb', 'CREATE TABLE dummy_table(blah integer)');
+
+# Start pg_recvlogical before we turn off hs_feedback so its slot's
+# catalog_xmin is above the downstream's catalog_threshold when we start
+# decoding.
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+
+$node_replica->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off');
+$node_replica->reload;
+
+# Presumably gives the walreceiver time to send feedback reflecting the new
+# setting before the physical slot is inspected.
+sleep(2);
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "physical xmin null after hs_feedback disabled");
+is($catalog_xmin, '', "physical catalog_xmin null after hs_feedback disabled");
+
+# Burn a bunch of XIDs, then check that the upstream's oldestXid has moved
+# past the downstream catalog_xmin and that no catalog_xmin is retained.
+($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention();
+cmp_ok($oldestXid, ">", $new_logical_catalog_xmin, 'upstream oldestXid advanced past downstream catalog_xmin with hs_feedback off');
+cmp_ok($oldestCatalogXmin, "==", 0, "oldestCatalogXmin = InvalidTransactionId with hs_feedback off");
+
+# Data-only changes, no effect on catalogs. We should replay them fine
+# without a conflict, since they advance xmin but not catalog_xmin.
+$node_master->safe_psql('testdb', 'DELETE FROM test_table');
+$node_master->safe_psql('testdb', 'VACUUM FULL test_table');
+$node_master->safe_psql('testdb', 'VACUUM;');
+
+diag "waiting for catchup";
+$node_master->wait_for_catchup($node_replica);
+
+# Drive the harness's I/O so pg_recvlogical's output so far is collected.
+diag "pumping";
+$handle->pump;
+diag "pumped";
+
+# Now change the catalogs. Once the DROP TABLE is replayed on the replica,
+# pg_recvlogical is expected to be terminated with a catalog_xmin conflict.
+# NOTE(review): a recovery conflict can only cancel a session that holds a
+# virtualxid, and logical decoding holds one only while calling output
+# plugins, not while waiting for WAL. If that is so, an idle decoding session
+# might not be cancelled; confirm the assertions below aren't timing-dependent.
+diag "dropping dummy_table";
+$node_master->safe_psql('testdb', 'DROP TABLE dummy_table;');
+
+diag "waiting for catchup";
+$node_master->wait_for_catchup($node_replica);
+diag "caught up, waiting for client";
+
+# pg_recvlogical should have been terminated by the server. finish() runs in
+# an eval so an error from it doesn't abort the script; the exit status is
+# then read from $? (256 is the wait status for exit code 1).
+eval {
+	$handle->finish;
+};
+$return = $?;
+if ($return) {
+	is($return, 256, "pg_recvlogical terminated by server on recovery conflict");
+	like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict errmsg');
+	like($stderr, qr/requires catalog rows that will be removed/, 'pg_recvlogical exited with catalog_xmin conflict');
+}
+else
+{
+	fail("pg_recvlogical returned ok $return with stdout '$stdout', stderr '$stderr'");
+}
+
+#####################################################################
+# Conflict with recovery: refuse to run without hot_standby_feedback
+#####################################################################
+#
+# When hot_standby_feedback is off, new connections should fail.
+# ($? == 256 is the wait status for pg_recvlogical exiting with code 1.)
+#
+
+IPC::Run::run(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+is($?, 256, 'pg_recvlogical failed to connect to slot while hot_standby_feedback off');
+like($stderr, qr/hot_standby_feedback/, 'recvlogical errmsg mentions hot_standby_feedback');
+
+#####################################################################
+# Conflict with recovery: catalog_xmin advance invalidates idle slot
+#####################################################################
+#
+# The slot that pg_recvlogical was using before it was terminated
+# should not accept new connections now, since its catalog_xmin
+# is lower than the replica's threshold. Even once we re-enable
+# hot_standby_feedback, the removed tuples won't somehow come back.
+#
+
+$node_replica->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on');
+$node_replica->reload;
+sleep(2);
+
+IPC::Run::run(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+is($?, 256, 'pg_recvlogical failed to connect to slot with past catalog_xmin');
+# NOTE(review): PostgreSQL's message style guide quotes identifiers with
+# double quotes; accept either quote character so this check does not
+# silently depend on which one the server uses.
+like($stderr, qr/replication slot ["'].*["'] requires catalogs removed by master/, 'recvlogical errmsg reports catalogs removed by master');
+
+
+##################################################
+# Drop slot
+##################################################
+#
+is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+# Make sure slots on replicas are droppable, and that dropping the standby's
+# logical slot clears the catalog_xmin it held on the upstream physical slot.
+# NOTE(review): psql()'s result is not checked here; a failed drop only shows
+# up through the slot_type check that follows.
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+# Only catalog_xmin is checked. With hs_feedback on, the physical slot's
+# xmin is presumably still being held by the standby itself.
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($catalog_xmin, '', "physical catalog_xmin null");
+
+
+
+
+##################################################
+# Recovery: drop database drops idle slots
+##################################################
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB on the upstream if they're on the right DB, or not dropped if on
+# another DB.
+
+diag "Testing dropdb when downstream slot is not in-use";
+diag "creating slot dodropslot";
+$start_time = [Time::HiRes::gettimeofday()];
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created dodropslot');
+# tv_interval() returns fractional seconds, so format with %f, not %d.
+diag sprintf("Creation took %.2f seconds", Time::HiRes::tv_interval($start_time));
+diag "creating slot otherslot";
+$start_time = [Time::HiRes::gettimeofday()];
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created otherslot');
+diag sprintf("Creation took %.2f seconds", Time::HiRes::tv_interval($start_time));
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica);
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+  'database dropped on standby');
+
+# dodropslot (on testdb) must be gone; otherslot (on postgres) must survive.
+is($node_replica->slot('dodropslot')->{'slot_type'}, '', 'slot on standby dropped');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped');
+
+
+##################################################
+# Recovery: drop database drops in-use slots
+##################################################
+
+# This time, have the slot in-use on the downstream DB when we drop it.
+diag "Testing dropdb when downstream slot is in-use";
+$node_master->safe_psql('postgres', q[CREATE DATABASE testdb2]);
+
+# The replica must have replayed CREATE DATABASE before we can connect to
+# testdb2 there.
+$node_master->wait_for_catchup($node_replica);
+
+diag "creating slot dodropslot2";
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot'], 'pg_recvlogical created dodropslot2');
+is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created');
+
+# make sure the slot is in use
+diag "starting pg_recvlogical";
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+sleep(1);
+
+is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active')
+  or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'");
+
+diag "pg_recvlogical backend pid is " . $node_replica->slot('dodropslot2')->{'active_pid'};
+
+# Master doesn't know the replica's slot is busy so dropdb should succeed
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]);
+ok(1, 'dropdb finished');
+
+# Wait for replay of DROP DATABASE to terminate the walsender, but give up
+# after 180 seconds. If it is still running, kill pg_recvlogical so the checks
+# below fail instead of the whole run hanging in finish().
+my $walsender_wait = 0;
+while ($node_replica->slot('dodropslot2')->{'active_pid'} && $walsender_wait++ < 180)
+{
+	sleep(1);
+	diag "waiting for walsender to exit";
+}
+if ($node_replica->slot('dodropslot2')->{'active_pid'})
+{
+	diag "walsender still active after $walsender_wait seconds, killing pg_recvlogical";
+	$handle->kill_kill;
+}
+
+diag "walsender exited, waiting for pg_recvlogical to exit";
+
+# our client should've terminated in response to the walsender error
+eval {
+	$handle->finish;
+};
+$return = $?;
+if ($return) {
+	is($return, 256, "pg_recvlogical terminated by server");
+	like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict');
+	like($stderr, qr/User was connected to a database that must be dropped\./, 'recvlogical recovery conflict db');
+}
+else
+{
+	fail("pg_recvlogical returned ok $return with stdout '$stdout', stderr '$stderr'");
+}
+
+is($node_replica->slot('dodropslot2')->{'active_pid'}, '', 'walsender backend exited');
+
+# The slot should be dropped by recovery now
+$node_master->wait_for_catchup($node_replica);
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f',
+  'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
-- 
2.5.5

