From 8cd1b0472f1189be9ba19455df50fd1cc1f730e5 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:41:25 +0100
Subject: [PATCH vnew 1/2] WIP: add elog messages and asserts

Add elog(LOG) messages showing how the slot-related LSNs move, and asserts that they never move backwards.
---
 src/backend/access/transam/xlog.c           | 34 ++++++++++++
 src/backend/replication/logical/logical.c   | 60 +++++++++++++++++++++
 src/backend/replication/logical/snapbuild.c |  6 +++
 src/backend/replication/slot.c              | 17 +++++-
 4 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f1a795bba9f..ba9415b9149 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2782,9 +2782,17 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 void
 XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
 {
+	XLogRecPtr	oldlsn;
+
 	SpinLockAcquire(&XLogCtl->info_lck);
+	oldlsn = XLogCtl->replicationSlotMinLSN;
 	XLogCtl->replicationSlotMinLSN = lsn;
 	SpinLockRelease(&XLogCtl->info_lck);
+
+	elog(LOG, "XLogSetReplicationSlotMinimumLSN set %X/%X (old %X/%X)", LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(oldlsn));
+
+	/* no backward moves, except a reset to InvalidXLogRecPtr (no slot has a valid restart_lsn) */
+	Assert(lsn == InvalidXLogRecPtr || lsn >= oldlsn);
 }
 
 
@@ -2801,6 +2809,8 @@ XLogGetReplicationSlotMinimumLSN(void)
 	retval = XLogCtl->replicationSlotMinLSN;
 	SpinLockRelease(&XLogCtl->info_lck);
 
+	elog(LOG, "XLogGetReplicationSlotMinimumLSN got %X/%X", LSN_FORMAT_ARGS(retval));
+
 	return retval;
 }
 
