From 572acd14c15704ddafe26fc09b7288151e5115c7 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Thu, 6 Apr 2023 23:34:22 -0700
Subject: [PATCH va67 7/9] Allow logical decoding on standby

Allow a logical slot to be created on standby. Restrict its usage or its
creation if wal_level on primary is less than logical.  During slot creation,
its restart_lsn is set to the last replayed LSN. Effectively, logical slot
creation on standby waits for an xl_running_xact record to arrive from the
primary.

This commit also introduces the pg_log_standby_snapshot() function. The idea
is to be able to take a snapshot of running transactions and write this to WAL
without requesting a (costly) checkpoint. This makes it much faster to create
logical slots on a replica.

Bumps catversion, for the addition of the pg_log_standby_snapshot() function.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 src/include/access/xlog.h                 |  1 +
 src/include/catalog/pg_proc.dat           |  3 ++
 src/backend/access/transam/xlog.c         | 11 +++++
 src/backend/access/transam/xlogfuncs.c    | 32 ++++++++++++
 src/backend/catalog/system_functions.sql  |  2 +
 src/backend/replication/logical/decode.c  | 30 +++++++++++-
 src/backend/replication/logical/logical.c | 36 ++++++++------
 src/backend/replication/slot.c            | 59 ++++++++++++-----------
 src/backend/replication/walsender.c       | 48 ++++++++++++------
 doc/src/sgml/func.sgml                    | 15 ++++++
 10 files changed, 176 insertions(+), 61 deletions(-)

diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738c..48ca8523810 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 016354d75c5..55229759ac6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6432,6 +6432,9 @@
 { oid => '2848', descr => 'switch to new wal file',
   proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn',
   proargtypes => '', prosrc => 'pg_switch_wal' },
+{ oid => '9658', descr => 'log details of the current snapshot to WAL',
+  proname => 'pg_log_standby_snapshot', provolatile => 'v', prorettype => 'pg_lsn',
+  proargtypes => '', prosrc => 'pg_log_standby_snapshot' },
 { oid => '3098', descr => 'create a named restore point',
   proname => 'pg_create_restore_point', provolatile => 'v',
   prorettype => 'pg_lsn', proargtypes => 'text',
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 83ab70879bf..743a4723a76 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4466,6 +4466,17 @@ LocalProcessControlFile(bool reset)
 	ReadControlFile();
 }
 
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+	return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index c07daa874f9..36a309b54cc 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -31,6 +31,7 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/smgr.h"
+#include "storage/standby.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -196,6 +197,37 @@ pg_switch_wal(PG_FUNCTION_ARGS)
 	PG_RETURN_LSN(switchpoint);
 }
 
+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	recptr;
+
+	if (RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("recovery is in progress"),
+				 errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));
+
+	if (!XLogStandbyInfoActive())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("wal_level is not in desired state"),
+				 errhint("wal_level has to be >= WAL_LEVEL_REPLICA.")));
+
+	recptr = LogStandbySnapshot();
+
+	/*
+	 * As a convenience, return the WAL location of the last inserted record
+	 */
+	PG_RETURN_LSN(recptr);
+}
+
 /*
  * pg_create_restore_point: a named point for restore
  *
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 83ca8934440..b7c65ea37d7 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1f..8352dbf5df6 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * can restart from there.
 			 */
 			break;
+		case XLOG_PARAMETER_CHANGE:
+			{
+				xl_parameter_change *xlrec =
+				(xl_parameter_change *) XLogRecGetData(buf->record);
+
+				/*
+				 * If wal_level on primary is reduced to less than logical,
+				 * then we want to prevent existing logical slots from being
+				 * used. Existing logical slots on standby get invalidated
+				 * when this WAL record is replayed; and further, slot
+				 * creation fails when the wal level is not sufficient; but
+				 * all these operations are not synchronized, so a logical
+				 * slot may creep in while the wal_level is being reduced.
+				 * Hence this extra check.
+				 */
+				if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+				{
+					/*
+					 * This can occur only on a standby, as a primary would
+					 * refuse to restart with wal_level < logical if there
+					 * is a pre-existing logical slot.
+					 */
+					Assert(RecoveryInProgress());
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+				}
+				break;
+			}
 		case XLOG_NOOP:
 		case XLOG_NEXTOID:
 		case XLOG_SWITCH:
 		case XLOG_BACKUP_END:
