logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

Started by Tomas Vondra, about 1 year ago, 42 messages
#1 Tomas Vondra
tomas@vondra.me
5 attachment(s)

Hi,

I've been investigating some issues reported by users, related to
logical replication unexpectedly breaking with messages like:

LOG: invalidating slot "s" because its restart_lsn X/Y exceeds
max_slot_wal_keep_size

which is pretty confusing, because the system has that GUC set to -1 (so
disabled, there should be no limit). Or a message like this:

ERROR: requested WAL segment 000...0AA has already been removed

which is a bit less confusing, but still puzzling because replication
slots are meant to prevent exactly this.

I speculated there's some sort of rare race condition in how we advance
the slot LSN values. I didn't know where to start, so I gave up on
trying to understand the whole complex code. Instead, I wrote a simple
stress test that:

1) sets up a cluster (primary + 1+ logical replicas)

2) runs pgbench on the primary, checks replicas/slots

3) randomly restarts the nodes (both primary/replicas) with either fast
or immediate mode, with random short pauses

4) runs checkpoints in a tight while loop

This is based on the observations that in past reports the issues seem
to happen only with logical replication, shortly after reconnect (e.g.
after connection reset etc.). And the slots are invalidated / WAL
removed during a checkpoint, so frequent checkpoints should make it
easier to hit ...

Attached are a couple of scripts running this - they're not particularly
pretty and may need some tweaking to make them work on your machine
(run.sh is the script to run).

And unfortunately, this started to fail pretty quickly. The short story is
that it's not difficult to get into a situation where restart_lsn for a
slot moves backwards, so something like this can happen:

1) slot restart_lsn moves forward to LSN A

2) checkpoint happens, updates min required LSN for slots, recycles
segments it considers unnecessary (up to LSN A)

3) slot restart_lsn moves backwards to LSN B (where B < A)

4) kaboom

This would perfectly explain all the symptoms I mentioned earlier. The
max_slot_wal_keep_size reference is confusing, but AFAIK it's just based
on the (reasonable) belief that LSNs can't move backwards, and so the only
reason for restart_lsn being before the min required LSN is that this GUC
kicked in. But the assumption does not hold :-(
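
To be clear why the message blames the GUC even when it's disabled: the
invalidation check (roughly paraphrased from InvalidatePossiblyObsoleteSlot
in slot.c, not the exact code) only compares the slot's restart_lsn against
the oldest LSN the checkpointer decided to keep, and there's only one
message for every way a slot can end up behind that point:

    /* roughly paraphrased, not the exact code */
    if (restart_lsn == InvalidXLogRecPtr || restart_lsn >= oldestLSN)
        break;    /* slot is fine, nothing to invalidate */

    /*
     * The slot is behind the WAL the checkpointer keeps. The only cause
     * the code knows about is max_slot_wal_keep_size, so that's what it
     * reports - even if the real cause is restart_lsn moving backwards.
     */
    ereport(LOG,
            (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
                    NameStr(slotname),
                    LSN_FORMAT_ARGS(restart_lsn))));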

Now, why/how can this happen?

I kept adding elog(LOG) messages to all places updating LSNs for a
slot, and asserts that fail if data.restart_lsn moves backwards. See the
attached 0001 patch. An example of a failure (for the walsender backend
that triggered the assert) looks like this:

344.139 LOG: starting logical decoding for slot "s1"

344.139 DETAIL: Streaming transactions committing after 1/E97EAC30,
reading WAL from 1/E96FB4D0.

344.140 LOG: logical decoding found consistent point at 1/E96FB4D0

344.140 DETAIL: Logical decoding will begin using saved snapshot.

344.140 LOG: LogicalConfirmReceivedLocation 1/E9865398

344.140 LOG: LogicalConfirmReceivedLocation updating
data.restart_lsn to 1/E979D4C8 (from 1/E96FB4D0)
candidate_restart_valid 0/0 (from 1/E9865398)
candidate_restart_lsn 0/0 (from 1/E979D4C8)

344.145 LOG: LogicalIncreaseRestartDecodingForSlot
restart_decoding_lsn 1/E96FB4D0

344.145 LOG: LogicalIncreaseRestartDecodingForSlot s1
candidate_restart_valid_lsn 1/E979D4C8 (0/0)
candidate_restart_lsn 1/E96FB4D0 (0/0)

344.147 LOG: LogicalIncreaseRestartDecodingForSlot
restart_decoding_lsn 1/E979D4C8

344.156 LOG: LogicalIncreaseXminForSlot
candidate_catalog_xmin 30699
candidate_xmin_lsn 1/E993AD68 (0/0)
...
344.235 LOG: LogicalIncreaseRestartDecodingForSlot
restart_decoding_lsn 1/E9F33AF8

344.240 LOG: LogicalConfirmReceivedLocation 1/E9DCCD78

344.240 LOG: LogicalConfirmReceivedLocation updating
data.restart_lsn to 1/E96FB4D0 (from 1/E979D4C8)
candidate_restart_valid 0/0 (from 1/E979D4C8)
candidate_restart_lsn 0/0 (from 1/E96FB4D0)

345.536 LOG: server process (PID 2518127) was terminated by
signal 6: Aborted

We start decoding at 1/E96FB4D0, and right after startup we get a
confirmation, and LogicalConfirmReceivedLocation updates restart_lsn to
1/E979D4C8.

But then LogicalIncreaseRestartDecodingForSlot comes along, and stores
the restart_decoding_lsn 1/E96FB4D0 (which is the initial restart_lsn)
into candidate_restart_lsn.

And then a little bit later we get another confirmation, we call
LogicalConfirmReceivedLocation which propagates candidate_restart_lsn
into data.restart_lsn.
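
For reference, the relevant bit of LogicalConfirmReceivedLocation
(roughly, with the locking and the other fields left out) simply installs
whatever candidate is pending, with no check that it's actually ahead of
the current value:

    if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
        MyReplicationSlot->candidate_restart_valid <= lsn)
    {
        Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);

        /* the candidate becomes the new restart_lsn, even if it's older */
        MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
        MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
        MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
        updated_restart = true;
    }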

This is how restart_lsn moves backwards, causing issues. I've reproduced
this on PG14 and current master, but I believe the issue exists since
the introduction of logical replication in 9.4 :-(

I'm not claiming this is the only way how this can happen, but all the
cases I've seen in my stress testing look like this. Moreover, I'm not
claiming this is the only LSN field that can move backwards like this.
It seems to me various other candidate_ fields have the same issue, but
may have consequences other than discarding "unnecessary" WAL.

I've been reminded of this [1] thread from 2022. I'm 99% sure it's the
same issue - it happened shortly after a reconnect, etc. And it seems to
me Masahiko-san was right about the causes in [2]. I don't think the fix
suggested in that message (changing the branch to "else if") can work,
though. At least it did not really help in my testing.

And I'm not sure it'd fix all the issues - it only affects restart_lsn,
but AFAICS the same issue (LSNs moving backwards) can happen for the
other LSN slot field (candidate_xmin_lsn).

I don't see how the backwards move could be valid for any of those
fields, nor for "propagating" the candidate values into restart_lsn.

But there's no protection against the backward moves - so either it's
considered to be OK (which seems incorrect), or it was not expected to
happen in practice.

The 0001 patch adds asserts preventing those backward moves on all the
fields. This means it fails with an abort earlier, even before a checkpoint
gets a chance to invalidate the slot or remove the segment.

The 0002 part replaces the asserts with elog(LOG), and instead tweaks the
updates to do Max(old,new) to prevent the backward moves. With this I'm
no longer able to reproduce the issue - and there are a lot of LOG
messages about the (prevented) backward moves.

Unfortunately, I'm not sure this is quite correct. Consider, for example,
this:

    slot->candidate_catalog_xmin = xmin;
    slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn,
                                   current_lsn);

I suspect this means the fields could get "out of sync" - e.g. a call with
a newer xmin but an older current_lsn would pair the new
candidate_catalog_xmin with the old (higher) candidate_xmin_lsn, a
combination no single call ever produced. Not sure what could break
because of this.

I have to admit the "protocol" for the candidate fields (and how the
values propagate) is not very clear to me. And AFAICS it's not really
described/explained anywhere :-(

Note: While working on this, I realized PG14 and PG15 need the fix
eb27d3dc8887, which was backpatched only to 16+. But I hit that on 14
too during testing. I already pinged Daniel about this, but you may need
to cherry-pick that commit before testing until he has time for it.

regards

[1]: /messages/by-id/Yz2hivgyjS1RfMKs@depesz.com

[2]: /messages/by-id/CAD21AoBVhYnGBuW_o=wEGgTp01qiHNAx1a14b1X9kFXmuBe=sg@mail.gmail.com

--
Tomas Vondra

Attachments:

logrep-script.tgz (application/x-compressed-tar)
vmaster-0001-WIP-add-elog-messages-and-asserts.patch (text/x-patch)
From 8cd1b0472f1189be9ba19455df50fd1cc1f730e5 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:41:25 +0100
Subject: [PATCH vnew 1/2] WIP: add elog messages and asserts

shows how the LSNs move, protects against move backwards
---
 src/backend/access/transam/xlog.c           | 34 ++++++++++++
 src/backend/replication/logical/logical.c   | 60 +++++++++++++++++++++
 src/backend/replication/logical/snapbuild.c |  6 +++
 src/backend/replication/slot.c              | 17 +++++-
 4 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f1a795bba9f..ba9415b9149 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2782,9 +2782,17 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 void
 XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
 {
+	XLogRecPtr	oldlsn;
+
 	SpinLockAcquire(&XLogCtl->info_lck);
+	oldlsn = XLogCtl->replicationSlotMinLSN;
 	XLogCtl->replicationSlotMinLSN = lsn;
 	SpinLockRelease(&XLogCtl->info_lck);
+
+	elog(LOG, "XLogSetReplicationSlotMinimumLSN set %X/%X (old %X/%X)", LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(oldlsn));
+
+	/* no backward moves */
+	Assert(lsn >= oldlsn);
 }
 
 
@@ -2801,6 +2809,8 @@ XLogGetReplicationSlotMinimumLSN(void)
 	retval = XLogCtl->replicationSlotMinLSN;
 	SpinLockRelease(&XLogCtl->info_lck);
 
+	elog(LOG, "XLogGetReplicationSlotMinimumLSN got %X/%X", LSN_FORMAT_ARGS(retval));
+
 	return retval;
 }
 
@@ -9462,12 +9472,19 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation start / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
+
+	elog(LOG, "CreateCheckPoint / slot invalidation after KeepLogSeg / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9476,10 +9493,15 @@ CreateCheckPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(recptr, &_logSegNo);
+		elog(LOG, "CreateCheckPoint / slot invalidation recalculate / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+			LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
 	}
 	_logSegNo--;
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation done / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Make more log segments if needed.  (Do this after recycling old log
 	 * segments, since that may supply some of the needed files.)
@@ -9877,6 +9899,10 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
+
+	elog(LOG, "CreateRestartPoint / slot invalidation start / RedoRecPtr %X/%X receivePtr %X/%X replayPtr %X/%X endptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(receivePtr), LSN_FORMAT_ARGS(replayPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9885,9 +9911,15 @@ CreateRestartPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(endptr, &_logSegNo);
+
+		elog(LOG, "CreateRestartPoint / slot invalidation recalculate / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+			LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
 	}
 	_logSegNo--;
 
+	elog(LOG, "CreateRestartPoint / slot invalidation done / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	/*
 	 * Try to recycle segments on a useful timeline. If we've been promoted
 	 * since the beginning of this restartpoint, use the new timeline chosen
@@ -10109,6 +10141,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 		}
 	}
 
+	elog(LOG, "KeepLogSeg recptr %X/%X logSegNo %lu segno %lu", LSN_FORMAT_ARGS(recptr), *logSegNo, segno);
+
 	/* don't delete WAL segments newer than the calculated segment */
 	if (segno < *logSegNo)
 		*logSegNo = segno;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f7b2c85d9b..16fed53063b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -375,9 +375,16 @@ CreateInitDecodingContext(const char *plugin,
 		ReplicationSlotReserveWal();
 	else
 	{
+		XLogRecPtr oldlsn;
 		SpinLockAcquire(&slot->mutex);
+		oldlsn = slot->data.restart_lsn;
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
+
+		elog(LOG, "CreateInitDecodingContext updated restart_lsn %X/%X (old %X/%X) for slot %s",
+			LSN_FORMAT_ARGS(restart_lsn),
+			LSN_FORMAT_ARGS(oldlsn),
+			NameStr(slot->data.name));
 	}
 
 	/* ----
@@ -1598,6 +1605,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			  xmin, LSN_FORMAT_ARGS(current_lsn),
+			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1611,6 +1625,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			  xmin, LSN_FORMAT_ARGS(current_lsn),
+			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 	}
@@ -1653,6 +1674,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			NameStr(slot->data.name),
+			LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/* also don't even consider going back for actual restart_lsn */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 
@@ -1667,6 +1700,21 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			  NameStr(slot->data.name),
+			  LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			  LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/*
+		 * also don't even consider going back for actual restart_lsn
+		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+		 */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
@@ -1707,6 +1755,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
 	Assert(lsn != InvalidXLogRecPtr);
 
+	elog(LOG, "LogicalConfirmReceivedLocation %X/%X", LSN_FORMAT_ARGS(lsn));
+
 	/* Do an unlocked check for candidate_lsn first. */
 	if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
 		MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
@@ -1746,6 +1796,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			elog(LOG, "LogicalConfirmReceivedLocation updating data.restart_lsn to %X/%X (from %X/%X) candidate_restart_valid 0/0 (from %X/%X) candidate_restart_lsn 0/0 (from %X/%X)",
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn),
+				LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
+
+			/* don't allow the LSN to go backwards */
+			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
@@ -1774,6 +1833,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
+			elog(LOG, "LogicalConfirmReceivedLocation calculating required LSN/xmin after update");
 			ReplicationSlotsComputeRequiredXmin(false);
 			ReplicationSlotsComputeRequiredLSN();
 		}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7b761510f1e..05184516704 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1270,7 +1270,10 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 * anything because we hadn't reached a consistent state yet.
 	 */
 	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot restart_decoding_lsn %X/%X", LSN_FORMAT_ARGS(txn->restart_decoding_lsn));
 		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+	}
 
 	/*
 	 * No in-progress transaction, can reuse the last serialized snapshot if
@@ -1279,8 +1282,11 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	else if (txn == NULL &&
 			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
 			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot last_serialized_snapshot %X/%X", LSN_FORMAT_ARGS(builder->last_serialized_snapshot));
 		LogicalIncreaseRestartDecodingForSlot(lsn,
 											  builder->last_serialized_snapshot);
+	}
 }
 
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 037a347cba0..8a7aa257293 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -827,6 +827,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 
 	Assert(ReplicationSlotCtl != NULL);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / start");
+
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -840,6 +842,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 		restart_lsn = s->data.restart_lsn;
 		SpinLockRelease(&s->mutex);
 
+		elog(LOG, "ReplicationSlotsComputeRequiredLSN found slot %s with restart_lsn %X/%X", NameStr(s->data.name), LSN_FORMAT_ARGS(restart_lsn));
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -847,6 +851,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / done");
+
 	XLogSetReplicationSlotMinimumLSN(min_required);
 }
 
@@ -1285,9 +1291,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			ReplicationSlotRelease();
 
 			ereport(LOG,
-					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size (%d)",
 							NameStr(slotname),
-							LSN_FORMAT_ARGS(restart_lsn))));
+							LSN_FORMAT_ARGS(restart_lsn), max_slot_wal_keep_size_mb)));
 
 			/* done with this slot for now */
 			break;
@@ -1315,6 +1321,9 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
+	elog(LOG, "InvalidateObsoleteReplicationSlots oldestSegno %lu oldestLSN %X/%X",
+		oldestSegno, LSN_FORMAT_ARGS(oldestLSN));
+
 restart:
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (int i = 0; i < max_replication_slots; i++)
@@ -1327,6 +1336,7 @@ restart:
 		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
 		{
 			/* if the lock was released, start from scratch */
+			elog(LOG, "InvalidateObsoleteReplicationSlots restart invalidated=%d", invalidated);
 			goto restart;
 		}
 	}
@@ -1337,6 +1347,7 @@ restart:
 	 */
 	if (invalidated)
 	{
+		elog(LOG, "InvalidateObsoleteReplicationSlots slot invalidated, recalculate required");
 		ReplicationSlotsComputeRequiredXmin(false);
 		ReplicationSlotsComputeRequiredLSN();
 	}
@@ -1854,6 +1865,8 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_restart_lsn = InvalidXLogRecPtr;
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
 
+		elog(LOG, "RestoreSlotFromDisk restart_lsn %X/%X candidate_xmin_lsn 0/0 candidate_restart_lsn 0/0 candidate_restart_valid 0/0", LSN_FORMAT_ARGS(slot->data.restart_lsn));
+
 		slot->in_use = true;
 		slot->active_pid = 0;
 
-- 
2.39.5

vmaster-0002-WIP-change-asserts-to-elog-defense.patch (text/x-patch)
From 81981f60949ab4c0fb9418b17dfcc37c0aa063f1 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:46:16 +0100
Subject: [PATCH vnew 2/2] WIP: change asserts to elog + defense

When updating LSNs, use Max() with the preceding value to protect against
moves backwards.
---
 src/backend/replication/logical/logical.c | 57 ++++++++++++-----------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 16fed53063b..988f4add977 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1609,11 +1609,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			  xmin, LSN_FORMAT_ARGS(current_lsn),
 			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 
 		/* our candidate can directly be used */
 		updated_xmin = true;
@@ -1629,11 +1630,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			  xmin, LSN_FORMAT_ARGS(current_lsn),
 			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 	}
 	SpinLockRelease(&slot->mutex);
 
@@ -1679,15 +1681,17 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/* also don't even consider going back for actual restart_lsn */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
 		/* our candidate can directly be used */
 		updated_lsn = true;
@@ -1705,18 +1709,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			  LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			  LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/*
-		 * also don't even consider going back for actual restart_lsn
-		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
-		 */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
+
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
@@ -1802,10 +1806,11 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
 				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
 
-			/* don't allow the LSN to go backwards */
-			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+			if (MyReplicationSlot->candidate_restart_lsn < MyReplicationSlot->data.restart_lsn)
+				elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > MyReplicationSlot->candidate_restart_lsn");
 
-			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
+			MyReplicationSlot->data.restart_lsn = Max(MyReplicationSlot->data.restart_lsn,
+								  MyReplicationSlot->candidate_restart_lsn);
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
 			updated_restart = true;
-- 
2.39.5

v14-0001-WIP-add-elog-messages-and-asserts.patch (text/x-patch)
From 8cd1b0472f1189be9ba19455df50fd1cc1f730e5 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:41:25 +0100
Subject: [PATCH v14 1/2] WIP: add elog messages and asserts

shows how the LSNs move, protects against move backwards
---
 src/backend/access/transam/xlog.c           | 34 ++++++++++++
 src/backend/replication/logical/logical.c   | 60 +++++++++++++++++++++
 src/backend/replication/logical/snapbuild.c |  6 +++
 src/backend/replication/slot.c              | 17 +++++-
 4 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f1a795bba9f..ba9415b9149 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2782,9 +2782,17 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 void
 XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
 {
+	XLogRecPtr	oldlsn;
+
 	SpinLockAcquire(&XLogCtl->info_lck);
+	oldlsn = XLogCtl->replicationSlotMinLSN;
 	XLogCtl->replicationSlotMinLSN = lsn;
 	SpinLockRelease(&XLogCtl->info_lck);
+
+	elog(LOG, "XLogSetReplicationSlotMinimumLSN set %X/%X (old %X/%X)", LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(oldlsn));
+
+	/* no backward moves */
+	Assert(lsn >= oldlsn);
 }
 
 
@@ -2801,6 +2809,8 @@ XLogGetReplicationSlotMinimumLSN(void)
 	retval = XLogCtl->replicationSlotMinLSN;
 	SpinLockRelease(&XLogCtl->info_lck);
 
+	elog(LOG, "XLogGetReplicationSlotMinimumLSN got %X/%X", LSN_FORMAT_ARGS(retval));
+
 	return retval;
 }
 
@@ -9462,12 +9472,19 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation start / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
+
+	elog(LOG, "CreateCheckPoint / slot invalidation after KeepLogSeg / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9476,10 +9493,15 @@ CreateCheckPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(recptr, &_logSegNo);
+		elog(LOG, "CreateCheckPoint / slot invalidation recalculate / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+			LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
 	}
 	_logSegNo--;
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation done / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Make more log segments if needed.  (Do this after recycling old log
 	 * segments, since that may supply some of the needed files.)
@@ -9877,6 +9899,10 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
+
+	elog(LOG, "CreateRestartPoint / slot invalidation start / RedoRecPtr %X/%X receivePtr %X/%X replayPtr %X/%X endptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(receivePtr), LSN_FORMAT_ARGS(replayPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9885,9 +9911,15 @@ CreateRestartPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(endptr, &_logSegNo);
+
+		elog(LOG, "CreateRestartPoint / slot invalidation recalculate / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+			LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
 	}
 	_logSegNo--;
 
+	elog(LOG, "CreateRestartPoint / slot invalidation done / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	/*
 	 * Try to recycle segments on a useful timeline. If we've been promoted
 	 * since the beginning of this restartpoint, use the new timeline chosen
@@ -10109,6 +10141,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 		}
 	}
 
+	elog(LOG, "KeepLogSeg recptr %X/%X logSegNo %lu segno %lu", LSN_FORMAT_ARGS(recptr), *logSegNo, segno);
+
 	/* don't delete WAL segments newer than the calculated segment */
 	if (segno < *logSegNo)
 		*logSegNo = segno;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f7b2c85d9b..16fed53063b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -375,9 +375,16 @@ CreateInitDecodingContext(const char *plugin,
 		ReplicationSlotReserveWal();
 	else
 	{
+		XLogRecPtr oldlsn;
 		SpinLockAcquire(&slot->mutex);
+		oldlsn = slot->data.restart_lsn;
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
+
+		elog(LOG, "CreateInitDecodingContext updated restart_lsn %X/%X (old %X/%X) for slot %s",
+			LSN_FORMAT_ARGS(restart_lsn),
+			LSN_FORMAT_ARGS(oldlsn),
+			NameStr(slot->data.name));
 	}
 
 	/* ----
@@ -1598,6 +1605,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			  xmin, LSN_FORMAT_ARGS(current_lsn),
+			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1611,6 +1625,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			  xmin, LSN_FORMAT_ARGS(current_lsn),
+			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 	}
@@ -1653,6 +1674,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			NameStr(slot->data.name),
+			LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/* also don't even consider going back for actual restart_lsn */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 
@@ -1667,6 +1700,21 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			  NameStr(slot->data.name),
+			  LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			  LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/*
+		 * also don't even consider going back for actual restart_lsn
+		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+		 */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
@@ -1707,6 +1755,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
 	Assert(lsn != InvalidXLogRecPtr);
 
+	elog(LOG, "LogicalConfirmReceivedLocation %X/%X", LSN_FORMAT_ARGS(lsn));
+
 	/* Do an unlocked check for candidate_lsn first. */
 	if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
 		MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
@@ -1746,6 +1796,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			elog(LOG, "LogicalConfirmReceivedLocation updating data.restart_lsn to %X/%X (from %X/%X) candidate_restart_valid 0/0 (from %X/%X) candidate_restart_lsn 0/0 (from %X/%X)",
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn),
+				LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
+
+			/* don't allow the LSN to go backwards */
+			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
@@ -1774,6 +1833,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
+			elog(LOG, "LogicalConfirmReceivedLocation calculating required LSN/xmin after update");
 			ReplicationSlotsComputeRequiredXmin(false);
 			ReplicationSlotsComputeRequiredLSN();
 		}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7b761510f1e..05184516704 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1270,7 +1270,10 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 * anything because we hadn't reached a consistent state yet.
 	 */
 	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot restart_decoding_lsn %X/%X", LSN_FORMAT_ARGS(txn->restart_decoding_lsn));
 		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+	}
 
 	/*
 	 * No in-progress transaction, can reuse the last serialized snapshot if
@@ -1279,8 +1282,11 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	else if (txn == NULL &&
 			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
 			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot last_serialized_snapshot %X/%X", LSN_FORMAT_ARGS(builder->last_serialized_snapshot));
 		LogicalIncreaseRestartDecodingForSlot(lsn,
 											  builder->last_serialized_snapshot);
+	}
 }
 
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 037a347cba0..8a7aa257293 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -827,6 +827,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 
 	Assert(ReplicationSlotCtl != NULL);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / start");
+
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -840,6 +842,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 		restart_lsn = s->data.restart_lsn;
 		SpinLockRelease(&s->mutex);
 
+		elog(LOG, "ReplicationSlotsComputeRequiredLSN found slot %s with restart_lsn %X/%X", NameStr(s->data.name), LSN_FORMAT_ARGS(restart_lsn));
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -847,6 +851,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / done");
+
 	XLogSetReplicationSlotMinimumLSN(min_required);
 }
 
@@ -1285,9 +1291,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			ReplicationSlotRelease();
 
 			ereport(LOG,
-					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size (%d)",
 							NameStr(slotname),
-							LSN_FORMAT_ARGS(restart_lsn))));
+							LSN_FORMAT_ARGS(restart_lsn), max_slot_wal_keep_size_mb)));
 
 			/* done with this slot for now */
 			break;
@@ -1315,6 +1321,9 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
+	elog(LOG, "InvalidateObsoleteReplicationSlots oldestSegno %lu oldestLSN %X/%X",
+		oldestSegno, LSN_FORMAT_ARGS(oldestLSN));
+
 restart:
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (int i = 0; i < max_replication_slots; i++)
@@ -1327,6 +1336,7 @@ restart:
 		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
 		{
 			/* if the lock was released, start from scratch */
+			elog(LOG, "InvalidateObsoleteReplicationSlots restart invalidated=%d", invalidated);
 			goto restart;
 		}
 	}
@@ -1337,6 +1347,7 @@ restart:
 	 */
 	if (invalidated)
 	{
+		elog(LOG, "InvalidateObsoleteReplicationSlots slot invalidated, recalculate required");
 		ReplicationSlotsComputeRequiredXmin(false);
 		ReplicationSlotsComputeRequiredLSN();
 	}
@@ -1854,6 +1865,8 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_restart_lsn = InvalidXLogRecPtr;
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
 
+		elog(LOG, "RestoreSlotFromDisk restart_lsn %X/%X candidate_xmin_lsn 0/0 candidate_restart_lsn 0/0 candidate_restart_valid 0/0", LSN_FORMAT_ARGS(slot->data.restart_lsn));
+
 		slot->in_use = true;
 		slot->active_pid = 0;
 
-- 
2.39.5

v14-0002-WIP-change-asserts-to-elog-defense.patch (text/x-patch)
From 81981f60949ab4c0fb9418b17dfcc37c0aa063f1 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:46:16 +0100
Subject: [PATCH v14 2/2] WIP: change asserts to elog + defense

When updating LSNs, use Max() with the preceding value to protect against
moves backwards.
---
 src/backend/replication/logical/logical.c | 57 ++++++++++++-----------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 16fed53063b..988f4add977 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1609,11 +1609,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			  xmin, LSN_FORMAT_ARGS(current_lsn),
 			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 
 		/* our candidate can directly be used */
 		updated_xmin = true;
@@ -1629,11 +1630,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			  xmin, LSN_FORMAT_ARGS(current_lsn),
 			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 	}
 	SpinLockRelease(&slot->mutex);
 
@@ -1679,15 +1681,17 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/* also don't even consider going back for actual restart_lsn */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
 		/* our candidate can directly be used */
 		updated_lsn = true;
@@ -1705,18 +1709,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			  LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			  LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/*
-		 * also don't even consider going back for actual restart_lsn
-		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
-		 */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
+
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
@@ -1802,10 +1806,11 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
 				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
 
-			/* don't allow the LSN to go backwards */
-			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+			if (MyReplicationSlot->candidate_restart_lsn < MyReplicationSlot->data.restart_lsn)
+				elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > MyReplicationSlot->candidate_restart_lsn");
 
-			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
+			MyReplicationSlot->data.restart_lsn = Max(MyReplicationSlot->data.restart_lsn,
+								  MyReplicationSlot->candidate_restart_lsn);
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
 			updated_restart = true;
-- 
2.39.5

#2 Tomas Vondra
tomas@vondra.me
In reply to: Tomas Vondra (#1)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

Hi,

I kept investigating this, but I haven't made much progress. I still
don't understand why it would be OK to move any of the LSN fields
backwards - certainly for fields like confirmed_flush or restart_lsn.

I did a simple experiment - added asserts to the couple of places in
logical.c updating the LSN fields, checking the value only increases.
But then I simply ran make check-world instead of the stress test.

And that actually fails too - 040_standby_failover_slots_sync.pl triggers
this:

{
    SpinLockAcquire(&MyReplicationSlot->mutex);
    Assert(MyReplicationSlot->data.confirmed_flush <= lsn);
    MyReplicationSlot->data.confirmed_flush = lsn;
    SpinLockRelease(&MyReplicationSlot->mutex);
}

So this moves confirmed_flush back, albeit only by a tiny amount (I've
seen a ~56 byte difference). I don't have an example of this causing an
issue in practice, but I note that CheckPointReplicationSlots does this:

if (is_shutdown && SlotIsLogical(s))
{
    SpinLockAcquire(&s->mutex);

    if (s->data.invalidated == RS_INVAL_NONE &&
        s->data.confirmed_flush > s->last_saved_confirmed_flush)
    {
        s->just_dirtied = true;
        s->dirty = true;
    }
    SpinLockRelease(&s->mutex);
}

to determine if a slot needs to be flushed to disk during a checkpoint. So
I guess it's possible we save a slot to disk at some LSN, then
confirmed_flush moves backward, and we fail to sync the slot to disk.

But I don't have a reproducer for this ...

I also noticed a strange difference between LogicalIncreaseXminForSlot
and LogicalIncreaseRestartDecodingForSlot.

The structure of LogicalIncreaseXminForSlot looks like this:

if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
{
}
else if (current_lsn <= slot->data.confirmed_flush)
{
    ... update candidate fields ...
}
else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
{
    ... update candidate fields ...
}

while LogicalIncreaseRestartDecodingForSlot looks like this:

if (restart_lsn <= slot->data.restart_lsn)
{
}
else if (current_lsn <= slot->data.confirmed_flush)
{
    ... update candidate fields ...
}

if (slot->candidate_restart_valid == InvalidXLogRecPtr)
{
    ... update candidate fields ...
}

Notice that LogicalIncreaseXminForSlot has the third block guarded by
"else if", while LogicalIncreaseRestartDecodingForSlot has a plain "if".
Isn't that a bit suspicious, considering the functions do the same thing,
just for different fields? I don't know if this is dangerous; the comments
suggest it may just waste extra effort after reconnect.
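
For completeness, the fix suggested in the 2022 thread is essentially
turning that last "if" into an "else if", so the structure matches
LogicalIncreaseXminForSlot - a sketch of what that would look like, not
an actual patch:

if (restart_lsn <= slot->data.restart_lsn)
{
}
else if (current_lsn <= slot->data.confirmed_flush)
{
    ... update candidate fields ...
}
else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
{
    ... update candidate fields ...
}

As mentioned in the first message, that alone did not make the backward
moves go away in my stress testing.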

regards

--
Tomas Vondra

#3 Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Tomas Vondra (#1)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Wed, Nov 6, 2024 at 8:54 PM Tomas Vondra <tomas@vondra.me> wrote:

Hi,

I've been investigating some issues reported by users, related to
logical replication unexpectedly breaking with messages like:

LOG: invalidating slot "s" because its restart_lsn X/Y exceeds
max_slot_wal_keep_size

which is pretty confusing, because the system has that GUC set to -1 (so
disabled, there should be no limit). Or a message like this:

ERROR: requested WAL segment 000...0AA has already been removed

which is a bit less confusing, but still puzzling because replication
slots are meant to prevent exactly this.

I speculated there's some sort of rare race condition, in how we advance
the slot LSN values. I didn't know where to start, so I gave up on
trying to understand the whole complex code. Instead, I wrote a simple
stress test that

1) sets up a cluster (primary + 1+ logical replicas)

2) runs pgbench on the primary, checks replicas/slots

3) randomly restarts the nodes (both primary/replicas) with either fast
and immediate mode, with random short pauses

4) runs checkpoints in tight while loop

This is based on the observations that in past reports the issues seem
to happen only with logical replication, shortly after reconnect (e.g.
after connection reset etc.). And the slots are invalidated / WAL
removed during a checkpoint, so frequent checkpoints should make it
easier to hit ...

Attached is a couple scripts running this - it's not particularly pretty
and may need some tweak to make it work on your machine (run.sh is the
script to run).

And unfortunately, this started to fail pretty quick. The short story is
that it's not difficult to get into a situation where restart_lsn for a
slot moves backwards, so something like this can happen:

1) slot restart_lsn moves forward to LSN A

2) checkpoint happens, updates min required LSN for slots, recycles
segments it considers unnecessary (up to LSN A)

3) slot restart_lsn moves backwards to LSN B (where B < A)

4) kaboom

This would perfectly explain all the symptoms I mentioned earlier. The
max_slot_wal_keep_size reference is confusing, but AFAIK it's just based
on (reasonable) belief that LSN can't move backwards, and so the only
reason for restart_lsn being before min required LSN is that this GUC
kicked in. But the assumption does not hold :-(

Now, why/how can this happen?

I kept adding a elog(LOG) messages to all places updating LSNs for a
slot, and asserts to fail if data.restart_lsn moves backwards. See the
attached 0001 patch. An example for a failure (for the walsended backend
that triggered the assert) looks like this:

344.139 LOG: starting logical decoding for slot "s1"

344.139 DETAIL: Streaming transactions committing after 1/E97EAC30,
reading WAL from 1/E96FB4D0.

344.140 LOG: logical decoding found consistent point at 1/E96FB4D0

344.140 DETAIL: Logical decoding will begin using saved snapshot.

344.140 LOG: LogicalConfirmReceivedLocation 1/E9865398

344.140 LOG: LogicalConfirmReceivedLocation updating
data.restart_lsn to 1/E979D4C8 (from 1/E96FB4D0)
candidate_restart_valid 0/0 (from 1/E9865398)
candidate_restart_lsn 0/0 (from 1/E979D4C8)

344.145 LOG: LogicalIncreaseRestartDecodingForSlot
restart_decoding_lsn 1/E96FB4D0

344.145 LOG: LogicalIncreaseRestartDecodingForSlot s1
candidate_restart_valid_lsn 1/E979D4C8 (0/0)
candidate_restart_lsn 1/E96FB4D0 (0/0)

344.147 LOG: LogicalIncreaseRestartDecodingForSlot
restart_decoding_lsn 1/E979D4C8

344.156 LOG: LogicalIncreaseXminForSlot
candidate_catalog_xmin 30699
candidate_xmin_lsn 1/E993AD68 (0/0)
...
344.235 LOG: LogicalIncreaseRestartDecodingForSlot
restart_decoding_lsn 1/E9F33AF8

344.240 LOG: LogicalConfirmReceivedLocation 1/E9DCCD78

344.240 LOG: LogicalConfirmReceivedLocation updating
data.restart_lsn to 1/E96FB4D0 (from 1/E979D4C8)
candidate_restart_valid 0/0 (from 1/E979D4C8)
candidate_restart_lsn 0/0 (from 1/E96FB4D0)

345.536 LOG: server process (PID 2518127) was terminated by
signal 6: Aborted

We start decoding at 1/E96FB4D0, and right after startup we get a
confirmation, and LogicalConfirmReceivedLocation updates restart_lsn to
1/E979D4C8.

But then LogicalIncreaseRestartDecodingForSlot comes along, and stores
the restart_decoding_lsn 1/E96FB4D0 (which is the initial restart_lsn)
into candidate_restart_lsn.

And then a little bit later we get another confirmation, we call
LogicalConfirmReceivedLocation which propagates candidate_restart_lsn
into data.restart_lsn.

This is how restart_lsn moves backwards, causing issues. I've reproduced
this on PG14 and current master, but I believe the issue exists since
the introduction of logical replication in 9.4 :-(

I'm not claiming this is the only way how this can happen, but all the
cases I've seen in my stress testing look like this. Moreover, I'm not
claiming this is the only LSN field that can move backwards like this.
It seems to me various other candidate_ fields have the same issue, but
may have consequences other than discarding "unnecessary" WAL.

I've been reminded of this [1] thread from 2022. I'm 99% sure it's the
same issue - it happened shortly after a reconnect, etc. And it seems to
me Masahiko-san was right about the causes in [2]. I don't think the fix
suggested in that message (changing the branch to "else if") can work,
though. At least it did not really help in my testing.

And I'm not sure it'd fix all the issues - it only affects restart_lsn,
but AFAICS the same issue (LSNs moving backwards) can happen for the
other LSN slot field (candidate_xmin_lsn).

After examining the code before reading [2], I came to the same
conclusion as Masahiko-san in [2]. We install candidate_restart_lsn
based on the running transaction record whose LSN is between
restart_lsn and confirmed_flush_lsn, since candidate_restart_valid of
such candidates would be less than any confirmed_flush_lsn received
from downstream. I am surprised that the fix suggested by Masahiko-san
didn't work, though. The fix also fixes the asymmetry between
LogicalIncreaseXminForSlot and LogicalIncreaseRestartDecodingForSlot
that you have pointed out in your next email. What behaviour do you
see with that fix applied?

[1]: /messages/by-id/Yz2hivgyjS1RfMKs@depesz.com
[2]: /messages/by-id/CAD21AoBVhYnGBuW_o=wEGgTp01qiHNAx1a14b1X9kFXmuBe=sg@mail.gmail.com

--
Best Wishes,
Ashutosh Bapat

#4Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Tomas Vondra (#2)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

Hi,

Thank you for investigating this issue.

On Thu, Nov 7, 2024 at 10:40 AM Tomas Vondra <tomas@vondra.me> wrote:

Hi,

I kept investigating this, but I haven't made much progress. I still
don't understand why would it be OK to move any of the LSN fields
backwards - certainly for fields like confirm_flush or restart_lsn.

I did a simple experiment - added asserts to the couple places in
logical.c updating the the LSN fields, checking the value is increased.
But then I simply ran make check-world, instead of the stress test.

And that actually fails too, 040_standby_failover_slots_sync.pl triggers
this

{
SpinLockAcquire(&MyReplicationSlot->mutex);
Assert(MyReplicationSlot->data.confirmed_flush <= lsn);
MyReplicationSlot->data.confirmed_flush = lsn;
SpinLockRelease(&MyReplicationSlot->mutex);
}

So this moves confirm_flush back, albeit only by a tiny amount (I've
seen ~56 byte difference). I don't have an example of this causing an
issue in practice, but I note that CheckPointReplicationSlots does this:

if (is_shutdown && SlotIsLogical(s))
{
SpinLockAcquire(&s->mutex);

if (s->data.invalidated == RS_INVAL_NONE &&
s->data.confirmed_flush > s->last_saved_confirmed_flush)
{
s->just_dirtied = true;
s->dirty = true;
}
SpinLockRelease(&s->mutex);
}

to determine if a slot needs to be flushed to disk during checkpoint. So
I guess it's possible we save a slot to disk at some LSN, then the
confirm_flush moves backward, and we fail to sync the slot to disk.

But I don't have a reproducer for this ...

I also noticed a strange difference between LogicalIncreaseXminForSlot
and LogicalIncreaseRestartDecodingForSlot.

The structure of LogicalIncreaseXminForSlot looks like this:

if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
{
}
else if (current_lsn <= slot->data.confirmed_flush)
{
... update candidate fields ...
}
else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
{
... update candidate fields ...
}

while LogicalIncreaseRestartDecodingForSlot looks like this:

if (restart_lsn <= slot->data.restart_lsn)
{
}
else if (current_lsn <= slot->data.confirmed_flush)
{
... update candidate fields ...
}

if (slot->candidate_restart_valid == InvalidXLogRecPtr)
{
... update candidate fields ...
}

Notice that LogicalIncreaseXminForSlot has the third block guarded by
"else if", while LogicalIncreaseRestartDecodingForSlot has "if". Isn't
that a bit suspicious, considering the functions do the same thing, just
for different fields? I don't know if this is dangerous, the comments
suggest it may just waste extra effort after reconnect.

I also suspected this point. I still need to investigate if this
suspicion is related to the issue, but I find this code in
LogicalIncreaseRestartDecodingForSlot() dangerous.

We update slot's restart_lsn based on candidate_lsn and
candidate_valid upon receiving a feedback message from a subscriber,
then clear both fields. Therefore, this code in
LogicalIncreaseRestartDecodingForSlot() means that it sets an
arbitrary LSN to candidate_restart_lsn after updating slot's
restart_lsn.

I think an LSN older than the slot's restart_lsn can be passed to
LogicalIncreaseRestartDecodingForSlot() as restart_lsn, for example
after logical decoding restarts. The scenario I shared on another
thread was that after updating the slot's restart_lsn (upon feedback
from a subscriber) based on the candidate_restart_lsn and
candidate_restart_valid that remained in the slot, we might call
LogicalIncreaseRestartDecodingForSlot() when decoding a RUNNING_XACTS
record whose LSN is older than the slot's new restart_lsn. In this
case, we end up passing an LSN older than the new restart_lsn to
LogicalIncreaseRestartDecodingForSlot(), and that LSN is set as
candidate_restart_lsn. My hypothesis is that we wanted to prevent such
a case with the first if block:

/* don't overwrite if have a newer restart lsn */
if (restart_lsn <= slot->data.restart_lsn)
{
}
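In other words, a simplified view of the current control flow (branches
and locking trimmed down) shows why that guard does not actually help:

    if (restart_lsn <= slot->data.restart_lsn)
    {
        /* guard matches, but nothing gets skipped ... */
    }
    else if (current_lsn <= slot->data.confirmed_flush)
    {
        /* install candidate */
    }

    /* ... because this is a plain "if", not "else if", a stale
     * restart_lsn can still be installed as a candidate here */
    if (slot->candidate_restart_valid == InvalidXLogRecPtr)
    {
        slot->candidate_restart_valid = current_lsn;
        slot->candidate_restart_lsn = restart_lsn;
    }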

Regards,

[1]: /messages/by-id/CAD21AoBG2OSDOFTtpPtQ7fx5Vt8p3dS5hPAv28CBSC6z2kHx-g@mail.gmail.com

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#5Tomas Vondra
tomas@vondra.me
In reply to: Ashutosh Bapat (#3)
2 attachment(s)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/8/24 15:57, Ashutosh Bapat wrote:

...

After examining the code before reading [2], I came to the same
conclusion as Masahiko-san in [2]. We install candidate_restart_lsn
based on the running transaction record whose LSN is between
restart_lsn and confirmed_flush_lsn. Since candidate_restart_valid of
such candidates would be lesser than any confirmed_flush_lsn received
from downstream. I am surprised that the fix suggested by Masahiko-san
didn't work though. The fix also fix the asymmetry, between
LogicalIncreaseXminForSlot and LogicalIncreaseRestartDecodingForSlot,
that you have pointed out in your next email. What behaviour do you
see with that fix applied?

[1] /messages/by-id/Yz2hivgyjS1RfMKs@depesz.com
[2] /messages/by-id/CAD21AoBVhYnGBuW_o=wEGgTp01qiHNAx1a14b1X9kFXmuBe=sg@mail.gmail.com

I read that message (and the earlier discussion) multiple times while
investigating the issue, and TBH it's not very clear to me what the
conclusion is :-(

There's some discussion about whether the candidate fields should be
reset on release or not. There are even claims that it might be
legitimate to not reset the fields and update the restart_lsn. Using
such "stale" LSN values seems rather suspicious to me, but I don't have
proof that it's incorrect. FWIW, this lack of clarity is why I mentioned
that the policy/contract for the candidate fields is not explained anywhere.

That being said, I gave that fix a try - see the attached 0001 patch. It
tweaks LogicalIncreaseRestartDecodingForSlot (it needs a bit more care
because of the spinlock), and it adds a couple asserts to make sure the
data.restart_lsn never moves back.

And indeed, with this my stress test script does not crash anymore.

But is that really correct? The lack of failure in one specific test
does not really prove that. And then again - why should it be OK for the
other candidate fields to move backwards? Isn't that suspicious? It sure
seems counter-intuitive to me, and I'm not sure the code expects that.

So in 0002 I added a couple more asserts to make sure the LSN fields
only move forward, and those *do* continue to fail, and in some cases
the amount by which the fields move back is pretty significant
(multiple megabytes).

Maybe it's fine if this "backwards move" never propagates to e.g.
"restart_lsn", not sure. And I'm not sure which other fields should not
move backwards (what about data.confirm_flush for example?).

regards

--
Tomas Vondra

Attachments:

0001-restart_lsn-backwards-move-fix-and-asserts.patch (text/x-patch)
From a81c50bbb8b7384a48752316468c59ab5faf9114 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Sat, 9 Nov 2024 12:14:19 +0100
Subject: [PATCH 1/2] restart_lsn backwards move - fix and asserts

---
 src/backend/replication/logical/logical.c | 13 ++++++++++++-
 src/backend/replication/slot.c            |  1 +
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3fe1774a1e9..28d70e732d5 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -391,6 +391,7 @@ CreateInitDecodingContext(const char *plugin,
 	else
 	{
 		SpinLockAcquire(&slot->mutex);
+		Assert(slot->data.restart_lsn <= restart_lsn);
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 	}
@@ -1762,6 +1763,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 {
 	bool		updated_lsn = false;
 	ReplicationSlot *slot;
+	bool		spin_released = false;
 
 	slot = MyReplicationSlot;
 
@@ -1794,12 +1796,14 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 * might never end up updating if the receiver acks too slowly. A missed
 	 * value here will just cause some extra effort after reconnecting.
 	 */
-	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
+	else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn));
@@ -1815,6 +1819,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		confirmed_flush = slot->data.confirmed_flush;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn),
@@ -1823,6 +1829,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(confirmed_flush));
 	}
 
+	if (!spin_released)
+		SpinLockRelease(&slot->mutex);
+
 	/* candidates are already valid with the current flush position, apply */
 	if (updated_lsn)
 		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
@@ -1875,6 +1884,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			Assert(MyReplicationSlot->data.restart_lsn <= MyReplicationSlot->candidate_restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6828100cf1a..0f38ccc8353 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1468,6 +1468,7 @@ ReplicationSlotReserveWal(void)
 			restart_lsn = GetXLogInsertRecPtr();
 
 		SpinLockAcquire(&slot->mutex);
+		Assert(slot->data.restart_lsn <= restart_lsn);
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
-- 
2.47.0

0002-asserts-for-candidate-lsn-fields.patch (text/x-patch)
From 380dcc26d5b4db75233938130057d66547ddeae5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Sat, 9 Nov 2024 12:14:49 +0100
Subject: [PATCH 2/2] asserts for candidate lsn fields

---
 src/backend/replication/logical/logical.c | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 28d70e732d5..fe030d147c6 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1718,6 +1718,8 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1731,6 +1733,8 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1784,6 +1788,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 
@@ -1798,6 +1805,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
-- 
2.47.0

#6Tomas Vondra
tomas@vondra.me
In reply to: Masahiko Sawada (#4)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/8/24 19:25, Masahiko Sawada wrote:

Hi,

Thank you for investigating this issue.

On Thu, Nov 7, 2024 at 10:40 AM Tomas Vondra <tomas@vondra.me> wrote:

Hi,

I kept investigating this, but I haven't made much progress. I still
don't understand why would it be OK to move any of the LSN fields
backwards - certainly for fields like confirm_flush or restart_lsn.

I did a simple experiment - added asserts to the couple places in
logical.c updating the the LSN fields, checking the value is increased.
But then I simply ran make check-world, instead of the stress test.

And that actually fails too, 040_standby_failover_slots_sync.pl triggers
this

{
SpinLockAcquire(&MyReplicationSlot->mutex);
Assert(MyReplicationSlot->data.confirmed_flush <= lsn);
MyReplicationSlot->data.confirmed_flush = lsn;
SpinLockRelease(&MyReplicationSlot->mutex);
}

So this moves confirm_flush back, albeit only by a tiny amount (I've
seen ~56 byte difference). I don't have an example of this causing an
issue in practice, but I note that CheckPointReplicationSlots does this:

if (is_shutdown && SlotIsLogical(s))
{
SpinLockAcquire(&s->mutex);

if (s->data.invalidated == RS_INVAL_NONE &&
s->data.confirmed_flush > s->last_saved_confirmed_flush)
{
s->just_dirtied = true;
s->dirty = true;
}
SpinLockRelease(&s->mutex);
}

to determine if a slot needs to be flushed to disk during checkpoint. So
I guess it's possible we save a slot to disk at some LSN, then the
confirm_flush moves backward, and we fail to sync the slot to disk.

But I don't have a reproducer for this ...

I also noticed a strange difference between LogicalIncreaseXminForSlot
and LogicalIncreaseRestartDecodingForSlot.

The structure of LogicalIncreaseXminForSlot looks like this:

if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
{
}
else if (current_lsn <= slot->data.confirmed_flush)
{
... update candidate fields ...
}
else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
{
... update candidate fields ...
}

while LogicalIncreaseRestartDecodingForSlot looks like this:

if (restart_lsn <= slot->data.restart_lsn)
{
}
else if (current_lsn <= slot->data.confirmed_flush)
{
... update candidate fields ...
}

if (slot->candidate_restart_valid == InvalidXLogRecPtr)
{
... update candidate fields ...
}

Notice that LogicalIncreaseXminForSlot has the third block guarded by
"else if", while LogicalIncreaseRestartDecodingForSlot has "if". Isn't
that a bit suspicious, considering the functions do the same thing, just
for different fields? I don't know if this is dangerous, the comments
suggest it may just waste extra effort after reconnect.

I also suspected this point. I still need to investigate if this
suspicion is related to the issue but I find this code in
LogicalIncreaseRestartDecodingForSlot() is dangerous.

We update slot's restart_lsn based on candidate_lsn and
candidate_valid upon receiving a feedback message from a subscriber,
then clear both fields. Therefore, this code in
LogicalIncreaseRestartDecodingForSlot() means that it sets an
arbitrary LSN to candidate_restart_lsn after updating slot's
restart_lsn.

I think an LSN older than slot's restart_lsn can be passed to
LogicalIncreaseRestartDecodingForSlot() as restart_lsn for example
after logical decoding restarts; My scenario I shared on another
thread was that after updating slot's restart_lsn (upon feedback from
a subscriber) based on both candidate_restart_lsn and
candidate_restart_valid that are remained in the slot, we might call
LogicalIncreaseRestartDecodingForSlot() when decoding a RUNNING_XACTS
record whose LSN is older than the slot's new restart_lsn. In this
case, we end up passing an LSN older than the new restart_lsn to
LogicalIncreaseRestartDecodingForSlot(), and that LSN is set to
candidate_restart_lsn.

Right, I believe that matches my observations. I only see the issues
after (unexpected) restarts, say due to network issues, but chances are
regular reconnects have the same problem.

My hypothesis is that we wanted to prevent such
case by the first if block:

/* don't overwrite if have a newer restart lsn */
if (restart_lsn <= slot->data.restart_lsn)
{
}

Yeah, that condition / comment seems to say exactly that.

Do you plan / expect to work on fixing this? It seems you proposed the
right fix in that old thread, but it's been inactive since 2023/02 :-(

regards

--
Tomas Vondra

#7Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Tomas Vondra (#5)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Sat, Nov 9, 2024 at 5:05 PM Tomas Vondra <tomas@vondra.me> wrote:

On 11/8/24 15:57, Ashutosh Bapat wrote:

...

After examining the code before reading [2], I came to the same
conclusion as Masahiko-san in [2]. We install candidate_restart_lsn
based on the running transaction record whose LSN is between
restart_lsn and confirmed_flush_lsn. Since candidate_restart_valid of
such candidates would be lesser than any confirmed_flush_lsn received
from downstream. I am surprised that the fix suggested by Masahiko-san
didn't work though. The fix also fix the asymmetry, between
LogicalIncreaseXminForSlot and LogicalIncreaseRestartDecodingForSlot,
that you have pointed out in your next email. What behaviour do you
see with that fix applied?

[1] /messages/by-id/Yz2hivgyjS1RfMKs@depesz.com
[2] /messages/by-id/CAD21AoBVhYnGBuW_o=wEGgTp01qiHNAx1a14b1X9kFXmuBe=sg@mail.gmail.com

I read that message (and the earlier discussion multiple times) while
investigating the issue, and TBH it's not very clear to me what the
conclusion is :-(

There's some discussion about whether the candidate fields should be
reset on release or not. There are even claims that it might be
legitimate to not reset the fields and update the restart_lsn. Using
such "stale" LSN values seems rather suspicious to me, but I don't have
a proof that it's incorrect. FWIW this unclarity is what I mentioned the
policy/contract for candidate fields is not explained anywhere.

That being said, I gave that fix a try - see the attached 0001 patch. It
tweaks LogicalIncreaseRestartDecodingForSlot (it needs a bit more care
because of the spinlock), and it adds a couple asserts to make sure the
data.restart_lsn never moves back.

And indeed, with this my stress test script does not crash anymore.

:)

I think the problem is about processing older running transactions
record and setting data.restart_lsn based on the candidates those
records produce. But what is not clear to me is how come a newer
candidate_restart_lsn is available immediately upon WAL sender
restart. I.e. in the sequence of events you mentioned in your first
email
1. 344.139 LOG: starting logical decoding for slot "s1"

2. 344.139 DETAIL: Streaming transactions committing after 1/E97EAC30,
reading WAL from 1/E96FB4D0.

3. 344.140 LOG: logical decoding found consistent point at 1/E96FB4D0

4. 344.140 DETAIL: Logical decoding will begin using saved snapshot.

5. 344.140 LOG: LogicalConfirmReceivedLocation 1/E9865398

6. 344.140 LOG: LogicalConfirmReceivedLocation updating
data.restart_lsn to 1/E979D4C8 (from 1/E96FB4D0)
candidate_restart_valid 0/0 (from 1/E9865398)
candidate_restart_lsn 0/0 (from 1/E979D4C8)

how were candidate_restart_lsn = 1/E979D4C8 and candidate_restart_valid
= 1/E9865398 set in the ReplicationSlot right after the WAL sender
restart? It means it must have read and processed a running transactions
record at 1/E9865398. If that's true, how come it went back to a running
transactions WAL record at 1/E979D4C8? It should be reading WAL records
sequentially, hence read 1/E979D4C8 first and then 1/E9865398.

344.145 LOG: LogicalIncreaseRestartDecodingForSlot s1
candidate_restart_valid_lsn 1/E979D4C8 (0/0)
candidate_restart_lsn 1/E96FB4D0 (0/0)

--
Best Wishes,
Ashutosh Bapat

#8Tomas Vondra
tomas@vondra.me
In reply to: Ashutosh Bapat (#7)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/11/24 14:51, Ashutosh Bapat wrote:

...

I think the problem is about processing older running transactions
record and setting data.restart_lsn based on the candidates those
records produce. But what is not clear to me is how come a newer
candidate_restart_lsn is available immediately upon WAL sender
restart. I.e. in the sequence of events you mentioned in your first
email
1. 344.139 LOG: starting logical decoding for slot "s1"

2. 344.139 DETAIL: Streaming transactions committing after 1/E97EAC30,
reading WAL from 1/E96FB4D0.

3. 344.140 LOG: logical decoding found consistent point at 1/E96FB4D0

4. 344.140 DETAIL: Logical decoding will begin using saved snapshot.

5. 344.140 LOG: LogicalConfirmReceivedLocation 1/E9865398

6. 344.140 LOG: LogicalConfirmReceivedLocation updating
data.restart_lsn to 1/E979D4C8 (from 1/E96FB4D0)
candidate_restart_valid 0/0 (from 1/E9865398)
candidate_restart_lsn 0/0 (from 1/E979D4C8)

how did candidate_restart_lsn = 1/E979D4C8 and candidate_restart_valid
= 1/E9865398 were set in ReplicationSlot after WAL sender? It means it
must have read and processed running transaction record at 1/E9865398.
If that's true, how come it went back to a running transactions WAL
record at 1/E979D4C8? It should be reading WAL records sequentially,
hence read 1/E979D4C8 first then 1/E9865398.

344.145 LOG: LogicalIncreaseRestartDecodingForSlot s1
candidate_restart_valid_lsn 1/E979D4C8 (0/0)
candidate_restart_lsn 1/E96FB4D0 (0/0)

Those are good questions, but IIUC that's explained by this comment from
Masahiko-san's analysis [1]:

Thinking about the root cause more, it seems to me that the root cause
is not the fact that candidate_xxx values are not cleared when being
released.

In the scenario I reproduced, after restarting the logical decoding,
the walsender sets the restart_lsn to a candidate_restart_lsn left in
the slot upon receiving the ack from the subscriber. ...

If this is correct, then what happens is:

1) replication is running, at some point we set candidate LSN to B

2) something breaks, causing reconnect with restart LSN A (< B)

3) we still have the candidate LSN B in memory, and after receiving
some confirmation we set it as restart_lsn

4) we get to decode the RUNNING_XACTS, which moves restart_lsn back

If this analysis is correct, I think it's rather suspicious we don't
reset the candidate fields on restart. Can those "old" values ever be
valid? But I haven't tried resetting them.

Also, this is why I'm not entirely sure just tweaking the conditions in
LogicalIncreaseRestartDecodingForSlot is quite correct. Maybe it fixes
this particular issue, but maybe the right fix would be to reset the
candidate fields on reconnect? And this change would be just hiding the
actual problem. I haven't tried this.

[1]: /messages/by-id/CAD21AoBVhYnGBuW_o=wEGgTp01qiHNAx1a14b1X9kFXmuBe=sg@mail.gmail.com

--
Tomas Vondra

#9Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Tomas Vondra (#6)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Sat, Nov 9, 2024 at 3:45 AM Tomas Vondra <tomas@vondra.me> wrote:

On 11/8/24 19:25, Masahiko Sawada wrote:

Hi,

Thank you for investigating this issue.

On Thu, Nov 7, 2024 at 10:40 AM Tomas Vondra <tomas@vondra.me> wrote:

Hi,

I kept investigating this, but I haven't made much progress. I still
don't understand why would it be OK to move any of the LSN fields
backwards - certainly for fields like confirm_flush or restart_lsn.

I did a simple experiment - added asserts to the couple places in
logical.c updating the the LSN fields, checking the value is increased.
But then I simply ran make check-world, instead of the stress test.

And that actually fails too, 040_standby_failover_slots_sync.pl triggers
this

{
SpinLockAcquire(&MyReplicationSlot->mutex);
Assert(MyReplicationSlot->data.confirmed_flush <= lsn);
MyReplicationSlot->data.confirmed_flush = lsn;
SpinLockRelease(&MyReplicationSlot->mutex);
}

So this moves confirm_flush back, albeit only by a tiny amount (I've
seen ~56 byte difference). I don't have an example of this causing an
issue in practice, but I note that CheckPointReplicationSlots does this:

if (is_shutdown && SlotIsLogical(s))
{
SpinLockAcquire(&s->mutex);

if (s->data.invalidated == RS_INVAL_NONE &&
s->data.confirmed_flush > s->last_saved_confirmed_flush)
{
s->just_dirtied = true;
s->dirty = true;
}
SpinLockRelease(&s->mutex);
}

to determine if a slot needs to be flushed to disk during checkpoint. So
I guess it's possible we save a slot to disk at some LSN, then the
confirm_flush moves backward, and we fail to sync the slot to disk.

But I don't have a reproducer for this ...

I also noticed a strange difference between LogicalIncreaseXminForSlot
and LogicalIncreaseRestartDecodingForSlot.

The structure of LogicalIncreaseXminForSlot looks like this:

if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
{
}
else if (current_lsn <= slot->data.confirmed_flush)
{
... update candidate fields ...
}
else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
{
... update candidate fields ...
}

while LogicalIncreaseRestartDecodingForSlot looks like this:

if (restart_lsn <= slot->data.restart_lsn)
{
}
else if (current_lsn <= slot->data.confirmed_flush)
{
... update candidate fields ...
}

if (slot->candidate_restart_valid == InvalidXLogRecPtr)
{
... update candidate fields ...
}

Notice that LogicalIncreaseXminForSlot has the third block guarded by
"else if", while LogicalIncreaseRestartDecodingForSlot has "if". Isn't
that a bit suspicious, considering the functions do the same thing, just
for different fields? I don't know if this is dangerous, the comments
suggest it may just waste extra effort after reconnect.

I also suspected this point. I still need to investigate if this
suspicion is related to the issue but I find this code in
LogicalIncreaseRestartDecodingForSlot() is dangerous.

We update slot's restart_lsn based on candidate_lsn and
candidate_valid upon receiving a feedback message from a subscriber,
then clear both fields. Therefore, this code in
LogicalIncreaseRestartDecodingForSlot() means that it sets an
arbitrary LSN to candidate_restart_lsn after updating slot's
restart_lsn.

I think an LSN older than slot's restart_lsn can be passed to
LogicalIncreaseRestartDecodingForSlot() as restart_lsn for example
after logical decoding restarts; My scenario I shared on another
thread was that after updating slot's restart_lsn (upon feedback from
a subscriber) based on both candidate_restart_lsn and
candidate_restart_valid that are remained in the slot, we might call
LogicalIncreaseRestartDecodingForSlot() when decoding a RUNNING_XACTS
record whose LSN is older than the slot's new restart_lsn. In this
case, we end up passing an LSN older than the new restart_lsn to
LogicalIncreaseRestartDecodingForSlot(), and that LSN is set to
candidate_restart_lsn.

Right, I believe that matches my observations. I only see the issues
after (unexpected) restarts, say due to network issues, but chances are
regular reconnects have the same problem.

My hypothesis is that we wanted to prevent such
case by the first if block:

/* don't overwrite if have a newer restart lsn */
if (restart_lsn <= slot->data.restart_lsn)
{
}

Yeah, that condition / comment seems to say exactly that.

Do you plan / expect to work on fixing this? It seems you proposed the
right fix in that old thread, but it's been inactive since 2023/02 :-(

I'm happy to work on this fix. At that time, I was unsure if my fix
was really correct and there was no further discussion.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#10Tomas Vondra
tomas@vondra.me
In reply to: Tomas Vondra (#8)
5 attachment(s)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/11/24 15:17, Tomas Vondra wrote:

On 11/11/24 14:51, Ashutosh Bapat wrote:

...

I think the problem is about processing older running transactions
record and setting data.restart_lsn based on the candidates those
records produce. But what is not clear to me is how come a newer
candidate_restart_lsn is available immediately upon WAL sender
restart. I.e. in the sequence of events you mentioned in your first
email
1. 344.139 LOG: starting logical decoding for slot "s1"

2. 344.139 DETAIL: Streaming transactions committing after 1/E97EAC30,
reading WAL from 1/E96FB4D0.

3. 344.140 LOG: logical decoding found consistent point at 1/E96FB4D0

4. 344.140 DETAIL: Logical decoding will begin using saved snapshot.

5. 344.140 LOG: LogicalConfirmReceivedLocation 1/E9865398

6. 344.140 LOG: LogicalConfirmReceivedLocation updating
data.restart_lsn to 1/E979D4C8 (from 1/E96FB4D0)
candidate_restart_valid 0/0 (from 1/E9865398)
candidate_restart_lsn 0/0 (from 1/E979D4C8)

how did candidate_restart_lsn = 1/E979D4C8 and candidate_restart_valid
= 1/E9865398 were set in ReplicationSlot after WAL sender? It means it
must have read and processed running transaction record at 1/E9865398.
If that's true, how come it went back to a running transactions WAL
record at 1/E979D4C8? It should be reading WAL records sequentially,
hence read 1/E979D4C8 first then 1/E9865398.

344.145 LOG: LogicalIncreaseRestartDecodingForSlot s1
candidate_restart_valid_lsn 1/E979D4C8 (0/0)
candidate_restart_lsn 1/E96FB4D0 (0/0)

Those are good questions, but IIUC that's explained by this comment from
Masahiko-san's analysis [1]:

Thinking about the root cause more, it seems to me that the root cause
is not the fact that candidate_xxx values are not cleared when being
released.

In the scenario I reproduced, after restarting the logical decoding,
the walsender sets the restart_lsn to a candidate_restart_lsn left in
the slot upon receiving the ack from the subscriber. ...

If this is correct, then what happens is:

1) replication is running, at some point we set candidate LSN to B

2) something breaks, causing reconnect with restart LSN A (< B)

3) we still have the candidate LSN B in memory, and after receiving
some confirmation we set it as restart_lsn

4) we get to decode the RUNNING_XACTS, which moves restart_lsn back

If this analysis is correct, I think it's rather suspicious we don't
reset the candidate fields on restart. Can those "old" values ever be
valid? But I haven't tried resetting them.

To clarify this a bit, I mean something like in the attached 0003 patch.

The reasoning is that after ReplicationSlotAcquire() we should get the
slot in the same state as if we just read it from disk. Because why not?
Why should the result be different from what we'd get if the primary
restarted right before the reconnect?

Parts 0001 and 0002 add a couple asserts to prevent backwards move for
both the restart_lsn and the various candidate LSN fields.

Both the 0003 and 0004 patches (applied separately) seem to fix crashes
in my stress test, and none of the asserts from 0001+0002 seem to fail.
I'm not sure if we need both fixes or just one of them.

But neither of those fixes prevents backwards move for confirmed_flush
LSN, as enforced by asserts in the 0005 patch. I don't know if this
assert is incorrect or not. It seems natural that once we get a
confirmation for some LSN, we can't move before that position, but I'm
not sure about that. Maybe it's too strict.

regards

--
Tomas Vondra

Attachments:

0001-asserts-on-restart_lsn-backwards-move.patch (text/x-patch)
From be7ba029036613df80562cc735a3040d6f760a90 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Sat, 9 Nov 2024 12:09:04 +0100
Subject: [PATCH 1/5] asserts on restart_lsn backwards move

---
 src/backend/replication/logical/logical.c | 3 +++
 src/backend/replication/slot.c            | 2 ++
 2 files changed, 5 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f7b2c85d9b..d76889664a8 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -376,6 +376,7 @@ CreateInitDecodingContext(const char *plugin,
 	else
 	{
 		SpinLockAcquire(&slot->mutex);
+		Assert(slot->data.restart_lsn <= restart_lsn);
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 	}
@@ -1746,6 +1747,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			Assert(MyReplicationSlot->data.restart_lsn <= MyReplicationSlot->candidate_restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 037a347cba0..01e854b4486 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1109,6 +1109,7 @@ ReplicationSlotReserveWal(void)
 			/* start at current insert position */
 			restart_lsn = GetXLogInsertRecPtr();
 			SpinLockAcquire(&slot->mutex);
+			Assert(slot->data.restart_lsn <= restart_lsn);
 			slot->data.restart_lsn = restart_lsn;
 			SpinLockRelease(&slot->mutex);
 
@@ -1122,6 +1123,7 @@ ReplicationSlotReserveWal(void)
 		{
 			restart_lsn = GetRedoRecPtr();
 			SpinLockAcquire(&slot->mutex);
+			Assert(slot->data.restart_lsn <= restart_lsn);
 			slot->data.restart_lsn = restart_lsn;
 			SpinLockRelease(&slot->mutex);
 		}
-- 
2.39.5

0002-asserts-for-candidate-lsn-fields.patch (text/x-patch)
From 39024b07c5bc8c3f43fba7a1368368c64a5b6f14 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Sat, 9 Nov 2024 12:09:42 +0100
Subject: [PATCH 2/5] asserts for candidate lsn fields

---
 src/backend/replication/logical/logical.c | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d76889664a8..8fe77f8c463 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1599,6 +1599,8 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1612,6 +1614,8 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 	}
@@ -1654,6 +1658,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 
@@ -1668,6 +1675,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
-- 
2.39.5

0003-reset-slot-in-acquire.patch (text/x-patch)
From 2437880b414e61cbfd02dca7a5d70ca9853c8a51 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Mon, 11 Nov 2024 20:52:01 +0100
Subject: [PATCH 3/5] reset slot in acquire

---
 src/backend/replication/slot.c | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 01e854b4486..c398b91f40c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -456,6 +456,30 @@ retry:
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
+
+	/*
+	 * Reset the time since the slot has become inactive as the slot is active
+	 * now.
+	 */
+	SpinLockAcquire(&s->mutex);
+
+	/*
+	 * ReplicationSlotCreate would do this:
+	 * s->effective_xmin = InvalidTransactionId;
+	 * s->effective_catalog_xmin = InvalidTransactionId;
+	 * s->last_saved_confirmed_flush = InvalidXLogRecPtr;
+	 *
+	 * But we do this more like RestoreSlotFromDisk, as if we loaded the
+	 * slot from disk.
+	 */
+	s->effective_xmin = s->data.xmin;
+	s->effective_catalog_xmin = s->data.catalog_xmin;
+
+	s->candidate_catalog_xmin = InvalidTransactionId;
+	s->candidate_xmin_lsn = InvalidXLogRecPtr;
+	s->candidate_restart_valid = InvalidXLogRecPtr;
+	s->candidate_restart_lsn = InvalidXLogRecPtr;
+	SpinLockRelease(&s->mutex);
 }
 
 /*
-- 
2.39.5

0004-fix-LogicalIncreaseRestartDecodingForSlot.patch (text/x-patch)
From e3028d078b680b007126c6941a49f3335147b775 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Mon, 11 Nov 2024 21:00:07 +0100
Subject: [PATCH 4/5] fix LogicalIncreaseRestartDecodingForSlot

---
 src/backend/replication/logical/logical.c | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8fe77f8c463..40ac9f43ce2 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1638,6 +1638,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 {
 	bool		updated_lsn = false;
 	ReplicationSlot *slot;
+	bool		spin_released = false;
 
 	slot = MyReplicationSlot;
 
@@ -1673,7 +1674,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 * might never end up updating if the receiver acks too slowly. A missed
 	 * value here will just cause some extra effort after reconnecting.
 	 */
-	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
+	else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
 		Assert(slot->candidate_restart_valid <= current_lsn);
 		Assert(slot->candidate_restart_lsn <= restart_lsn);
@@ -1682,6 +1683,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn));
@@ -1697,6 +1700,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		confirmed_flush = slot->data.confirmed_flush;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn),
@@ -1705,6 +1710,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(confirmed_flush));
 	}
 
+	if (!spin_released)
+		SpinLockRelease(&slot->mutex);
+
 	/* candidates are already valid with the current flush position, apply */
 	if (updated_lsn)
 		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
-- 
2.39.5

0005-confirmed_flush-asserts.patch (text/x-patch)
From 017a1fd52a125b4551a22444ae4d67889f20a0ac Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Mon, 11 Nov 2024 20:31:02 +0100
Subject: [PATCH 5/5] confirmed_flush asserts

---
 src/backend/replication/logical/logical.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 40ac9f43ce2..73d6149afde 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -605,7 +605,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	}
 
 	SpinLockAcquire(&slot->mutex);
+	Assert(slot->data.confirmed_flush <= ctx->reader->EndRecPtr);
 	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	Assert(slot->data.initial_consistent_point <= ctx->reader->EndRecPtr);
 	slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
 	SpinLockRelease(&slot->mutex);
 }
@@ -1735,6 +1737,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
+		Assert(MyReplicationSlot->data.confirmed_flush <= lsn);
 		MyReplicationSlot->data.confirmed_flush = lsn;
 
 		/* if we're past the location required for bumping xmin, do so */
@@ -1802,6 +1805,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	else
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
+		Assert(MyReplicationSlot->data.confirmed_flush <= lsn);
 		MyReplicationSlot->data.confirmed_flush = lsn;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
-- 
2.39.5

#11Tomas Vondra
tomas@vondra.me
In reply to: Masahiko Sawada (#9)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/11/24 21:56, Masahiko Sawada wrote:

...

My hypothesis is that we wanted to prevent such
case by the first if block:

/* don't overwrite if have a newer restart lsn */
if (restart_lsn <= slot->data.restart_lsn)
{
}

Yeah, that condition / comment seems to say exactly that.

Do you plan / expect to work on fixing this? It seems you proposed the
right fix in that old thread, but it's been inactive since 2023/02 :-(

I'm happy to work on this fix. At that time, I was unsure if my fix
was really correct and there was no further discussion.

Thanks. I'm not sure about the correctness either, but I think it's
clear the issue is real, and it's not difficult to reproduce it.

regards

--
Tomas Vondra

#12Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Tomas Vondra (#8)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Mon, Nov 11, 2024 at 6:17 AM Tomas Vondra <tomas@vondra.me> wrote:

If this analysis is correct, I think it's rather suspicious we don't
reset the candidate fields on restart. Can those "old" values ever be
valid? But I haven't tried resetting them.

I had the same question. IIRC resetting them also fixes the
problem [1]. However, I got a comment from Alvaro [2]:

Hmm, interesting -- I was studying some other bug recently involving the
xmin of a slot that had been invalidated and I remember wondering if
these "candidate" fields were being properly ignored when the slot is
marked not in use; but I didn't check. Are you sure that resetting them
when the slot is released is the appropriate thing to do? I mean,
shouldn't they be kept set while the slot is in use, and only reset if
we destroy it?

Which made me re-investigate the issue, and I thought that we don't
necessarily need to clear these candidate values in memory on releasing
a slot, as long as we're careful when updating restart_lsn. Keeping them
seems a bit more efficient, for example when restarting from a very old
point. Of course, even if we reset them on releasing a slot, it would
work perfectly, since it's the same as restarting logical decoding after
a server restart. I think LogicalIncreaseRestartDecodingForSlot() should
be fixed as it does not seem to be working as expected, but I don't have
proof that we should either keep or reset them on releasing a slot.

Regards,

[1]: /messages/by-id/CAD21AoBG2OSDOFTtpPtQ7fx5Vt8p3dS5hPAv28CBSC6z2kHx-g@mail.gmail.com
[2]: /messages/by-id/20230206152209.yglmntznhcmaueyn@alvherre.pgsql

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#13Tomas Vondra
tomas@vondra.me
In reply to: Masahiko Sawada (#12)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/11/24 23:41, Masahiko Sawada wrote:

On Mon, Nov 11, 2024 at 6:17 AM Tomas Vondra <tomas@vondra.me> wrote:

If this analysis is correct, I think it's rather suspicious we don't
reset the candidate fields on restart. Can those "old" values ever be
valid? But I haven't tried resetting them.

I had the same question. IIRC resetting them also fixes the
problem[1]. However, I got a comment from Alvaro[2]:

Hmm, interesting -- I was studying some other bug recently involving the
xmin of a slot that had been invalidated and I remember wondering if
these "candidate" fields were being properly ignored when the slot is
marked not in use; but I didn't check. Are you sure that resetting them
when the slot is released is the appropriate thing to do? I mean,
shouldn't they be kept set while the slot is in use, and only reset if
we destroy it?

Which made me re-investigate the issue and thought that it doesn't
necessarily need to clear these candidate values in memory on
releasing a slot as long as we're carefully updating restart_lsn.

Not sure, but maybe it'd be useful to ask the opposite question. Why
shouldn't it be correct to reset the fields, which essentially puts the
slot into the same state as if it was just read from disk? That also
discards all these values, and we can't rely on accidentally keeping
some important info in memory (because if the instance restarts
we'd lose that).

But this reminds me that the patch I shared earlier today resets the
slot in the ReplicationSlotAcquire() function, but I guess that's not
quite correct. It probably should be in the "release" path.
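For illustration only, moving the same resets from the 0003 patch into
the release path might look roughly like this (an untested sketch, using
the existing field names; where exactly in ReplicationSlotRelease() it
would belong is an open question):

    /* sketch: forget any in-memory candidate values when releasing the
     * slot, as if it had just been restored from disk */
    SpinLockAcquire(&slot->mutex);
    slot->candidate_catalog_xmin = InvalidTransactionId;
    slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    slot->candidate_restart_valid = InvalidXLogRecPtr;
    slot->candidate_restart_lsn = InvalidXLogRecPtr;
    SpinLockRelease(&slot->mutex);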

Which seems a bit efficient for example when restarting from a very
old point. Of course, even if we reset them on releasing a slot, it
would perfectly work since it's the same as restarting logical
decoding with a server restart.

I find the "efficiency" argument a bit odd. It'd be fine if we had a
correct behavior to start with, but we don't have that ... Also, I'm not
quite sure why exactly it would be more efficient.

And how likely is this in practice? It seems to me that
performance-sensitive cases can't do reconnects very often anyway,
that's inherently inefficient. No?

I think
LogicalIncreaseRestartDecodingForSlot() should be fixed as it seems
not to be working expectedly, but I could not have proof that we
should either keep or reset them on releasing a slot.

Not sure. Chances are we need both fixes, if only to make
LogicalIncreaseRestartDecodingForSlot more like the other function.

regards

--
Tomas Vondra

#14Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Tomas Vondra (#10)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Mon, Nov 11, 2024 at 2:08 PM Tomas Vondra <tomas@vondra.me> wrote:

But neither of those fixes prevents backwards move for confirmed_flush
LSN, as enforced by asserts in the 0005 patch. I don't know if this
assert is incorrect or now. It seems natural that once we get a
confirmation for some LSN, we can't move before that position, but I'm
not sure about that. Maybe it's too strict.

Hmm, I'm concerned that it might be another problem. I think there are
some cases where a subscriber sends a flush position older than the
slot's confirmed_flush in a feedback message. But it seems dangerous to
always accept it as the new confirmed_flush value. It could happen that
confirmed_flush gets set to an LSN older than restart_lsn.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#15Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Masahiko Sawada (#14)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Tue, Nov 12, 2024 at 12:02 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Mon, Nov 11, 2024 at 2:08 PM Tomas Vondra <tomas@vondra.me> wrote:

But neither of those fixes prevents backwards move for confirmed_flush
LSN, as enforced by asserts in the 0005 patch. I don't know if this
assert is incorrect or now. It seems natural that once we get a
confirmation for some LSN, we can't move before that position, but I'm
not sure about that. Maybe it's too strict.

Hmm, I'm concerned that it might be another problem. I think there are
some cases where a subscriber sends a flush position older than slot's
confirmed_flush as a feedback message. But it seems to be dangerous if
we always accept it as a new confirmed_flush value. It could happen
that confirm_flush could be set to a LSN older than restart_lsn.

If the confirmed_flush LSN moves backwards, it means the transactions
which were thought to be replicated earlier are no longer considered
to be replicated. This means that the restart_lsn of the slot needs to
be at least as far back as the oldest of the starting points of those
transactions. Thus the slot's restart_lsn has to be pushed further back,
and that WAL may not be available anymore. There's a similar issue with
catalog_xmin: the older catalog rows may have been removed. Another
problem is that we may send some transactions twice, which might cause
trouble downstream. So I agree that the confirmed_flush LSN should not
move backwards. IIRC, if the downstream sends an older confirmed_flush
in the START_REPLICATION message, the WAL sender does not consider it
and instead uses the one in the replication slot.
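Just to illustrate the kind of guard that implies on the sending side
(a sketch only, not the actual walsender code; whether such a clamp is
the right fix is exactly what's being discussed here):

    /* sketch: refuse to move confirmed_flush backwards when applying
     * feedback from the downstream */
    SpinLockAcquire(&MyReplicationSlot->mutex);
    if (lsn > MyReplicationSlot->data.confirmed_flush)
        MyReplicationSlot->data.confirmed_flush = lsn;
    SpinLockRelease(&MyReplicationSlot->mutex);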

--
Best Wishes,
Ashutosh Bapat

#16Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Tomas Vondra (#13)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Tue, Nov 12, 2024 at 4:54 AM Tomas Vondra <tomas@vondra.me> wrote:

On 11/11/24 23:41, Masahiko Sawada wrote:

On Mon, Nov 11, 2024 at 6:17 AM Tomas Vondra <tomas@vondra.me> wrote:

Which made me re-investigate the issue and thought that it doesn't
necessarily need to clear these candidate values in memory on
releasing a slot as long as we're carefully updating restart_lsn.

Not sure, but maybe it'd be useful to ask the opposite question. Why
shouldn't it be correct to reset the fields, which essentially puts the
slot into the same state as if it was just read from disk? That also
discards all these values, and we can't rely on accidentally keeping
something important info in memory (because if the instance restarts
we'd lose that).

But this reminds me that the patch I shared earlier today resets the
slot in the ReplicationSlotAcquire() function, but I guess that's not
quite correct. It probably should be in the "release" path.

Which seems a bit efficient for example when restarting from a very
old point. Of course, even if we reset them on releasing a slot, it
would perfectly work since it's the same as restarting logical
decoding with a server restart.

I find the "efficiency" argument a bit odd. It'd be fine if we had a
correct behavior to start with, but we don't have that ... Also, I'm not
quite sure why exactly would it be more efficient?

And how likely is this in practice? It seems to me that
performance-sensitive cases can't do reconnects very often anyway,
that's inherently inefficient. No?

I think
LogicalIncreaseRestartDecodingForSlot() should be fixed as it seems
not to be working expectedly, but I could not have proof that we
should either keep or reset them on releasing a slot.

Not sure. Chances are we need both fixes, if only to make
LogicalIncreaseRestartDecodingForSlot more like the other function.

Thanks a lot for pointing to Masahiko's analysis. I missed that part
when I read that thread. Sorry.

A candidate_restart_lsn and candidate_restart_valid pair just tells us
that we may set the slot's data.restart_lsn to candidate_restart_lsn
once the downstream confirms an LSN >= candidate_restart_valid. That
pair can never become inaccurate. It may get stale, but never
inaccurate. So wiping those fields from the ReplicationSlot is
unnecessary.

What should ideally happen is that we ignore candidates produced by
older running transactions WAL records after a WAL sender restart. This
is in line with what logical replication does with transactions
committed before the slot's confirmed_flush_lsn - those are ignored. But
the criteria for ignoring running transactions records are slightly
different from those for transactions. If we ignore a
candidate_restart_lsn which has candidate_restart_valid <=
confirmed_flush_lsn, we might lose some opportunity to advance
data.restart_lsn. Instead we should ignore any candidate_restart_lsn
<= data.restart_lsn, especially before the WAL sender finds the first
change to send downstream. We can do that in
SnapBuildProcessRunningXacts() by accessing MyReplicationSlot, taking a
lock on it, and then comparing data.restart_lsn with
txn->restart_decoding_lsn before calling
LogicalIncreaseRestartDecodingForSlot(). But then
LogicalIncreaseRestartDecodingForSlot() would be doing the same anyway
after applying your patch 0004. The only downside of 0004 is that the
logic to ignore candidates produced by a running transactions record is
not clearly visible in SnapBuildProcessRunningXacts(). For a transaction
which is ignored, the logic to ignore it is visible in DecodeCommit() or
DecodeAbort(), where people are likely to look for that logic. We may
add a comment to that effect in SnapBuildProcessRunningXacts().
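A very rough sketch of that alternative check (purely illustrative; the
surrounding SnapBuildProcessRunningXacts() logic is elided, and
current_lsn / candidate_restart_lsn here are placeholders for the LSN of
the running-xacts record and the LSN that would be proposed, e.g.
txn->restart_decoding_lsn):

    XLogRecPtr  slot_restart_lsn;

    /* sketch: read the slot's current restart_lsn under the spinlock
     * (current_lsn and candidate_restart_lsn are placeholders, see above) */
    SpinLockAcquire(&MyReplicationSlot->mutex);
    slot_restart_lsn = MyReplicationSlot->data.restart_lsn;
    SpinLockRelease(&MyReplicationSlot->mutex);

    /* only propose candidates that can actually advance restart_lsn */
    if (candidate_restart_lsn > slot_restart_lsn)
        LogicalIncreaseRestartDecodingForSlot(current_lsn, candidate_restart_lsn);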

--
Best Wishes,
Ashutosh Bapat

#17Tomas Vondra
tomas@vondra.me
In reply to: Ashutosh Bapat (#16)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/12/24 10:37, Ashutosh Bapat wrote:

On Tue, Nov 12, 2024 at 4:54 AM Tomas Vondra <tomas@vondra.me> wrote:

On 11/11/24 23:41, Masahiko Sawada wrote:

On Mon, Nov 11, 2024 at 6:17 AM Tomas Vondra <tomas@vondra.me> wrote:

Which made me re-investigate the issue and thought that it doesn't
necessarily need to clear these candidate values in memory on
releasing a slot as long as we're carefully updating restart_lsn.

Not sure, but maybe it'd be useful to ask the opposite question. Why
shouldn't it be correct to reset the fields, which essentially puts the
slot into the same state as if it was just read from disk? That also
discards all these values, and we can't rely on accidentally keeping
something important info in memory (because if the instance restarts
we'd lose that).

But this reminds me that the patch I shared earlier today resets the
slot in the ReplicationSlotAcquire() function, but I guess that's not
quite correct. It probably should be in the "release" path.

Which seems a bit efficient for example when restarting from a very
old point. Of course, even if we reset them on releasing a slot, it
would perfectly work since it's the same as restarting logical
decoding with a server restart.

I find the "efficiency" argument a bit odd. It'd be fine if we had a
correct behavior to start with, but we don't have that ... Also, I'm not
quite sure why exactly would it be more efficient?

And how likely is this in practice? It seems to me that
performance-sensitive cases can't do reconnects very often anyway,
that's inherently inefficient. No?

I think
LogicalIncreaseRestartDecodingForSlot() should be fixed as it seems
not to be working expectedly, but I could not have proof that we
should either keep or reset them on releasing a slot.

Not sure. Chances are we need both fixes, if only to make
LogicalIncreaseRestartDecodingForSlot more like the other function.

Thanks a lot for pointing to Masahiko's analysis. I missed that part
when I read that thread. Sorry.

No need to apologize. The discussion is complex and spread over a rather
long time period.

A candidate_restart_lsn and candidate_restart_valid pair just tells
that we may set slot's data.restart_lsn to candidate_restart_lsn when
the downstream confirms an LSN >= candidate_restart_valid. That pair
can never get inaccurate. It may get stale but never inaccurate. So
wiping those fields from ReplicationSlot is unnecessary.

Isn't this issue a proof that those fields *can* get inaccurate? Or what
do you mean by "stale but not inaccurate"?

What should ideally happen is we should ignore candidates produced by
older running transactions WAL records after WAL sender restart. This
is inline with what logical replication does with transactions
committed before slot's confirmed_flush_lsn - those are ignored. But
the criteria for ignoring running transactions records is slightly
different from that for transactions. If we ignore
candidate_restart_lsn which has candidate_restart_valid <=
confirmed_flush_lsn, we might lose some opportunity to advance
data.restart_lsn. Instead we should ignore any candidate_restart_lsn
<= data.restart_lsn especially before WAL sender finds first change to
send downstream. We can do that in SnapBuildProcessRunningXacts() by
accessing MyReplicationSlot, taking lock on it and then comparing
data.restart_lsn with txn->restart_decoding_lsn before calling
LogicalIncreaseRestartDecodingForSlot(). But then
LogicalIncreaseRestartDecodingForSlot() would be doing the same anyway
after applying your patch 0004. The only downside of 0004 is that the
logic to ignore candidates produced by a running transactions record
is not clearly visible in SnapBuildProcessRunningXacts(). For a
transaction which is ignored the logic to ignore the transaction is
visible in DecodeCommit() or DecodeAbort() - where people are likely
to look for that logic. We may add a comment to that effect in
SnapBuildProcessRunningXacts().

I have thought about just doing something like:

slot->data.restart_lsn = Max(slot->data.restart_lsn, new_lsn);

and similar for the other LSN fields. And it did resolve the issue at
hand, of course. But it seems sloppy, and I'm worried it might easily
mask actual issues in other cases.

I'm still of the opinion that (with the exception of a reconnect), these
places should not need to deal with values that go backwards. It should
work just fine without the Max(), and we should add Asserts() to check
that it's always a higher LSN.
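
For illustration, a minimal sketch of such a check at the (hypothetical)
point where a new value is about to be installed - the variable
new_restart_lsn is made up here, this is not an actual patch from this
thread:

/*
 * Hypothetical guard: once the slot has a valid restart_lsn, any value we
 * are about to install must not be older than it.
 */
Assert(slot->data.restart_lsn == InvalidXLogRecPtr ||
       new_restart_lsn >= slot->data.restart_lsn);
slot->data.restart_lsn = new_restart_lsn;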

For the reconnect, I think it's a bit as if the primary restarted right
before the reconnect. That could happen anyway, and we need to handle
that correctly - if not, we have yet another issue, IMHO. And with the
restart it's the same as writing the slot to disk and reading it back,
which also doesn't retain most of the fields. So it seems cleaner to do
the same thing and just reset the various fields.

I haven't thought about making SnapBuildProcessRunningXacts() more
complex to consider this stuff. But I doubt we'd like to be accessing
slots from that code - it has nothing to do with slots. If anything,
tweaking LogicalIncreaseRestartDecodingForSlot() seems more appropriate,
but I'm still wondering if the current coding was intentional and we're
just missing why it was written like this.

There's also the question of backpatching - the simpler the better, and
I think just resetting the fields wins in this regard. The main
question is whether it's correct - I think it is. I'm not too worried
about efficiency, on the grounds that this should not matter
very often (only after an unexpected restart). Correctness > efficiency.

regards

--
Tomas Vondra

#18Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Tomas Vondra (#17)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Tue, Nov 12, 2024 at 4:55 PM Tomas Vondra <tomas@vondra.me> wrote:

A candidate_restart_lsn and candidate_restart_valid pair just tells
that we may set slot's data.restart_lsn to candidate_restart_lsn when
the downstream confirms an LSN >= candidate_restart_valid. That pair
can never get inaccurate. It may get stale but never inaccurate. So
wiping those fields from ReplicationSlot is unnecessary.

Isn't this issue a proof that those fields *can* get inaccurate? Or what
do you mean by "stale but not inaccurate"?

While processing a running transactions log record RT1, the WAL
sender gathers that the slot's data.restart_lsn can be set to
candidate_restart_lsn1 once confirmed_flush_lsn is past
candidate_restart_valid1. In that sense it's accurate irrespective of
when those candidates are generated. But when the next running
transactions record RT2 is processed, a new pair of
candidate_restart_lsn2 and candidate_restart_valid2 is produced
(candidate_restart_valid2 > candidate_restart_valid1). Thus if
confirmed_flush_lsn >= candidate_restart_valid2, it's better to set
data.restart_lsn = candidate_restart_lsn2 instead of
candidate_restart_lsn1. In that sense the first pair becomes stale. If
we could maintain all such pairs in a sorted fashion, we would be able
to set data.restart_lsn to the latest candidate_restart_lsn for a
received confirmed_flush_lsn and remove all the pairs up to and including
the latest one after setting data.restart_lsn. But we don't maintain such
a list, hence the business of resetting candidate_restart_lsn and
candidate_restart_valid to indicate that we have consumed the previous
pair and are ready for a new one. We don't keep updating
candidate_restart_lsn and candidate_restart_valid to avoid chasing a
moving target and never updating data.restart_lsn.
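
To make that concrete, the consume-and-reset step happens when a
confirmation arrives. Roughly (a simplified paraphrase of
LogicalConfirmReceivedLocation(), omitting the xmin handling, dirtying
and persistence):

SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.confirmed_flush = lsn;

/*
 * The downstream confirmed an LSN past candidate_restart_valid, so the
 * pending candidate can be applied and then cleared, making room for
 * the next pair.
 */
if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
    MyReplicationSlot->candidate_restart_valid <= lsn)
{
    MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
    MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
    MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
}
SpinLockRelease(&MyReplicationSlot->mutex);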

What should ideally happen is we should ignore candidates produced by
older running transactions WAL records after WAL sender restart. This
is inline with what logical replication does with transactions
committed before slot's confirmed_flush_lsn - those are ignored. But
the criteria for ignoring running transactions records is slightly
different from that for transactions. If we ignore
candidate_restart_lsn which has candidate_restart_valid <=
confirmed_flush_lsn, we might lose some opportunity to advance
data.restart_lsn. Instead we should ignore any candidate_restart_lsn
<= data.restart_lsn especially before WAL sender finds first change to
send downstream. We can do that in SnapBuildProcessRunningXacts() by
accessing MyReplicationSlot, taking lock on it and then comparing
data.restart_lsn with txn->restart_decoding_lsn before calling
LogicalIncreaseRestartDecodingForSlot(). But then
LogicalIncreaseRestartDecodingForSlot() would be doing the same anyway
after applying your patch 0004. The only downside of 0004 is that the
logic to ignore candidates produced by a running transactions record
is not clearly visible in SnapBuildProcessRunningXacts(). For a
transaction which is ignored the logic to ignore the transaction is
visible in DecodeCommit() or DecodeAbort() - where people are likely
to look for that logic. We may add a comment to that effect in
SnapBuildProcessRunningXacts().

I have thought about just doing something like:

slot->data.restart_lsn = Max(slot->data.restart_lsn, new_lsn);

and similar for the other LSN fields. And it did resolve the issue at
hand, of course. But it seems sloppy, and I'm worried it might easily
mask actual issues in other cases.

Agreed.

I'm still of the opinion that (with the exception of a reconnect), these
places should not need to deal with values that go backwards. It should
work just fine without the Max(), and we should add Asserts() to check
that it's always a higher LSN.

Agreed.

I haven't thought about making SnapBuildProcessRunningXacts() more
complex to consider this stuff. But I doubt we'd like to be accessing
slots from that code - it has nothing to do with slots. If anything,
tweaking LogicalIncreaseRestartDecodingForSlot() seems more appropriate,

Agree.

but I'm still wondering if the current coding was intentional and we're
just missing why it was written like this.

Interestingly, the asymmetry between the functions was added in the
same commit b89e151054a05f0f6d356ca52e3b725dd0505e53. I doubt it was
intentional; the comments at both places say the same thing. This
problem is probably as old as that commit.

For the reconnect, I think it's a bit as if the primary restarted right
before the reconnect. That could happen anyway, and we need to handle
that correctly - if not, we have yet another issue, IMHO. And with the
restart it's the same as writing the slot to disk and reading it back,
which also doesn't retain most of the fields. So it seems cleaner to do
the same thing and just reset the various fields.

There's also the question of backpatching - the simpler the better, and
this I think just resetting the fields wins in this regard. The main
question is whether it's correct - I think it is. I'm not too worried
about efficiency very much, on the grounds that this should not matter
very often (only after unexpected restart). Correctness > efficiency.

If the slot's restart_lsn is very old before disconnect we will lose
an opportunity to update the restart_lsn and thus release some
resources earlier. However, that opportunity is only for a short
duration. On a fast enough machine the data.restart_lsn will be
updated anyway after processing all running transactions WAL records.
So I am also of the opinion that the argument of efficiency
won't stand here. But I doubt that "not resetting" is wrong. It looks
more intentional to me than the asymmetry between
LogicalIncreaseRestartDecodingForSlot and LogicalIncreaseXminForSlot.
This is our chance to settle that asymmetry forever :D.

I don't have a strong argument against resetting candidates in slots
at SlotRelease. However, resetting effective_xmin and
effective_catalog_xmin may not look good. The old values may have
been used to advance the xmin horizon. I have not closely read the code to
know whether the on-disk xmin and effective_xmin are always in sync
when computing xmin horizon or not.

--
Best Wishes,
Ashutosh Bapat

#19Amit Kapila
amit.kapila16@gmail.com
In reply to: Tomas Vondra (#17)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Tue, Nov 12, 2024 at 4:55 PM Tomas Vondra <tomas@vondra.me> wrote:

There's also the question of backpatching - the simpler the better, and
this I think just resetting the fields wins in this regard. The main
question is whether it's correct - I think it is. I'm not too worried
about efficiency very much, on the grounds that this should not matter
very often (only after unexpected restart). Correctness > efficiency.

Sure, but what is wrong with changing
LogicalIncreaseRestartDecodingForSlot's "if
(slot->candidate_restart_valid == InvalidXLogRecPtr)" to "else if
(slot->candidate_restart_valid == InvalidXLogRecPtr)"? My previous
analysis [1][2] on similar issue also leads to that conclusion. Then
later Sawada-San's email [3] also leads to the same solution. I know
that the same has been discussed in this thread and we are primarily
worried about whether we are missing some case that needs the current
code in LogicalIncreaseRestartDecodingForSlot(). It is always possible
that all who have analyzed are missing some point but I feel the
chances are less. I vote to fix LogicalIncreaseRestartDecodingForSlot.

Now, we have at least some theory for not clearing the candidate_restart_*
values, which is to allow advancing restart_lsn earlier when we get a
confirmation from the subscriber. Now, your theory that walsender
exits should be rare, so this doesn't impact much, is also true, but OTOH
why change something that can work more efficiently, provided we fix
LogicalIncreaseRestartDecodingForSlot as per our analysis?

[1]: /messages/by-id/CAA4eK1JvyWHzMwhO9jzPquctE_ha6bz3EkB3KE6qQJx63StErQ@mail.gmail.com
[2]: /messages/by-id/CAA4eK1KcnTvwrVqmpRTEMpyarBeTxwr8KA+kaveQOiqJ0zYsXA@mail.gmail.com
[3]: /messages/by-id/CAD21AoBVhYnGBuW_o=wEGgTp01qiHNAx1a14b1X9kFXmuBe=sg@mail.gmail.com
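
A simplified sketch of the branch structure in question (paraphrased, not
the actual source; the middle branch's condition is recalled from memory,
and the real diff appears later in this thread):

SpinLockAcquire(&slot->mutex);

/* don't overwrite if we have a newer restart lsn */
if (restart_lsn <= slot->data.restart_lsn)
{
    /* nothing to do */
}
else if (current_lsn <= slot->data.confirmed_flush)
{
    /* candidate is already confirmed, can be used directly */
    slot->candidate_restart_valid = current_lsn;
    slot->candidate_restart_lsn = restart_lsn;
}
/*
 * Previously a plain "if", so an older restart_lsn could still be installed
 * as a candidate; the proposed fix makes it an "else if", matching
 * LogicalIncreaseXminForSlot().
 */
else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
{
    slot->candidate_restart_valid = current_lsn;
    slot->candidate_restart_lsn = restart_lsn;
}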

--
With Regards,
Amit Kapila.

#20Tomas Vondra
tomas@vondra.me
In reply to: Amit Kapila (#19)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/12/24 13:19, Amit Kapila wrote:

On Tue, Nov 12, 2024 at 4:55 PM Tomas Vondra <tomas@vondra.me> wrote:

There's also the question of backpatching - the simpler the better, and
this I think just resetting the fields wins in this regard. The main
question is whether it's correct - I think it is. I'm not too worried
about efficiency very much, on the grounds that this should not matter
very often (only after unexpected restart). Correctness > efficiency.

Sure, but what is wrong with changing
LogicalIncreaseRestartDecodingForSlot's "if
(slot->candidate_restart_valid == InvalidXLogRecPtr)" to "else if
(slot->candidate_restart_valid == InvalidXLogRecPtr)"? My previous
analysis [1][2] on similar issue also leads to that conclusion. Then
later Sawada-San's email [3] also leads to the same solution. I know
that the same has been discussed in this thread and we are primarily
worried about whether we are missing some case that needs the current
code in LogicalIncreaseRestartDecodingForSlot(). It is always possible
that all who have analyzed are missing some point but I feel the
chances are less. I vote to fix LogicalIncreaseRestartDecodingForSlot.

I'm not opposed to adjusting LogicalIncreaseRestartDecodingForSlot().
The difference from LogicalIncreaseXminForSlot() seems accidental, in
which case fixing it seems desirable.

Now, we have at least some theory for not clearing the candidate_restart_*
values, which is to allow advancing restart_lsn earlier when we get a
confirmation from the subscriber. Now, your theory that walsender
exits should be rare, so this doesn't impact much, is also true, but OTOH
why change something that can work more efficiently, provided we fix
LogicalIncreaseRestartDecodingForSlot as per our analysis?

Because with how it works now it's impossible to check if the LSN values
are correct. We get an LSN value that's before what we already have in
the slot, and it might be either fine after a restart, or a bug (perhaps
on the subscriber side) causing all kinds of strange issues.

I'm all in favor of making stuff more efficient, but only if it doesn't
harm reliability. And that does seem to be happening here, because this
(not resetting + inability to have strict checks on the LSN updates)
seems to be one of the reasons why we have this bug since 9.4.

Sure, maybe fixing LogicalIncreaseRestartDecodingForSlot() is enough to
fix this particular case. But I'd be happier if we could also add
asserts checking the LSN advances, to detect similar issues that we may
be unaware of yet.

regards

--
Tomas Vondra

#21Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Ashutosh Bapat (#18)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Tue, Nov 12, 2024 at 4:08 AM Ashutosh Bapat
<ashutosh.bapat.oss@gmail.com> wrote:

On Tue, Nov 12, 2024 at 4:55 PM Tomas Vondra <tomas@vondra.me> wrote:

but I'm still wondering if the current coding was intentional and we're
just missing why it was written like this.

Interestingly, the asymmetry between the functions is added in the
same commit b89e151054a05f0f6d356ca52e3b725dd0505e53. I doubt it was
intentional; the comments at both places say the same thing. This
problem is probably as old as that commit.

FYI the asymmetry appears to have been present since the first version
of the patch that gave LogicalIncreaseRestartDecodingForSlot()
this form[1].

For the reconnect, I think it's a bit as if the primary restarted right
before the reconnect. That could happen anyway, and we need to handle
that correctly - if not, we have yet another issue, IMHO. And with the
restart it's the same as writing the slot to disk and reading it back,
which also doesn't retain most of the fields. So it seems cleaner to do
the same thing and just reset the various fields.

There's also the question of backpatching - the simpler the better, and
this I think just resetting the fields wins in this regard. The main
question is whether it's correct - I think it is. I'm not too worried
about efficiency very much, on the grounds that this should not matter
very often (only after unexpected restart). Correctness > efficiency.

If the slot's restart_lsn is very old before disconnect we will lose
an opportunity to update the restart_lsn and thus release some
resources earlier. However, that opportunity is only for a short
duration. On a fast enough machine the data.restart_lsn will be
updated anyway after processing all running transactions wal records
anyway. So I am also of the opinion that the argument of efficiency
won't stand here. But I doubt if "not resetting" is wrong. It looks
more intentional to me than the asymmetry between
LogicalIncreaseRestartDecodingForSlot and LogicalIncreaseXminForSlot.

In addition to the asymmetry between the two functions, the reason why I
think we should fix LogicalIncreaseRestartDecodingForSlot() is the
fact that it accepts any LSN as a candidate restart_lsn, which is
dangerous and incorrect to me even if there wasn't the asymmetry
issue.

Regards,

[1]: /messages/by-id/20131212000839.GA22776@awork2.anarazel.de

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#22Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Tomas Vondra (#17)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Tue, Nov 12, 2024 at 4:55 PM Tomas Vondra <tomas@vondra.me> wrote:

On 11/12/24 10:37, Ashutosh Bapat wrote:

On Tue, Nov 12, 2024 at 4:54 AM Tomas Vondra <tomas@vondra.me> wrote:

On 11/11/24 23:41, Masahiko Sawada wrote:

On Mon, Nov 11, 2024 at 6:17 AM Tomas Vondra <tomas@vondra.me> wrote:

Which made me re-investigate the issue and thought that it doesn't
necessarily need to clear these candidate values in memory on
releasing a slot as long as we're carefully updating restart_lsn.

Not sure, but maybe it'd be useful to ask the opposite question. Why
shouldn't it be correct to reset the fields, which essentially puts the
slot into the same state as if it was just read from disk? That also
discards all these values, and we can't rely on accidentally keeping
something important info in memory (because if the instance restarts
we'd lose that).

But this reminds me that the patch I shared earlier today resets the
slot in the ReplicationSlotAcquire() function, but I guess that's not
quite correct. It probably should be in the "release" path.

Which seems a bit efficient for example when restarting from a very
old point. Of course, even if we reset them on releasing a slot, it
would perfectly work since it's the same as restarting logical
decoding with a server restart.

I find the "efficiency" argument a bit odd. It'd be fine if we had a
correct behavior to start with, but we don't have that ... Also, I'm not
quite sure why exactly would it be more efficient?

And how likely is this in practice? It seems to me that
performance-sensitive cases can't do reconnects very often anyway,
that's inherently inefficient. No?

I think
LogicalIncreaseRestartDecodingForSlot() should be fixed as it seems
not to be working expectedly, but I could not have proof that we
should either keep or reset them on releasing a slot.

Not sure. Chances are we need both fixes, if only to make
LogicalIncreaseRestartDecodingForSlot more like the other function.

Thanks a lot for pointing to Masahiko's analysis. I missed that part
when I read that thread. Sorry.

No need to apologize. The discussion is complex and spread over a rather
long time period.

A candidate_restart_lsn and candidate_restart_valid pair just tells
that we may set slot's data.restart_lsn to candidate_restart_lsn when
the downstream confirms an LSN >= candidate_restart_valid. That pair
can never get inaccurate. It may get stale but never inaccurate. So
wiping those fields from ReplicationSlot is unnecessary.

Isn't this issue a proof that those fields *can* get inaccurate? Or what
do you mean by "stale but not inaccurate"?

What should ideally happen is we should ignore candidates produced by
older running transactions WAL records after WAL sender restart. This
is inline with what logical replication does with transactions
committed before slot's confirmed_flush_lsn - those are ignored. But
the criteria for ignoring running transactions records is slightly
different from that for transactions. If we ignore
candidate_restart_lsn which has candidate_restart_valid <=
confirmed_flush_lsn, we might lose some opportunity to advance
data.restart_lsn. Instead we should ignore any candidate_restart_lsn
<= data.restart_lsn especially before WAL sender finds first change to
send downstream. We can do that in SnapBuildProcessRunningXacts() by
accessing MyReplicationSlot, taking lock on it and then comparing
data.restart_lsn with txn->restart_decoding_lsn before calling
LogicalIncreaseRestartDecodingForSlot(). But then
LogicalIncreaseRestartDecodingForSlot() would be doing the same anyway
after applying your patch 0004. The only downside of 0004 is that the
logic to ignore candidates produced by a running transactions record
is not clearly visible in SnapBuildProcessRunningXacts(). For a
transaction which is ignored the logic to ignore the transaction is
visible in DecodeCommit() or DecodeAbort() - where people are likely
to look for that logic. We may add a comment to that effect in
SnapBuildProcessRunningXacts().

I have thought about just doing something like:

slot->data.restart_lsn = Max(slot->data.restart_lsn, new_lsn);

and similar for the other LSN fields. And it did resolve the issue at
hand, of course. But it seems sloppy, and I'm worried it might easily
mask actual issues in other cases.

I'm still of the opinion that (with the exception of a reconnect), these
places should not need to deal with values that go backwards. It should
work just fine without the Max(), and we should add Asserts() to check
that it's always a higher LSN.

For the reconnect, I think it's a bit as if the primary restarted right
before the reconnect. That could happen anyway, and we need to handle
that correctly - if not, we have yet another issue, IMHO. And with the
restart it's the same as writing the slot to disk and reading it back,
which also doesn't retain most of the fields. So it seems cleaner to do
the same thing and just reset the various fields.

I haven't thought about making SnapBuildProcessRunningXacts() more
complex to consider this stuff. But I doubt we'd like to be accessing
slots from that code - it has nothing to do with slots.

Here's a way we can fix SnapBuildProcessRunningXacts() similar to
DecodeCommit(). DecodeCommit() uses SnapBuildXactNeedsSkip() to decide
whether a given transaction should be decoded or not.

/*
 * Should the contents of transaction ending at 'ptr' be decoded?
 */
bool
SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
{
    return ptr < builder->start_decoding_at;
}

Similar to SnapBuild::start_decoding_at, we could maintain a field
SnapBuild::start_reading_at set to the LSN from which the WAL sender would
start reading WAL. If the candidate_restart_lsn produced by a running
transactions WAL record is less than SnapBuild::start_reading_at,
SnapBuildProcessRunningXacts() won't call
LogicalIncreaseRestartDecodingForSlot() with that candidate LSN. We
won't access the slot here, and the solution will be in line with
DecodeCommit(), which skips the transactions.
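
A rough sketch of that idea, assuming a new SnapBuild::start_reading_at
field and a hypothetical helper named here
SnapBuildRestartCandidateNeedsSkip() (neither exists today):

/*
 * Hypothetical helper, analogous to SnapBuildXactNeedsSkip(): should a
 * candidate restart LSN produced by a running-xacts record be ignored?
 */
static bool
SnapBuildRestartCandidateNeedsSkip(SnapBuild *builder, XLogRecPtr candidate)
{
    return candidate < builder->start_reading_at;
}

/* in SnapBuildProcessRunningXacts(), before proposing the candidate: */
if (!SnapBuildRestartCandidateNeedsSkip(builder, txn->restart_decoding_lsn))
    LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);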

--
Best Wishes,
Ashutosh Bapat

#23Amit Kapila
amit.kapila16@gmail.com
In reply to: Tomas Vondra (#20)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Tue, Nov 12, 2024 at 6:29 PM Tomas Vondra <tomas@vondra.me> wrote:

Sure, maybe fixing LogicalIncreaseRestartDecodingForSlot() is enough to
fix this particular case. But I'd be happier if we could also add
asserts checking the LSN advances, to detect similar issues that we may
be unaware of yet.

As most of us lean towards fixing
LogicalIncreaseRestartDecodingForSlot(), let's fix that in the HEAD
and back branches. Separately we can consider other asserts just for
HEAD that you think will make the code robust and help avoid such bugs
in the future.

--
With Regards,
Amit Kapila.

#24Amit Kapila
amit.kapila16@gmail.com
In reply to: Ashutosh Bapat (#15)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Tue, Nov 12, 2024 at 12:43 PM Ashutosh Bapat
<ashutosh.bapat.oss@gmail.com> wrote:

On Tue, Nov 12, 2024 at 12:02 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Mon, Nov 11, 2024 at 2:08 PM Tomas Vondra <tomas@vondra.me> wrote:

But neither of those fixes prevents backwards move for confirmed_flush
LSN, as enforced by asserts in the 0005 patch. I don't know if this
assert is incorrect or now. It seems natural that once we get a
confirmation for some LSN, we can't move before that position, but I'm
not sure about that. Maybe it's too strict.

Hmm, I'm concerned that it might be another problem. I think there are
some cases where a subscriber sends a flush position older than slot's
confirmed_flush as a feedback message.

Right, it can happen for cases where subscribers doesn't have to do
anything (for example DDLs) like I have explained in one of my emails
[1]

But it seems to be dangerous if
we always accept it as a new confirmed_flush value. It could happen
that confirm_flush could be set to a LSN older than restart_lsn.

Possible, though I haven't tried to reproduce such a case. But, will
it create any issues? I don't know if there is any benefit in allowing
to move confirmed_flush LSN backward. AFAIR, we don't allow such
backward values to persist. They will temporarily be in memory. I
think as a separate patch we should prevent it from moving backward.

If confirmed_flush LSN moves backwards, it means the transactions
which were thought to be replicated earlier are no longer considered
to be replicated. This means that the restart_lsn of the slot needs to
be at least far back as the oldest of starting points of those
transactions. Thus restart_lsn of slot has to be pushed further back.

I don't see a reason to move restart_lsn backward. Why do you think so?

[1]: /messages/by-id/CAA4eK1+zWQwOe5G8zCYGvErnaXh5+Dbyg_A1Z3uywSf_4=T0UA@mail.gmail.com

--
With Regards,
Amit Kapila.

#25Tomas Vondra
tomas@vondra.me
In reply to: Amit Kapila (#24)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/13/24 11:59, Amit Kapila wrote:

On Tue, Nov 12, 2024 at 12:43 PM Ashutosh Bapat
<ashutosh.bapat.oss@gmail.com> wrote:

On Tue, Nov 12, 2024 at 12:02 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Mon, Nov 11, 2024 at 2:08 PM Tomas Vondra <tomas@vondra.me> wrote:

But neither of those fixes prevents backwards move for confirmed_flush
LSN, as enforced by asserts in the 0005 patch. I don't know if this
assert is incorrect or now. It seems natural that once we get a
confirmation for some LSN, we can't move before that position, but I'm
not sure about that. Maybe it's too strict.

Hmm, I'm concerned that it might be another problem. I think there are
some cases where a subscriber sends a flush position older than slot's
confirmed_flush as a feedback message.

Right, it can happen for cases where subscribers doesn't have to do
anything (for example DDLs) like I have explained in one of my emails
[1]

Thanks. I admit not being entirely familiar with all the details, but
doesn't that email explain more "Why it currently happens?" rather than
"Is this what should be happening?"

Sure, the subscriber needs to confirm changes for which nothing needs to
be done, like DDL. But isn't there a better way to do that, rather than
allowing confirmed_lsn to go backwards?

But it seems to be dangerous if
we always accept it as a new confirmed_flush value. It could happen
that confirm_flush could be set to a LSN older than restart_lsn.

Possible, though I haven't tried to reproduce such a case. But, will
it create any issues? I don't know if there is any benefit in allowing
to move confirmed_flush LSN backward. AFAIR, we don't allow such
backward values to persist. They will temporarily be in memory. I
think as a separate patch we should prevent it from moving backward.

If confirmed_flush LSN moves backwards, it means the transactions
which were thought to be replicated earlier are no longer considered
to be replicated. This means that the restart_lsn of the slot needs to
be at least far back as the oldest of starting points of those
transactions. Thus restart_lsn of slot has to be pushed further back.

I don't see a reason to move restart_lsn backward. Why do you think so?

I think what Ashutosh is saying is that if confirmed_flush is allowed to
move backwards, that may result in start_lsn moving backwards too. And
we need to be able to decode all transactions committed since start_lsn,
so if start_lsn moves backwards, maybe restart_lsn needs to move
backwards too. I have no idea if confirmed_flush/start_lsn can move
backwards enough to require restart_lsn to move, though.
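
For illustration only, the relationships being discussed could be spelled
out as hypothetical checks at the point where a confirmation is applied
(not an actual patch from this thread):

/*
 * Hypothetical invariants: a confirmation should not move confirmed_flush
 * backwards, and restart_lsn has to stay old enough that every transaction
 * committed after confirmed_flush can still be decoded.
 */
Assert(lsn >= slot->data.confirmed_flush);
Assert(slot->data.restart_lsn <= slot->data.confirmed_flush);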

Anyway, these discussions are a good illustration why I think allowing
these LSNs to move backwards is a problem. It either causes bugs (like
with breaking replication slots) and/or it makes the reasoning about
correct behavior much harder.

regards

--
Tomas Vondra

#26Tomas Vondra
tomas@vondra.me
In reply to: Amit Kapila (#23)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/13/24 10:38, Amit Kapila wrote:

On Tue, Nov 12, 2024 at 6:29 PM Tomas Vondra <tomas@vondra.me> wrote:

Sure, maybe fixing LogicalIncreaseRestartDecodingForSlot() is enough to
fix this particular case. But I'd be happier if we could also add
asserts checking the LSN advances, to detect similar issues that we may
be unaware of yet.

As most of us lean towards fixing
LogicalIncreaseRestartDecodingForSlot(), let's fix that in the HEAD
and back branches. Separately we can consider other asserts just for
HEAD that you think will make the code robust and help avoid such bugs
in the future.

+1 to that

regards

--
Tomas Vondra

#27Tomas Vondra
tomas@vondra.me
In reply to: Ashutosh Bapat (#22)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/13/24 05:38, Ashutosh Bapat wrote:

...

Here's way we can fix SnapBuildProcessRunningXacts() similar to
DecodeCommit(). DecodeCommit() uses SnapBuildXactNeedsSkip() to decide
whether a given transaction should be decoded or not.
/*
* Should the contents of transaction ending at 'ptr' be decoded?
*/
bool
SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
{
return ptr < builder->start_decoding_at;
}

Similar to SnapBuild::start_decoding_at we could maintain a field
SnapBuild::start_reading_at to the LSN from which the WAL sender would
start reading WAL. If candidate_restart_lsn produced by a running
transactions WAL record is less than SnapBuild::start_reading_at,
SnapBuildProcessRunningXacts() won't call
LogicalIncreaseRestartDecodingForSlot() with that candiate LSN. We
won't access the slot here and the solution will be inline with
DecodeCommit() which skips the transactions.

Could you maybe write a patch doing this? That would allow proper
testing etc.

regards

--
Tomas Vondra

#28Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Tomas Vondra (#26)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Wed, Nov 13, 2024 at 3:53 AM Tomas Vondra <tomas@vondra.me> wrote:

On 11/13/24 10:38, Amit Kapila wrote:

On Tue, Nov 12, 2024 at 6:29 PM Tomas Vondra <tomas@vondra.me> wrote:

Sure, maybe fixing LogicalIncreaseRestartDecodingForSlot() is enough to
fix this particular case. But I'd be happier if we could also add
asserts checking the LSN advances, to detect similar issues that we may
be unaware of yet.

As most of us lean towards fixing
LogicalIncreaseRestartDecodingForSlot(), let's fix that in the HEAD
and back branches. Separately we can consider other asserts just for
HEAD that you think will make the code robust and help avoid such bugs
in the future.

+1 to that

+1.

Tomas, are you going to update the patch you shared before[1] or shall I?

Regards,

[1]: /messages/by-id/abf794c4-c459-4fed-84d9-968c4f0e2052@vondra.me (0004-fix-LogicalIncreaseRestartDecodingForSlot.patch)

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#29Tomas Vondra
tomas@vondra.me
In reply to: Masahiko Sawada (#28)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/14/24 00:33, Masahiko Sawada wrote:

On Wed, Nov 13, 2024 at 3:53 AM Tomas Vondra <tomas@vondra.me> wrote:

On 11/13/24 10:38, Amit Kapila wrote:

On Tue, Nov 12, 2024 at 6:29 PM Tomas Vondra <tomas@vondra.me> wrote:

Sure, maybe fixing LogicalIncreaseRestartDecodingForSlot() is enough to
fix this particular case. But I'd be happier if we could also add
asserts checking the LSN advances, to detect similar issues that we may
be unaware of yet.

As most of us lean towards fixing
LogicalIncreaseRestartDecodingForSlot(), let's fix that in the HEAD
and back branches. Separately we can consider other asserts just for
HEAD that you think will make the code robust and help avoid such bugs
in the future.

+1 to that

+1.

Tomas, are you going to update the patch you shared before[1] or shall I?

Please feel free to take over. I'm busy with some other stuff and the
initial analysis was done by you anyway.

regards

--
Tomas Vondra

#30Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Tomas Vondra (#29)
1 attachment(s)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Wed, Nov 13, 2024 at 3:47 PM Tomas Vondra <tomas@vondra.me> wrote:

On 11/14/24 00:33, Masahiko Sawada wrote:

On Wed, Nov 13, 2024 at 3:53 AM Tomas Vondra <tomas@vondra.me> wrote:

On 11/13/24 10:38, Amit Kapila wrote:

On Tue, Nov 12, 2024 at 6:29 PM Tomas Vondra <tomas@vondra.me> wrote:

Sure, maybe fixing LogicalIncreaseRestartDecodingForSlot() is enough to
fix this particular case. But I'd be happier if we could also add
asserts checking the LSN advances, to detect similar issues that we may
be unaware of yet.

As most of us lean towards fixing
LogicalIncreaseRestartDecodingForSlot(), let's fix that in the HEAD
and back branches. Separately we can consider other asserts just for
HEAD that you think will make the code robust and help avoid such bugs
in the future.

+1 to that

+1.

Tomas, are you going to update the patch you shared before[1] or shall I?

Please feel free to take over. I'm busy with some other stuff and the
initial analysis was done by you anyway.

Sure. I've attached the updated patch. I just added the commit message.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

Attachments:

v1-0001-Fix-a-possibility-of-logical-replication-slot-s-r.patch (application/octet-stream)
From c7f4297363c376b66169533d540e390d0932e1c6 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <msawada@postgresql.orig>
Date: Wed, 13 Nov 2024 14:58:01 -0800
Subject: [PATCH v1] Fix a possibility of logical replication slot's
 restart_lsn going backwards.

Previously LogicalIncreaseRestartDecodingForSlot() accidentally
accepted any LSN as the candidate_lsn and candidate_valid after the
restart_lsn of the replication slot was updated, so it potentially
caused the restart_lsn to move backwards.

A scenario where this could happen in logical replication is: after a
logical replication restart, based on previous candidate_lsn and
candidate_valid values in memory, the restart_lsn advances upon
receiving a subscriber acknowledgment. Then, logical decoding restarts
from an older point, setting candidate_lsn and candidate_valid based
on an old RUNNING_XACTS record. Subsequent subscriber acknowledgments
then update the restart_lsn to an LSN older than the current value.

In the reported case, after WAL files were removed by a checkpoint,
the retreated restart_lsn prevented logical replication from
restarting due to missing WAL segments.

This change essentially modifies the 'if' condition to 'else if'
condition within the function. The previous code had an asymmetry in
this regard compared to LogicalIncreaseXminForSlot(), which does
almost the same thing for different fields.

The WAL removal issue was reported by Hubert Depesz Lubaczewski.

Backpatch to all supported versions, since the bug exists since 9.4
where logical decoding was introduced.

Author: Tomas Vondra
Reviewed-by: Ashutosh Bapat, Amit Kapila, Masahiko Sawada
Discussion: https://postgr.es/m/Yz2hivgyjS1RfMKs%40depesz.com
Discussion: https://postgr.es/m/85fff40e-148b-4e86-b921-b4b846289132%40vondra.me
Backpatch-through: 13
---
 src/backend/replication/logical/logical.c | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3fe1774a1e..9437ee3fb4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1761,6 +1761,7 @@ void
 LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
 {
 	bool		updated_lsn = false;
+	bool		spin_released = false;
 	ReplicationSlot *slot;
 
 	slot = MyReplicationSlot;
@@ -1794,12 +1795,14 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 * might never end up updating if the receiver acks too slowly. A missed
 	 * value here will just cause some extra effort after reconnecting.
 	 */
-	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
+	else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn));
@@ -1815,6 +1818,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		confirmed_flush = slot->data.confirmed_flush;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn),
@@ -1823,6 +1828,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(confirmed_flush));
 	}
 
+	if (!spin_released)
+		SpinLockRelease(&slot->mutex);
+
 	/* candidates are already valid with the current flush position, apply */
 	if (updated_lsn)
 		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
-- 
2.43.5

#31Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Tomas Vondra (#27)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Wed, Nov 13, 2024 at 5:25 PM Tomas Vondra <tomas@vondra.me> wrote:

On 11/13/24 05:38, Ashutosh Bapat wrote:

...

Here's way we can fix SnapBuildProcessRunningXacts() similar to
DecodeCommit(). DecodeCommit() uses SnapBuildXactNeedsSkip() to decide
whether a given transaction should be decoded or not.
/*
* Should the contents of transaction ending at 'ptr' be decoded?
*/
bool
SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
{
return ptr < builder->start_decoding_at;
}

Similar to SnapBuild::start_decoding_at we could maintain a field
SnapBuild::start_reading_at to the LSN from which the WAL sender would
start reading WAL. If candidate_restart_lsn produced by a running
transactions WAL record is less than SnapBuild::start_reading_at,
SnapBuildProcessRunningXacts() won't call
LogicalIncreaseRestartDecodingForSlot() with that candiate LSN. We
won't access the slot here and the solution will be inline with
DecodeCommit() which skips the transactions.

Could you maybe write a patch doing this? That would allow proper
testing etc.

Here's a quick and dirty patch which describes the idea. I didn't get
time to implement code to move SnapBuild::restart_lsn if
SnapBuild::start_decoding_at moves forward while building the initial
snapshot. I am not sure whether that's necessary either.

I have added three elogs to see if the logic is working as expected. I
see two of the elogs in the patch in the server log when I run tests from
tests/subscription and tests/recovery. But I do not see the third one.
That either means that the situation causing the bug is not covered by
those tests or that the fix is not triggered. If you run your reproduction
and still see the crashes, please provide the output of those elog
messages along with the rest of the elogs you have added.

--
Best Wishes,
Ashutosh Bapat

#32Michael Paquier
michael@paquier.xyz
In reply to: Ashutosh Bapat (#31)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Thu, Nov 14, 2024 at 11:45:56AM +0530, Ashutosh Bapat wrote:

Here's a quick and dirty patch which describes the idea. I didn't get
time to implement code to move SnapBuild::restart_lsn if
SnapBuild::start_decoding_at moves forward while building initial
snapshot. I am not sure whether that's necessary either.

I have added three elogs to see if the logic is working as expected. I
see two of the elogs in patch in the server log when I run tests from
tests/subscription and tests/recovery. But I do not see the third one.
That either means that the situation causing the bug is not covered by
those tests or the fix is not triggered. If you run your reproduction
and still see the crashes please provide the output of those elog
messages along with the rest of the elogs you have added.

Forgot the attachment, perhaps?
--
Michael

#33Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Michael Paquier (#32)
1 attachment(s)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

Thanks for noticing it. Sorry. Attached.

On Thu, Nov 14, 2024 at 12:04 PM Michael Paquier <michael@paquier.xyz> wrote:

On Thu, Nov 14, 2024 at 11:45:56AM +0530, Ashutosh Bapat wrote:

Here's a quick and dirty patch which describes the idea. I didn't get
time to implement code to move SnapBuild::restart_lsn if
SnapBuild::start_decoding_at moves forward while building initial
snapshot. I am not sure whether that's necessary either.

I have added three elogs to see if the logic is working as expected. I
see two of the elogs in patch in the server log when I run tests from
tests/subscription and tests/recovery. But I do not see the third one.
That either means that the situation causing the bug is not covered by
those tests or the fix is not triggered. If you run your reproduction
and still see the crashes please provide the output of those elog
messages along with the rest of the elogs you have added.

Forgot the attachment, perhaps?
--
Michael

--
Best Wishes,
Ashutosh Bapat

Attachments:

ignore_running_xact_candidates.patch (text/x-patch; charset=US-ASCII)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3fe1774a1e9..e42fe52de39 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -147,7 +147,7 @@ CheckLogicalDecodingRequirements(void)
  * CreateDecodingContext() performing common tasks.
  */
 static LogicalDecodingContext *
-StartupDecodingContext(List *output_plugin_options,
+StartupDecodingContext(List *output_plugin_options, XLogRecPtr restart_lsn,
 					   XLogRecPtr start_lsn,
 					   TransactionId xmin_horizon,
 					   bool need_full_snapshot,
@@ -212,7 +212,7 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
-		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
+		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, restart_lsn, start_lsn,
 								need_full_snapshot, in_create, slot->data.two_phase_at);
 
 	ctx->reorder->private_data = ctx;
@@ -438,7 +438,7 @@ CreateInitDecodingContext(const char *plugin,
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
 
-	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
+	ctx = StartupDecodingContext(NIL, restart_lsn, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false, true,
 								 xl_routine, prepare_write, do_write,
 								 update_progress);
@@ -591,7 +591,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 		start_lsn = slot->data.confirmed_flush;
 	}
 
-	ctx = StartupDecodingContext(output_plugin_options,
+	ctx = StartupDecodingContext(output_plugin_options, slot->data.restart_lsn,
 								 start_lsn, InvalidTransactionId, false,
 								 fast_forward, false, xl_routine, prepare_write,
 								 do_write, update_progress);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index a6a4da32668..62f92b51917 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -184,6 +184,7 @@ static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *
 SnapBuild *
 AllocateSnapshotBuilder(ReorderBuffer *reorder,
 						TransactionId xmin_horizon,
+						XLogRecPtr restart_lsn,
 						XLogRecPtr start_lsn,
 						bool need_full_snapshot,
 						bool in_slot_creation,
@@ -217,6 +218,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 
 	builder->initial_xmin_horizon = xmin_horizon;
 	builder->start_decoding_at = start_lsn;
+	builder->restart_lsn = restart_lsn;
 	builder->in_slot_creation = in_slot_creation;
 	builder->building_full_snapshot = need_full_snapshot;
 	builder->two_phase_at = two_phase_at;
@@ -1163,7 +1165,17 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 * anything because we hadn't reached a consistent state yet.
 	 */
 	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
-		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+	{
+		elog(LOG, "running xact record at %X/%X, builder::restart_lsn = %X/%X, restart_decoding_lsn of txn (%d): %X/%X",
+			LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(builder->restart_lsn), txn->xid, LSN_FORMAT_ARGS(txn->restart_decoding_lsn));
+
+		if (txn->restart_decoding_lsn >= builder->restart_lsn)
+			LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+		else
+			elog(LOG, "skipped candidates generated by running transaction record at %X/%X",
+				LSN_FORMAT_ARGS(lsn));
+
+	}
 
 	/*
 	 * No in-progress transaction, can reuse the last serialized snapshot if
@@ -1172,8 +1184,18 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	else if (txn == NULL &&
 			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
 			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
-		LogicalIncreaseRestartDecodingForSlot(lsn,
+	{
+
+		elog(LOG, "running xact record at %X/%X, last serialized snapshot (LSN) %X/%X, builder::restart_lsn %X/%X, builder::rb::current_restart_decoding_lsn %X/%X",
+			LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(builder->last_serialized_snapshot), LSN_FORMAT_ARGS(builder->restart_lsn), LSN_FORMAT_ARGS(builder->reorder->current_restart_decoding_lsn));
+
+		if (builder->last_serialized_snapshot >= builder->restart_lsn)
+			LogicalIncreaseRestartDecodingForSlot(lsn,
 											  builder->last_serialized_snapshot);
+		else
+			elog(LOG, "skipped candidates generated by running transaction record at %X/%X",
+					LSN_FORMAT_ARGS(lsn));
+	}
 }
 
 
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 3c1454df993..f9a5cc7bab2 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -64,7 +64,8 @@ struct xl_running_xacts;
 extern void CheckPointSnapBuild(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
-										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
+										  TransactionId xmin_horizon, XLogRecPtr restart_lsn,
+										  XLogRecPtr start_lsn,
 										  bool need_full_snapshot,
 										  bool in_slot_creation,
 										  XLogRecPtr two_phase_at);
diff --git a/src/include/replication/snapbuild_internal.h b/src/include/replication/snapbuild_internal.h
index 1e295c75076..9d912ea87bb 100644
--- a/src/include/replication/snapbuild_internal.h
+++ b/src/include/replication/snapbuild_internal.h
@@ -43,6 +43,13 @@ struct SnapBuild
 	 */
 	XLogRecPtr	start_decoding_at;
 
+	/*
+	 * LSN from which the WAL reader will start reading WAL. Ignore
+	 * candidate_restart_lsn before this values produced by running transaction
+	 * log records.
+	 */
+	XLogRecPtr restart_lsn;
+
 	/*
 	 * LSN at which two-phase decoding was enabled or LSN at which we found a
 	 * consistent point at the time of slot creation.
#34Amit Kapila
amit.kapila16@gmail.com
In reply to: Tomas Vondra (#25)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Wed, Nov 13, 2024 at 5:23 PM Tomas Vondra <tomas@vondra.me> wrote:

On 11/13/24 11:59, Amit Kapila wrote:

On Tue, Nov 12, 2024 at 12:43 PM Ashutosh Bapat
<ashutosh.bapat.oss@gmail.com> wrote:

On Tue, Nov 12, 2024 at 12:02 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Mon, Nov 11, 2024 at 2:08 PM Tomas Vondra <tomas@vondra.me> wrote:

But neither of those fixes prevents backwards move for confirmed_flush
LSN, as enforced by asserts in the 0005 patch. I don't know if this
assert is incorrect or now. It seems natural that once we get a
confirmation for some LSN, we can't move before that position, but I'm
not sure about that. Maybe it's too strict.

Hmm, I'm concerned that it might be another problem. I think there are
some cases where a subscriber sends a flush position older than slot's
confirmed_flush as a feedback message.

Right, it can happen for cases where subscribers doesn't have to do
anything (for example DDLs) like I have explained in one of my emails
[1]

Thanks. I admit not being entirely familiar with all the details, but
doesn't that email explain more "Why it currently happens?" rather than
"Is this what should be happening?"

Right.

Sure, the subscriber needs to confirm changes for which nothing needs to
be done, like DDL. But isn't there a better way to do that, rather than
allowing confirmed_lsn to go backwards?

Firstly, I agree that we should try to find ways to avoid
confirmed_lsn going backward. We can try to explore solutions on both
the publisher and subscriber side.

But it seems to be dangerous if
we always accept it as a new confirmed_flush value. It could happen
that confirm_flush could be set to a LSN older than restart_lsn.

Possible, though I haven't tried to reproduce such a case. But, will
it create any issues? I don't know if there is any benefit in allowing
to move confirmed_flush LSN backward. AFAIR, we don't allow such
backward values to persist. They will temporarily be in memory. I
think as a separate patch we should prevent it from moving backward.

If confirmed_flush LSN moves backwards, it means the transactions
which were thought to be replicated earlier are no longer considered
to be replicated. This means that the restart_lsn of the slot needs to
be at least far back as the oldest of starting points of those
transactions. Thus restart_lsn of slot has to be pushed further back.

I don't see a reason to move restart_lsn backward. Why do you think so?

I think what Ashutosh is saying that if confirmed_flush is allowed to
move backwards, that may result in start_lsn moving backwards too. And
we need to be able to decode all transactions committed since start_lsn,
so if start_lsn moves backwards, maybe restart_lsn needs to move
backwards too. I have no idea if confirmed_flush/start_lsn can move
backwards enough to require restart_lsn to move, though.

Anyway, these discussions are a good illustration why I think allowing
these LSNs to move backwards is a problem. It either causes bugs (like
with breaking replication slots) and/or it makes the reasoning about
correct behavior much harder.

Right, I think one needs to come up with some reproducible scenarios
where these cause any kind of problem or inefficiency. Then, we can
discuss the solutions accordingly. I mean to say that someone has to
put effort into making a somewhat more solid case for changing this code,
because it may not be a good idea to change something just based on
some theory, unless it is just adding some assertions.

--
With Regards,
Amit Kapila.

#35Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#30)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Thu, Nov 14, 2024 at 7:08 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

Sure. I've attached the updated patch. I just added the commit message.

@@ -1815,6 +1818,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr
current_lsn, XLogRecPtr restart
confirmed_flush = slot->data.confirmed_flush;
SpinLockRelease(&slot->mutex);

+ spin_released = true;
+
elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after
%X/%X, current candidate %X/%X, current after %X/%X, flushed up to
%X/%X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn),
@@ -1823,6 +1828,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr
current_lsn, XLogRecPtr restart
LSN_FORMAT_ARGS(confirmed_flush));
}

+ if (!spin_released)
+ SpinLockRelease(&slot->mutex);

This coding pattern looks odd to me. We can consider releasing the
spinlock in the other two if/else if checks. I understand it is a
matter of individual preference, so if you and/or others prefer the
current way, that is also fine with me. Other than this, the patch
looks good to me.

--
With Regards,
Amit Kapila.

#36Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#35)
1 attachment(s)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Thu, Nov 14, 2024 at 10:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Nov 14, 2024 at 7:08 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

Sure. I've attached the updated patch. I just added the commit message.

@@ -1815,6 +1818,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr
current_lsn, XLogRecPtr restart
confirmed_flush = slot->data.confirmed_flush;
SpinLockRelease(&slot->mutex);

+ spin_released = true;
+
elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after
%X/%X, current candidate %X/%X, current after %X/%X, flushed up to
%X/%X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn),
@@ -1823,6 +1828,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr
current_lsn, XLogRecPtr restart
LSN_FORMAT_ARGS(confirmed_flush));
}

+ if (!spin_released)
+ SpinLockRelease(&slot->mutex);

This coding pattern looks odd to me. We can consider releasing
spinlock in the other two if/else if checks. I understand it is a
matter of individual preference, so, if you and or others prefer the
current way, that is also fine with me. Other than this, the patch
looks good to me.

Indeed, I prefer your idea. I've attached the updated patch. I'll push
it early next week unless there are further comments.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

Attachments:

v2-0001-Fix-a-possibility-of-logical-replication-slot-s-r.patch (application/octet-stream)
From 3bb02cd66a1ae6204a14e58f34b4f4a07d1dae93 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <msawada@postgresql.orig>
Date: Wed, 13 Nov 2024 14:58:01 -0800
Subject: [PATCH v2] Fix a possibility of logical replication slot's
 restart_lsn going backwards.

Previously LogicalIncreaseRestartDecodingForSlot() accidentally
accepted any LSN as the candidate_lsn and candidate_valid after the
restart_lsn of the replication slot was updated, so it potentially
caused the restart_lsn to move backwards.

A scenario where this could happen in logical replication is: after a
logical replication restart, based on previous candidate_lsn and
candidate_valid values in memory, the restart_lsn advances upon
receiving a subscriber acknowledgment. Then, logical decoding restarts
from an older point, setting candidate_lsn and candidate_valid based
on an old RUNNING_XACTS record. Subsequent subscriber acknowledgments
then update the restart_lsn to an LSN older than the current value.

In the reported case, after WAL files were removed by a checkpoint,
the retreated restart_lsn prevented logical replication from
restarting due to missing WAL segments.

This change essentially modifies the 'if' condition to 'else if'
condition within the function. The previous code had an asymmetry in
this regard compared to LogicalIncreaseXminForSlot(), which does
almost the same thing for different fields.

The WAL removal issue was reported by Hubert Depesz Lubaczewski.

Backpatch to all supported versions, since the bug exists since 9.4
where logical decoding was introduced.

Reviewed-by: Tomas Vondra, Ashutosh Bapat, Amit Kapila
Discussion: https://postgr.es/m/Yz2hivgyjS1RfMKs%40depesz.com
Discussion: https://postgr.es/m/85fff40e-148b-4e86-b921-b4b846289132%40vondra.me
Backpatch-through: 13
---
 src/backend/replication/logical/logical.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3fe1774a1e..e941bb491d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1774,6 +1774,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	/* don't overwrite if have a newer restart lsn */
 	if (restart_lsn <= slot->data.restart_lsn)
 	{
+		SpinLockRelease(&slot->mutex);
 	}
 
 	/*
@@ -1784,6 +1785,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	{
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
+		SpinLockRelease(&slot->mutex);
 
 		/* our candidate can directly be used */
 		updated_lsn = true;
@@ -1794,7 +1796,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 * might never end up updating if the receiver acks too slowly. A missed
 	 * value here will just cause some extra effort after reconnecting.
 	 */
-	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
+	else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
-- 
2.43.5

#37Tomas Vondra
tomas@vondra.me
In reply to: Masahiko Sawada (#36)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/15/24 18:40, Masahiko Sawada wrote:

On Thu, Nov 14, 2024 at 10:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Nov 14, 2024 at 7:08 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

Sure. I've attached the updated patch. I just added the commit message.

@@ -1815,6 +1818,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr
current_lsn, XLogRecPtr restart
confirmed_flush = slot->data.confirmed_flush;
SpinLockRelease(&slot->mutex);

+ spin_released = true;
+
elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after
%X/%X, current candidate %X/%X, current after %X/%X, flushed up to
%X/%X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn),
@@ -1823,6 +1828,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr
current_lsn, XLogRecPtr restart
LSN_FORMAT_ARGS(confirmed_flush));
}

+ if (!spin_released)
+ SpinLockRelease(&slot->mutex);

This coding pattern looks odd to me. We can consider releasing the
spinlock in the other two if/else if checks. I understand it is a
matter of individual preference, so, if you and/or others prefer the
current way, that is also fine with me. Other than this, the patch
looks good to me.

Indeed, I prefer your idea. I've attached the updated patch. I'll push
it early next week unless there are further comments.

I'm not particularly attached to how I did this in my WIP patch, it was
simply the simplest way to make it work for experimentation. I'd imagine
it'd be best to just mirror how LogicalIncreaseXminForSlot() does this.
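
For reference, that function keeps the whole decision in one
if / else if chain under the spinlock, releases it exactly once, and
defers the logging until after the release - roughly this shape
(simplified, from memory):

	SpinLockAcquire(&slot->mutex);

	/* don't overwrite if we already have a newer xmin */
	if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
	{
	}
	/* the client already confirmed up to this point, candidate can be used directly */
	else if (current_lsn <= slot->data.confirmed_flush)
	{
		slot->candidate_catalog_xmin = xmin;
		slot->candidate_xmin_lsn = current_lsn;
		updated_xmin = true;
	}
	/* only take a new candidate if the previous one has been applied */
	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
	{
		slot->candidate_catalog_xmin = xmin;
		slot->candidate_xmin_lsn = current_lsn;
		got_new_xmin = true;
	}

	SpinLockRelease(&slot->mutex);

	if (got_new_xmin)
		elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
			 LSN_FORMAT_ARGS(current_lsn));

	/* and if updated_xmin was set, the candidate is applied right away
	 * via LogicalConfirmReceivedLocation() */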

regards

--
Tomas Vondra

#38Tomas Vondra
tomas@vondra.me
In reply to: Amit Kapila (#34)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/15/24 04:26, Amit Kapila wrote:

On Wed, Nov 13, 2024 at 5:23 PM Tomas Vondra <tomas@vondra.me> wrote:

On 11/13/24 11:59, Amit Kapila wrote:

On Tue, Nov 12, 2024 at 12:43 PM Ashutosh Bapat
<ashutosh.bapat.oss@gmail.com> wrote:

On Tue, Nov 12, 2024 at 12:02 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Mon, Nov 11, 2024 at 2:08 PM Tomas Vondra <tomas@vondra.me> wrote:

But neither of those fixes prevents backwards move for confirmed_flush
LSN, as enforced by asserts in the 0005 patch. I don't know if this
assert is incorrect or not. It seems natural that once we get a
confirmation for some LSN, we can't move before that position, but I'm
not sure about that. Maybe it's too strict.

Hmm, I'm concerned that it might be another problem. I think there are
some cases where a subscriber sends a flush position older than slot's
confirmed_flush as a feedback message.

Right, it can happen for cases where the subscriber doesn't have to do
anything (for example DDLs), as I have explained in one of my emails
[1]

Thanks. I admit not being entirely familiar with all the details, but
doesn't that email explain more "Why it currently happens?" rather than
"Is this what should be happening?"

Right.

Sure, the subscriber needs to confirm changes for which nothing needs to
be done, like DDL. But isn't there a better way to do that, rather than
allowing confirmed_lsn to go backwards?

Firstly, I agree that we should try to find ways to avoid
confirmed_lsn going backward. We can try to explore solutions on both
the publisher and subscriber side.

Good that we agree.

I'm not sure how we could do this on the subscriber side. Or more
precisely, I see "LSNs don't go backwards" as an invariant that the
publisher can enforce for all subscribers (with whatever plugin).

And if a subscriber responds in a way that contradicts the invariant,
I'd treat that as a subscriber bug. I don't think we should rely on the
subscriber to do the right thing - we have little control over that, and
when things break (say, WAL gets removed too early), people generally
point at the publisher.
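
To be concrete, enforcing that on the publisher side might be as simple
as treating a confirmation that would move confirmed_flush backwards as
a no-op - just a hypothetical sketch to illustrate, not something I'm
proposing verbatim:

	/*
	 * Hypothetical guard at the start of LogicalConfirmReceivedLocation():
	 * ignore confirmations older than what the slot already recorded, so a
	 * misbehaving subscriber can't move confirmed_flush backwards. (The
	 * owning walsender is the only writer of confirmed_flush, so an
	 * unlocked read should be fine here.)
	 */
	if (lsn <= MyReplicationSlot->data.confirmed_flush)
		return;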

I think what Ashutosh is saying is that if confirmed_flush is allowed to
move backwards, that may result in start_lsn moving backwards too. And
we need to be able to decode all transactions committed since start_lsn,
so if start_lsn moves backwards, maybe restart_lsn needs to move
backwards too. I have no idea if confirmed_flush/start_lsn can move
backwards enough to require restart_lsn to move, though.

Anyway, these discussions are a good illustration of why I think allowing
these LSNs to move backwards is a problem. It either causes bugs (like
breaking replication slots) and/or it makes the reasoning about
correct behavior much harder.

Right, I think one needs to come up with some reproducible scenarios
where these cause any kind of problem or inefficiency. Then, we can
discuss the solutions accordingly. I mean to say that someone has to
put effort into making a bit more solid case for changing this code
because it may not be a good idea to change something just based on
some theory unless it is just adding some assertions.

I don't know, but isn't this a bit backwards? I understand we don't want
to just recklessly change long-standing code, but I think it's reasonable
to enforce sensible invariants like "LSNs can't go backwards" to ensure
correct behavior. And then if there's a performance/efficiency problem
and someone proposes a fix, it's up to that patch to prove it's correct.

Here we seem to have code no one is quite sure is correct, but we're
asking for reproducible scenarios proving the existence of a bug. The
bugs can be quite subtle / hard to reproduce, as evidenced by the
restart_lsn issue present since 9.4. Maybe this should be the other way
around, i.e. make it "strict" and then prove that (a) relaxing the check
is still correct and (b) it actually has other benefits.

That being said, I don't have a clear idea how to change this.

So +1 to at least introducing the asserts.

regards

--
Tomas Vondra

#39Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Tomas Vondra (#37)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On Fri, Nov 15, 2024 at 9:48 AM Tomas Vondra <tomas@vondra.me> wrote:

On 11/15/24 18:40, Masahiko Sawada wrote:

On Thu, Nov 14, 2024 at 10:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Nov 14, 2024 at 7:08 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

Sure. I've attached the updated patch. I just added the commit message.

@@ -1815,6 +1818,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr
current_lsn, XLogRecPtr restart
confirmed_flush = slot->data.confirmed_flush;
SpinLockRelease(&slot->mutex);

+ spin_released = true;
+
elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after
%X/%X, current candidate %X/%X, current after %X/%X, flushed up to
%X/%X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn),
@@ -1823,6 +1828,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr
current_lsn, XLogRecPtr restart
LSN_FORMAT_ARGS(confirmed_flush));
}

+ if (!spin_released)
+ SpinLockRelease(&slot->mutex);

This coding pattern looks odd to me. We can consider releasing the
spinlock in the other two if/else if checks. I understand it is a
matter of individual preference, so, if you and/or others prefer the
current way, that is also fine with me. Other than this, the patch
looks good to me.

Indeed, I prefer your idea. I've attached the updated patch. I'll push
it early next week unless there are further comments.

I'm not particularly attached to how I did this in my WIP patch, it was
simply the simplest way to make it work for experimentation. I'd imagine
it'd be best to just mirror how LogicalIncreaseXminForSlot() does this.

I was going to push it next Monday, but we're going to have
out-of-cycle minor releases next week and we don't have to wait for the
February minor releases given that we already agreed on the current
fix. So I pushed the fix.

Looking at the buildfarm, it seems that animal alligator started
complaining about a build error:

plancat.c: In function ‘get_relation_info’:
plancat.c:331:54: error: assignment to ‘void (*)(void)’ from
incompatible pointer type ‘amcostestimate_function’ {aka ‘void
(*)(struct PlannerInfo *, struct IndexPath *, double, double *,
double *, double *, double *, double *)’}
[-Wincompatible-pointer-types]
331 | info->amcostestimate = amroutine->amcostestimate;
| ^
make[4]: *** [<builtin>: plancat.o] Error 1

I think that it's not related to this fix or recent commits, but rather
caused by changes happening in gcc [1] (it's using a nightly gcc build).

From a success log, 5f28e6b [2]:

configure: using compiler=gcc (GCC) 15.0.0 20241115 (experimental)

On the other hand, from a failure log [3]:

configure: using compiler=gcc (GCC) 15.0.0 20241116 (experimental)

FYI it started to report build errors with the 20241116 build also on
v12, where this fix is not pushed.

Regards,

[1]: likely https://gcc.gnu.org/git/?p=gcc.git;a=commit;h=55e3bd376b2214e200fa76d12b67ff259b06c212
[2]: https://buildfarm.postgresql.org/cgi-bin/show_stage_log.pl?nm=alligator&dt=2024-11-16%2000%3A20%3A53&stg=configure
[3]: https://buildfarm.postgresql.org/cgi-bin/show_stage_log.pl?nm=alligator&dt=2024-11-16%2002%3A14%3A05&stg=configure

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#40Tomas Vondra
tomas@vondra.me
In reply to: Masahiko Sawada (#39)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

On 11/16/24 08:26, Masahiko Sawada wrote:

On Fri, Nov 15, 2024 at 9:48 AM Tomas Vondra <tomas@vondra.me> wrote:

On 11/15/24 18:40, Masahiko Sawada wrote:

On Thu, Nov 14, 2024 at 10:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Nov 14, 2024 at 7:08 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

Sure. I've attached the updated patch. I just added the commit message.

@@ -1815,6 +1818,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr
current_lsn, XLogRecPtr restart
confirmed_flush = slot->data.confirmed_flush;
SpinLockRelease(&slot->mutex);

+ spin_released = true;
+
elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after
%X/%X, current candidate %X/%X, current after %X/%X, flushed up to
%X/%X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn),
@@ -1823,6 +1828,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr
current_lsn, XLogRecPtr restart
LSN_FORMAT_ARGS(confirmed_flush));
}

+ if (!spin_released)
+ SpinLockRelease(&slot->mutex);

This coding pattern looks odd to me. We can consider releasing the
spinlock in the other two if/else if checks. I understand it is a
matter of individual preference, so, if you and/or others prefer the
current way, that is also fine with me. Other than this, the patch
looks good to me.

Indeed, I prefer your idea. I've attached the updated patch. I'll push
it early next week unless there are further comments.

I'm not particularly attached to how I did this in my WIP patch, it was
simply the simplest way to make it work for experimentation. I'd imagine
it'd be best to just mirror how LogicalIncreaseXminForSlot() does this.

I was going to push it next Monday, but we're going to have
out-of-cycle minor releases next week and we don't have to wait for the
February minor releases given that we already agreed on the current
fix. So I pushed the fix.

Thanks. I see you only backpatched to 13, but I believe 12 will be
rewrapped too. So maybe backpatch to 12 too?

regards

--
Tomas Vondra

#41Tom Lane
tgl@sss.pgh.pa.us
In reply to: Tomas Vondra (#40)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

Tomas Vondra <tomas@vondra.me> writes:

Thanks. I see you only backpatched to 13, but I believe 12 will be
rewrapped too. So maybe backpatch to 12 too?

Please don't. The release team already discussed this and determined
that we want to push 12.22 with only the ROLE-regression fix. v12
is not "back in support" for any other purpose.

regards, tom lane

#42Tomas Vondra
tomas@vondra.me
In reply to: Tomas Vondra (#40)
1 attachment(s)
Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

OK,

Now that the fix is committed, shall we introduce some of the asserts? I
believe there's agreement that restart_lsn shouldn't move backwards,
so I propose the attached patch.

I still think that not resetting the fields when releasing the slot and
allowing the values to move backwards is rather suspicious. But I don't
have any reproducer demonstrating an issue (beyond just hitting an
assert). Perhaps it's correct, but in that case it'd be good to add a
comment explaining why that's the case. Sadly, it has to be written by
someone else - I've been unable to form a justification for why it's OK :-(

regards

--
Tomas Vondra

Attachments:

0001-asserts-restart_lsn.patch (text/x-patch; charset=UTF-8)
From 25678fbbef96965dfb54387dacbe979c920b84e8 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Tue, 19 Nov 2024 16:21:57 +0100
Subject: [PATCH vasserts 1/3] asserts: restart_lsn

---
 src/backend/replication/logical/logical.c | 3 +++
 src/backend/replication/slot.c            | 1 +
 2 files changed, 4 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index e941bb491d8..7fb9f7bd0fa 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -391,6 +391,7 @@ CreateInitDecodingContext(const char *plugin,
 	else
 	{
 		SpinLockAcquire(&slot->mutex);
+		Assert(slot->data.restart_lsn <= restart_lsn);
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 	}
@@ -1877,6 +1878,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			Assert(MyReplicationSlot->data.restart_lsn <= MyReplicationSlot->candidate_restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6828100cf1a..0f38ccc8353 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1468,6 +1468,7 @@ ReplicationSlotReserveWal(void)
 			restart_lsn = GetXLogInsertRecPtr();
 
 		SpinLockAcquire(&slot->mutex);
+		Assert(slot->data.restart_lsn <= restart_lsn);
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
-- 
2.47.0