From 187a61e3a05bb41a8e06acda00c9b5cc6e438907 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Tue, 19 Mar 2024 02:08:57 +0000 Subject: [PATCH v2] Serialize running xacts --- contrib/test_decoding/Makefile | 3 +- .../expected/skip_snapshot_restore.out | 81 +++++++++++++++++++ contrib/test_decoding/meson.build | 1 + .../specs/skip_snapshot_restore.spec | 57 +++++++++++++ src/backend/replication/logical/snapbuild.c | 47 +++++++++-- 5 files changed, 183 insertions(+), 6 deletions(-) create mode 100644 contrib/test_decoding/expected/skip_snapshot_restore.out create mode 100644 contrib/test_decoding/specs/skip_snapshot_restore.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c7ce603706..a4ba1a509a 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error catalog_change_snapshot + twophase_snapshot slot_creation_error catalog_change_snapshot \ + skip_snapshot_restore REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/skip_snapshot_restore.out b/contrib/test_decoding/expected/skip_snapshot_restore.out new file mode 100644 index 0000000000..cb0040660b --- /dev/null +++ b/contrib/test_decoding/expected/skip_snapshot_restore.out @@ -0,0 +1,81 @@ +Parsed test spec with 3 sessions + +starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? 
+-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert2: INSERT INTO tbl VALUES (2); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? +-------- +init +(1 row) + +step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +table public.tbl: INSERT: val1[integer]:2 +COMMIT +(4 rows) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + + +starting permutation: s0_init s0_begin s0_savepoint s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert_cat s0_commit s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp0; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert_cat: INSERT INTO user_cat VALUES (1); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column?
+-------- +init +(1 row) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index f1548c0faf..f643dc81a2 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -62,6 +62,7 @@ tests += { 'concurrent_stream', 'twophase_snapshot', 'slot_creation_error', + 'skip_snapshot_restore', ], 'regress_args': [ '--temp-config', files('logical.conf'), diff --git a/contrib/test_decoding/specs/skip_snapshot_restore.spec b/contrib/test_decoding/specs/skip_snapshot_restore.spec new file mode 100644 index 0000000000..6cb6c83ad8 --- /dev/null +++ b/contrib/test_decoding/specs/skip_snapshot_restore.spec @@ -0,0 +1,57 @@ +# Test that slot creation does not restore serialized snapshots to reach +# the consistent state. + +setup +{ + DROP TABLE IF EXISTS tbl; + DROP TABLE IF EXISTS user_cat; + CREATE TABLE tbl (val1 integer); + CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); +} + +teardown +{ + DROP TABLE tbl; + DROP TABLE user_cat; + SELECT 'stop' FROM pg_drop_replication_slot('slot0'); + SELECT 'stop' FROM pg_drop_replication_slot('slot1'); +} + +session "s0" +setup { SET synchronous_commit = on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_insert1" { INSERT INTO tbl VALUES (1); } +step "s0_insert2" { INSERT INTO tbl VALUES (2); } +step "s0_insert_cat" { INSERT INTO user_cat VALUES (1); } +step "s0_savepoint" { SAVEPOINT sp0; } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit = on; } +step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); } +step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL,
'skip-empty-xacts', '1', 'include-xids', '0'); } +step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +session "s2" +setup { SET synchronous_commit = on ;} +step "s2_checkpoint" { CHECKPOINT; } +step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + + +# While 'slot1' creation by "s1_init" waits for the s0 transaction to commit, +# "s2_checkpoint" writes a RUNNING_XACTS record and "s2_get_changes_slot0" +# serializes consistent snapshots to disk at LSNs that precede the s0 +# transaction's commit. After the s0 transaction commits, "s1_init" resumes; it +# must not restore any of those serialized snapshots, and must not reach the +# consistent state until it decodes the s0 transaction's commit record. We check +# that get_changes on 'slot1' returns none of the s0 transaction's changes, as +# its confirmed_flush_lsn will be after the s0 transaction's commit record. +permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1" + +# The last decoding restarts from the NEW_CID record in the subtransaction. +# While processing it, the same ReorderBufferChange entry would be associated +# with both the top-level transaction and the subtransaction as their first +# entry. This breaks the assumption in AssertTXNLsnOrder() that the first_lsn +# of each entry is strictly higher than that of the previous one.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert_cat" "s0_commit" "s1_get_changes_slot1" diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index ac24b51860..5fc33107d9 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -211,6 +211,23 @@ struct SnapBuild */ TransactionId next_phase_at; + /* + * The highest running xid recorded when the snapshot was serialized, or + * InvalidTransactionId if no running xids were recorded or the snapshot + * was written by SnapBuildSerializationPoint(). + * + * A serialized snapshot can be restored by any replication slot, including + * one that is still being created; restoring one that was taken while a + * concurrent transaction was in progress could let that transaction be + * decoded only partially. SnapBuildRestore() therefore skips the snapshot + * unless this value follows initial_xmin_horizon. + * + * NOTE(review): presumably InvalidTransactionId never follows the horizon, + * so such snapshots are always skipped; and a snapshot whose highest xid + * follows the horizon is used even if older xacts were running -- confirm. + */ + TransactionId highest; + + /* * Array of transactions which could have catalog changes that committed * between xmin and xmax.
@@ -302,7 +319,7 @@ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); /* serialization functions */ -static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn, TransactionId highest); static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path); @@ -1231,7 +1248,18 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact return; } else - SnapBuildSerialize(builder, lsn); + { + TransactionId xid = InvalidTransactionId; + + /* Pass the highest recorded running xid, or InvalidTransactionId if none */ + for (int i = 0; i < running->xcnt; i++) + { + if (TransactionIdPrecedes(xid, running->xids[i])) + xid = running->xids[i]; + } + + SnapBuildSerialize(builder, lsn, xid); + } /* * Update range of interesting xids based on the running xacts @@ -1580,7 +1608,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 5 +#define SNAPBUILD_VERSION 6 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. @@ -1594,7 +1622,7 @@ SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn) if (builder->state < SNAPBUILD_CONSISTENT) SnapBuildRestore(builder, lsn); else - SnapBuildSerialize(builder, lsn); + SnapBuildSerialize(builder, lsn, InvalidTransactionId); } /* @@ -1602,7 +1630,7 @@ SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn) * been done by another decoding process.
 */ static void -SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) +SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn, TransactionId highest) { Size needed_length; SnapBuildOnDisk *ondisk = NULL; @@ -1715,6 +1743,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk_c += sizeof(SnapBuildOnDisk); memcpy(&ondisk->builder, builder, sizeof(SnapBuild)); + ondisk->builder.highest = highest; /* NULL-ify memory-only data */ ondisk->builder.context = NULL; ondisk->builder.snapshot = NULL; @@ -1954,6 +1983,14 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon)) goto snapshot_not_interesting; + /* + * Skip the snapshot unless the highest xid recorded as running when it was + * serialized (ondisk.builder.highest) follows initial_xmin_horizon. + */ + if (!TransactionIdPrecedes(builder->initial_xmin_horizon, + ondisk.builder.highest)) + goto snapshot_not_interesting; + /* * Consistent snapshots have no next phase. Reset next_phase_at as it is * possible that an old value may remain. -- 2.43.0