From 24bdb1df051e33c42f1cb2da86f329eae9d4ff6d Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 25 Mar 2024 11:29:21 +0900 Subject: [PATCH] draft fix. --- contrib/test_decoding/Makefile | 3 +- .../expected/skip_snapshot_restore.out | 129 ++++++++++++++++++ .../specs/skip_snapshot_restore.spec | 58 ++++++++ src/backend/replication/logical/logical.c | 13 +- src/backend/replication/logical/slotsync.c | 3 +- src/backend/replication/logical/snapbuild.c | 16 ++- src/backend/storage/ipc/procarray.c | 9 +- src/include/replication/snapbuild.h | 4 +- src/include/storage/procarray.h | 3 +- 9 files changed, 228 insertions(+), 10 deletions(-) create mode 100644 contrib/test_decoding/expected/skip_snapshot_restore.out create mode 100644 contrib/test_decoding/specs/skip_snapshot_restore.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c7ce603706..a4ba1a509a 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error catalog_change_snapshot + twophase_snapshot slot_creation_error catalog_change_snapshot \ + skip_snapshot_restore REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/skip_snapshot_restore.out b/contrib/test_decoding/expected/skip_snapshot_restore.out new file mode 100644 index 0000000000..fa4498fb11 --- /dev/null +++ b/contrib/test_decoding/expected/skip_snapshot_restore.out @@ -0,0 +1,129 @@ +Parsed test spec with 3 sessions + +starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit 
s1_get_changes_slot0 s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert2: INSERT INTO tbl VALUES (2); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? +-------- +init +(1 row) + +step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +table public.tbl: INSERT: val1[integer]:2 +COMMIT +(4 rows) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + + +starting permutation: s0_init s0_insert1 s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? 
+-------- +init +(1 row) + +step s0_insert1: INSERT INTO tbl VALUES (1); +step s0_begin: BEGIN; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +COMMIT +(3 rows) + +step s0_insert2: INSERT INTO tbl VALUES (2); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? +-------- +init +(1 row) + +step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +table public.tbl: INSERT: val1[integer]:2 +COMMIT +(4 rows) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + + +starting permutation: s0_init s0_begin s0_savepoint s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert_cat s0_commit s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp0; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert_cat: INSERT INTO user_cat VALUES (1); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column?
+-------- +init +(1 row) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/skip_snapshot_restore.spec b/contrib/test_decoding/specs/skip_snapshot_restore.spec new file mode 100644 index 0000000000..81c6a66039 --- /dev/null +++ b/contrib/test_decoding/specs/skip_snapshot_restore.spec @@ -0,0 +1,58 @@ +# Test that slot creation skips restoring a serialized snapshot to reach +# the consistent state. + +setup +{ + DROP TABLE IF EXISTS tbl; + DROP TABLE IF EXISTS user_cat; + CREATE TABLE tbl (val1 integer); + CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); +} + +teardown +{ + DROP TABLE tbl; + DROP TABLE user_cat; + SELECT 'stop' FROM pg_drop_replication_slot('slot0'); + SELECT 'stop' FROM pg_drop_replication_slot('slot1'); +} + +session "s0" +setup { SET synchronous_commit = on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_insert1" { INSERT INTO tbl VALUES (1); } +step "s0_insert2" { INSERT INTO tbl VALUES (2); } +step "s0_insert_cat" { INSERT INTO user_cat VALUES (1); } +step "s0_savepoint" { SAVEPOINT sp0; } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit = on; } +step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); } +step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } +step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +session "s2" +setup { SET synchronous_commit = on; } +step "s2_checkpoint" { CHECKPOINT; } +step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL,
NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + + +# While 'slot1' creation by "s1_init" waits for s0-transaction to commit, +# "s2_checkpoint" writes a RUNNING_XACTS record and "s2_get_changes_slot0" +# serializes consistent snapshots to disk at LSNs that precede the +# s0-transaction's commit. After s0-transaction commits, "s1_init" resumes but +# must not restore any of those serialized snapshots, and must not reach the +# consistent state before the s0-transaction's commit record. We check that +# get_changes on 'slot1' returns none of the s0-transaction's changes, as its +# confirmed_flush_lsn will be after the s0-transaction's commit record. +permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1" +permutation "s0_init" "s0_insert1" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1" + +# The last decoding restarts from the NEW_CID record in the subtransaction. +# While processing it, the same ReorderBufferChange entry would be associated +# with both the top-level and the sub transaction as their first entry. This +# would break the assumption in AssertTXNLsnOrder() that the first_lsn of +# entries is strictly higher than that of the previous entry.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert_cat" "s0_commit" "s1_get_changes_slot1" diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 51ffb623c0..bf1e9bf4e4 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -149,6 +149,7 @@ static LogicalDecodingContext * StartupDecodingContext(List *output_plugin_options, XLogRecPtr start_lsn, TransactionId xmin_horizon, + TransactionId oldest_running, bool need_full_snapshot, bool fast_forward, XLogReaderRoutine *xl_routine, @@ -210,8 +211,8 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = - AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, - need_full_snapshot, slot->data.two_phase_at); + AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, oldest_running, + start_lsn, need_full_snapshot, slot->data.two_phase_at); ctx->reorder->private_data = ctx; @@ -335,6 +336,7 @@ CreateInitDecodingContext(const char *plugin, LogicalOutputPluginWriterUpdateProgress update_progress) { TransactionId xmin_horizon = InvalidTransactionId; + TransactionId oldest_running = InvalidTransactionId; ReplicationSlot *slot; NameData plugin_name; LogicalDecodingContext *ctx; @@ -420,7 +422,8 @@ CreateInitDecodingContext(const char *plugin, */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot); + xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot, + &oldest_running); SpinLockAcquire(&slot->mutex); slot->effective_catalog_xmin = xmin_horizon; @@ -437,6 +440,7 @@ CreateInitDecodingContext(const char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, + oldest_running, need_full_snapshot, false, xl_routine, prepare_write, do_write, update_progress); @@ -584,7 +588,8 @@ 
CreateDecodingContext(XLogRecPtr start_lsn, } ctx = StartupDecodingContext(output_plugin_options, - start_lsn, InvalidTransactionId, false, + start_lsn, InvalidTransactionId, + InvalidTransactionId, false, fast_forward, xl_routine, prepare_write, do_write, update_progress); diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 30480960c5..d46b07c8b5 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -592,6 +592,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) { NameData plugin_name; TransactionId xmin_horizon = InvalidTransactionId; + TransactionId tmp = InvalidTransactionId; /* Skip creating the local slot if remote_slot is invalidated already */ if (remote_slot->invalidated != RS_INVAL_NONE) @@ -623,7 +624,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) reserve_wal_for_local_slot(remote_slot->restart_lsn); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - xmin_horizon = GetOldestSafeDecodingTransactionId(true); + xmin_horizon = GetOldestSafeDecodingTransactionId(true, &tmp); SpinLockAcquire(&slot->mutex); slot->effective_catalog_xmin = xmin_horizon; slot->data.catalog_xmin = xmin_horizon; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index ac24b51860..94def02f11 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -185,6 +185,7 @@ struct SnapBuild * indicates there are no running xids with an xid smaller than this. */ TransactionId initial_xmin_horizon; + TransactionId initial_oldest_running; /* Indicates if we are building full snapshot or just catalog one. 
*/ bool building_full_snapshot; @@ -315,6 +316,7 @@ static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char * SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, + TransactionId oldest_running, XLogRecPtr start_lsn, bool need_full_snapshot, XLogRecPtr two_phase_at) @@ -346,6 +348,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->catchange.xip = NULL; builder->initial_xmin_horizon = xmin_horizon; + builder->initial_oldest_running = oldest_running; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; builder->two_phase_at = two_phase_at; @@ -572,6 +575,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder) TransactionId xid; TransactionId safeXid; TransactionId *newxip; + TransactionId tmp; int newxcnt = 0; Assert(XactIsoLevel == XACT_REPEATABLE_READ); @@ -605,7 +609,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder) * the horizon is enforced. */ LWLockAcquire(ProcArrayLock, LW_SHARED); - safeXid = GetOldestSafeDecodingTransactionId(false); + safeXid = GetOldestSafeDecodingTransactionId(false, &tmp); LWLockRelease(ProcArrayLock); if (TransactionIdFollows(safeXid, snap->xmin)) @@ -1305,6 +1309,15 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->last_serialized_snapshot); } +static bool +SnapBuildCanRestore(SnapBuild *builder, xl_running_xacts *running) +{ + if (!TransactionIdIsValid(builder->initial_oldest_running)) + return true; /* not creating a new slot */ + /* every xact running at slot creation must have finished */ + return TransactionIdPrecedes(builder->initial_oldest_running, + running->oldestRunningXid); +} /* * Build the start of a snapshot that's capable of decoding the catalog.
@@ -1397,6 +1410,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn } /* b) valid on disk state and not building full snapshot */ else if (!builder->building_full_snapshot && + SnapBuildCanRestore(builder, running) && SnapBuildRestore(builder, lsn)) { /* there won't be any state to cleanup */ diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index b3cd248fb6..5fbdec69ba 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2929,10 +2929,11 @@ GetOldestActiveTransactionId(void) * that the caller will immediately use the xid to peg the xmin horizon. */ TransactionId -GetOldestSafeDecodingTransactionId(bool catalogOnly) +GetOldestSafeDecodingTransactionId(bool catalogOnly, TransactionId *oldestActiveXid_p) { ProcArrayStruct *arrayP = procArray; TransactionId oldestSafeXid; + TransactionId oldestActiveXid; int index; bool recovery_in_progress = RecoveryInProgress(); @@ -2948,6 +2949,7 @@ GetOldestSafeDecodingTransactionId(bool catalogOnly) */ LWLockAcquire(XidGenLock, LW_SHARED); oldestSafeXid = XidFromFullTransactionId(TransamVariables->nextXid); + oldestActiveXid = oldestSafeXid; /* * If there's already a slot pegging the xmin horizon, we can start with @@ -2998,11 +3000,16 @@ GetOldestSafeDecodingTransactionId(bool catalogOnly) if (TransactionIdPrecedes(xid, oldestSafeXid)) oldestSafeXid = xid; + + if (TransactionIdPrecedes(xid, oldestActiveXid)) + oldestActiveXid = xid; } } LWLockRelease(XidGenLock); + *oldestActiveXid_p = oldestActiveXid; + return oldestSafeXid; } diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index fbdf362396..63a380b692 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -60,7 +60,9 @@ struct xl_running_xacts; extern void CheckPointSnapBuild(void); extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder, - TransactionId xmin_horizon,
XLogRecPtr start_lsn, + TransactionId xmin_horizon, + TransactionId oldest_running, + XLogRecPtr start_lsn, bool need_full_snapshot, XLogRecPtr two_phase_at); extern void FreeSnapshotBuilder(SnapBuild *builder); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 8ca6050462..d060941e4f 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -57,7 +57,8 @@ extern bool TransactionIdIsActive(TransactionId xid); extern TransactionId GetOldestNonRemovableTransactionId(Relation rel); extern TransactionId GetOldestTransactionIdConsideredRunning(void); extern TransactionId GetOldestActiveTransactionId(void); -extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly); +extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly, + TransactionId *oldestActiveXid_p); extern void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin); extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids, int type); -- 2.39.3