Improve error handling for invalid slots and ensure the same 'inactive_since' time for inactive slots

Started by Nisha Moond · 12 months ago · 15 messages
#1 Nisha Moond <nisha.moond412@gmail.com>
2 attachment(s)

Hello Hackers,
(CC people involved in the earlier discussion)

While implementing slot invalidation based on inactive (idle) timeout
(see [1]), several general optimizations and improvements were
identified.

This thread is a spin-off from [1], intended to address these
optimizations separately from the main feature. As suggested in [2],
the improvements are divided into two parts:

Patch-001: Update the logic to ensure all inactive slots have the same
'inactive_since' time when restoring the slots from disk in
RestoreSlotFromDisk() and when updating the synced slots on standby in
update_synced_slots_inactive_since().

Patch-002: Raise an error for invalid slots while acquiring them in
ReplicationSlotAcquire().
Currently, a process can acquire an invalid slot but may eventually
error out at later stages. For example, if a process acquires a slot
invalidated due to wal_removed, it will later fail in
CreateDecodingContext() when trying to access the removed WAL. The
idea here is to improve error handling by detecting invalid slots
earlier.
A new parameter, "error_if_invalid", is introduced in
ReplicationSlotAcquire(). If the caller specifies
error_if_invalid=true, an error is raised immediately instead of
letting the process acquire the invalid slot first and then fail later
due to the invalidated slot.

The v1 patches are attached; any feedback would be appreciated!

[1]: /messages/by-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
[2]: /messages/by-id/CAA4eK1LiDjz+F8hEYG0_ux=rqwhxnTuWpT-RKNDWaac3w3bWNw@mail.gmail.com

--
Thanks,
Nisha

Attachments:

v1-0001-Ensure-same-inactive_since-time-for-all-inactive-.patch (application/octet-stream)
From 5f38ecb859a6399f78d009029307436b1019df76 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 28 Jan 2025 10:27:58 +0530
Subject: [PATCH v1 1/2] Ensure same inactive_since time for all inactive slots

Update the logic to make sure all slots get the same 'inactive_since' time
in following cases:
 1) When updating the synced slots on standby in update_synced_slots_inactive_since() and
 2) When restoring slots from disk in RestoreSlotFromDisk().
---
 src/backend/replication/logical/slotsync.c | 9 ++++-----
 src/backend/replication/slot.c             | 6 +++++-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f6945af1d4..db52795b73 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -1508,7 +1508,7 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 static void
 update_synced_slots_inactive_since(void)
 {
-	TimestampTz now = 0;
+	TimestampTz now;
 
 	/*
 	 * We need to update inactive_since only when we are promoting standby to
@@ -1523,6 +1523,9 @@ update_synced_slots_inactive_since(void)
 	/* The slot sync worker or SQL function mustn't be running by now */
 	Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
 
+	/* Use same inactive_since time for all slots */
+	now = GetCurrentTimestamp();
+
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (int i = 0; i < max_replication_slots; i++)
@@ -1537,10 +1540,6 @@ update_synced_slots_inactive_since(void)
 			/* The slot must not be acquired by any process */
 			Assert(s->active_pid == 0);
 
-			/* Use the same inactive_since time for all the slots. */
-			if (now == 0)
-				now = GetCurrentTimestamp();
-
 			SpinLockAcquire(&s->mutex);
 			s->inactive_since = now;
 			SpinLockRelease(&s->mutex);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b30e0473e1..c567c1fef3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -2208,6 +2208,7 @@ RestoreSlotFromDisk(const char *name)
 	bool		restored = false;
 	int			readBytes;
 	pg_crc32c	checksum;
+	TimestampTz now;
 
 	/* no need to lock here, no concurrent access allowed yet */
 
@@ -2368,6 +2369,9 @@ RestoreSlotFromDisk(const char *name)
 						NameStr(cp.slotdata.name)),
 				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
 
+	/* Use same inactive_since time for all slots */
+	now = GetCurrentTimestamp();
+
 	/* nothing can be active yet, don't lock anything */
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -2400,7 +2404,7 @@ RestoreSlotFromDisk(const char *name)
 		 * slot from the disk into memory. Whoever acquires the slot i.e.
 		 * makes the slot active will reset it.
 		 */
-		slot->inactive_since = GetCurrentTimestamp();
+		slot->inactive_since = now;
 
 		restored = true;
 		break;
-- 
2.34.1

v1-0002-Raise-Error-for-Invalid-Slots-in-ReplicationSlotA.patch (application/octet-stream)
From 176d2df70465eaba6f5912edf352b49ac75b3c2d Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 28 Jan 2025 10:30:08 +0530
Subject: [PATCH v1 2/2] Raise Error for Invalid Slots in
 ReplicationSlotAcquire()

Once a replication slot is invalidated, it cannot be reused. However, a process
could still acquire an invalid slot and fail later. For example, if a process
acquires a slot invalidated due to wal_removed, it will eventually fail in
CreateDecodingContext() when attempting to access the removed WAL.

This patch improves error handling by detecting invalid slots earlier.
If error_if_invalid=true is specified when calling ReplicationSlotAcquire(),
an error will be raised immediately instead of letting the process acquire the
slot and fail later due to the invalidated slot.
---
 .../replication/logical/logicalfuncs.c        |  2 +-
 src/backend/replication/logical/slotsync.c    |  4 +-
 src/backend/replication/slot.c                | 54 +++++++++++++++++--
 src/backend/replication/slotfuncs.c           |  2 +-
 src/backend/replication/walsender.c           |  4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |  2 +-
 src/include/replication/slot.h                |  3 +-
 src/test/recovery/t/019_replslot_limit.pl     |  2 +-
 8 files changed, 61 insertions(+), 12 deletions(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 0148ec3678..ca53caac2f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index db52795b73..692527b984 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c567c1fef3..0c3c5b4e4e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -163,6 +163,7 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
+static void RaiseSlotInvalidationError(ReplicationSlot *slot);
 
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
@@ -535,9 +536,12 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -615,6 +619,13 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * An error is raised if error_if_invalid is true and the slot is found to
+	 * be invalid.
+	 */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+		RaiseSlotInvalidationError(s);
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -785,7 +796,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +823,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, false);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -2797,3 +2808,40 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
 
 	ConditionVariableCancelSleep();
 }
