diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index d91055a440..285cc30298 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -371,7 +371,7 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r); - SnapBuildProcessRunningXacts(builder, buf->origptr, running); + SnapBuildProcessRunningXacts(builder, buf->origptr, ctx->in_create, running); /* * Abort all transactions that we keep track of, that are diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 41243d0187..77400dc2d4 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -151,6 +151,7 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, + bool in_create, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -200,6 +201,7 @@ StartupDecodingContext(List *output_plugin_options, } ctx->slot = slot; + ctx->in_create = in_create; ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx); if (!ctx->reader) @@ -437,9 +439,9 @@ CreateInitDecodingContext(const char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, - need_full_snapshot, false, - xl_routine, prepare_write, do_write, - update_progress); + need_full_snapshot, false /* fast_forward */, + true /* in_create */, xl_routine, + prepare_write, do_write, update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -572,9 +574,10 @@ CreateDecodingContext(XLogRecPtr start_lsn, } ctx = StartupDecodingContext(output_plugin_options, - start_lsn, InvalidTransactionId, false, - fast_forward, xl_routine, prepare_write, - 
do_write, update_progress); + start_lsn, InvalidTransactionId, + false /* need_full_snapshot */, fast_forward, + false /* in_create */, xl_routine, + prepare_write, do_write, update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 7a7aba33e1..f846710843 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -294,7 +294,8 @@ static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, Transaction uint32 xinfo); /* xlog reading helper functions for SnapBuildProcessRunningXacts */ -static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); +static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, + const bool in_create, xl_running_xacts *running); static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); /* serialization functions */ @@ -1210,7 +1211,8 @@ SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, * anymore. */ void -SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) +SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, + const bool in_create, xl_running_xacts *running) { ReorderBufferTXN *txn; TransactionId xmin; @@ -1223,7 +1225,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact if (builder->state < SNAPBUILD_CONSISTENT) { /* returns false if there's no point in performing cleanup just yet */ - if (!SnapBuildFindSnapshot(builder, lsn, running)) + if (!SnapBuildFindSnapshot(builder, lsn, in_create, running)) return; } else @@ -1312,7 +1314,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact * using the xl_running_xacts record. 
*/ static bool -SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) +SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, const bool in_create, + xl_running_xacts *running) { /* --- * Build catalog decoding snapshot incrementally using information about @@ -1391,8 +1394,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn return false; } - /* b) valid on disk state and not building full snapshot */ + /* b) valid on disk state, we are an existing slot, and not building full snapshot */ else if (!builder->building_full_snapshot && + !in_create && SnapBuildRestore(builder, lsn)) { /* there won't be any state to cleanup */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 5f49554ea0..39a27f3db7 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -50,6 +50,12 @@ typedef struct LogicalDecodingContext */ bool fast_forward; + /* + * Marks the logical decoding context as being used for the initial creation + * of a logical replication slot. + */ + bool in_create; + OutputPluginCallbacks callbacks; OutputPluginOptions options; diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index f49b941b53..b84403f511 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -88,7 +88,7 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn, struct xl_heap_new_cid *xlrec); extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, - struct xl_running_xacts *running); + const bool in_create, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); #endif /* SNAPBUILD_H */