From 0030521b178b80b1de26afd8a71cb7b741511b01 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 19 Feb 2024 10:23:05 +0530
Subject: [PATCH v1 2/2] Added an option skip_not_exist_publication which will
 skip loading the publication, if the publication does not exist.

Added an option skip_not_exist_publication which will skip loading the
publication, if the publication does not exist.
---
 src/backend/catalog/pg_subscription.c         |  1 +
 src/backend/catalog/system_views.sql          |  3 +-
 src/backend/commands/subscriptioncmds.c       | 33 ++++++++++++++++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 +++
 src/backend/replication/logical/worker.c      |  4 +++
 src/backend/replication/pgoutput/pgoutput.c   | 23 ++++++++++---
 src/bin/psql/tab-complete.c                   | 10 +++---
 src/include/catalog/pg_subscription.h         |  5 +++
 src/include/replication/pgoutput.h            |  1 +
 src/include/replication/walreceiver.h         |  1 +
 src/test/subscription/t/031_column_list.pl    | 14 ++++----
 11 files changed, 82 insertions(+), 17 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 406a3c2dd1..404577725e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -74,6 +74,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
 	sub->failover = subform->subfailover;
+	sub->skipnotexistpub = subform->subskipnotexistpub;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 04227a72d1..323cd2485d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1360,7 +1360,8 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
 			  subpasswordrequired, subrunasowner, subfailover,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subskipnotexistpub, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index a05d69922d..bd59efc73a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,6 +72,7 @@
 #define SUBOPT_FAILOVER				0x00002000
 #define SUBOPT_LSN					0x00004000
 #define SUBOPT_ORIGIN				0x00008000
+#define SUBOPT_SKIP_NOT_EXISTS_PUB  0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -97,6 +98,7 @@ typedef struct SubOpts
 	bool		passwordrequired;
 	bool		runasowner;
 	bool		failover;
+	bool		skipnotexistpub;
 	char	   *origin;
 	XLogRecPtr	lsn;
 } SubOpts;
@@ -159,6 +161,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->runasowner = false;
 	if (IsSet(supported_opts, SUBOPT_FAILOVER))
 		opts->failover = false;
+	if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+		opts->skipnotexistpub = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
@@ -316,6 +320,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_FAILOVER;
 			opts->failover = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB) &&
+				 strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_SKIP_NOT_EXISTS_PUB;
+			opts->skipnotexistpub = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
 				 strcmp(defel->defname, "origin") == 0)
 		{
@@ -408,6 +421,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "failover = true")));
 
+		if (opts->skipnotexistpub &&
+			IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "skip_not_exist_publication = true")));
+
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
@@ -611,7 +631,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+					  SUBOPT_SKIP_NOT_EXISTS_PUB | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -718,6 +739,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+	values[Anum_pg_subscription_subskipnotexistpub - 1] =
+		BoolGetDatum(opts.skipnotexistpub);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1169,6 +1192,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+								  SUBOPT_SKIP_NOT_EXISTS_PUB |
 								  SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
@@ -1248,6 +1272,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subrunasowner - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				{
+					values[Anum_pg_subscription_subskipnotexistpub - 1] =
+						BoolGetDatum(opts.runasowner);
+					replaces[Anum_pg_subscription_subskipnotexistpub - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
 				{
 					if (!sub->slotname)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9270d7b855..a66108aee8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -593,6 +593,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", origin '%s'",
 							 options->proto.logical.origin);
 
+		if (options->proto.logical.skipnotexistpub &&
+			PQserverVersion(conn->streamConn) >= 170000)
+			appendStringInfo(&cmd, ", skip_not_exist_publication 'true'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9dd2446fbf..ef362e3571 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3952,6 +3952,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->passwordrequired != MySubscription->passwordrequired ||
+		newsub->skipnotexistpub != MySubscription->skipnotexistpub ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
@@ -4380,6 +4381,9 @@ set_stream_options(WalRcvStreamOptions *options,
 	options->proto.logical.publication_names = MySubscription->publications;
 	options->proto.logical.binary = MySubscription->binary;
 
+	if (server_version >= 170000 && MySubscription->skipnotexistpub)
+		options->proto.logical.skipnotexistpub = true;
+
 	/*
 	 * Assign the appropriate option value for streaming option according to
 	 * the 'streaming' mode and the publisher's ability to support that mode.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f7b6d0384d..7c27d4a7d7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,8 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames, bool *skipped);
+static List *LoadPublications(List *pubnames, bool skipnotexistpub,
+							  bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
@@ -284,11 +285,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		skipnotexistpub_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
+	data->skipnotexistpub = false;
 
 	foreach(lc, options)
 	{
@@ -397,6 +400,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (skipnotexistpub_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			skipnotexistpub_option_given = true;
+
+			data->skipnotexistpub = true;
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1709,7 +1722,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
  * exist.
  */
 static List *
-LoadPublications(List *pubnames, bool *skipped)
+LoadPublications(List *pubnames, bool skipnotexistpub, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1717,7 +1730,7 @@ LoadPublications(List *pubnames, bool *skipped)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, true);
+		Publication *pub = GetPublicationByName(pubname, skipnotexistpub);
 
 		if (pub)
 			result = lappend(result, pub);
@@ -2029,7 +2042,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				list_free_deep(data->publications);
 				data->publications = NIL;
 			}
-			data->publications = LoadPublications(data->publication_names, &skipped_pub);
+			data->publications = LoadPublications(data->publication_names,
+												  data->skipnotexistpub,
+												  &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
 
 			/*
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 151a5211ee..0642ff4581 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1944,8 +1944,9 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3341,8 +3342,9 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "disable_on_error", "enabled", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a2..704d1217d9 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	bool		subskipnotexistpub;	/* True if the not-exist publications should
+									 * be ignored */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -151,6 +154,8 @@ typedef struct Subscription
 								 * (i.e. the main slot and the table sync
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
+	bool		skipnotexistpub; /* True if the non-existent publications should
+								  * be ignored. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e1147..38faa2ea04 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		skipnotexistpub;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index b906bb5ce8..e7a0d9e08a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -186,6 +186,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		skipnotexistpub;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 938582e31a..8626397c1c 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -145,7 +145,7 @@ $node_publisher->safe_psql(
 # then check the sync results
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (skip_not_exist_publication = true)
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -741,7 +741,7 @@ $node_publisher->safe_psql(
 $node_subscriber->safe_psql(
 	'postgres', qq(
 	DROP SUBSCRIPTION sub1;
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -921,7 +921,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -973,7 +973,7 @@ $node_publisher->safe_psql(
 # both table sync and data replication.
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1213,7 +1213,7 @@ $node_subscriber->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1255,7 +1255,7 @@ $node_subscriber->safe_psql(
 
 my ($cmdret, $stdout, $stderr) = $node_subscriber->psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 ok( $stderr =~
@@ -1272,7 +1272,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 $node_publisher->wait_for_catchup('sub1');
-- 
2.34.1

