From bff45768b066ccf94de6dae9f687cf54212900b1 Mon Sep 17 00:00:00 2001
From: Japin Li <japinli@hotmail.com>
Date: Tue, 26 Jan 2021 15:43:52 +0800
Subject: [PATCH v3 1/4] Introduce a new syntax to add/drop publications

At present, if we want to update publications in subscription, we can
use SET PUBLICATION, however, it requires supply all publications that
exists and the new publications if we want to add new publications, it's
inconvenient.  The new syntax only supply the new publications.  When
the refresh is true, it only refresh the new publications.
---
 src/backend/commands/subscriptioncmds.c | 121 ++++++++++++++++++++++++
 src/backend/parser/gram.y               |  20 ++++
 src/include/nodes/parsenodes.h          |   2 +
 3 files changed, 143 insertions(+)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 082f7855b8..88fa7f1b3f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -46,6 +46,8 @@
 #include "utils/syscache.h"
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *merge_subpublications(HeapTuple tuple, TupleDesc tupledesc,
+								   List *publications, bool addpub);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -857,6 +859,51 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
+		case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
+			{
+				bool	copy_data = false;
+				bool	isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+				bool	refresh;
+				List   *publist = NIL;
+
+				publist = merge_subpublications(tup, RelationGetDescr(rel),
+												stmt->publication, isadd);
+
+				parse_subscription_options(stmt->options,
+										   NULL,	/* no "connect" */
+										   NULL, NULL,	/* no "enabled" */
+										   NULL,	/* no "create_slot" */
+										   NULL, NULL,	/* no "slot_name" */
+										   isadd ? &copy_data : NULL,
+										   NULL,	/* no "synchronous_commit" */
+										   &refresh,
+										   NULL, NULL,	/* no "binary" */
+										   NULL, NULL); /* no "streaming" */
+				values[Anum_pg_subscription_subpublications - 1] =
+					publicationListToArray(publist);
+				replaces[Anum_pg_subscription_subpublications - 1] = true;
+
+				update_tuple = true;
+
+				/* Refresh if user asked us to. */
+				if (refresh)
+				{
+					if (!sub->enabled)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
+								 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
+
+					/* Only refresh the added/dropped list of publications. */
+					sub->publications = stmt->publication;
+
+					AlterSubscription_refresh(sub, copy_data);
+				}
+
+				break;
+			}
+
 		case ALTER_SUBSCRIPTION_REFRESH:
 			{
 				bool		copy_data;
@@ -1278,3 +1325,77 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 
 	return tablelist;
 }
+
+/*
+ * Merge current subscription's publications and user specified publications
+ * by ADD/DROP PUBLICATIONS.
+ *
+ * If isadd == true, we will add the list of publications into current
+ * subscription's publications.  Otherwise, we will delete the list of
+ * publications from current subscription's publications.
+ */
+static List *
+merge_subpublications(HeapTuple tuple, TupleDesc tupledesc,
+					  List *newpublist, bool isadd)
+{
+	int		 i;
+	int		 npublications;
+	Datum	*publications;
+	bool	 nulls[Natts_pg_subscription];
+	Datum	 values[Natts_pg_subscription];
+	List	*publist = NIL;
+	ListCell	*lc;
+	ArrayType	*array;
+
+	/* deconstruct the subpublications */
+	heap_deform_tuple(tuple, tupledesc, values, nulls);
+	array = DatumGetArrayTypeP(values[Anum_pg_subscription_subpublications - 1]);
+	deconstruct_array(array, TEXTOID, -1, false, TYPALIGN_INT,
+					  &publications, NULL, &npublications);
+
+	for (i = 0; i < npublications; i++)
+		publist = lappend(publist,
+						  makeString(TextDatumGetCString((publications[i]))));
+
+	foreach(lc, newpublist)
+	{
+		char		*name = strVal(lfirst(lc));
+		ListCell	*cell = NULL;
+
+		foreach(cell, publist)
+		{
+			char	*pubname = strVal(lfirst(cell));
+
+			if (strcmp(name, pubname) == 0)
+			{
+				if (isadd)
+				{
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("publication name \"%s\" is already in subscription",
+									name)));
+				}
+				else
+				{
+					publist = list_delete_cell(publist, cell);
+					break;
+				}
+			}
+		}
+
+		if (isadd)
+			publist = lappend(publist, makeString(name));
+		else if (cell == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("publication name \"%s\" do not in subscription",
+							name)));
+	}
+
+	if (publist == NIL)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("subscription must contain at least one publication")));
+
+	return publist;
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7574d545e0..d20e513518 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9615,6 +9615,26 @@ AlterSubscriptionStmt:
 					n->options = $7;
 					$$ = (Node *)n;
 				}
+			| ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+					n->subname = $3;
+					n->publication = $6;
+					n->options = $7;
+					$$ = (Node *)n;
+				}
+			| ALTER SUBSCRIPTION name DROP PUBLICATION name_list opt_definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_DROP_PUBLICATION;
+					n->subname = $3;
+					n->publication = $6;
+					n->options = $7;
+					$$ = (Node *)n;
+				}
 			| ALTER SUBSCRIPTION name ENABLE_P
 				{
 					AlterSubscriptionStmt *n =
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index dc2bb40926..9148ca9888 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3553,6 +3553,8 @@ typedef enum AlterSubscriptionType
 	ALTER_SUBSCRIPTION_OPTIONS,
 	ALTER_SUBSCRIPTION_CONNECTION,
 	ALTER_SUBSCRIPTION_PUBLICATION,
+	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
+	ALTER_SUBSCRIPTION_DROP_PUBLICATION,
 	ALTER_SUBSCRIPTION_REFRESH,
 	ALTER_SUBSCRIPTION_ENABLED
 } AlterSubscriptionType;
-- 
2.30.0

