From 91705833be615690388e41d176c16b2c294bb0cf Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Thu, 15 Jul 2021 15:52:48 +0900
Subject: [PATCH v1] Advance old-segment horizon properly after slot
 invalidation

When some slots are invalidated due to the limit of
max_slot_wal_keep_size, the old segment horizon should advance to
within the limit.

Previously that advancement didn't happen.  Hence, especially if no
valid slots remain after a slot invalidation, the horizon stops
advancing and the WAL kept in pg_wal no longer stays within
max_slot_wal_keep_size thereafter.

Backpatch to 13, where max_slot_wal_keep_size was introduced.
---
 src/backend/access/transam/xlog.c         | 24 +++++++++++++++++++++--
 src/backend/replication/slot.c            | 20 ++++++++++++++++---
 src/include/replication/slot.h            |  2 +-
 src/test/recovery/t/019_replslot_limit.pl | 11 ++++++++++-
 4 files changed, 50 insertions(+), 7 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c7c928f50b..6500ee5b11 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9300,7 +9300,17 @@ CreateCheckPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
-	InvalidateObsoleteReplicationSlots(_logSegNo);
+	if (InvalidateObsoleteReplicationSlots(_logSegNo))
+	{
+		/*
+		 * Some slots have been invalidated; recalculate the old-segment
+		 * horizon so that it no longer accounts for them.
+		 */
+		ReplicationSlotsComputeRequiredLSN();
+		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
+		KeepLogSeg(recptr, &_logSegNo);
+	}
+
 	_logSegNo--;
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
@@ -9640,7 +9650,17 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
-	InvalidateObsoleteReplicationSlots(_logSegNo);
+	if (InvalidateObsoleteReplicationSlots(_logSegNo))
+	{
+		/*
+		 * Some slots have been invalidated; recalculate the old-segment
+		 * horizon so that it no longer accounts for them.
+		 */
+		ReplicationSlotsComputeRequiredLSN();
+		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
+		KeepLogSeg(endptr, &_logSegNo);
+	}
+
 	_logSegNo--;
 
 	/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 33b85d86cc..b8eddcab53 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1143,11 +1143,14 @@ ReplicationSlotReserveWal(void)
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
  *
+ * Sets *invalidated to true if the slot gets invalidated; never resets it.
+ *
  * This is inherently racy, because we release the LWLock
  * for syscalls, so caller must restart if we return true.
  */
 static bool
-InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
+							   bool *invalidated)
 {
 	int			last_signaled_pid = 0;
 	bool		released_lock = false;
@@ -1193,6 +1196,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
 		slotname = s->data.name;
 		active_pid = s->active_pid;
 
+		/*
+		 * Tell the caller that a slot is being invalidated.  Plain assignment
+		 * would do; "|=" stresses that the flag is never cleared here.
+		 */
+		*invalidated |= true;
+
 		/*
 		 * If the slot can be acquired, do so and mark it invalidated
 		 * immediately.  Otherwise we'll signal the owning process, below, and
@@ -1291,12 +1300,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
  *
+ * Returns true if any slot was invalidated.
+ *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
-void
+bool
 InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 {
 	XLogRecPtr	oldestLSN;
+	bool		invalidated = false;
 
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
@@ -1309,13 +1321,15 @@ restart:
 		if (!s->in_use)
 			continue;
 
-		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN))
+		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
 		{
 			/* if the lock was released, start from scratch */
 			goto restart;
 		}
 	}
 	LWLockRelease(ReplicationSlotControlLock);
+
+	return invalidated;
 }
 
 /*
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 34d95eac8e..e32fb85db8 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -213,7 +213,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index d4b9ff705f..b76ed0fdac 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -11,7 +11,7 @@ use TestLib;
 use PostgresNode;
 
 use File::Path qw(rmtree);
-use Test::More tests => $TestLib::windows_os ? 14 : 18;
+use Test::More tests => $TestLib::windows_os ? 15 : 19;
 use Time::HiRes qw(usleep);
 
 $ENV{PGDATABASE} = 'postgres';
@@ -192,6 +192,15 @@ $result = $node_primary->safe_psql('postgres',
 is($result, "rep1|f|t|lost|",
 	'check that the slot became inactive and the state "lost" persists');
 
+# The invalidated slot must not hold back the old-segment horizon: a new slot
+# reserves WAL from the redo pointer, whose segment should now be the oldest.
+my $redoseg = $node_primary->safe_psql('postgres',
+	"select pg_walfile_name(lsn) from pg_create_physical_replication_slot('xx', true)");
+my $oldestseg = $node_primary->safe_psql('postgres',
+	"select pg_ls_dir from pg_ls_dir('pg_wal') where pg_ls_dir ~ '^[0-9A-F]{24}\$' order by pg_ls_dir limit 1");
+$node_primary->safe_psql('postgres', "select pg_drop_replication_slot('xx')");    # don't retain WAL
+is($oldestseg, $redoseg, "check if slot-invalidation advances the old segment horizon");
+
 # The standby no longer can connect to the primary
 $logstart = get_log_size($node_standby);
 $node_standby->start;
-- 
2.27.0