+
+/*
+ * Raise an error based on the slot's invalidation cause.
+ */
+static void
+RaiseSlotInvalidationError(ReplicationSlot *slot)
+{
+	StringInfo	err_detail = makeStringInfo();
+
+	Assert(slot->data.invalidated != RS_INVAL_NONE);
+
+	switch (slot->data.invalidated)
+	{
+		case RS_INVAL_WAL_REMOVED:
+			appendStringInfoString(err_detail, _("This slot has been invalidated because the required WAL has been removed."));
+			break;
+
+		case RS_INVAL_HORIZON:
+			appendStringInfoString(err_detail, _("This slot has been invalidated because the required rows have been removed."));
+			break;
+
+		case RS_INVAL_WAL_LEVEL:
+			/* translator: %s is a GUC variable name */
+			appendStringInfo(err_detail, _("This slot has been invalidated because \"%s\" is insufficient for slot."),
+							 "wal_level");
+			break;
+
+		case RS_INVAL_NONE:
+			pg_unreachable();
+	}
+
+	ereport(ERROR,
+			errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			errmsg("can no longer get changes from replication slot \"%s\"",
+				   NameStr(slot->data.name)),
+			errdetail_internal("%s", err_detail->data));
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 977146789f..8be4b8c65b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bac504b554..446d10c1a7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 9a10907d05..d44f8c262b 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index bf62b36ad0..47ebdaecb6 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index ae2ad5c933..66ac7c40f1 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -234,7 +234,7 @@ my $failed = 0;
 for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 {
 	if ($node_standby->log_contains(
-			"requested WAL segment [0-9A-F]+ has already been removed",
+			"This slot has been invalidated because the required WAL has been removed",
 			$logstart))
 	{
 		$failed = 1;
-- 
2.34.1

#2 Peter Smith <smithpb2250@gmail.com>
In reply to: Nisha Moond (#1)

Hi Nisha,

Some review comments for patch v1-0001.

======
src/backend/replication/logical/slotsync.c

ReplSlotSyncWorkerMain:

1.
+ /* Use same inactive_since time for all slots */
+ now = GetCurrentTimestamp();
+
  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

for (int i = 0; i < max_replication_slots; i++)
@@ -1537,10 +1540,6 @@ update_synced_slots_inactive_since(void)
/* The slot must not be acquired by any process */
Assert(s->active_pid == 0);

- /* Use the same inactive_since time for all the slots. */
- if (now == 0)
- now = GetCurrentTimestamp();
-

AFAICT, this code was *already* ensuring to use the same
'inactive_since' even before your patch. The only difference is now
you are getting the timestamp value up-front instead of a deferred
assignment.

So why did you change this (and the code of RestoreSlotFromDisk) to do
the up-front assignment? Instead, you could have chosen to just leave
this code as-is, and then modify the RestoreSlotFromDisk code to match
it.

FWIW, I do prefer what you have done here because it is simpler, but I
just wondered about the choice because I think some people worry about
GetCurrentTimestamp overheads and try to avoid calling that wherever
possible.

======
src/backend/replication/slot.c

2. What about other loops?

AFAICT there are still some other loops where the inactive_since
timestamps might differ.

e.g. How about this logic in slot.c:

InvalidateObsoleteReplicationSlots:

LOOP:
for (int i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

calls InvalidatePossiblyObsoleteSlot(...)
which calls ReplicationSlotRelease(...)
which assigns now = GetCurrentTimestamp();
then slot->inactive_since = now;
}

~

So, should you also assign a 'now' value outside this loop and pass
that timestamp down the calls so they eventually all get assigned the
same? I don't know, but I guess at least that would require much fewer
unnecessary calls to GetCurrentTimestamp so that may be a good thing.

======
Kind Regards,
Peter Smith.
Fujitsu Australia

#3 Peter Smith <smithpb2250@gmail.com>
In reply to: Nisha Moond (#1)

Hi Nisha,

Some review comments for the patch v1-0002.

======
src/backend/replication/slot.c

1.
+
+/*
+ * Raise an error based on the slot's invalidation cause.
+ */
+static void
+RaiseSlotInvalidationError(ReplicationSlot *slot)
+{
+ StringInfo err_detail = makeStringInfo();
+
+ Assert(slot->data.invalidated != RS_INVAL_NONE);
+
+ switch (slot->data.invalidated)
+ {
+ case RS_INVAL_WAL_REMOVED:
+ appendStringInfoString(err_detail, _("This slot has been invalidated
because the required WAL has been removed."));
+ break;
+
+ case RS_INVAL_HORIZON:
+ appendStringInfoString(err_detail, _("This slot has been invalidated
because the required rows have been removed."));
+ break;
+
+ case RS_INVAL_WAL_LEVEL:
+ /* translator: %s is a GUC variable name */
+ appendStringInfo(err_detail, _("This slot has been invalidated
because \"%s\" is insufficient for slot."),
+ "wal_level");
+ break;
+
+ case RS_INVAL_NONE:
+ pg_unreachable();
+ }
+
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("can no longer get changes from replication slot \"%s\"",
+    NameStr(slot->data.name)),
+ errdetail_internal("%s", err_detail->data));
+}

AFAIK the _() is a macro for gettext(). So those strings are intended
for translation, right? There's also a "/* translator: ..." comment
implying the same.

OTOH, those strings are only being used by errdetail_internal, whose
function comment says "This is exactly like errdetail() except that
strings passed to errdetail_internal are not translated...".

Isn't there some contradiction here?

Perhaps the _() macro is not needed, or perhaps the
errdetail_internal() should be errdetail().

======
Kind Regards,
Peter Smith.
Fujitsu Australia

#4 Amit Kapila <amit.kapila16@gmail.com>
In reply to: Peter Smith (#2)

On Wed, Jan 29, 2025 at 7:50 AM Peter Smith <smithpb2250@gmail.com> wrote:

Some review comments for patch v1-0001.

======
src/backend/replication/logical/slotsync.c

ReplSlotSyncWorkerMain:

1.
+ /* Use same inactive_since time for all slots */
+ now = GetCurrentTimestamp();
+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

for (int i = 0; i < max_replication_slots; i++)
@@ -1537,10 +1540,6 @@ update_synced_slots_inactive_since(void)
/* The slot must not be acquired by any process */
Assert(s->active_pid == 0);

- /* Use the same inactive_since time for all the slots. */
- if (now == 0)
- now = GetCurrentTimestamp();
-

AFAICT, this code was *already* ensuring to use the same
'inactive_since' even before your patch. The only difference is now
you are getting the timestamp value up-front instead of a deferred
assignment.

I find the code without the patch better, as it may sometimes skip
calling GetCurrentTimestamp() entirely.

So why did you change this (and the code of RestoreSlotFromDisk) to do
the up-front assignment? Instead, you could have chosen to just leave
this code as-is, and then modify the RestoreSlotFromDisk code to match
it.

FWIW, I do prefer what you have done here because it is simpler, but I
just wondered about the choice because I think some people worry about
GetCurrentTimestamp overheads and try to avoid calling that wherever
possible.

======
src/backend/replication/slot.c

2. What about other loops?

AFAICT there are still some other loops where the inactive_since
timestamps might differ.

e.g. How about this logic in slot.c:

InvalidateObsoleteReplicationSlots:

LOOP:
for (int i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

calls InvalidatePossiblyObsoleteSlot(...)
which calls ReplicationSlotRelease(...)
which assigns now = GetCurrentTimestamp();
then slot->inactive_since = now;
}

~

So, should you also assign a 'now' value outside this loop and pass
that timestamp down the calls so they eventually all get assigned the
same? I don't know, but I guess at least that would require much fewer
unnecessary calls to GetCurrentTimestamp so that may be a good thing.

I don't see this as an optimization worth the effort of changing the
code. This path is called too infrequently to matter. The same is true
for the code in RestoreSlotFromDisk().

So, overall, I think we should just reject the 0001 patch and focus on 0002.

--
With Regards,
Amit Kapila.

#5 Amit Kapila <amit.kapila16@gmail.com>
In reply to: Peter Smith (#3)

On Wed, Jan 29, 2025 at 8:55 AM Peter Smith <smithpb2250@gmail.com> wrote:

======
src/backend/replication/slot.c

1.
+
+/*
+ * Raise an error based on the slot's invalidation cause.
+ */
+static void
+RaiseSlotInvalidationError(ReplicationSlot *slot)
+{
+ StringInfo err_detail = makeStringInfo();
+
+ Assert(slot->data.invalidated != RS_INVAL_NONE);
+
+ switch (slot->data.invalidated)
+ {
+ case RS_INVAL_WAL_REMOVED:
+ appendStringInfoString(err_detail, _("This slot has been invalidated
because the required WAL has been removed."));
+ break;
+
+ case RS_INVAL_HORIZON:
+ appendStringInfoString(err_detail, _("This slot has been invalidated
because the required rows have been removed."));
+ break;
+
+ case RS_INVAL_WAL_LEVEL:
+ /* translator: %s is a GUC variable name */
+ appendStringInfo(err_detail, _("This slot has been invalidated
because \"%s\" is insufficient for slot."),
+ "wal_level");
+ break;
+
+ case RS_INVAL_NONE:
+ pg_unreachable();
+ }
+
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("can no longer get changes from replication slot \"%s\"",
+    NameStr(slot->data.name)),
+ errdetail_internal("%s", err_detail->data));
+}

AFAIK the _() is a macro for gettext(). So those strings are intended
for translation, right? There's also a "/* translator: ..." comment
implying the same.

OTOH, those strings are only being used by errdetail_internal, whose
function comment says "This is exactly like errdetail() except that
strings passed to errdetail_internal are not translated...".

Isn't there some contradiction here?

Yeah, I also think so but I see some other similar usages. For
example, parse_one_reloption() uses errdetail_internal("%s",
_(optenum->detailmsg)) : 0));

There are various other places in the code where _( is used for
messages in errmsg_internal.

Perhaps the _() macro is not needed, or perhaps the
errdetail_internal() should be errdetail().

These messages should use errdetail. We already have these
messages displayed as errdetail in CreateDecodingContext().
Also, after this patch, shouldn't we remove the same error cases from
CreateDecodingContext()? There is already an
Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE), which
should be sufficient after this patch to ensure we didn't miss any
error cases.

--
With Regards,
Amit Kapila.

#6 Amit Kapila <amit.kapila16@gmail.com>
In reply to: Amit Kapila (#5)

On Wed, Jan 29, 2025 at 3:48 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("can no longer get changes from replication slot \"%s\"",
+    NameStr(slot->data.name)),
+ errdetail_internal("%s", err_detail->data));

The ERROR message appears to be specific to logical slots, but it will
occur for physical slots as well, so I suggest changing "can no longer
get changes from replication slot" to "can no longer access from
replication slot"

The errdetail should be used as we have in ReplicationSlotAlter.
errdetail("This replication slot has been invalidated due to \"%s\".",
SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));

You have given the following reason in another thread for not giving a
message during acquire in ReplicationSlotAlter: "Because
ReplicationSlotAlter() already handles errors immediately after
acquiring the slot. It raises errors for invalidated slots and also
raises a different error message if the slot is a physical one. So, In
case of ALTER, I feel it is okay to acquire the slot first without
raising errors and then handle errors in the pre-defined way. Similar
immediate error handling is not available at other places."

It is better to unify the error in one place rather than raise it in
different places. One benefit is that the error message contents will be
the same, which is currently not the case.

BTW, in the commit message, you only mention the place where we get
the message without a patch during logical replication, it is better
to mention the timing for physical replication as well.

--
With Regards,
Amit Kapila.

#7 Peter Smith <smithpb2250@gmail.com>
In reply to: Amit Kapila (#4)

On Wed, Jan 29, 2025 at 7:56 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Jan 29, 2025 at 7:50 AM Peter Smith <smithpb2250@gmail.com> wrote:

Some review comments for patch v1-0001.

======
src/backend/replication/logical/slotsync.c

ReplSlotSyncWorkerMain:

1.
+ /* Use same inactive_since time for all slots */
+ now = GetCurrentTimestamp();
+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

for (int i = 0; i < max_replication_slots; i++)
@@ -1537,10 +1540,6 @@ update_synced_slots_inactive_since(void)
/* The slot must not be acquired by any process */
Assert(s->active_pid == 0);

- /* Use the same inactive_since time for all the slots. */
- if (now == 0)
- now = GetCurrentTimestamp();
-

AFAICT, this code was *already* ensuring to use the same
'inactive_since' even before your patch. The only difference is now
you are getting the timestamp value up-front instead of a deferred
assignment.

I find the code without the patch better, as it may sometimes skip
calling GetCurrentTimestamp() entirely.

So why did you change this (and the code of RestoreSlotFromDisk) to do
the up-front assignment? Instead, you could have chosen to just leave
this code as-is, and then modify the RestoreSlotFromDisk code to match
it.

FWIW, I do prefer what you have done here because it is simpler, but I
just wondered about the choice because I think some people worry about
GetCurrentTimestamp overheads and try to avoid calling that wherever
possible.

======
src/backend/replication/slot.c

2. What about other loops?

AFAICT there are still some other loops where the inactive_since
timestamps might differ.

e.g. How about this logic in slot.c:

InvalidateObsoleteReplicationSlots:

LOOP:
for (int i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

calls InvalidatePossiblyObsoleteSlot(...)
which calls ReplicationSlotRelease(...)
which assigns now = GetCurrentTimestamp();
then slot->inactive_since = now;
}

~

So, should you also assign a 'now' value outside this loop and pass
that timestamp down the calls so they eventually all get assigned the
same? I don't know, but I guess at least that would require much fewer
unnecessary calls to GetCurrentTimestamp so that may be a good thing.

I don't see this as an optimization worth the effort of changing the
code. This path is called too infrequently to matter. The same is true
for the code in RestoreSlotFromDisk().

My understanding was that the purpose of this patch was not anything
to do with "optimisations" per se, but rather it was (like the
$SUBJECT says) to ensure that the *same* 'inactive_since' timestamp
value gets assigned.

E.g. the change to RestoreSlotFromDisk() was to prevent multiple slots
from all getting assigned different 'inactive_since' values that differ
by only 1 or 2 milliseconds, because that would look strange to anyone
inspecting those 'inactive_since' values.

======
Kind Regards,
Peter Smith.
Fujitsu Australia

#8 Amit Kapila <amit.kapila16@gmail.com>
In reply to: Peter Smith (#7)

On Thu, Jan 30, 2025 at 5:23 AM Peter Smith <smithpb2250@gmail.com> wrote:

My understanding was that the purpose of this patch was not anything
to do with "optimisations" per se, but rather it was (like the
$SUBJECT says) to ensure that the *same* 'inactive_since' timestamp
value gets assigned.

E.g. the change to RestoreSlotFromDisk() was to prevent multiple slots
from all getting assigned different 'inactive_since' values that differ
by only 1 or 2 milliseconds, because that would look strange to anyone
inspecting those 'inactive_since' values.

I see your point, but I am not sure whether it will matter in practice
unless the number of slots is large. I feel the second patch discussed
here is a clear improvement, as it helps centralize the logic that gives
ERRORs for invalid slots. This is especially useful when we are
thinking of adding more reasons for slot invalidation. So, we should
put our energy into getting the 0002 patch proposed here committed, along
with the related patch to add a new reason for slot invalidation.

--
With Regards,
Amit Kapila.

#9 Nisha Moond <nisha.moond412@gmail.com>
In reply to: Amit Kapila (#8)
1 attachment(s)

On Thu, Jan 30, 2025 at 9:56 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Jan 30, 2025 at 5:23 AM Peter Smith <smithpb2250@gmail.com> wrote:

My understanding was that the purpose of this patch was not anything
to do with "optimisations" per se, but rather it was (like the
$SUBJECT says) to ensure that the *same* 'inactive_since' timestamp
value gets assigned.

E.g. the change to RestoreSlotFromDisk() was to prevent multiple slots
from all getting assigned different 'inactive_since' values that differ
by only 1 or 2 milliseconds, because that would look strange to anyone
inspecting those 'inactive_since' values.

I see your point, but I am not sure whether it will matter in practice
unless the number of slots is large. I feel the second patch discussed
here is a clear improvement, as it helps centralize the logic that gives
ERRORs for invalid slots. This is especially useful when we are
thinking of adding more reasons for slot invalidation. So, we should
put our energy into getting the 0002 patch proposed here committed, along
with the related patch to add a new reason for slot invalidation.

+1
Removed patch v1-0001. Please find the attached version 2 of 0002,
which is now v2-0001.

In v2, I have addressed all comments so far from [1], [2], and [3].
- With the proposed errdetail message in [3], the new function
RaiseSlotInvalidationError() is no longer required.
- Updated the test files to match the new error message.

[1]: /messages/by-id/CAHut+PvS3rkwy6hr_awx1of4Se+qRsDo=jZyAjM9=bbvr2GF9g@mail.gmail.com
[2]: /messages/by-id/CAA4eK1KW_30TNG65iRDBMcqqcC2wGnK+p4pbV7cLzHLTXn3-zQ@mail.gmail.com
[3]: /messages/by-id/CAA4eK1KCYGMA-qWXBWRWKe+90fKw8gVMep3GuTvbRKdNG3OTMQ@mail.gmail.com

--
Thanks,
Nisha

Attachments:

v2-0001-Raise-Error-for-Invalid-Slots-in-ReplicationSlotA.patchapplication/octet-stream; name=v2-0001-Raise-Error-for-Invalid-Slots-in-ReplicationSlotA.patchDownload
From 357f91d3857a32ecca5c11b37d06a5b2574ffe8a Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 28 Jan 2025 10:30:08 +0530
Subject: [PATCH v2] Raise Error for Invalid Slots in ReplicationSlotAcquire()

Once a replication slot is invalidated, it cannot be reused. However, a
process could still acquire an invalid slot and fail later.

For example, if a process acquires a logical slot that was invalidated due
to wal_removed, it will eventually fail in CreateDecodingContext() when
attempting to access the removed WAL. Similarly, for physical replication
slots, even if the slot is invalidated and invalidation_reason is set to
wal_removed, the walsender does not currently check for invalidation when
starting physical replication. Instead, replication starts, and an error
is only reported later by the standby when a missing WAL is detected.

This patch improves error handling by detecting invalid slots earlier.
If error_if_invalid=true is specified when calling ReplicationSlotAcquire(),
an error will be raised immediately instead of letting the process acquire the
slot and fail later due to the invalidated slot.
---
 src/backend/replication/logical/logical.c     | 20 -------------
 .../replication/logical/logicalfuncs.c        |  2 +-
 src/backend/replication/logical/slotsync.c    |  4 +--
 src/backend/replication/slot.c                | 28 ++++++++++++-------
 src/backend/replication/slotfuncs.c           |  2 +-
 src/backend/replication/walsender.c           |  4 +--
 src/backend/utils/adt/pg_upgrade_support.c    |  2 +-
 src/include/replication/slot.h                |  3 +-
 src/test/recovery/t/019_replslot_limit.pl     |  2 +-
 .../t/035_standby_logical_decoding.pl         | 15 ++++------
 10 files changed, 34 insertions(+), 48 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0b25efafe2..2c8cf516bd 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -542,26 +542,6 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				errdetail("This replication slot is being synchronized from the primary server."),
 				errhint("Specify another replication slot."));
 
-	/*
-	 * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
-	 * "cannot get changes" wording in this errmsg because that'd be
-	 * confusingly ambiguous about no changes being available when called from
-	 * pg_logical_slot_get_changes_guts().
-	 */
-	if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("can no longer get changes from replication slot \"%s\"",
-						NameStr(MyReplicationSlot->data.name)),
-				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
-
-	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("can no longer get changes from replication slot \"%s\"",
-						NameStr(MyReplicationSlot->data.name)),
-				 errdetail("This slot has been invalidated because it was conflicting with recovery.")));
-
 	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 0148ec3678..ca53caac2f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f6945af1d4..be6f87f00b 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b30e0473e1..4f1196d2d1 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -535,9 +535,12 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -615,6 +618,18 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * An error is raised if error_if_invalid is true and the slot is found to
+	 * be invalid.
+	 */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("can no longer access replication slot \"%s\"",
+					   NameStr(s->data.name)),
+				errdetail("This replication slot has been invalidated due to \"%s\".",
+						  SlotInvalidationCauses[s->data.invalidated]));
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -785,7 +800,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +827,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -820,13 +835,6 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 				errmsg("cannot use %s with a physical replication slot",
 					   "ALTER_REPLICATION_SLOT"));
 
-	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot alter invalid replication slot \"%s\"", name),
-				errdetail("This replication slot has been invalidated due to \"%s\".",
-						  SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));
-
 	if (RecoveryInProgress())
 	{
 		/*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 977146789f..8be4b8c65b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bac504b554..446d10c1a7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 9a10907d05..d44f8c262b 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index bf62b36ad0..47ebdaecb6 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index ae2ad5c933..6468784b83 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -234,7 +234,7 @@ my $failed = 0;
 for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 {
 	if ($node_standby->log_contains(
-			"requested WAL segment [0-9A-F]+ has already been removed",
+			"This replication slot has been invalidated due to \"wal_removed\".",
 			$logstart))
 	{
 		$failed = 1;
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 7e794c5bea..505e85d1eb 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -533,7 +533,7 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 	qq[ALTER_REPLICATION_SLOT vacuum_full_inactiveslot (failover);],
 	replication => 'database');
 ok( $stderr =~
-	  /ERROR:  cannot alter invalid replication slot "vacuum_full_inactiveslot"/
+	  /ERROR:  can no longer access replication slot "vacuum_full_inactiveslot"/
 	  && $stderr =~
 	  /DETAIL:  This replication slot has been invalidated due to "rows_removed"./,
 	"invalidated slot cannot be altered");
@@ -551,8 +551,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"vacuum_full_activeslot\""
-);
+	"can no longer access replication slot \"vacuum_full_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -632,8 +631,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"row_removal_activeslot\""
-);
+	"can no longer access replication slot \"row_removal_activeslot\"");
 
 ##################################################
 # Recovery conflict: Same as Scenario 2 but on a shared catalog table
@@ -668,7 +666,7 @@ $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"shared_row_removal_activeslot\""
+	"can no longer access replication slot \"shared_row_removal_activeslot\""
 );
 
 ##################################################
@@ -759,7 +757,7 @@ $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"pruning_activeslot\"");
+	"can no longer access replication slot \"pruning_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -818,8 +816,7 @@ $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # as the slot has been invalidated we should not be able to read
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"wal_level_activeslot\""
-);
+	"can no longer access replication slot \"wal_level_activeslot\"");
 
 ##################################################
 # DROP DATABASE should drop its slots, including active slots.
-- 
2.34.1

#10Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Nisha Moond (#9)
RE: Improve error handling for invalid slots and ensure a same 'inactive_since' time for inactive slots

Dear Nisha,

Thanks for creating a patch!

Removed patch v1-0001. Please find the attached version 2 of 0002,
which is now v2-0001.

ISTM error_if_invalid is set to true when the slot is being used and set to false when it is being dropped.
One exception is the slot sync, but it already has a mechanism to handle such slots.

I confirmed that RaiseSlotInvalidationError() was removed based on the comments.
I ran the regression tests on my environment and they passed.

Overall, the patch looks good to me.

----------
Best regards,
Hayato Kuroda

#11Amit Kapila
amit.kapila16@gmail.com
In reply to: Nisha Moond (#9)
1 attachment(s)
Re: Improve error handling for invalid slots and ensure a same 'inactive_since' time for inactive slots

On Thu, Jan 30, 2025 at 12:00 PM Nisha Moond <nisha.moond412@gmail.com> wrote:

Removed patch v1-0001. Please find the attached version 2 of 0002,
which is now v2-0001.

In v2, I have addressed all comments till now from [1] , [2] and [3].
- With the proposed errdetail message in [3], the new function
RaiseSlotInvalidationError() is no longer required.

I have made minor changes in the attached. The main change is to raise
an ERROR before we broadcast to let everybody know we've modified this
slot. There is no point in broadcasting if the slot is unusable.

- Updated the test files to match the new error message.

-   /ERROR:  cannot alter invalid replication slot "vacuum_full_inactiveslot"/
+   /ERROR:  can no longer access replication slot "vacuum_full_inactiveslot"/
- "can no longer get changes from replication slot
\"shared_row_removal_activeslot\""
+ "can no longer access replication slot \"shared_row_removal_activeslot\""

With the above changes, we made the ERROR message generic, which was
required to raise it from the central place. This looks reasonable to
me. The other option that occurred to me is to write it as: "can no
longer use replication slot "vacuum_full_inactiveslot" to access WAL
or modify it", but I find the one used currently in the patch better,
as the alternative is longer and may need modification in the future
if we start using slots for something else.

--
With Regards,
Amit Kapila.

Attachments:

v3-0001-Raise-Error-for-Invalid-Slots-in-ReplicationSlotA.patchapplication/octet-stream; name=v3-0001-Raise-Error-for-Invalid-Slots-in-ReplicationSlotA.patchDownload
From 121e8534d5b14dcc6aa4d268c99735ea2130aae0 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Thu, 30 Jan 2025 15:50:02 +0530
Subject: [PATCH v3] Raise Error for Invalid Slots in ReplicationSlotAcquire()

Once a replication slot is invalidated, it cannot be reused. However, a
process could still acquire an invalid slot and fail later.

For example, if a process acquires a logical slot that was invalidated due
to wal_removed, it will eventually fail in CreateDecodingContext() when
attempting to access the removed WAL. Similarly, for physical replication
slots, even if the slot is invalidated and invalidation_reason is set to
wal_removed, the walsender does not currently check for invalidation when
starting physical replication. Instead, replication starts, and an error
is only reported later by the standby when a missing WAL is detected.

This patch improves error handling by detecting invalid slots earlier.
If error_if_invalid=true is specified when calling ReplicationSlotAcquire(),
an error will be raised immediately instead of letting the process acquire the
slot and fail later due to the invalidated slot.
---
 src/backend/replication/logical/logical.c     | 20 --------------
 .../replication/logical/logicalfuncs.c        |  2 +-
 src/backend/replication/logical/slotsync.c    |  4 +--
 src/backend/replication/slot.c                | 26 ++++++++++++-------
 src/backend/replication/slotfuncs.c           |  2 +-
 src/backend/replication/walsender.c           |  4 +--
 src/backend/utils/adt/pg_upgrade_support.c    |  2 +-
 src/include/replication/slot.h                |  3 ++-
 src/test/recovery/t/019_replslot_limit.pl     |  2 +-
 .../t/035_standby_logical_decoding.pl         | 15 +++++------
 10 files changed, 32 insertions(+), 48 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0b25efafe2b..2c8cf516bde 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -542,26 +542,6 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				errdetail("This replication slot is being synchronized from the primary server."),
 				errhint("Specify another replication slot."));
 
-	/*
-	 * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
-	 * "cannot get changes" wording in this errmsg because that'd be
-	 * confusingly ambiguous about no changes being available when called from
-	 * pg_logical_slot_get_changes_guts().
-	 */
-	if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("can no longer get changes from replication slot \"%s\"",
-						NameStr(MyReplicationSlot->data.name)),
-				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
-
-	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("can no longer get changes from replication slot \"%s\"",
-						NameStr(MyReplicationSlot->data.name)),
-				 errdetail("This slot has been invalidated because it was conflicting with recovery.")));
-
 	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 0148ec36788..ca53caac2f2 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f6945af1d43..be6f87f00b2 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b30e0473e1c..5f31022bb64 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -535,9 +535,13 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid. It should always be set to true, except when we are temporarily
+ * acquiring the slot and doesn't intend to change it.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -609,6 +613,15 @@ retry:
 	else if (!nowait)
 		ConditionVariableCancelSleep(); /* no sleep needed after all */
 
+	/* Invalid slots can't be modified or used before accessing the WAL. */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("can no longer access replication slot \"%s\"",
+					   NameStr(s->data.name)),
+				errdetail("This replication slot has been invalidated due to \"%s\".",
+						  SlotInvalidationCauses[s->data.invalidated]));
+
 	/* Let everybody know we've modified this slot */
 	ConditionVariableBroadcast(&s->active_cv);
 
@@ -785,7 +798,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +825,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -820,13 +833,6 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 				errmsg("cannot use %s with a physical replication slot",
 					   "ALTER_REPLICATION_SLOT"));
 
-	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot alter invalid replication slot \"%s\"", name),
-				errdetail("This replication slot has been invalidated due to \"%s\".",
-						  SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));
-
 	if (RecoveryInProgress())
 	{
 		/*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 977146789fe..8be4b8c65b5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bac504b554e..446d10c1a7d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 9a10907d05b..d44f8c262ba 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index bf62b36ad07..47ebdaecb6a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index ae2ad5c933a..6468784b83d 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -234,7 +234,7 @@ my $failed = 0;
 for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 {
 	if ($node_standby->log_contains(
-			"requested WAL segment [0-9A-F]+ has already been removed",
+			"This replication slot has been invalidated due to \"wal_removed\".",
 			$logstart))
 	{
 		$failed = 1;
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 16ac9299283..358122dac32 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -533,7 +533,7 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 	qq[ALTER_REPLICATION_SLOT vacuum_full_inactiveslot (failover);],
 	replication => 'database');
 ok( $stderr =~
-	  /ERROR:  cannot alter invalid replication slot "vacuum_full_inactiveslot"/
+	  /ERROR:  can no longer access replication slot "vacuum_full_inactiveslot"/
 	  && $stderr =~
 	  /DETAIL:  This replication slot has been invalidated due to "rows_removed"./,
 	"invalidated slot cannot be altered");
@@ -551,8 +551,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"vacuum_full_activeslot\""
-);
+	"can no longer access replication slot \"vacuum_full_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -632,8 +631,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"row_removal_activeslot\""
-);
+	"can no longer access replication slot \"row_removal_activeslot\"");
 
 ##################################################
 # Recovery conflict: Same as Scenario 2 but on a shared catalog table
@@ -668,7 +666,7 @@ $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"shared_row_removal_activeslot\""
+	"can no longer access replication slot \"shared_row_removal_activeslot\""
 );
 
 ##################################################
@@ -759,7 +757,7 @@ $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"pruning_activeslot\"");
+	"can no longer access replication slot \"pruning_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -818,8 +816,7 @@ $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # as the slot has been invalidated we should not be able to read
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"wal_level_activeslot\""
-);
+	"can no longer access replication slot \"wal_level_activeslot\"");
 
 ##################################################
 # DROP DATABASE should drops it's slots, including active slots.
-- 
2.28.0.windows.1

#12vignesh C
vignesh21@gmail.com
In reply to: Amit Kapila (#11)
1 attachment(s)
Re: Improve error handling for invalid slots and ensure a same 'inactive_since' time for inactive slots

On Thu, 30 Jan 2025 at 16:02, Amit Kapila <amit.kapila16@gmail.com> wrote:

I have made minor changes in the attached. The main change is to raise
an ERROR before we broadcast to let everybody know we've modified this
slot. There is no point in broadcasting if the slot is unusable.

- Updated the test files to match the new error message.

-   /ERROR:  cannot alter invalid replication slot "vacuum_full_inactiveslot"/
+   /ERROR:  can no longer access replication slot "vacuum_full_inactiveslot"/
- "can no longer get changes from replication slot
\"shared_row_removal_activeslot\""
+ "can no longer access replication slot \"shared_row_removal_activeslot\""

With the above changes, we made the ERROR message generic which was
required to raise it from the central place. This looks reasonable to
me. The other option that occurred to me is to write it as: "can no
longer use replication slot "vacuum_full_inactiveslot" to access WAL
or modify it" but I find the one used currently in the patch better as
this is longer and may need modification in future if we start using
slot for something else.

One of the tests was failing because MyReplicationSlot was not set to s
before erroring out, which resulted in:
TRAP: failed Assert("s->data.persistency == RS_TEMPORARY"), File:
"slot.c", Line: 777, PID: 280121
postgres: primary: walsender vignesh [local]
START_REPLICATION(ExceptionalCondition+0xbb)[0x578612cd9289]
postgres: primary: walsender vignesh [local]
START_REPLICATION(ReplicationSlotCleanup+0x12b)[0x578612a32348]
postgres: primary: walsender vignesh [local]
START_REPLICATION(WalSndErrorCleanup+0x5e)[0x578612a41995]

Fixed it by setting MyReplicationSlot just before erroring out so that
the WalSndErrorCleanup() function has access to it. I also moved the
error reporting earlier, as there is no need to check whether the slot
is active for a pid in the case of invalidated slots. The attached
patch has the changes for the same.

Regards,
Vignesh

Attachments:

v4-0001-Raise-Error-for-Invalid-Slots-in-ReplicationSlotA.patchtext/x-patch; charset=US-ASCII; name=v4-0001-Raise-Error-for-Invalid-Slots-in-ReplicationSlotA.patchDownload
From 507561f1442f678c5a1e8900b06604c1c84c6b47 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Thu, 30 Jan 2025 18:15:15 +0530
Subject: [PATCH v4] Raise Error for Invalid Slots in ReplicationSlotAcquire()

Once a replication slot is invalidated, it cannot be reused. However, a
process could still acquire an invalid slot and fail later.

For example, if a process acquires a logical slot that was invalidated due
to wal_removed, it will eventually fail in CreateDecodingContext() when
attempting to access the removed WAL. Similarly, for physical replication
slots, even if the slot is invalidated and invalidation_reason is set to
wal_removed, the walsender does not currently check for invalidation when
starting physical replication. Instead, replication starts, and an error
is only reported later by the standby when a missing WAL is detected.

This patch improves error handling by detecting invalid slots earlier.
If error_if_invalid=true is specified when calling ReplicationSlotAcquire(),
an error will be raised immediately instead of letting the process acquire the
slot and fail later due to the invalidated slot.
---
 src/backend/replication/logical/logical.c     | 20 ------------
 .../replication/logical/logicalfuncs.c        |  2 +-
 src/backend/replication/logical/slotsync.c    |  4 +--
 src/backend/replication/slot.c                | 32 +++++++++++--------
 src/backend/replication/slotfuncs.c           |  2 +-
 src/backend/replication/walsender.c           |  4 +--
 src/backend/utils/adt/pg_upgrade_support.c    |  2 +-
 src/include/replication/slot.h                |  3 +-
 src/test/recovery/t/019_replslot_limit.pl     |  2 +-
 .../t/035_standby_logical_decoding.pl         | 15 ++++-----
 10 files changed, 35 insertions(+), 51 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0b25efafe2..2c8cf516bd 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -542,26 +542,6 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				errdetail("This replication slot is being synchronized from the primary server."),
 				errhint("Specify another replication slot."));
 
-	/*
-	 * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
-	 * "cannot get changes" wording in this errmsg because that'd be
-	 * confusingly ambiguous about no changes being available when called from
-	 * pg_logical_slot_get_changes_guts().
-	 */
-	if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("can no longer get changes from replication slot \"%s\"",
-						NameStr(MyReplicationSlot->data.name)),
-				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
-
-	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("can no longer get changes from replication slot \"%s\"",
-						NameStr(MyReplicationSlot->data.name)),
-				 errdetail("This slot has been invalidated because it was conflicting with recovery.")));
-
 	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 0148ec3678..ca53caac2f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f6945af1d4..be6f87f00b 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b30e0473e1..74f7d565f0 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -535,9 +535,13 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid. It should always be set to true, except when we are temporarily
