From 933c1fb59ee9f9f99601c25f8effc0c87924d39f Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Mon, 6 Apr 2020 14:13:34 -0400
Subject: [PATCH] Further changes

---
 src/backend/access/transam/xlog.c         | 15 +++++-----
 src/backend/replication/logical/logical.c |  3 +-
 src/backend/replication/slot.c            | 36 ++++++++++++++++++-----
 src/backend/replication/slotfuncs.c       |  4 +++
 src/backend/replication/walsender.c       |  3 +-
 5 files changed, 44 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index bcea659fa0..bceddd6c05 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9515,20 +9515,21 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	XLByteToSeg(recptr, currSegNo, wal_segment_size);
 	segno = currSegNo;
 
-	keep = XLogGetReplicationSlotMinimumLSN();
-
 	/*
-	 * Calculate how many segments are kept by slots first.
+	 * Calculate how many segments are kept by slots first, adjusting
+	 * for max_slot_wal_keep_size.
 	 */
-	/* Cap keepSegs by max_slot_wal_keep_size */
+	keep = XLogGetReplicationSlotMinimumLSN();
 	if (keep != InvalidXLogRecPtr)
 	{
 		XLByteToSeg(keep, segno, wal_segment_size);
 
-		/* Reduce it if slots already reserves too many. */
+		/* Cap by max_slot_wal_keep_size ... */
 		if (max_slot_wal_keep_size_mb >= 0)
 		{
-			XLogRecPtr slot_keep_segs =
+			XLogRecPtr	slot_keep_segs;
+
+			slot_keep_segs =
 				ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
 
 			if (currSegNo - segno > slot_keep_segs)
@@ -9536,7 +9537,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 		}
 	}
 
-	/* but, keep at least wal_keep_segments segments if any */
+	/* but, keep at least wal_keep_segments if that's set */
 	if (wal_keep_segments > 0 && currSegNo - segno < wal_keep_segments)
 	{
 		/* avoid underflow, don't go below 1 */
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5adf253583..833a07a3f2 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1036,7 +1036,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		}
 
 		if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
-			MyReplicationSlot->candidate_restart_valid <= lsn)
+			MyReplicationSlot->candidate_restart_valid <= lsn &&
+			MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dd7a725641..52dbcf54a9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -743,6 +743,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 
 /*
  * Compute the oldest restart LSN across all slots and inform xlog module.
+ *
+ * Note: while max_slot_wal_keep_size is theoretically relevant for this
+ * purpose, we don't try to account for that, because this module doesn't
+ * know what to compare against.
  */
 void
 ReplicationSlotsComputeRequiredLSN(void)
@@ -818,6 +822,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 		restart_lsn = s->data.restart_lsn;
 		SpinLockRelease(&s->mutex);
 
+		if (restart_lsn == InvalidXLogRecPtr)
+			continue;
+
 		if (result == InvalidXLogRecPtr ||
 			restart_lsn < result)
 			result = restart_lsn;
@@ -1083,25 +1090,38 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 	for (int i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
 
-		if (!s->in_use || s->data.restart_lsn == InvalidXLogRecPtr)
+		if (!s->in_use)
 			continue;
 
+		SpinLockAcquire(&s->mutex);
+		if (s->data.restart_lsn == InvalidXLogRecPtr)
+		{
+			SpinLockRelease(&s->mutex);
+			continue;
+		}
+
 		if (s->data.restart_lsn < oldestLSN)
 		{
-			elog(LOG, "slot %s is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
-				 s->data.name.data,
-				 (uint32) (s->data.restart_lsn >> 32),
-				 (uint32) s->data.restart_lsn);
 			/* mark this slot as invalid */
-			SpinLockAcquire(&s->mutex);
 			s->data.restart_lsn = InvalidXLogRecPtr;
 
-			/* remember PID for killing, if active*/
+			/* remember restart_lsn for logging */
+			restart_lsn = s->data.restart_lsn;
+			/* remember PID for killing, if active */
 			if (s->active_pid != 0)
 				pids = lappend_int(pids, s->active_pid);
-			SpinLockRelease(&s->mutex);
 		}
+		SpinLockRelease(&s->mutex);
+
+		if (restart_lsn != InvalidXLogRecPtr)
+			ereport(LOG,
+					errmsg("slot \"%s\" is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
+						   NameStr(s->data.name),
+						   (uint32) (restart_lsn >> 32),
+						   (uint32) restart_lsn));
+
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index dc38b475c5..91a5d0f290 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -413,6 +413,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
 	XLogRecPtr	retlsn = startlsn;
 
+	Assert(moveto != InvalidXLogRecPtr);
+
 	if (startlsn < moveto)
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
@@ -450,6 +452,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 	ResourceOwner old_resowner = CurrentResourceOwner;
 	XLogRecPtr	retlsn;
 
+	Assert(moveto != InvalidXLogRecPtr);
+
 	PG_TRY();
 	{
 		/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9e5611574c..203bdfac6d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1812,7 +1812,8 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 	Assert(lsn != InvalidXLogRecPtr);
 	SpinLockAcquire(&slot->mutex);
-	if (slot->data.restart_lsn != lsn)
+	if (slot->data.restart_lsn != InvalidXLogRecPtr &&
+		slot->data.restart_lsn != lsn)
 	{
 		changed = true;
 		slot->data.restart_lsn = lsn;
-- 
2.20.1

