From 99dae75e451c8895b6603b693a9db47f877307a2 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Thu, 6 Apr 2023 23:34:22 -0700
Subject: [PATCH va65 7/9] Allow logical decoding on standby.

Allow a logical slot to be created on standby. Restrict its usage
or its creation if wal_level on primary is less than logical.
During slot creation, its restart_lsn is set to the last replayed
LSN. Effectively, a logical slot creation on standby waits for an
xl_running_xact record to arrive from primary.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 src/include/access/xlog.h                 |  1 +
 src/backend/access/transam/xlog.c         | 11 +++++
 src/backend/replication/logical/decode.c  | 30 +++++++++++-
 src/backend/replication/logical/logical.c | 36 ++++++++------
 src/backend/replication/slot.c            | 59 ++++++++++++-----------
 src/backend/replication/walsender.c       | 48 ++++++++++++------
 6 files changed, 124 insertions(+), 61 deletions(-)

diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738c..48ca8523810 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index d30575d8741..a4153518fdb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4466,6 +4466,17 @@ LocalProcessControlFile(bool reset)
 	ReadControlFile();
 }
 
+/*
+ * Return the wal_level recorded in the control file (ControlFile->wal_level).
+ * On a standby this is to be treated as the active wal_level, because it may
+ * differ from the wal_level originally configured on the standby itself.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+	return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1f..8352dbf5df6 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * can restart from there.
 			 */
 			break;
+		case XLOG_PARAMETER_CHANGE:
+			{
+				xl_parameter_change *xlrec =
+				(xl_parameter_change *) XLogRecGetData(buf->record);
+
+				/*
+				 * If wal_level on primary is reduced to less than logical,
+				 * then we want to prevent existing logical slots from being
+				 * used. Existing logical slots on standby get invalidated
+				 * when this WAL record is replayed; and further, slot
+				 * creation fails when the wal level is not sufficient; but
+				 * all these operations are not synchronized, so a logical
+				 * slot may creep in while the wal_level is being reduced.
+				 * Hence this extra check.
+				 */
+				if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+				{
+					/*
+					 * This can occur only on a standby, as a primary would
+					 * not allow a restart after changing wal_level < logical
+					 * if there is a pre-existing logical slot.
+					 */
+					Assert(RecoveryInProgress());
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+				}
+				break;
+			}
 		case XLOG_NOOP:
 		case XLOG_NEXTOID:
 		case XLOG_SWITCH:
 		case XLOG_BACKUP_END:
-		case XLOG_PARAMETER_CHANGE:
 		case XLOG_RESTORE_POINT:
 		case XLOG_FPW_CHANGE:
 		case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 27addd58f66..659b08cd456 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
-	/* ----
-	 * TODO: We got to change that someday soon...
-	 *
-	 * There's basically three things missing to allow this:
-	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
-	 * ----
-	 */
 	if (RecoveryInProgress())
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("logical decoding cannot be used while in recovery")));
+	{
+		/*
+		 * This check may have race conditions, but whenever
+		 * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+		 * verify that there are no existing logical replication slots. And to
+		 * avoid races around creating a new slot,
+		 * CheckLogicalDecodingRequirements() is called once before creating
+		 * the slot, and once when logical decoding is initially starting up.
+		 */
+		if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+	}
 }
 
 /*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
 
+	/*
+	 * On standby, this check is also required while creating the slot. See
+	 * the comments in CheckLogicalDecodingRequirements().
+	 */
+	CheckLogicalDecodingRequirements();
+
 	/* shorter lines... */
 	slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 3e8c1b44e85..411f6c01491 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -41,6 +41,7 @@
 
 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -1192,37 +1193,28 @@ ReplicationSlotReserveWal(void)
 		/*
 		 * For logical slots log a standby snapshot and start logical decoding
 		 * at exactly that position. That allows the slot to start up more
-		 * quickly.
+		 * quickly. But on a standby we cannot do WAL writes, so just use the
+		 * replay pointer; effectively, an attempt to create a logical slot on
+		 * standby will cause it to wait for an xl_running_xact record to be
+		 * logged independently on the primary, so that a snapshot can be
+		 * built using the record.
 		 *
-		 * That's not needed (or indeed helpful) for physical slots as they'll
-		 * start replay at the last logged checkpoint anyway. Instead return
-		 * the location of the last redo LSN. While that slightly increases
-		 * the chance that we have to retry, it's where a base backup has to
-		 * start replay at.
+		 * None of this is needed (or indeed helpful) for physical slots as
+		 * they'll start replay at the last logged checkpoint anyway. Instead
+		 * return the location of the last redo LSN. While that slightly
+		 * increases the chance that we have to retry, it's where a base
+		 * backup has to start replay at.
 		 */
