From 38eab580fb651bf01c0d8e06d534499be270b44c Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:30:53 +0800
Subject: [PATCH 07/21] Send catalog_xmin in hot standby feedback protocol

Add catalog_xmin to the hot standby feedback protocol so a read replica
that has logical slots can use its physical slot on the master to hold down the
master's catalog_xmin. This information will let a replica prevent vacuuming of
catalog tuples still required by the replica's logical slots.

This patch makes only the hot standby feedback protocol change: the new
catalog_xmin field is always set to zero by the walreceiver and is ignored by
the walsender.
---
 doc/src/sgml/protocol.sgml            | 33 ++++++++++++++++++++++++++++-----
 src/backend/replication/walreceiver.c | 21 ++++++++++++++-------
 src/backend/replication/walsender.c   | 10 ++++++++--
 3 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 50cf527..e0fd9aa 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1783,10 +1783,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0 this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1796,7 +1797,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slots on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled. New in 10.0.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby. New in 10.0.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2bb3dce..06ca9e4 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1164,8 +1164,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		xmin_epoch, catalog_xmin_epoch;
+	TransactionId xmin, catalog_xmin;
 	static TimestampTz sendTime = 0;
 	static bool master_has_standby_xmin = false;
 
@@ -1210,23 +1210,30 @@ XLogWalRcvSendHSFeedback(bool immed)
 	else
 		xmin = InvalidTransactionId;
 
+	catalog_xmin = InvalidTransactionId;
+
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	GetNextXidAndEpoch(&nextXid, &xmin_epoch);
+	catalog_xmin_epoch = xmin_epoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch --;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch --;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
 	if (TransactionIdIsValid(xmin))
 		master_has_standby_xmin = true;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ef8ba80..cd749cd 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1638,6 +1638,8 @@ ProcessStandbyHSFeedbackMessage(void)
 	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
@@ -1646,10 +1648,14 @@ ProcessStandbyHSFeedbackMessage(void)
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
 		 feedbackXmin,
-		 feedbackEpoch);
+		 feedbackEpoch,
+		 feedbackCatalogXmin,
+		 feedbackCatalogEpoch);
 
 	/* Unset WalSender's xmin if the feedback message value is invalid */
 	if (!TransactionIdIsNormal(feedbackXmin))
-- 
2.5.5

