diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c7ce603706..a4ba1a509a 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	spill slot truncate stream stats twophase twophase_stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
-	twophase_snapshot slot_creation_error catalog_change_snapshot
+	twophase_snapshot slot_creation_error catalog_change_snapshot \
+	skip_snapshot_restore
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 7b05cc25a3..2dd3ede41b 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -62,6 +62,7 @@ tests += {
       'concurrent_stream',
       'twophase_snapshot',
       'slot_creation_error',
+      'skip_snapshot_restore',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187..b57fc33cbb 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -637,6 +637,12 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
 		 LSN_FORMAT_ARGS(slot->data.restart_lsn));
 
+	/*
+	 * Tell the snapshot builder that we are searching for the start point,
+	 * so that it does not restore a serialized snapshot from disk.
+	 */
+	SnapBuildFindingStartPoint(ctx->snapshot_builder, true);
+
 	/* Wait for a consistent starting point */
 	for (;;)
 	{
@@ -659,6 +665,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		CHECK_FOR_INTERRUPTS();
 	}
 
+	SnapBuildFindingStartPoint(ctx->snapshot_builder, false);
+
 	SpinLockAcquire(&slot->mutex);
 	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
 	if (slot->data.two_phase)
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7a7aba33e1..e8dc7acbf3 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -278,6 +278,15 @@ struct SnapBuild
 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
 static bool ExportInProgress = false;
 
+/*
+ * True while we are finding the logical decoding start point. During that
+ * search the snapshot must not be restored from disk; instead, all
+ * in-progress transactions must finish before we reach the consistent
+ * state. Otherwise the slot's restart and confirmed flush LSNs could be
+ * initialized to unsafe values.
+ */
+static bool FindingStartPoint = false;
+
 /* ->committed and ->catchange manipulation */
 static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
 
@@ -346,6 +355,8 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 	builder->building_full_snapshot = need_full_snapshot;
 	builder->two_phase_at = two_phase_at;
 
+	FindingStartPoint = false;
+
 	MemoryContextSwitchTo(oldcontext);
 
 	return builder;
@@ -422,6 +433,15 @@ SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
 	builder->two_phase_at = ptr;
 }
 
+/*
+ * Set whether we are finding the logical decoding start point.
+ */
+void
+SnapBuildFindingStartPoint(SnapBuild *builder, bool finding)
+{
+	FindingStartPoint = finding;
+}
+
 /*
  * Should the contents of transaction ending at 'ptr' be decoded?
  */
@@ -1326,7 +1346,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 *	  snapshot to disk that we can use. Can't use this method for the
 	 *	  initial snapshot when slot is being created and needs full snapshot
 	 *	  for export or direct use, as that snapshot will only contain catalog
-	 *	  modifying transactions.
+	 *	  modifying transactions. This method also cannot be used while
+	 *	  finding the logical decoding start point, as that requires all
+	 *	  in-progress transactions to finish.
 	 *
 	 * c) First incrementally build a snapshot for catalog tuples
 	 *	  (BUILDING_SNAPSHOT), that requires all, already in-progress,
@@ -1391,8 +1413,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 
 		return false;
 	}
-	/* b) valid on disk state and not building full snapshot */
+	/*
+	 * b) valid on disk state, and we are neither building a full snapshot
+	 * nor finding the start point.
+	 */
 	else if (!builder->building_full_snapshot &&
+			 !FindingStartPoint &&
 			 SnapBuildRestore(builder, lsn))
 	{
 		/* there won't be any state to cleanup */
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index f49b941b53..8239d6dbb3 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -90,5 +90,6 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
 										 struct xl_running_xacts *running);
 extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
+extern void SnapBuildFindingStartPoint(SnapBuild *builder, bool finding);
 
 #endif							/* SNAPBUILD_H */