diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c7ce603706..a4ba1a509a 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error catalog_change_snapshot + twophase_snapshot slot_creation_error catalog_change_snapshot \ + skip_snapshot_restore REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index f1548c0faf..f643dc81a2 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -62,6 +62,7 @@ tests += { 'concurrent_stream', 'twophase_snapshot', 'slot_creation_error', + 'skip_snapshot_restore', ], 'regress_args': [ '--temp-config', files('logical.conf'), diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index ca09c683f1..96e841d77f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -639,6 +639,12 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", LSN_FORMAT_ARGS(slot->data.restart_lsn)); + /* + * Tell snapshot builder to start finding the start point, preventing + * it from restoring snapshots. + */ + SnapBuildFindingStartPoint(ctx->snapshot_builder, true); + /* Wait for a consistent starting point */ for (;;) { @@ -661,6 +667,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) CHECK_FOR_INTERRUPTS(); } + SnapBuildFindingStartPoint(ctx->snapshot_builder, false); + SpinLockAcquire(&slot->mutex); slot->data.confirmed_flush = ctx->reader->EndRecPtr; if (slot->data.two_phase) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index a0b7947d2f..be2ec367ff 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -190,6 +190,15 @@ struct SnapBuild /* Indicates if we are building full snapshot or just catalog one. */ bool building_full_snapshot; + /* + * True if we are finding the logical decoding start point. While finding + * the start point, we need to prevent the snapshot being restored + * from disk and require all in-progress transactions to finish, in order + * to become the consistent state. Otherwise the slot's restart and + * confirmed flush LSNs could be initialized to unsafe values. + */ + bool finding_start_point; + /* * Snapshot that's valid to see the catalog state seen at this moment. */ @@ -427,6 +436,15 @@ SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr) builder->two_phase_at = ptr; } +/* + * Set we are finding the logical decoding start point. + */ +void +SnapBuildFindingStartPoint(SnapBuild *builder, bool finding) +{ + builder->finding_start_point = finding; +} + /* * Should the contents of transaction ending at 'ptr' be decoded? */ @@ -1331,7 +1349,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * snapshot to disk that we can use. Can't use this method for the * initial snapshot when slot is being created and needs full snapshot * for export or direct use, as that snapshot will only contain catalog - * modifying transactions. + * modifying transactions. We cannot use this method also when finding + * the logical decoding start point, as it requires all in-progress + * transaction to finish. * * c) First incrementally build a snapshot for catalog tuples * (BUILDING_SNAPSHOT), that requires all, already in-progress, @@ -1396,8 +1416,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn return false; } - /* b) valid on disk state and not building full snapshot */ + /* + * b) valid on disk state and neither building full snapshot nor + * finding the start point. + */ else if (!builder->building_full_snapshot && + !builder->finding_start_point && SnapBuildRestore(builder, lsn)) { /* there won't be any state to cleanup */ @@ -1581,7 +1605,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 5 +#define SNAPBUILD_VERSION 6 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index fbdf362396..83d35bc61a 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -90,5 +90,6 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); +extern void SnapBuildFindingStartPoint(SnapBuild *builder, bool finding); #endif /* SNAPBUILD_H */