From 82d84eea9055e4ee2d7467c284782397287611b2 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 22 Jan 2026 14:25:39 +0200
Subject: [PATCH 3/5] Use ProcNumber rather than pid in ReplicationSlot

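Replace the pid_t 'active_pid' field in ReplicationSlot with a ProcNumber
'active_proc' field.  INVALID_PROC_NUMBER, rather than 0, now means that
no process has acquired the slot.  Places that still need the PID, such
as error messages, signalling and pg_get_replication_slots(), look it up
with GetPGProcByNumber(active_proc)->pid.

This is preparation for the next commit.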
---
 src/backend/replication/logical/slotsync.c |  2 +-
 src/backend/replication/slot.c             | 71 ++++++++++++----------
 src/backend/replication/slotfuncs.c        | 13 ++--
 src/include/replication/slot.h             |  7 ++-
 4 files changed, 53 insertions(+), 40 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 73fc51ea53e..681b78f951a 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -1758,7 +1758,7 @@ update_synced_slots_inactive_since(void)
 			Assert(SlotIsLogical(s));
 
 			/* The slot must not be acquired by any process */
-			Assert(s->active_pid == 0);
+			Assert(s->active_proc == INVALID_PROC_NUMBER);
 
 			/* Use the same inactive_since time for all the slots. */
 			if (now == 0)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 4c47261c7f9..5c6f704e029 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -226,6 +226,7 @@ ReplicationSlotsShmemInit(void)
 			ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
 
 			/* everything else is zeroed by the memset above */
+			slot->active_proc = INVALID_PROC_NUMBER;
 			SpinLockInit(&slot->mutex);
 			LWLockInitialize(&slot->io_in_progress_lock,
 							 LWTRANCHE_REPLICATION_SLOT_IO);
@@ -461,7 +462,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * be doing that.  So it's safe to initialize the slot.
 	 */
 	Assert(!slot->in_use);
-	Assert(slot->active_pid == 0);
+	Assert(slot->active_proc == INVALID_PROC_NUMBER);
 
 	/* first initialize persistent data */
 	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
@@ -505,8 +506,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
 	/* We can now mark the slot active, and that makes it our slot. */
 	SpinLockAcquire(&slot->mutex);
-	Assert(slot->active_pid == 0);
-	slot->active_pid = MyProcPid;
+	Assert(slot->active_proc == INVALID_PROC_NUMBER);
+	slot->active_proc = MyProcNumber;
 	SpinLockRelease(&slot->mutex);
 	MyReplicationSlot = slot;
 
@@ -620,7 +621,8 @@ void
 ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
-	int			active_pid;
+	ProcNumber	active_proc;
+	pid_t		active_pid;
 
 	Assert(name != NULL);
 
@@ -672,17 +674,18 @@ retry:
 		 * to inactive_since in InvalidatePossiblyObsoleteSlot.
 		 */
 		SpinLockAcquire(&s->mutex);
-		if (s->active_pid == 0)
-			s->active_pid = MyProcPid;
-		active_pid = s->active_pid;
+		if (s->active_proc == INVALID_PROC_NUMBER)
+			s->active_proc = MyProcNumber;
+		active_proc = s->active_proc;
 		ReplicationSlotSetInactiveSince(s, 0, false);
 		SpinLockRelease(&s->mutex);
 	}
 	else
 	{
-		s->active_pid = active_pid = MyProcPid;
+		s->active_proc = active_proc = MyProcNumber;
 		ReplicationSlotSetInactiveSince(s, 0, true);
 	}
+	active_pid = GetPGProcByNumber(active_proc)->pid;
 	LWLockRelease(ReplicationSlotControlLock);
 
 	/*
@@ -690,7 +693,7 @@ retry:
 	 * wait until the owning process signals us that it's been released, or
 	 * error out.
 	 */
-	if (active_pid != MyProcPid)
+	if (active_proc != MyProcNumber)
 	{
 		if (!nowait)
 		{
@@ -762,7 +765,7 @@ ReplicationSlotRelease(void)
 	bool		is_logical;
 	TimestampTz now = 0;
 
-	Assert(slot != NULL && slot->active_pid != 0);
+	Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
 
 	is_logical = SlotIsLogical(slot);
 
@@ -815,7 +818,7 @@ ReplicationSlotRelease(void)
 		 * disconnecting, but wake up others that may be waiting for it.
 		 */
 		SpinLockAcquire(&slot->mutex);
-		slot->active_pid = 0;
+		slot->active_proc = INVALID_PROC_NUMBER;
 		ReplicationSlotSetInactiveSince(slot, now, false);
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
@@ -877,7 +880,7 @@ restart:
 		found_valid_logicalslot |=
 			(SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
 
-		if ((s->active_pid == MyProcPid &&
+		if ((s->active_proc == MyProcNumber &&
 			 (!synced_only || s->data.synced)))
 		{
 			Assert(s->data.persistency == RS_TEMPORARY);
@@ -1088,7 +1091,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;
 
 		SpinLockAcquire(&slot->mutex);
-		slot->active_pid = 0;
+		slot->active_proc = INVALID_PROC_NUMBER;
 		SpinLockRelease(&slot->mutex);
 
 		/* wake up anyone waiting on this slot */
@@ -1110,7 +1113,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	 * Also wake up processes waiting for it.
 	 */
 	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
-	slot->active_pid = 0;
+	slot->active_proc = INVALID_PROC_NUMBER;
 	slot->in_use = false;
 	LWLockRelease(ReplicationSlotControlLock);
 	ConditionVariableBroadcast(&slot->active_cv);
@@ -1476,7 +1479,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 		/* count slots with spinlock held */
 		SpinLockAcquire(&s->mutex);
 		(*nslots)++;
-		if (s->active_pid != 0)
+		if (s->active_proc != INVALID_PROC_NUMBER)
 			(*nactive)++;
 		SpinLockRelease(&s->mutex);
 	}
@@ -1520,7 +1523,7 @@ restart:
 	{
 		ReplicationSlot *s;
 		char	   *slotname;
-		int			active_pid;
+		ProcNumber	active_proc;
 
 		s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -1550,11 +1553,11 @@ restart:
 		SpinLockAcquire(&s->mutex);
 		/* can't change while ReplicationSlotControlLock is held */
 		slotname = NameStr(s->data.name);
-		active_pid = s->active_pid;
-		if (active_pid == 0)
+		active_proc = s->active_proc;
+		if (active_proc == INVALID_PROC_NUMBER)
 		{
 			MyReplicationSlot = s;
-			s->active_pid = MyProcPid;
+			s->active_proc = MyProcNumber;
 		}
 		SpinLockRelease(&s->mutex);
 
@@ -1579,11 +1582,11 @@ restart:
 		 * XXX: We can consider shutting down the slot sync worker before
 		 * trying to drop synced temporary slots here.
 		 */
-		if (active_pid)
+		if (active_proc != INVALID_PROC_NUMBER)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
 					 errmsg("replication slot \"%s\" is active for PID %d",
-							slotname, active_pid)));
+							slotname, GetPGProcByNumber(active_proc)->pid)));
 
 		/*
 		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
@@ -1974,7 +1977,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 	{
 		XLogRecPtr	restart_lsn;
 		NameData	slotname;
-		int			active_pid = 0;
+		ProcNumber	active_proc;
+		pid_t		active_pid = 0;
 		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
 		TimestampTz now = 0;
 		long		slot_idle_secs = 0;
@@ -2027,7 +2031,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 		}
 
 		slotname = s->data.name;
-		active_pid = s->active_pid;
+		active_proc = s->active_proc;
 
 		/*
 		 * If the slot can be acquired, do so and mark it invalidated
@@ -2039,10 +2043,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 		 * is terminated. So, the inactive slot can only be invalidated
 		 * immediately without being terminated.
 		 */
-		if (active_pid == 0)
+		if (active_proc == INVALID_PROC_NUMBER)
 		{
 			MyReplicationSlot = s;
-			s->active_pid = MyProcPid;
+			s->active_proc = MyProcNumber;
 			s->data.invalidated = invalidation_cause;
 
 			/*
@@ -2058,6 +2062,11 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 			/* Let caller know */
 			invalidated = true;
 		}
+		else
+		{
+			active_pid = GetPGProcByNumber(active_proc)->pid;
+			Assert(active_pid != 0);
+		}
 
 		SpinLockRelease(&s->mutex);
 
@@ -2073,7 +2082,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 								&slot_idle_usecs);
 		}
 
-		if (active_pid != 0)
+		if (active_proc != INVALID_PROC_NUMBER)
 		{
 			/*
 			 * Prepare the sleep on the slot's condition variable before
@@ -2105,9 +2114,9 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 									   slot_idle_secs);
 
 				if (MyBackendType == B_STARTUP)
-					(void) SendProcSignal(active_pid,
-										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
-										  INVALID_PROC_NUMBER);
+					SendProcSignal(active_pid,
+								   PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
+								   active_proc);
 				else
 					(void) kill(active_pid, SIGTERM);
 
@@ -2875,7 +2884,7 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
 
 		slot->in_use = true;
-		slot->active_pid = 0;
+		slot->active_proc = INVALID_PROC_NUMBER;
 
 		/*
 		 * Set the time since the slot has become inactive after loading the
@@ -3158,7 +3167,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 		SpinLockAcquire(&slot->mutex);
 		restart_lsn = slot->data.restart_lsn;
 		invalidated = slot->data.invalidated != RS_INVAL_NONE;
-		inactive = slot->active_pid == 0;
+		inactive = slot->active_proc == INVALID_PROC_NUMBER;
 		SpinLockRelease(&slot->mutex);
 
 		if (invalidated)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 1ed2d80c2d2..9f5e4f998fe 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -20,6 +20,7 @@
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
+#include "storage/proc.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/pg_lsn.h"
@@ -309,10 +310,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			values[i++] = ObjectIdGetDatum(slot_contents.data.database);
 
 		values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
-		values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
+		values[i++] = BoolGetDatum(slot_contents.active_proc != INVALID_PROC_NUMBER);
 
-		if (slot_contents.active_pid != 0)
-			values[i++] = Int32GetDatum(slot_contents.active_pid);
+		if (slot_contents.active_proc != INVALID_PROC_NUMBER)
+			values[i++] = Int32GetDatum(GetPGProcByNumber(slot_contents.active_proc)->pid);
 		else
 			nulls[i++] = true;
 
@@ -377,13 +378,13 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 				 */
 				if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
 				{
-					int			pid;
+					ProcNumber	procno;
 
 					SpinLockAcquire(&slot->mutex);
-					pid = slot->active_pid;
+					procno = slot->active_proc;
 					slot_contents.data.restart_lsn = slot->data.restart_lsn;
 					SpinLockRelease(&slot->mutex);
-					if (pid != 0)
+					if (procno != INVALID_PROC_NUMBER)
 					{
 						values[i++] = CStringGetTextDatum("unreserved");
 						walstate = WALAVAIL_UNRESERVED;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index f465e430cc6..72f8be629f3 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -185,8 +185,11 @@ typedef struct ReplicationSlot
 	/* is this slot defined */
 	bool		in_use;
 
-	/* Who is streaming out changes for this slot? 0 in unused slots. */
-	pid_t		active_pid;
+	/*
+	 * Which process has acquired this slot, identified by its ProcNumber?
+	 * INVALID_PROC_NUMBER if no process currently holds the slot.
+	 */
+	ProcNumber	active_proc;
 
 	/* any outstanding modifications? */
 	bool		just_dirtied;
-- 
2.47.3

