From cc1370fb4be75ce6619d9f29649a81822a16d977 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Thu, 6 Apr 2023 23:14:39 -0700
Subject: [PATCH va65 3/9] Add support for more causes to
 InvalidateObsoleteReplicationSlots()

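This adds two new replication slot invalidation causes: RS_INVAL_XID, for
logical replication slots whose xmin or catalog_xmin conflicts with an
increased snapshot conflict horizon, and RS_INVAL_WAL_LEVEL, which invalidates
all logical replication slots (needed when wal_level on the primary is lowered
below logical). When invalidation happens in the startup process, a process
using such a slot is signaled with the new PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
recovery conflict instead of being sent SIGTERM.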

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/replication/slot.h            |   7 +-
 src/include/storage/procsignal.h          |   1 +
 src/backend/access/transam/xlog.c         |   6 +-
 src/backend/replication/logical/logical.c |   7 +
 src/backend/replication/slot.c            | 161 ++++++++++++++++++----
 src/backend/storage/ipc/procsignal.c      |   3 +
 src/backend/storage/ipc/standby.c         |   3 +
 src/backend/tcop/postgres.c               |   9 ++
 8 files changed, 167 insertions(+), 30 deletions(-)

diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 793f0701b88..35cafe94bc5 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -46,6 +46,10 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_NONE,
 	/* required WAL has been removed */
 	RS_INVAL_WAL,
+	/* required rows have been removed (xmin/catalog_xmin conflict) */
+	RS_INVAL_XID,
+	/* wal_level on primary insufficient for logical slot */
+	RS_INVAL_WAL_LEVEL,
 } ReplicationSlotInvalidationCause;
 
 /*
@@ -226,7 +230,8 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid,
+											   TransactionId xid);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 905af2231ba..2f52100b009 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -42,6 +42,7 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 	PROCSIG_RECOVERY_CONFLICT_LOCK,
 	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+	PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 46821ad6056..5e964e2e96b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6806,7 +6806,8 @@ CreateCheckPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(_logSegNo))
+	if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid,
+										   InvalidTransactionId))
 	{
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
@@ -7250,7 +7251,8 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(_logSegNo))
+	if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid,
+										   InvalidTransactionId))
 	{
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 85fc49f655d..27addd58f66 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -531,6 +531,13 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 						NameStr(MyReplicationSlot->data.name)),
 				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
 
+	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot read from logical replication slot \"%s\"",
+						NameStr(MyReplicationSlot->data.name)),
+				 errdetail("This slot has been invalidated because it was conflicting with recovery.")));
+
 	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 06ff3559dd1..3e8c1b44e85 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1241,8 +1241,83 @@ ReplicationSlotReserveWal(void)
 }
 
 /*
- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
- * and mark it invalid, if necessary and possible.
+ * Log that a replication slot is being invalidated.
+ *
+ * If 'terminating' is true, report instead that process 'pid' is being
+ * terminated to release the slot.  The detail message depends on 'cause':
+ * RS_INVAL_WAL reports how far restart_lsn lags behind oldestLSN, and
+ * RS_INVAL_XID reports the conflicting xid.
+ */
+static void
+ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
+					   bool terminating,
+					   int pid,
+					   NameData slotname,
+					   XLogRecPtr restart_lsn,
+					   XLogRecPtr oldestLSN,
+					   TransactionId xid)
+{
+	StringInfoData err_detail;
+	bool		hint = false;
+
+	initStringInfo(&err_detail);
+
+	switch (cause)
+	{
+		case RS_INVAL_WAL:
+			hint = true;
+			appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
+							 LSN_FORMAT_ARGS(restart_lsn),
+							 (unsigned long long) (oldestLSN - restart_lsn));
+			break;
+		case RS_INVAL_XID:
+			appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."), xid);
+			break;
+
+		case RS_INVAL_WAL_LEVEL:
+			appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server."));
+			break;
+		case RS_INVAL_NONE:
+			pg_unreachable();
+	}
+
+	ereport(LOG,
+			terminating ? errmsg("terminating process %d to release replication slot \"%s\"", pid, NameStr(slotname)) :
+			errmsg("invalidating obsolete replication slot \"%s\"", NameStr(slotname)),
+			errdetail_internal("%s", err_detail.data),
+			hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
+
+	pfree(err_detail.data);
+}
+
+/* FIXME: This is a too generic name */
+static inline bool
+TransactionIdIsValidPrecedesOrEquals(TransactionId xid1, TransactionId xid2)
+{
+	return (TransactionIdIsValid(xid1) && TransactionIdPrecedesOrEquals(xid1, xid2));
+}
+
+/*
+ * Does slot 's' have an xmin or catalog_xmin that precedes or equals 'xid'?
+ * Slots of databases other than 'dboid' never conflict, unless dboid is
+ * InvalidOid.
+ */
+static inline bool
+LogicalReplicationSlotXidsConflict(ReplicationSlot *s, Oid dboid, TransactionId xid)
+{
+	/* an invalid DB oid signals a shared relation, need to conflict */
+	if (dboid != InvalidOid && dboid != s->data.database)
+		return false;
+
+	return
+		TransactionIdIsValidPrecedesOrEquals(s->effective_xmin, xid) ||
+		TransactionIdIsValidPrecedesOrEquals(s->effective_catalog_xmin, xid);
+}
+
+/*
+ * Helper for InvalidateObsoleteReplicationSlots
+ *
+ * Acquires the given slot and marks it invalid, if necessary and possible.
  *
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
@@ -1254,16 +1319,20 @@ ReplicationSlotReserveWal(void)
  */
 static bool
 InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
-							   bool *invalidated)
+							   Oid dboid, TransactionId xid, bool *invalidated)
 {
 	int			last_signaled_pid = 0;
 	bool		released_lock = false;
+	/* with neither an xid nor an LSN given, invalidate all logical slots */
+	bool		invalidate_all_logical = !TransactionIdIsValid(xid) &&
+		oldestLSN == InvalidXLogRecPtr;
 
 	for (;;)
 	{
 		XLogRecPtr	restart_lsn;
 		NameData	slotname;
 		int			active_pid = 0;
+		ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1286,10 +1355,26 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		restart_lsn = s->data.restart_lsn;
 
 		/*
-		 * If the slot is already invalid or is fresh enough, we don't need to
-		 * do anything.
+		 * If the slot is already invalid or no invalidation cause applies, we
+		 * don't need to do anything.  Causes are checked in priority order:
+		 * WAL removal comes first, so that a slot whose WAL is going away
+		 * always has its restart_lsn reset below.
 		 */
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+		if (s->data.invalidated == RS_INVAL_NONE)
+		{
+			if (!XLogRecPtrIsInvalid(oldestLSN) &&
+				!XLogRecPtrIsInvalid(restart_lsn) &&
+				restart_lsn < oldestLSN)
+				conflict = RS_INVAL_WAL;
+			else if (TransactionIdIsValid(xid) && SlotIsLogical(s) &&
+					 LogicalReplicationSlotXidsConflict(s, dboid, xid))
+				conflict = RS_INVAL_XID;
+			else if (invalidate_all_logical && SlotIsLogical(s))
+				conflict = RS_INVAL_WAL_LEVEL;
+		}
+
+		/* if there's no conflict, we're done */
+		if (conflict == RS_INVAL_NONE)
 		{
 			SpinLockRelease(&s->mutex);
 			if (released_lock)
@@ -1309,8 +1391,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
-			s->data.invalidated = RS_INVAL_WAL;
-			s->data.restart_lsn = InvalidXLogRecPtr;
+			s->data.invalidated = conflict;
+			if (conflict == RS_INVAL_WAL)
+				s->data.restart_lsn = InvalidXLogRecPtr;
 
 			/* Let caller know */
 			*invalidated = true;
@@ -1344,15 +1427,18 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			 */
 			if (last_signaled_pid != active_pid)
 			{
-				ereport(LOG,
-						errmsg("terminating process %d to release replication slot \"%s\"",
-							   active_pid, NameStr(slotname)),
-						errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-								  LSN_FORMAT_ARGS(restart_lsn),
-								  (unsigned long long) (oldestLSN - restart_lsn)),
-						errhint("You might need to increase max_slot_wal_keep_size."));
+				ReportSlotInvalidation(conflict, true, active_pid,
+									   slotname, restart_lsn,
+									   oldestLSN, xid);
+
+				/* the startup process signals a recovery conflict instead */
+				if (MyBackendType == B_STARTUP)
+					(void) SendProcSignal(active_pid,
+										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
+										  InvalidBackendId);
+				else
+					(void) kill(active_pid, SIGTERM);
 
-				(void) kill(active_pid, SIGTERM);
 				last_signaled_pid = active_pid;
 			}
 
@@ -1385,14 +1470,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			ReplicationSlotMarkDirty();
 			ReplicationSlotSave();
 			ReplicationSlotRelease();
 
-			ereport(LOG,
-					errmsg("invalidating obsolete replication slot \"%s\"",
-						   NameStr(slotname)),
-					errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-							  LSN_FORMAT_ARGS(restart_lsn),
-							  (unsigned long long) (oldestLSN - restart_lsn)),
-					errhint("You might need to increase max_slot_wal_keep_size."));
+			ReportSlotInvalidation(conflict, false, active_pid,
+								   slotname, restart_lsn,
+								   oldestLSN, xid);
 
 			/* done with this slot for now */
 			break;
@@ -1405,19 +1487,44 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 }
 
 /*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Invalidate slots that require resources about to be removed.
  *
  * Returns true when any slot have got invalidated.
  *
+ * The invalidation cause is derived from the arguments:
+ *
+ * - oldestSegno maps to a valid LSN (RS_INVAL_WAL): mark any slot whose
+ *   restart_lsn points to an LSN older than the given segment as invalid;
+ *   it requires WAL that's about to be removed.
+ *
+ * - xid is valid (RS_INVAL_XID): we are about to remove rows older than
+ *   xid, so invalidate logical slots whose xmin or catalog_xmin precedes or
+ *   equals it.  If dboid is valid, only slots of that database conflict; an
+ *   invalid dboid (shared relation) makes slots of every database conflict.
+ *
+ * - neither is set, i.e. oldestSegno is 0 and xid is InvalidTransactionId
+ *   (RS_INVAL_WAL_LEVEL): invalidate all logical slots.  This is required
+ *   when the primary's wal_level is set back to replica, so existing logical
+ *   slots need to be invalidated.  Note that WaitExceedsMaxStandbyDelay() is
+ *   not taken into account here (as opposed to
+ *   ResolveRecoveryConflictWithVirtualXIDs()).  XXX: explain why.
+ *
+ * XXX: Should we have the caller pass in a specific
+ * ReplicationSlotInvalidationCause that we should search for? That'd likely
+ * make some things a bit neater.
+ *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
 bool
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid,
+								   TransactionId xid)
 {
 	XLogRecPtr	oldestLSN;
 	bool		invalidated = false;
 
+	if (max_replication_slots == 0)
+		return invalidated;
+
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
 restart:
@@ -1429,7 +1536,7 @@ restart:
 		if (!s->in_use)
 			continue;
 
-		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
+		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, dboid, xid, &invalidated))
 		{
 			/* if the lock was released, start from scratch */
 			goto restart;
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 395b2cf6909..c85cb5cc18d 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 9f56b4e95cf..fc81e17901c 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1478,6 +1478,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			reasonDesc = _("recovery conflict on snapshot");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			reasonDesc = _("recovery conflict on replication slot");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			reasonDesc = _("recovery conflict on buffer deadlock");
 			break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a10ecbaf50b..25e0de4e0ff 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			errdetail("User query might have needed to see row versions that must be removed.");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			errdetail("User was using a logical replication slot that must be invalidated.");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			errdetail("User transaction caused buffer deadlock with recovery.");
 			break;
@@ -3143,6 +3146,12 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 				InterruptPending = true;
 				break;
 
+			case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+				RecoveryConflictPending = true;
+				QueryCancelPending = true;
+				InterruptPending = true;
+				break;
+
 			default:
 				elog(FATAL, "unrecognized conflict mode: %d",
 					 (int) reason);
-- 
2.38.0

