From a81c50bbb8b7384a48752316468c59ab5faf9114 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Sat, 9 Nov 2024 12:14:19 +0100
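Subject: [PATCH 1/2] Fix restart_lsn moving backwards, and add asserts

In LogicalIncreaseRestartDecodingForSlot(), the check for a pending
candidate (candidate_restart_valid == InvalidXLogRecPtr) was a separate
"if" rather than part of the preceding if/else-if chain. It therefore
ran even when an earlier branch had already handled or rejected the
proposed restart_lsn. If no candidate was pending, an older restart_lsn
could be stored as candidate_restart_lsn. LogicalConfirmReceivedLocation()
would later copy it into the slot's restart_lsn, moving it backwards.

Make that check an "else if". Because the earlier branches do not
release the slot's spinlock themselves, track whether it has been
released and release it after the chain when needed.

Also add assertions that restart_lsn never moves backwards where it is
assigned: CreateInitDecodingContext(), ReplicationSlotReserveWal() and
LogicalConfirmReceivedLocation().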
---
 src/backend/replication/logical/logical.c | 13 ++++++++++++-
 src/backend/replication/slot.c            |  1 +
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3fe1774a1e9..28d70e732d5 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -391,6 +391,7 @@ CreateInitDecodingContext(const char *plugin,
 	else
 	{
 		SpinLockAcquire(&slot->mutex);
+		Assert(slot->data.restart_lsn <= restart_lsn);
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 	}
@@ -1762,6 +1763,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 {
 	bool		updated_lsn = false;
 	ReplicationSlot *slot;
+	bool		spin_released = false;
 
 	slot = MyReplicationSlot;
 
@@ -1794,12 +1796,14 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 * might never end up updating if the receiver acks too slowly. A missed
 	 * value here will just cause some extra effort after reconnecting.
 	 */
-	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
+	else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn));
@@ -1815,6 +1819,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		confirmed_flush = slot->data.confirmed_flush;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn),
@@ -1823,6 +1829,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(confirmed_flush));
 	}
 
+	if (!spin_released)
+		SpinLockRelease(&slot->mutex);
+
 	/* candidates are already valid with the current flush position, apply */
 	if (updated_lsn)
 		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
@@ -1875,6 +1884,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			Assert(MyReplicationSlot->data.restart_lsn <= MyReplicationSlot->candidate_restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6828100cf1a..0f38ccc8353 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1468,6 +1468,7 @@ ReplicationSlotReserveWal(void)
 			restart_lsn = GetXLogInsertRecPtr();
 
 		SpinLockAcquire(&slot->mutex);
+		Assert(slot->data.restart_lsn <= restart_lsn);
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
-- 
2.47.0

