[PATCH] Send catalog_xmin separately in hot standby feedback

Started by Craig Ringer over 9 years ago, 5 messages
#1Craig Ringer
craig@2ndquadrant.com
1 attachment(s)

Hi all

Currently hot standby feedback sends GetOldestXmin()'s result to the
upstream as the required xmin. GetOldestXmin() returns a slot's
catalog_xmin if that's the lowest xmin on the system.

That's fine so long as we don't do logical decoding on standbys, but
if we start allowing logical slots on standbys it'll cause the master
to retain too much bloat since it'll pin the master's xmin (not
catalog_xmin) down based on the catalog_xmin of any slots on the
downstream.

To fix that, add new fields to the hot standby feedback protocol
message to carry a separate catalog_xmin.
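
As a rough illustration of the extended message layout (a standalone sketch, not the patch's code: hs_feedback_encode, put_be32 and put_be64 are hypothetical names, whereas the patch builds the message with pq_sendbyte/pq_sendint64/pq_sendint), the fields go out in network byte order like this:

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: write a 32-bit value in network byte order. */
static size_t
put_be32(uint8_t *buf, uint32_t v)
{
	buf[0] = (uint8_t) (v >> 24);
	buf[1] = (uint8_t) (v >> 16);
	buf[2] = (uint8_t) (v >> 8);
	buf[3] = (uint8_t) v;
	return 4;
}

/* Hypothetical helper: write a 64-bit value in network byte order. */
static size_t
put_be64(uint8_t *buf, uint64_t v)
{
	put_be32(buf, (uint32_t) (v >> 32));
	put_be32(buf + 4, (uint32_t) v);
	return 8;
}

/* 1 type byte + 8 timestamp bytes + four 4-byte fields */
#define HS_FEEDBACK_LEN 25

/*
 * Fill buf (at least HS_FEEDBACK_LEN bytes) with a feedback message that
 * carries xmin and catalog_xmin as separate fields. Returns the length.
 */
size_t
hs_feedback_encode(uint8_t *buf, int64_t send_time,
				   uint32_t xmin, uint32_t xmin_epoch,
				   uint32_t catalog_xmin, uint32_t catalog_xmin_epoch)
{
	size_t		n = 0;

	buf[n++] = 'h';							/* message type */
	n += put_be64(buf + n, (uint64_t) send_time);
	n += put_be32(buf + n, xmin);
	n += put_be32(buf + n, xmin_epoch);
	n += put_be32(buf + n, catalog_xmin);		/* new field */
	n += put_be32(buf + n, catalog_xmin_epoch);	/* new field */
	return n;
}
```

The two new fields simply follow the existing xmin/epoch pair, so the message grows from 17 to 25 bytes including the type byte.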

This doesn't need any special care for backward compatibility because
the only thing that has any business sending hot standby feedback is a
physical standby and they're required to be the same major version as
the master. If someone tries to connect with a standby of the wrong
version they'll fail long before this, and even if they didn't they'd
just get an error saying there's not enough data in the message.
pg_basebackup, pg_recvlogical and pg_receivexlog don't send hot
standby feedback messages.
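
To make the "not enough data" failure mode concrete, here is a minimal standalone decoder sketch. HsFeedback, get_be32 and hs_feedback_decode are hypothetical names for illustration; the real walsender reads the fields with pq_getmsgint64/pq_getmsgint, which raise an error themselves when the message is too short. The sketch assumes the caller has already consumed the message type byte.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical container for the decoded fields. */
typedef struct HsFeedback
{
	int64_t		send_time;
	uint32_t	xmin;
	uint32_t	xmin_epoch;
	uint32_t	catalog_xmin;
	uint32_t	catalog_xmin_epoch;
} HsFeedback;

/* Hypothetical helper: read a 32-bit value in network byte order. */
static uint32_t
get_be32(const uint8_t *p)
{
	return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
		((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/* 8 timestamp bytes + four 4-byte fields, type byte already consumed */
#define HS_FEEDBACK_BODY_LEN 24

/*
 * Decode a feedback message body. Returns 0 on success, or -1 if the
 * body is too short, as it would be if an old-format (16-byte body)
 * message arrived.
 */
int
hs_feedback_decode(const uint8_t *body, size_t len, HsFeedback *out)
{
	if (len < HS_FEEDBACK_BODY_LEN)
		return -1;

	out->send_time = (int64_t) (((uint64_t) get_be32(body) << 32) |
								get_be32(body + 4));
	out->xmin = get_be32(body + 8);
	out->xmin_epoch = get_be32(body + 12);
	out->catalog_xmin = get_be32(body + 16);
	out->catalog_xmin_epoch = get_be32(body + 20);
	return 0;
}
```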

I'm posting this now because Petr was interested in it for his work on
logical replication. I'll be following it with a subsequent patch to allow
logical slot creation on physical replicas if they're using a slot to
talk to the master and have hot_standby_feedback enabled, just so you
know the direction this is going in.

Passes 'make check', src/bin/pg_basebackup and src/test/recovery TAP
tests. I haven't added specific tests for this functionality since
there isn't (yet) a way to set catalog_xmin separately on a physical
standby without a dedicated test module.

The logical decoding timeline following patch [1] is also relevant for
this, since it is required for logical decoding on standby to survive
promotion.

Next steps will be:

* Expose information about whether or not a slot is in use from walreceiver.c

* Allow logical slots to be created on replicas if
hot_standby_feedback is enabled and a logical slot is in use. Return
null as the exported snapshot ID when creating over the walsender
protocol, since we can't export a snapshot on a standby due to the
need to allocate an xid. (That can be addressed separately).

* Now that recovery tests are possible, write the recovery test suite
for logical decoding on standby

* Auto-drop replication slots when dropping a database in dbase_redo

* Add a safety mechanism to stop users disabling hs feedback on the
replica or stopping using a physical slot to the upstream while
logical slots exist on the replica. Or mark such logical slots as
unusable using a new persistent field on the slot. Not trivial because
we must allow crash recovery without a slot to upstream (obviously),
and should preferably also allow fallback to archive recovery when
server with slot is temporarily unreachable. Must also consider
handling of physical slot with catalog_xmin set from cascading
physical replica with logical slots on it.

* Extend the logical replication patch to add support for following
physical failover using this functionality, likely in 11.0.

[1]: https://commitfest.postgresql.org/10/779/

I'll add this to the next CF, but I realise the inability to test it
standalone may mean it can only be committed as part of a series along
with full support for logical decoding from standby.

--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

Attachments:

0001-Send-catalog_xmin-in-hot-standby-feedback.patch (text/x-patch; charset=US-ASCII)
From 041682447587190e1d5b8ebea437f1bb33c33a84 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Wed, 13 Jul 2016 15:23:08 +0800
Subject: [PATCH] Send catalog_xmin in hot standby feedback

Add catalog_xmin to the hot standby feedback protocol so a read replica
that has logical slots can use its physical slot to the master to hold down the
master's catalog_xmin. This lets a replica prevent vacuuming of catalog tuples
still required by the replica's logical slots.

This functionality is required, but not sufficient, to allow logical decoding
to work on a replica. It is also useful for handling physical failover of
database instances that serve as upstreams for logical decoding clients.
---
 contrib/pg_visibility/pg_visibility.c     |  4 +-
 contrib/pgstattuple/pgstatapprox.c        |  2 +-
 doc/src/sgml/protocol.sgml                | 33 ++++++++++--
 src/backend/access/transam/xlog.c         | 36 ++++++++++++-
 src/backend/catalog/index.c               |  2 +-
 src/backend/commands/analyze.c            |  2 +-
 src/backend/commands/vacuum.c             |  4 +-
 src/backend/replication/logical/logical.c | 20 +++++--
 src/backend/replication/walreceiver.c     | 54 ++++++++++++++++---
 src/backend/replication/walsender.c       | 90 ++++++++++++++++++++++---------
 src/backend/storage/ipc/procarray.c       | 51 +++++++++---------
 src/include/access/xlog.h                 |  1 +
 src/include/replication/slot.h            | 30 +++++++++--
 src/include/storage/procarray.h           |  2 +-
 14 files changed, 252 insertions(+), 79 deletions(-)

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index 7034066..318caab 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -523,7 +523,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	if (all_visible)
 	{
 		/* Don't pass rel; that will fail in recovery. */
-		OldestXmin = GetOldestXmin(NULL, true);
+		OldestXmin = GetOldestXmin(NULL, true, false);
 	}
 
 	rel = relation_open(relid, AccessShareLock);
@@ -646,7 +646,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 				 * a buffer lock. And this shouldn't happen often, so it's
 				 * worth being careful so as to avoid false positives.
 				 */
-				RecomputedOldestXmin = GetOldestXmin(NULL, true);
+				RecomputedOldestXmin = GetOldestXmin(NULL, true, false);
 
 				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
 					record_corrupt_item(items, &tuple.t_data->t_ctid);
diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index a49ff54..3674c05 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -67,7 +67,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	TransactionId OldestXmin;
 	uint64		misc_count = 0;
 
-	OldestXmin = GetOldestXmin(rel, true);
+	OldestXmin = GetOldestXmin(rel, true, false);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 68b0941..c4e41ca 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1783,10 +1783,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0 this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1796,7 +1797,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slots on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled. New in 10.0.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby. New in 10.0.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0b991bb..550bb40 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7962,6 +7962,38 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
 }
 
 /*
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
+ */
+bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
+{
+	TransactionId nextXid;
+	uint32		nextEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	if (xid <= nextXid)
+	{
+		if (epoch != nextEpoch)
+			return false;
+	}
+	else
+	{
+		if (epoch + 1 != nextEpoch)
+			return false;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+		return false;				/* epoch OK, but it's wrapped around */
+
+	return true;
+}
+
+/*
  * This must be called ONCE during postmaster or standalone-backend shutdown
  */
 void
@@ -8565,7 +8597,7 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -8904,7 +8936,7 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index b0b43cf..fd86530 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2255,7 +2255,7 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	{
 		snapshot = SnapshotAny;
 		/* okay to ignore lazy VACUUMs here */
-		OldestXmin = GetOldestXmin(heapRelation, true);
+		OldestXmin = GetOldestXmin(heapRelation, true, false);
 	}
 
 	scan = heap_beginscan_strat(heapRelation,	/* relation */
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index c617abb..2170566 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -992,7 +992,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	totalblocks = RelationGetNumberOfBlocks(onerel);
 
 	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
-	OldestXmin = GetOldestXmin(onerel, true);
+	OldestXmin = GetOldestXmin(onerel, true, false);
 
 	/* Prepare for sampling block numbers */
 	BlockSampler_Init(&bs, totalblocks, targrows, random());
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 58bbf55..79ec690 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -497,7 +497,7 @@ vacuum_set_xid_limits(Relation rel,
 	 * always an independent transaction.
 	 */
 	*oldestXmin =
-		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
+		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true, false), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
@@ -909,7 +909,7 @@ vac_update_datfrozenxid(void)
 	 * committed pg_class entries for new tables; see AddNewRelationTuple().
 	 * So we cannot produce a wrong minimum by starting with this.
 	 */
-	newFrozenXid = GetOldestXmin(NULL, true);
+	newFrozenXid = GetOldestXmin(NULL, true, false);
 
 	/*
 	 * Similarly, initialize the MultiXact "min" with the value that would be
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1512be5..85f8f0e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -88,16 +88,28 @@ CheckLogicalDecodingRequirements(void)
 				 errmsg("logical decoding requires a database connection")));
 
 	/* ----
-	 * TODO: We got to change that someday soon...
+	 * To allow logical decoding on a standby we must ensure that:
 	 *
-	 * There's basically three things missing to allow this:
 	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
+	 *	  LSN belongs to so we can follow timeline switches
+	 *
 	 * 2) We need to force hot_standby_feedback to be enabled at all times so
 	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
+	 *
+	 * 3) ensure a replication slot is used to connect to the upstream so
+	 *    we know the catalog_xmin is persistent even over connection loss.
+	 *
+	 * 4) support dropping replication slots referring to a database, in
 	 *	  dbase_redo. There can't be any active ones due to HS recovery
 	 *	  conflicts, so that should be relatively easy.
+	 *
+	 * This means we can't allow logical decoding from a standby that's only
+	 * configured for archive recovery. It would be OK to run temporarily in
+	 * archive recovery during connectivity drops so long as we have a slot
+	 * with a catalog_xmin set; it'd cause extra bloat on the master until we
+	 * can reconnect, but that's unavoidable. We don't currently have any
+	 * book-keeping about whether we have a slot unless it's in active use,
+	 * though, so we have to assume there's no slot.
 	 * ----
 	 */
 	if (RecoveryInProgress())
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 413ee3a..8e682bb 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1161,8 +1161,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		nextEpoch, xmin_epoch, catalog_xmin_epoch, slot_xmin;
+	TransactionId xmin, catalog_xmin;
 	static TimestampTz sendTime = 0;
 	static bool master_has_standby_xmin = false;
 
@@ -1203,29 +1203,67 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false);
+	{
+		/*
+		 * Usually GetOldestXmin() would include the catalog_xmin in its
+		 * calculations, but we don't want to hold upstream back from vacuuming
+		 * normal user table tuples just because they're within the
+		 * catalog_xmin horizon of logical replication slots on this standby.
+		 * Instead we report the catalog_xmin to the upstream separately.
+		 *
+		 * (XXX-REVIEW-ATTENTION)
+		 * By doing the GetOldestXmin(...) lookup separately we might produce
+		 * an xmin and catalog xmin slightly later than what was in the
+		 * procarray at the time we previously called GetOldestXmin() for
+		 * session state, since it can be advanced in-between.  That's harmless
+		 * though, since we only move the position backwards here.
+		 *
+		 * The alternative here is to extend GetOldestXmin() to take an
+		 * out-param to report the slot catalog xmin. That really just
+		 * duplicates ProcArrayGetReplicationSlotXmin but means we can grab it
+		 * within a single ProcArray lock. A variant of GetOldestXmin that
+		 * takes an already-locked flag would work too, but would hold the lock
+		 * across parts of GetOldestXmin() that currently don't retain it.
+		 * (XXX-REVIEW-ATTENTION)
+		 */
+		xmin = GetOldestXmin(NULL, false, true);
+		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+
+		if (TransactionIdIsValid(slot_xmin) &&
+			NormalTransactionIdPrecedes(slot_xmin, xmin))
+			xmin = slot_xmin;
+	}
 	else
+	{
 		xmin = InvalidTransactionId;
+		catalog_xmin = InvalidTransactionId;
+	}
 
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
 	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	xmin_epoch = nextEpoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch --;
+	catalog_xmin_epoch = nextEpoch;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch --;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
-	if (TransactionIdIsValid(xmin))
+	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 		master_has_standby_xmin = true;
 	else
 		master_has_standby_xmin = false;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1ea2a5c..a1ff16f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1531,6 +1531,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst lost information can do here is give us
 	 * wrong information in a statistics view - we'll just potentially be more
 	 * conservative in removing files.
+	 *
+	 * We don't have to do any effective_xmin / effective_catalog_xmin testing
+	 * here either, like for LogicalConfirmReceivedLocation. If we received
+	 * the xmin and catalog_xmin from downstream replication slots we know they
+	 * were already confirmed there.
 	 */
 }
 
@@ -1593,7 +1598,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1614,6 +1619,22 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1629,10 +1650,10 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-	TransactionId nextXid;
-	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
@@ -1641,43 +1662,50 @@ ProcessStandbyHSFeedbackMessage(void)
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	/*
+	 * A 10.0+ standby's walreceiver passes the lowest catalog xmin of any
+	 * replication slot up to the master. There's no need to handle backward
+	 * compatibility here as only the same major version Pg has any business
+	 * sending us hot standby feedback. pg_receivexlog, pg_basebackup etc don't
+	 * use hs feedback.
+	 */
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u catalog_xmin %u epoch %u",
 		 feedbackXmin,
+		 feedbackCatalogXmin,
 		 feedbackEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-	if (feedbackXmin <= nextXid)
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
 	{
-		if (feedbackEpoch != nextEpoch)
-			return;
+		return;
 	}
-	else
+
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
 	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
+		return;
 	}
 
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
-
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
 	 * the xmin will be taken into account by GetOldestXmin.  This will hold
@@ -1701,15 +1729,29 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. We can only track the
+	 * catalog xmin separately when using a slot, so we store the least
+	 * of the two provided when not using a slot.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+	{
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
+	}
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+		{
+			MyPgXact->xmin = feedbackCatalogXmin;
+		}
+		else
+		{
+			MyPgXact->xmin = feedbackXmin;
+		}
+	}
 }
 
 /*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index e5d487d..99535b4 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1298,17 +1298,18 @@ TransactionIdIsActive(TransactionId xid)
  * process can set its xmin based on transactions that are no longer running
  * in the master but are still being replayed on the standby, thus possibly
  * making the GetOldestXmin reading go backwards.  In this case there is a
- * possibility that we lose data that the standby would like to have, but
- * there is little we can do about that --- data is only protected if the
- * walsender runs continuously while queries are executed on the standby.
- * (The Hot Standby code deals with such cases by failing standby queries
- * that needed to access already-removed data, so there's no integrity bug.)
+ * possibility that we lose data that the standby would like to have
+ * unless the standby uses a replication slot to make its xmin persistent
+ * even when it isn't connected. The Hot Standby code deals with such cases by
+ * failing standby queries that needed to access already-removed data, so
+ * there's no integrity bug.
+ *
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
  */
 TransactionId
-GetOldestXmin(Relation rel, bool ignoreVacuum)
+GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreSlots)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId result;
@@ -1426,25 +1427,27 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
 			result = FirstNormalTransactionId;
 	}
 
-	/*
-	 * Check whether there are replication slots requiring an older xmin.
-	 */
-	if (TransactionIdIsValid(replication_slot_xmin) &&
-		NormalTransactionIdPrecedes(replication_slot_xmin, result))
-		result = replication_slot_xmin;
-
-	/*
-	 * After locks have been released and defer_cleanup_age has been applied,
-	 * check whether we need to back up further to make logical decoding
-	 * possible. We need to do so if we're computing the global limit (rel =
-	 * NULL) or if the passed relation is a catalog relation of some kind.
-	 */
-	if ((rel == NULL ||
-		 RelationIsAccessibleInLogicalDecoding(rel)) &&
-		TransactionIdIsValid(replication_slot_catalog_xmin) &&
-		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
-		result = replication_slot_catalog_xmin;
+	if (!ignoreSlots)
+	{
+		/*
+		 * Check whether there are replication slots requiring an older xmin.
+		 */
+		if (TransactionIdIsValid(replication_slot_xmin) &&
+			NormalTransactionIdPrecedes(replication_slot_xmin, result))
+			result = replication_slot_xmin;
 
+		/*
+		 * After locks have been released and defer_cleanup_age has been applied,
+		 * check whether we need to back up further to make logical decoding
+		 * possible. We need to do so if we're computing the global limit (rel =
+		 * NULL) or if the passed relation is a catalog relation of some kind.
+		 */
+		if ((rel == NULL ||
+			 RelationIsAccessibleInLogicalDecoding(rel)) &&
+			TransactionIdIsValid(replication_slot_catalog_xmin) &&
+			NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
+			result = replication_slot_catalog_xmin;
+	}
 	return result;
 }
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 14b7f7f..27990fa 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -262,6 +262,7 @@ extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
 extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
+extern bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 extern void RemovePromoteSignalFiles(void);
 
 extern bool CheckPromoteSignal(void);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index e00562d..d777644 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -59,11 +59,24 @@ typedef struct ReplicationSlotPersistentData
 	 * xmin horizon for catalog tuples
 	 *
 	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
+	 * see notes for effective_catalog_xmin, below.
 	 */
 	TransactionId catalog_xmin;
 
-	/* oldest LSN that might be required by this replication slot */
+	/*
+	 * oldest LSN that might be required by this replication slot
+	 *
+	 * For logical slots this is the location of the most recent
+	 * xl_running_xacts record prior to the start of the oldest xact still
+	 * in-progress as of the confirmed_flush lsn. We must resume decoding at
+	 * this point to ensure we see every change made by every xact that we
+	 * might have to replay to the client.
+	 *
+	 * For physical slots this is the confirmed flush pointer from
+	 * the most recent standby reply message.
+	 *
+	 * WAL older than restart_lsn may not be removed.
+	 */
 	XLogRecPtr	restart_lsn;
 
 	/*
@@ -71,6 +84,14 @@ typedef struct ReplicationSlotPersistentData
 	 * start_lsn point in case the client doesn't specify one, and also as a
 	 * safety measure to jump forwards in case the client specifies a
 	 * start_lsn that's further in the past than this value.
+	 *
+	 * We may skip past the client's requested start point to our
+	 * confirmed_flush point when the client previously sent us feedback
+	 * in response to keepalives, empty transactions that caused no
+	 * client-side writes and no replication identifier update, then
+	 * crashed. On reconnect the client won't know it's further ahead than
+	 * it remembers, but we know it won't need this data to be processed
+	 * since it didn't do anything with it the first time.
 	 */
 	XLogRecPtr	confirmed_flush;
 
@@ -104,8 +125,9 @@ typedef struct ReplicationSlot
 	 * too soon, but the worst consequence we might encounter there is
 	 * unwanted query cancellations on the standby.  Thus, for logical
 	 * decoding, this value represents the latest xmin that has actually been
-	 * written to disk, whereas for streaming replication, it's just the same
-	 * as the persistent value (data.xmin).
+	 * written to disk and can be safely used by vacuum, etc, whereas for
+	 * streaming replication it's just the same as the persistent value
+	 * (data.xmin).
 	 */
 	TransactionId effective_xmin;
 	TransactionId effective_catalog_xmin;
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index dd37c0c..a803268 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -53,7 +53,7 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
-extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
+extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreSlots);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);
 
-- 
2.5.5

#2Craig Ringer
craig@2ndquadrant.com
In reply to: Craig Ringer (#1)
Re: [PATCH] Send catalog_xmin separately in hot standby feedback

On 5 September 2016 at 12:40, Craig Ringer <craig@2ndquadrant.com> wrote:

Hi all

Currently hot standby feedback sends GetOldestXmin()'s result to the
upstream as the required xmin. GetOldestXmin() returns a slot's
catalog_xmin if that's the lowest xmin on the system.

Note that this patch changes the API to GetOldestXmin(), adding a new
boolean to allow it to disregard the catalog_xmin of slots.
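
A simplified sketch of what the extra boolean does, leaving out locking, vacuum_defer_cleanup_age and the per-relation checks. oldest_xmin_sketch and xid_precedes are hypothetical stand-ins for illustration, not the real GetOldestXmin() or TransactionIdPrecedes():

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/* Wraparound-aware "a is older than b" for normal xids. */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
	return (int32_t) (a - b) < 0;
}

/*
 * Start from next_xid, pull the result back to the oldest backend xmin,
 * then (unless ignore_slots is set) pull it back further to the slot
 * xmin and slot catalog_xmin.
 */
TransactionId
oldest_xmin_sketch(const TransactionId *backend_xmins, size_t n,
				   TransactionId next_xid,
				   TransactionId slot_xmin,
				   TransactionId slot_catalog_xmin,
				   bool ignore_slots)
{
	TransactionId result = next_xid;
	size_t		i;

	for (i = 0; i < n; i++)
	{
		if (backend_xmins[i] != InvalidTransactionId &&
			xid_precedes(backend_xmins[i], result))
			result = backend_xmins[i];
	}

	if (!ignore_slots)
	{
		if (slot_xmin != InvalidTransactionId &&
			xid_precedes(slot_xmin, result))
			result = slot_xmin;
		if (slot_catalog_xmin != InvalidTransactionId &&
			xid_precedes(slot_catalog_xmin, result))
			result = slot_catalog_xmin;
	}

	return result;
}
```

With backend xmins of 1000 and 1200 and a slot catalog_xmin of 500, the call returns 500 by default but 1000 with ignore_slots set, so the standby can report 1000 as xmin and 500 separately as catalog_xmin.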

Per Simon's feedback I'm going to split that out into a separate
patch, so I'll post a follow-up, split into a series, soon.

--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Craig Ringer
craig@2ndquadrant.com
In reply to: Craig Ringer (#2)
4 attachment(s)
Re: [PATCH] Send catalog_xmin separately in hot standby feedback

On 5 September 2016 at 14:44, Craig Ringer <craig@2ndquadrant.com> wrote:

On 5 September 2016 at 12:40, Craig Ringer <craig@2ndquadrant.com> wrote:

Hi all

Currently hot standby feedback sends GetOldestXmin()'s result to the
upstream as the required xmin. GetOldestXmin() returns a slot's
catalog_xmin if that's the lowest xmin on the system.

Note that this patch changes the API to GetOldestXmin(), adding a new
boolean to allow it to disregard the catalog_xmin of slots.

Per Simon's feedback I'm going to split that out into a separate
patch, so will post a follow-up split one soon as the series.

Now formatted as a series:

1. Send catalog_xmin in hot standby feedback protocol
2. Make walsender respect catalog_xmin in hot standby feedback messages
3. Allow GetOldestXmin(...) to optionally disregard the catalog_xmin
4. Send catalog_xmin separately in hot_standby_feedback messages

Descriptions are in the patch headers.

1 adds the protocol field only. The value is at this point always sent
as 0 by walreceiver and ignored by walsender. There's no need to handle
backward compatibility in the addition to the hot standby protocol
message here as only the same major version Pg has any business
sending us hot standby feedback. pg_receivexlog, pg_basebackup etc
don't use hs feedback. Includes protocol docs change.

2 makes walsender now pay attention to the sent catalog_xmin.
walreceiver doesn't set it yet and has no way to get it separately.

3 Provides a way to get the global xmin without considering the
catalog_xmin so walreceiver can use it.

4 makes walreceiver use the modified GetOldestXmin()
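
As a rough illustration of the wire change in patch 1, the extended feedback message can be pictured as a flat buffer in network byte order. This is a standalone sketch, not the walreceiver code: `put_u32`, `put_u64` and `build_hs_feedback` are hypothetical helpers standing in for `pq_sendint`/`pq_sendint64`, and the field order simply follows the `pq_send*` calls in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* 1 type byte + 8 byte timestamp + four 4-byte fields */
#define HS_FEEDBACK_LEN 25

/* Hypothetical helper: append a 32-bit value in network byte order. */
static size_t
put_u32(uint8_t *buf, size_t off, uint32_t v)
{
	buf[off + 0] = (uint8_t) (v >> 24);
	buf[off + 1] = (uint8_t) (v >> 16);
	buf[off + 2] = (uint8_t) (v >> 8);
	buf[off + 3] = (uint8_t) v;
	return off + 4;
}

/* Hypothetical helper: append a 64-bit value in network byte order. */
static size_t
put_u64(uint8_t *buf, size_t off, uint64_t v)
{
	off = put_u32(buf, off, (uint32_t) (v >> 32));
	return put_u32(buf, off, (uint32_t) v);
}

/*
 * Hypothetical builder for the extended message. The last two fields are
 * the ones the patch adds. Returns the number of bytes written.
 */
static size_t
build_hs_feedback(uint8_t *buf, size_t buflen, int64_t send_time,
				  uint32_t xmin, uint32_t xmin_epoch,
				  uint32_t catalog_xmin, uint32_t catalog_xmin_epoch)
{
	size_t		off = 0;

	assert(buflen >= HS_FEEDBACK_LEN);

	buf[off++] = 'h';			/* message type */
	off = put_u64(buf, off, (uint64_t) send_time);
	off = put_u32(buf, off, xmin);
	off = put_u32(buf, off, xmin_epoch);
	off = put_u32(buf, off, catalog_xmin);
	off = put_u32(buf, off, catalog_xmin_epoch);
	return off;
}
```

With patch 1 alone the last two fields would both be 0, since walreceiver does not yet fill them in.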

(3) needs additional attention:

By ignoring slot catalog_xmin in the GetOldestXmin() call then
separately calling ProcArrayGetReplicationSlotXmin() to get the
catalog_xmin, we might produce a catalog xmin slightly later than
what was in the procarray at the time we previously called
GetOldestXmin() to examine backend/session state. ProcArrayLock is
released so it can be advanced in-between the calls. That's harmless -
it isn't necessary for the reported catalog_xmin to be exactly
consistent with backend state. If it advances it's safe to report the
new position since we know the confirmed positions are on-disk
locally.

The alternative here is to extend GetOldestXmin() to take an out-param
to report the slot catalog xmin. That really just duplicates the
functionality of ProcArrayGetReplicationSlotXmin but means we can grab
it within a single ProcArray lock. Variants of GetOldestXmin and
ProcArrayGetReplicationSlotXmin that take an already-locked flag would
work too, but would hold the lock across parts of GetOldestXmin() that
currently don't retain it. I could also convert the current boolean
param ignoreVacuum into a flags argument instead of adding another
boolean. No real preference from me.
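
To make the last option concrete, a flags-based signature might look roughly like the sketch below. The flag names and `oldest_xmin_with_flags` are invented for illustration. The horizon computation is reduced to a minimum over values passed in by the caller, using plain `<` with no wraparound handling, whereas the real GetOldestXmin() scans the procarray under ProcArrayLock.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical flag bits replacing the separate boolean parameters. */
#define OLDESTXMIN_IGNORE_VACUUM		0x01
#define OLDESTXMIN_IGNORE_CATALOG_XMIN	0x02
#define OLDESTXMIN_ALL_FLAGS \
	(OLDESTXMIN_IGNORE_VACUUM | OLDESTXMIN_IGNORE_CATALOG_XMIN)

/*
 * Simplified stand-in: pick the oldest of the supplied horizons, skipping
 * those the caller asked to ignore. Assumes no xid wraparound.
 */
static TransactionId
oldest_xmin_with_flags(TransactionId backend_xmin,
					   TransactionId vacuum_xmin,
					   TransactionId catalog_xmin,
					   int flags)
{
	TransactionId result = backend_xmin;

	assert((flags & ~OLDESTXMIN_ALL_FLAGS) == 0);

	if (!(flags & OLDESTXMIN_IGNORE_VACUUM) &&
		vacuum_xmin != InvalidTransactionId && vacuum_xmin < result)
		result = vacuum_xmin;

	if (!(flags & OLDESTXMIN_IGNORE_CATALOG_XMIN) &&
		catalog_xmin != InvalidTransactionId && catalog_xmin < result)
		result = catalog_xmin;

	return result;
}
```

Under this shape a future exclusion would be one more bit rather than one more positional boolean at every call site.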

I cut out some comment changes to be submitted separately; otherwise
this series is much the same as the original patch upthread.

Also available at
https://github.com/2ndQuadrant/postgres/tree/dev/feedback-catalog-xmin
(and tagged dev/feedback-catalog-xmin). Branch subject to rebasing.

--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

Attachments:

0001-Send-catalog_xmin-in-hot-standby-feedback-protocol.patch (text/x-patch; charset=US-ASCII)
From 275c6422962f1fc326cc5f0c92186de0b127c472 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:30:53 +0800
Subject: [PATCH 1/6] Send catalog_xmin in hot standby feedback protocol

Add catalog_xmin to the hot standby feedback protocol so a read replica
that has logical slots can use its physical slot to the master to hold down the
master's catalog_xmin. This information will let a replica prevent vacuuming of
catalog tuples still required by the replica's logical slots.

This is the hot standby feedback protocol change only. The new value is always
set to zero by the walreceiver and is ignored by the walsender.
---
 doc/src/sgml/protocol.sgml            | 33 ++++++++++++++++++++++++++++-----
 src/backend/replication/walreceiver.c | 20 ++++++++++++++------
 src/backend/replication/walsender.c   | 14 ++++++++++++--
 3 files changed, 54 insertions(+), 13 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 68b0941..c4e41ca 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1783,10 +1783,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0 this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1796,7 +1797,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slots on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled. New in 10.0.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby. New in 10.0.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 413ee3a..0b92aac 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1161,8 +1161,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		nextEpoch, xmin_epoch, catalog_xmin_epoch;
+	TransactionId xmin, catalog_xmin;
 	static TimestampTz sendTime = 0;
 	static bool master_has_standby_xmin = false;
 
@@ -1207,23 +1207,31 @@ XLogWalRcvSendHSFeedback(bool immed)
 	else
 		xmin = InvalidTransactionId;
 
+	catalog_xmin = InvalidTransactionId;
+
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
 	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	xmin_epoch = nextEpoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch --;
+	catalog_xmin_epoch = nextEpoch;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch --;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
 	if (TransactionIdIsValid(xmin))
 		master_has_standby_xmin = true;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1ea2a5c..efa76e1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1633,6 +1633,8 @@ ProcessStandbyHSFeedbackMessage(void)
 	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
@@ -1641,10 +1643,18 @@ ProcessStandbyHSFeedbackMessage(void)
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	/*
+	 * A 10.0+ standby's walsender passes the lowest catalog xmin of any
+	 * replication slot up to the master.
+	 */
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
 		 feedbackXmin,
-		 feedbackEpoch);
+		 feedbackEpoch,
+		 feedbackCatalogXmin,
+		 feedbackCatalogEpoch);
 
 	/* Unset WalSender's xmin if the feedback message value is invalid */
 	if (!TransactionIdIsNormal(feedbackXmin))
-- 
2.5.5

0002-Make-walsender-respect-catalog_xmin-in-hot-standby-f.patch (text/x-patch; charset=US-ASCII)
From ade1ef3e9907bd6729dfff4840d0ec79f155a868 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:38:40 +0800
Subject: [PATCH 2/6] Make walsender respect catalog_xmin in hot standby
 feedback messages

The walsender now respects the new catalog_xmin field in the hot standby
feedback message. It uses it to set the catalog_xmin field on its physical
replication slot if one is in use. Otherwise it sets its process xmin to the
older of the xmin and catalog_xmin, so the outcome is the same as before
the protocol change.

In the process, factor out walsender's sanity check for xid+epoch wraparound
into a separate TransactionIdInRecentPast() function since we're now checking
it in two places.
---
 src/backend/replication/walsender.c | 109 ++++++++++++++++++++++++++++--------
 1 file changed, 86 insertions(+), 23 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index efa76e1..c40f3e1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -215,6 +215,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -1531,6 +1532,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst lost information can do here is give us
 	 * wrong information in a statistics view - we'll just potentially be more
 	 * conservative in removing files.
+	 *
+	 * We don't have to do any effective_xmin / effective_catalog_xmin testing
+	 * here either, like for LogicalConfirmReceivedLocation. If we received
+	 * the xmin and catalog_xmin from downstream replication slots we know they
+	 * were already confirmed there,
 	 */
 }
 
@@ -1593,7 +1599,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1614,6 +1620,22 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1624,13 +1646,43 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 }
 
 /*
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
+ */
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
+{
+	TransactionId nextXid;
+	uint32		nextEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	if (xid <= nextXid)
+	{
+		if (epoch != nextEpoch)
+			return false;
+	}
+	else
+	{
+		if (epoch + 1 != nextEpoch)
+			return false;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+		return false;				/* epoch OK, but it's wrapped around */
+
+	return true;
+}
+
+/*
  * Hot Standby feedback
  */
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-	TransactionId nextXid;
-	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
 	TransactionId feedbackCatalogXmin;
@@ -1656,38 +1708,35 @@ ProcessStandbyHSFeedbackMessage(void)
 		 feedbackCatalogXmin,
 		 feedbackCatalogEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-	if (feedbackXmin <= nextXid)
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
 	{
-		if (feedbackEpoch != nextEpoch)
-			return;
+		return;
 	}
-	else
+
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
 	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
+		return;
 	}
 
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
-
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
 	 * the xmin will be taken into account by GetOldestXmin.  This will hold
@@ -1711,15 +1760,29 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. We can only track the
+	 * catalog xmin separately when using a slot, so we store the least
+	 * of the two provided when not using a slot.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+	{
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
+	}
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+		{
+			MyPgXact->xmin = feedbackCatalogXmin;
+		}
+		else
+		{
+			MyPgXact->xmin = feedbackXmin;
+		}
+	}
 }
 
 /*
-- 
2.5.5

0003-Allow-GetOldestXmin-.-to-optionally-disregard-the-ca.patch (text/x-patch; charset=US-ASCII)
From c85aec273b4345ee4198ce69cb1f8a29279495b7 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 16:13:35 +0800
Subject: [PATCH 3/6] Allow GetOldestXmin(...) to optionally disregard the
 catalog_xmin

Add a new ignoreCatalogXmin option to GetOldestXmin(...), for use when
calculating hot standby feedback xmins. Adjust existing call sites.
---
 contrib/pg_visibility/pg_visibility.c |  4 ++--
 contrib/pgstattuple/pgstatapprox.c    |  2 +-
 src/backend/access/transam/xlog.c     |  4 ++--
 src/backend/catalog/index.c           |  2 +-
 src/backend/commands/analyze.c        |  2 +-
 src/backend/commands/vacuum.c         |  4 ++--
 src/backend/replication/walreceiver.c |  2 +-
 src/backend/storage/ipc/procarray.c   | 42 +++++++++++++++++++++--------------
 src/include/storage/procarray.h       |  2 +-
 9 files changed, 36 insertions(+), 28 deletions(-)

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index 7034066..318caab 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -523,7 +523,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	if (all_visible)
 	{
 		/* Don't pass rel; that will fail in recovery. */
-		OldestXmin = GetOldestXmin(NULL, true);
+		OldestXmin = GetOldestXmin(NULL, true, false);
 	}
 
 	rel = relation_open(relid, AccessShareLock);
@@ -646,7 +646,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 				 * a buffer lock. And this shouldn't happen often, so it's
 				 * worth being careful so as to avoid false positives.
 				 */
-				RecomputedOldestXmin = GetOldestXmin(NULL, true);
+				RecomputedOldestXmin = GetOldestXmin(NULL, true, false);
 
 				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
 					record_corrupt_item(items, &tuple.t_data->t_ctid);
diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index a49ff54..3674c05 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -67,7 +67,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	TransactionId OldestXmin;
 	uint64		misc_count = 0;
 
-	OldestXmin = GetOldestXmin(rel, true);
+	OldestXmin = GetOldestXmin(rel, true, false);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2189c22..8546649 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8635,7 +8635,7 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -8974,7 +8974,7 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index b0b43cf..fd86530 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2255,7 +2255,7 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	{
 		snapshot = SnapshotAny;
 		/* okay to ignore lazy VACUUMs here */
-		OldestXmin = GetOldestXmin(heapRelation, true);
+		OldestXmin = GetOldestXmin(heapRelation, true, false);
 	}
 
 	scan = heap_beginscan_strat(heapRelation,	/* relation */
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index c617abb..2170566 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -992,7 +992,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	totalblocks = RelationGetNumberOfBlocks(onerel);
 
 	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
-	OldestXmin = GetOldestXmin(onerel, true);
+	OldestXmin = GetOldestXmin(onerel, true, false);
 
 	/* Prepare for sampling block numbers */
 	BlockSampler_Init(&bs, totalblocks, targrows, random());
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 58bbf55..79ec690 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -497,7 +497,7 @@ vacuum_set_xid_limits(Relation rel,
 	 * always an independent transaction.
 	 */
 	*oldestXmin =
-		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
+		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true, false), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
@@ -909,7 +909,7 @@ vac_update_datfrozenxid(void)
 	 * committed pg_class entries for new tables; see AddNewRelationTuple().
 	 * So we cannot produce a wrong minimum by starting with this.
 	 */
-	newFrozenXid = GetOldestXmin(NULL, true);
+	newFrozenXid = GetOldestXmin(NULL, true, false);
 
 	/*
 	 * Similarly, initialize the MultiXact "min" with the value that would be
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 0b92aac..0f38328 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1203,7 +1203,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false);
+		xmin = GetOldestXmin(NULL, false, false);
 	else
 		xmin = InvalidTransactionId;
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index e5d487d..1912790 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1298,17 +1298,22 @@ TransactionIdIsActive(TransactionId xid)
  * process can set its xmin based on transactions that are no longer running
  * in the master but are still being replayed on the standby, thus possibly
  * making the GetOldestXmin reading go backwards.  In this case there is a
- * possibility that we lose data that the standby would like to have, but
- * there is little we can do about that --- data is only protected if the
- * walsender runs continuously while queries are executed on the standby.
- * (The Hot Standby code deals with such cases by failing standby queries
- * that needed to access already-removed data, so there's no integrity bug.)
+ * possibility that we lose data that the standby would like to have
+ * unless the standby uses a replication slot to make its xmin persistent
+ * even when it isn't connected. The Hot Standby code deals with such cases by
+ * failing standby queries that needed to access already-removed data, so
+ * there's no integrity bug.
+ *
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * The caller may request that replication slots' catalog_xmin values be
+ * disregarded when calculating the global xmin. The caller must account
+ * for catalog_xmin separately.
  */
 TransactionId
-GetOldestXmin(Relation rel, bool ignoreVacuum)
+GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreCatalogXmin)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId result;
@@ -1433,17 +1438,20 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
 		NormalTransactionIdPrecedes(replication_slot_xmin, result))
 		result = replication_slot_xmin;
 
-	/*
-	 * After locks have been released and defer_cleanup_age has been applied,
-	 * check whether we need to back up further to make logical decoding
-	 * possible. We need to do so if we're computing the global limit (rel =
-	 * NULL) or if the passed relation is a catalog relation of some kind.
-	 */
-	if ((rel == NULL ||
-		 RelationIsAccessibleInLogicalDecoding(rel)) &&
-		TransactionIdIsValid(replication_slot_catalog_xmin) &&
-		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
-		result = replication_slot_catalog_xmin;
+	if (!ignoreCatalogXmin)
+	{
+		/*
+		 * After locks have been released and defer_cleanup_age has been applied,
+		 * check whether we need to back up further to make logical decoding
+		 * possible. We need to do so if we're computing the global limit (rel =
+		 * NULL) or if the passed relation is a catalog relation of some kind.
+		 */
+		if ((rel == NULL ||
+			 RelationIsAccessibleInLogicalDecoding(rel)) &&
+			TransactionIdIsValid(replication_slot_catalog_xmin) &&
+			NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
+			result = replication_slot_catalog_xmin;
+	}
 
 	return result;
 }
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index dd37c0c..a63f6ac 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -53,7 +53,7 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
-extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
+extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreCatalogXmin);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);
 
-- 
2.5.5

0004-Send-catalog_xmin-separately-in-hot_standby_feedback.patch (text/x-patch; charset=US-ASCII)
From 89956e6f6154d43f4b0aa89fd234e4b4a3ffb171 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 16:23:57 +0800
Subject: [PATCH 4/6] Send catalog_xmin separately in hot_standby_feedback
 messages

Now that the protocol supports reporting catalog_xmin separately and
GetOldestXmin() allows us to exclude the catalog_xmin from the calculated xmin,
actually send a separate catalog_xmin to the master.

This change is necessary, but not sufficient, to allow logical decoding
on a standby.
---
 src/backend/replication/walreceiver.c | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 0f38328..344f0e8 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1203,11 +1203,22 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false, false);
+	{
+		/*
+		 * Usually GetOldestXmin() would include the catalog_xmin in its
+		 * calculations, but we don't want to hold upstream back from vacuuming
+		 * normal user table tuples just because they're within the
+		 * catalog_xmin horizon of logical replication slots on this standby.
+		 * Instead we report the catalog_xmin to the upstream separately.
+		 */
+		xmin = GetOldestXmin(NULL, false, true);
+		ProcArrayGetReplicationSlotXmin(NULL, &catalog_xmin);
+	}
 	else
+	{
 		xmin = InvalidTransactionId;
-
-	catalog_xmin = InvalidTransactionId;
+		catalog_xmin = InvalidTransactionId;
+	}
 
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
@@ -1233,7 +1244,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	pq_sendint(&reply_message, catalog_xmin, 4);
 	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
-	if (TransactionIdIsValid(xmin))
+	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 		master_has_standby_xmin = true;
 	else
 		master_has_standby_xmin = false;
-- 
2.5.5

#4Petr Jelinek
petr@2ndquadrant.com
In reply to: Craig Ringer (#3)
Re: [PATCH] Send catalog_xmin separately in hot standby feedback

Hi Craig,

On 05/09/16 11:28, Craig Ringer wrote:

On 5 September 2016 at 14:44, Craig Ringer <craig@2ndquadrant.com> wrote:

On 5 September 2016 at 12:40, Craig Ringer <craig@2ndquadrant.com> wrote:

Hi all

Currently hot standby feedback sends GetOldestXmin()'s result to the
upstream as the required xmin. GetOldestXmin() returns a slot's
catalog_xmin if that's the lowest xmin on the system.

Note that this patch changes the API to GetOldestXmin(), adding a new
boolean to allow it to disregard the catalog_xmin of slots.

Per Simon's feedback I'm going to split that out into a separate
patch, so will post a follow-up split one soon as the series.

Here is my review of them.

Now formatted as a series:

1. Send catalog_xmin in hot standby feedback protocol

+	xmin_epoch = nextEpoch;
if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch --;
+	catalog_xmin_epoch = nextEpoch;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch --;

Don't understand why you keep the nextEpoch here, it's not used anywhere
that I can see, you could just as well use the xmin_epoch directly if
that's how you want to name it.
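
For reference, the adjustment in the quoted hunk boils down to deriving each reported xid's epoch from the (nextXid, nextEpoch) pair. A minimal sketch, where `epoch_for_xid` is a hypothetical helper that is not part of the patch and nextXid is assumed to always be a normal xid:

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FirstNormalTransactionId ((TransactionId) 3)

/*
 * Hypothetical helper: if the xid is numerically above nextXid, it must
 * belong to the previous epoch, so step the epoch back by one. An invalid
 * xid (0) is never above nextXid and simply gets the current epoch.
 */
static uint32_t
epoch_for_xid(TransactionId xid, TransactionId next_xid, uint32_t next_epoch)
{
	assert(next_xid >= FirstNormalTransactionId);

	return (next_xid < xid) ? next_epoch - 1 : next_epoch;
}
```

Calling such a helper once for xmin and once for catalog_xmin leaves the value fetched from GetNextXidAndEpoch() unmodified, so both epochs are computed from the same starting point.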

+	/*
+	 * A 10.0+ standby's walsender passes the lowest catalog xmin of any
+	 * replication slot up to the master.
+	 */
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);

I'd be more interested to see this comment explain why this is sent, rather
than that it has been sent since version 10.

2. Make walsender respect catalog_xmin in hot standby feedback messages

+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+		{
+			MyPgXact->xmin = feedbackCatalogXmin;
+		}
+		else
+		{
+			MyPgXact->xmin = feedbackXmin;
+		}

This is not how we usually use the {} brackets (there are some more
instances of using them for one line of code in this particular commit).
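
For comparison, here is the same fallback with brace-free single-statement branches, which appears to be the layout being asked for. This is a standalone sketch: `fallback_xmin`, `xid_is_normal` and `xid_precedes` are illustrative stand-ins written from memory of how TransactionIdIsNormal() and TransactionIdPrecedes() behave, not copies of the PostgreSQL definitions.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FirstNormalTransactionId ((TransactionId) 3)

/* Stand-in for TransactionIdIsNormal(). */
static int
xid_is_normal(TransactionId xid)
{
	return xid >= FirstNormalTransactionId;
}

/*
 * Stand-in for TransactionIdPrecedes(): modulo-2^32 comparison for normal
 * xids, plain unsigned comparison if either one is a special xid.
 */
static int
xid_precedes(TransactionId a, TransactionId b)
{
	if (!xid_is_normal(a) || !xid_is_normal(b))
		return a < b;
	return (int32_t) (a - b) < 0;
}

/* Value the walsender would advertise as its xmin when no slot is in use. */
static TransactionId
fallback_xmin(TransactionId feedback_xmin, TransactionId feedback_catalog_xmin)
{
	/* The both-invalid case returns earlier in the patched function. */
	assert(xid_is_normal(feedback_xmin) ||
		   xid_is_normal(feedback_catalog_xmin));

	if (xid_is_normal(feedback_catalog_xmin) &&
		xid_precedes(feedback_catalog_xmin, feedback_xmin))
		return feedback_catalog_xmin;
	else
		return feedback_xmin;
}
```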

3. Allow GetOldestXmin(...) to optionally disregard the catalog_xmin

By ignoring slot catalog_xmin in the GetOldestXmin() call then
separately calling ProcArrayGetReplicationSlotXmin() to get the
catalog_xmin, we might produce a catalog xmin slightly later than
what was in the procarray at the time we previously called
GetOldestXmin() to examine backend/session state. ProcArrayLock is
released so it can be advanced in-between the calls. That's harmless -
it isn't necessary for the reported catalog_xmin to be exactly
consistent with backend state. If it advances it's safe to report the
new position since we know the confirmed positions are on-disk
locally.

The alternative here is to extend GetOldestXmin() to take an out-param
to report the slot catalog xmin. That really just duplicates the
functionality of ProcArrayGetReplicationSlotXmin but means we can grab
it within a single ProcArray lock. Variants of GetOldestXmin and
ProcArrayGetReplicationSlotXmin that take an already-locked flag would
work too, but would hold the lock across parts of GetOldestXmin() that
currently don't retain it. I could also convert the current boolean
param ignoreVacuum into a flags argument instead of adding another
boolean. No real preference from me.

I would honestly prefer the change to GetOldestXmin to return the
catalog_xmin. It seems both cleaner and does less locking.
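
A rough sketch of that out-parameter shape, to show what a caller would get back from a single acquisition of the lock. Everything here is illustrative: `HorizonState` is a made-up stand-in for the shared state behind ProcArrayLock, `get_oldest_xmin_sketch` is not the proposed function, and the locking itself is only marked with comments.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FirstNormalTransactionId ((TransactionId) 3)

/* Made-up stand-in for the shared state guarded by ProcArrayLock. */
typedef struct HorizonState
{
	TransactionId backend_xmin;			/* oldest xmin among backends */
	TransactionId slot_catalog_xmin;	/* oldest slot catalog_xmin, or 0 */
} HorizonState;

/* Wraparound-aware "a is older than b", for normal xids only. */
static int
xid_precedes(TransactionId a, TransactionId b)
{
	return (int32_t) (a - b) < 0;
}

/*
 * If catalog_xmin_out is NULL, fold the slots' catalog_xmin into the result
 * as before. Otherwise report it separately and leave it out of the result.
 */
static TransactionId
get_oldest_xmin_sketch(const HorizonState *shared,
					   TransactionId *catalog_xmin_out)
{
	HorizonState snap;
	TransactionId result;

	assert(shared != NULL);

	/* the lock would be taken here ... */
	snap = *shared;
	/* ... and released here, so both values come from one snapshot */

	result = snap.backend_xmin;

	if (catalog_xmin_out != NULL)
		*catalog_xmin_out = snap.slot_catalog_xmin;
	else if (snap.slot_catalog_xmin >= FirstNormalTransactionId &&
			 xid_precedes(snap.slot_catalog_xmin, result))
		result = snap.slot_catalog_xmin;

	return result;
}
```

Existing callers would pass NULL and see the old behaviour, while walreceiver would pass a pointer and obtain both horizons from the same snapshot.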

4. Send catalog_xmin separately in hot_standby_feedback messages

This looks okay (provided the change above).

In general it's a simpler patch than I expected, which is good. But it
would be good to have some tests.

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#5Craig Ringer
craig@2ndquadrant.com
In reply to: Petr Jelinek (#4)
5 attachment(s)
Re: [PATCH] Send catalog_xmin separately in hot standby feedback

On 25 October 2016 at 00:19, Petr Jelinek <petr@2ndquadrant.com> wrote:

Now formatted as a series:

1. Send catalog_xmin in hot standby feedback protocol

+     xmin_epoch = nextEpoch;
if (nextXid < xmin)
-             nextEpoch--;
+             xmin_epoch --;
+     catalog_xmin_epoch = nextEpoch;
+     if (nextXid < catalog_xmin)
+             catalog_xmin_epoch --;

Don't understand why you keep the nextEpoch here, it's not used anywhere
that I can see, you could just as well use the xmin_epoch directly if
that's how you want to name it.

Fixed, thanks.
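
As a rough illustration of the epoch adjustment being discussed, here is a
minimal sketch. The helper name epoch_for_xid() and the standalone typedef
are assumptions made for the example and are not part of the patch; the
logic follows the hunk above: start from the epoch of the next xid and step
back by one when the xid is numerically greater than nextXid, i.e. it was
assigned before the counter wrapped.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* stand-in for PostgreSQL's typedef */

/*
 * Hypothetical helper (not in the patch): return the epoch that xid
 * belongs to, given the next xid to be assigned and that xid's epoch.
 */
static uint32_t
epoch_for_xid(TransactionId xid, TransactionId next_xid, uint32_t next_epoch)
{
    if (next_xid < xid)
    {
        /* xid is from before the last wraparound, so an epoch must exist */
        assert(next_epoch > 0);
        return next_epoch - 1;
    }
    return next_epoch;
}
```

With a helper like this the walreceiver would compute xmin_epoch and
catalog_xmin_epoch with two independent calls, so no shared nextEpoch
variable needs to be decremented in place.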

+     /*
+      * A 10.0+ standby's walsender passes the lowest catalog xmin of any
+      * replication slot up to the master.
+      */
+     feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+     feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);

I'd be more interested in having this comment explain why this is sent,
rather than that it's sent since version 10+.

Removed. It's explained in a comment inside the if
(hot_standby_feedback) block in walreceiver anyway.
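
For reference, the message layout after patch 0002 is: the 'h' type byte, an
8-byte send time, then four 4-byte integers (xmin, xmin epoch, catalog_xmin,
catalog_xmin epoch), all in network byte order. The sketch below writes that
layout into a plain byte buffer. encode_hs_feedback() and its helpers are
hypothetical names for the example; the real code builds the message in a
StringInfo with pq_sendbyte(), pq_sendint64() and pq_sendint().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* 1 type byte + 8-byte timestamp + four 4-byte integers */
#define HS_FEEDBACK_LEN (1 + 8 + 4 * 4)

/* Store v big-endian (network order) at p; returns bytes written. */
static size_t
put_u32(unsigned char *p, uint32_t v)
{
    p[0] = (unsigned char) (v >> 24);
    p[1] = (unsigned char) (v >> 16);
    p[2] = (unsigned char) (v >> 8);
    p[3] = (unsigned char) v;
    return 4;
}

static size_t
put_u64(unsigned char *p, uint64_t v)
{
    put_u32(p, (uint32_t) (v >> 32));
    put_u32(p + 4, (uint32_t) v);
    return 8;
}

/* Hypothetical encoder for the extended hot standby feedback message. */
static size_t
encode_hs_feedback(unsigned char *buf, size_t buflen, int64_t send_time,
                   uint32_t xmin, uint32_t xmin_epoch,
                   uint32_t catalog_xmin, uint32_t catalog_xmin_epoch)
{
    size_t      off = 0;

    assert(buf != NULL && buflen >= HS_FEEDBACK_LEN);

    buf[off++] = 'h';
    off += put_u64(buf + off, (uint64_t) send_time);
    off += put_u32(buf + off, xmin);
    off += put_u32(buf + off, xmin_epoch);
    off += put_u32(buf + off, catalog_xmin);
    off += put_u32(buf + off, catalog_xmin_epoch);
    return off;
}
```

A receiver that expects this layout and gets the older, shorter message
would run out of data while reading the last two fields.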

2. Make walsender respect catalog_xmin in hot standby feedback messages

+             if (TransactionIdIsNormal(feedbackCatalogXmin)
+                     && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+             {
+                     MyPgXact->xmin = feedbackCatalogXmin;
+             }
+             else
+             {
+                     MyPgXact->xmin = feedbackXmin;
+             }

This is not how we usually use the {} brackets (there are some more
instances of using them for one line of code in this particular commit).

Whoops. Thanks. I find the Pg convention pretty ghastly when dealing
with multi-line 'if' conditions followed by a single-line statement,
but it's still the convention whether I like it or not.
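
For illustration, the no-slot case with the braces dropped can be sketched as
a standalone function. The names xid_is_normal(), xid_precedes() and
xmin_without_slot() are assumptions for the example; they are meant to mirror
TransactionIdIsNormal() and TransactionIdPrecedes(), whose wraparound-aware
comparison is reproduced here from memory rather than taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* stand-in for PostgreSQL's typedef */
#define FirstNormalTransactionId ((TransactionId) 3)

static bool
xid_is_normal(TransactionId xid)
{
    return xid >= FirstNormalTransactionId;
}

/* Is a logically older than b?  Normal xids compare modulo 2^32. */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
    if (!xid_is_normal(a) || !xid_is_normal(b))
        return a < b;
    return (int32_t) (a - b) < 0;
}

/*
 * Hypothetical helper: without a slot the walsender has a single xmin to
 * advertise, so it takes the older of the two values from the feedback.
 */
static TransactionId
xmin_without_slot(TransactionId feedback_xmin,
                  TransactionId feedback_catalog_xmin)
{
    TransactionId chosen;

    if (xid_is_normal(feedback_catalog_xmin)
        && xid_precedes(feedback_catalog_xmin, feedback_xmin))
        chosen = feedback_catalog_xmin;
    else
        chosen = feedback_xmin;

    /* Postcondition: never pick a horizon newer than the plain xmin. */
    assert(chosen == feedback_xmin || xid_precedes(chosen, feedback_xmin));
    return chosen;
}
```

The single-statement branches carry no braces even though the condition
spans two lines, which is the convention referred to above.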

3. Allow GetOldestXmin(...) to optionally disregard the catalog_xmin
By ignoring slot catalog_xmin in the GetOldestXmin() call then
separately calling ProcArrayGetReplicationSlotXmin() to get the
catalog_xmin, we might produce a catalog xmin slightly later than
what was in the procarray at the time we previously called
GetOldestXmin() to examine backend/session state. ProcArrayLock is
released so it can be advanced in-between the calls. That's harmless -
it isn't necessary for the reported catalog_xmin to be exactly
consistent with backend state. If it advances it's safe to report the
new position since we know the confirmed positions are on-disk
locally.

The alternative here is to extend GetOldestXmin() to take an out-param
to report the slot catalog xmin. That really just duplicates the
functionality of ProcArrayGetReplicationSlotXmin but means we can grab
it within a single ProcArray lock. Variants of GetOldestXmin and
ProcArrayGetReplicationSlotXmin that take an already-locked flag would
work too, but would hold the lock across parts of GetOldestXmin() that
currently don't retain it. I could also convert the current boolean
param ignoreVacuum into a flags argument instead of adding another
boolean. No real preference from me.

I would honestly prefer the change to GetOldestXmin to return the
catalog_xmin. It seems both cleaner and does less locking.

Fair enough. Done.
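
The shape of the out-parameter approach, stripped of locking and of the
relation check, is roughly as follows. This is a sketch under assumptions:
oldest_xmin_sketch() is a made-up name, and it takes the already-computed
horizon and slot catalog_xmin as arguments instead of reading the procarray.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* stand-in for PostgreSQL's typedef */
#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

/*
 * Hypothetical tail of a GetOldestXmin()-style function.
 *
 * result            - horizon computed from backends and slot xmins
 * slot_catalog_xmin - oldest catalog_xmin of any slot, or invalid
 * catalog_xmin      - if non-NULL, receives slot_catalog_xmin and the
 *                     return value is left unmerged
 */
static TransactionId
oldest_xmin_sketch(TransactionId result, TransactionId slot_catalog_xmin,
                   TransactionId *catalog_xmin)
{
    /* the computed horizon is expected to be a normal xid */
    assert(result >= FirstNormalTransactionId);

    if (catalog_xmin != NULL)
        *catalog_xmin = slot_catalog_xmin;  /* caller accounts for it */
    else if (slot_catalog_xmin != InvalidTransactionId &&
             (int32_t) (slot_catalog_xmin - result) < 0)
        result = slot_catalog_xmin;         /* merge, as before */

    return result;
}
```

Existing callers pass NULL and see the merged value as before; only the hot
standby feedback path passes a pointer, and both values come from the same
pass over shared state.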

In general it's a simpler patch than I expected, which is good. But it
would be good to have some tests.

Agreed. OK, I've added basic tests for physical replication slots and
hot_standby_feedback to t/001_stream_rep.pl, since it makes sense to
test both alongside that file's existing tests of cascading, etc., and I
don't think a separate test is needed.

It's not actually practical to add tests for the catalog_xmin on
standby functionality until the next patch in the series (pending)
which enables logical decoding on standby. Currently you can't create
a slot on a standby so you can't cause the standby to hold down
catalog_xmin. But the tests show things work how they should within
the range of currently exposed functionality.

In the process I noticed how skeletal those tests still are. We have a
great framework now (thanks Michael!) and I'd like to start filling it
out with tests involving unclean shutdowns, promotions, etc. There's a
lot still to write to get solid coverage. Tests aren't hard. Who's
keen to write some? I'll happily help any volunteers out.

New patch series attached. 0001 is the new tests. The guts are patches
2-5. I'm not sure whether 2, 3, 4 and 5 should be squashed for commit
or not, but I left them separate for easier review.

For complete functionality this series will want to be coupled with
logical decoding timeline following and a pending patch to enable
logical decoding on standby.

--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

Attachments:

0005-Send-catalog_xmin-separately-in-hot_standby_feedback.patch (text/x-patch; charset=US-ASCII)
From 80964a3b4f6a98f83a123f69924f2627d89c7a5b Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 16:23:57 +0800
Subject: [PATCH 5/7] Send catalog_xmin separately in hot_standby_feedback
 messages

Now that the protocol supports reporting catalog_xmin separately and
GetOldestXmin() allows us to exclude the catalog_xmin from the calculated xmin,
actually send a separate catalog_xmin to the master.

This change is necessary, but not sufficient, to allow logical decoding
on a standby.
---
 src/backend/replication/walreceiver.c | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 80cc482..318d8ce 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1206,11 +1206,21 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false, NULL);
+	{
+		/*
+		 * Usually GetOldestXmin() would include the catalog_xmin in its
+		 * calculations, but we don't want to hold upstream back from vacuuming
+		 * normal user table tuples just because they're within the
+		 * catalog_xmin horizon of logical replication slots on this standby.
+		 * Instead we report the catalog_xmin to the upstream separately.
+		 */
+		xmin = GetOldestXmin(NULL, false, &catalog_xmin);
+	}
 	else
+	{
 		xmin = InvalidTransactionId;
-
-	catalog_xmin = InvalidTransactionId;
+		catalog_xmin = InvalidTransactionId;
+	}
 
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
@@ -1235,7 +1245,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	pq_sendint(&reply_message, catalog_xmin, 4);
 	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
-	if (TransactionIdIsValid(xmin))
+	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 		master_has_standby_xmin = true;
 	else
 		master_has_standby_xmin = false;
-- 
2.5.5

0004-Allow-GetOldestXmin-.-to-optionally-disregard-the-ca.patch (text/x-patch; charset=US-ASCII)
From 35cd1cf69f11afece009f4508f8194f61ceb70cc Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 16:13:35 +0800
Subject: [PATCH 4/7] Allow GetOldestXmin(...) to optionally disregard the
 catalog_xmin

Add a new catalog_xmin out-parameter to GetOldestXmin(...), for use when
calculating hot standby feedback xmins. When passed, any needed catalog_xmin is
returned separately instead of being merged with the return value. Adjust
existing call sites.
---
 contrib/pg_visibility/pg_visibility.c |  4 +--
 contrib/pgstattuple/pgstatapprox.c    |  2 +-
 src/backend/access/transam/xlog.c     |  4 +--
 src/backend/catalog/index.c           |  2 +-
 src/backend/commands/analyze.c        |  2 +-
 src/backend/commands/vacuum.c         |  4 +--
 src/backend/replication/walreceiver.c |  2 +-
 src/backend/storage/ipc/procarray.c   | 51 +++++++++++++++++++++++------------
 src/include/storage/procarray.h       |  2 +-
 9 files changed, 45 insertions(+), 28 deletions(-)

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index 9985e3e..4fa3ad4 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -538,7 +538,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	if (all_visible)
 	{
 		/* Don't pass rel; that will fail in recovery. */
-		OldestXmin = GetOldestXmin(NULL, true);
+		OldestXmin = GetOldestXmin(NULL, true, NULL);
 	}
 
 	rel = relation_open(relid, AccessShareLock);
@@ -660,7 +660,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 				 * a buffer lock. And this shouldn't happen often, so it's
 				 * worth being careful so as to avoid false positives.
 				 */
-				RecomputedOldestXmin = GetOldestXmin(NULL, true);
+				RecomputedOldestXmin = GetOldestXmin(NULL, true, NULL);
 
 				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
 					record_corrupt_item(items, &tuple.t_self);
diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index f524fc4..5b33c97 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -70,7 +70,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	TransactionId OldestXmin;
 	uint64		misc_count = 0;
 
-	OldestXmin = GetOldestXmin(rel, true);
+	OldestXmin = GetOldestXmin(rel, true, NULL);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6cec027..56c672c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8653,7 +8653,7 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, NULL));
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -9016,7 +9016,7 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, NULL));
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 08b646d..b673c06 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2272,7 +2272,7 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	{
 		snapshot = SnapshotAny;
 		/* okay to ignore lazy VACUUMs here */
-		OldestXmin = GetOldestXmin(heapRelation, true);
+		OldestXmin = GetOldestXmin(heapRelation, true, NULL);
 	}
 
 	scan = heap_beginscan_strat(heapRelation,	/* relation */
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index c617abb..9b0cc3a 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -992,7 +992,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	totalblocks = RelationGetNumberOfBlocks(onerel);
 
 	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
-	OldestXmin = GetOldestXmin(onerel, true);
+	OldestXmin = GetOldestXmin(onerel, true, NULL);
 
 	/* Prepare for sampling block numbers */
 	BlockSampler_Init(&bs, totalblocks, targrows, random());
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 58bbf55..aaee9a6 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -497,7 +497,7 @@ vacuum_set_xid_limits(Relation rel,
 	 * always an independent transaction.
 	 */
 	*oldestXmin =
-		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
+		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true, NULL), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
@@ -909,7 +909,7 @@ vac_update_datfrozenxid(void)
 	 * committed pg_class entries for new tables; see AddNewRelationTuple().
 	 * So we cannot produce a wrong minimum by starting with this.
 	 */
-	newFrozenXid = GetOldestXmin(NULL, true);
+	newFrozenXid = GetOldestXmin(NULL, true, NULL);
 
 	/*
 	 * Similarly, initialize the MultiXact "min" with the value that would be
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 06ca9e4..80cc482 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1206,7 +1206,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false);
+		xmin = GetOldestXmin(NULL, false, NULL);
 	else
 		xmin = InvalidTransactionId;
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index e5d487d..a4e3549 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1298,17 +1298,22 @@ TransactionIdIsActive(TransactionId xid)
  * process can set its xmin based on transactions that are no longer running
  * in the master but are still being replayed on the standby, thus possibly
  * making the GetOldestXmin reading go backwards.  In this case there is a
- * possibility that we lose data that the standby would like to have, but
- * there is little we can do about that --- data is only protected if the
- * walsender runs continuously while queries are executed on the standby.
- * (The Hot Standby code deals with such cases by failing standby queries
- * that needed to access already-removed data, so there's no integrity bug.)
+ * possibility that we lose data that the standby would like to have
+ * unless the standby uses a replication slot to make its xmin persistent
+ * even when it isn't connected. The Hot Standby code deals with such cases by
+ * failing standby queries that needed to access already-removed data, so
+ * there's no integrity bug.
+ *
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * The caller may request that replication slots' catalog_xmin values be
+ * disregarded when calculating the global xmin. The caller must account
+ * for catalog_xmin separately.
  */
 TransactionId
-GetOldestXmin(Relation rel, bool ignoreVacuum)
+GetOldestXmin(Relation rel, bool ignoreVacuum, TransactionId *catalog_xmin)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId result;
@@ -1433,17 +1438,29 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
 		NormalTransactionIdPrecedes(replication_slot_xmin, result))
 		result = replication_slot_xmin;
 
-	/*
-	 * After locks have been released and defer_cleanup_age has been applied,
-	 * check whether we need to back up further to make logical decoding
-	 * possible. We need to do so if we're computing the global limit (rel =
-	 * NULL) or if the passed relation is a catalog relation of some kind.
-	 */
-	if ((rel == NULL ||
-		 RelationIsAccessibleInLogicalDecoding(rel)) &&
-		TransactionIdIsValid(replication_slot_catalog_xmin) &&
-		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
-		result = replication_slot_catalog_xmin;
+	if (!(rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)))
+		replication_slot_catalog_xmin = InvalidTransactionId;
+
+	if (catalog_xmin != NULL)
+	{
+		/*
+		 * The caller wants any logical decoding specific xmin reported
+		 * separately, so don't merge it with the xmin we'll return.
+		 */
+		*catalog_xmin = replication_slot_catalog_xmin;
+	}
+	else
+	{
+		/*
+		 * After locks have been released and defer_cleanup_age has been applied,
+		 * check whether we need to back up further to make logical decoding
+		 * possible. We need to do so if we're computing the global limit (rel =
+		 * NULL) or if the passed relation is a catalog relation of some kind.
+		 */
+		if (TransactionIdIsValid(replication_slot_catalog_xmin) &&
+			NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
+			result = replication_slot_catalog_xmin;
+	}
 
 	return result;
 }
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index dd37c0c..f7d1d96 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -53,7 +53,7 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
-extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
+extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum, TransactionId *catalog_xmin);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);
 
-- 
2.5.5

0003-Make-walsender-respect-catalog_xmin-in-hot-standby-f.patch (text/x-patch; charset=US-ASCII)
From 2b2dea6589916a04b8e40fd7ba6d3ccc8f6e8745 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:38:40 +0800
Subject: [PATCH 3/7] Make walsender respect catalog_xmin in hot standby
 feedback messages

The walsender now respects the new catalog_xmin field in the hot standby
feedback message. It uses it to set the catalog_xmin field on its physical
replication slot if one is in use. Otherwise it sets its process xmin to the
older of the xmin and catalog_xmin, so the outcome is the same as before
the protocol change.

In the process, factor out walsender's sanity check for xid+epoch wraparound
into a separate TransactionIdInRecentPast() function since we're now checking
it in two places.
---
 src/backend/replication/walsender.c | 111 +++++++++++++++++++++++++++---------
 1 file changed, 84 insertions(+), 27 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aca3ca1..ba3b471 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -215,6 +215,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -1533,6 +1534,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst lost information can do here is give us
 	 * wrong information in a statistics view - we'll just potentially be more
 	 * conservative in removing files.
+	 *
+	 * We don't have to do any effective_xmin / effective_catalog_xmin testing
+	 * here either, like for LogicalConfirmReceivedLocation. If we received
+	 * the xmin and catalog_xmin from downstream replication slots we know they
+	 * were already confirmed there.
 	 */
 }
 
@@ -1595,7 +1601,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1616,6 +1622,22 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1626,13 +1648,46 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 }
 
 /*
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
+ *
+ * This check doesn't care about whether clog exists for these xids
+ * at all.
+ */
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
+{
+	TransactionId nextXid;
+	uint32		nextEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	if (xid <= nextXid)
+	{
+		if (epoch != nextEpoch)
+			return false;
+	}
+	else
+	{
+		if (epoch + 1 != nextEpoch)
+			return false;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+		return false;				/* epoch OK, but it's wrapped around */
+
+	return true;
+}
+
+/*
  * Hot Standby feedback
  */
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-	TransactionId nextXid;
-	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
 	TransactionId feedbackCatalogXmin;
@@ -1640,7 +1695,8 @@ ProcessStandbyHSFeedbackMessage(void)
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
-	 * byte.
+	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
+	 * of this message.
 	 */
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
@@ -1654,37 +1710,30 @@ ProcessStandbyHSFeedbackMessage(void)
 		 feedbackCatalogXmin,
 		 feedbackCatalogEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-	if (feedbackXmin <= nextXid)
-	{
-		if (feedbackEpoch != nextEpoch)
-			return;
-	}
-	else
-	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
-	}
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
+		return;
 
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
+		return;
 
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
@@ -1709,15 +1758,23 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. We can only track the
+	 * catalog xmin separately when using a slot, so we store the least
+	 * of the two provided when not using a slot.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+			MyPgXact->xmin = feedbackCatalogXmin;
+		else
+			MyPgXact->xmin = feedbackXmin;
+	}
 }
 
 /*
-- 
2.5.5

0002-Send-catalog_xmin-in-hot-standby-feedback-protocol.patch (text/x-patch; charset=US-ASCII)
From dcd32f60383666715ea8476a708c461fe2a056f6 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:30:53 +0800
Subject: [PATCH 2/7] Send catalog_xmin in hot standby feedback protocol

Add catalog_xmin to the hot standby feedback protocol so a read replica
that has logical slots can use its physical slot to the master to hold down the
master's catalog_xmin. This information will let a replica prevent vacuuming of
catalog tuples still required by the replica's logical slots.

This is only the hot standby feedback protocol change; the new value is
always set to zero by the walreceiver and is ignored by the walsender.
---
 doc/src/sgml/protocol.sgml            | 33 ++++++++++++++++++++++++++++-----
 src/backend/replication/walreceiver.c | 21 ++++++++++++++-------
 src/backend/replication/walsender.c   | 10 ++++++++--
 3 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 50cf527..e0fd9aa 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1783,10 +1783,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0 this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1796,7 +1797,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slots on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled. New in 10.0.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby. New in 10.0.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2bb3dce..06ca9e4 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1164,8 +1164,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		xmin_epoch, catalog_xmin_epoch;
+	TransactionId xmin, catalog_xmin;
 	static TimestampTz sendTime = 0;
 	static bool master_has_standby_xmin = false;
 
@@ -1210,23 +1210,30 @@ XLogWalRcvSendHSFeedback(bool immed)
 	else
 		xmin = InvalidTransactionId;
 
+	catalog_xmin = InvalidTransactionId;
+
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	GetNextXidAndEpoch(&nextXid, &xmin_epoch);
+	catalog_xmin_epoch = xmin_epoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch --;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch --;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
 	if (TransactionIdIsValid(xmin))
 		master_has_standby_xmin = true;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5e508..aca3ca1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1635,6 +1635,8 @@ ProcessStandbyHSFeedbackMessage(void)
 	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
@@ -1643,10 +1645,14 @@ ProcessStandbyHSFeedbackMessage(void)
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
 		 feedbackXmin,
-		 feedbackEpoch);
+		 feedbackEpoch,
+		 feedbackCatalogXmin,
+		 feedbackCatalogEpoch);
 
 	/* Unset WalSender's xmin if the feedback message value is invalid */
 	if (!TransactionIdIsNormal(feedbackXmin))
-- 
2.5.5

0001-Expand-streaming-replication-tests-to-cover-hot-stan.patch (text/x-patch; charset=US-ASCII)
From e0cbf6ff72b14a11e5ab48b442f6b74d0aa9ff4c Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Wed, 9 Nov 2016 13:44:04 +0800
Subject: [PATCH 1/7] Expand streaming replication tests to cover hot standby
 feedback and physical replication slots

---
 src/test/recovery/t/001_stream_rep.pl | 112 ++++++++++++++++++++++++++++++----
 1 file changed, 101 insertions(+), 11 deletions(-)

diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 981c00b..89d0e90 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 4;
+use Test::More tests => 18;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -40,16 +40,21 @@ $node_master->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
 
 # Wait for standbys to catch up
-my $applname_1 = $node_standby_1->name;
-my $applname_2 = $node_standby_2->name;
-my $caughtup_query =
-"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_1';";
-$node_master->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 1 to catch up";
-$caughtup_query =
-"SELECT pg_last_xlog_replay_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_2';";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 2 to catch up";
+sub wait_for_catchup
+{
+	my $applname_1 = $node_standby_1->name;
+	my $applname_2 = $node_standby_2->name;
+	my $caughtup_query =
+	"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_1';";
+	$node_master->poll_query_until('postgres', $caughtup_query)
+	  or die "Timed out while waiting for standby 1 to catch up";
+	$caughtup_query =
+	"SELECT pg_last_xlog_replay_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_2';";
+	$node_standby_1->poll_query_until('postgres', $caughtup_query)
+	  or die "Timed out while waiting for standby 2 to catch up";
+}
+
+wait_for_catchup();
 
 my $result =
   $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");
@@ -66,3 +71,88 @@ is($node_standby_1->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
 	3, 'read-only queries on standby 1');
 is($node_standby_2->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
 	3, 'read-only queries on standby 2');
+
+diag "switching to physical replication slot";
+# Switch to using a physical replication slot. We can do this without a new
+# backup since physical slots can go backwards if needed. Do so on both
+# standbys. Since we're going to be testing things that affect the slot state,
+# also shorten the standby feedback interval to ensure timely updates.
+my ($slotname_1, $slotname_2) = ('standby_1', 'standby_2');
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 4\n");
+$node_master->restart;
+is($node_master->psql('postgres', qq[SELECT pg_create_physical_replication_slot('$slotname_1');]), 0, 'physical slot created on master');
+$node_standby_1->append_conf('recovery.conf', "primary_slot_name = $slotname_1\n");
+$node_standby_1->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
+$node_standby_1->append_conf('postgresql.conf', "max_replication_slots = 4\n");
+$node_standby_1->restart;
+is($node_standby_1->psql('postgres', qq[SELECT pg_create_physical_replication_slot('$slotname_2');]), 0, 'physical slot created on intermediate replica');
+$node_standby_2->append_conf('recovery.conf', "primary_slot_name = $slotname_2\n");
+$node_standby_2->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
+$node_standby_2->restart;
+
+sub get_slot_xmins
+{
+	my ($node, $slotname) = @_;
+	my ($xmin, $catalog_xmin) = split(qr/\|/, $node->safe_psql('postgres', qq[SELECT xmin, catalog_xmin FROM pg_replication_slots WHERE slot_name = '$slotname';]));
+	return (defined($xmin) ? $xmin : '', defined($catalog_xmin) ? $catalog_xmin : '');
+}
+
+# There's no hot standby feedback and there are no logical slots on either peer
+# so xmin and catalog_xmin should be null on both slots.
+my ($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+is($xmin, '', 'non-cascaded slot xmin null with no hs_feedback');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin null with no hs_feedback');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+is($xmin, '', 'cascaded slot xmin null with no hs_feedback');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin null with no hs_feedback');
+
+# Replication still works?
+$node_master->safe_psql('postgres', 'CREATE TABLE replayed(val integer);');
+
+sub replay_check
+{
+	my $newval = $node_master->safe_psql('postgres', 'INSERT INTO replayed(val) SELECT coalesce(max(val),0) + 1 AS newval FROM replayed RETURNING val');
+	wait_for_catchup();
+	$node_standby_1->safe_psql('postgres', qq[SELECT 1 FROM replayed WHERE val = $newval])
+		or die "standby_1 didn't replay master value $newval";
+	$node_standby_2->safe_psql('postgres', qq[SELECT 1 FROM replayed WHERE val = $newval])
+		or die "standby_2 didn't replay standby_1 value $newval";
+}
+
+replay_check();
+
+diag "enabling hot_standby_feedback";
+# Enable hs_feedback. The slot should gain an xmin. We set the status interval
+# so we'll see the results promptly.
+$node_standby_1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
+$node_standby_1->reload;
+$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
+$node_standby_2->reload;
+replay_check();
+sleep(2);
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+isnt($xmin, '', 'non-cascaded slot xmin non-null with hs feedback');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin still null with hs_feedback');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+isnt($xmin, '', 'cascaded slot xmin non-null with hs feedback');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin still null with hs_feedback');
+
+diag "disabling hot_standby_feedback";
+# Disable hs_feedback. Xmin should be cleared.
+$node_standby_1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
+$node_standby_1->reload;
+$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
+$node_standby_2->reload;
+replay_check();
+sleep(2);
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+is($xmin, '', 'non-cascaded slot xmin null with hs feedback reset');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin still null with hs_feedback reset');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+is($xmin, '', 'cascaded slot xmin null with hs feedback reset');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin still null with hs_feedback reset');
-- 
2.5.5