AlterSubscription_refresh "wrconn" wrong variable?
While reviewing some logical replication code I stumbled across a
variable usage that looks suspicious to me.
Note that the AlterSubscription_refresh function (unlike other
functions in the subscriptioncmds.c) is using the global variable
"wrconn" instead of a local stack variable of the same name. I was
unable to think of any good reason why it would be deliberately doing
this, so my guess is that it is simply an accidental mistake that has
gone unnoticed because the compiler was silently equally happy just
using the global var.
Apparently, this is not causing any reported problems because it seems
like the code has been this way for ~4 years [1].
Even so, it doesn't look intentional to me and I felt that there may
be unknown consequences (e.g. resource leakage?) of just blatting over
the global var. So, PSA a small patch to make this
AlterSubscription_refresh function use a stack variable consistent
with the other nearby functions.
Thoughts?
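The mechanism described above can be reproduced outside PostgreSQL. The following is only a minimal sketch, assuming a hypothetical Conn type and hypothetical function names (none of these are PostgreSQL's real definitions). It shows that a function with no local declaration compiles cleanly and silently writes to the file-scope variable, while a function that declares a local of the same name leaves the global untouched.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a connection handle (not WalReceiverConn). */
typedef struct Conn
{
	int			id;
} Conn;

/* File-scope variable, playing the role of the worker's global "wrconn". */
Conn	   *wrconn = NULL;

/* Hypothetical connection object that both functions "connect" to. */
Conn		backend_conn = {42};

/* No local declaration: the assignment silently targets the global. */
void
refresh_using_global(void)
{
	wrconn = &backend_conn;
}

/* A local of the same name shadows the global, which stays untouched. */
int
refresh_using_local(void)
{
	Conn	   *wrconn = &backend_conn;

	assert(wrconn != NULL);		/* this is the local, not the global */
	return wrconn->id;
}
```

Calling refresh_using_local() leaves the global NULL, whereas refresh_using_global() overwrites it. The compiler accepts both without complaint, which is why this kind of slip can go unnoticed.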
------
[1]: https://github.com/postgres/postgres/commit/7c4f52409a8c7d85ed169bbbc1f6092274d03920#
Kind Regards,
Peter Smith.
Fujitsu Australia
Attachments:
v1-0001-Fix-wrconn.-Use-stack-variable.patch (application/octet-stream)
From 70ab0e6ddc7514af58bf92ca47ea37b02eec8a04 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 30 Apr 2021 12:12:28 +1000
Subject: [PATCH v1] Fix wrconn. Use stack variable.
Unlike the other functions in subscriptioncmds.c, AlterSubscription_refresh was accidentally using
the global variable "wrconn" instead of a local stack variable.
The confusion was probably due to the local and global variables having the same name.
---
src/backend/commands/subscriptioncmds.c | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index ec5c409..668cecd 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -601,18 +601,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
char state;
} SubRemoveRels;
SubRemoveRels *sub_remove_rels;
+ WalReceiverConn *wrconn;
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+
PG_TRY();
{
- /* Try to connect to the publisher. */
- wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
- if (!wrconn)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
-
/* Get the table list from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
@@ -782,8 +783,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
}
PG_FINALLY();
{
- if (wrconn)
- walrcv_disconnect(wrconn);
+ walrcv_disconnect(wrconn);
}
PG_END_TRY();
--
1.8.3.1
On Tue, May 4, 2021 at 5:00 AM Peter Smith <smithpb2250@gmail.com> wrote:
While reviewing some logical replication code I stumbled across a
variable usage that looks suspicious to me.
Note that the AlterSubscription_refresh function (unlike other
functions in the subscriptioncmds.c) is using the global variable
"wrconn" instead of a local stack variable of the same name. I was
unable to think of any good reason why it would be deliberately doing
this, so my guess is that it is simply an accidental mistake that has
gone unnoticed because the compiler was silently equally happy just
using the global var.
Apparently, this is not causing any reported problems because it seems
like the code has been this way for ~4 years [1].
Even so, it doesn't look intentional to me and I felt that there may
be unknown consequences (e.g. resource leakage?) of just blatting over
the global var. So, PSA a small patch to make this
AlterSubscription_refresh function use a stack variable consistent
with the other nearby functions.
Thoughts?
+1. It looks like the global variable wrconn defined/declared in
worker_internal.h/worker.c, is for logical apply/table sync worker and
it doesn't make sense to use it for CREATE/ALTER subscription refresh
code that runs on a backend. And I couldn't think of any unknown
consequences/resource leakage, because that global variable is being
used by different processes which have their own copy.
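The "own copy" point can be illustrated with a small POSIX sketch. This is only an illustration under stated assumptions: conn_state and parent_view_after_child_write are hypothetical names, and a plain fork() stands in for separate server processes. A write made by the child process to a file-scope variable is not visible in the parent, because each process has its own copy of the variable.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical file-scope variable, playing the role of a per-process global. */
int			conn_state = 0;

/*
 * Fork a child that overwrites conn_state, wait for it to exit, and return
 * the value the parent still sees (or -1 if fork() or waitpid() failed).
 */
int
parent_view_after_child_write(void)
{
	int			status;
	pid_t		pid = fork();

	if (pid < 0)
		return -1;
	if (pid == 0)
	{
		conn_state = 1;			/* modifies only the child's copy */
		_exit(0);
	}

	assert(pid > 0);			/* only the parent reaches this point */
	if (waitpid(pid, &status, 0) < 0)
		return -1;
	return conn_state;
}
```

Note that this isolation only holds between processes. It says nothing about two pieces of code sharing the variable inside the same process.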
And, the patch basically looks good to me, except a bit of rewording
the commit message to something like "Use local variable wrconn in
AlterSubscription_refresh instead of a global variable with the same
name which is meant to be used for logical apply/table sync worker.
Having the wrconn global variable in AlterSubscription_refresh doesn't
cause any real issue as such but it keeps the code in
subscriptioncmds.c inconsistent with other functions which use a local
variable named wrconn." or some other better wording?
Regression tests were passed on my dev system with the patch.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Hi,
On 2021-05-04 09:29:42 +1000, Peter Smith wrote:
While reviewing some logical replication code I stumbled across a
variable usage that looks suspicious to me.
Note that the AlterSubscription_refresh function (unlike other
functions in the subscriptioncmds.c) is using the global variable
"wrconn" instead of a local stack variable of the same name. I was
unable to think of any good reason why it would be deliberately doing
this, so my guess is that it is simply an accidental mistake that has
gone unnoticed because the compiler was silently equally happy just
using the global var.
Apparently, this is not causing any reported problems because it seems
like the code has been this way for ~4 years [1].
This sounded vaguely familiar. After a bit of searching I found that's
because I debugged a crash related to it:
/messages/by-id/20201111215820.qihhrz7fayu6myfi@alap3.anarazel.de
Peter?
Greetings,
Andres Freund
On Tue, May 4, 2021 at 1:56 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Tue, May 4, 2021 at 5:00 AM Peter Smith <smithpb2250@gmail.com> wrote:
While reviewing some logical replication code I stumbled across a
variable usage that looks suspicious to me.
Note that the AlterSubscription_refresh function (unlike other
functions in the subscriptioncmds.c) is using the global variable
"wrconn" instead of a local stack variable of the same name. I was
unable to think of any good reason why it would be deliberately doing
this, so my guess is that it is simply an accidental mistake that has
gone unnoticed because the compiler was silently equally happy just
using the global var.
Apparently, this is not causing any reported problems because it seems
like the code has been this way for ~4 years [1].
Even so, it doesn't look intentional to me and I felt that there may
be unknown consequences (e.g. resource leakage?) of just blatting over
the global var. So, PSA a small patch to make this
AlterSubscription_refresh function use a stack variable consistent
with the other nearby functions.
Thoughts?
+1. It looks like the global variable wrconn defined/declared in
worker_internal.h/worker.c, is for logical apply/table sync worker and
it doesn't make sense to use it for CREATE/ALTER subscription refresh
code that runs on a backend. And I couldn't think of any unknown
consequences/resource leakage, because that global variable is being
used by different processes which have their own copy.
And, the patch basically looks good to me, except a bit of rewording
the commit message to something like "Use local variable wrconn in
AlterSubscription_refresh instead of a global variable with the same
name which is meant to be used for logical apply/table sync worker.
Having the wrconn global variable in AlterSubscription_refresh doesn't
cause any real issue as such but it keeps the code in
subscriptioncmds.c inconsistent with other functions which use a local
variable named wrconn." or some other better wording?
Regression tests were passed on my dev system with the patch.
Thanks for your feedback.
I can post another patch (or same patch with an improved commit
comment) later, but I will just wait a day first in case there is more
information to say about it. e.g. my suspicion that there would be
"consequences" seems to have come to fruition after all [1], although I
never would have thought of that tricky trigger / refresh scenario.
------
[1]: /messages/by-id/20210504043149.vg4w66vuh4qjrbph@alap3.anarazel.de
Kind Regards,
Peter Smith.
Fujitsu Australia
On Tue, May 4, 2021 at 2:31 PM Andres Freund <andres@anarazel.de> wrote:
Hi,
On 2021-05-04 09:29:42 +1000, Peter Smith wrote:
While reviewing some logical replication code I stumbled across a
variable usage that looks suspicious to me.
Note that the AlterSubscription_refresh function (unlike other
functions in the subscriptioncmds.c) is using the global variable
"wrconn" instead of a local stack variable of the same name. I was
unable to think of any good reason why it would be deliberately doing
this, so my guess is that it is simply an accidental mistake that has
gone unnoticed because the compiler was silently equally happy just
using the global var.
Apparently, this is not causing any reported problems because it seems
like the code has been this way for ~4 years [1].
This sounded vaguely familiar. After a bit of searching I found that's
because I debugged a crash related to it:
/messages/by-id/20201111215820.qihhrz7fayu6myfi@alap3.anarazel.de
Oh! No wonder it sounded familiar.
It looks like I've just re-discovered the identical problem 5 months
after your post.
------
Kind Regards,
Peter Smith.
Fujitsu Australia
PSA v2 of this patch - it has the same content, but an improved commit comment.
I have also added a commitfest entry, https://commitfest.postgresql.org/33/3109/
------
Kind Regards,
Peter Smith
Fujitsu Australia
Attachments:
v2-0001-Fix-wrconn.-Use-stack-variable.patch (application/octet-stream)
From b1011672ef711635e000ae7dbe6fc466e3e332ff Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 5 May 2021 10:01:15 +1000
Subject: [PATCH v2] Fix wrconn. Use stack variable.
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still possible if the apply worker ever manages to do a subscription refresh. e.g. see [1].
[1] https://www.postgresql.org/message-id/20201111215820.qihhrz7fayu6myfi%40alap3.anarazel.de
---
src/backend/commands/subscriptioncmds.c | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index ec5c409..668cecd 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -601,18 +601,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
char state;
} SubRemoveRels;
SubRemoveRels *sub_remove_rels;
+ WalReceiverConn *wrconn;
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+
PG_TRY();
{
- /* Try to connect to the publisher. */
- wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
- if (!wrconn)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
-
/* Get the table list from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
@@ -782,8 +783,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
}
PG_FINALLY();
{
- if (wrconn)
- walrcv_disconnect(wrconn);
+ walrcv_disconnect(wrconn);
}
PG_END_TRY();
--
1.8.3.1
Peter Smith <smithpb2250@gmail.com> writes:
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still possible if the apply worker ever manages to do a subscription refresh. e.g. see [1]
Hm. I would actually place the blame for this on whoever thought
it was okay to name a global variable something as generic as
"wrconn". Let's rename that while we're at it, say to something
like "tablesync_wrconn" (feel free to bikeshed).
regards, tom lane
On Tue, May 04, 2021 at 10:35:02PM -0400, Tom Lane wrote:
Peter Smith <smithpb2250@gmail.com> writes:
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still possible if the apply worker ever manages to do a subscription refresh. e.g. see [1]
Hm. I would actually place the blame for this on whoever thought
it was okay to name a global variable something as generic as
"wrconn". Let's rename that while we're at it, say to something
like "tablesync_wrconn" (feel free to bikeshed).
Yea, I think global vars should have at least 1) an underscore, or 2) a
capital, and in any case be 3) longer than 6 chars.
There are very few which violate both "arms" of that rule - should anything else
be renamed, too?
$ git grep -E '^static [^(=]*\<[[:lower:]]{,6}(;$| =)' src/backend/'*.c'
src/backend/access/heap/vacuumlazy.c:static int elevel = -1;
src/backend/access/transam/xloginsert.c:static XLogRecData *rdatas;
src/backend/bootstrap/bootstrap.c:static MemoryContext nogc = NULL; /* special no-gc mem context */
src/backend/libpq/be-fsstubs.c:static MemoryContext fscxt = NULL;
src/backend/replication/walreceiver.c:static WalReceiverConn *wrconn = NULL;
src/backend/replication/walsender.c:static StringInfoData tmpbuf;
src/backend/storage/file/fd.c:static int nfile = 0;
src/backend/utils/misc/sampling.c:static ReservoirStateData oldrs;
$ git grep -lE '^static [^(=]*\<[[:lower:]]{,6}(;$| =)' src/backend/'*.c' |xargs wc -l |sort -nr
4326 src/backend/access/heap/vacuumlazy.c
3781 src/backend/storage/file/fd.c
3698 src/backend/replication/walsender.c
1428 src/backend/replication/walreceiver.c
1227 src/backend/access/transam/xloginsert.c
1155 src/backend/bootstrap/bootstrap.c
864 src/backend/libpq/be-fsstubs.c
296 src/backend/utils/misc/sampling.c
--
Justin
On Wed, May 5, 2021 at 8:05 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:
Peter Smith <smithpb2250@gmail.com> writes:
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still possible if the apply worker ever manages to do a subscription refresh. e.g. see [1]
Hm. I would actually place the blame for this on whoever thought
it was okay to name a global variable something as generic as
"wrconn". Let's rename that while we're at it, say to something
like "tablesync_wrconn" (feel free to bikeshed).
I don't think "tablesync_wrconn" is the right name, because wrconn is
also being used in logical replication apply worker. So something like
"apply_worker_wrconn" would be more meaningful.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On Wed, May 5, 2021 at 3:20 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Wed, May 5, 2021 at 8:05 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:
Peter Smith <smithpb2250@gmail.com> writes:
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still possible if the apply worker ever manages to do a subscription refresh. e.g. see [1]
Hm. I would actually place the blame for this on whoever thought
it was okay to name a global variable something as generic as
"wrconn". Let's rename that while we're at it, say to something
like "tablesync_wrconn" (feel free to bikeshed).
OK, I am happy to change this but firstly just need some consensus on
the new name to use. I hope to avoid changing it, and then changing it
5 more times.
I don't think "tablesync_wrconn" is the right name, because wrconn is
also being used in logical replication apply worker. So something like
"apply_worker_wrconn" would be more meaningful.
Yes, that is better, except I wonder if "apply_worker_wrconn" might
seem unusual when used by the tablesync worker.
My suggestion is "lrep_worker_wrconn" which seems ok for both apply /
tablesync workers.
------
Kind Regards,
Peter Smith.
Fujitsu Australia
On Wed, May 5, 2021 at 12:35 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:
Peter Smith <smithpb2250@gmail.com> writes:
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still possible if the apply worker ever manages to do a subscription refresh. e.g. see [1]
Hm. I would actually place the blame for this on whoever thought
it was okay to name a global variable something as generic as
"wrconn". Let's rename that while we're at it, say to something
like "tablesync_wrconn" (feel free to bikeshed).
PSA v3 of the patch. Same as before, but now also renames the global
variable from "wrconn" to "lrep_worker_wrconn".
------
Kind Regards,
Peter Smith
Fujitsu Australia
Attachments:
v3-0001-Fix-wrconn.-Use-stack-variable.patch (application/octet-stream)
From 9e8b4ddd829f73a5520a8796c7e74f2589a8d969 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 6 May 2021 18:57:38 +1000
Subject: [PATCH v3] Fix wrconn. Use stack variable.
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker. To reduce future confusion it has been renamed from "wrconn" to "lrep_worker_wrconn".
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still possible if the apply worker ever manages to do a subscription refresh. e.g. see [1].
[1] https://www.postgresql.org/message-id/20201111215820.qihhrz7fayu6myfi%40alap3.anarazel.de
---
src/backend/commands/subscriptioncmds.c | 16 ++++++++--------
src/backend/replication/logical/launcher.c | 4 ++--
src/backend/replication/logical/tablesync.c | 29 +++++++++++++++--------------
src/backend/replication/logical/worker.c | 20 ++++++++++----------
src/include/replication/worker_internal.h | 2 +-
5 files changed, 36 insertions(+), 35 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 517c8ed..1096aa8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -556,18 +556,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
char state;
} SubRemoveRels;
SubRemoveRels *sub_remove_rels;
+ WalReceiverConn *wrconn;
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+
PG_TRY();
{
- /* Try to connect to the publisher. */
- wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
- if (!wrconn)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
-
/* Get the table list from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
@@ -737,8 +738,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
}
PG_FINALLY();
{
- if (wrconn)
- walrcv_disconnect(wrconn);
+ walrcv_disconnect(wrconn);
}
PG_END_TRY();
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index cb462a0..a39ae17 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -649,8 +649,8 @@ static void
logicalrep_worker_onexit(int code, Datum arg)
{
/* Disconnect gracefully from the remote side. */
- if (wrconn)
- walrcv_disconnect(wrconn);
+ if (lrep_worker_wrconn)
+ walrcv_disconnect(lrep_worker_wrconn);
logicalrep_worker_detach();
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0638f5c..eda9f23 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -302,8 +302,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
- /* End wal streaming so wrconn can be re-used to drop the slot. */
- walrcv_endstreaming(wrconn, &tli);
+ /* End wal streaming so lrep_worker_wrconn can be re-used to drop the slot. */
+ walrcv_endstreaming(lrep_worker_wrconn, &tli);
/*
* Cleanup the tablesync slot.
@@ -322,7 +322,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
* otherwise, it won't be dropped till the corresponding subscription
* is dropped. So passing missing_ok = false.
*/
- ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+ ReplicationSlotDropAtPubNode(lrep_worker_wrconn, syncslotname, false);
finish_sync_worker();
}
@@ -642,7 +642,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
for (;;)
{
/* Try read the data. */
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(lrep_worker_wrconn, &buf, &fd);
CHECK_FOR_INTERRUPTS();
@@ -715,7 +715,7 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND c.relname = %s",
quote_literal_cstr(nspname),
quote_literal_cstr(relname));
- res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
+ res = walrcv_exec(lrep_worker_wrconn, cmd.data, lengthof(tableRow), tableRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -752,9 +752,10 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u"
" ORDER BY a.attnum",
lrel->remoteid,
- (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+ (walrcv_server_version(lrep_worker_wrconn) >= 120000 ?
+ "AND a.attgenerated = ''" : ""),
lrel->remoteid);
- res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
+ res = walrcv_exec(lrep_worker_wrconn, cmd.data, lengthof(attrRow), attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -841,7 +842,7 @@ copy_table(Relation rel)
appendStringInfo(&cmd, " FROM %s) TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
}
- res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+ res = walrcv_exec(lrep_worker_wrconn, cmd.data, 0, NULL);
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR,
@@ -957,8 +958,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* application_name, so that it is different from the main apply worker,
* so that synchronous replication can distinguish them.
*/
- wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
- if (wrconn == NULL)
+ lrep_worker_wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
+ if (lrep_worker_wrconn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -985,7 +986,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* breakdown then it wouldn't have succeeded so trying it next time
* seems like a better bet.
*/
- ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+ ReplicationSlotDropAtPubNode(lrep_worker_wrconn, slotname, true);
}
else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
{
@@ -1038,7 +1039,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* ensures that both the replication slot we create (see below) and the
* COPY are consistent with each other.
*/
- res = walrcv_exec(wrconn,
+ res = walrcv_exec(lrep_worker_wrconn,
"BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
0, NULL);
if (res->status != WALRCV_OK_COMMAND)
@@ -1058,7 +1059,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* slot leading to a dangling slot on the server.
*/
HOLD_INTERRUPTS();
- walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
+ walrcv_create_slot(lrep_worker_wrconn, slotname, false /* permanent */ ,
CRS_USE_SNAPSHOT, origin_startpos);
RESUME_INTERRUPTS();
@@ -1100,7 +1101,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
copy_table(rel);
PopActiveSnapshot();
- res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+ res = walrcv_exec(lrep_worker_wrconn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("table copy could not finish transaction on publisher: %s",
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d9f1571..181b716 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL;
/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *lrep_worker_wrconn = NULL;
Subscription *MySubscription = NULL;
bool MySubscriptionValid = false;
@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextSwitchTo(ApplyMessageContext);
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(lrep_worker_wrconn, &buf, &fd);
if (len != 0)
{
@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextReset(ApplyMessageContext);
}
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(lrep_worker_wrconn, &buf, &fd);
}
}
@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
/* All done */
- walrcv_endstreaming(wrconn, &tli);
+ walrcv_endstreaming(lrep_worker_wrconn, &tli);
}
/*
@@ -2396,7 +2396,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
LSN_FORMAT_ARGS(writepos),
LSN_FORMAT_ARGS(flushpos));
- walrcv_send(wrconn, reply_message->data, reply_message->len);
+ walrcv_send(lrep_worker_wrconn, reply_message->data, reply_message->len);
if (recvpos > last_recvpos)
last_recvpos = recvpos;
@@ -3090,9 +3090,9 @@ ApplyWorkerMain(Datum main_arg)
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
- wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
+ lrep_worker_wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
&err);
- if (wrconn == NULL)
+ if (lrep_worker_wrconn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -3100,7 +3100,7 @@ ApplyWorkerMain(Datum main_arg)
* We don't really use the output identify_system for anything but it
* does some initializations on the upstream so let's still call it.
*/
- (void) walrcv_identify_system(wrconn, &startpointTLI);
+ (void) walrcv_identify_system(lrep_worker_wrconn, &startpointTLI);
}
/*
@@ -3116,14 +3116,14 @@ ApplyWorkerMain(Datum main_arg)
options.startpoint = origin_startpos;
options.slotname = myslotname;
options.proto.logical.proto_version =
- walrcv_server_version(wrconn) >= 140000 ?
+ walrcv_server_version(lrep_worker_wrconn) >= 140000 ?
LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
/* Start normal logical streaming replication. */
- walrcv_startstreaming(wrconn, &options);
+ walrcv_startstreaming(lrep_worker_wrconn, &options);
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1cac75e..9209991 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -62,7 +62,7 @@ typedef struct LogicalRepWorker
extern MemoryContext ApplyContext;
/* libpqreceiver connection */
-extern struct WalReceiverConn *wrconn;
+extern struct WalReceiverConn *lrep_worker_wrconn;
/* Worker and subscription objects. */
extern Subscription *MySubscription;
--
1.8.3.1
On Thu, 06 May 2021 at 17:08, Peter Smith <smithpb2250@gmail.com> wrote:
On Wed, May 5, 2021 at 12:35 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:
Peter Smith <smithpb2250@gmail.com> writes:
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still possible if the apply worker ever manages to do a subscription refresh. e.g. see [1]
Hm. I would actually place the blame for this on whoever thought
it was okay to name a global variable something as generic as
"wrconn". Let's rename that while we're at it, say to something
like "tablesync_wrconn" (feel free to bikeshed).
PSA v3 of the patch. Same as before, but now also renames the global
variable from "wrconn" to "lrep_worker_wrconn".
Thanks for updating the patch. I'm confused about why we moved walrcv_connect() out of
the PG_TRY() block?
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+
PG_TRY();
{
- /* Try to connect to the publisher. */
- wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
- if (!wrconn)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
-
/* Get the table list from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
--
Regards,
Japin Li.
ChengDu WenWu Information Technology Co.,Ltd.
On Thu, May 6, 2021 at 7:18 PM Japin Li <japinli@hotmail.com> wrote:
On Thu, 06 May 2021 at 17:08, Peter Smith <smithpb2250@gmail.com> wrote:
On Wed, May 5, 2021 at 12:35 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:
Peter Smith <smithpb2250@gmail.com> writes:
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still possible if the apply worker ever manages to do a subscription refresh. e.g. see [1]
Hm. I would actually place the blame for this on whoever thought
it was okay to name a global variable something as generic as
"wrconn". Let's rename that while we're at it, say to something
like "tablesync_wrconn" (feel free to bikeshed).PSA v3 of the patch. Same as before, but now also renames the global
variable from "wrconn" to "lrep_worker_wrconn".Thanks for updating patch. I'm confused why we move the walrcv_connect() out of PG_TRY() block? + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + PG_TRY(); { - /* Try to connect to the publisher. */ - wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); - if (!wrconn) - ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); - /* Get the table list from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications);
Thanks for your review. The reasons for moving that out of the PG_TRY are:
a) It makes the code consistent with other functions using wrconn. See
CreateSubscription, DropSubscription etc.
b) It means we don't need the wrconn NULL check anymore in the PG_FINALLY,
so it simplifies the disconnect.
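The two reasons above can be illustrated with a minimal standalone sketch. It is not PostgreSQL code: Conn, conn_open, conn_close, refresh and the open_connections counter are hypothetical names, and setjmp/longjmp stand in for PG_TRY/PG_FINALLY. The only point is that when the connection is opened before the protected block, the cleanup path can disconnect unconditionally.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdlib.h>

/* Hypothetical stand-in for WalReceiverConn; not the PostgreSQL type. */
typedef struct Conn
{
    int open;
} Conn;

/* Counts live connections so that a leak would be visible. */
static int open_connections = 0;

/* Hypothetical connect: returns NULL on failure, like walrcv_connect. */
static Conn *
conn_open(int should_fail)
{
    Conn *c;

    if (should_fail)
        return NULL;
    c = malloc(sizeof(Conn));
    if (c == NULL)
        return NULL;
    c->open = 1;
    open_connections++;
    return c;
}

/* Hypothetical disconnect: requires a valid, open connection. */
static void
conn_close(Conn *c)
{
    assert(c != NULL && c->open);
    c->open = 0;
    open_connections--;
    free(c);
}

/*
 * Returns 0 on success, -1 if the connect failed, -2 if the work failed
 * after connecting.
 */
int
refresh(int connect_fails, int work_fails)
{
    jmp_buf      on_error;
    volatile int status = 0;
    Conn        *conn = conn_open(connect_fails);   /* local, not a global */

    /* Connect failed before the protected block: nothing to clean up. */
    if (conn == NULL)
        return -1;

    if (setjmp(on_error) == 0)
    {
        /* "try" part: any failure jumps to the error branch below. */
        if (work_fails)
            longjmp(on_error, 1);
    }
    else
        status = -2;

    /* "finally" part: conn is known to be valid, so no NULL check. */
    conn_close(conn);
    return status;
}
```

Calling refresh(0, 1) takes the error path and still leaves open_connections at zero, while refresh(1, 0) returns before the protected block is ever entered.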
------
Kind Regards,
Peter Smith.
Fujitsu Australia
On Thu, 06 May 2021 at 17:30, Peter Smith <smithpb2250@gmail.com> wrote:
On Thu, May 6, 2021 at 7:18 PM Japin Li <japinli@hotmail.com> wrote:
On Thu, 06 May 2021 at 17:08, Peter Smith <smithpb2250@gmail.com> wrote:
On Wed, May 5, 2021 at 12:35 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:
Peter Smith <smithpb2250@gmail.com> writes:
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still posslble if the apply worker ever manages to do a subscription refresh. e.g. see [1]
Hm. I would actually place the blame for this on whoever thought
it was okay to name a global variable something as generic as
"wrconn". Let's rename that while we're at it, say to something
like "tablesync_wrconn" (feel free to bikeshed).
PSA v3 of the patch. Same as before, but now also renames the global
variable from "wrconn" to "lrep_worker_wrconn".
Thanks for updating patch. I'm confused why we move the walrcv_connect() out of
PG_TRY() block?
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+
PG_TRY();
{
- /* Try to connect to the publisher. */
- wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
- if (!wrconn)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
-
/* Get the table list from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
Thanks for your review. Reason for moving that out of the PG_TRY are:
a) It makes code now consistent with other functions using wrconn. See
CreateSubscription, DropSubscription etc
b) It means don't need the wrconn NULL check anymore in the PG_FINALLY
so it simplifies the disconnect.
Thanks for your explanation!
--
Regards,
Japin Li.
ChengDu WenWu Information Technology Co.,Ltd.
On Thu, May 6, 2021 at 3:00 PM Peter Smith <smithpb2250@gmail.com> wrote:
On Thu, May 6, 2021 at 7:18 PM Japin Li <japinli@hotmail.com> wrote:
On Thu, 06 May 2021 at 17:08, Peter Smith <smithpb2250@gmail.com> wrote:
On Wed, May 5, 2021 at 12:35 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:
Peter Smith <smithpb2250@gmail.com> writes:
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still posslble if the apply worker ever manages to do a subscription refresh. e.g. see [1]
Hm. I would actually place the blame for this on whoever thought
it was okay to name a global variable something as generic as
"wrconn". Let's rename that while we're at it, say to something
like "tablesync_wrconn" (feel free to bikeshed).
PSA v3 of the patch. Same as before, but now also renames the global
variable from "wrconn" to "lrep_worker_wrconn".
Thanks for updating patch. I'm confused why we move the walrcv_connect() out of
PG_TRY() block?
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+
PG_TRY();
{
- /* Try to connect to the publisher. */
- wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
- if (!wrconn)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
-
/* Get the table list from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
Thanks for your review. Reason for moving that out of the PG_TRY are:
a) It makes code now consistent with other functions using wrconn. See
CreateSubscription, DropSubscription etc
b) It means don't need the wrconn NULL check anymore in the PG_FINALLY
so it simplifies the disconnect.
And even if any error occurs after the connection is established and
while libpqrcv_PQexec is being done in libpqrcv_connect, we reach
PG_FINALLY() block to disconnect the connection, so no connection leak
can occur.
Patch looks good to me except for the comments in the commit message:
1) it crosses 80 char limit 2) a typo : "posslble"
Please add it to the current commitfest if not done already so that we
don't lose track of it and the patch gets a chance to be tested.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On 2021-May-06, Peter Smith wrote:
PSA v3 of the patch. Same as before, but now also renames the global
variable from "wrconn" to "lrep_worker_wrconn".
I think there are two patches here -- the changes to
AlterSubscription_refresh are a backpatchable bugfix, and the rest of it
can just be applied to master.
In my mind we make a bit of a distinction for global variables by using
CamelCase rather than underscore_separated_words. There are plenty that
violate that "rule" of course, but ISTM that makes them stand out more and
it's less likely we've made this mistake. So I would name the variable
LogRepWALRcvConn or something like that. My €0.02.
--
Álvaro Herrera Valdivia, Chile
"Entristecido, Wutra (canción de Las Barreras)
echa a Freyr a rodar
y a nosotros al mar"
Alvaro Herrera <alvherre@alvh.no-ip.org> writes:
On 2021-May-06, Peter Smith wrote:
PSA v3 of the patch. Same as before, but now also renames the global
variable from "wrconn" to "lrep_worker_wrconn".
I think there are two patches here -- the changes to
AlterSubscription_refresh are a backpatchable bugfix, and the rest of it
can just be applied to master.
The rename of that variable is just cosmetic, true, but I'd still be
inclined to back-patch it. If we don't do so then I'm afraid that
future back-patched fixes might be bitten by the same confusion,
possibly introducing new real bugs.
Having said that, keeping the two aspects in separate patches might
ease review and testing.
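The kind of confusion described above is easy to reproduce in plain C. The following is a minimal sketch, not PostgreSQL code: the global conn plays the role of the worker-only global, and worker_conn, command_conn, refresh_buggy and refresh_fixed are hypothetical names. The compiler accepts both functions silently; only the one with its own local declaration leaves the global alone.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a connection object. */
typedef struct Conn
{
    int id;
} Conn;

Conn worker_conn = {1};     /* connection owned by the "worker" */
Conn command_conn = {2};    /* connection opened by a "command" */

/* Global meant to be used only by the worker. */
Conn *conn = NULL;

/*
 * Buggy variant: there is no local declaration, so the assignment
 * silently overwrites the global that belongs to the worker.
 */
void
refresh_buggy(void)
{
    conn = &command_conn;
    assert(conn->id == 2);
}

/*
 * Fixed variant: a local variable of the same name shadows the global,
 * so the global is left untouched.
 */
void
refresh_fixed(void)
{
    Conn *conn = &command_conn;

    assert(conn->id == 2);
}
```

If the global had a distinct name, as proposed in this thread, refresh_buggy would fail to compile with an undeclared identifier instead of quietly overwriting it.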
In my mind we make a bit of a distinction for global variables by using
CamelCase rather than underscore_separated_words.
I think it's about 50/50, TBH. I'd stick with whichever style is
being used in nearby code.
regards, tom lane
On Fri, May 7, 2021 at 7:08 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:
Alvaro Herrera <alvherre@alvh.no-ip.org> writes:
On 2021-May-06, Peter Smith wrote:
PSA v3 of the patch. Same as before, but now also renames the global
variable from "wrconn" to "lrep_worker_wrconn".
I think there are two patches here -- the changes to
AlterSubscription_refresh are a backpatchable bugfix, and the rest of it
can just be applied to master.
The rename of that variable is just cosmetic, true, but I'd still be
inclined to back-patch it. If we don't do so then I'm afraid that
future back-patched fixes might be bitten by the same confusion,
possibly introducing new real bugs.
Having said that, keeping the two aspects in separate patches might
ease review and testing.
Done.
In my mind we make a bit of a distinction for global variables by using
CamelCase rather than underscore_separated_words.
I think it's about 50/50, TBH. I'd stick with whichever style is
being used in nearby code.
The nearby code was a random mixture of Camels and Snakes, so instead
of flipping a coin I went with the suggestion from Alvaro.
~~
PSA v4 of the patch.
0001 - Fixes the AlterSubscription_refresh as before.
0002 - Renames the global var "wrconn" -> "LogRepWorkerWalRcvConn" as suggested.
------
Kind Regards,
Peter Smith
Fujitsu Australia
Attachments:
v4-0001-AlterSubscription_refresh-Use-stack-variable-for-.patch (application/octet-stream)
From f5d9437a48a6e42baea722788d298bc82dd0e471 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 7 May 2021 17:11:13 +1000
Subject: [PATCH v4] AlterSubscription_refresh - Use stack variable for wrconn.
This patch replaces the global "wrconn" in AlterSubscription_refresh with a local
variable of the same name, making it consistent with other functions in
subscriptioncmds.c (e.g. DropSubscription).
The global wrconn is only meant to be used for logical apply/tablesync worker.
Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally
cause any problems, but harm is still possible if the apply worker ever manages to
do a subscription refresh. e.g. see [1].
[1] https://www.postgresql.org/message-id/20201111215820.qihhrz7fayu6myfi%40alap3.anarazel.de
---
src/backend/commands/subscriptioncmds.c | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 517c8ed..1096aa8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -556,18 +556,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
char state;
} SubRemoveRels;
SubRemoveRels *sub_remove_rels;
+ WalReceiverConn *wrconn;
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+
PG_TRY();
{
- /* Try to connect to the publisher. */
- wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
- if (!wrconn)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
-
/* Get the table list from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
@@ -737,8 +738,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
}
PG_FINALLY();
{
- if (wrconn)
- walrcv_disconnect(wrconn);
+ walrcv_disconnect(wrconn);
}
PG_END_TRY();
--
1.8.3.1
v4-0002-Rename-the-logical-replication-global-wrconn.patch (application/octet-stream)
From 0492d165180c80197d5552dd3f114da202c5de34 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 7 May 2021 17:51:12 +1000
Subject: [PATCH v4] Rename the logical replication global wrconn.
The worker.c global wrconn is only meant to be used for logical apply/tablesync
workers, but there are other variables with the same name. To reduce future confusion
the global has been renamed from "wrconn" to "LogRepWorkerWalRcvConn".
---
src/backend/replication/logical/launcher.c | 4 ++--
src/backend/replication/logical/tablesync.c | 33 +++++++++++++++++------------
src/backend/replication/logical/worker.c | 22 +++++++++----------
src/include/replication/worker_internal.h | 2 +-
4 files changed, 33 insertions(+), 28 deletions(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index cb462a0..f97ab12 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -649,8 +649,8 @@ static void
logicalrep_worker_onexit(int code, Datum arg)
{
/* Disconnect gracefully from the remote side. */
- if (wrconn)
- walrcv_disconnect(wrconn);
+ if (LogRepWorkerWalRcvConn)
+ walrcv_disconnect(LogRepWorkerWalRcvConn);
logicalrep_worker_detach();
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0638f5c..c2e18b6 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
- /* End wal streaming so wrconn can be re-used to drop the slot. */
- walrcv_endstreaming(wrconn, &tli);
+ /*
+ * End wal streaming so LogRepWorkerWalRcvConn can be re-used to drop
+ * the slot.
+ */
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
/*
* Cleanup the tablesync slot.
@@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
* otherwise, it won't be dropped till the corresponding subscription
* is dropped. So passing missing_ok = false.
*/
- ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
finish_sync_worker();
}
@@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
for (;;)
{
/* Try read the data. */
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
CHECK_FOR_INTERRUPTS();
@@ -715,7 +718,7 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND c.relname = %s",
quote_literal_cstr(nspname),
quote_literal_cstr(relname));
- res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, lengthof(tableRow), tableRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -752,9 +755,10 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u"
" ORDER BY a.attnum",
lrel->remoteid,
- (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+ (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+ "AND a.attgenerated = ''" : ""),
lrel->remoteid);
- res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, lengthof(attrRow), attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -841,7 +845,7 @@ copy_table(Relation rel)
appendStringInfo(&cmd, " FROM %s) TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
}
- res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR,
@@ -957,8 +961,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* application_name, so that it is different from the main apply worker,
* so that synchronous replication can distinguish them.
*/
- wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
- if (wrconn == NULL)
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true, slotname, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -985,7 +990,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* breakdown then it wouldn't have succeeded so trying it next time
* seems like a better bet.
*/
- ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
}
else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
{
@@ -1038,7 +1043,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* ensures that both the replication slot we create (see below) and the
* COPY are consistent with each other.
*/
- res = walrcv_exec(wrconn,
+ res = walrcv_exec(LogRepWorkerWalRcvConn,
"BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
0, NULL);
if (res->status != WALRCV_OK_COMMAND)
@@ -1058,7 +1063,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* slot leading to a dangling slot on the server.
*/
HOLD_INTERRUPTS();
- walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
+ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
CRS_USE_SNAPSHOT, origin_startpos);
RESUME_INTERRUPTS();
@@ -1100,7 +1105,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
copy_table(rel);
PopActiveSnapshot();
- res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("table copy could not finish transaction on publisher: %s",
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d9f1571..347ee9d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL;
/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
bool MySubscriptionValid = false;
@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextSwitchTo(ApplyMessageContext);
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
if (len != 0)
{
@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextReset(ApplyMessageContext);
}
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
}
}
@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
/* All done */
- walrcv_endstreaming(wrconn, &tli);
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
/*
@@ -2396,7 +2396,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
LSN_FORMAT_ARGS(writepos),
LSN_FORMAT_ARGS(flushpos));
- walrcv_send(wrconn, reply_message->data, reply_message->len);
+ walrcv_send(LogRepWorkerWalRcvConn, reply_message->data, reply_message->len);
if (recvpos > last_recvpos)
last_recvpos = recvpos;
@@ -3090,9 +3090,9 @@ ApplyWorkerMain(Datum main_arg)
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
- wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
- &err);
- if (wrconn == NULL)
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true, MySubscription->name, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -3100,7 +3100,7 @@ ApplyWorkerMain(Datum main_arg)
* We don't really use the output identify_system for anything but it
* does some initializations on the upstream so let's still call it.
*/
- (void) walrcv_identify_system(wrconn, &startpointTLI);
+ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
}
/*
@@ -3116,14 +3116,14 @@ ApplyWorkerMain(Datum main_arg)
options.startpoint = origin_startpos;
options.slotname = myslotname;
options.proto.logical.proto_version =
- walrcv_server_version(wrconn) >= 140000 ?
+ walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
/* Start normal logical streaming replication. */
- walrcv_startstreaming(wrconn, &options);
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1cac75e..179eb43 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -62,7 +62,7 @@ typedef struct LogicalRepWorker
extern MemoryContext ApplyContext;
/* libpqreceiver connection */
-extern struct WalReceiverConn *wrconn;
+extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
/* Worker and subscription objects. */
extern Subscription *MySubscription;
--
1.8.3.1
On Fri, May 7, 2021 at 6:09 PM Peter Smith <smithpb2250@gmail.com> wrote:
On Fri, May 7, 2021 at 7:08 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:
Alvaro Herrera <alvherre@alvh.no-ip.org> writes:
On 2021-May-06, Peter Smith wrote:
PSA v3 of the patch. Same as before, but now also renames the global
variable from "wrconn" to "lrep_worker_wrconn".
I think there are two patches here -- the changes to
AlterSubscription_refresh are a backpatchable bugfix, and the rest of it
can just be applied to master.
The rename of that variable is just cosmetic, true, but I'd still be
inclined to back-patch it. If we don't do so then I'm afraid that
future back-patched fixes might be bitten by the same confusion,
possibly introducing new real bugs.
Having said that, keeping the two aspects in separate patches might
ease review and testing.
Done.
In my mind we make a bit of a distinction for global variables by using
CamelCase rather than underscore_separated_words.
I think it's about 50/50, TBH. I'd stick with whichever style is
being used in nearby code.
The nearby code was a random mixture of Camels and Snakes, so instead
of flipping a coin I went with the suggestion from Alvaro.
~~
PSA v4 of the patch.
0001 - Fixes the AlterSubscription_refresh as before.
0002 - Renames the global var "wrconn" -> "LogRepWorkerWalRcvConn" as suggested.
It seems that the 0001 part of this patch was pushed over the weekend [1]. Thanks!
But what about the 0002 part? If there is no immediate plan to push
that also then I will post a v5 just to stop the cfbot complaining.
--------
[1]: https://github.com/postgres/postgres/commit/4e8c0f1a0d0d095a749a329a216c88a340a455b6
Kind Regards,
Peter Smith
Fujitsu Australia
On Mon, May 10, 2021 at 7:50 AM Peter Smith <smithpb2250@gmail.com> wrote:
0001 - Fixes the AlterSubscription_refresh as before.
0002 - Renames the global var "wrconn" -> "LogRepWorkerWalRcvConn" as suggested.
It seems that the 0001 part of this patch was pushed in the weekend [1]. Thanks!
But what about the 0002 part? If there is no immediate plan to push
that also then I will post a v5 just to stop the cfbot complaining.
I think the 0002 patch can be posted here, if it looks good, it can be
made "Ready For Committer".
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
PSA v5 of the patch. It is the same as v4 but with the v4-0001 part
omitted because that was already pushed.
(reposted to keep cfbot happy).
------
Kind Regards,
Peter Smith
Fujitsu Australia
Attachments:
v5-0001-Rename-the-logical-replication-global-wrconn.patch (application/octet-stream)
From f11afb85ebc0eb9bd54702ddfb2b40432b945bfc Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Mon, 10 May 2021 16:08:49 +1000
Subject: [PATCH v5] Rename the logical replication global wrconn.
The worker.c global wrconn is only meant to be used for logical apply/tablesync
workers, but there are other variables with the same name. To reduce future confusion
the global has been renamed from "wrconn" to "LogRepWorkerWalRcvConn".
On 2021-May-10, Peter Smith wrote:
> PSA v5 of the patch. It is the same as v4 but with the v4-0001 part
> omitted because that was already pushed.
I made a few whitespace adjustments on Friday that I didn't get time to
push, so I left the whole set to after the minors are finalized this
week. I'll get them pushed on Wednesday or so. (The back branches have
a few conflicts, on every release, but I see no reason to post those and
it'd upset the cfbot).
--
Álvaro Herrera Valdivia, Chile
Attachments:
v6-0001-Rename-the-logical-replication-global-wrconn.patch (text/x-diff; charset=us-ascii)
From 8494316bef64cde7476146bce0bda9a93890def7 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 7 May 2021 17:51:12 +1000
Subject: [PATCH v6] Rename the logical replication global wrconn.
The worker.c global wrconn is only meant to be used for logical apply/tablesync
workers, but there are other variables with the same name. To reduce future confusion
the global has been renamed from "wrconn" to "LogRepWorkerWalRcvConn".
---
src/backend/replication/logical/launcher.c | 4 +--
src/backend/replication/logical/tablesync.c | 35 ++++++++++++---------
src/backend/replication/logical/worker.c | 23 +++++++-------
src/include/replication/worker_internal.h | 2 +-
4 files changed, 36 insertions(+), 28 deletions(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index cb462a052a..f97ab12675 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -649,8 +649,8 @@ static void
logicalrep_worker_onexit(int code, Datum arg)
{
/* Disconnect gracefully from the remote side. */
- if (wrconn)
- walrcv_disconnect(wrconn);
+ if (LogRepWorkerWalRcvConn)
+ walrcv_disconnect(LogRepWorkerWalRcvConn);
logicalrep_worker_detach();
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0638f5c7f8..67f907cdd9 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
- /* End wal streaming so wrconn can be re-used to drop the slot. */
- walrcv_endstreaming(wrconn, &tli);
+ /*
+ * End streaming so that LogRepWorkerWalRcvConn can be used to drop
+ * the slot.
+ */
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
/*
* Cleanup the tablesync slot.
@@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
* otherwise, it won't be dropped till the corresponding subscription
* is dropped. So passing missing_ok = false.
*/
- ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
finish_sync_worker();
}
@@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
for (;;)
{
/* Try read the data. */
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
CHECK_FOR_INTERRUPTS();
@@ -715,7 +718,8 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND c.relname = %s",
quote_literal_cstr(nspname),
quote_literal_cstr(relname));
- res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(tableRow), tableRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -752,9 +756,11 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u"
" ORDER BY a.attnum",
lrel->remoteid,
- (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+ (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+ "AND a.attgenerated = ''" : ""),
lrel->remoteid);
- res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(attrRow), attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -841,7 +847,7 @@ copy_table(Relation rel)
appendStringInfo(&cmd, " FROM %s) TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
}
- res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR,
@@ -957,8 +963,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* application_name, so that it is different from the main apply worker,
* so that synchronous replication can distinguish them.
*/
- wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
- if (wrconn == NULL)
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true, slotname, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -985,7 +992,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* breakdown then it wouldn't have succeeded so trying it next time
* seems like a better bet.
*/
- ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
}
else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
{
@@ -1038,7 +1045,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* ensures that both the replication slot we create (see below) and the
* COPY are consistent with each other.
*/
- res = walrcv_exec(wrconn,
+ res = walrcv_exec(LogRepWorkerWalRcvConn,
"BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
0, NULL);
if (res->status != WALRCV_OK_COMMAND)
@@ -1058,7 +1065,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* slot leading to a dangling slot on the server.
*/
HOLD_INTERRUPTS();
- walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
+ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
CRS_USE_SNAPSHOT, origin_startpos);
RESUME_INTERRUPTS();
@@ -1100,7 +1107,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
copy_table(rel);
PopActiveSnapshot();
- res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("table copy could not finish transaction on publisher: %s",
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d9f157172b..1432554d5a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL;
/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
bool MySubscriptionValid = false;
@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextSwitchTo(ApplyMessageContext);
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
if (len != 0)
{
@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextReset(ApplyMessageContext);
}
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
}
}
@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
/* All done */
- walrcv_endstreaming(wrconn, &tli);
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
/*
@@ -2396,7 +2396,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
LSN_FORMAT_ARGS(writepos),
LSN_FORMAT_ARGS(flushpos));
- walrcv_send(wrconn, reply_message->data, reply_message->len);
+ walrcv_send(LogRepWorkerWalRcvConn,
+ reply_message->data, reply_message->len);
if (recvpos > last_recvpos)
last_recvpos = recvpos;
@@ -3090,9 +3091,9 @@ ApplyWorkerMain(Datum main_arg)
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
- wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
- &err);
- if (wrconn == NULL)
+ LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+ MySubscription->name, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -3100,7 +3101,7 @@ ApplyWorkerMain(Datum main_arg)
* We don't really use the output identify_system for anything but it
* does some initializations on the upstream so let's still call it.
*/
- (void) walrcv_identify_system(wrconn, &startpointTLI);
+ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
}
/*
@@ -3116,14 +3117,14 @@ ApplyWorkerMain(Datum main_arg)
options.startpoint = origin_startpos;
options.slotname = myslotname;
options.proto.logical.proto_version =
- walrcv_server_version(wrconn) >= 140000 ?
+ walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
/* Start normal logical streaming replication. */
- walrcv_startstreaming(wrconn, &options);
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1cac75e5a9..179eb43900 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -62,7 +62,7 @@ typedef struct LogicalRepWorker
extern MemoryContext ApplyContext;
/* libpqreceiver connection */
-extern struct WalReceiverConn *wrconn;
+extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
/* Worker and subscription objects. */
extern Subscription *MySubscription;
--
2.20.1
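For readers skimming the thread, the hazard that motivates the rename can be shown in a few lines of standalone C. This is only a minimal sketch, not PostgreSQL code: `Conn`, `open_conn`, `command_conn`, and the three functions are hypothetical stand-ins, and only the name `wrconn` is borrowed from the patch.

```c
#include <stddef.h>

/* Hypothetical stand-in for WalReceiverConn; not the PostgreSQL type. */
typedef struct Conn
{
	int			id;
} Conn;

/* File-level global, playing the role of the worker's connection. */
static Conn *wrconn = NULL;

/* A connection that a command-level function wants to use privately. */
static Conn command_conn = {2};

/* Hypothetical connect helper: simply hands back the given object. */
static Conn *
open_conn(Conn *c)
{
	return c;
}

/* Intended pattern: the local declaration shadows the global. */
static int
refresh_with_local(void)
{
	Conn	   *wrconn = open_conn(&command_conn);	/* local, global untouched */

	return wrconn->id;
}

/*
 * Accidental pattern: the local declaration is missing, so the same
 * statement assigns to the global. The compiler accepts this silently.
 */
static int
refresh_without_local(void)
{
	wrconn = open_conn(&command_conn);	/* overwrites the global */

	return wrconn->id;
}

/* Report which connection the global currently points at (0 if none). */
static int
global_conn_id(void)
{
	return wrconn ? wrconn->id : 0;
}
```

Calling `refresh_with_local()` leaves the global NULL, while `refresh_without_local()` leaves it pointing at the command's connection. Once the global has a distinct name such as LogRepWorkerWalRcvConn, a function that forgets its local `wrconn` declaration no longer compiles, because the identifier is undeclared, so the mistake cannot go unnoticed.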