+ * acquiring the slot and doesn't intend to change it.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -585,6 +589,18 @@ retry:
 		active_pid = MyProcPid;
 	LWLockRelease(ReplicationSlotControlLock);
 
+	/* We made this slot active, so it's ours now. */
+	MyReplicationSlot = s;
+
+	/* Invalid slots can't be modified or used before accessing the WAL. */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("can no longer access replication slot \"%s\"",
+					   NameStr(s->data.name)),
+				errdetail("This replication slot has been invalidated due to \"%s\".",
+						  SlotInvalidationCauses[s->data.invalidated]));
+
 	/*
 	 * If we found the slot but it's already active in another process, we
 	 * wait until the owning process signals us that it's been released, or
@@ -612,9 +628,6 @@ retry:
 	/* Let everybody know we've modified this slot */
 	ConditionVariableBroadcast(&s->active_cv);
 
-	/* We made this slot active, so it's ours now. */
-	MyReplicationSlot = s;
-
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -785,7 +798,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +825,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -820,13 +833,6 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 				errmsg("cannot use %s with a physical replication slot",
 					   "ALTER_REPLICATION_SLOT"));
 
-	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot alter invalid replication slot \"%s\"", name),
-				errdetail("This replication slot has been invalidated due to \"%s\".",
-						  SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));
-
 	if (RecoveryInProgress())
 	{
 		/*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 977146789f..8be4b8c65b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bac504b554..446d10c1a7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 9a10907d05..d44f8c262b 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index bf62b36ad0..47ebdaecb6 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index ae2ad5c933..6468784b83 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -234,7 +234,7 @@ my $failed = 0;
 for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 {
 	if ($node_standby->log_contains(
-			"requested WAL segment [0-9A-F]+ has already been removed",
+			"This replication slot has been invalidated due to \"wal_removed\".",
 			$logstart))
 	{
 		$failed = 1;
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 7e794c5bea..505e85d1eb 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -533,7 +533,7 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 	qq[ALTER_REPLICATION_SLOT vacuum_full_inactiveslot (failover);],
 	replication => 'database');
 ok( $stderr =~
-	  /ERROR:  cannot alter invalid replication slot "vacuum_full_inactiveslot"/
+	  /ERROR:  can no longer access replication slot "vacuum_full_inactiveslot"/
 	  && $stderr =~
 	  /DETAIL:  This replication slot has been invalidated due to "rows_removed"./,
 	"invalidated slot cannot be altered");
@@ -551,8 +551,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"vacuum_full_activeslot\""
-);
+	"can no longer access replication slot \"vacuum_full_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -632,8 +631,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"row_removal_activeslot\""
-);
+	"can no longer access replication slot \"row_removal_activeslot\"");
 
 ##################################################
 # Recovery conflict: Same as Scenario 2 but on a shared catalog table
@@ -668,7 +666,7 @@ $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"shared_row_removal_activeslot\""
+	"can no longer access replication slot \"shared_row_removal_activeslot\""
 );
 
 ##################################################
@@ -759,7 +757,7 @@ $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"pruning_activeslot\"");
+	"can no longer access replication slot \"pruning_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -818,8 +816,7 @@ $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # as the slot has been invalidated we should not be able to read
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"wal_level_activeslot\""
-);
+	"can no longer access replication slot \"wal_level_activeslot\"");
 
 ##################################################
 # DROP DATABASE should drop its slots, including active slots.
-- 
2.43.0

#13Peter Smith
smithpb2250@gmail.com
In reply to: vignesh C (#12)
Re: Improve error handling for invalid slots and ensure a same 'inactive_since' time for inactive slots

Some review comments for patch v4-0001.

======
src/backend/replication/logical/logical.c

CreateDecodingContext:

1.
Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);

~

These lines are adjacent to the patch change. It's inconsistent to
refer to 'MyReplicationSlot' here. Everywhere else, this function
deliberately uses the variable 'slot' to keep the code shorter. In
passing, we should change this too.

======
src/backend/replication/slot.c

2.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid. It should always be set to true, except when we are temporarily
+ * acquiring the slot and doesn't intend to change it.
  */

typo /and doesn't intend to change it/and don't intend to change it/


3.
+	/* We made this slot active, so it's ours now. */
+	MyReplicationSlot = s;
+
+	/* Invalid slots can't be modified or used before accessing the WAL. */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("can no longer access replication slot \"%s\"",
+					   NameStr(s->data.name)),
+				errdetail("This replication slot has been invalidated due to \"%s\".",
+						  SlotInvalidationCauses[s->data.invalidated]));
+

