diff --git a/contrib/test_decoding/expected/slot_creation_consistency.out b/contrib/test_decoding/expected/slot_creation_consistency.out new file mode 100755 index 0000000000..4ee9ffea5d --- /dev/null +++ b/contrib/test_decoding/expected/slot_creation_consistency.out @@ -0,0 +1,133 @@ +Parsed test spec with 3 sessions + +starting permutation: s2_create s1_b s1_c s3_slot1_get s3_slot2_get +s1: NOTICE: table "test" does not exist, skipping +?column? +-------- +init +(1 row) + +step s2_create: SELECT 'init' FROM pg_create_logical_replication_slot('slot2', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s1_b: +BEGIN; +INSERT INTO test VALUES (1); + +step s1_c: +INSERT INTO test VALUES (2); +COMMIT; + +step s3_slot1_get: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data +--------------------------------------- +BEGIN +table public.test: INSERT: a[integer]:1 +table public.test: INSERT: a[integer]:2 +COMMIT +(4 rows) + +step s3_slot2_get: SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data +--------------------------------------- +BEGIN +table public.test: INSERT: a[integer]:1 +table public.test: INSERT: a[integer]:2 +COMMIT +(4 rows) + +pg_drop_replication_slot +------------------------ + +(1 row) + + +starting permutation: s1_b s2_create s3_checkpoint s3_slot1_get s1_c s3_slot1_get s3_slot2_get +?column? +-------- +init +(1 row) + +step s1_b: +BEGIN; +INSERT INTO test VALUES (1); + +step s2_create: SELECT 'init' FROM pg_create_logical_replication_slot('slot2', 'test_decoding'); <waiting ...> +step s3_checkpoint: CHECKPOINT; +step s3_slot1_get: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data +---- +(0 rows) + +step s1_c: +INSERT INTO test VALUES (2); +COMMIT; + +step s2_create: <... completed> +?column? 
+-------- +init +(1 row) + +step s3_slot1_get: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data +--------------------------------------- +BEGIN +table public.test: INSERT: a[integer]:1 +table public.test: INSERT: a[integer]:2 +COMMIT +(4 rows) + +step s3_slot2_get: SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data +---- +(0 rows) + +pg_drop_replication_slot +------------------------ + +(1 row) + + +starting permutation: s1_b s1_c s2_create s3_slot1_get s3_slot2_get +?column? +-------- +init +(1 row) + +step s1_b: +BEGIN; +INSERT INTO test VALUES (1); + +step s1_c: +INSERT INTO test VALUES (2); +COMMIT; + +step s2_create: SELECT 'init' FROM pg_create_logical_replication_slot('slot2', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s3_slot1_get: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data +--------------------------------------- +BEGIN +table public.test: INSERT: a[integer]:1 +table public.test: INSERT: a[integer]:2 +COMMIT +(4 rows) + +step s3_slot2_get: SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data +---- +(0 rows) + +pg_drop_replication_slot +------------------------ + +(1 row) + diff --git a/contrib/test_decoding/specs/slot_creation_consistency.spec b/contrib/test_decoding/specs/slot_creation_consistency.spec new file mode 100755 index 0000000000..e988872e65 --- /dev/null +++ b/contrib/test_decoding/specs/slot_creation_consistency.spec @@ -0,0 +1,48 @@ +# Test that a slot will correctly replicate changes only from transactions that +# started after the slot was created. This is primarily done to validate that a slot +# cannot become consistent prematurely through a snapshot that was persisted by +# a different slot leading to a transaction being partially replicated. 
+ +session "s1" +setup { +SET synchronous_commit=on; +BEGIN; +DROP TABLE IF EXISTS test; +CREATE TABLE test (a INT); +COMMIT; +SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +} + +step s1_b { +BEGIN; +INSERT INTO test VALUES (1); +} +step s1_c { +INSERT INTO test VALUES (2); +COMMIT; +} + +teardown { +SELECT pg_drop_replication_slot('slot1'); +SELECT pg_drop_replication_slot('slot2'); +} + +session s2 +setup { SET synchronous_commit=on; } +step s2_create { SELECT 'init' FROM pg_create_logical_replication_slot('slot2', 'test_decoding'); } + +session s3 +setup { SET synchronous_commit=on; } +step s3_checkpoint { CHECKPOINT; } +step s3_slot1_get { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } +step s3_slot2_get { SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +# Create a slot and then make changes. Already tested heavily elsewhere, but added for completeness. +permutation s2_create s1_b s1_c s3_slot1_get s3_slot2_get + +# Create a slot in the presence of an active transaction. The checkpoint and slot +# consumption will persist a snapshot to disk ahead of the current read position of the slot. +permutation s1_b s2_create s3_checkpoint s3_slot1_get s1_c s3_slot1_get s3_slot2_get + +# Create a slot after the completion of the active transaction. +permutation s1_b s1_c s2_create s3_slot1_get s3_slot2_get