From 4c7f9b6d3845e1fe4c80ab0d884ce00eddd6036e Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 27 Mar 2024 07:09:48 +0000 Subject: [PATCH] Serialize a flag and use finding_start_point --- contrib/test_decoding/Makefile | 3 +- .../expected/skip_snapshot_restore.out | 129 ++++++++++++++++++ contrib/test_decoding/meson.build | 1 + .../specs/skip_snapshot_restore.spec | 60 ++++++++ src/backend/replication/logical/logical.c | 8 ++ src/backend/replication/logical/snapbuild.c | 47 ++++++- src/include/replication/snapbuild.h | 1 + 7 files changed, 242 insertions(+), 7 deletions(-) create mode 100644 contrib/test_decoding/expected/skip_snapshot_restore.out create mode 100644 contrib/test_decoding/specs/skip_snapshot_restore.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c7ce603706..a4ba1a509a 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error catalog_change_snapshot + twophase_snapshot slot_creation_error catalog_change_snapshot \ + skip_snapshot_restore REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/skip_snapshot_restore.out b/contrib/test_decoding/expected/skip_snapshot_restore.out new file mode 100644 index 0000000000..243d1ee784 --- /dev/null +++ b/contrib/test_decoding/expected/skip_snapshot_restore.out @@ -0,0 +1,129 @@ +Parsed test spec with 3 sessions + +starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1 +step s0_init: 
SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert2: INSERT INTO tbl VALUES (2); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? +-------- +init +(1 row) + +step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +table public.tbl: INSERT: val1[integer]:2 +COMMIT +(4 rows) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + + +starting permutation: s0_init s0_begin s0_savepoint s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert_cat s0_commit s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp0; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert_cat: INSERT INTO tbl VALUES (1); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? 
+-------- +init +(1 row) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + + +starting permutation: s0_init s0_insert1 s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_insert1: INSERT INTO tbl VALUES (1); +step s0_begin: BEGIN; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +COMMIT +(3 rows) + +step s0_insert2: INSERT INTO tbl VALUES (2); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? +-------- +init +(1 row) + +step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +table public.tbl: INSERT: val1[integer]:2 +COMMIT +(4 rows) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? 
+-------- +stop +(1 row) + diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index f1548c0faf..f643dc81a2 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -62,6 +62,7 @@ tests += { 'concurrent_stream', 'twophase_snapshot', 'slot_creation_error', + 'skip_snapshot_restore', ], 'regress_args': [ '--temp-config', files('logical.conf'), diff --git a/contrib/test_decoding/specs/skip_snapshot_restore.spec b/contrib/test_decoding/specs/skip_snapshot_restore.spec new file mode 100644 index 0000000000..73e6cd7b01 --- /dev/null +++ b/contrib/test_decoding/specs/skip_snapshot_restore.spec @@ -0,0 +1,60 @@ +# Test that a slot creation skips to restore serialized snapshot to reach +# the consistent state. + +setup +{ + DROP TABLE IF EXISTS tbl; + DROP TABLE IF EXISTS user_cat; + CREATE TABLE tbl (val1 integer); + CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); +} + +teardown +{ + DROP TABLE tbl; + DROP TABLE user_cat; + SELECT 'stop' FROM pg_drop_replication_slot('slot0'); + SELECT 'stop' FROM pg_drop_replication_slot('slot1'); +} + +session "s0" +setup { SET synchronous_commit = on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_insert1" { INSERT INTO tbl VALUES (1); } +step "s0_insert2" { INSERT INTO tbl VALUES (2); } +step "s0_insert_cat" { INSERT INTO tbl VALUES (1); } +step "s0_savepoint" { SAVEPOINT sp0; } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit = on; } +step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); } +step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } +step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + 
+session "s2" +setup { SET synchronous_commit = on ;} +step "s2_checkpoint" { CHECKPOINT; } +step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + + +# While 'slot1' creation by "s1_init" waits for s0-transaction to commit, the +# RUNNING_XACTS record is written by "s2_checkpoint" and "s2_get_changes_slot0" +# serializes consistent snapshots to disk at LSNs that precede +# s0-transaction's commit. After s0-transaction commits, "s1_init" resumes but +# must not restore any serialized snapshot, so it cannot reach the consistent +# state until it decodes s0-transaction's commit record. We check that +# get_changes on 'slot1' does not return any of s0-transaction's changes, as its +# confirmed_flush_lsn will be after s0-transaction's commit record. +permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1" + +# The last decoding restarts from the NEW_CID record in the subtransaction. +# While processing it, the same ReorderBufferChange entry would be associated +# with both the top and the sub transaction, as the first entry. This breaks +# an assumption in AssertTXNLsnOrder() that the first_lsn of entries must be +# strictly higher than the previous one. 
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert_cat" "s0_commit" "s1_get_changes_slot1" + +permutation "s0_init" "s0_insert1" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1" + diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 51ffb623c0..dcf9f841c3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -649,6 +649,12 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", LSN_FORMAT_ARGS(slot->data.restart_lsn)); + /* + * Tell snapshot builder to start finding the start point, preventing + * it from restoring snapshots. + */ + SnapBuildFindingStartPoint(ctx->snapshot_builder, true); + /* Wait for a consistent starting point */ for (;;) { @@ -671,6 +677,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) CHECK_FOR_INTERRUPTS(); } + SnapBuildFindingStartPoint(ctx->snapshot_builder, false); + SpinLockAcquire(&slot->mutex); slot->data.confirmed_flush = ctx->reader->EndRecPtr; if (slot->data.two_phase) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index ac24b51860..95e592b1e0 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -189,6 +189,21 @@ struct SnapBuild /* Indicates if we are building full snapshot or just catalog one. */ bool building_full_snapshot; + /* + * True if we are finding the logical decoding start point. While finding + * the start point, we need to prevent the snapshot being restored + * from disk and require all in-progress transactions to finish, in order + * to become the consistent state. 
Otherwise the slot's restart and + * confirmed flush LSNs could be initialized to unsafe values. + */ + bool finding_start_point; + + /* + * True if there were running transactions when the snapshot was serialized. This + * flag is used together with finding_start_point; see the comment on that field. + */ + bool is_there_running_xact; + /* * Snapshot that's valid to see the catalog state seen at this moment. */ @@ -302,7 +317,7 @@ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); /* serialization functions */ -static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn, bool is_there_running_xact); static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path); @@ -426,6 +441,15 @@ SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr) builder->two_phase_at = ptr; } +/* + * Set whether we are currently finding the logical decoding start point. + */ +void +SnapBuildFindingStartPoint(SnapBuild *builder, bool finding) +{ + builder->finding_start_point = finding; +} + /* * Should the contents of transaction ending at 'ptr' be decoded? */ @@ -1231,7 +1255,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact return; } else - SnapBuildSerialize(builder, lsn); + SnapBuildSerialize(builder, lsn, running->xcnt > 0); /* * Update range of interesting xids based on the running xacts @@ -1330,7 +1354,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * snapshot to disk that we can use. Can't use this method for the * initial snapshot when slot is being created and needs full snapshot * for export or direct use, as that snapshot will only contain catalog - * modifying transactions. 
We also cannot use this method when finding + * the logical decoding start point, as that requires all in-progress + * transactions to finish. * * c) First incrementally build a snapshot for catalog tuples * (BUILDING_SNAPSHOT), that requires all, already in-progress, @@ -1580,7 +1606,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 5 +#define SNAPBUILD_VERSION 6 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. @@ -1594,7 +1620,7 @@ SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn) if (builder->state < SNAPBUILD_CONSISTENT) SnapBuildRestore(builder, lsn); else - SnapBuildSerialize(builder, lsn); + SnapBuildSerialize(builder, lsn, false); } /* @@ -1602,7 +1628,7 @@ SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn) * been done by another decoding process. */ static void -SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) +SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn, bool is_there_running_xact) { Size needed_length; SnapBuildOnDisk *ondisk = NULL; @@ -1715,6 +1741,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk_c += sizeof(SnapBuildOnDisk); memcpy(&ondisk->builder, builder, sizeof(SnapBuild)); + ondisk->builder.is_there_running_xact = is_there_running_xact; /* NULL-ify memory-only data */ ondisk->builder.context = NULL; ondisk->builder.snapshot = NULL; @@ -1954,6 +1981,14 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon)) goto snapshot_not_interesting; + /* + * Don't use a snapshot if we are finding the logical decoding start point + * and there were running transactions when the snapshot was + * serialized. + */ + if (builder->finding_start_point && ondisk.builder.is_there_running_xact) + goto snapshot_not_interesting; + /* * Consistent snapshots have no next phase. 
Reset next_phase_at as it is * possible that an old value may remain. diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index fbdf362396..83d35bc61a 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -90,5 +90,6 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); +extern void SnapBuildFindingStartPoint(SnapBuild *builder, bool finding); #endif /* SNAPBUILD_H */ -- 2.43.0