This change looked dubious. I understand that it was done to avoid a
TRAP, but I am not sure it is the correct fix. At the point of that
ereport ERROR the slot doesn't yet belong to us, because we are still
determining whether we *can* acquire it, so it doesn't seem correct to
have the MyReplicationSlot code/comment ("it's ours now") come before
the ERROR.

Furthermore, having the code/comment "We made this slot active, so
it's ours now" ahead of the other code/comment "... but it's already
active in another process..." is contradictory.

======
Kind Regards,
Peter Smith.
Fujitsu Australia

#14Peter Smith
smithpb2250@gmail.com
In reply to: Peter Smith (#13)
1 attachment(s)
Re: Improve error handling for invalid slots and ensure a same 'inactive_since' time for inactive slots

On Fri, Jan 31, 2025 at 11:19 AM Peter Smith <smithpb2250@gmail.com> wrote:
...

Hi Nisha.

Further to my previous post, I was experimenting with an alternate fix
for the TAP test error reported by Vignesh.

Please see my diffs -- apply this atop the v4-0001 patch.

Basically, I have just moved the invalidation ereport earlier in the
code. IIUC, the TRAP might have been because v4-0001 still allowed the
slot's active_pid to be modified to MyProcPid prematurely, even when
we did not end up acquiring the invalidated slot.

Note, I also added a LWLockRelease to match the existing/adjacent ereport.

Anyway, please consider it. The recovery and subscription TAP tests
are working for me.

======
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

PS_20250131_v4_updates.txttext/plain; charset=US-ASCII; name=PS_20250131_v4_updates.txtDownload
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 74f7d56..29f0af9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -565,6 +565,19 @@ retry:
 						name)));
 	}
 
