From 560c41d3334f7c50f9a1c69e5c5b0d297f8920e0 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 13 Mar 2024 09:28:00 +0000 Subject: [PATCH] fix snapbuild bug by approach a --- contrib/test_decoding/Makefile | 3 +- .../expected/skip_snapshot_restore.out | 82 +++++++++++++++++++ contrib/test_decoding/meson.build | 1 + .../specs/skip_snapshot_restore.spec | 58 +++++++++++++ src/backend/replication/logical/logical.c | 11 +-- src/backend/replication/slotfuncs.c | 1 - src/backend/replication/walsender.c | 8 +- src/bin/pg_basebackup/pg_createsubscriber.c | 5 ++ src/include/replication/logical.h | 1 - 9 files changed, 153 insertions(+), 17 deletions(-) create mode 100644 contrib/test_decoding/expected/skip_snapshot_restore.out create mode 100644 contrib/test_decoding/specs/skip_snapshot_restore.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c7ce603706..a4ba1a509a 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error catalog_change_snapshot + twophase_snapshot slot_creation_error catalog_change_snapshot \ + skip_snapshot_restore REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/skip_snapshot_restore.out b/contrib/test_decoding/expected/skip_snapshot_restore.out new file mode 100644 index 0000000000..c1848478a5 --- /dev/null +++ b/contrib/test_decoding/expected/skip_snapshot_restore.out @@ -0,0 +1,82 @@ +unused step name: s0_savepoint_release +Parsed test spec with 3 sessions + +starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert2: INSERT INTO tbl VALUES (2); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? +-------- +init +(1 row) + +step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +table public.tbl: INSERT: val1[integer]:2 +COMMIT +(4 rows) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + + +starting permutation: s0_init s0_begin s0_savepoint s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert_cat s0_commit s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp0; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert_cat: INSERT INTO tbl VALUES (1); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? +-------- +init +(1 row) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index f1548c0faf..f643dc81a2 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -62,6 +62,7 @@ tests += { 'concurrent_stream', 'twophase_snapshot', 'slot_creation_error', + 'skip_snapshot_restore', ], 'regress_args': [ '--temp-config', files('logical.conf'), diff --git a/contrib/test_decoding/specs/skip_snapshot_restore.spec b/contrib/test_decoding/specs/skip_snapshot_restore.spec new file mode 100644 index 0000000000..3c52e5de9e --- /dev/null +++ b/contrib/test_decoding/specs/skip_snapshot_restore.spec @@ -0,0 +1,58 @@ +# Test that a slot creation skips to restore serialized snapshot to reach +# the consistent state. + +setup +{ + DROP TABLE IF EXISTS tbl; + DROP TABLE IF EXISTS user_cat; + CREATE TABLE tbl (val1 integer); + CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); +} + +teardown +{ + DROP TABLE tbl; + DROP TABLE user_cat; + SELECT 'stop' FROM pg_drop_replication_slot('slot0'); + SELECT 'stop' FROM pg_drop_replication_slot('slot1'); +} + +session "s0" +setup { SET synchronous_commit = on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_insert1" { INSERT INTO tbl VALUES (1); } +step "s0_insert2" { INSERT INTO tbl VALUES (2); } +step "s0_insert_cat" { INSERT INTO tbl VALUES (1); } +step "s0_savepoint" { SAVEPOINT sp0; } +step "s0_savepoint_release" { RELEASE SAVEPOINT sp0; } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit = on; } +step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); } +step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } +step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +session "s2" +setup { SET synchronous_commit = on ;} +step "s2_checkpoint" { CHECKPOINT; } +step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + + +# While 'slot1' creation by "s1_init" waits for s0-transaction to commit, the +# RUNNING_XACTS record is written by "s2_checkpoint" and "s2_get_changes_slot1" +# serializes consistent snapshots to the disk at LSNs where are before +# s0-transaction's commit. After s0-transaction commits, "s1_init" resumes but +# must not restore any serialized snapshots and reach the consistent state until +# decoding s0-transaction's commit recrod. We check if the get_changes on 'slot1' +# will not return any s0-transaction's changes as its confirmed_flush_lsn will +# be after the s0-transaction's commit record. +permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1" + +# The last decoding restarts from the NEW_CID record in the subtransaction. +# While processing it, a same ReorderBufferChange entry would be associated +# with both the top and the sub transaction, as the first entry. This breaks +# an assumption in AssertTXNLsnOrder() which the first_lsn of entries must be +# strictly higher than previous. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert_cat" "s0_commit" "s1_get_changes_slot1" diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 51ffb623c0..e1d60f244d 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -306,8 +306,6 @@ StartupDecodingContext(List *output_plugin_options, * * plugin -- contains the name of the output plugin * output_plugin_options -- contains options passed to the output plugin - * need_full_snapshot -- if true, must obtain a snapshot able to read all - * tables; if false, one that can read only catalogs is acceptable. * restart_lsn -- if given as invalid, it's this routine's responsibility to * mark WAL as reserved by setting a convenient restart_lsn for the slot. * Otherwise, we set for decoding to start from the given LSN without @@ -327,7 +325,6 @@ StartupDecodingContext(List *output_plugin_options, LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, - bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, @@ -420,13 +417,12 @@ CreateInitDecodingContext(const char *plugin, */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot); + xmin_horizon = GetOldestSafeDecodingTransactionId(false); SpinLockAcquire(&slot->mutex); slot->effective_catalog_xmin = xmin_horizon; slot->data.catalog_xmin = xmin_horizon; - if (need_full_snapshot) - slot->effective_xmin = xmin_horizon; + slot->effective_xmin = xmin_horizon; SpinLockRelease(&slot->mutex); ReplicationSlotsComputeRequiredXmin(true); @@ -437,7 +433,8 @@ CreateInitDecodingContext(const char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, - need_full_snapshot, false, + true, /* need_full_snapshot */ + false, /* fast_forward */ xl_routine, prepare_write, do_write, update_progress); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ad79e1fccd..6fb0fd0010 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -148,7 +148,6 @@ create_logical_replication_slot(char *name, char *plugin, * this point that the output plugin is validated. */ ctx = CreateInitDecodingContext(plugin, NIL, - false, /* just catalogs is OK */ restart_lsn, XL_ROUTINE(.page_read = read_local_xlog_page, .segment_open = wal_segment_open, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index bc40c454de..e2924ba91a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1237,7 +1237,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) else { LogicalDecodingContext *ctx; - bool need_full_snapshot = false; Assert(cmd->kind == REPLICATION_KIND_LOGICAL); @@ -1265,8 +1264,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must not be called inside a transaction", "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')"))); - - need_full_snapshot = true; } else if (snapshot_action == CRS_USE_SNAPSHOT) { @@ -1298,12 +1295,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must not be called in a subtransaction", "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')"))); - - need_full_snapshot = true; } - ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, - InvalidXLogRecPtr, + ctx = CreateInitDecodingContext(cmd->plugin, NIL, InvalidXLogRecPtr, XL_ROUTINE(.page_read = logical_read_xlog_page, .segment_open = WalSndSegmentOpen, .segment_close = wal_segment_close), diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index c79ed9044c..35a6192817 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -579,6 +579,7 @@ static char * setup_publisher(struct LogicalRepInfo *dbinfo) { char *lsn = NULL; + PGconn *conn; for (int i = 0; i < num_dbs; i++) { @@ -654,6 +655,10 @@ setup_publisher(struct LogicalRepInfo *dbinfo) disconnect_database(conn, false); } + conn = connect_database(dbinfo[0].pubconninfo, true); + PQclear((conn, "SELECT * FROM pg_switch_wal()")); + disconnect_database(conn, false); + return lsn; } diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index dc2df4ce92..f1b40b4b92 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -119,7 +119,6 @@ extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin, List *output_plugin_options, - bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, -- 2.43.0