From 633fd196d13b85b0566669c4bd6ed011b2f26a31 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Tue, 19 Mar 2024 02:08:57 +0000 Subject: [PATCH] Serialize running xacts  A serialized snapshot can be restored by any logical replication slot. If a slot is being created while a transaction is in progress, it could restore a snapshot that was serialized before that transaction committed, reach the consistent state too early, and then decode and output the transaction only partially. To prevent this, store the transactions listed in the RUNNING_XACTS record together with the serialized snapshot, and make SnapBuildRestore() reject a snapshot if any of those transactions does not follow the restoring builder's initial_xmin_horizon. The on-disk format changes, so bump SNAPBUILD_VERSION. Add the skip_snapshot_restore isolation test to cover this scenario. --- contrib/test_decoding/Makefile | 3 +- .../expected/skip_snapshot_restore.out | 81 +++++++++++++++++++ contrib/test_decoding/meson.build | 1 + .../specs/skip_snapshot_restore.spec | 57 +++++++++++++ src/backend/replication/logical/snapbuild.c | 73 +++++++++++++++-- 5 files changed, 208 insertions(+), 7 deletions(-) create mode 100644 contrib/test_decoding/expected/skip_snapshot_restore.out create mode 100644 contrib/test_decoding/specs/skip_snapshot_restore.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c7ce603706..a4ba1a509a 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error catalog_change_snapshot + twophase_snapshot slot_creation_error catalog_change_snapshot \ + skip_snapshot_restore REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/skip_snapshot_restore.out b/contrib/test_decoding/expected/skip_snapshot_restore.out new file mode 100644 index 0000000000..cb0040660b --- /dev/null +++ b/contrib/test_decoding/expected/skip_snapshot_restore.out @@ -0,0 +1,81 @@ +Parsed test spec with 3 sessions + +starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? 
+-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert2: INSERT INTO tbl VALUES (2); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? +-------- +init +(1 row) + +step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +table public.tbl: INSERT: val1[integer]:2 +COMMIT +(4 rows) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + + +starting permutation: s0_init s0_begin s0_savepoint s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert_cat s0_commit s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp0; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert_cat: INSERT INTO user_cat VALUES (1); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? 
+-------- +init +(1 row) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index f1548c0faf..f643dc81a2 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -62,6 +62,7 @@ tests += { 'concurrent_stream', 'twophase_snapshot', 'slot_creation_error', + 'skip_snapshot_restore', ], 'regress_args': [ '--temp-config', files('logical.conf'), diff --git a/contrib/test_decoding/specs/skip_snapshot_restore.spec b/contrib/test_decoding/specs/skip_snapshot_restore.spec new file mode 100644 index 0000000000..6cb6c83ad8 --- /dev/null +++ b/contrib/test_decoding/specs/skip_snapshot_restore.spec @@ -0,0 +1,57 @@ +# Test that slot creation skips restoring a serialized snapshot to reach +# the consistent state. + +setup +{ + DROP TABLE IF EXISTS tbl; + DROP TABLE IF EXISTS user_cat; + CREATE TABLE tbl (val1 integer); + CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); +} + +teardown +{ + DROP TABLE tbl; + DROP TABLE user_cat; + SELECT 'stop' FROM pg_drop_replication_slot('slot0'); + SELECT 'stop' FROM pg_drop_replication_slot('slot1'); +} + +session "s0" +setup { SET synchronous_commit = on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_insert1" { INSERT INTO tbl VALUES (1); } +step "s0_insert2" { INSERT INTO tbl VALUES (2); } +step "s0_insert_cat" { INSERT INTO user_cat VALUES (1); } +step "s0_savepoint" { SAVEPOINT sp0; } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit = on; } +step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); } +step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 
'skip-empty-xacts', '1', 'include-xids', '0'); } +step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +session "s2" +setup { SET synchronous_commit = on ;} +step "s2_checkpoint" { CHECKPOINT; } +step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + + +# While 'slot1' creation by "s1_init" waits for the s0-transaction to commit, +# a RUNNING_XACTS record is written by "s2_checkpoint", and +# "s2_get_changes_slot0" serializes consistent snapshots to disk at LSNs that +# are before the s0-transaction's commit. After the s0-transaction commits, +# "s1_init" resumes; it must not restore any serialized snapshot, and must not +# reach the consistent state until it decodes the s0-transaction's commit +# record. We check that get_changes on 'slot1' returns none of the +# s0-transaction's changes, as its confirmed_flush_lsn will be after that commit. +permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1" + +# The last decoding restarts from the NEW_CID record in the subtransaction. +# While processing it, the same ReorderBufferChange entry would be associated +# with both the top and the sub transaction, as the first entry. This breaks +# the assumption in AssertTXNLsnOrder() that the first_lsn of entries must be
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert_cat" "s0_commit" "s1_get_changes_slot1" diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index ac24b51860..c4f8dae8c9 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -273,6 +273,30 @@ struct SnapBuild /* This array must be sorted in xidComparator order */ TransactionId *xip; } catchange; + + /* + * Array of concurrently running transactions when the snapshot was + * serialized. + * + * The serialized snapshot can be used even by other replication slots. + * However, concurrent transactions might be partially decoded and output + * if a newly created slot finds the snapshot. The issue could happen when + * slot creation and serialization occur while running the same concurrent + * transaction. + * + * Now, to avoid the problem, we serialize concurrently running + * transactions. The array is used to check whether restoring the snapshot + * can be done. If one of the listed transactions is older than + * initial_xmin_horizon, we skip using the serialized snapshot. + */ + struct + { + /* number of transactions */ + size_t xcnt; + + /* Array of transacitons. No need to be ordered. 
*/ + TransactionId *xip; + } running; }; /* @@ -302,7 +326,8 @@ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); /* serialization functions */ -static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn, int xcnt, + TransactionId *xip); static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path); @@ -1231,7 +1256,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact return; } else - SnapBuildSerialize(builder, lsn); + SnapBuildSerialize(builder, lsn, running->xcnt, running->xids); /* * Update range of interesting xids based on the running xacts @@ -1580,7 +1605,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 5 +#define SNAPBUILD_VERSION 6 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. @@ -1594,7 +1619,7 @@ SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn) if (builder->state < SNAPBUILD_CONSISTENT) SnapBuildRestore(builder, lsn); else - SnapBuildSerialize(builder, lsn); + SnapBuildSerialize(builder, lsn, 0, NULL); } /* @@ -1602,7 +1627,8 @@ SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn) * been done by another decoding process. 
*/ static void -SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) +SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn, int xcnt, + TransactionId *xip) { Size needed_length; SnapBuildOnDisk *ondisk = NULL; @@ -1701,7 +1727,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) catchange_xcnt = dclist_count(&builder->reorder->catchange_txns); needed_length = sizeof(SnapBuildOnDisk) + - sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt); + sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt + + xcnt); ondisk_c = palloc0(needed_length); ondisk = (SnapBuildOnDisk *) ondisk_c; @@ -1723,6 +1750,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk->builder.catchange.xip = NULL; /* update catchange only on disk data */ ondisk->builder.catchange.xcnt = catchange_xcnt; + /* update running only on disk data */ + ondisk->builder.running.xcnt = xcnt; COMP_CRC32C(ondisk->checksum, &ondisk->builder, @@ -1746,6 +1775,15 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk_c += sz; } + /* copy concurrently running xacts */ + if (xcnt > 0) + { + sz = sizeof(TransactionId) * xcnt; + memcpy(ondisk_c, xip, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); + ondisk_c += sz; + } + FIN_CRC32C(ondisk->checksum); /* we have valid data now, open tempfile and write it there */ @@ -1920,6 +1958,15 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz); } + /* restore concurrently running xacts information */ + if (ondisk.builder.running.xcnt > 0) + { + sz = sizeof(TransactionId) * ondisk.builder.running.xcnt; + ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz); + SnapBuildRestoreContents(fd, (char *) ondisk.builder.running.xip, sz, path); + COMP_CRC32C(checksum, ondisk.builder.running.xip, sz); + } + if (CloseTransientFile(fd) != 0) ereport(ERROR, (errcode_for_file_access(), @@ -1954,6 +2001,18 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr 
lsn) if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon)) goto snapshot_not_interesting; + /* + * Don't use a snapshot that one of listed transactions in running has + * smaller xid than initial_xmin_horizon. + */ + for (int i = 0; i < ondisk.builder.running.xcnt; i++) + { + TransactionId xid = ondisk.builder.running.xip[i]; + + if (!TransactionIdPrecedes(builder->initial_xmin_horizon, xid)) + goto snapshot_not_interesting; + } + /* * Consistent snapshots have no next phase. Reset next_phase_at as it is * possible that an old value may remain. @@ -2007,6 +2066,8 @@ snapshot_not_interesting: pfree(ondisk.builder.committed.xip); if (ondisk.builder.catchange.xip != NULL) pfree(ondisk.builder.catchange.xip); + if (ondisk.builder.running.xip != NULL) + pfree(ondisk.builder.running.xip); return false; } -- 2.43.0