From 46b046b2c794cce45bec007f8af69635380d48ce Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:38:40 +0800
Subject: [PATCH 08/21] Make walsender respect catalog_xmin in hot standby
 feedback messages

The walsender now respects the new catalog_xmin field in the hot standby
feedback message. It uses it to set the catalog_xmin field on its physical
replication slot if one is in use. Otherwise it sets its process xmin to the
older of the xmin and catalog_xmin, so the outcome is the same as before
the protocol change.

In the process, factor out walsender's sanity check for xid+epoch wraparound
into a separate TransactionIdInRecentPast() function since we're now checking
it in two places.
---
 src/backend/replication/walsender.c | 111 +++++++++++++++++++++++++++---------
 1 file changed, 84 insertions(+), 27 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cd749cd..ac8c2c3 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -216,6 +216,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -1536,6 +1537,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst lost information can do here is give us
 	 * wrong information in a statistics view - we'll just potentially be more
 	 * conservative in removing files.
+	 *
+	 * We don't have to do any effective_xmin / effective_catalog_xmin testing
+	 * here either, like for LogicalConfirmReceivedLocation. If we received
+	 * the xmin and catalog_xmin from downstream replication slots we know they
+	 * were already confirmed there.
 	 */
 }
 
@@ -1598,7 +1604,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1619,6 +1625,22 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1629,13 +1651,46 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 }
 
 /*
+ * Check that the provided xid/epoch pair is sane, that is, not in the future
+ * and not so far back as to be already wrapped around.  Returns true if the
+ * pair is sane, false otherwise.
+ *
+ * The epoch of nextXid should be the same as the supplied epoch, or one
+ * greater if the xid counter has wrapped since the xid was assigned.
+ *
+ * This check doesn't care whether clog exists for the xid at all.
+ */
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
+{
+	TransactionId nextXid;
+	uint32		nextEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	if (xid <= nextXid)
+	{
+		if (epoch != nextEpoch)
+			return false;
+	}
+	else
+	{
+		if (epoch + 1 != nextEpoch)
+			return false;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+		return false;				/* epoch OK, but it's wrapped around */
+
+	return true;
+}
+
+/*
  * Hot Standby feedback
  */
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-	TransactionId nextXid;
-	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
 	TransactionId feedbackCatalogXmin;
@@ -1643,7 +1698,8 @@ ProcessStandbyHSFeedbackMessage(void)
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
-	 * byte.
+	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
+	 * of this message.
 	 */
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
@@ -1657,37 +1713,30 @@ ProcessStandbyHSFeedbackMessage(void)
 		 feedbackCatalogXmin,
 		 feedbackCatalogEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-	if (feedbackXmin <= nextXid)
-	{
-		if (feedbackEpoch != nextEpoch)
-			return;
-	}
-	else
-	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
-	}
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
+		return;
 
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
+		return;
 
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
@@ -1712,15 +1761,23 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. We can only track the
+	 * catalog xmin separately when using a slot, so we store the older
+	 * of the two provided when not using a slot.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+			MyPgXact->xmin = feedbackCatalogXmin;
+		else
+			MyPgXact->xmin = feedbackXmin;
+	}
 }
 
 /*
-- 
2.5.5

