From 81981f60949ab4c0fb9418b17dfcc37c0aa063f1 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:46:16 +0100
Subject: [PATCH v14 2/2] WIP: change asserts to elog + defense

When updating LSNs, use Max() with the preceding value to protect against
moves backwards.
---
 src/backend/replication/logical/logical.c | 57 ++++++++++++-----------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 16fed53063b..988f4add977 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1609,11 +1609,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			  xmin, LSN_FORMAT_ARGS(current_lsn),
 			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 
 		/* our candidate can directly be used */
 		updated_xmin = true;
@@ -1629,11 +1630,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			  xmin, LSN_FORMAT_ARGS(current_lsn),
 			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 	}
 	SpinLockRelease(&slot->mutex);
 
@@ -1679,15 +1681,17 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/* also don't even consider going back for actual restart_lsn */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
 		/* our candidate can directly be used */
 		updated_lsn = true;
@@ -1705,18 +1709,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			  LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			  LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/*
-		 * also don't even consider going back for actual restart_lsn
-		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
-		 */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
+
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
@@ -1802,10 +1806,11 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
 				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
 
-			/* don't allow the LSN to go backwards */
-			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+			if (MyReplicationSlot->candidate_restart_lsn < MyReplicationSlot->data.restart_lsn)
+				elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > MyReplicationSlot->candidate_restart_lsn");
 
-			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
+			MyReplicationSlot->data.restart_lsn = Max(MyReplicationSlot->data.restart_lsn,
+								  MyReplicationSlot->candidate_restart_lsn);
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
 			updated_restart = true;
-- 
2.39.5

