From 5af6cc67938b31b39fa1998a10a9c7f1bdd8fb0e Mon Sep 17 00:00:00 2001
From: Japin Li <japinli@hotmail.com>
Date: Tue, 26 Jan 2021 15:43:52 +0800
Subject: [PATCH v2 1/4] Add ALTER SUBSCRIPTION...ADD/DROP PUBLICATION...
 syntax

---
 src/backend/commands/subscriptioncmds.c | 119 ++++++++++++++++++++++++
 src/backend/parser/gram.y               |  20 ++++
 src/include/nodes/parsenodes.h          |   2 +
 3 files changed, 141 insertions(+)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 082f7855b8..07167c9a8b 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -46,6 +46,8 @@
 #include "utils/syscache.h"
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *merge_subpublications(HeapTuple tuple, TupleDesc tupledesc,
+								   List *publications, bool addpub);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -857,6 +859,51 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
+		case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
+			{
+				bool		isadd = (stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION);
+				bool		copy_data;
+				bool		refresh;
+				List	   *publications;
+
+				/* Compute the full new list: stored publications +/- the given ones. */
+				publications = merge_subpublications(tup, RelationGetDescr(rel),
+													 stmt->publication, isadd);
+
+				parse_subscription_options(stmt->options,
+										   NULL,	/* no "connect" */
+										   NULL, NULL,	/* no "enabled" */
+										   NULL,	/* no "create_slot" */
+										   NULL, NULL,	/* no "slot_name" */
+										   &copy_data,
+										   NULL,	/* no "synchronous_commit" */
+										   &refresh,
+										   NULL, NULL,	/* no "binary" */
+										   NULL, NULL); /* no "streaming" */
+				values[Anum_pg_subscription_subpublications - 1] =
+					publicationListToArray(publications);
+				replaces[Anum_pg_subscription_subpublications - 1] = true;
+				update_tuple = true;
+
+				/* Refresh if user asked us to. */
+				if (refresh)
+				{
+					if (!sub->enabled)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
+								 errhint("Use ALTER SUBSCRIPTION ... %s PUBLICATION ... WITH (refresh = false).",
+										 isadd ? "ADD" : "DROP")));
+
+					/* Make sure refresh sees the new list of publications. */
+					sub->publications = publications;
+					AlterSubscription_refresh(sub, copy_data);
+				}
+
+				break;
+			}
+
 		case ALTER_SUBSCRIPTION_REFRESH:
 			{
 				bool		copy_data;
@@ -1278,3 +1325,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 
 	return tablelist;
 }
+
+/*
+ * Merge the subscription's current publication list with the list given in
+ * ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION.
+ *
+ * tuple/tupledesc describe the pg_subscription row being altered.  If addpub
+ * is true, each name in publications is appended to the current list, and it
+ * is an error if the name is already there.  Otherwise each name is removed
+ * from the current list, and it is an error if the name is not there or if
+ * no publication would remain.  Returns the new list of String nodes.
+ */
+static List *
+merge_subpublications(HeapTuple tuple, TupleDesc tupledesc,
+					  List *publications, bool addpub)
+{
+	int			i;
+	int			noldpublications;
+	Datum	   *oldpublications;
+	Datum		datum;
+	bool		isnull;
+	List	   *merged = NIL;
+	ListCell   *lc;
+
+	/* Fetch just the subpublications column and expand it into a List. */
+	datum = heap_getattr(tuple, Anum_pg_subscription_subpublications,
+						 tupledesc, &isnull);
+	Assert(!isnull);
+	deconstruct_array(DatumGetArrayTypeP(datum), TEXTOID, -1, false,
+					  TYPALIGN_INT, &oldpublications, NULL, &noldpublications);
+
+	for (i = 0; i < noldpublications; i++)
+		merged = lappend(merged, makeString(TextDatumGetCString(oldpublications[i])));
+
+	foreach(lc, publications)
+	{
+		char	   *name = strVal(lfirst(lc));
+		ListCell   *cell;
+
+		/* Search the list built so far; cell ends up NULL if name is absent. */
+		foreach(cell, merged)
+		{
+			if (strcmp(name, strVal(lfirst(cell))) == 0)
+				break;
+		}
+
+		if (addpub)
+		{
+			if (cell != NULL)
+				ereport(ERROR,
+						(errcode(ERRCODE_DUPLICATE_OBJECT),
+						 errmsg("publication name \"%s\" is already in subscription",
+								name)));
+			merged = lappend(merged, makeString(name));
+		}
+		else
+		{
+			if (cell == NULL)
+				ereport(ERROR,
+						(errcode(ERRCODE_UNDEFINED_OBJECT),
+						 errmsg("publication name \"%s\" is not in subscription",
+								name)));
+			merged = list_delete_cell(merged, cell);
+		}
+	}
+
+	if (merged == NIL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("subscription must contain at least one publication")));
+
+	return merged;
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7574d545e0..d20e513518 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9615,6 +9615,26 @@ AlterSubscriptionStmt:
 					n->options = $7;
 					$$ = (Node *)n;
 				}
+			| ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+					n->subname = $3;	/* name: the subscription */
+					n->publication = $6;	/* name_list: publications to add */
+					n->options = $7;	/* opt_definition result */
+					$$ = (Node *)n;
+				}
+			| ALTER SUBSCRIPTION name DROP PUBLICATION name_list opt_definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_DROP_PUBLICATION;
+					n->subname = $3;	/* name: the subscription */
+					n->publication = $6;	/* name_list: publications to drop */
+					n->options = $7;	/* opt_definition result */
+					$$ = (Node *)n;
+				}
 			| ALTER SUBSCRIPTION name ENABLE_P
 				{
 					AlterSubscriptionStmt *n =
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index dc2bb40926..9148ca9888 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3553,6 +3553,8 @@ typedef enum AlterSubscriptionType
 	ALTER_SUBSCRIPTION_OPTIONS,
 	ALTER_SUBSCRIPTION_CONNECTION,
 	ALTER_SUBSCRIPTION_PUBLICATION,
+	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
+	ALTER_SUBSCRIPTION_DROP_PUBLICATION,
 	ALTER_SUBSCRIPTION_REFRESH,
 	ALTER_SUBSCRIPTION_ENABLED
 } AlterSubscriptionType;
-- 
2.30.0

