From 52c25cc15abc4470d19e305d245b9362e6b8d6a3 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 7 Apr 2023 09:32:48 -0700
Subject: [PATCH va67 3/9] Support invalidating replication slots due to
 horizon and wal_level
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Needed for supporting logical decoding on a standby. The new invalidation
methods will be used in a subsequent commit.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Author: Andres Freund <andres@anarazel.de>
Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version)
Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
---
 src/include/replication/slot.h            |   9 +-
 src/backend/access/transam/xlog.c         |   6 +-
 src/backend/replication/logical/logical.c |   7 +
 src/backend/replication/slot.c            | 151 ++++++++++++++++++----
 4 files changed, 144 insertions(+), 29 deletions(-)

diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index ebcb637baed..bfc84193a7a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -46,6 +46,10 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_NONE,
 	/* required WAL has been removed */
 	RS_INVAL_WAL,
+	/* required rows have been removed (conflict with an xid horizon) */
+	RS_INVAL_HORIZON,
+	/* wal_level on the primary is insufficient for logical slots */
+	RS_INVAL_WAL_LEVEL,
 } ReplicationSlotInvalidationCause;
 
 /*
@@ -226,7 +230,10 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
+											   XLogSegNo oldestSegno,
+											   Oid dboid,
+											   TransactionId snapshotConflictHorizon);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 46821ad6056..1485e8f9ca9 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6806,7 +6806,8 @@ CreateCheckPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(_logSegNo))
+	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL, _logSegNo, InvalidOid,
+										   InvalidTransactionId))
 	{
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
@@ -7250,7 +7251,8 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(_logSegNo))
+	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL, _logSegNo, InvalidOid,
+										   InvalidTransactionId))
 	{
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 85fc49f655d..27addd58f66 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -531,6 +531,13 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 						NameStr(MyReplicationSlot->data.name)),
 				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
 
+	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot read from logical replication slot \"%s\"",
+						NameStr(MyReplicationSlot->data.name)),
+				 errdetail("This slot has been invalidated because it was conflicting with recovery.")));
+
 	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index df23b7ed31e..c2a9accebf6 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
 }
 
 /*
- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
- * and mark it invalid, if necessary and possible.
+ * Log that a slot is being invalidated, or that its holder is being terminated
+ */
+static void
+ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
+					   bool terminating,
+					   int pid,
+					   NameData slotname,
+					   XLogRecPtr restart_lsn,
+					   XLogRecPtr oldestLSN,
+					   TransactionId snapshotConflictHorizon)
+{
+	StringInfoData err_detail;
+	bool		hint = false;
+
+	initStringInfo(&err_detail);
+
+	switch (cause)
+	{
+		case RS_INVAL_WAL:
+			hint = true;
+			appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
+							 LSN_FORMAT_ARGS(restart_lsn),
+							 (unsigned long long) (oldestLSN - restart_lsn));
+			break;
+		case RS_INVAL_HORIZON:
+			appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
+							 snapshotConflictHorizon);
+			break;
+
+		case RS_INVAL_WAL_LEVEL:
+			appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server."));
+			break;
+		case RS_INVAL_NONE:
+			pg_unreachable();
+	}
+
+	ereport(LOG,
+			terminating ?
+			errmsg("terminating process %d to release replication slot \"%s\"",
+				   pid, NameStr(slotname)) :
+			errmsg("invalidating obsolete replication slot \"%s\"",
+				   NameStr(slotname)),
+			errdetail_internal("%s", err_detail.data),
+			hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
+
+	pfree(err_detail.data);
+}
+
+/*
+ * Helper for InvalidateObsoleteReplicationSlots
+ *
+ * Acquires the given slot and marks it invalid, if necessary and possible.
  *
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
@@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
  * for syscalls, so caller must restart if we return true.
  */
 static bool
-InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
+InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
+							   ReplicationSlot *s,
+							   XLogRecPtr oldestLSN,
+							   Oid dboid, TransactionId snapshotConflictHorizon,
 							   bool *invalidated)
 {
 	int			last_signaled_pid = 0;
@@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		XLogRecPtr	restart_lsn;
 		NameData	slotname;
 		int			active_pid = 0;
+		ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1286,10 +1340,45 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		restart_lsn = s->data.restart_lsn;
 
 		/*
-		 * If the slot is already invalid or is fresh enough, we don't need to
-		 * do anything.
+		 * If the slot is already invalid or is a non-conflicting slot, we
+		 * don't need to do anything.
 		 */
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+		if (s->data.invalidated == RS_INVAL_NONE)
+		{
+			switch (cause)
+			{
+				case RS_INVAL_WAL:
+					if (!XLogRecPtrIsInvalid(restart_lsn) &&
+						restart_lsn < oldestLSN)
+						conflict = cause;
+					break;
+				case RS_INVAL_HORIZON:
+					if (!SlotIsLogical(s))
+						break;
+					/* invalid DB oid signals a shared relation */
+					if (dboid != InvalidOid && dboid != s->data.database)
+						break;
+					if (TransactionIdIsValid(s->effective_xmin) &&
+						TransactionIdPrecedesOrEquals(s->effective_xmin,
+													  snapshotConflictHorizon))
+						conflict = cause;
+					else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
+							 TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
+														   snapshotConflictHorizon))
+						conflict = cause;
+					break;
+				case RS_INVAL_WAL_LEVEL:
+					if (SlotIsLogical(s))
+						conflict = cause;
+					break;
+				case RS_INVAL_NONE:
+					/* callers must pass a real invalidation cause */
+					pg_unreachable();
+			}
+		}
+
+		/* if there's no conflict, we're done */
+		if (conflict == RS_INVAL_NONE)
 		{
 			SpinLockRelease(&s->mutex);
 			if (released_lock)
@@ -1309,13 +1398,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
-			s->data.invalidated = RS_INVAL_WAL;
+			s->data.invalidated = conflict;
 
 			/*
 			 * XXX: We should consider not overwriting restart_lsn and instead
 			 * just rely on .invalidated.
 			 */
-			s->data.restart_lsn = InvalidXLogRecPtr;
+			if (conflict == RS_INVAL_WAL)
+				s->data.restart_lsn = InvalidXLogRecPtr;
 
 			/* Let caller know */
 			*invalidated = true;
@@ -1349,13 +1439,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			 */
 			if (last_signaled_pid != active_pid)
 			{
-				ereport(LOG,
-						errmsg("terminating process %d to release replication slot \"%s\"",
-							   active_pid, NameStr(slotname)),
-						errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-								  LSN_FORMAT_ARGS(restart_lsn),
-								  (unsigned long long) (oldestLSN - restart_lsn)),
-						errhint("You might need to increase max_slot_wal_keep_size."));
+				ReportSlotInvalidation(conflict, true, active_pid,
+									   slotname, restart_lsn,
+									   oldestLSN, snapshotConflictHorizon);
 
 				(void) kill(active_pid, SIGTERM);
 				last_signaled_pid = active_pid;
@@ -1390,14 +1476,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			ReplicationSlotMarkDirty();
 			ReplicationSlotSave();
 			ReplicationSlotRelease();
+			/* the slot is invalidated, not dropped, so its stats entry stays */
 
-			ereport(LOG,
-					errmsg("invalidating obsolete replication slot \"%s\"",
-						   NameStr(slotname)),
-					errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-							  LSN_FORMAT_ARGS(restart_lsn),
-							  (unsigned long long) (oldestLSN - restart_lsn)),
-					errhint("You might need to increase max_slot_wal_keep_size."));
+			ReportSlotInvalidation(conflict, false, active_pid,
+								   slotname, restart_lsn,
+								   oldestLSN, snapshotConflictHorizon);
 
 			/* done with this slot for now */
 			break;
@@ -1410,19 +1493,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 }
 
 /*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Invalidate slots that require resources about to be removed.
  *
  * Returns true when any slot have got invalidated.
  *
+ * Whether a slot needs to be invalidated depends on the cause. A slot is
+ * invalidated if it:
+ * - RS_INVAL_WAL: requires an LSN older than the given segment
+ * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
+ *   db; dboid may be InvalidOid for shared relations
+ * - RS_INVAL_WAL_LEVEL: is logical
+ *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
 bool
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
+								   XLogSegNo oldestSegno, Oid dboid,
+								   TransactionId snapshotConflictHorizon)
 {
 	XLogRecPtr	oldestLSN;
 	bool		invalidated = false;
 
+	Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
+	Assert(cause != RS_INVAL_WAL || oldestSegno > 0);
+
+	if (max_replication_slots == 0)
+		return false;
+
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
 restart:
@@ -1434,7 +1531,9 @@ restart:
 		if (!s->in_use)
 			continue;
 
-		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
+		if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
+										   snapshotConflictHorizon,
+										   &invalidated))
 		{
 			/* if the lock was released, start from scratch */
 			goto restart;
-- 
2.38.0

