From 311a1d8f9c2d1acf0c22e091d53f7a533073c8b7 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 7 Apr 2023 09:56:02 -0700
Subject: [PATCH va67 4/9] Handle logical slot conflicts on standby
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

During WAL replay on a standby, when a conflict with a logical slot is
identified, invalidate that slot. Do the same if wal_level on the primary
server is reduced below logical while logical slots exist on the standby.
Introduce a new ProcSignalReason value for slot conflict recovery.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Author: Andres Freund <andres@anarazel.de>
Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version)
Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
---
 src/include/storage/procsignal.h     |  1 +
 src/include/storage/standby.h        |  2 ++
 src/backend/access/gist/gistxlog.c   |  2 ++
 src/backend/access/hash/hash_xlog.c  |  1 +
 src/backend/access/heap/heapam.c     |  3 +++
 src/backend/access/nbtree/nbtxlog.c  |  2 ++
 src/backend/access/spgist/spgxlog.c  |  1 +
 src/backend/access/transam/xlog.c    | 15 +++++++++++++++
 src/backend/replication/slot.c       |  8 +++++++-
 src/backend/storage/ipc/procsignal.c |  3 +++
 src/backend/storage/ipc/standby.c    | 20 +++++++++++++++++++-
 src/backend/tcop/postgres.c          |  9 +++++++++
 12 files changed, 65 insertions(+), 2 deletions(-)

diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 905af2231ba..2f52100b009 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -42,6 +42,7 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 	PROCSIG_RECOVERY_CONFLICT_LOCK,
 	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+	PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2effdea126f..41f4dc372e6 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+												bool isCatalogRel,
 												RelFileLocator locator);
 extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
+													   bool isCatalogRel,
 													   RelFileLocator locator);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index b7678f3c144..9a86fb3feff 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
 		ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+											xldata->isCatalogRel,
 											rlocator);
 	}
 
@@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+												   xlrec->isCatalogRel,
 												   xlrec->locator);
 }
 
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index f2dd9be8d3f..e8e06c62a95 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 		ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+											xldata->isCatalogRel,
 											rlocator);
 	}
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 8b13e3f8925..f389ceee1ea 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->isCatalogRel,
 											rlocator);
 
 	/*
@@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
 											rlocator);
 
 	/*
@@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->isCatalogRel,
 											rlocator);
 	}
 
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 414ca4f6deb..c87e46ed66e 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->isCatalogRel,
 											rlocator);
 	}
 
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+												   xlrec->isCatalogRel,
 												   xlrec->locator);
 }
 
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index b071b59c8ac..459ac929ba5 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
 		ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+											xldata->isCatalogRel,
 											locator);
 	}
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1485e8f9ca9..5227fc675c8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7965,6 +7965,21 @@ xlog_redo(XLogReaderState *record)
 		/* Update our copy of the parameters in pg_control */
 		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
+		/*
+		 * Invalidate logical slots if we are in hot standby and the primary
+		 * does not have a WAL level sufficient for logical decoding. No need
+		 * to search for potentially conflicting logical slots if standby is
+		 * running with wal_level lower than logical, because in that case, we
+		 * would have either disallowed creation of logical slots or
+		 * invalidated existing ones.
+		 */
+		if (InRecovery && InHotStandby &&
+			xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+			wal_level >= WAL_LEVEL_LOGICAL)
+			InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
+											   0, InvalidOid,
+											   InvalidTransactionId);
+
 		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 		ControlFile->MaxConnections = xlrec.MaxConnections;
 		ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c2a9accebf6..1b1b51e21ed 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1443,7 +1443,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 									   slotname, restart_lsn,
 									   oldestLSN, snapshotConflictHorizon);
 
-				(void) kill(active_pid, SIGTERM);
+				if (MyBackendType == B_STARTUP)
+					(void) SendProcSignal(active_pid,
+										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
+										  InvalidBackendId);
+				else
+					(void) kill(active_pid, SIGTERM);
+
 				last_signaled_pid = active_pid;
 			}
 
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 395b2cf6909..c85cb5cc18d 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 9f56b4e95cf..3b5d654347e 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -24,6 +24,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  */
 void
 ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+									bool isCatalogRel,
 									RelFileLocator locator)
 {
 	VirtualTransactionId *backends;
@@ -491,6 +493,16 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
 										   WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
 										   true);
+
+	/*
+	 * Note that WaitExceedsMaxStandbyDelay() is not taken into account here
+	 * (as opposed to ResolveRecoveryConflictWithVirtualXIDs() above). That
+	 * seems OK, given that this kind of conflict should not normally be
+	 * reached, e.g. by using a physical replication slot.
+	 */
+	if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
+										   snapshotConflictHorizon);
 }
 
 /*
@@ -499,6 +511,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
  */
 void
 ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
+										   bool isCatalogRel,
 										   RelFileLocator locator)
 {
 	/*
@@ -517,7 +530,9 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
 		TransactionId truncated;
 
 		truncated = XidFromFullTransactionId(snapshotConflictHorizon);
-		ResolveRecoveryConflictWithSnapshot(truncated, locator);
+		ResolveRecoveryConflictWithSnapshot(truncated,
+											isCatalogRel,
+											locator);
 	}
 }
 
@@ -1478,6 +1493,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			reasonDesc = _("recovery conflict on snapshot");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			reasonDesc = _("recovery conflict on replication slot");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			reasonDesc = _("recovery conflict on buffer deadlock");
 			break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a10ecbaf50b..25e0de4e0ff 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			errdetail("User query might have needed to see row versions that must be removed.");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			errdetail("User was using the logical slot that must be dropped.");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			errdetail("User transaction caused buffer deadlock with recovery.");
 			break;
@@ -3143,6 +3146,12 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 				InterruptPending = true;
 				break;
 
+			case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+				RecoveryConflictPending = true;
+				QueryCancelPending = true;
+				InterruptPending = true;
+				break;
+
 			default:
 				elog(FATAL, "unrecognized conflict mode: %d",
 					 (int) reason);
-- 
2.38.0