@@ -9462,12 +9472,19 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation start / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
+
+	elog(LOG, "CreateCheckPoint / slot invalidation after KeepLogSeg / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9476,10 +9493,15 @@ CreateCheckPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(recptr, &_logSegNo);
+		elog(LOG, "CreateCheckPoint / slot invalidation recalculate / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+			LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
 	}
 	_logSegNo--;
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation done / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Make more log segments if needed.  (Do this after recycling old log
 	 * segments, since that may supply some of the needed files.)
@@ -9877,6 +9899,10 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
+
+	elog(LOG, "CreateRestartPoint / slot invalidation start / RedoRecPtr %X/%X receivePtr %X/%X replayPtr %X/%X endptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(receivePtr), LSN_FORMAT_ARGS(replayPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9885,9 +9911,15 @@ CreateRestartPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(endptr, &_logSegNo);
+
+		elog(LOG, "CreateRestartPoint / slot invalidation recalculate / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+			LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
 	}
 	_logSegNo--;
 
+	elog(LOG, "CreateRestartPoint / slot invalidation done / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	/*
 	 * Try to recycle segments on a useful timeline. If we've been promoted
 	 * since the beginning of this restartpoint, use the new timeline chosen
@@ -10109,6 +10141,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 		}
 	}
 
+	elog(LOG, "KeepLogSeg recptr %X/%X logSegNo %lu segno %lu", LSN_FORMAT_ARGS(recptr), *logSegNo, segno);
+
 	/* don't delete WAL segments newer than the calculated segment */
 	if (segno < *logSegNo)
 		*logSegNo = segno;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f7b2c85d9b..16fed53063b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -375,9 +375,16 @@ CreateInitDecodingContext(const char *plugin,
 		ReplicationSlotReserveWal();
 	else
 	{
+		XLogRecPtr oldlsn;
 		SpinLockAcquire(&slot->mutex);
+		oldlsn = slot->data.restart_lsn;
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
+
+		elog(LOG, "CreateInitDecodingContext updated restart_lsn %X/%X (old %X/%X) for slot %s",
+			LSN_FORMAT_ARGS(restart_lsn),
+			LSN_FORMAT_ARGS(oldlsn),
+			NameStr(slot->data.name));
 	}
 
 	/* ----
@@ -1598,6 +1605,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			  xmin, LSN_FORMAT_ARGS(current_lsn),
+			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1611,6 +1625,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			  xmin, LSN_FORMAT_ARGS(current_lsn),
+			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 	}
@@ -1653,6 +1674,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			NameStr(slot->data.name),
+			LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/* also don't even consider going back for actual restart_lsn */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 
@@ -1667,6 +1700,21 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			  NameStr(slot->data.name),
+			  LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			  LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/*
+		 * also don't even consider going back for actual restart_lsn
+		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+		 */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
@@ -1707,6 +1755,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
 	Assert(lsn != InvalidXLogRecPtr);
 
+	elog(LOG, "LogicalConfirmReceivedLocation %X/%X", LSN_FORMAT_ARGS(lsn));
+
 	/* Do an unlocked check for candidate_lsn first. */
 	if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
 		MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
@@ -1746,6 +1796,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			elog(LOG, "LogicalConfirmReceivedLocation updating data.restart_lsn to %X/%X (from %X/%X) candidate_restart_valid 0/0 (from %X/%X) candidate_restart_lsn 0/0 (from %X/%X)",
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn),
+				LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
+
+			/* don't allow the LSN to go backwards */
+			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
@@ -1774,6 +1833,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
+			elog(LOG, "LogicalConfirmReceivedLocation calculating required LSN/xmin after update");
 			ReplicationSlotsComputeRequiredXmin(false);
 			ReplicationSlotsComputeRequiredLSN();
 		}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7b761510f1e..05184516704 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1270,7 +1270,10 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 * anything because we hadn't reached a consistent state yet.
 	 */
 	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot restart_decoding_lsn %X/%X", LSN_FORMAT_ARGS(txn->restart_decoding_lsn));
 		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+	}
 
 	/*
 	 * No in-progress transaction, can reuse the last serialized snapshot if
@@ -1279,8 +1282,11 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	else if (txn == NULL &&
 			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
 			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot last_serialized_snapshot %X/%X", LSN_FORMAT_ARGS(builder->last_serialized_snapshot));
 		LogicalIncreaseRestartDecodingForSlot(lsn,
 											  builder->last_serialized_snapshot);
+	}
 }
 
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 037a347cba0..8a7aa257293 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -827,6 +827,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 
 	Assert(ReplicationSlotCtl != NULL);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / start");
+
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -840,6 +842,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 		restart_lsn = s->data.restart_lsn;
 		SpinLockRelease(&s->mutex);
 
+		elog(LOG, "ReplicationSlotsComputeRequiredLSN found slot %s with restart_lsn %X/%X", NameStr(s->data.name), LSN_FORMAT_ARGS(restart_lsn));
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -847,6 +851,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / done");
+
 	XLogSetReplicationSlotMinimumLSN(min_required);
 }
 
@@ -1285,9 +1291,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			ReplicationSlotRelease();
 
 			ereport(LOG,
-					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size (%d)",
 							NameStr(slotname),
-							LSN_FORMAT_ARGS(restart_lsn))));
+							LSN_FORMAT_ARGS(restart_lsn), max_slot_wal_keep_size_mb)));
 
 			/* done with this slot for now */
 			break;
@@ -1315,6 +1321,9 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
+	elog(LOG, "InvalidateObsoleteReplicationSlots oldestSegno %lu oldestLSN %X/%X",
+		oldestSegno, LSN_FORMAT_ARGS(oldestLSN));
+
 restart:
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (int i = 0; i < max_replication_slots; i++)
@@ -1327,6 +1336,7 @@ restart:
 		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
 		{
 			/* if the lock was released, start from scratch */
+			elog(LOG, "InvalidateObsoleteReplicationSlots restart invalidated=%d", invalidated);
 			goto restart;
 		}
 	}
@@ -1337,6 +1347,7 @@ restart:
 	 */
 	if (invalidated)
 	{
+		elog(LOG, "InvalidateObsoleteReplicationSlots slot invalidated, recalculate required");
 		ReplicationSlotsComputeRequiredXmin(false);
 		ReplicationSlotsComputeRequiredLSN();
 	}
@@ -1854,6 +1865,8 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_restart_lsn = InvalidXLogRecPtr;
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
 
+		elog(LOG, "RestoreSlotFromDisk restart_lsn %X/%X candidate_xmin_lsn 0/0 candidate_restart_lsn 0/0 candidate_restart_valid 0/0", LSN_FORMAT_ARGS(slot->data.restart_lsn));
+
 		slot->in_use = true;
 		slot->active_pid = 0;
 
-- 
2.39.5