+	/* Invalid slots can't be modified or used before accessing the WAL. */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("can no longer access replication slot \"%s\"",
+					   NameStr(s->data.name)),
+				errdetail("This replication slot has been invalidated due to \"%s\".",
+						  SlotInvalidationCauses[s->data.invalidated]));
+	}
+
 	/*
 	 * This is the slot we want; check if it's active under some other
 	 * process.  In single user mode, we don't need this check.
@@ -589,18 +602,6 @@ retry:
 		active_pid = MyProcPid;
 	LWLockRelease(ReplicationSlotControlLock);
 
-	/* We made this slot active, so it's ours now. */
-	MyReplicationSlot = s;
-
-	/* Invalid slots can't be modified or used before accessing the WAL. */
-	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("can no longer access replication slot \"%s\"",
-					   NameStr(s->data.name)),
-				errdetail("This replication slot has been invalidated due to \"%s\".",
-						  SlotInvalidationCauses[s->data.invalidated]));
-
 	/*
 	 * If we found the slot but it's already active in another process, we
 	 * wait until the owning process signals us that it's been released, or
@@ -628,6 +629,9 @@ retry:
 	/* Let everybody know we've modified this slot */
 	ConditionVariableBroadcast(&s->active_cv);
 
+	/* We made this slot active, so it's ours now. */
+	MyReplicationSlot = s;
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
#15Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#14)
Re: Improve error handling for invalid slots and ensure a same 'inactive_since' time for inactive slots

On Fri, Jan 31, 2025 at 6:43 AM Peter Smith <smithpb2250@gmail.com> wrote:

Anyway, please consider it. The recovery and subscription TAP test are
working for me.

Your fix looks good to me. I have pushed the patch along with that. Thanks.

--
With Regards,
Amit Kapila.