Rework subscription-related code for psql and pg_dump

Started by Michael Paquier about 1 year ago, 5 messages
#1 Michael Paquier
michael@paquier.xyz
1 attachment(s)

Hi all,

While looking at some of the code related to subscriptions in psql,
which came down to making LOGICALREP_STREAM_* available to client code,
I also noticed quite a few inconsistencies and mistakes in how
pg_subscription is handled in pg_dump. These have accumulated over the
years, most of them from what looks like a set of copy-pastos:
- pg_dump.c includes pg_subscription.h and not pg_subscription_d.h.
This is a mistake, as the former should only be included in backend
code.
- SubscriptionInfo has accumulated dust over the years, declaring its
fields with types that don't map to its catalog. To keep it short
here, all the fields use (char *), leading to more DIY logic in the
code (see two_phase_disabled[]), while most of the fields are booleans
or char values. Switching to bool and char values allows direct
comparisons with the contents of pg_subscription_d.h, leading to more
consistent code.
- The order of the fields in SubscriptionInfo is inconsistent with the
catalog pg_subscription.
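
To illustrate the second point, here is a minimal standalone sketch of
the conversion, not the pg_dump code itself. DemoSubscriptionInfo,
text_to_bool() and fill_subinfo() are hypothetical names, and the
LOGICALREP_STREAM_* values are repeated locally as stand-ins for what
pg_subscription_d.h provides. The string arguments play the role of
what PQgetvalue() returns.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Stand-ins for the values that pg_subscription_d.h provides. */
#define LOGICALREP_STREAM_OFF 'f'
#define LOGICALREP_STREAM_ON 't'
#define LOGICALREP_STREAM_PARALLEL 'p'

/* Hypothetical, trimmed-down version of SubscriptionInfo. */
typedef struct DemoSubscriptionInfo
{
	bool		subenabled;		/* boolean catalog column */
	char		substream;		/* "char" catalog column */
} DemoSubscriptionInfo;

/* Booleans arrive from the server as the text "t" or "f". */
static bool
text_to_bool(const char *value)
{
	return strcmp(value, "t") == 0;
}

/* Fill the struct from two text values, as a query result would give. */
static void
fill_subinfo(DemoSubscriptionInfo *info,
			 const char *enabled_txt, const char *stream_txt)
{
	/* A "char" column is expected to hold one byte; take the first. */
	assert(stream_txt[0] != '\0');

	info->subenabled = text_to_bool(enabled_txt);
	info->substream = stream_txt[0];
}
```

With the fields typed this way, later checks become plain comparisons
such as (info.substream == LOGICALREP_STREAM_PARALLEL), with no
strcmp() and no temporary string like two_phase_disabled[].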

EXPOSE_TO_CLIENT_CODE has been added to pg_subscription.h so that the
values for substream, twophasestate and origin can be used directly in
psql and pg_dump, which now include pg_subscription_d.h as this is
client code.
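
As a rough sketch of what this enables on the client side, the
following standalone fragment compares catalog values against the
macros directly. The macro values are copied here only so that the
fragment compiles on its own; in the tree they would come from
catalog/pg_subscription_d.h. streaming_option() and origin_is_any()
are hypothetical helpers, not functions from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Local copies; in PostgreSQL these come from pg_subscription_d.h. */
#define LOGICALREP_STREAM_OFF 'f'
#define LOGICALREP_STREAM_ON 't'
#define LOGICALREP_STREAM_PARALLEL 'p'
#define LOGICALREP_ORIGIN_NONE "none"
#define LOGICALREP_ORIGIN_ANY "any"

/* Hypothetical helper: map a substream value to its option text. */
static const char *
streaming_option(char substream)
{
	if (substream == LOGICALREP_STREAM_ON)
		return "on";
	else if (substream == LOGICALREP_STREAM_PARALLEL)
		return "parallel";
	return "off";
}

/* Hypothetical helper: the origin is text, so it still needs strcmp(). */
static bool
origin_is_any(const char *suborigin)
{
	assert(suborigin != NULL);
	return strcmp(suborigin, LOGICALREP_ORIGIN_ANY) == 0;
}
```

The branch order in streaming_option() follows the one used in
dumpSubscription() in the attached patch.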

With all that addressed, I am finishing with the patch attached.

Thoughts or comments?
--
Michael

Attachments:

0001-Refactor-subscription-code-for-psql-and-pg_dump.patch (text/x-diff; charset=us-ascii)
From 097de22e34dec9aab560cad85af48e07d446b432 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 29 Nov 2024 13:02:29 +0900
Subject: [PATCH] Refactor subscription code for psql and pg_dump

---
 src/include/catalog/pg_subscription.h | 44 +++++++++++++------------
 src/bin/pg_dump/pg_dump.c             | 47 +++++++++++++--------------
 src/bin/pg_dump/pg_dump.h             | 16 ++++-----
 src/bin/psql/describe.c               |  7 ++--
 4 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b25f3fea56..beaff6578a 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -23,26 +23,6 @@
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
 
-/*
- * two_phase tri-state values. See comments atop worker.c to know more about
- * these states.
- */
-#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
-#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
-#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
-
-/*
- * The subscription will request the publisher to only send changes that do not
- * have any origin.
- */
-#define LOGICALREP_ORIGIN_NONE "none"
-
-/*
- * The subscription will request the publisher to send changes regardless
- * of their origin.
- */
-#define LOGICALREP_ORIGIN_ANY "any"
-
 /* ----------------
  *		pg_subscription definition. cpp turns this into
  *		typedef struct FormData_pg_subscription
@@ -159,6 +139,28 @@ typedef struct Subscription
 								 * specified origin */
 } Subscription;
 
+#ifdef EXPOSE_TO_CLIENT_CODE
+
+/*
+ * two_phase tri-state values. See comments atop worker.c to know more about
+ * these states.
+ */
+#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
+#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
+#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
+
+/*
+ * The subscription will request the publisher to only send changes that do not
+ * have any origin.
+ */
+#define LOGICALREP_ORIGIN_NONE "none"
+
+/*
+ * The subscription will request the publisher to send changes regardless
+ * of their origin.
+ */
+#define LOGICALREP_ORIGIN_ANY "any"
+
 /* Disallow streaming in-progress transactions. */
 #define LOGICALREP_STREAM_OFF 'f'
 
@@ -174,6 +176,8 @@ typedef struct Subscription
  */
 #define LOGICALREP_STREAM_PARALLEL 'p'
 
+#endif							/* EXPOSE_TO_CLIENT_CODE */
+
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
 extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index add7f16c90..ec0cdf4ed7 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -50,7 +50,7 @@
 #include "catalog/pg_default_acl_d.h"
 #include "catalog/pg_largeobject_d.h"
 #include "catalog/pg_proc_d.h"
-#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_d.h"
 #include "catalog/pg_type_d.h"
 #include "common/connect.h"
 #include "common/int.h"
@@ -4968,20 +4968,20 @@ getSubscriptions(Archive *fout)
 	i_oid = PQfnumber(res, "oid");
 	i_subname = PQfnumber(res, "subname");
 	i_subowner = PQfnumber(res, "subowner");
+	i_subenabled = PQfnumber(res, "subenabled");
 	i_subbinary = PQfnumber(res, "subbinary");
 	i_substream = PQfnumber(res, "substream");
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
 	i_subrunasowner = PQfnumber(res, "subrunasowner");
+	i_subfailover = PQfnumber(res, "subfailover");
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
 	i_subpublications = PQfnumber(res, "subpublications");
 	i_suborigin = PQfnumber(res, "suborigin");
 	i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
-	i_subenabled = PQfnumber(res, "subenabled");
-	i_subfailover = PQfnumber(res, "subfailover");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4995,18 +4995,20 @@ getSubscriptions(Archive *fout)
 		subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
 		subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
 
+		subinfo[i].subenabled =
+			(strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
 		subinfo[i].subbinary =
-			pg_strdup(PQgetvalue(res, i, i_subbinary));
-		subinfo[i].substream =
-			pg_strdup(PQgetvalue(res, i, i_substream));
-		subinfo[i].subtwophasestate =
-			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
+			(strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0);
+		subinfo[i].substream = *(PQgetvalue(res, i, i_substream));
+		subinfo[i].subtwophasestate = *(PQgetvalue(res, i, i_subtwophasestate));
 		subinfo[i].subdisableonerr =
-			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
+			(strcmp(PQgetvalue(res, i, i_subdisableonerr), "t") == 0);
 		subinfo[i].subpasswordrequired =
-			pg_strdup(PQgetvalue(res, i, i_subpasswordrequired));
+			(strcmp(PQgetvalue(res, i, i_subpasswordrequired), "t") == 0);
 		subinfo[i].subrunasowner =
-			pg_strdup(PQgetvalue(res, i, i_subrunasowner));
+			(strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0);
+		subinfo[i].subfailover =
+			(strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0);
 		subinfo[i].subconninfo =
 			pg_strdup(PQgetvalue(res, i, i_subconninfo));
 		if (PQgetisnull(res, i, i_subslotname))
@@ -5024,10 +5026,6 @@ getSubscriptions(Archive *fout)
 		else
 			subinfo[i].suboriginremotelsn =
 				pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
-		subinfo[i].subenabled =
-			pg_strdup(PQgetvalue(res, i, i_subenabled));
-		subinfo[i].subfailover =
-			pg_strdup(PQgetvalue(res, i, i_subfailover));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -5208,7 +5206,6 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	char	  **pubnames = NULL;
 	int			npubnames = 0;
 	int			i;
-	char		two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
 
 	/* Do nothing if not dumping schema */
 	if (!dopt->dumpSchema)
@@ -5245,29 +5242,29 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	else
 		appendPQExpBufferStr(query, "NONE");
 
-	if (strcmp(subinfo->subbinary, "t") == 0)
+	if (subinfo->subbinary)
 		appendPQExpBufferStr(query, ", binary = true");
 
-	if (strcmp(subinfo->substream, "t") == 0)
+	if (subinfo->substream == LOGICALREP_STREAM_ON)
 		appendPQExpBufferStr(query, ", streaming = on");
-	else if (strcmp(subinfo->substream, "p") == 0)
+	else if (subinfo->substream == LOGICALREP_STREAM_PARALLEL)
 		appendPQExpBufferStr(query, ", streaming = parallel");
 	else
 		appendPQExpBufferStr(query, ", streaming = off");
 
-	if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
+	if (subinfo->subtwophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED)
 		appendPQExpBufferStr(query, ", two_phase = on");
 
-	if (strcmp(subinfo->subdisableonerr, "t") == 0)
+	if (subinfo->subdisableonerr)
 		appendPQExpBufferStr(query, ", disable_on_error = true");
 
-	if (strcmp(subinfo->subpasswordrequired, "t") != 0)
+	if (!subinfo->subpasswordrequired)
 		appendPQExpBuffer(query, ", password_required = false");
 
-	if (strcmp(subinfo->subrunasowner, "t") == 0)
+	if (subinfo->subrunasowner)
 		appendPQExpBufferStr(query, ", run_as_owner = true");
 
-	if (strcmp(subinfo->subfailover, "t") == 0)
+	if (subinfo->subfailover)
 		appendPQExpBufferStr(query, ", failover = true");
 
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
@@ -5303,7 +5300,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 			appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn);
 		}
 
-		if (strcmp(subinfo->subenabled, "t") == 0)
+		if (subinfo->subenabled)
 		{
 			/*
 			 * Enable the subscription to allow the replication to continue
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index d65f558565..2e55a0e3bb 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -664,20 +664,20 @@ typedef struct _SubscriptionInfo
 {
 	DumpableObject dobj;
 	const char *rolname;
-	char	   *subenabled;
-	char	   *subbinary;
-	char	   *substream;
-	char	   *subtwophasestate;
-	char	   *subdisableonerr;
-	char	   *subpasswordrequired;
-	char	   *subrunasowner;
+	bool		subenabled;
+	bool		subbinary;
+	char		substream;
+	char		subtwophasestate;
+	bool		subdisableonerr;
+	bool		subpasswordrequired;
+	bool		subrunasowner;
+	bool		subfailover;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subsynccommit;
 	char	   *subpublications;
 	char	   *suborigin;
 	char	   *suboriginremotelsn;
-	char	   *subfailover;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 0aa39906a1..2657abdc72 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_default_acl_d.h"
 #include "catalog/pg_proc_d.h"
 #include "catalog/pg_statistic_ext_d.h"
+#include "catalog/pg_subscription_d.h"
 #include "catalog/pg_type_d.h"
 #include "common.h"
 #include "common/logging.h"
@@ -6679,9 +6680,9 @@ describeSubscriptions(const char *pattern, bool verbose)
 			if (pset.sversion >= 160000)
 				appendPQExpBuffer(&buf,
 								  ", (CASE substream\n"
-								  "    WHEN 'f' THEN 'off'\n"
-								  "    WHEN 't' THEN 'on'\n"
-								  "    WHEN 'p' THEN 'parallel'\n"
+								  "    WHEN " CppAsString2(LOGICALREP_STREAM_OFF) " THEN 'off'\n"
+								  "    WHEN " CppAsString2(LOGICALREP_STREAM_ON) " THEN 'on'\n"
+								  "    WHEN " CppAsString2(LOGICALREP_STREAM_PARALLEL) " THEN 'parallel'\n"
 								  "   END) AS \"%s\"\n",
 								  gettext_noop("Streaming"));
 			else
-- 
2.45.2
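
The describe.c hunk above relies on two-level stringification to embed
the macro values in the SQL text. A small standalone sketch of that
mechanism follows. The two CppAsString macros are written the way c.h
defines them, LOGICALREP_STREAM_OFF is a local stand-in for the value
from pg_subscription_d.h, and check_expansion() is a hypothetical
helper added only to show the resulting strings.

```c
#include <assert.h>
#include <string.h>

/* Two-level stringification, as defined in PostgreSQL's c.h. */
#define CppAsString(identifier) #identifier
#define CppAsString2(x) CppAsString(x)

/* Stand-in for the value from pg_subscription_d.h. */
#define LOGICALREP_STREAM_OFF 'f'

/* Adjacent string literals are concatenated by the compiler. */
static const char *const stream_off_branch =
	"WHEN " CppAsString2(LOGICALREP_STREAM_OFF) " THEN 'off'";

/* With a single level, the macro name itself gets stringified. */
static const char *const unexpanded = CppAsString(LOGICALREP_STREAM_OFF);

/* Hypothetical check showing what each form expands to. */
static void
check_expansion(void)
{
	assert(strcmp(stream_off_branch, "WHEN 'f' THEN 'off'") == 0);
	assert(strcmp(unexpanded, "LOGICALREP_STREAM_OFF") == 0);
}
```

Because the macro is defined as a character constant with its single
quotes, the stringified form is already a valid SQL literal.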

#2 Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Michael Paquier (#1)
RE: Rework subscription-related code for psql and pg_dump

Dear Michael,

Thanks for working on this.
It was not good to follow the existing code without checking it :-(.

I grepped the files in bin/ and could not find any lines that include catalog/pg_xxx.h files.
(One exception is pg_control.h. It is not a catalog header but has the same prefix.)
The patch basically LGTM.

One comment...
In describeSubscriptions(), only the streaming option is represented as {off, on, parallel},
whereas the two-phase option is {d, p, e}.
I feel it is a bit strange, so we could fix it to show {disabled, pending, enabled} using the same approach.
Thoughts?
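
For illustration only, such a change might look roughly like the
sketch below. It is hypothetical and not part of the posted patch.
twophase_case_sql is an invented name, the CppAsString macros are
written the way c.h defines them, and the LOGICALREP_TWOPHASE_STATE_*
values are local stand-ins for those in pg_subscription_d.h.

```c
/* Two-level stringification, as defined in PostgreSQL's c.h. */
#define CppAsString(identifier) #identifier
#define CppAsString2(x) CppAsString(x)

/* Stand-ins for the values from pg_subscription_d.h. */
#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'

/* Hypothetical CASE expression mapping each state to a readable label. */
static const char *const twophase_case_sql =
	"(CASE subtwophasestate\n"
	"    WHEN " CppAsString2(LOGICALREP_TWOPHASE_STATE_DISABLED) " THEN 'disabled'\n"
	"    WHEN " CppAsString2(LOGICALREP_TWOPHASE_STATE_PENDING) " THEN 'pending'\n"
	"    WHEN " CppAsString2(LOGICALREP_TWOPHASE_STATE_ENABLED) " THEN 'enabled'\n"
	"   END)";
```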

Best regards,
Hayato Kuroda
FUJITSU LIMITED

#3 Michael Paquier
michael@paquier.xyz
In reply to: Hayato Kuroda (Fujitsu) (#2)
Re: Rework subscription-related code for psql and pg_dump

On Fri, Nov 29, 2024 at 05:42:13AM +0000, Hayato Kuroda (Fujitsu) wrote:

> It was not good to follow the existing code without checking it :-(.

No problem.

> I grepped the files in bin/ and could not find any lines that include catalog/pg_xxx.h files.
> (One exception is pg_control.h. It is not a catalog header but has the same prefix.)
> The patch basically LGTM.

Thanks for double-checking.

Note also the tweak at the top of pg_resetwal.c with its "#define FRONTEND
1" while including postgres.h. This is also one of the historical fun
things with the frontend code. Perhaps we'll have a cleaner split at
some point in the future.

> One comment...
> In describeSubscriptions(), only the streaming option is represented as {off, on, parallel},
> whereas the two-phase option is {d, p, e}.
> I feel it is a bit strange, so we could fix it to show {disabled,
> pending, enabled} using the same approach.

-   if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
+   if (subinfo->subtwophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED)
        appendPQExpBufferStr(query, ", two_phase = on");

I don't feel strongly either way. The code intentionally sets
two_phase to "on" if the catalog state is "pending" or "enabled", so
sticking with the current assumption of the code and keeping it as
proposed in the patch is fine, IMO.
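
Put as a standalone sketch, the rule that pg_dump follows here is the
one below. dump_two_phase_on() is a hypothetical helper that mirrors
the condition in dumpSubscription(), and the state values are local
stand-ins for those in pg_subscription_d.h.

```c
#include <stdbool.h>

/* Stand-ins for the values from pg_subscription_d.h. */
#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'

/*
 * Hypothetical helper: should the dump emit "two_phase = on"?
 * Anything other than the disabled state counts, so both the
 * pending and the enabled states are dumped as "on".
 */
static bool
dump_two_phase_on(char subtwophasestate)
{
	return subtwophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED;
}
```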
--
Michael

#4 Michael Paquier
michael@paquier.xyz
In reply to: Michael Paquier (#3)
Re: Rework subscription-related code for psql and pg_dump

On Sat, Nov 30, 2024 at 12:54:19PM +0900, Michael Paquier wrote:

> -   if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
> +   if (subinfo->subtwophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED)
>         appendPQExpBufferStr(query, ", two_phase = on");
>
> I don't feel strongly either way. The code intentionally sets
> two_phase to "on" if the catalog state is "pending" or "enabled", so
> sticking with the current assumption of the code and keeping it as
> proposed in the patch is fine, IMO.

I've looked at that again and left this as-is, then applied all of this
cleanup.
--
Michael

#5 Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Michael Paquier (#4)
RE: Rework subscription-related code for psql and pg_dump

Dear Michael,

> I've looked at that again and left this as-is, then applied all of this
> cleanup.

I missed your e-mail, but it's OK to keep the output style for two_phase.
Thanks a lot for working on it!

Best regards,
Hayato Kuroda
FUJITSU LIMITED