Rework subscription-related code for psql and pg_dump
Hi all,
While looking at some of the code related to subscriptions in psql,
coming down to make LOGICALREP_STREAM_* available for client code,
I've also noticed quite a few inconsistencies and mistakes with how
pg_subscription is handled in pg_dump, that have accumulated over the
years as of what looks like set of copy-pastos for most of them:
- pg_dump.c includes pg_subscription.h and not pg_subscription_d.h.
This is a mistake as the former should only be included in the backend
code.
- SubscriptionInfo has accumulated dust over the years, using
declarations types that don't map with its catalog. To keep it short
here, all the fields use (char *) leading to more DIY logic in the
code (see two_phase_disabled[]), while most of the fields are booleans
or char values. Switching to char values allows direct comparisons
with the contents of pg_subscription_d.h, leading to more consistent
code.
- Inconsistent position of fields between SubscriptionInfo and the
catalog pg_subscription.
EXPOSE_TO_CLIENT_CODE has been added to pg_subscription.h so as values
for substream, twophasestate and origin can be used directly in psql
and pg_dump, switching these to use pg_subscription_d.h as this is
client code.
While all that addressed, I am finishing with the patch attached.
Thoughts or comments?
--
Michael
Attachments:
0001-Refactor-subscription-code-for-psql-and-pg_dump.patchtext/x-diff; charset=us-asciiDownload
From 097de22e34dec9aab560cad85af48e07d446b432 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 29 Nov 2024 13:02:29 +0900
Subject: [PATCH] Refactor subscription code for psql and pg_dump
---
src/include/catalog/pg_subscription.h | 44 +++++++++++++------------
src/bin/pg_dump/pg_dump.c | 47 +++++++++++++--------------
src/bin/pg_dump/pg_dump.h | 16 ++++-----
src/bin/psql/describe.c | 7 ++--
4 files changed, 58 insertions(+), 56 deletions(-)
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b25f3fea56..beaff6578a 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -23,26 +23,6 @@
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
-/*
- * two_phase tri-state values. See comments atop worker.c to know more about
- * these states.
- */
-#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
-#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
-#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
-
-/*
- * The subscription will request the publisher to only send changes that do not
- * have any origin.
- */
-#define LOGICALREP_ORIGIN_NONE "none"
-
-/*
- * The subscription will request the publisher to send changes regardless
- * of their origin.
- */
-#define LOGICALREP_ORIGIN_ANY "any"
-
/* ----------------
* pg_subscription definition. cpp turns this into
* typedef struct FormData_pg_subscription
@@ -159,6 +139,28 @@ typedef struct Subscription
* specified origin */
} Subscription;
+#ifdef EXPOSE_TO_CLIENT_CODE
+
+/*
+ * two_phase tri-state values. See comments atop worker.c to know more about
+ * these states.
+ */
+#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
+#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
+#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
+
+/*
+ * The subscription will request the publisher to only send changes that do not
+ * have any origin.
+ */
+#define LOGICALREP_ORIGIN_NONE "none"
+
+/*
+ * The subscription will request the publisher to send changes regardless
+ * of their origin.
+ */
+#define LOGICALREP_ORIGIN_ANY "any"
+
/* Disallow streaming in-progress transactions. */
#define LOGICALREP_STREAM_OFF 'f'
@@ -174,6 +176,8 @@ typedef struct Subscription
*/
#define LOGICALREP_STREAM_PARALLEL 'p'
+#endif /* EXPOSE_TO_CLIENT_CODE */
+
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
extern void FreeSubscription(Subscription *sub);
extern void DisableSubscription(Oid subid);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index add7f16c90..ec0cdf4ed7 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -50,7 +50,7 @@
#include "catalog/pg_default_acl_d.h"
#include "catalog/pg_largeobject_d.h"
#include "catalog/pg_proc_d.h"
-#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_d.h"
#include "catalog/pg_type_d.h"
#include "common/connect.h"
#include "common/int.h"
@@ -4968,20 +4968,20 @@ getSubscriptions(Archive *fout)
i_oid = PQfnumber(res, "oid");
i_subname = PQfnumber(res, "subname");
i_subowner = PQfnumber(res, "subowner");
+ i_subenabled = PQfnumber(res, "subenabled");
i_subbinary = PQfnumber(res, "subbinary");
i_substream = PQfnumber(res, "substream");
i_subtwophasestate = PQfnumber(res, "subtwophasestate");
i_subdisableonerr = PQfnumber(res, "subdisableonerr");
i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
i_subrunasowner = PQfnumber(res, "subrunasowner");
+ i_subfailover = PQfnumber(res, "subfailover");
i_subconninfo = PQfnumber(res, "subconninfo");
i_subslotname = PQfnumber(res, "subslotname");
i_subsynccommit = PQfnumber(res, "subsynccommit");
i_subpublications = PQfnumber(res, "subpublications");
i_suborigin = PQfnumber(res, "suborigin");
i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
- i_subenabled = PQfnumber(res, "subenabled");
- i_subfailover = PQfnumber(res, "subfailover");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -4995,18 +4995,20 @@ getSubscriptions(Archive *fout)
subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
+ subinfo[i].subenabled =
+ (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
subinfo[i].subbinary =
- pg_strdup(PQgetvalue(res, i, i_subbinary));
- subinfo[i].substream =
- pg_strdup(PQgetvalue(res, i, i_substream));
- subinfo[i].subtwophasestate =
- pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
+ (strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0);
+ subinfo[i].substream = *(PQgetvalue(res, i, i_substream));
+ subinfo[i].subtwophasestate = *(PQgetvalue(res, i, i_subtwophasestate));
subinfo[i].subdisableonerr =
- pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
+ (strcmp(PQgetvalue(res, i, i_subdisableonerr), "t") == 0);
subinfo[i].subpasswordrequired =
- pg_strdup(PQgetvalue(res, i, i_subpasswordrequired));
+ (strcmp(PQgetvalue(res, i, i_subpasswordrequired), "t") == 0);
subinfo[i].subrunasowner =
- pg_strdup(PQgetvalue(res, i, i_subrunasowner));
+ (strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0);
+ subinfo[i].subfailover =
+ (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0);
subinfo[i].subconninfo =
pg_strdup(PQgetvalue(res, i, i_subconninfo));
if (PQgetisnull(res, i, i_subslotname))
@@ -5024,10 +5026,6 @@ getSubscriptions(Archive *fout)
else
subinfo[i].suboriginremotelsn =
pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
- subinfo[i].subenabled =
- pg_strdup(PQgetvalue(res, i, i_subenabled));
- subinfo[i].subfailover =
- pg_strdup(PQgetvalue(res, i, i_subfailover));
/* Decide whether we want to dump it */
selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -5208,7 +5206,6 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
char **pubnames = NULL;
int npubnames = 0;
int i;
- char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
/* Do nothing if not dumping schema */
if (!dopt->dumpSchema)
@@ -5245,29 +5242,29 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
else
appendPQExpBufferStr(query, "NONE");
- if (strcmp(subinfo->subbinary, "t") == 0)
+ if (subinfo->subbinary)
appendPQExpBufferStr(query, ", binary = true");
- if (strcmp(subinfo->substream, "t") == 0)
+ if (subinfo->substream == LOGICALREP_STREAM_ON)
appendPQExpBufferStr(query, ", streaming = on");
- else if (strcmp(subinfo->substream, "p") == 0)
+ else if (subinfo->substream == LOGICALREP_STREAM_PARALLEL)
appendPQExpBufferStr(query, ", streaming = parallel");
else
appendPQExpBufferStr(query, ", streaming = off");
- if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
+ if (subinfo->subtwophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED)
appendPQExpBufferStr(query, ", two_phase = on");
- if (strcmp(subinfo->subdisableonerr, "t") == 0)
+ if (subinfo->subdisableonerr)
appendPQExpBufferStr(query, ", disable_on_error = true");
- if (strcmp(subinfo->subpasswordrequired, "t") != 0)
+ if (!subinfo->subpasswordrequired)
appendPQExpBuffer(query, ", password_required = false");
- if (strcmp(subinfo->subrunasowner, "t") == 0)
+ if (subinfo->subrunasowner)
appendPQExpBufferStr(query, ", run_as_owner = true");
- if (strcmp(subinfo->subfailover, "t") == 0)
+ if (subinfo->subfailover)
appendPQExpBufferStr(query, ", failover = true");
if (strcmp(subinfo->subsynccommit, "off") != 0)
@@ -5303,7 +5300,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn);
}
- if (strcmp(subinfo->subenabled, "t") == 0)
+ if (subinfo->subenabled)
{
/*
* Enable the subscription to allow the replication to continue
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index d65f558565..2e55a0e3bb 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -664,20 +664,20 @@ typedef struct _SubscriptionInfo
{
DumpableObject dobj;
const char *rolname;
- char *subenabled;
- char *subbinary;
- char *substream;
- char *subtwophasestate;
- char *subdisableonerr;
- char *subpasswordrequired;
- char *subrunasowner;
+ bool subenabled;
+ bool subbinary;
+ char substream;
+ char subtwophasestate;
+ bool subdisableonerr;
+ bool subpasswordrequired;
+ bool subrunasowner;
+ bool subfailover;
char *subconninfo;
char *subslotname;
char *subsynccommit;
char *subpublications;
char *suborigin;
char *suboriginremotelsn;
- char *subfailover;
} SubscriptionInfo;
/*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 0aa39906a1..2657abdc72 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -25,6 +25,7 @@
#include "catalog/pg_default_acl_d.h"
#include "catalog/pg_proc_d.h"
#include "catalog/pg_statistic_ext_d.h"
+#include "catalog/pg_subscription_d.h"
#include "catalog/pg_type_d.h"
#include "common.h"
#include "common/logging.h"
@@ -6679,9 +6680,9 @@ describeSubscriptions(const char *pattern, bool verbose)
if (pset.sversion >= 160000)
appendPQExpBuffer(&buf,
", (CASE substream\n"
- " WHEN 'f' THEN 'off'\n"
- " WHEN 't' THEN 'on'\n"
- " WHEN 'p' THEN 'parallel'\n"
+ " WHEN " CppAsString2(LOGICALREP_STREAM_OFF) " THEN 'off'\n"
+ " WHEN " CppAsString2(LOGICALREP_STREAM_ON) " THEN 'on'\n"
+ " WHEN " CppAsString2(LOGICALREP_STREAM_PARALLEL) " THEN 'parallel'\n"
" END) AS \"%s\"\n",
gettext_noop("Streaming"));
else
--
2.45.2
Dear Michael,
Thanks for working on this.
It was not good to follow existing codes without confirmation :-(.
I grepped files in bin/ and could not find lines which includes catalog/pg_xxx.h files.
(One exception is pg_control.h. It is not a catalog-header but has the same prefix.)
The patch basically LGTM.
One comment...
In describeSubscriptions(), only the streaming option is represented as {off, on, parallel},
whereas twophase option is {d, p, e}.
I feel it is bit strange so we can fix to show like {disabled, pending, enabled} by the same approach.
Thought?
Best regards,
Hayato Kuroda
FUJITSU LIMITED
On Fri, Nov 29, 2024 at 05:42:13AM +0000, Hayato Kuroda (Fujitsu) wrote:
It was not good to follow existing codes without confirmation :-(.
No problem.
I grepped files in bin/ and could not find lines which includes catalog/pg_xxx.h files.
(One exception is pg_control.h. It is not a catalog-header but has the same prefix.)
The patch basically LGTM.
Thanks for double-checking.
Note also the tweak on top of pg_resetwal.c with its "#define FRONTEND
1" while declaring postgres.h. This is also one of the historical fun
things with the frontend code. Perhaps we'll have a cleaner split at
some point in the future.
One comment...
In describeSubscriptions(), only the streaming option is represented as {off, on, parallel},
whereas twophase option is {d, p, e}.
I feel it is bit strange so we can fix to show like {disabled,
pending, enabled} by the same approach.
- if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
+ if (subinfo->subtwophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED)
appendPQExpBufferStr(query, ", two_phase = on");
I'm not feeling strongly either way. The code intentionally wants to
set two_phase to "on" if the catalog state is "pending" or "on", so
sticking with the current assumption of the code and keeping it as
proposed in the patch is fine, IMO.
--
Michael
On Sat, Nov 30, 2024 at 12:54:19PM +0900, Michael Paquier wrote:
- if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0) + if (subinfo->subtwophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) appendPQExpBufferStr(query, ", two_phase = on");I'm not feeling strongly either way. The code intentionally wants to
set two_phase to "on" if the catalog state is "pending" or "on", so
sticking with the current assumption of the code and keeping it as
proposed in the patch is fine, IMO.
I've looked at that again and let this as-is, then applied all this
cleanup.
--
Michael