From 848ad7383cede7600ae3fca07440e3f2441ac934 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 22 Jan 2020 12:51:28 -0300
Subject: [PATCH 2/2] =?UTF-8?q?Changes=20by=20=C3=81lvaro?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 doc/src/sgml/config.sgml                    | 21 -------
 src/backend/commands/subscriptioncmds.c     | 62 +++++++++++++--------
 src/backend/replication/pgoutput/pgoutput.c | 23 +++-----
 src/bin/psql/describe.c                     |  4 +-
 src/include/catalog/pg_subscription.h       |  1 -
 5 files changed, 52 insertions(+), 59 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 163cc77d1d..3ccacd528b 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1751,27 +1751,6 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
-     <varlistentry id="guc-logical-decoding-work-mem" xreflabel="logical_decoding_work_mem">
-      <term><varname>logical_decoding_work_mem</varname> (<type>integer</type>)
-      <indexterm>
-       <primary><varname>logical_decoding_work_mem</varname> configuration parameter</primary>
-      </indexterm>
-      </term>
-      <listitem>
-       <para>
-        Specifies the maximum amount of memory to be used by logical decoding,
-        before some of the decoded changes are either written to local disk.
-        This limits the amount of memory used by logical streaming replication
-        connections. It defaults to 64 megabytes (<literal>64MB</literal>).
-        Since each replication connection only uses a single buffer of this size,
-        and an installation normally doesn't have many such connections
-        concurrently (as limited by <varname>max_wal_senders</varname>), it's
-        safe to set this value significantly higher than <varname>work_mem</varname>,
-        reducing the amount of decoded changes written to disk.
-       </para>
-      </listitem>
-     </varlistentry>
-
      <varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth">
       <term><varname>max_stack_depth</varname> (<type>integer</type>)
       <indexterm>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c50e854e96..7920e75bfa 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -54,12 +54,13 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
  * accommodate that.
  */
 static void
-parse_subscription_options(List *options, bool *connect, bool *enabled_given,
-						   bool *enabled, bool *create_slot,
+parse_subscription_options(List *options, bool *connect,
+						   bool *enabled_given, bool *enabled,
+						   bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
+						   bool *logical_wm_given, int *logical_wm,
 						   bool *copy_data, char **synchronous_commit,
-						   bool *refresh, int *logical_wm,
-						   bool *logical_wm_given)
+						   bool *refresh)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -177,15 +178,25 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 			refresh_given = true;
 			*refresh = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "work_mem") == 0 && logical_wm)
