From c276b302c33f97660e6facc813c6f7c5a185260a Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Tue, 26 Sep 2023 19:31:42 +0800 Subject: [PATCH v2 2/2] Move in_streaming to output private data The in_streaming flag tracks the streaming of one pgoutput instance, so it would be better to store it as part of the output plugin's private data. --- src/backend/replication/pgoutput/pgoutput.c | 34 +++++++++++---------- src/include/replication/pgoutput.h | 3 ++ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 98a3900da8..cf54c61455 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; -static bool in_streaming; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, @@ -476,9 +475,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("streaming requested, but not supported by output plugin"))); - /* Also remember we're currently not streaming any transaction. */ - in_streaming = false; - /* * Here, we just check whether the two-phase option is passed by * plugin and decide whether to enable it at later point of time. It @@ -676,6 +672,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; bool schema_sent; TransactionId xid = InvalidTransactionId; TransactionId topxid = InvalidTransactionId; @@ -688,7 +685,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, * If we're not in a streaming block, just use InvalidTransactionId and * the write methods will not include it. 
*/ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; if (rbtxn_is_subtxn(change->txn)) @@ -708,7 +705,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, * doing that we need to study its impact on the case where we have a mix * of streaming and non-streaming transactions. */ - if (in_streaming) + if (data->in_streaming) schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid); else schema_sent = relentry->schema_sent; @@ -732,7 +729,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, send_relation_and_attrs(relation, xid, ctx, relentry->columns); - if (in_streaming) + if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); else relentry->schema_sent = true; @@ -1418,7 +1415,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * their association and on aborts, it can discard the corresponding * changes. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; relentry = get_rel_sync_entry(data, relation); @@ -1567,7 +1564,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TransactionId xid = InvalidTransactionId; /* Remember the xid for the change in streaming mode. See pgoutput_change. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; old = MemoryContextSwitchTo(data->context); @@ -1636,7 +1633,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * Remember the xid for the message in streaming mode. See * pgoutput_change. 
*/ - if (in_streaming) + if (data->in_streaming) xid = txn->xid; /* @@ -1740,10 +1737,11 @@ static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; bool send_replication_origin = txn->origin_id != InvalidRepOriginId; /* we can't nest streaming of transactions */ - Assert(!in_streaming); + Assert(!data->in_streaming); /* * If we already sent the first stream for this transaction then don't @@ -1761,7 +1759,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); /* we're streaming a chunk of transaction now */ - in_streaming = true; + data->in_streaming = true; } /* @@ -1771,15 +1769,17 @@ static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + /* we should be streaming a transaction */ - Assert(in_streaming); + Assert(data->in_streaming); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_stop(ctx->out); OutputPluginWrite(ctx, true); /* we've stopped streaming a transaction */ - in_streaming = false; + data->in_streaming = false; } /* @@ -1799,7 +1799,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, * The abort should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. */ - Assert(!in_streaming); + Assert(!data->in_streaming); /* determine the toplevel transaction */ toptxn = rbtxn_get_toptxn(txn); @@ -1824,11 +1824,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private; + /* * The commit should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. 
*/ - Assert(!in_streaming); + Assert(!data->in_streaming); Assert(rbtxn_is_streamed(txn)); OutputPluginUpdateProgress(ctx, false); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index b4a8015403..f3ba24949d 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -21,6 +21,9 @@ typedef struct PGOutputData * allocations */ MemoryContext cachectx; /* private memory context for cache data */ + bool in_streaming; /* true if we are streaming a chunk of + * transaction */ + /* client-supplied info: */ uint32 protocol_version; List *publication_names; -- 2.30.0.windows.2