-		case XLOG_PARAMETER_CHANGE:
 		case XLOG_RESTORE_POINT:
 		case XLOG_FPW_CHANGE:
 		case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 27addd58f66..659b08cd456 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
-	/* ----
-	 * TODO: We got to change that someday soon...
-	 *
-	 * There's basically three things missing to allow this:
-	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
-	 * ----
-	 */
 	if (RecoveryInProgress())
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("logical decoding cannot be used while in recovery")));
+	{
+		/*
+		 * This check may have race conditions, but whenever
+		 * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+		 * verify that there are no existing logical replication slots. And to
+		 * avoid races around creating a new slot,
+		 * CheckLogicalDecodingRequirements() is called once before creating
+		 * the slot, and once when logical decoding is initially starting up.
+		 */
+		if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+	}
 }
 
 /*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
 
+	/*
+	 * On standby, this check is also required while creating the slot. See
+	 * the comments in CheckLogicalDecodingRequirements().
+	 */
+	CheckLogicalDecodingRequirements();
+
 	/* shorter lines... */
 	slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1b1b51e21ed..513c132f16d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -41,6 +41,7 @@
 
 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -1192,37 +1193,28 @@ ReplicationSlotReserveWal(void)
 		/*
 		 * For logical slots log a standby snapshot and start logical decoding
 		 * at exactly that position. That allows the slot to start up more
-		 * quickly.
+		 * quickly. But on a standby we cannot do WAL writes, so just use the
+		 * replay pointer; effectively, an attempt to create a logical slot on
+		 * standby will cause it to wait for an xl_running_xact record to be
+		 * logged independently on the primary, so that a snapshot can be
+		 * built using the record.
 		 *
-		 * That's not needed (or indeed helpful) for physical slots as they'll
-		 * start replay at the last logged checkpoint anyway. Instead return
-		 * the location of the last redo LSN. While that slightly increases
-		 * the chance that we have to retry, it's where a base backup has to
-		 * start replay at.
+		 * None of this is needed (or indeed helpful) for physical slots as
+		 * they'll start replay at the last logged checkpoint anyway. Instead
+		 * return the location of the last redo LSN. While that slightly
+		 * increases the chance that we have to retry, it's where a base
+		 * backup has to start replay at.
 		 */
-		if (!RecoveryInProgress() && SlotIsLogical(slot))
-		{
-			XLogRecPtr	flushptr;
-
-			/* start at current insert position */
-			restart_lsn = GetXLogInsertRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-
-			/* make sure we have enough information to start */
-			flushptr = LogStandbySnapshot();
-
-			/* and make sure it's fsynced to disk */
-			XLogFlush(flushptr);
-		}
-		else
-		{
+		if (SlotIsPhysical(slot))
 			restart_lsn = GetRedoRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-		}
+		else if (RecoveryInProgress())
+			restart_lsn = GetXLogReplayRecPtr(NULL);
+		else
+			restart_lsn = GetXLogInsertRecPtr();
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.restart_lsn = restart_lsn;
+		SpinLockRelease(&slot->mutex);
 
 		/* prevent WAL removal as fast as possible */
 		ReplicationSlotsComputeRequiredLSN();
@@ -1238,6 +1230,17 @@ ReplicationSlotReserveWal(void)
 		if (XLogGetLastRemovedSegno() < segno)
 			break;
 	}