+		else if (strcmp(defel->defname, "work_mem") == 0)
 		{
+			if (!logical_wm)
+				elog(ERROR, "option \"work_mem\" not valid in this context");
+
 			if (*logical_wm_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
+			/* Test if the value is valid for logical_decoding_work_mem */
+			(void) set_config_option("logical_decoding_work_mem", defGetString(defel),
+									 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
+									 false, 0, false);
+			if (!parse_int(defGetString(defel), logical_wm,
+						   GUC_UNIT_KB, NULL))
+				elog(ERROR, "parse_int failed");	/* shouldn't happen */
 			*logical_wm_given = true;
-			*logical_wm = defGetInt32(defel);
+
 		}
 		else
 			ereport(ERROR,
@@ -345,10 +356,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 *
 	 * Connection and publication should not be specified here.
 	 */
-	parse_subscription_options(stmt->options, &connect, &enabled_given,
-							   &enabled, &create_slot, &slotname_given,
-							   &slotname, &copy_data, &synchronous_commit,
-							   NULL, &logical_wm, &logical_wm_given);
+	parse_subscription_options(stmt->options, &connect,
+							   &enabled_given, &enabled,
+							   &create_slot, &slotname_given, &slotname,
+							   &logical_wm_given, &logical_wm,
+							   &copy_data, &synchronous_commit, NULL);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -692,10 +704,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				int			logical_wm;
 				bool		logical_wm_given;
 
-				parse_subscription_options(stmt->options, NULL, NULL, NULL,
+				parse_subscription_options(stmt->options, NULL,
+										   NULL, NULL,	/* enabled */
 										   NULL, &slotname_given, &slotname,
-										   NULL, &synchronous_commit, NULL,
-										   &logical_wm, &logical_wm_given);
+										   &logical_wm_given, &logical_wm,
+										   NULL, &synchronous_commit, NULL);
 
 				if (slotname_given)
 				{
@@ -737,9 +750,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 							enabled_given;
 
 				parse_subscription_options(stmt->options, NULL,
-										   &enabled_given, &enabled, NULL,
-										   NULL, NULL, NULL, NULL, NULL,
-										   NULL, NULL);
+										   &enabled_given, &enabled,
+										   NULL, NULL, NULL,	/* slot */
+										   NULL, NULL,	/* logical wm */
+										   NULL, NULL, NULL);
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -775,9 +789,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				bool		copy_data;
 				bool		refresh;
 
-				parse_subscription_options(stmt->options, NULL, NULL, NULL,
-										   NULL, NULL, NULL, &copy_data,
-										   NULL, &refresh, NULL, NULL);
+				parse_subscription_options(stmt->options, NULL,
+										   NULL, NULL,	/* enabled */
+										   NULL, NULL, NULL,	/* slot */
+										   NULL, NULL,	/* logical wm */
+										   &copy_data, NULL, &refresh);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
@@ -812,9 +828,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 							(errcode(ERRCODE_SYNTAX_ERROR),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
-				parse_subscription_options(stmt->options, NULL, NULL, NULL,
-										   NULL, NULL, NULL, &copy_data,
-										   NULL, NULL, NULL, NULL);
+				parse_subscription_options(stmt->options, NULL,
+										   NULL, NULL,	/* enabled */
+										   NULL, NULL, NULL,	/* slot */
+										   NULL, NULL,	/* logical wm */
+										   &copy_data, NULL, NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 536722b32f..d243d90821 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "catalog/pg_publication.h"
+#include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
@@ -141,26 +142,20 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 		}
 		else if (strcmp(defel->defname, "work_mem") == 0)
 		{
-			int64	parsed;
-
 			if (work_mem_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 			work_mem_given = true;
+			/* Test if the value is valid for logical_decoding_work_mem */
+			(void) set_config_option("logical_decoding_work_mem", defGetString(defel),
+									 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
+									 false, 0, false);
 
-			if (!scanint8(strVal(defel->arg), true, &parsed))
-				ereport(ERROR,
-						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("invalid work_mem")));
-
-			if (parsed > PG_INT32_MAX || parsed < 64)
-				ereport(ERROR,
-						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("work_mem \"%s\" out of range",
-								strVal(defel->arg))));
-
-			*logical_decoding_work_mem = (int)parsed;
+			/* by here it must be valid, so this shouldn't fail */
+			if (!parse_int(defGetString(defel), logical_decoding_work_mem,
+						   GUC_UNIT_KB, NULL))
+				elog(ERROR, "parse_int failed");	/* shouldn't happen */
 		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index f3c7eb96fa..956ad41f56 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5933,7 +5933,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false};
+	false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -5961,8 +5961,10 @@ describeSubscriptions(const char *pattern, bool verbose)
 	{
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
+						  ",  pg_catalog.pg_size_pretty(subworkmem::bigint * 1024) AS \"%s\"\n"
 						  ",  subconninfo AS \"%s\"\n",
 						  gettext_noop("Synchronous commit"),
+						  gettext_noop("Working Memory"),
 						  gettext_noop("Conninfo"));
 	}
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 3394379f86..eef585b0e5 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -47,7 +47,6 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
-
 	int32		subworkmem;		/* Memory to use to decode changes. */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
-- 
2.20.1

