Move global variables of pgoutput to plugin private scope.
Hi,
Per the complaint in another thread[1], I started to look into the global
variables in pgoutput.
Currently, pgoutput has several global variables, but each of them is
inherently local to an individual pgoutput instance. This can cause issues if
we switch to a different output plugin instance within one session, and we may
fail to reset their values in case of errors. The analysis for each variable is
as follows:
- static HTAB *RelationSyncCache = NULL;
pgoutput creates this hash table under CacheMemoryContext to remember the
relation schemas that have been sent, but it is local to an individual pgoutput
instance, and because it lives in a global memory context, the hash table is not
properly cleared in error paths, which means it risks being accessed by
a different output plugin instance. This was also mentioned in another thread[2].
So I think we'd better allocate it under the output plugin's private context.
But note that, instead of completely moving the hash table into the output
plugin private data, we need to keep the static pointer variable for the map
so that the syscache callbacks can access it. This is because syscache callbacks
are not unregistered even after the output plugin is shut down, so we need a
static pointer to cache the map pointer so that the callbacks can check it.
- static bool publish_no_origin;
This flag is also local to a pgoutput instance, and we don't reset it in the
output plugin's shutdown callback, so if we consume changes from different
slots, the second call reuses the flag value set by the first call, which
is unexpected. To avoid this issue completely, we think we'd better move this
flag into the output plugin's private data structure.
Example:
SELECT data FROM pg_logical_slot_peek_binary_changes('isolation_slot_1', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub', 'origin', 'none'); --- Set origin in this call.
SELECT data FROM pg_logical_slot_peek_binary_changes('isolation_slot_2', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub'); -- Didn't set origin, but will reuse the origin flag in the first call.
- static bool in_streaming;
While at it, I think we can also move this flag to the private data, although I
haven't seen any problems with this one.
- static bool publications_valid;
I thought we needed to move this to the private data as well, but it is accessed
in a syscache callback, which means we need to keep the static variable.
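To make the RelationSyncCache arrangement concrete, here is a minimal toy model in C. It is not PostgreSQL code: `ToyContext`, `toy_context_reset()` and the simplified callback signatures are hypothetical stand-ins for a memory context, `MemoryContextRegisterResetCallback()` and the real syscache callbacks. It shows the cache living in a per-plugin context, a reset callback clearing the static pointer, and an invalidation callback that outlives the plugin checking that pointer before touching the cache.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for a memory context holding one allocation. */
typedef struct ToyContext
{
    void       *chunk;
    void        (*reset_cb) (void *arg);
    void       *reset_arg;
} ToyContext;

/* Fire the reset callback first, then release the memory, like a reset. */
static void
toy_context_reset(ToyContext *cxt)
{
    if (cxt->reset_cb != NULL)
        cxt->reset_cb(cxt->reset_arg);
    cxt->reset_cb = NULL;       /* reset callbacks are one-shot */
    free(cxt->chunk);
    cxt->chunk = NULL;
}

/* Static pointer kept so that the invalidation callback can find the cache. */
static int *RelationSyncCache = NULL;

static void
reset_rel_sync_cache(void *arg)
{
    (void) arg;
    RelationSyncCache = NULL;
}

static void
init_rel_sync_cache(ToyContext *cxt)
{
    if (RelationSyncCache != NULL)
        return;                 /* cache already exists */
    cxt->chunk = calloc(1, sizeof(int));
    assert(cxt->chunk != NULL);
    cxt->reset_cb = reset_rel_sync_cache;
    cxt->reset_arg = NULL;
    RelationSyncCache = cxt->chunk;
}

/* Counts invalidations that actually touched a live cache. */
static int  invalidations_applied = 0;

/* Hypothetical invalidation callback; it stays registered after shutdown. */
static void
rel_sync_cache_relation_cb(void)
{
    if (RelationSyncCache == NULL)
        return;                 /* no live cache, nothing to invalidate */
    (*RelationSyncCache)++;
    invalidations_applied++;
}
```

Assuming the owning context is reset or deleted on the error path, the reset callback guarantees the static pointer is never left pointing at freed memory, so the long-lived callback can safely test it.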
Attached are the patches that change in_streaming, publish_no_origin and RelationSyncCache.
Suggestions and comments are welcome.
[1]: /messages/by-id/20230821182732.t3qc75i5s5xvovls@awork3.anarazel.de
[2]: /messages/by-id/CAA4eK1LJ=CSsxETs5ydqP58OiWPiwodx=Jqw89LQ7fMrRWqK9w@mail.gmail.com
Best Regards,
Hou Zhijie
Attachments:
v1-0001-Allocate-RelationSyncCache-of-pgoutput-under-priv.patch (application/octet-stream)
From fb423bb016482e8bf553f98485346540602f7bae Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 25 Aug 2023 17:07:13 +0800
Subject: [PATCH v1 1/3] Allocate RelationSyncCache of pgoutput under private
memory context
The pgoutput module creates a map under cacheMemoryContext to remember the
relation schemas that have been sent.
However, this map is inherently local to an individual pgoutput instance.
Additionally, while the map is adequately cleaned up when shutting down the
output plugin, it is not properly cleared in error paths. Consequently,
subsequent retries may access the outdated map.
To improve it, the patch allocates the map under the memory context specific to
the output plugin and registers a memory context reset callback to clean it.
Note: Instead of completely moving the map variable into the output plugin
private data, we have to keep the static pointer variable for the map to be
accessed by the syscache callbacks. This is because syscache callbacks won't be
un-registered even after shutting down the output plugin, so we need a static
pointer to cache the map pointer so that callbacks can check it.
---
src/backend/replication/pgoutput/pgoutput.c | 43 +++++++++++++++------
1 file changed, 31 insertions(+), 12 deletions(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b08ca55041..b3d515b59f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -219,7 +219,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
-static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+static void set_schema_sent_in_streamed_txn(PGOutputData *data,
+ RelationSyncEntry *entry,
TransactionId xid);
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
@@ -520,7 +521,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
/* Initialize relation schema cache. */
- init_rel_sync_cache(CacheMemoryContext);
+ init_rel_sync_cache(data->cachectx);
}
else
{
@@ -736,7 +737,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
send_relation_and_attrs(relation, xid, ctx, relentry->columns);
if (in_streaming)
- set_schema_sent_in_streamed_txn(relentry, topxid);
+ set_schema_sent_in_streamed_txn((PGOutputData *) ctx->output_plugin_private,
+ relentry, topxid);
else
relentry->schema_sent = true;
}
@@ -1157,7 +1159,7 @@ init_tuple_slot(PGOutputData *data, Relation relation,
TupleDesc outdesc = RelationGetDescr(ancestor);
/* Map must live as long as the session does. */
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ oldctx = MemoryContextSwitchTo(data->cachectx);
entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
@@ -1689,11 +1691,11 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
static void
pgoutput_shutdown(LogicalDecodingContext *ctx)
{
- if (RelationSyncCache)
- {
- hash_destroy(RelationSyncCache);
- RelationSyncCache = NULL;
- }
+ /*
+ * Don't need to destroy the hash table as it will be cleaned along with
+ * logical decoding memory context.
+ */
+ RelationSyncCache = NULL;
}
/*
@@ -1858,6 +1860,15 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+/*
+ * Memory context reset callback for clearing the RelationSyncCache.
+ */
+static void
+reset_rel_sync_cache(void *arg)
+{
+ RelationSyncCache = NULL;
+}
+
/*
* Initialize the relation schema sync cache for a decoding session.
*
@@ -1869,12 +1880,19 @@ static void
init_rel_sync_cache(MemoryContext cachectx)
{
HASHCTL ctl;
+ MemoryContextCallback *mcallback;
static bool relation_callbacks_registered = false;
/* Nothing to do if hash table already exists */
if (RelationSyncCache != NULL)
return;
+ /* Ensure that the hash table is reset in error paths. */
+ mcallback = MemoryContextAllocZero(cachectx,
+ sizeof(MemoryContextCallback));
+ mcallback->func = reset_rel_sync_cache;
+ MemoryContextRegisterResetCallback(cachectx, mcallback);
+
/* Make a new hash table for the cache */
ctl.keysize = sizeof(Oid);
ctl.entrysize = sizeof(RelationSyncEntry);
@@ -1930,11 +1948,12 @@ get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
* of the relation.
*/
static void
-set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+set_schema_sent_in_streamed_txn(PGOutputData *data, RelationSyncEntry *entry,
+ TransactionId xid)
{
MemoryContext oldctx;
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ oldctx = MemoryContextSwitchTo(data->cachectx);
entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
@@ -2005,7 +2024,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/* Reload publications if needed before use. */
if (!publications_valid)
{
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ oldctx = MemoryContextSwitchTo(data->cachectx);
if (data->publications)
{
list_free_deep(data->publications);
--
2.30.0.windows.2
v1-0003-Move-in_streaming-to-output-private-data.patch (application/octet-stream)
From c4ce199d6f8803ab4695aa44d25142235b2f6aac Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Wed, 13 Sep 2023 20:22:02 +0800
Subject: [PATCH v1 3/3] Move in_streaming to output private data
---
src/backend/replication/pgoutput/pgoutput.c | 34 +++++++++++----------
src/include/replication/pgoutput.h | 2 ++
2 files changed, 20 insertions(+), 16 deletions(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 89789c298f..9f076d99c3 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static bool publications_valid;
-static bool in_streaming;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -479,9 +478,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("streaming requested, but not supported by output plugin")));
- /* Also remember we're currently not streaming any transaction. */
- in_streaming = false;
-
/*
* Here, we just check whether the two-phase option is passed by
* plugin and decide whether to enable it at later point of time. It
@@ -679,6 +675,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
ReorderBufferChange *change,
Relation relation, RelationSyncEntry *relentry)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool schema_sent;
TransactionId xid = InvalidTransactionId;
TransactionId topxid = InvalidTransactionId;
@@ -691,7 +688,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
* If we're not in a streaming block, just use InvalidTransactionId and
* the write methods will not include it.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
if (rbtxn_is_subtxn(change->txn))
@@ -711,7 +708,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
* doing that we need to study its impact on the case where we have a mix
* of streaming and non-streaming transactions.
*/
- if (in_streaming)
+ if (data->in_streaming)
schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
else
schema_sent = relentry->schema_sent;
@@ -735,7 +732,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
send_relation_and_attrs(relation, xid, ctx, relentry->columns);
- if (in_streaming)
+ if (data->in_streaming)
set_schema_sent_in_streamed_txn((PGOutputData *) ctx->output_plugin_private,
relentry, topxid);
else
@@ -1422,7 +1419,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* their association and on aborts, it can discard the corresponding
* changes.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
relentry = get_rel_sync_entry(data, relation);
@@ -1571,7 +1568,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TransactionId xid = InvalidTransactionId;
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
old = MemoryContextSwitchTo(data->context);
@@ -1640,7 +1637,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* Remember the xid for the message in streaming mode. See
* pgoutput_change.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = txn->xid;
/*
@@ -1743,10 +1740,11 @@ static void
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
/* we can't nest streaming of transactions */
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
/*
* If we already sent the first stream for this transaction then don't
@@ -1764,7 +1762,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
/* we're streaming a chunk of transaction now */
- in_streaming = true;
+ data->in_streaming = true;
}
/*
@@ -1774,15 +1772,17 @@ static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
/* we should be streaming a transaction */
- Assert(in_streaming);
+ Assert(data->in_streaming);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_stop(ctx->out);
OutputPluginWrite(ctx, true);
/* we've stopped streaming a transaction */
- in_streaming = false;
+ data->in_streaming = false;
}
/*
@@ -1802,7 +1802,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
* The abort should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
/* determine the toplevel transaction */
toptxn = rbtxn_get_toptxn(txn);
@@ -1827,11 +1827,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
/*
* The commit should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
Assert(rbtxn_is_streamed(txn));
OutputPluginUpdateProgress(ctx, false);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 6f39e72252..d928010248 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -21,6 +21,8 @@ typedef struct PGOutputData
* allocations */
MemoryContext cachectx; /* private memory context for cache data */
+ bool in_streaming;
+
/* client-supplied info: */
uint32 protocol_version;
List *publication_names;
--
2.30.0.windows.2
v1-0002-Maintain-publish_no_origin-in-output-plugin-priva.patch (application/octet-stream)
From 507d225e148e3e1ee642b7bf4e8b326f52aa474c Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 25 Aug 2023 17:29:35 +0800
Subject: [PATCH v1 2/3] Maintain publish_no_origin in output plugin private
data
The pgoutput module uses a global variable(publish_no_origin) to cache the
action for the origin filter. But we only initialize publish_no_origin when
user specifies the "origin" in the output paramters which means we could refer
to an uninitialized variable if user didn't specify the paramter.
Besides, we don't reset the flag when shutting down the output plugin, so
subsequent retries may access the previous publish_no_origin value.
To improve it, the patch stores the map within the private data of the output
plugin so that it will get initialized and reset along with the output plugin
context.
---
src/backend/replication/pgoutput/pgoutput.c | 9 +++++----
src/include/replication/pgoutput.h | 1 +
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b3d515b59f..89789c298f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
static bool publications_valid;
static bool in_streaming;
-static bool publish_no_origin;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -390,9 +389,9 @@ parse_output_parameters(List *options, PGOutputData *data)
data->origin = defGetString(defel);
if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
- publish_no_origin = true;
+ data->publish_no_origin = true;
else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
- publish_no_origin = false;
+ data->publish_no_origin = false;
else
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -1675,7 +1674,9 @@ static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
- if (publish_no_origin && origin_id != InvalidRepOriginId)
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+ if (data->publish_no_origin && origin_id != InvalidRepOriginId)
return true;
return false;
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index b4a8015403..6f39e72252 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -30,6 +30,7 @@ typedef struct PGOutputData
bool messages;
bool two_phase;
char *origin;
+ bool publish_no_origin;
} PGOutputData;
#endif /* PGOUTPUT_H */
--
2.30.0.windows.2
On Tue, Sep 19, 2023 at 04:10:39AM +0000, Zhijie Hou (Fujitsu) wrote:
Currently we have several global variables in pgoutput, but each of them is
inherently local to an individual pgoutput instance. This could cause issues if
we switch to different output plugin instance in one session and could miss to
reset their value in case of errors. The analysis for each variable is as
follows:
(Moved the last block of the message as per the relationship between
RelationSyncCache and publications_valid).
- static HTAB *RelationSyncCache = NULL;
pgoutput creates this hash table under CacheMemoryContext to remember the
relation schemas that have been sent, but it's local to an individual pgoutput
instance, and because it's under global memory context, the hashtable is not
properly cleared in error paths which means it has a risk of being accessed in
a different output plugin instance. This was also mentioned in another thread[2].
So I think we'd better allocate this under output plugin private context.
But note that, instead of completely moving the hash table into the output
plugin private data, we need to keep the static pointer variable for the map to
be accessed by the syscache callbacks. This is because syscache callbacks won't
be un-registered even after shutting down the output plugin, so we need a
static pointer to cache the map pointer so that callbacks can check it.
- static bool publications_valid;
I thought we need to move this to private data as well, but we need to access this in a
syscache callback, which means we need to keep the static variable.
FWIW, I think that keeping publications_valid makes the code kind of
confusing once 0001 is applied, because this makes the handling of the
cached data for relations and publications even more inconsistent than
it is now, with a mixed bag of two different logics caused by the
relationship between the synced relation cache and the publication
cache: RelationSyncCache tracks if relations should be rebuilt, while
publications_valid does it for the publication data, but both are
still static and could be shared by multiple pgoutput contexts. On
top of that, publications_valid is hidden at the top of pgoutput.c
within a bunch of declarations with no comments to explain why it's
here (spoiler: to handle the cache rebuilds with its reset in the
cache callback).
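The spoiler above can be illustrated with a simplified C sketch. The names `publication_loads` and `ensure_publications_loaded()` are hypothetical, and the callback drops the real `(Datum arg, int cacheid, uint32 hashvalue)` arguments: the invalidation callback only clears a static flag, and the next lookup reloads lazily. Because the flag is static, every pgoutput instance in the backend would share it.

```c
#include <assert.h>
#include <stdbool.h>

/* Static flag: the invalidation callback has no access to plugin state. */
static bool publications_valid = false;

/* Hypothetical counter standing in for the work done by LoadPublications(). */
static int  publication_loads = 0;

/* Invalidation callback: only marks the cached publication data stale. */
static void
publication_invalidation_cb(void)
{
    publications_valid = false;
}

/* Lazy reload before use, mirroring the check in get_rel_sync_entry(). */
static void
ensure_publications_loaded(void)
{
    if (!publications_valid)
    {
        publication_loads++;    /* reload the publication list here */
        publications_valid = true;
    }
}
```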
I agree that CacheMemoryContext is not really a good idea to cache the
data only proper to a pgoutput session and that tracking a context in
the output data makes the whole cleanup attractive, but it also seems
to me that we should avoid entirely the use of relcache callbacks if
the intention is to have one RelationSyncEntry per pgoutput. The
patch does something different from both HEAD and having one
RelationSyncEntry per pgoutput: RelationSyncEntry can reference
*everything*, with its data stored in multiple memory contexts, one
per pgoutput. It looks like RelationSyncEntry should be a list
or a hash table, at least, so that it can refer to multiple pgoutput
states. Knowing that a session can only use one replication slot with
MyReplicationSlot, I am not sure that's worth bothering with. As a whole,
0001 with its changes for RelationSyncCache doesn't seem like an
improvement to me.
- static bool publish_no_origin;
This flag is also local to pgoutput instance, and we didn't reset the flag in
output shutdown callback, so if we consume changes from different slots, then
the second call would reuse the flag value that is set in the first call which
is unexpected. To completely avoid this issue, we think we'd better move this
flag to output plugin private data structure.
Yep, that's incorrect.
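A toy C sketch of the publish_no_origin problem, assuming hypothetical helpers (`parse_origin_global()`, `parse_origin_private()`, `ToyOutputData`), with `strcasecmp()` standing in for `pg_strcasecmp()` and the ERROR for unknown origin values omitted: with a backend-wide flag, a second call that omits "origin" inherits the first call's value, while a flag stored in freshly initialized per-instance data starts out false.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <strings.h>

/* Old behaviour: one flag for the whole backend, set only if "origin" given. */
static bool publish_no_origin;

static void
parse_origin_global(const char *origin)
{
    if (origin == NULL)
        return;                 /* flag keeps whatever a previous call set */
    publish_no_origin = (strcasecmp(origin, "none") == 0);
}

/* New behaviour: the flag lives in per-instance private data. */
typedef struct ToyOutputData
{
    bool        publish_no_origin;
} ToyOutputData;

static void
parse_origin_private(ToyOutputData *data, const char *origin)
{
    /* Start from false, as a zero-initialized private struct would. */
    data->publish_no_origin = false;
    if (origin != NULL)
        data->publish_no_origin = (strcasecmp(origin, "none") == 0);
}
```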
- static bool in_streaming;
While on it, I feel we can also move this flag to private data, although I didn't
see problems for this one.
Moving this one to the private state data makes sense to me, as it
tracks the streaming of one PGOutputData.
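A minimal sketch of in_streaming as per-instance state, assuming a hypothetical `ToyOutputData` in place of PGOutputData and plain `assert()` in place of `Assert()`. It only shows that each instance tracks its own streaming block; it makes no claim that the existing static flag misbehaves.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical per-instance state, standing in for PGOutputData. */
typedef struct ToyOutputData
{
    bool        in_streaming;   /* inside a stream_start/stream_stop block? */
} ToyOutputData;

static void
toy_stream_start(ToyOutputData *data)
{
    assert(!data->in_streaming);    /* streaming blocks cannot nest */
    data->in_streaming = true;
}

static void
toy_stream_stop(ToyOutputData *data)
{
    assert(data->in_streaming);     /* must be inside a streaming block */
    data->in_streaming = false;
}
```

If one instance never reaches stream_stop (for example after an error), a newly created instance still starts with its own flag cleared.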
Note that RelSchemaSyncCache is mentioned twice in the code, but it does
not exist.
--
Michael
Hi Hou-san.
Given that some issues were raised about the 0001 patch [1], I am
skipping that one until I see the replies.
Meanwhile, here are some review comments for the patches v1-0002 and v1-0003
////////////////////
v1-0002
======
Commit message
1.
The pgoutput module uses a global variable(publish_no_origin) to cache the
action for the origin filter. But we only initialize publish_no_origin when
user specifies the "origin" in the output paramters which means we could refer
to an uninitialized variable if user didn't specify the paramter.
~
1a.
typos
/variable(publish_no_origin)/variable (publish_no_origin)/
/paramters/parameters/
/paramter./parameter./
~
1b.
"...we could refer to an uninitialized variable"
I'm not sure what this means. Previously it was static, so it wouldn't
be "uninitialised"; it would be false. Perhaps there might be a stale
value from a previous pgoutput, but IIUC that's the point made by your
next paragraph ("Besides, we don't...")
~~~
2.
To improve it, the patch stores the map within the private data of the output
plugin so that it will get initialized and reset along with the output plugin
context.
2a.
/To improve it,/To fix this/
~
2b.
"stores the map"
What map? This might be a cut/paste error from the v1-0001 patch comment.
////////////////////
v1-0003
======
Commit message
1.
Missing patch comment.
======
src/backend/replication/pgoutput/pgoutput.c
2. maybe_send_schema
- if (in_streaming)
+ if (data->in_streaming)
set_schema_sent_in_streamed_txn((PGOutputData *) ctx->output_plugin_private,
relentry, topxid);
~
Since you added a new 'data' variable, you might as well make use of
it here instead of doing "(PGOutputData *) ctx->output_plugin_private"
again.
======
src/include/replication/pgoutput.h
3.
MemoryContext cachectx; /* private memory context for cache data */
+ bool in_streaming;
+
Even though there was no comment previously when this was static, IMO
it is better to comment on all the structure fields where possible.
------
[1]: /messages/by-id/ZQk1Ca_eFDTmBiZy@paquier.xyz
Kind Regards,
Peter Smith.
Fujitsu Australia
On Tue, Sep 19, 2023 at 12:48 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
- static bool publish_no_origin;
This flag is also local to pgoutput instance, and we didn't reset the flag in
output shutdown callback, so if we consume changes from different slots, then
the second call would reuse the flag value that is set in the first call which
is unexpected. To completely avoid this issue, we think we'd better move this
flag to output plugin private data structure.
Example:
SELECT data FROM pg_logical_slot_peek_binary_changes('isolation_slot_1', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub', 'origin', 'none'); --- Set origin in this call.
SELECT data FROM pg_logical_slot_peek_binary_changes('isolation_slot_2', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub'); -- Didn't set origin, but will reuse the origin flag in the first call.
char *origin;
+ bool publish_no_origin;
} PGOutputData;
Do we really need a new member in the above structure? Can't we just
use the existing origin in the same structure? Please remember that if this
needs to be backpatched, it may not be a good idea to add a new
member to the structure, but apart from that, having two members to
represent similar information doesn't seem advisable to me. I feel for
back branches we can just use PGOutputData->origin for comparison and for
HEAD, we can remove origin and just use a boolean to avoid any extra
cost of comparisons for each change.
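The two options described above can be compared side by side in a small C sketch. The struct and function names are hypothetical, `strcasecmp()` stands in for `pg_strcasecmp()`, and `RepOriginId` is redeclared here as a 16-bit integer with `InvalidRepOriginId` as 0. In both versions a true result means the change is filtered out, as in pgoutput_origin_filter().

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <strings.h>

typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

/* Back-branch style: keep only the string and compare it for every change. */
typedef struct BackBranchData
{
    const char *origin;         /* NULL when "origin" was not specified */
} BackBranchData;

static bool
origin_filter_backbranch(const BackBranchData *data, RepOriginId origin_id)
{
    return data->origin != NULL &&
        strcasecmp(data->origin, "none") == 0 &&
        origin_id != InvalidRepOriginId;
}

/* HEAD style: parse once into a boolean, then test it cheaply per change. */
typedef struct HeadData
{
    bool        publish_no_origin;
} HeadData;

static bool
origin_filter_head(const HeadData *data, RepOriginId origin_id)
{
    return data->publish_no_origin && origin_id != InvalidRepOriginId;
}
```

The HEAD variant tests a boolean per change, whereas the back-branch variant repeats the string comparison per change, which is the extra cost mentioned above.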
Can we add a test case to cover this case?
--
With Regards,
Amit Kapila.
On Tuesday, September 19, 2023 1:44 PM Michael Paquier <michael@paquier.xyz> wrote:
On Tue, Sep 19, 2023 at 04:10:39AM +0000, Zhijie Hou (Fujitsu) wrote:
Currently we have several global variables in pgoutput, but each of
them is inherently local to an individual pgoutput instance. This
could cause issues if we switch to different output plugin instance in
one session and could miss to reset their value in case of errors. The
analysis for each variable is as
follows:
(Moved the last block of the message as per the relationship between
RelationSyncCache and publications_valid).
- static HTAB *RelationSyncCache = NULL;
pgoutput creates this hash table under CacheMemoryContext to remember
the relation schemas that have been sent, but it's local to an
individual pgoutput instance, and because it's under global memory
context, the hashtable is not properly cleared in error paths which
means it has a risk of being accessed in a different output plugin instance.
This was also mentioned in another thread[2].
So I think we'd better allocate this under output plugin private context.
But note that, instead of completely moving the hash table into the
output plugin private data, we need to keep the static pointer
variable for the map to be accessed by the syscache callbacks. This is
because syscache callbacks won't be un-registered even after shutting
down the output plugin, so we need a static pointer to cache the map pointer
so that callbacks can check it.
- static bool publications_valid;
I thought we need to move this to private data as well, but we need to
access this in a syscache callback, which means we need to keep the static variable.
FWIW, I think that keeping publications_valid makes the code kind of confusing
once 0001 is applied, because this makes the handling of the cached data for
relations and publications even more inconsistent than it is now, with a mixed
bag of two different logics caused by the relationship between the synced
relation cache and the publication
cache: RelationSyncCache tracks if relations should be rebuilt, while
publications_valid does it for the publication data, but both are still static and
could be shared by multiple pgoutput contexts. On top of that,
publications_valid is hidden at the top of pgoutput.c within a bunch of
declarations and no comments to explain why it's here (spoiler: to handle the
cache rebuilds with its reset in the cache callback).
I agree that CacheMemoryContext is not really a good idea to cache the data
only proper to a pgoutput session and that tracking a context in the output
data makes the whole cleanup attractive, but it also seems to me that we
should avoid entirely the use of relcache callbacks if the intention is to have one
RelationSyncEntry per pgoutput. The patch does something different than
HEAD and than having one RelationSyncEntry per pgoutput: RelationSyncEntry
can reference *everything*, with its data stored in multiple memory contexts as
of one per pgoutput. It looks like RelationSyncEntry should be a list or a hash
table, at least, so as it can refer to multiple pgoutput states. Knowing that a
session can only use one replication slot with MyReplicationSlot, not sure that's
worth bothering with. As a whole,
0001 with its changes for RelationSyncCache don't seem like an improvement
to me.
Thanks for your comments. Currently, I am not sure how to avoid the use of the
syscache callback functions, so I think the change for RelationSyncCache needs
more thought, and I will try again later if I find another way.
Best Regards,
Hou zj
On Tuesday, September 26, 2023 4:40 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Tue, Sep 19, 2023 at 12:48 PM Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com>
wrote:
- static bool publish_no_origin;
This flag is also local to pgoutput instance, and we didn't reset the
flag in output shutdown callback, so if we consume changes from
different slots, then the second call would reuse the flag value that
is set in the first call which is unexpected. To completely avoid this
issue, we think we'd better move this flag to output plugin private data
structure.
Example:
SELECT data FROM pg_logical_slot_peek_binary_changes('isolation_slot_1', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub', 'origin', 'none'); --- Set origin in this call.
SELECT data FROM pg_logical_slot_peek_binary_changes('isolation_slot_2', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub'); -- Didn't set origin, but will reuse the origin flag in the first call.
char *origin;
+ bool publish_no_origin;
} PGOutputData;
Do we really need a new parameter in above structure? Can't we just use the
existing origin in the same structure? Please remember if this needs to be
backpatched then it may not be good idea to add new parameter in the
structure but apart from that having two members to represent similar
information doesn't seem advisable to me. I feel for backbranch we can just use
PGOutputData->origin for comparison and for HEAD, we can remove origin
and just use a boolean to avoid any extra cost for comparisons for each
change.
OK, I agree to remove the origin string on HEAD; we can add it back
when we support other origin values. I also changed the back-branch patch to
use the string for comparison, as suggested. I will also test it locally to
confirm it doesn't affect performance.
Can we add a test case to cover this case?
Added one in replorigin.sql.
Attached is the patch set for publish_no_origin and in_streaming, including the
patch (v2-PG16-0001) for back-branches. Since the patch for the hash table needs
more thought, I didn't post it this time.
Best Regards,
Hou zj
Attachments:
v2-0001-Maintain-publish_no_origin-in-output-plugin-priv.patch (application/octet-stream)
From 952875bd737dfc652d7c1f63247cacfcc4e5ec16 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 25 Aug 2023 17:29:35 +0800
Subject: [PATCH v22] Maintain publish_no_origin in output plugin private data
The pgoutput module uses a global variable (publish_no_origin) to cache the
action for the origin filter, but we didn't reset the flag when shutting down
the output plugin, so subsequent retries may access the previous
publish_no_origin value.
The patch stores the flag as part of the output plugin's private data. This
ensures its initialization and reset in line with the output plugin context.
Additionally, the patch removes the currently unused origin string from the
structure.
For back-branches, to avoid introducing a new structure field, the
patch eliminates the global variable and instead uses the origin string
directly for change filtering.
---
contrib/test_decoding/expected/replorigin.out | 56 +++++++++++++++++++
contrib/test_decoding/sql/replorigin.sql | 22 ++++++++
src/backend/replication/pgoutput/pgoutput.c | 19 ++++---
src/include/replication/pgoutput.h | 2 +-
4 files changed, 90 insertions(+), 9 deletions(-)
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index 49ffaeea2d..c85e1a01b2 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -267,3 +267,59 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn
(1 row)
+-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE PUBLICATION pub FOR TABLE target_tbl;
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+ pg_replication_origin_create
+------------------------------
+ 1
+(1 row)
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
+ pg_replication_origin_session_setup
+-------------------------------------
+
+(1 row)
+
+INSERT INTO target_tbl(data) VALUES ('test data');
+-- The replayed change will be filtered.
+SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
+ ?column?
+----------
+ t
+(1 row)
+
+-- The replayed change will be output if the origin value is not specified.
+SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
+ ?column?
+----------
+ t
+(1 row)
+
+-- Clean up
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset
+-------------------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
+ pg_replication_origin_drop
+----------------------------
+
+(1 row)
+
+DROP PUBLICATION pub;
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index db06541f56..e71ee02d05 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL
SELECT pg_replication_origin_session_reset();
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
+
+-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
+CREATE PUBLICATION pub FOR TABLE target_tbl;
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
+
+INSERT INTO target_tbl(data) VALUES ('test data');
+
+-- The replayed change will be filtered.
+SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
+
+-- The replayed change will be output if the origin value is not specified.
+SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
+
+-- Clean up
+SELECT pg_replication_origin_session_reset();
+SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
+DROP PUBLICATION pub;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 3d2becb45c..251ba46da5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
static bool publications_valid;
static bool in_streaming;
-static bool publish_no_origin;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -381,21 +380,23 @@ parse_output_parameters(List *options, PGOutputData *data)
}
else if (strcmp(defel->defname, "origin") == 0)
{
+ char *origin;
+
if (origin_option_given)
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"));
origin_option_given = true;
- data->origin = defGetString(defel);
- if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
- publish_no_origin = true;
- else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
- publish_no_origin = false;
+ origin = defGetString(defel);
+ if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
+ data->publish_no_origin = true;
+ else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
+ data->publish_no_origin = false;
else
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("unrecognized origin value: \"%s\"", data->origin));
+ errmsg("unrecognized origin value: \"%s\"", origin));
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -1673,7 +1674,9 @@ static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
- if (publish_no_origin && origin_id != InvalidRepOriginId)
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+ if (data->publish_no_origin && origin_id != InvalidRepOriginId)
return true;
return false;
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index b4a8015403..b3f9a01629 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -29,7 +29,7 @@ typedef struct PGOutputData
char streaming;
bool messages;
bool two_phase;
- char *origin;
+ bool publish_no_origin;
} PGOutputData;
#endif /* PGOUTPUT_H */
--
2.30.0.windows.2
v2-PG16-0001-Maintain-publish_no_origin-in-output-plugin-priva.patch
From 41b46f2cef97188ef999f94cf618285ea6d59d72 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 25 Aug 2023 17:29:35 +0800
Subject: [PATCH v2] Maintain publish_no_origin in output plugin private data
The pgoutput module uses a global variable (publish_no_origin) to cache the
action for the origin filter, but we didn't reset the flag when shutting down
the output plugin, so subsequent retries may access the previous
publish_no_origin value.
The patch stores the flag as part of the output plugin's private data. This
ensures its initialization and reset in line with the output plugin context.
Additionally, the patch removes the currently unused origin string from the
structure.
For back-branches, to avoid introducing a new structure field, the patch
instead eliminates the global variable and directly uses the origin string
for change filtering.
---
contrib/test_decoding/expected/replorigin.out | 56 +++++++++++++++++++
contrib/test_decoding/sql/replorigin.sql | 22 ++++++++
src/backend/replication/pgoutput/pgoutput.c | 14 ++---
3 files changed, 85 insertions(+), 7 deletions(-)
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index 49ffaeea2d..c85e1a01b2 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -267,3 +267,59 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn
(1 row)
+-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE PUBLICATION pub FOR TABLE target_tbl;
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+ pg_replication_origin_create
+------------------------------
+ 1
+(1 row)
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
+ pg_replication_origin_session_setup
+-------------------------------------
+
+(1 row)
+
+INSERT INTO target_tbl(data) VALUES ('test data');
+-- The replayed change will be filtered.
+SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
+ ?column?
+----------
+ t
+(1 row)
+
+-- The replayed change will be output if the origin value is not specified.
+SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
+ ?column?
+----------
+ t
+(1 row)
+
+-- Clean up
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset
+-------------------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
+ pg_replication_origin_drop
+----------------------------
+
+(1 row)
+
+DROP PUBLICATION pub;
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index db06541f56..e71ee02d05 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL
SELECT pg_replication_origin_session_reset();
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
+
+-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
+CREATE PUBLICATION pub FOR TABLE target_tbl;
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
+
+INSERT INTO target_tbl(data) VALUES ('test data');
+
+-- The replayed change will be filtered.
+SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
+
+-- The replayed change will be output if the origin value is not specified.
+SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
+
+-- Clean up
+SELECT pg_replication_origin_session_reset();
+SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
+DROP PUBLICATION pub;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b08ca55041..8caf75d4c8 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
static bool publications_valid;
static bool in_streaming;
-static bool publish_no_origin;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -388,11 +387,9 @@ parse_output_parameters(List *options, PGOutputData *data)
origin_option_given = true;
data->origin = defGetString(defel);
- if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
- publish_no_origin = true;
- else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
- publish_no_origin = false;
- else
+
+ if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) != 0 &&
+ pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) != 0)
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized origin value: \"%s\"", data->origin));
@@ -1673,7 +1670,10 @@ static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
- if (publish_no_origin && origin_id != InvalidRepOriginId)
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+ if (data->origin && (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0) &&
+ origin_id != InvalidRepOriginId)
return true;
return false;
--
2.30.0.windows.2
v2-0002-Move-in_streaming-to-output-private-data.patch
From c276b302c33f97660e6facc813c6f7c5a185260a Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Tue, 26 Sep 2023 19:31:42 +0800
Subject: [PATCH v2 2/2] Move in_streaming to output private data
The in_streaming flag tracks the streaming state of one pgoutput instance,
so it is better to store it as part of the output plugin's private
data.
---
src/backend/replication/pgoutput/pgoutput.c | 34 +++++++++++----------
src/include/replication/pgoutput.h | 3 ++
2 files changed, 21 insertions(+), 16 deletions(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 98a3900da8..cf54c61455 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static bool publications_valid;
-static bool in_streaming;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -476,9 +475,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("streaming requested, but not supported by output plugin")));
- /* Also remember we're currently not streaming any transaction. */
- in_streaming = false;
-
/*
* Here, we just check whether the two-phase option is passed by
* plugin and decide whether to enable it at later point of time. It
@@ -676,6 +672,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
ReorderBufferChange *change,
Relation relation, RelationSyncEntry *relentry)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool schema_sent;
TransactionId xid = InvalidTransactionId;
TransactionId topxid = InvalidTransactionId;
@@ -688,7 +685,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
* If we're not in a streaming block, just use InvalidTransactionId and
* the write methods will not include it.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
if (rbtxn_is_subtxn(change->txn))
@@ -708,7 +705,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
* doing that we need to study its impact on the case where we have a mix
* of streaming and non-streaming transactions.
*/
- if (in_streaming)
+ if (data->in_streaming)
schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
else
schema_sent = relentry->schema_sent;
@@ -732,7 +729,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
send_relation_and_attrs(relation, xid, ctx, relentry->columns);
- if (in_streaming)
+ if (data->in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
else
relentry->schema_sent = true;
@@ -1418,7 +1415,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* their association and on aborts, it can discard the corresponding
* changes.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
relentry = get_rel_sync_entry(data, relation);
@@ -1567,7 +1564,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TransactionId xid = InvalidTransactionId;
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
old = MemoryContextSwitchTo(data->context);
@@ -1636,7 +1633,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* Remember the xid for the message in streaming mode. See
* pgoutput_change.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = txn->xid;
/*
@@ -1740,10 +1737,11 @@ static void
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
/* we can't nest streaming of transactions */
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
/*
* If we already sent the first stream for this transaction then don't
@@ -1761,7 +1759,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
/* we're streaming a chunk of transaction now */
- in_streaming = true;
+ data->in_streaming = true;
}
/*
@@ -1771,15 +1769,17 @@ static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
/* we should be streaming a transaction */
- Assert(in_streaming);
+ Assert(data->in_streaming);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_stop(ctx->out);
OutputPluginWrite(ctx, true);
/* we've stopped streaming a transaction */
- in_streaming = false;
+ data->in_streaming = false;
}
/*
@@ -1799,7 +1799,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
* The abort should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
/* determine the toplevel transaction */
toptxn = rbtxn_get_toptxn(txn);
@@ -1824,11 +1824,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
+ PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
+
/*
* The commit should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
Assert(rbtxn_is_streamed(txn));
OutputPluginUpdateProgress(ctx, false);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index b4a8015403..f3ba24949d 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -21,6 +21,9 @@ typedef struct PGOutputData
* allocations */
MemoryContext cachectx; /* private memory context for cache data */
+ bool in_streaming; /* true if we are streaming a chunk of
+ * transaction */
+
/* client-supplied info: */
uint32 protocol_version;
List *publication_names;
--
2.30.0.windows.2
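To make the problem fixed by v2-0001 concrete, here is a minimal C sketch under a heavily simplified model. `DemoOutputData`, `demo_startup()`, and the two filter functions are hypothetical stand-ins rather than pgoutput.c code, `strcasecmp()` stands in for `pg_strcasecmp()`, 0 plays the role of `InvalidRepOriginId`, and option validation is omitted. As in the pre-patch code, the global flag is assigned only when an `origin` option is supplied, so a value set by one call survives into the next. The per-instance flag starts out false because its struct is zero-allocated, much as pgoutput allocates its private data with palloc0().

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <strings.h>

/* Hypothetical stand-in for InvalidRepOriginId. */
#define DEMO_INVALID_ORIGIN_ID 0u

/* Before the fix: one flag shared by every instance in the backend. */
static bool global_publish_no_origin;

/* After the fix: the flag lives in per-instance private data. */
typedef struct DemoOutputData
{
    bool        publish_no_origin;
} DemoOutputData;

/*
 * Simulate plugin startup with an optional "origin" option (NULL means
 * the option was not given).  As before the fix, the global is assigned
 * only when the option is present, so an earlier value survives.
 */
static DemoOutputData *
demo_startup(const char *origin)
{
    /* calloc() zero-fills, like palloc0(): the private flag starts false. */
    DemoOutputData *data = calloc(1, sizeof(DemoOutputData));

    if (data == NULL)
        abort();

    if (origin != NULL)
    {
        bool        no_origin = strcasecmp(origin, "none") == 0;

        global_publish_no_origin = no_origin;
        data->publish_no_origin = no_origin;
    }
    return data;
}

/* Old behaviour: returns true when the change should be skipped. */
static bool
filter_with_global(unsigned origin_id)
{
    return global_publish_no_origin && origin_id != DEMO_INVALID_ORIGIN_ID;
}

/* New behaviour: same test, but against this instance's own flag. */
static bool
filter_with_private(const DemoOutputData *data, unsigned origin_id)
{
    return data->publish_no_origin && origin_id != DEMO_INVALID_ORIGIN_ID;
}
```

For example, starting one instance with `origin` set to `none` and then a second one without the option leaves `filter_with_global(1)` returning true for the second call, while `filter_with_private()` on the second instance returns false.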
On Tue, Sep 26, 2023 at 01:55:10PM +0000, Zhijie Hou (Fujitsu) wrote:
On Tuesday, September 26, 2023 4:40 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
Do we really need a new parameter in above structure? Can't we just use the
existing origin in the same structure? Please remember if this needs to be
backpatched then it may not be good idea to add new parameter in the
structure but apart from that having two members to represent similar
information doesn't seem advisable to me. I feel for backbranch we can just use
PGOutputData->origin for comparison and for HEAD, we can remove origin
and just use a boolean to avoid any extra cost for comparisons for each
change.
OK, I agree to remove the origin string on HEAD and we can add that back
when we support other origin value. I also modified to use the string for comparison
as suggested for back-branch. I will also test it locally to confirm it doesn't affect
the perf.
Err, actually, I am going to disagree here for the patch of HEAD. It
seems to me that there is zero need for pgoutput.h and we don't need
to show PGOutputData to the world. The structure is internal to
pgoutput.c and used only by its internal static routines.
Doing a codesearch in the Debian repos or just github shows that it is
used nowhere else, as well, something not really surprising as the
structure is filled and maintained in the file.
--
Michael
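The trade-off in the quoted exchange above, keeping the `origin` string on back-branches but a boolean on HEAD, can be sketched in C as follows. This is a simplified model under stated assumptions: the struct and function names are hypothetical, `strcasecmp()` stands in for `pg_strcasecmp()`, `"none"` for `LOGICALREP_ORIGIN_NONE`, and 0 for `InvalidRepOriginId`. Both filters give the same answer; the back-branch shape repeats a case-insensitive string comparison for every decoded change, while the HEAD shape pays for it once, when the options are parsed.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <strings.h>

/* Hypothetical stand-ins for LOGICALREP_ORIGIN_NONE / InvalidRepOriginId. */
#define DEMO_ORIGIN_NONE "none"
#define DEMO_INVALID_ORIGIN_ID 0u

/* Back-branch shape: keep the validated origin string, compare per change. */
typedef struct BackBranchData
{
    const char *origin;         /* NULL when the option was not given */
} BackBranchData;

static bool
back_branch_filter(const BackBranchData *data, unsigned origin_id)
{
    return data->origin != NULL &&
        strcasecmp(data->origin, DEMO_ORIGIN_NONE) == 0 &&
        origin_id != DEMO_INVALID_ORIGIN_ID;
}

/* HEAD shape: decide once while parsing options, test a bool per change. */
typedef struct HeadData
{
    bool        publish_no_origin;
} HeadData;

static HeadData
head_from_option(const char *origin)
{
    HeadData    data = {false};

    if (origin != NULL && strcasecmp(origin, DEMO_ORIGIN_NONE) == 0)
        data.publish_no_origin = true;
    return data;
}

static bool
head_filter(const HeadData *data, unsigned origin_id)
{
    return data->publish_no_origin && origin_id != DEMO_INVALID_ORIGIN_ID;
}
```

Keeping the string in the back-branch variant leaves the size and layout of the released structure untouched, which is the reason given in this thread for not adding a field there.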
On Wed, Sep 27, 2023 at 9:10 AM Michael Paquier <michael@paquier.xyz> wrote:
On Tue, Sep 26, 2023 at 01:55:10PM +0000, Zhijie Hou (Fujitsu) wrote:
On Tuesday, September 26, 2023 4:40 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
Do we really need a new parameter in above structure? Can't we just use the
existing origin in the same structure? Please remember if this needs to be
backpatched then it may not be good idea to add new parameter in the
structure but apart from that having two members to represent similar
information doesn't seem advisable to me. I feel for backbranch we can just use
PGOutputData->origin for comparison and for HEAD, we can remove origin
and just use a boolean to avoid any extra cost for comparisons for each
change.
OK, I agree to remove the origin string on HEAD and we can add that back
when we support other origin value. I also modified to use the string for comparison
as suggested for back-branch. I will also test it locally to confirm it doesn't affect
the perf.
Err, actually, I am going to disagree here for the patch of HEAD. It
seems to me that there is zero need for pgoutput.h and we don't need
to show PGOutputData to the world. The structure is internal to
pgoutput.c and used only by its internal static routines.
Do you disagree with the approach for the PG16 patch or HEAD? You
mentioned HEAD but your argument sounds like you disagree with a
different approach for PG16.
--
With Regards,
Amit Kapila.
On Wed, Sep 27, 2023 at 09:39:19AM +0530, Amit Kapila wrote:
On Wed, Sep 27, 2023 at 9:10 AM Michael Paquier <michael@paquier.xyz> wrote:
Err, actually, I am going to disagree here for the patch of HEAD. It
seems to me that there is zero need for pgoutput.h and we don't need
to show PGOutputData to the world. The structure is internal to
pgoutput.c and used only by its internal static routines.
Do you disagree with the approach for the PG16 patch or HEAD? You
mentioned HEAD but your argument sounds like you disagree with a
different approach for PG16.
Only HEAD where the structure should be moved from pgoutput.h to
pgoutput.c, IMO. The proposed patch for PG16 is OK as the size of the
structure should not change in a branch already released.
--
Michael
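The concern about changing the structure in an already released branch can be illustrated with a minimal C sketch. The two structs below are hypothetical, simplified layouts, not the real PGOutputData definition: `LayoutV2` inserts a `bool` right after the cache context pointer, which is where the HEAD-only v2-0002 patch puts `in_streaming`. On a typical LP64 platform both structs happen to be 24 bytes because the new member fits into existing padding, yet `protocol_version` shifts from offset 8 to offset 12, so code compiled against the old header would read it from the wrong place.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified layout as shipped in a released branch. */
typedef struct LayoutV1
{
    void       *cachectx;           /* stand-in for MemoryContext */
    uint32_t    protocol_version;
    char       *origin;
} LayoutV1;

/* The same struct after inserting a member, as v2-0002 does on HEAD. */
typedef struct LayoutV2
{
    void       *cachectx;
    bool        in_streaming;       /* new field */
    uint32_t    protocol_version;   /* may now live at a different offset */
    char       *origin;
} LayoutV2;

/*
 * Code built against LayoutV1 hard-codes the old offset; if the server
 * was built with LayoutV2, the two disagree about where the field lives.
 */
static size_t
protocol_version_shift(void)
{
    return offsetof(LayoutV2, protocol_version) -
        offsetof(LayoutV1, protocol_version);
}
```

On a typical LP64 platform `protocol_version_shift()` returns 4 even though `sizeof` is unchanged, which is why a field change is avoided entirely in the back-branch patch.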
On Wed, Sep 27, 2023 at 9:46 AM Michael Paquier <michael@paquier.xyz> wrote:
On Wed, Sep 27, 2023 at 09:39:19AM +0530, Amit Kapila wrote:
On Wed, Sep 27, 2023 at 9:10 AM Michael Paquier <michael@paquier.xyz> wrote:
Err, actually, I am going to disagree here for the patch of HEAD. It
seems to me that there is zero need for pgoutput.h and we don't need
to show PGOutputData to the world. The structure is internal to
pgoutput.c and used only by its internal static routines.
Do you disagree with the approach for the PG16 patch or HEAD? You
mentioned HEAD but your argument sounds like you disagree with a
different approach for PG16.
Only HEAD where the structure should be moved from pgoutput.h to
pgoutput.c, IMO.
It's like that from the beginning. Now, even if we want to move, your
suggestion is not directly related to this patch as we are just
changing one field, and that too to fix a bug. We should start a
separate thread to gather a broader consensus if we want to move the
exposed structure to an internal file.
--
With Regards,
Amit Kapila.
On Wednesday, September 27, 2023 12:45 PM Amit Kapila <amit.kapila16@gmail.com>
On Wed, Sep 27, 2023 at 9:46 AM Michael Paquier <michael@paquier.xyz>
wrote:
On Wed, Sep 27, 2023 at 09:39:19AM +0530, Amit Kapila wrote:
On Wed, Sep 27, 2023 at 9:10 AM Michael Paquier <michael@paquier.xyz>
wrote:
Err, actually, I am going to disagree here for the patch of HEAD.
It seems to me that there is zero need for pgoutput.h and we don't
need to show PGOutputData to the world. The structure is internal
to pgoutput.c and used only by its internal static routines.
Do you disagree with the approach for the PG16 patch or HEAD? You
mentioned HEAD but your argument sounds like you disagree with a
different approach for PG16.
Only HEAD where the structure should be moved from pgoutput.h to
pgoutput.c, IMO.
It's like that from the beginning. Now, even if we want to move, your
suggestion is not directly related to this patch as we are just changing one field,
and that too to fix a bug. We should start a separate thread to gather a broader
consensus if we want to move the exposed structure to an internal file.
While searching the code, I noticed one postgres fork where PGOutputData is
used in other places. Although it's a separate fork, it seems better to
discuss the removal separately.
Best Regards,
Hou zj
On Wed, Sep 27, 2023 at 10:15:24AM +0530, Amit Kapila wrote:
It's like that from the beginning. Now, even if we want to move, your
suggestion is not directly related to this patch as we are just
changing one field, and that too to fix a bug. We should start a
separate thread to gather a broader consensus if we want to move the
exposed structure to an internal file.
As you wish. You are planning to take care of the patches 0001 and
0002 posted on this thread, I guess?
--
Michael
On Wed, Sep 27, 2023 at 10:26 AM Michael Paquier <michael@paquier.xyz> wrote:
On Wed, Sep 27, 2023 at 10:15:24AM +0530, Amit Kapila wrote:
It's like that from the beginning. Now, even if we want to move, your
suggestion is not directly related to this patch as we are just
changing one field, and that too to fix a bug. We should start a
separate thread to gather a broader consensus if we want to move the
exposed structure to an internal file.
As you wish.
Thanks.
You are planning to take care of the patches 0001 and
0002 posted on this thread, I guess?
I have tested and reviewed
v2-0001-Maintain-publish_no_origin-in-output-plugin-priv and
v2-PG16-0001-Maintain-publish_no_origin-in-output-plugin-priva patches
posted in the email [1]. I'll push those unless there are more
comments on them. I have briefly looked at
v2-0002-Move-in_streaming-to-output-private-data in the same email [1]
but didn't think about it in detail (like whether there is any live
bug that can be fixed or is just an improvement). If you want to
look at and commit v2-0002-Move-in_streaming-to-output-private-data, I am
fine with that.
[1]: /messages/by-id/OS0PR01MB57164B085332DB677DBFA8E994C3A@OS0PR01MB5716.jpnprd01.prod.outlook.com
--
With Regards,
Amit Kapila.
On Wed, Sep 27, 2023 at 04:51:29AM +0000, Zhijie Hou (Fujitsu) wrote:
While searching the code, I noticed one postgres fork where PGOutputData is
used in other places. Although it's a separate fork, it seems better to
discuss the removal separately.
Indeed. Interesting.
--
Michael
On Wed, Sep 27, 2023 at 10:51:52AM +0530, Amit Kapila wrote:
I have briefly looked at
v2-0002-Move-in_streaming-to-output-private-data in the same email [1]
but didn't think about it in detail (like whether there is any live
bug that can be fixed or is just an improvement).
This looks like an improvement to me, as at the startup of a stream
the flag is forcibly reset to a false state. So, you cannot really
reach a state where a second stream could be started within the same
session but with a flag incorrectly set to true. Tracking that in the
state data of pgoutput is cleaner, definitely.
If you want to
look at and commit v2-0002-Move-in_streaming-to-output-private-data, I am
fine with that.
Sure. I found the concept behind 0002 sound. Feel free to go ahead
with 0001, and I can always look at the second. Always happy to help.
--
Michael
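As noted above, v2-0002 is a cleanup rather than a fix for a live bug, since the old code reset the static flag at startup. The idea can be sketched in C as below; the names are hypothetical simplifications of pgoutput_startup(), pgoutput_stream_start() and pgoutput_stream_stop(), and 0 stands in for InvalidTransactionId. Because each instance carries its own `in_streaming` flag, starting a stream on one instance cannot affect whether another instance attaches transaction ids to its changes.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified per-instance private data. */
typedef struct DemoStreamData
{
    bool        in_streaming;   /* true between stream_start and stream_stop */
} DemoStreamData;

/* Fresh per-instance state, like the zeroed private data at startup. */
static void
demo_startup(DemoStreamData *data)
{
    data->in_streaming = false;
}

static void
demo_stream_start(DemoStreamData *data)
{
    assert(!data->in_streaming);    /* streaming blocks cannot nest */
    data->in_streaming = true;
}

static void
demo_stream_stop(DemoStreamData *data)
{
    assert(data->in_streaming);     /* must be inside a streaming block */
    data->in_streaming = false;
}

/*
 * A change carries its transaction id only inside a streaming block,
 * mirroring the "if (data->in_streaming) xid = ..." checks in the patch.
 */
static unsigned
demo_change_xid(const DemoStreamData *data, unsigned txn_xid)
{
    return data->in_streaming ? txn_xid : 0u;
}
```

For example, after `demo_stream_start(&a)` on one instance, `demo_change_xid(&b, 42)` on a second, freshly started instance still returns 0.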
On Wed, Sep 27, 2023 at 04:57:06PM +0900, Michael Paquier wrote:
Sure. I found the concept behind 0002 sound. Feel free to go ahead
with 0001, and I can always look at the second. Always happy to help.
For the sake of the archives:
- Amit has applied 0001 down to 16 as of 54ccfd65868c.
- I've applied 0002 after on HEAD as of 9210afd3bcd6.
--
Michael