From e3028d078b680b007126c6941a49f3335147b775 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Mon, 11 Nov 2024 21:00:07 +0100
Subject: [PATCH 4/5] fix LogicalIncreaseRestartDecodingForSlot

---
 src/backend/replication/logical/logical.c | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8fe77f8c463..40ac9f43ce2 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1638,6 +1638,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 {
 	bool		updated_lsn = false;
 	ReplicationSlot *slot;
+	bool		spin_released = false;
 
 	slot = MyReplicationSlot;
 
@@ -1673,7 +1674,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 * might never end up updating if the receiver acks too slowly. A missed
 	 * value here will just cause some extra effort after reconnecting.
 	 */
-	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
+	else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
 		Assert(slot->candidate_restart_valid <= current_lsn);
 		Assert(slot->candidate_restart_lsn <= restart_lsn);
@@ -1682,6 +1683,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn));
@@ -1697,6 +1700,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		confirmed_flush = slot->data.confirmed_flush;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn),
@@ -1705,6 +1710,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(confirmed_flush));
 	}
 
+	if (!spin_released)
+		SpinLockRelease(&slot->mutex);
+
 	/* candidates are already valid with the current flush position, apply */
 	if (updated_lsn)
 		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
-- 
2.39.5