+
+	if (!RecoveryInProgress() && SlotIsLogical(slot))
+	{
+		XLogRecPtr	flushptr;
+
+		/* make sure we have enough information to start */
+		flushptr = LogStandbySnapshot();
+
+		/* and make sure it's fsynced to disk */
+		XLogFlush(flushptr);
+	}
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 66493b6e896..743e338b1b6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	int			count;
 	WALReadError errinfo;
 	XLogSegNo	segno;
-	TimeLineID	currTLI = GetWALInsertionTimeLine();
+	TimeLineID	currTLI;
 
 	/*
-	 * Since logical decoding is only permitted on a primary server, we know
-	 * that the current timeline ID can't be changing any more. If we did this
-	 * on a standby, we'd have to worry about the values we compute here
-	 * becoming invalid due to a promotion or timeline change.
+	 * Make sure we have enough WAL available before retrieving the current
+	 * timeline. This is needed to determine am_cascading_walsender accurately
+	 * which is needed to determine the current timeline.
 	 */
+	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+	/*
+	 * Since logical decoding is also permitted on a standby server, we need
+	 * to check if the server is in recovery to decide how to get the current
+	 * timeline ID (so that it also covers the promotion or timeline change
+	 * cases).
+	 */
+	am_cascading_walsender = RecoveryInProgress();
+
+	if (am_cascading_walsender)
+		GetXLogReplayRecPtr(&currTLI);
+	else
+		currTLI = GetWALInsertionTimeLine();
+
 	XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
 	sendTimeLineIsHistoric = (state->currTLI != currTLI);
 	sendTimeLine = state->currTLI;
 	sendTimeLineValidUpto = state->currTLIValidUntil;
 	sendTimeLineNextTLI = state->nextTLI;
 
-	/* make sure we have enough WAL available */
-	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
 	/* fail if not (implies we are going to shut down) */
 	if (flushptr < targetPagePtr + reqLen)
 		return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 				 cur_page,
 				 targetPagePtr,
 				 XLOG_BLCKSZ,
-				 state->seg.ws_tli, /* Pass the current TLI because only
-									 * WalSndSegmentOpen controls whether new
-									 * TLI is needed. */
+				 currTLI,		/* Pass the current TLI because only
+								 * WalSndSegmentOpen controls whether new TLI
+								 * is needed. */
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
@@ -3076,10 +3087,14 @@ XLogSendLogical(void)
 	 * If first time through in this session, initialize flushPtr.  Otherwise,
 	 * we only need to update flushPtr if EndRecPtr is past it.
 	 */
-	if (flushPtr == InvalidXLogRecPtr)
-		flushPtr = GetFlushRecPtr(NULL);
-	else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-		flushPtr = GetFlushRecPtr(NULL);
+	if (flushPtr == InvalidXLogRecPtr ||
+		logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+	{
+		if (am_cascading_walsender)
+			flushPtr = GetStandbyFlushRecPtr(NULL);
+		else
+			flushPtr = GetFlushRecPtr(NULL);
+	}
 
 	/* If EndRecPtr is still past our flushPtr, it means we caught up. */
 	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
 	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-	*tli = replayTLI;
+	if (tli)
+		*tli = replayTLI;
 
 	result = replayPtr;
 	if (receiveTLI == replayTLI && receivePtr > replayPtr)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 4211d31f307..326f8bc2f47 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27074,6 +27074,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         prepared with <xref linkend="sql-prepare-transaction"/>.
        </para></entry>
       </row>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_log_standby_snapshot</primary>
+        </indexterm>
+        <function>pg_log_standby_snapshot</function> ()
+        <returnvalue>pg_lsn</returnvalue>
+       </para>
+       <para>
+        Take a snapshot of running transactions and write this to WAL without
+        having to wait for bgwriter or checkpointer to log one. This is useful
+        for logical decoding on standby, where logical slot creation hangs
+        until such a record is replayed on the standby.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
-- 
2.38.0

