From 0b4a7662daa3876f5fbaba277389907fd2ec65bb Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Sat, 3 Feb 2024 19:09:02 +0530
Subject: [PATCH v2] Don't restore a persisted snapshot when creating a
 logical replication slot.

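While creating a new logical replication slot, don't let the snapshot
builder become consistent by restoring a snapshot that was serialized to
disk (for example, while decoding for another slot).  Doing so could
initialize the new slot's restart and confirmed flush LSNs to unsafe
values, leading to data inconsistency.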
---
 contrib/test_decoding/Makefile                |  3 +-
 .../slot_create_skip_exist_snapshot.out       | 52 +++++++++++++++++++
 contrib/test_decoding/meson.build             |  1 +
 .../slot_create_skip_exist_snapshot.spec      | 34 ++++++++++++
 src/backend/replication/logical/logical.c     |  2 +
 src/backend/replication/logical/snapbuild.c   | 22 +++++++-
 src/include/replication/snapbuild.h           |  1 +
 7 files changed, 113 insertions(+), 2 deletions(-)
 create mode 100644 contrib/test_decoding/expected/slot_create_skip_exist_snapshot.out
 create mode 100644 contrib/test_decoding/specs/slot_create_skip_exist_snapshot.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c7ce603706..dacc383898 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	spill slot truncate stream stats twophase twophase_stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
-	twophase_snapshot slot_creation_error catalog_change_snapshot
+	twophase_snapshot slot_creation_error catalog_change_snapshot \
+	slot_create_skip_exist_snapshot
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/slot_create_skip_exist_snapshot.out b/contrib/test_decoding/expected/slot_create_skip_exist_snapshot.out
new file mode 100644
index 0000000000..eb5a26d0ac
--- /dev/null
+++ b/contrib/test_decoding/expected/slot_create_skip_exist_snapshot.out
@@ -0,0 +1,52 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes s0_insert2 s0_commit s1_get_changes_slot1 s1_get_changes_slot2 s1_drop_slot s0_drop_slot
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_insert1: INSERT INTO tbl1 VALUES (1);
+step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot2', 'test_decoding'); <waiting ...>
+step s2_checkpoint: CHECKPOINT;
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_insert2: INSERT INTO tbl1 VALUES (2);
+step s0_commit: COMMIT;
+step s1_init: <... completed>
+?column?
+--------
+init    
+(1 row)
+
+step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0');
+data                                      
+------------------------------------------
+BEGIN                                     
+table public.tbl1: INSERT: val1[integer]:1
+table public.tbl1: INSERT: val1[integer]:2
+COMMIT                                    
+(4 rows)
+
+step s1_get_changes_slot2: SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL, 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s1_drop_slot: SELECT pg_drop_replication_slot('slot2');
+pg_drop_replication_slot
+------------------------
+                        
+(1 row)
+
+step s0_drop_slot: SELECT pg_drop_replication_slot('slot1');
+pg_drop_replication_slot
+------------------------
+                        
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index f1548c0faf..b665039eb7 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -62,6 +62,7 @@ tests += {
       'concurrent_stream',
       'twophase_snapshot',
       'slot_creation_error',
+      'slot_create_skip_exist_snapshot',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/slot_create_skip_exist_snapshot.spec b/contrib/test_decoding/specs/slot_create_skip_exist_snapshot.spec
new file mode 100644
index 0000000000..1c15d12647
--- /dev/null
+++ b/contrib/test_decoding/specs/slot_create_skip_exist_snapshot.spec
@@ -0,0 +1,34 @@
+# Creating a new slot must not restore snapshots already serialized to disk.
+
+setup
+{
+    DROP TABLE IF EXISTS tbl1;
+    CREATE TABLE tbl1 (val1 integer);
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+}
+
+session "s0"
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_insert1" { INSERT INTO tbl1 VALUES (1); }
+step "s0_insert2" { INSERT INTO tbl1 VALUES (2); }
+step "s0_commit" { COMMIT; }
+step "s0_drop_slot" { SELECT pg_drop_replication_slot('slot1'); }
+
+session "s1"
+step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot2', 'test_decoding'); }
+step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0'); }
+step "s1_get_changes_slot2" { SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL, 'include-xids', '0'); }
+step "s1_drop_slot" { SELECT pg_drop_replication_slot('slot2'); }
+
+session "s2"
+step "s2_checkpoint" { CHECKPOINT; }
+step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0'); }
+
+# s2_checkpoint followed by s2_get_changes makes slot1 persist a snapshot to
+# disk.  slot2, still being created, must not use it to become consistent.
+permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes" "s0_insert2" "s0_commit" "s1_get_changes_slot1" "s1_get_changes_slot2" "s1_drop_slot" "s0_drop_slot"
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ca09c683f1..f4e9a5312a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -633,6 +633,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 {
 	ReplicationSlot *slot = ctx->slot;
 
+	SnapBuildSkipReadPersistedSnapshot(ctx->snapshot_builder);
+
 	/* Initialize from where to start reading WAL. */
 	XLogBeginRead(ctx->reader, slot->data.restart_lsn);
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index a0b7947d2f..419d460f9c 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -212,6 +212,15 @@ struct SnapBuild
 	 */
 	TransactionId next_phase_at;
 
+	/*
+	 * Set when this builder is used to create a new slot.  The builder must
+	 * then not become consistent by restoring a snapshot serialized to disk
+	 * (e.g. one persisted while decoding for another slot), as that can
+	 * initialize the new slot's restart and confirmed flush LSNs to unsafe
+	 * values.  See SnapBuildSkipReadPersistedSnapshot().
+	 */
+	bool		skip_read_persisted_snapshot;
+
 	/*
 	 * Array of transactions which could have catalog changes that committed
 	 * between xmin and xmax.
@@ -350,6 +359,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 	builder->start_decoding_at = start_lsn;
 	builder->building_full_snapshot = need_full_snapshot;
 	builder->two_phase_at = two_phase_at;
+	builder->skip_read_persisted_snapshot = false;
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -375,6 +385,15 @@ FreeSnapshotBuilder(SnapBuild *builder)
 	MemoryContextDelete(context);
 }
 
+/*
+ * Don't restore serialized snapshots from disk; used while creating a slot.
+ */
+void
+SnapBuildSkipReadPersistedSnapshot(SnapBuild *builder)
+{
+	builder->skip_read_persisted_snapshot = true;
+}
+
 /*
  * Free an unreferenced snapshot that has previously been built by us.
  */
@@ -1396,8 +1415,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 
 		return false;
 	}
-	/* b) valid on disk state and not building full snapshot */
+	/* b) valid on disk state, not creating a slot or building full snapshot */
 	else if (!builder->building_full_snapshot &&
+			 !builder->skip_read_persisted_snapshot &&
 			 SnapBuildRestore(builder, lsn))
 	{
 		/* there won't be any state to cleanup */
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index fbdf362396..f14d7b8f7a 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -65,6 +65,7 @@ extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
 										  XLogRecPtr two_phase_at);
 extern void FreeSnapshotBuilder(SnapBuild *builder);
 
+extern void SnapBuildSkipReadPersistedSnapshot(SnapBuild *builder);
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
 
 extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder);
-- 
2.34.1