-		if (!RecoveryInProgress() && SlotIsLogical(slot))
-		{
-			XLogRecPtr	flushptr;
-
-			/* start at current insert position */
-			restart_lsn = GetXLogInsertRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-
-			/* make sure we have enough information to start */
-			flushptr = LogStandbySnapshot();
-
-			/* and make sure it's fsynced to disk */
-			XLogFlush(flushptr);
-		}
-		else
-		{
+		if (SlotIsPhysical(slot))
 			restart_lsn = GetRedoRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-		}
+		else if (RecoveryInProgress())
+			restart_lsn = GetXLogReplayRecPtr(NULL);
+		else
+			restart_lsn = GetXLogInsertRecPtr();
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.restart_lsn = restart_lsn;
+		SpinLockRelease(&slot->mutex);
 
 		/* prevent WAL removal as fast as possible */
 		ReplicationSlotsComputeRequiredLSN();
@@ -1238,6 +1230,17 @@ ReplicationSlotReserveWal(void)
 		if (XLogGetLastRemovedSegno() < segno)
 			break;
 	}
+
+	if (!RecoveryInProgress() && SlotIsLogical(slot))
+	{
+		XLogRecPtr	flushptr;
+
+		/* make sure we have enough information to start */
+		flushptr = LogStandbySnapshot();
+
+		/* and make sure it's fsynced to disk */
+		XLogFlush(flushptr);
+	}
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 66493b6e896..743e338b1b6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	int			count;
 	WALReadError errinfo;
 	XLogSegNo	segno;
-	TimeLineID	currTLI = GetWALInsertionTimeLine();
+	TimeLineID	currTLI;
 
 	/*
-	 * Since logical decoding is only permitted on a primary server, we know
-	 * that the current timeline ID can't be changing any more. If we did this
-	 * on a standby, we'd have to worry about the values we compute here
-	 * becoming invalid due to a promotion or timeline change.
+	 * Make sure we have enough WAL available before retrieving the current
+	 * timeline. This is needed to determine am_cascading_walsender accurately
+	 * which is needed to determine the current timeline.
 	 */
+	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+	/*
+	 * Since logical decoding is also permitted on a standby server, we need
+	 * to check if the server is in recovery to decide how to get the current
+	 * timeline ID (so that it also covers the promotion or timeline change
+	 * cases).
+	 */
+	am_cascading_walsender = RecoveryInProgress();
+
+	if (am_cascading_walsender)
+		GetXLogReplayRecPtr(&currTLI);
+	else
+		currTLI = GetWALInsertionTimeLine();
+
 	XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
 	sendTimeLineIsHistoric = (state->currTLI != currTLI);
 	sendTimeLine = state->currTLI;
 	sendTimeLineValidUpto = state->currTLIValidUntil;
 	sendTimeLineNextTLI = state->nextTLI;
 
-	/* make sure we have enough WAL available */
-	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
 	/* fail if not (implies we are going to shut down) */
 	if (flushptr < targetPagePtr + reqLen)
 		return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 				 cur_page,
 				 targetPagePtr,
 				 XLOG_BLCKSZ,
-				 state->seg.ws_tli, /* Pass the current TLI because only
-									 * WalSndSegmentOpen controls whether new
-									 * TLI is needed. */
+				 currTLI,		/* Pass the current TLI because only
+								 * WalSndSegmentOpen controls whether new TLI
+								 * is needed. */
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
@@ -3076,10 +3087,14 @@ XLogSendLogical(void)
 	 * If first time through in this session, initialize flushPtr.  Otherwise,
 	 * we only need to update flushPtr if EndRecPtr is past it.
 	 */
-	if (flushPtr == InvalidXLogRecPtr)
-		flushPtr = GetFlushRecPtr(NULL);
-	else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-		flushPtr = GetFlushRecPtr(NULL);
+	if (flushPtr == InvalidXLogRecPtr ||
+		logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+	{
+		if (am_cascading_walsender)
+			flushPtr = GetStandbyFlushRecPtr(NULL);
+		else
+			flushPtr = GetFlushRecPtr(NULL);
+	}
 
 	/* If EndRecPtr is still past our flushPtr, it means we caught up. */
 	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
 	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-	*tli = replayTLI;
+	if (tli)
+		*tli = replayTLI;
 
 	result = replayPtr;
 	if (receiveTLI == replayTLI && receivePtr > replayPtr)
-- 
2.38.0

