Add an option to skip loading missing publication to avoid logical replication failure

Started by vignesh C almost 2 years ago, 48 messages
#1 vignesh C
vignesh21@gmail.com
2 attachment(s)

Hi,

Currently, ALTER SUBSCRIPTION ... SET PUBLICATION can break logical
replication in certain cases. The apply worker is restarted after SET
PUBLICATION and keeps using the existing slot and replication origin
of the subscription. If the origin was not updated before the restart,
the WAL start location can point to a position before PUBLICATION pub
existed, and decoding then fails because the publication cannot be
found. Once this error occurs, the apply worker can never proceed and
keeps returning the same error.

There was a discussion on this, and Amit posted a patch to handle it
at [1]. Amit's patch continues to use a historic snapshot but ignores
publications that are not found when computing RelSyncEntry
attributes. Such an entry is not marked valid until all the
publications are loaded with none missing. This means operations on
tables belonging to that publication are not published until the
publication is found, which seems okay.
I have added an option, skip_not_exist_publication, so that this
behavior is enabled only when skip_not_exist_publication is set to
true. The default behavior is unchanged when
skip_not_exist_publication is set to false.
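
The skip-and-retry rule described above can be sketched as standalone C.
This is only an illustration under stated assumptions, not the pgoutput
code: the toy catalog existing_pubs and the function names
load_publications_sketch and reload_publications_sketch are hypothetical,
and the real code works on List and Publication structures.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the publisher's catalog: only pub2 exists. */
static const char *const existing_pubs[] = {"pub2"};

/* Return true if 'name' is in the toy catalog above. */
static bool
publication_exists(const char *name)
{
    size_t      n = sizeof(existing_pubs) / sizeof(existing_pubs[0]);

    for (size_t i = 0; i < n; i++)
        if (strcmp(existing_pubs[i], name) == 0)
            return true;
    return false;
}

/*
 * Copy the names that exist into 'loaded' and return how many were copied.
 * '*skipped' is set to true if any requested name is missing.
 */
size_t
load_publications_sketch(const char *const *names, size_t nnames,
                         const char **loaded, bool *skipped)
{
    size_t      nloaded = 0;

    for (size_t i = 0; i < nnames; i++)
    {
        if (publication_exists(names[i]))
            loaded[nloaded++] = names[i];
        else
            *skipped = true;
    }
    return nloaded;
}

/*
 * Caller-side rule: the cached list is considered valid only when nothing
 * was skipped, so a later lookup would retry the load.
 */
bool
reload_publications_sketch(const char *const *names, size_t nnames,
                           const char **loaded, size_t *nloaded)
{
    bool        skipped = false;

    *nloaded = load_publications_sketch(names, nnames, loaded, &skipped);
    return !skipped;            /* value a caller would store as "valid" */
}
```

With the requested list {pub1, pub2} and only pub2 present, the sketch
loads one publication and reports the list as not yet valid, so the load
would be retried on the next change.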

One thing to note with the patch (with the skip_not_exist_publication
option): replication of a few WAL entries will be skipped until the
publication is loaded, as in the example below:
-- Create tables in publisher and subscriber
create table t1(c1 int);
create table t2(c1 int);

-- Create publications
create publication pub1 for table t1;
create publication pub2 for table t2;

-- Create subscription
create subscription test1 connection 'dbname=postgres host=localhost
port=5432' publication pub1, pub2;

-- Drop one publication
drop publication pub1;

-- Insert in the publisher
insert into t1 values(11);
insert into t2 values(21);

-- Select in subscriber
postgres=# select * from t1;
c1
----
(0 rows)

postgres=# select * from t2;
c1
----
21
(1 row)

-- Create the dropped publication in publisher
create publication pub1 for table t1;

-- Insert in the publisher
insert into t1 values(12);
postgres=# select * from t1;
c1
----
11
12
(2 rows)

-- Select data in subscriber
postgres=# select * from t1; -- record with value 11 will be missing in subscriber
c1
----
12
(1 row)
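
The lost row in the session above can be reproduced with a toy model in
standalone C. This is a simplified sketch, not the decoding code: it
assumes a change to t1 is sent only if pub1 exists when the change is
processed, and the Event type, the EV_* names and both functions are
hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical event kinds for a toy model of the publisher's changes. */
typedef enum
{
    EV_DROP_PUB1,
    EV_CREATE_PUB1,
    EV_INSERT_T1,
    EV_INSERT_T2
} EventKind;

typedef struct
{
    EventKind   kind;
    int         value;          /* inserted value; unused for DDL events */
} Event;

/*
 * Replay the events in order and store in 'out' the t1 values that would
 * reach the subscriber. Returns the number of values stored.
 */
size_t
replay_t1_sketch(const Event *events, size_t nevents, int *out)
{
    bool        pub1_exists = true;
    size_t      nsent = 0;

    for (size_t i = 0; i < nevents; i++)
    {
        switch (events[i].kind)
        {
            case EV_DROP_PUB1:
                pub1_exists = false;
                break;
            case EV_CREATE_PUB1:
                pub1_exists = true;
                break;
            case EV_INSERT_T1:
                /* Skipped while pub1 is missing; never resent later. */
                if (pub1_exists)
                    out[nsent++] = events[i].value;
                break;
            case EV_INSERT_T2:
                /* pub2 is never dropped in this model; not tracked here. */
                break;
        }
    }
    return nsent;
}

/* The sequence from the session above: drop, 11, 21, create, 12. */
size_t
replay_thread_example(int *out)
{
    const Event events[] = {
        {EV_DROP_PUB1, 0},
        {EV_INSERT_T1, 11},
        {EV_INSERT_T2, 21},
        {EV_CREATE_PUB1, 0},
        {EV_INSERT_T1, 12},
    };

    return replay_t1_sketch(events, sizeof(events) / sizeof(events[0]), out);
}
```

In this model only the value 12 is delivered for t1, which matches the
subscriber output shown above.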

Thoughts?

[1]: /messages/by-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com

Regards,
Vignesh

Attachments:

v1-0001-Skip-loading-the-publication-if-the-publication-d.patch (text/x-patch; charset=US-ASCII)
From 9b79dee26554ae4af9ff00948ab185482b071ba8 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 19 Feb 2024 10:20:02 +0530
Subject: [PATCH v1 1/2] Skip loading the publication if the publication does
 not exist.

Skip loading the publication if the publication does not exist.
---
 src/backend/replication/pgoutput/pgoutput.c | 28 +++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..f7b6d0384d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames);
+static List *LoadPublications(List *pubnames, bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
@@ -1703,9 +1703,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet. 'skipped'
+ * will be true if we find any publication from the given list that doesn't
+ * exist.
  */
 static List *
-LoadPublications(List *pubnames)
+LoadPublications(List *pubnames, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1713,9 +1717,12 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+			*skipped = true;
 	}
 
 	return result;
@@ -1994,7 +2001,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	}
 
 	/* Validate the entry */
-	if (!entry->replicate_valid)
+	if (!entry->replicate_valid || !publications_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
@@ -2011,6 +2018,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
+		bool		skipped_pub = false;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -2021,9 +2029,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				list_free_deep(data->publications);
 				data->publications = NIL;
 			}
-			data->publications = LoadPublications(data->publication_names);
+			data->publications = LoadPublications(data->publication_names, &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
-			publications_valid = true;
+
+			/*
+			 * We don't consider the publications to be valid till we have
+			 * information of all the publications.
+			 */
+			if (!skipped_pub)
+				publications_valid = true;
 		}
 
 		/*
-- 
2.34.1

v1-0002-Added-an-option-skip_not_exist_publication-which-.patch (text/x-patch; charset=US-ASCII)
From 0030521b178b80b1de26afd8a71cb7b741511b01 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 19 Feb 2024 10:23:05 +0530
Subject: [PATCH v1 2/2] Added an option skip_not_exist_publication which will
 skip loading the publication, if the publication does not exist.

Added an option skip_not_exist_publication which will skip loading the
publication, if the publication does not exist.
---
 src/backend/catalog/pg_subscription.c         |  1 +
 src/backend/catalog/system_views.sql          |  3 +-
 src/backend/commands/subscriptioncmds.c       | 33 ++++++++++++++++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 +++
 src/backend/replication/logical/worker.c      |  4 +++
 src/backend/replication/pgoutput/pgoutput.c   | 23 ++++++++++---
 src/bin/psql/tab-complete.c                   | 10 +++---
 src/include/catalog/pg_subscription.h         |  5 +++
 src/include/replication/pgoutput.h            |  1 +
 src/include/replication/walreceiver.h         |  1 +
 src/test/subscription/t/031_column_list.pl    | 14 ++++----
 11 files changed, 82 insertions(+), 17 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 406a3c2dd1..404577725e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -74,6 +74,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
 	sub->failover = subform->subfailover;
+	sub->skipnotexistpub = subform->subskipnotexistpub;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 04227a72d1..323cd2485d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1360,7 +1360,8 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
 			  subpasswordrequired, subrunasowner, subfailover,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subskipnotexistpub, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index a05d69922d..bd59efc73a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,6 +72,7 @@
 #define SUBOPT_FAILOVER				0x00002000
 #define SUBOPT_LSN					0x00004000
 #define SUBOPT_ORIGIN				0x00008000
+#define SUBOPT_SKIP_NOT_EXISTS_PUB  0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -97,6 +98,7 @@ typedef struct SubOpts
 	bool		passwordrequired;
 	bool		runasowner;
 	bool		failover;
+	bool		skipnotexistpub;
 	char	   *origin;
 	XLogRecPtr	lsn;
 } SubOpts;
@@ -159,6 +161,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->runasowner = false;
 	if (IsSet(supported_opts, SUBOPT_FAILOVER))
 		opts->failover = false;
+	if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+		opts->skipnotexistpub = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
@@ -316,6 +320,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_FAILOVER;
 			opts->failover = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB) &&
+				 strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_SKIP_NOT_EXISTS_PUB;
+			opts->skipnotexistpub = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
 				 strcmp(defel->defname, "origin") == 0)
 		{
@@ -408,6 +421,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "failover = true")));
 
+		if (opts->skipnotexistpub &&
+			IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "skip_not_exist_publication = true")));
+
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
@@ -611,7 +631,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+					  SUBOPT_SKIP_NOT_EXISTS_PUB | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -718,6 +739,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+	values[Anum_pg_subscription_subskipnotexistpub - 1] =
+		BoolGetDatum(opts.skipnotexistpub);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1169,6 +1192,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+								  SUBOPT_SKIP_NOT_EXISTS_PUB |
 								  SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
@@ -1248,6 +1272,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subrunasowner - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				{
+					values[Anum_pg_subscription_subskipnotexistpub - 1] =
+						BoolGetDatum(opts.skipnotexistpub);
+					replaces[Anum_pg_subscription_subskipnotexistpub - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
 				{
 					if (!sub->slotname)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9270d7b855..a66108aee8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -593,6 +593,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", origin '%s'",
 							 options->proto.logical.origin);
 
+		if (options->proto.logical.skipnotexistpub &&
+			PQserverVersion(conn->streamConn) >= 170000)
+			appendStringInfo(&cmd, ", skip_not_exist_publication 'true'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9dd2446fbf..ef362e3571 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3952,6 +3952,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->passwordrequired != MySubscription->passwordrequired ||
+		newsub->skipnotexistpub != MySubscription->skipnotexistpub ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
@@ -4380,6 +4381,9 @@ set_stream_options(WalRcvStreamOptions *options,
 	options->proto.logical.publication_names = MySubscription->publications;
 	options->proto.logical.binary = MySubscription->binary;
 
+	if (server_version >= 170000 && MySubscription->skipnotexistpub)
+		options->proto.logical.skipnotexistpub = true;
+
 	/*
 	 * Assign the appropriate option value for streaming option according to
 	 * the 'streaming' mode and the publisher's ability to support that mode.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f7b6d0384d..7c27d4a7d7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,8 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames, bool *skipped);
+static List *LoadPublications(List *pubnames, bool skipnotexistpub,
+							  bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
@@ -284,11 +285,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		skipnotexistpub_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
+	data->skipnotexistpub = false;
 
 	foreach(lc, options)
 	{
@@ -397,6 +400,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (skipnotexistpub_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			skipnotexistpub_option_given = true;
+
+			data->skipnotexistpub = true;
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1709,7 +1722,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
  * exist.
  */
 static List *
-LoadPublications(List *pubnames, bool *skipped)
+LoadPublications(List *pubnames, bool skipnotexistpub, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1717,7 +1730,7 @@ LoadPublications(List *pubnames, bool *skipped)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, true);
+		Publication *pub = GetPublicationByName(pubname, skipnotexistpub);
 
 		if (pub)
 			result = lappend(result, pub);
@@ -2029,7 +2042,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				list_free_deep(data->publications);
 				data->publications = NIL;
 			}
-			data->publications = LoadPublications(data->publication_names, &skipped_pub);
+			data->publications = LoadPublications(data->publication_names,
+												  data->skipnotexistpub,
+												  &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
 
 			/*
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 151a5211ee..0642ff4581 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1944,8 +1944,9 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3341,8 +3342,9 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "disable_on_error", "enabled", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a2..704d1217d9 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	bool		subskipnotexistpub;	/* True if the not-exist publications should
+									 * be ignored */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -151,6 +154,8 @@ typedef struct Subscription
 								 * (i.e. the main slot and the table sync
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
+	bool		skipnotexistpub; /* True if the non-existent publications should
+								  * be ignored. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e1147..38faa2ea04 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		skipnotexistpub;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index b906bb5ce8..e7a0d9e08a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -186,6 +186,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		skipnotexistpub;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 938582e31a..8626397c1c 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -145,7 +145,7 @@ $node_publisher->safe_psql(
 # then check the sync results
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (skip_not_exist_publication = true)
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -741,7 +741,7 @@ $node_publisher->safe_psql(
 $node_subscriber->safe_psql(
 	'postgres', qq(
 	DROP SUBSCRIPTION sub1;
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -921,7 +921,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -973,7 +973,7 @@ $node_publisher->safe_psql(
 # both table sync and data replication.
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1213,7 +1213,7 @@ $node_subscriber->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1255,7 +1255,7 @@ $node_subscriber->safe_psql(
 
 my ($cmdret, $stdout, $stderr) = $node_subscriber->psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 ok( $stderr =~
@@ -1272,7 +1272,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 $node_publisher->wait_for_catchup('sub1');
-- 
2.34.1

#2 vignesh C
vignesh21@gmail.com
In reply to: vignesh C (#1)
2 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Mon, 19 Feb 2024 at 12:48, vignesh C <vignesh21@gmail.com> wrote:

> Hi,
>
> Currently ALTER SUBSCRIPTION ... SET PUBLICATION will break the
> logical replication in certain cases. This can happen as the apply
> worker will get restarted after SET PUBLICATION, the apply worker will
> use the existing slot and replication origin corresponding to the
> subscription. Now, it is possible that before restart the origin has
> not been updated and the WAL start location points to a location prior
> to where PUBLICATION pub exists which can lead to such an error. Once
> this error occurs, apply worker will never be able to proceed and will
> always return the same error.
>
> There was discussion on this and Amit had posted a patch to handle
> this at [2]. Amit's patch does continue using a historic snapshot but
> ignores publications that are not found for the purpose of computing
> RelSyncEntry attributes. We won't mark such an entry as valid till all
> the publications are loaded without anything missing. This means we
> won't publish operations on tables corresponding to that publication
> till we found such a publication and that seems okay.
> I have added an option skip_not_exist_publication to enable this
> operation only when skip_not_exist_publication is specified as true.
> There is no change in default behavior when skip_not_exist_publication
> is specified as false.

I have updated the patch to include changes for pg_dump and for psql's
describe output, along with a few tests and documentation changes. The
attached v2 patch has these changes.

Regards,
Vignesh

Attachments:

v2-0001-Skip-loading-the-publication-if-the-publication-d.patch (text/x-patch; charset=US-ASCII)
From c0f97fc671db390239ca5cb4c224cb3d80a0e22e Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 19 Feb 2024 10:20:02 +0530
Subject: [PATCH v2 1/2] Skip loading the publication if the publication does
 not exist.

Skip loading the publication if the publication does not exist.
---
 src/backend/replication/pgoutput/pgoutput.c | 28 +++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..f7b6d0384d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames);
+static List *LoadPublications(List *pubnames, bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
@@ -1703,9 +1703,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet. 'skipped'
+ * will be true if we find any publication from the given list that doesn't
+ * exist.
  */
 static List *
-LoadPublications(List *pubnames)
+LoadPublications(List *pubnames, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1713,9 +1717,12 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+			*skipped = true;
 	}
 
 	return result;
@@ -1994,7 +2001,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	}
 
 	/* Validate the entry */
-	if (!entry->replicate_valid)
+	if (!entry->replicate_valid || !publications_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
@@ -2011,6 +2018,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
+		bool		skipped_pub = false;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -2021,9 +2029,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				list_free_deep(data->publications);
 				data->publications = NIL;
 			}
-			data->publications = LoadPublications(data->publication_names);
+			data->publications = LoadPublications(data->publication_names, &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
-			publications_valid = true;
+
+			/*
+			 * We don't consider the publications to be valid till we have
+			 * information of all the publications.
+			 */
+			if (!skipped_pub)
+				publications_valid = true;
 		}
 
 		/*
-- 
2.34.1

v2-0002-Added-an-option-skip_not_exist_publication-which-.patch (text/x-patch; charset=US-ASCII)
From e9ff5304b2a3a0fccfb9084ff076a0465f28edbe Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 19 Feb 2024 10:23:05 +0530
Subject: [PATCH v2 2/2] Added an option skip_not_exist_publication which will
 skip loading the publication, if the publication does not exist.

Added an option skip_not_exist_publication which will skip loading the
publication, if the publication does not exist.
---
 doc/src/sgml/catalogs.sgml                    |  10 ++
 doc/src/sgml/ref/alter_subscription.sgml      |   5 +-
 doc/src/sgml/ref/create_subscription.sgml     |  11 ++
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   3 +-
 src/backend/commands/subscriptioncmds.c       |  26 ++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   4 +
 src/backend/replication/logical/worker.c      |   4 +
 src/backend/replication/pgoutput/pgoutput.c   |  23 ++-
 src/bin/pg_dump/pg_dump.c                     |  14 ++
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   8 +-
 src/bin/psql/tab-complete.c                   |  10 +-
 src/include/catalog/catversion.h              |   2 +-
 src/include/catalog/pg_subscription.h         |   5 +
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/walreceiver.h         |   1 +
 src/test/regress/expected/subscription.out    | 154 +++++++++---------
 src/test/regress/sql/subscription.sql         |   3 +
 src/test/subscription/t/031_column_list.pl    |  14 +-
 src/test/subscription/t/034_skip_not_exist.pl |  71 ++++++++
 21 files changed, 272 insertions(+), 99 deletions(-)
 create mode 100644 src/test/subscription/t/034_skip_not_exist.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 880f717b10..aa46b0676c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8011,6 +8011,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subskipnotexistpub</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the publisher will skip loading the publication if the
+       publication does not exist
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index e9e6d9d74a..6ef2e848a4 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -227,8 +227,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-disable-on-error"><literal>disable_on_error</literal></link>,
       <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
       <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
-      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, and
-      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>.
+      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
+      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
+      <link linkend="sql-createsubscription-params-with-skipnotexistpublication"><literal>skip_not_exist_publication</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
      </para>
 
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 15794731bb..b410ab0b4d 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -414,6 +414,17 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+      </variablelist>
+
+       <varlistentry id="sql-createsubscription-params-with-skipnotexistpublication">
+        <term><literal>skip_not_exist_publication</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the publisher should skip loading a publication
+          that does not exist. The default is <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 406a3c2dd1..404577725e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -74,6 +74,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
 	sub->failover = subform->subfailover;
+	sub->skipnotexistpub = subform->subskipnotexistpub;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 04227a72d1..323cd2485d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1360,7 +1360,8 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
 			  subpasswordrequired, subrunasowner, subfailover,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subskipnotexistpub, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index a05d69922d..17512e7731 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,6 +72,7 @@
 #define SUBOPT_FAILOVER				0x00002000
 #define SUBOPT_LSN					0x00004000
 #define SUBOPT_ORIGIN				0x00008000
+#define SUBOPT_SKIP_NOT_EXISTS_PUB	0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -97,6 +98,7 @@ typedef struct SubOpts
 	bool		passwordrequired;
 	bool		runasowner;
 	bool		failover;
+	bool		skipnotexistpub;
 	char	   *origin;
 	XLogRecPtr	lsn;
 } SubOpts;
@@ -159,6 +161,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->runasowner = false;
 	if (IsSet(supported_opts, SUBOPT_FAILOVER))
 		opts->failover = false;
+	if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+		opts->skipnotexistpub = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
@@ -316,6 +320,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_FAILOVER;
 			opts->failover = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB) &&
+				 strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_SKIP_NOT_EXISTS_PUB;
+			opts->skipnotexistpub = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
 				 strcmp(defel->defname, "origin") == 0)
 		{
@@ -611,7 +624,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+					  SUBOPT_SKIP_NOT_EXISTS_PUB | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -718,6 +732,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+	values[Anum_pg_subscription_subskipnotexistpub - 1] =
+		BoolGetDatum(opts.skipnotexistpub);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1169,6 +1185,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+								  SUBOPT_SKIP_NOT_EXISTS_PUB |
 								  SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
@@ -1248,6 +1265,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subrunasowner - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				{
+					values[Anum_pg_subscription_subskipnotexistpub - 1] =
+						BoolGetDatum(opts.skipnotexistpub);
+					replaces[Anum_pg_subscription_subskipnotexistpub - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
 				{
 					if (!sub->slotname)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9270d7b855..a66108aee8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -593,6 +593,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", origin '%s'",
 							 options->proto.logical.origin);
 
+		if (options->proto.logical.skipnotexistpub &&
+			PQserverVersion(conn->streamConn) >= 170000)
+			appendStringInfoString(&cmd, ", skip_not_exist_publication 'true'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9dd2446fbf..ef362e3571 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3952,6 +3952,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->passwordrequired != MySubscription->passwordrequired ||
+		newsub->skipnotexistpub != MySubscription->skipnotexistpub ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
@@ -4380,6 +4381,9 @@ set_stream_options(WalRcvStreamOptions *options,
 	options->proto.logical.publication_names = MySubscription->publications;
 	options->proto.logical.binary = MySubscription->binary;
 
+	if (server_version >= 170000 && MySubscription->skipnotexistpub)
+		options->proto.logical.skipnotexistpub = true;
+
 	/*
 	 * Assign the appropriate option value for streaming option according to
 	 * the 'streaming' mode and the publisher's ability to support that mode.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f7b6d0384d..7c27d4a7d7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,8 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames, bool *skipped);
+static List *LoadPublications(List *pubnames, bool skipnotexistpub,
+							  bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
@@ -284,11 +285,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		skipnotexistpub_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
+	data->skipnotexistpub = false;
 
 	foreach(lc, options)
 	{
@@ -397,6 +400,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (skipnotexistpub_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			skipnotexistpub_option_given = true;
+
+			data->skipnotexistpub = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1709,7 +1722,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
  * exist.
  */
 static List *
-LoadPublications(List *pubnames, bool *skipped)
+LoadPublications(List *pubnames, bool skipnotexistpub, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1717,7 +1730,7 @@ LoadPublications(List *pubnames, bool *skipped)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, true);
+		Publication *pub = GetPublicationByName(pubname, skipnotexistpub);
 
 		if (pub)
 			result = lappend(result, pub);
@@ -2029,7 +2042,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				list_free_deep(data->publications);
 				data->publications = NIL;
 			}
-			data->publications = LoadPublications(data->publication_names, &skipped_pub);
+			data->publications = LoadPublications(data->publication_names,
+												  data->skipnotexistpub,
+												  &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
 
 			/*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 2225a12718..20abd319fc 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4644,6 +4644,7 @@ getSubscriptions(Archive *fout)
 	int			i_suboriginremotelsn;
 	int			i_subenabled;
 	int			i_subfailover;
+	int			i_subskipnotexistpublication;
 	int			i,
 				ntups;
 
@@ -4707,6 +4708,13 @@ getSubscriptions(Archive *fout)
 						  " '%s' AS suborigin,\n",
 						  LOGICALREP_ORIGIN_ANY);
 
+	if (fout->remoteVersion >= 170000)
+		appendPQExpBufferStr(query,
+							 " s.subskipnotexistpub,\n");
+	else
+		appendPQExpBufferStr(query,
+							 " false AS subskipnotexistpub,\n");
+
 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
 		appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n"
 							 " s.subenabled,\n"
@@ -4754,6 +4762,7 @@ getSubscriptions(Archive *fout)
 	i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
 	i_subenabled = PQfnumber(res, "subenabled");
 	i_subfailover = PQfnumber(res, "subfailover");
+	i_subskipnotexistpublication = PQfnumber(res, "subskipnotexistpub");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4800,6 +4809,8 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_subenabled));
 		subinfo[i].subfailover =
 			pg_strdup(PQgetvalue(res, i, i_subfailover));
+		subinfo[i].subskipnotexistpublication =
+			pg_strdup(PQgetvalue(res, i, i_subskipnotexistpublication));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -5043,6 +5054,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
 		appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
 
+	if (strcmp(subinfo->subskipnotexistpublication, "t") == 0)
+		appendPQExpBufferStr(query, ", skip_not_exist_publication = true");
+
 	appendPQExpBufferStr(query, ");\n");
 
 	/*
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 77db42e354..493960b839 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -668,6 +668,7 @@ typedef struct _SubscriptionInfo
 	char	   *suborigin;
 	char	   *suboriginremotelsn;
 	char	   *subfailover;
+	char	   *subskipnotexistpublication;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index b6a4eb1d56..9261c4cf38 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6572,7 +6572,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
 		false, false, false, false, false, false, false, false, false, false,
-	false};
+	false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6638,8 +6638,10 @@ describeSubscriptions(const char *pattern, bool verbose)
 
 		if (pset.sversion >= 170000)
 			appendPQExpBuffer(&buf,
-							  ", subfailover AS \"%s\"\n",
-							  gettext_noop("Failover"));
+							  ", subfailover AS \"%s\"\n"
+							  ", subskipnotexistpub AS \"%s\"\n",
+							  gettext_noop("Failover"),
+							  gettext_noop("Skipnotexistpub"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 151a5211ee..0642ff4581 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1944,8 +1944,9 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3341,8 +3342,9 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "disable_on_error", "enabled", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 61beae92e2..934e7148b6 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	202402142
+#define CATALOG_VERSION_NO	202402191
 
 #endif
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a2..704d1217d9 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	bool		subskipnotexistpub;	/* True if non-existent publications should
+									 * be ignored */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -151,6 +154,8 @@ typedef struct Subscription
 								 * (i.e. the main slot and the table sync
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
+	bool		skipnotexistpub; /* True if the non-existent publications should
+								  * be ignored. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e1147..38faa2ea04 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		skipnotexistpub;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index b906bb5ce8..e7a0d9e08a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -186,6 +186,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		skipnotexistpub;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 1eee6b17b8..43f98f5dd0 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -118,18 +118,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                                                 List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                          List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                                                 List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                          List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -147,10 +147,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -158,15 +158,17 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist2';
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
+ALTER SUBSCRIPTION regress_testsub SET (skip_not_exist_publication = true);
 \dRs+
-                                                                                                                     List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | f                 | t             | f        | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | f                 | t             | f        | t               | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = false);
+ALTER SUBSCRIPTION regress_testsub SET (skip_not_exist_publication = false);
 -- fail
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = '');
 ERROR:  replication slot name "" is too short
@@ -178,10 +180,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                                                     List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -190,10 +192,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                                                     List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -225,10 +227,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                                                       List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                                                List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | f               | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -257,19 +259,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -281,27 +283,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -316,10 +318,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                                        List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more than once
@@ -334,10 +336,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -373,10 +375,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -385,10 +387,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -398,10 +400,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -414,18 +416,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 1b2a23ba7b..85d8964b58 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -96,10 +96,13 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist2';
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
+ALTER SUBSCRIPTION regress_testsub SET (skip_not_exist_publication = true);
 \dRs+
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = false);
+ALTER SUBSCRIPTION regress_testsub SET (skip_not_exist_publication = false);
+
 
 -- fail
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = '');
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 938582e31a..8626397c1c 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -145,7 +145,7 @@ $node_publisher->safe_psql(
 # then check the sync results
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (skip_not_exist_publication = true)
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -741,7 +741,7 @@ $node_publisher->safe_psql(
 $node_subscriber->safe_psql(
 	'postgres', qq(
 	DROP SUBSCRIPTION sub1;
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -921,7 +921,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -973,7 +973,7 @@ $node_publisher->safe_psql(
 # both table sync and data replication.
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1213,7 +1213,7 @@ $node_subscriber->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1255,7 +1255,7 @@ $node_subscriber->safe_psql(
 
 my ($cmdret, $stdout, $stderr) = $node_subscriber->psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 ok( $stderr =~
@@ -1272,7 +1272,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 $node_publisher->wait_for_catchup('sub1');
diff --git a/src/test/subscription/t/034_skip_not_exist.pl b/src/test/subscription/t/034_skip_not_exist.pl
new file mode 100644
index 0000000000..b7cc9514c0
--- /dev/null
+++ b/src/test/subscription/t/034_skip_not_exist.pl
@@ -0,0 +1,71 @@
+
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# This test checks behaviour of SUBSCRIPTION with skip_not_exist_publication
+# option.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create tables on publisher
+$node_publisher->safe_psql(
+	'postgres', qq[
+		CREATE TABLE tab_1 (a int);
+		CREATE TABLE tab_2 (a int);
+]);
+
+# Create tables on subscriber
+$node_subscriber->safe_psql(
+	'postgres', qq[
+		CREATE TABLE tab_1 (a int);
+		CREATE TABLE tab_2 (a int);
+]);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql(
+	'postgres', qq[
+		CREATE PUBLICATION tap_pub_1 FOR TABLE tab_1;
+		CREATE PUBLICATION tap_pub_2 FOR TABLE tab_2;
+]);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub_1, tap_pub_2 WITH (skip_not_exist_publication = true)"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+# Drop one of the publications
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_1");
+
+# Insert a few records
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_2 SELECT generate_series(1,10)");
+
+# Wait for table data to be replicated
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+# Check that the inserted data was copied to the subscriber even after one of
+# the subscription's publications was dropped.
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_2");
+is($result, qq(10|1|10), 'check inserted data is copied to subscriber');
+
+# shutdown
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
-- 
2.34.1

#3Amit Kapila
amit.kapila16@gmail.com
In reply to: vignesh C (#1)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Mon, Feb 19, 2024 at 12:49 PM vignesh C <vignesh21@gmail.com> wrote:

Currently ALTER SUBSCRIPTION ... SET PUBLICATION will break the
logical replication in certain cases. This can happen as the apply
worker will get restarted after SET PUBLICATION, the apply worker will
use the existing slot and replication origin corresponding to the
subscription. Now, it is possible that before restart the origin has
not been updated and the WAL start location points to a location prior
to where PUBLICATION pub exists which can lead to such an error. Once
this error occurs, apply worker will never be able to proceed and will
always return the same error.

There was discussion on this and Amit had posted a patch to handle
this at [2]. Amit's patch does continue using a historic snapshot but
ignores publications that are not found for the purpose of computing
RelSyncEntry attributes. We won't mark such an entry as valid till all
the publications are loaded without anything missing. This means we
won't publish operations on tables corresponding to that publication
till we found such a publication and that seems okay.
I have added an option skip_not_exist_publication to enable this
operation only when skip_not_exist_publication is specified as true.
There is no change in default behavior when skip_not_exist_publication
is specified as false.
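
A minimal standalone sketch of the behaviour described above, not the patch itself. The catalog lookup is replaced by a hypothetical in-memory array, and the names publication_exists, load_publications and skip_missing are made up for illustration. A return value of -1 stands in for the error that is raised when a publication is missing and the option is off.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the publications visible to the snapshot. */
static const char *const catalog[] = {"pub2"};

/* Returns true if 'name' is present in the toy catalog. */
static bool
publication_exists(const char *name)
{
	for (size_t i = 0; i < sizeof(catalog) / sizeof(catalog[0]); i++)
		if (strcmp(catalog[i], name) == 0)
			return true;
	return false;
}

/*
 * Copy the names that exist into 'loaded' and return how many were copied.
 * A missing name sets *skipped when skip_missing is true; otherwise the
 * function returns -1, standing in for the error raised without the option.
 */
static int
load_publications(const char **names, size_t n, bool skip_missing,
				  const char **loaded, bool *skipped)
{
	int			nloaded = 0;

	assert(skipped != NULL);
	for (size_t i = 0; i < n; i++)
	{
		if (publication_exists(names[i]))
			loaded[nloaded++] = names[i];
		else if (skip_missing)
			*skipped = true;
		else
			return -1;
	}
	return nloaded;
}
```

A caller would treat its cached publication list as valid only while *skipped stays false, so the list keeps being reloaded until every publication is found.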

Did you try to measure the performance impact of this change? We can
try a few cases where DDL and DMLs are involved, missing publication
(drop publication and recreate after a varying number of records to
check the impact).

The other names for the option could be:
skip_notexistant_publications, or ignore_nonexistant_publications. Can
we think of any others?

--
With Regards,
Amit Kapila.

#4vignesh C
vignesh21@gmail.com
In reply to: Amit Kapila (#3)
4 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Fri, 14 Feb 2025 at 15:36, Amit Kapila <amit.kapila16@gmail.com> wrote:

Did you try to measure the performance impact of this change? We can
try a few cases where DDL and DMLs are involved, missing publication
(drop publication and recreate after a varying number of records to
check the impact).

Since we don't have an exact scenario to compare with the patch
(because, in the current HEAD, when the publication is missing, an
error is thrown and the walsender/worker restarts), I compared the
positive case, where records are successfully replicated to the
subscriber, as shown below. For the scenario with the patch, I ran the
same test, where the publication is dropped before the insert,
allowing the walsender to check whether the publication is present.
The results below are the median of 7 runs, with execution time in
milliseconds:

Branch/records | 100   | 1000   | 10000   | 100000 | 1000000
Head           | 1.214 | 2.548  | 10.823  | 90.3   | 951.833
Patch          | 1.215 | 2.5485 | 10.8545 | 90.94  | 955.134
% diff         | 0.082 | 0.020  | 0.291   | 0.704  | 0.347

The overhead with the patch is negligible. The scripts used for
execution are attached.
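
For reference, the "% diff" row can be reproduced with a small helper like the one below. This is only an illustrative sketch; the function name overhead_percent is hypothetical and is not part of the attached scripts.

```c
#include <assert.h>

/*
 * Relative overhead of the patched run over the HEAD run, in percent.
 * Both arguments are execution times in the same unit (milliseconds here).
 */
static double
overhead_percent(double head_ms, double patch_ms)
{
	assert(head_ms > 0.0);
	return (patch_ms - head_ms) / head_ms * 100.0;
}
```

For the 1000000-record column, overhead_percent(951.833, 955.134) gives roughly 0.35, in line with the table.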

The other names for the option could be:
skip_notexistant_publications, or ignore_nonexistant_publications. Can
we think of any others?

How about using ignore_missing_publications or skip_missing_publications?

Regards,
Vignesh

Attachments:

v3-0001-Skip-loading-the-publication-if-the-publication-d.patch
From b1d9e322e4aa7b8eded115a10cbd1a65ccff5f29 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Sun, 16 Feb 2025 18:19:15 +0530
Subject: [PATCH v3 1/2] Skip loading the publication if the publication does
 not exist.

Skip loading the publication if the publication does not exist.
---
 src/backend/replication/pgoutput/pgoutput.c | 28 +++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f656a..048266ba77 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames);
+static List *LoadPublications(List *pubnames, bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_repl_origin(LogicalDecodingContext *ctx,
@@ -1762,9 +1762,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet. 'skipped'
+ * will be true if we find any publication from the given list that doesn't
+ * exist.
  */
 static List *
-LoadPublications(List *pubnames)
+LoadPublications(List *pubnames, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1772,9 +1776,12 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+			*skipped = true;
 	}
 
 	return result;
@@ -2054,7 +2061,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	}
 
 	/* Validate the entry */
-	if (!entry->replicate_valid)
+	if (!entry->replicate_valid || !publications_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
@@ -2071,6 +2078,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
+		bool		skipped_pub = false;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -2078,9 +2086,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			MemoryContextReset(data->pubctx);
 
 			oldctx = MemoryContextSwitchTo(data->pubctx);
-			data->publications = LoadPublications(data->publication_names);
+			data->publications = LoadPublications(data->publication_names, &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
-			publications_valid = true;
+
+			/*
+			 * We don't consider the publications to be valid till we have
+			 * information of all the publications.
+			 */
+			if (!skipped_pub)
+				publications_valid = true;
 		}
 
 		/*
-- 
2.43.0

v3-0002-Added-an-option-skip_not_exist_publication-which-.patch
From e01a7cc4038b9923b47b08efbb98d7e824d25a4e Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Sun, 16 Feb 2025 20:00:42 +0530
Subject: [PATCH v3 2/2] Added an option skip_not_exist_publication which will
 skip loading the publication, if the publication does not exist.

Added an option skip_not_exist_publication which will skip loading the
publication, if the publication does not exist.
---
 doc/src/sgml/catalogs.sgml                    |  10 ++
 doc/src/sgml/ref/alter_subscription.sgml      |   5 +-
 doc/src/sgml/ref/create_subscription.sgml     |  11 ++
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   3 +-
 src/backend/commands/subscriptioncmds.c       |  26 ++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   4 +
 src/backend/replication/logical/worker.c      |   4 +
 src/backend/replication/pgoutput/pgoutput.c   |  23 ++-
 src/bin/pg_dump/pg_dump.c                     |  15 ++
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   8 +-
 src/bin/psql/tab-complete.in.c                |  10 +-
 src/include/catalog/pg_subscription.h         |   5 +
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/walreceiver.h         |   1 +
 src/test/regress/expected/subscription.out    | 154 +++++++++---------
 src/test/regress/sql/subscription.sql         |   3 +
 src/test/subscription/t/031_column_list.pl    |  14 +-
 src/test/subscription/t/035_skip_not_exist.pl |  71 ++++++++
 20 files changed, 272 insertions(+), 98 deletions(-)
 create mode 100644 src/test/subscription/t/035_skip_not_exist.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index ee59a7e15d..2043c2199f 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8067,6 +8067,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subskipnotexistpub</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the publisher will skip loading the publication if the
+       publication does not exist
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index fdc648d007..02841c4c00 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -235,8 +235,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
       <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
       <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
-      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
-      <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
+      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
+      <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and
+      <link linkend="sql-createsubscription-params-with-skipnotexistpublication"><literal>skip_not_exist_publication</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
      </para>
 
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 6cf7d4f9a1..5884584249 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -435,6 +435,17 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+      </variablelist>
+
+       <varlistentry id="sql-createsubscription-params-with-skipnotexistpublication">
+        <term><literal>skip_not_exist_publication</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the publisher must skip loading the publication if
+          the publication does not exist. The default is <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 1395032413..8395b71302 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -103,6 +103,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
 	sub->failover = subform->subfailover;
+	sub->skipnotexistpub = subform->subskipnotexistpub;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index eff0990957..1982e26c27 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1373,7 +1373,8 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
 			  subpasswordrequired, subrunasowner, subfailover,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subskipnotexistpub, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 2d8a71ca1e..b7f7eb24a4 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -73,6 +73,7 @@
 #define SUBOPT_FAILOVER				0x00002000
 #define SUBOPT_LSN					0x00004000
 #define SUBOPT_ORIGIN				0x00008000
+#define SUBOPT_SKIP_NOT_EXISTS_PUB  0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -98,6 +99,7 @@ typedef struct SubOpts
 	bool		passwordrequired;
 	bool		runasowner;
 	bool		failover;
+	bool		skipnotexistpub;
 	char	   *origin;
 	XLogRecPtr	lsn;
 } SubOpts;
@@ -162,6 +164,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->runasowner = false;
 	if (IsSet(supported_opts, SUBOPT_FAILOVER))
 		opts->failover = false;
+	if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+		opts->skipnotexistpub = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
@@ -307,6 +311,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_FAILOVER;
 			opts->failover = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB) &&
+				 strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_SKIP_NOT_EXISTS_PUB;
+			opts->skipnotexistpub = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
 				 strcmp(defel->defname, "origin") == 0)
 		{
@@ -563,7 +576,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+					  SUBOPT_SKIP_NOT_EXISTS_PUB | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -670,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+	values[Anum_pg_subscription_subskipnotexistpub - 1] =
+		BoolGetDatum(opts.skipnotexistpub);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1165,6 +1181,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+								  SUBOPT_SKIP_NOT_EXISTS_PUB |
 								  SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
@@ -1308,6 +1325,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				{
+					values[Anum_pg_subscription_subskipnotexistpub - 1] =
+						BoolGetDatum(opts.skipnotexistpub);
+					replaces[Anum_pg_subscription_subskipnotexistpub - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
 				{
 					/*
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 1b158c9d28..aa6a737c9e 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -599,6 +599,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", origin '%s'",
 							 options->proto.logical.origin);
 
+		if (options->proto.logical.skipnotexistpub &&
+			PQserverVersion(conn->streamConn) >= 170000)
+			appendStringInfo(&cmd, ", skip_not_exist_publication 'true'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f09ab41c60..48398f2472 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4023,6 +4023,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->passwordrequired != MySubscription->passwordrequired ||
+		newsub->skipnotexistpub != MySubscription->skipnotexistpub ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
@@ -4451,6 +4452,9 @@ set_stream_options(WalRcvStreamOptions *options,
 	options->proto.logical.publication_names = MySubscription->publications;
 	options->proto.logical.binary = MySubscription->binary;
 
+	if (server_version >= 170000 && MySubscription->skipnotexistpub)
+		options->proto.logical.skipnotexistpub = true;
+
 	/*
 	 * Assign the appropriate option value for streaming option according to
 	 * the 'streaming' mode and the publisher's ability to support that mode.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 048266ba77..2bc6c3378d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,8 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames, bool *skipped);
+static List *LoadPublications(List *pubnames, bool skipnotexistpub,
+							  bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_repl_origin(LogicalDecodingContext *ctx,
@@ -293,11 +294,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		skipnotexistpub_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
+	data->skipnotexistpub = false;
 
 	foreach(lc, options)
 	{
@@ -406,6 +409,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (skipnotexistpub_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			skipnotexistpub_option_given = true;
+
+			data->skipnotexistpub = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1768,7 +1781,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
  * exist.
  */
 static List *
-LoadPublications(List *pubnames, bool *skipped)
+LoadPublications(List *pubnames, bool skipnotexistpub, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1776,7 +1789,7 @@ LoadPublications(List *pubnames, bool *skipped)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, true);
+		Publication *pub = GetPublicationByName(pubname, skipnotexistpub);
 
 		if (pub)
 			result = lappend(result, pub);
@@ -2086,7 +2099,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			MemoryContextReset(data->pubctx);
 
 			oldctx = MemoryContextSwitchTo(data->pubctx);
-			data->publications = LoadPublications(data->publication_names, &skipped_pub);
+			data->publications = LoadPublications(data->publication_names,
+												  data->skipnotexistpub,
+												  &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
 
 			/*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 30dfda8c3f..492cdf7a70 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4871,6 +4871,7 @@ getSubscriptions(Archive *fout)
 	int			i_suboriginremotelsn;
 	int			i_subenabled;
 	int			i_subfailover;
+	int			i_subskipnotexistpublication;
 	int			i,
 				ntups;
 
@@ -4934,6 +4935,13 @@ getSubscriptions(Archive *fout)
 						  " '%s' AS suborigin,\n",
 						  LOGICALREP_ORIGIN_ANY);
 
+	if (fout->remoteVersion >= 170000)
+		appendPQExpBufferStr(query,
+							 " s.subskipnotexistpub,\n");
+	else
+		appendPQExpBuffer(query,
+						  " false AS subskipnotexistpub,\n");
+
 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
 		appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n"
 							 " s.subenabled,\n");
@@ -4986,6 +4994,7 @@ getSubscriptions(Archive *fout)
 	i_subpublications = PQfnumber(res, "subpublications");
 	i_suborigin = PQfnumber(res, "suborigin");
 	i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
+	i_subskipnotexistpublication = PQfnumber(res, "subskipnotexistpub");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -5013,6 +5022,9 @@ getSubscriptions(Archive *fout)
 			(strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0);
 		subinfo[i].subfailover =
 			(strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0);
+		subinfo[i].subskipnotexistpublication =
+			pg_strdup(PQgetvalue(res, i, i_subskipnotexistpublication));
+
 		subinfo[i].subconninfo =
 			pg_strdup(PQgetvalue(res, i, i_subconninfo));
 		if (PQgetisnull(res, i, i_subslotname))
@@ -5277,6 +5289,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
 		appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
 
+	if (strcmp(subinfo->subskipnotexistpublication, "t") == 0)
+		appendPQExpBuffer(query, ", skip_not_exist_publication = true");
+
 	appendPQExpBufferStr(query, ");\n");
 
 	/*
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 7139c88a69..e157547532 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -687,6 +687,7 @@ typedef struct _SubscriptionInfo
 	char	   *subpublications;
 	char	   *suborigin;
 	char	   *suboriginremotelsn;
+	char       *subskipnotexistpublication;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 3b7ba66fad..b9353c4020 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6747,7 +6747,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
 		false, false, false, false, false, false, false, false, false, false,
-	false};
+	false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6813,8 +6813,10 @@ describeSubscriptions(const char *pattern, bool verbose)
 
 		if (pset.sversion >= 170000)
 			appendPQExpBuffer(&buf,
-							  ", subfailover AS \"%s\"\n",
-							  gettext_noop("Failover"));
+							  ", subfailover AS \"%s\"\n"
+							  ", subskipnotexistpub AS \"%s\"\n",
+							  gettext_noop("Failover"),
+							  gettext_noop("Skipnotexistpub"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index a9a81ab3c1..18176c6bce 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2283,8 +2283,9 @@ match_previous_words(int pattern_id,
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit", "two_phase");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3693,8 +3694,9 @@ match_previous_words(int pattern_id,
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "disable_on_error", "enabled", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 20fc329992..ae75dccb89 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -78,6 +78,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	bool		subskipnotexistpub;	/* True if the not-exist publications should
+									 * be ignored */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -131,6 +134,8 @@ typedef struct Subscription
 								 * (i.e. the main slot and the table sync
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
+	bool		skipnotexistpub; /* True if the non-existent publications should
+								  * be ignored. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index e53456c260..8691c461bd 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -34,6 +34,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		skipnotexistpub;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 204419bd8a..a87e6b79cb 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -187,6 +187,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		skipnotexistpub;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 1443e1d929..6ca0618f46 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                                                 List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                          List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                                                 List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                          List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -156,15 +156,17 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist2';
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
+ALTER SUBSCRIPTION regress_testsub SET (skip_not_exist_publication = true);
 \dRs+
-                                                                                                                     List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        | t               | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = false);
+ALTER SUBSCRIPTION regress_testsub SET (skip_not_exist_publication = false);
 -- fail
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = '');
 ERROR:  replication slot name "" is too short
@@ -176,10 +178,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                                                     List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -188,10 +190,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                                                     List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -223,10 +225,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                                                       List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                                                List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f               | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -255,19 +257,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -279,27 +281,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -314,10 +316,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                                        List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more than once
@@ -332,10 +334,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -371,19 +373,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- we can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -393,10 +395,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -409,18 +411,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Skipnotexistpub | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 007c9e7037..87715c2483 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -95,10 +95,13 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist2';
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
+ALTER SUBSCRIPTION regress_testsub SET (skip_not_exist_publication = true);
 \dRs+
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = false);
+ALTER SUBSCRIPTION regress_testsub SET (skip_not_exist_publication = false);
+
 
 -- fail
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = '');
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index e859bcdf4e..c88f96f227 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -145,7 +145,7 @@ $node_publisher->safe_psql(
 # then check the sync results
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (skip_not_exist_publication = true)
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -749,7 +749,7 @@ $node_publisher->safe_psql(
 $node_subscriber->safe_psql(
 	'postgres', qq(
 	DROP SUBSCRIPTION sub1;
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -931,7 +931,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -983,7 +983,7 @@ $node_publisher->safe_psql(
 # both table sync and data replication.
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1227,7 +1227,7 @@ $node_subscriber->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1269,7 +1269,7 @@ $node_subscriber->safe_psql(
 
 my ($cmdret, $stdout, $stderr) = $node_subscriber->psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 ok( $stderr =~
@@ -1286,7 +1286,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 $node_publisher->wait_for_catchup('sub1');
diff --git a/src/test/subscription/t/035_skip_not_exist.pl b/src/test/subscription/t/035_skip_not_exist.pl
new file mode 100644
index 0000000000..b7cc9514c0
--- /dev/null
+++ b/src/test/subscription/t/035_skip_not_exist.pl
@@ -0,0 +1,71 @@
+
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# This test checks behaviour of SUBSCRIPTION with skip_not_exist_publication
+# option.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres', qq[
+		CREATE TABLE tab_1 (a int);
+		CREATE TABLE tab_2 (a int);
+]);
+
+# Create table on subscriber
+$node_subscriber->safe_psql(
+	'postgres', qq[
+		CREATE TABLE tab_1 (a int);
+		CREATE TABLE tab_2 (a int);
+]);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql(
+	'postgres', qq[
+		CREATE PUBLICATION tap_pub_1 FOR TABLE tab_1;
+		CREATE PUBLICATION tap_pub_2 FOR TABLE tab_2;
+]);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub_1, tap_pub_2 with (skip_not_exist_publication=true)"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+# Drop one of the publications
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_1");
+
+# Insert few records
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_2 SELECT generate_series(1,10)");
+
+# Wait for table data to be replicated
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+# Check that the inserted data was copied to the subscriber even after one of
+# the subscription's publication was dropped.
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_2");
+is($result, qq(10|1|10), 'check inserted data is copied to subscriber');
+
+# shutdown
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
-- 
2.43.0

test_skip_pub_1702.sh (text/x-sh)
insert.sh (text/x-sh)
#5Amit Kapila
amit.kapila16@gmail.com
In reply to: vignesh C (#4)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, Feb 18, 2025 at 2:24 PM vignesh C <vignesh21@gmail.com> wrote:

On Fri, 14 Feb 2025 at 15:36, Amit Kapila <amit.kapila16@gmail.com> wrote:

Did you try to measure the performance impact of this change? We can
try a few cases where DDL and DMLs are involved, missing publication
(drop publication and recreate after a varying number of records to
check the impact).

Since we don't have an exact scenario to compare with the patch
(because, in the current HEAD, when the publication is missing, an
error is thrown and the walsender/worker restarts), I compared the
positive case, where records are successfully replicated to the
subscriber, as shown below. For the scenario with the patch, I ran the
same test, where the publication is dropped before the insert,
allowing the walsender to check whether the publication is present.
The test results (median of 7 runs, execution time in milliseconds)
are provided below:

Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 1.214 | 2.548 | 10.823 | 90.3 | 951.833
Patch | 1.215 | 2.5485 | 10.8545 | 90.94 | 955.134
% diff | 0.082 | 0.020 | 0.291 | 0.704 | 0.347

The difference between the runs with and without the patch is
negligible. The scripts used for execution are attached.

You have used synchronous_standby_names to evaluate the performance,
which covers parts of replication other than logical decoding. It
would be better to test using pg_recvlogical.

--
With Regards,
Amit Kapila.

#6vignesh C
vignesh21@gmail.com
In reply to: Amit Kapila (#5)
1 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, 18 Feb 2025 at 16:53, Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Feb 18, 2025 at 2:24 PM vignesh C <vignesh21@gmail.com> wrote:

On Fri, 14 Feb 2025 at 15:36, Amit Kapila <amit.kapila16@gmail.com> wrote:

Did you try to measure the performance impact of this change? We can
try a few cases where DDL and DMLs are involved, missing publication
(drop publication and recreate after a varying number of records to
check the impact).

Since we don't have an exact scenario to compare with the patch
(because, in the current HEAD, when the publication is missing, an
error is thrown and the walsender/worker restarts), I compared the
positive case, where records are successfully replicated to the
subscriber, as shown below. For the scenario with the patch, I ran the
same test, where the publication is dropped before the insert,
allowing the walsender to check whether the publication is present.
The test results (median of 7 runs, execution time in milliseconds)
are provided below:

Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 1.214 | 2.548 | 10.823 | 90.3 | 951.833
Patch | 1.215 | 2.5485 | 10.8545 | 90.94 | 955.134
% diff | 0.082 | 0.020 | 0.291 | 0.704 | 0.347

The difference between the runs with and without the patch is
negligible. The scripts used for execution are attached.

You have used synchronous_standby_names to evaluate the performance,
which covers parts of replication other than logical decoding. It
would be better to test using pg_recvlogical.

Here are the test runs with pg_recvlogical. The test results (median
of 10 runs, execution time in milliseconds) are provided below:
Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 9.95 | 15.26 | 62.62 | 536.57 | 8480.83
Patch | 9.218 | 10.32 | 23.05 | 143.83 | 4852.43
% diff | 7.356 | 32.38 | 63.19 | 73.193 | 42.783

Test execution with the patch is faster by between 7.35 and 73.19
percent. This is because, in HEAD, after loading and verifying that
the publication is valid, it must continue processing to output the
change. In contrast, with the patch, outputting the change is skipped
since the publication does not exist.

The attached script is the one that was used for testing. The
NUM_RECORDS count should be changed for each test, and when running
the test with the patch, the drop publication command should be
uncommented.
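
For reference, the % diff row in this message appears to be computed as
(Head - Patch) / Head * 100; for the first column that gives
(9.95 - 9.218) / 9.95, or roughly 7.36. A minimal C sketch of that
arithmetic follows. The function name percent_diff is hypothetical and
is not taken from the attached script.

```c
#include <assert.h>

/*
 * Relative improvement of the patched run over HEAD, in percent.
 * Positive means the patch was faster; negative means it was slower.
 * (Illustrative helper; not part of the attached test script.)
 */
double
percent_diff(double head_ms, double patch_ms)
{
	/* A zero or negative baseline would make the ratio meaningless. */
	assert(head_ms > 0.0);

	return (head_ms - patch_ms) / head_ms * 100.0;
}
```

With this sign convention a slower patched run yields a negative value.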

Regards,
Vignesh

Attachments:

test_pg_recvlogical.sh (text/x-sh)
#7vignesh C
vignesh21@gmail.com
In reply to: vignesh C (#6)
2 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, 25 Feb 2025 at 15:32, vignesh C <vignesh21@gmail.com> wrote:

The attached script is the one that was used for testing. The
NUM_RECORDS count should be changed for each test, and when running
the test with the patch, the drop publication command should be
uncommented.

I have done further analysis on the test and changed the test to
compare it better with HEAD. The execution time is in milliseconds.
Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 10.43 | 15.86 | 64.44 | 550.56 | 8991.04
Patch | 11.35 | 17.26 | 73.50 | 640.21 | 10104.72
% diff | -8.82 | -8.85 | -14.08 | -16.28 | -12.38

There is a performance degradation in the range of 8.8 to 16.2 percent.

Test Details (With Head):
a) Create two publications for the same table.
b) Insert the records listed in the table above.
c) Use pg_recvlogical to capture the changes.

Test Details (With Patch):
a) Create two publications for the same table.
b) Drop one publication (to check the impact of skipping the missing
publication), ensuring that changes from the remaining publication
continue to be captured.
c) Insert the records listed in the table above.
d) Use pg_recvlogical to capture the changes.

The script used for the testing is attached. While running with the
patch, the drop publication command in the script should be
uncommented, and the record count should be changed for each run.
I also changed the patch so that, in the missing-publication case, we
need not execute the get_rel_sync_entry code flow for every record,
and do so only when the publications have changed. The attached v4
patch has these changes.
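
The "only when the publications have changed" check can be pictured as
a comparison between the freshly loaded publication names and the
cached ones. Below is a simplified, standalone C sketch of that idea.
It uses plain string arrays instead of PostgreSQL's List and
Publication types, and the function name has_new_publication is
hypothetical; the patch itself is the authoritative version.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Return true if 'fresh' contains a publication name that is absent
 * from 'cached', i.e. a publication appeared since the cache was built.
 * (Illustrative model only; not PostgreSQL code.)
 */
bool
has_new_publication(const char *const *fresh, size_t nfresh,
					const char *const *cached, size_t ncached)
{
	/* A NULL array is only acceptable when its length is zero. */
	assert(fresh != NULL || nfresh == 0);
	assert(cached != NULL || ncached == 0);

	for (size_t i = 0; i < nfresh; i++)
	{
		bool		match = false;

		for (size_t j = 0; j < ncached; j++)
		{
			if (strcmp(fresh[i], cached[j]) == 0)
			{
				match = true;
				break;
			}
		}

		/* One unmatched name is enough to require a rebuild. */
		if (!match)
			return true;
	}

	return false;
}
```

Note that this comparison still has to load the publication list for
each record while a publication is missing, which may be one source of
the overhead measured above.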

Regards,
Vignesh

Attachments:

v4-0001-Fix-logical-replication-breakage-after.patch (application/octet-stream)
From f2d31bdf99240288349a8a3acebc78391c8275f4 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Mon, 3 Mar 2025 11:54:27 +0530
Subject: [PATCH v4] Fix logical replication breakage after ALTER SUBSCRIPTION
 ... SET PUBLICATION

Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION`
could cause logical replication to break under certain conditions. When
the apply worker restarts after executing SET PUBLICATION, it continues
using the existing replication slot and origin. If the replication origin
was not updated before the restart, the WAL start location could point to
a position prior to the existence of the specified publication, causing the
apply worker to restart repeatedly and report the same error each time.

This patch skips loading the publication if the publication does not exist
and loads the publication and updates the relation entry when the publication
gets created.

Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 74 ++++++++++++++++++---
 1 file changed, 66 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f656aa..64c31cc2758 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames);
+static List *LoadPublications(List *pubnames, bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_repl_origin(LogicalDecodingContext *ctx,
@@ -1762,9 +1762,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet. 'skipped'
+ * will be true if we find any publication from the given list that doesn't
+ * exist.
  */
 static List *
-LoadPublications(List *pubnames)
+LoadPublications(List *pubnames, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1772,9 +1776,15 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+		{
+			elog(DEBUG1, "skipped loading publication: %s", pubname);
+			*skipped = true;
+		}
 	}
 
 	return result;
@@ -2026,6 +2036,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	bool		found;
 	MemoryContext oldctx;
 	Oid			relid = RelationGetRelid(relation);
+	bool		publications_updated = false;
 
 	Assert(RelationSyncCache != NULL);
 
@@ -2053,8 +2064,48 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
-	/* Validate the entry */
-	if (!entry->replicate_valid)
+	/*
+	 * If the publication is invalid, check for updates.
+	 * This optimization ensures that the next block, which queries the system
+	 * tables and builds the relation entry, runs only if a new publication was
+	 * created.
+	 */
+	if (!publications_valid && data->publications)
+	{
+		bool		skipped_pub = false;
+		List	   *publications;
+
+		publications = LoadPublications(data->publication_names, &skipped_pub);
+
+		/* Check if any new publications have been created. */
+		foreach_ptr(Publication, pub1, publications)
+		{
+			bool match = false;
+			foreach_ptr(Publication, pub2, data->publications)
+			{
+				if (strcmp(pub1->name, pub2->name) == 0)
+				{
+					match = true;
+					break;
+				}
+			}
+
+			if (!match)
+			{
+				publications_updated = true;
+				break;
+			}
+		}
+
+		list_free(publications);
+	}
+
+	/*
+	 * Validate the entry only if the entry is not valid or in case a new
+	 * publication has been added.
+	 */
+	if (!entry->replicate_valid ||
+		(!publications_valid && publications_updated))
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
@@ -2071,6 +2122,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
+		bool		skipped_pub = false;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -2078,9 +2130,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			MemoryContextReset(data->pubctx);
 
 			oldctx = MemoryContextSwitchTo(data->pubctx);
-			data->publications = LoadPublications(data->publication_names);
+			data->publications = LoadPublications(data->publication_names, &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
-			publications_valid = true;
+
+			/*
+			 * We don't consider the publications to be valid till we have
+			 * information of all the publications.
+			 */
+			if (!skipped_pub)
+				publications_valid = true;
 		}
 
 		/*
-- 
2.43.0

test_pg_recvlogical.sh (text/x-sh)
#8Amit Kapila
amit.kapila16@gmail.com
In reply to: vignesh C (#7)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Mon, Mar 3, 2025 at 2:30 PM vignesh C <vignesh21@gmail.com> wrote:

On Tue, 25 Feb 2025 at 15:32, vignesh C <vignesh21@gmail.com> wrote:

The attached script is the one that was used for testing. The
NUM_RECORDS count should be changed for each test, and when running
the test with the patch, the drop publication command should be
uncommented.

I have done further analysis on the test and changed the test to
compare it better with HEAD. The execution time is in milliseconds.
Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 10.43 | 15.86 | 64.44 | 550.56 | 8991.04
Patch | 11.35 | 17.26 | 73.50 | 640.21 | 10104.72
% diff | -8.82 | -8.85 | -14.08 | -16.28 | -12.38

There is a performance degradation in the range of 8.8 to 16.2 percent.

- /* Validate the entry */
- if (!entry->replicate_valid)
+ /*
+ * If the publication is invalid, check for updates.
+ * This optimization ensures that the next block, which queries the system
+ * tables and builds the relation entry, runs only if a new publication was
+ * created.
+ */
+ if (!publications_valid && data->publications)
+ {
+ bool skipped_pub = false;
+ List    *publications;
+
+ publications = LoadPublications(data->publication_names, &skipped_pub);

The publications_valid flag indicates whether the publications cache
is valid or not; the flag is set to false for any invalidation in the
pg_publication catalog. I wonder whether, instead of reusing the same
flag, we could use a separate publications_skipped flag. If that
works, you don't even need to change the current location where we
call LoadPublications.
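
One possible reading of this suggestion, sketched as a toy C model: it
assumes publications_valid keeps its original meaning (set after every
load, cleared by any invalidation) and a separate flag only records
that something was skipped. The PubCache struct and all function names
here are hypothetical and this is not pgoutput code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy model of the publication cache flags; not actual pgoutput code. */
typedef struct PubCache
{
	bool		publications_valid;		/* cleared by any catalog invalidation */
	bool		publications_skipped;	/* last load skipped a missing publication */
	int			load_count;				/* how many times the catalog was read */
} PubCache;

/* Stand-in for the invalidation callback on pg_publication changes. */
void
pubcache_invalidate(PubCache *cache)
{
	assert(cache != NULL);
	cache->publications_valid = false;
}

/* Called once per decoded change; reloads only after an invalidation. */
void
pubcache_lookup(PubCache *cache, int missing_pubs)
{
	assert(cache != NULL && missing_pubs >= 0);

	if (!cache->publications_valid)
	{
		cache->load_count++;
		cache->publications_skipped = (missing_pubs > 0);
		cache->publications_valid = true;
	}
}

/*
 * Decode 'nchanges' changes while one publication is missing.  If
 * 'recreate_halfway' is set, the publication is created midway, which
 * fires an invalidation.  Returns the number of catalog loads.
 */
int
loads_with_missing_publication(int nchanges, bool recreate_halfway)
{
	PubCache	cache = {false, false, 0};
	int			missing_pubs = 1;

	for (int i = 0; i < nchanges; i++)
	{
		if (recreate_halfway && i == nchanges / 2)
		{
			missing_pubs = 0;
			pubcache_invalidate(&cache);
		}
		pubcache_lookup(&cache, missing_pubs);
	}

	return cache.load_count;
}
```

Under these assumptions, decoding 1000 changes with a missing
publication performs a single load, and recreating the publication
midway adds exactly one more, so the per-record path stays cheap.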

--
With Regards,
Amit Kapila.

#9vignesh C
vignesh21@gmail.com
In reply to: Amit Kapila (#8)
2 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Mon, 3 Mar 2025 at 16:41, Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Mar 3, 2025 at 2:30 PM vignesh C <vignesh21@gmail.com> wrote:

On Tue, 25 Feb 2025 at 15:32, vignesh C <vignesh21@gmail.com> wrote:

The attached script is the one that was used for testing. The
NUM_RECORDS count should be changed for each test, and when running
the test with the patch, the drop publication command should be
uncommented.

I have done further analysis on the test and changed the test to
compare it better with HEAD. The execution time is in milliseconds.
Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 10.43 | 15.86 | 64.44 | 550.56 | 8991.04
Patch | 11.35 | 17.26 | 73.50 | 640.21 | 10104.72
% diff | -8.82 | -8.85 | -14.08 | -16.28 | -12.38

There is a performance degradation in the range of 8.8 to 16.2 percent.

- /* Validate the entry */
- if (!entry->replicate_valid)
+ /*
+ * If the publication is invalid, check for updates.
+ * This optimization ensures that the next block, which queries the system
+ * tables and builds the relation entry, runs only if a new publication was
+ * created.
+ */
+ if (!publications_valid && data->publications)
+ {
+ bool skipped_pub = false;
+ List    *publications;
+
+ publications = LoadPublications(data->publication_names, &skipped_pub);

The publications_valid flag indicates whether the publications cache
is valid or not; the flag is set to false for any invalidation in the
pg_publication catalog. I wonder whether, instead of reusing the same
flag, we could use a separate publications_skipped flag. If that
works, you don't even need to change the current location where we
call LoadPublications.

There is an almost negligible dip with the approach suggested above.
The test results are given below (execution time is in milliseconds):
Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 10.25 | 15.85 | 65.53 | 569.15 | 9194.19
Patch | 10.25 | 15.84 | 65.91 | 571.75 | 9208.66
% diff | 0.00 | 0.06 | -0.58 | -0.46 | -0.16

There is a performance dip in the range of 0 to 0.58 percent.
The attached patch has these changes. The test script used
is also attached.

Regards,
Vignesh

Attachments:

v5-0001-Fix-logical-replication-breakage-after.patch (application/x-patch)
From 8f2bca5afc03f3e34b2a0955415a702cbe6aef79 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Mon, 3 Mar 2025 11:54:27 +0530
Subject: [PATCH v5] Fix logical replication breakage after ALTER SUBSCRIPTION
 ... SET PUBLICATION

Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION`
could cause logical replication to break under certain conditions. When
the apply worker restarts after executing SET PUBLICATION, it continues
using the existing replication slot and origin. If the replication origin
was not updated before the restart, the WAL start location could point to
a position prior to the existence of the specified publication, causing the
apply worker to restart repeatedly and report the same error each time.

This patch skips loading the publication if the publication does not exist
and loads the publication and updates the relation entry when the publication
gets created.

Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 41 +++++++++++++++++----
 1 file changed, 33 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f656aa..46793efe2b5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -81,8 +81,9 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 
 static bool publications_valid;
+static bool publications_updated;
 
-static List *LoadPublications(List *pubnames);
+static List *LoadPublications(List *pubnames, bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_repl_origin(LogicalDecodingContext *ctx,
@@ -1762,9 +1763,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet. 'skipped'
+ * will be true if we find any publication from the given list that doesn't
+ * exist.
  */
 static List *
-LoadPublications(List *pubnames)
+LoadPublications(List *pubnames, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1772,9 +1777,15 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+		{
+			elog(DEBUG1, "skipped loading publication: %s", pubname);
+			*skipped = true;
+		}
 	}
 
 	return result;
@@ -1789,6 +1800,7 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
+	publications_updated = true;
 
 	/*
 	 * Also invalidate per-relation cache so that next time the filtering info
@@ -2053,8 +2065,12 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
-	/* Validate the entry */
-	if (!entry->replicate_valid)
+	/*
+	 * Validate the entry only if the entry is not valid or in case the
+	 * publications have been updated.
+	 */
+	if (!entry->replicate_valid ||
+		(!publications_valid && publications_updated))
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
@@ -2071,16 +2087,25 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
+		bool		skipped_pub = false;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
 		{
+			publications_updated = false;
+
 			MemoryContextReset(data->pubctx);
 
 			oldctx = MemoryContextSwitchTo(data->pubctx);
-			data->publications = LoadPublications(data->publication_names);
+			data->publications = LoadPublications(data->publication_names, &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
-			publications_valid = true;
+
+			/*
+			 * We don't consider the publications to be valid till we have
+			 * information of all the publications.
+			 */
+			if (!skipped_pub)
+				publications_valid = true;
 		}
 
 		/*
-- 
2.43.0

test_pg_recvlogical.sh (text/x-sh)
#10vignesh C
vignesh21@gmail.com
In reply to: vignesh C (#9)
1 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, 4 Mar 2025 at 12:22, vignesh C <vignesh21@gmail.com> wrote:

On Mon, 3 Mar 2025 at 16:41, Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Mar 3, 2025 at 2:30 PM vignesh C <vignesh21@gmail.com> wrote:

On Tue, 25 Feb 2025 at 15:32, vignesh C <vignesh21@gmail.com> wrote:

The attached script was used for testing. The NUM_RECORDS count should
be changed for each of the tests, and when running the test with the
patch, uncomment the drop publication command.

I have done further analysis on the test and changed the test to
compare it better with HEAD. The execution time is in milliseconds.
Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 10.43 | 15.86 | 64.44 | 550.56 | 8991.04
Patch | 11.35 | 17.26 | 73.50 | 640.21 | 10104.72
% diff | -8.82 | -8.85 | -14.08 | -16.28 | -12.38

There is a performance degradation in the range of 8.8 to 16.2 percent.

- /* Validate the entry */
- if (!entry->replicate_valid)
+ /*
+ * If the publication is invalid, check for updates.
+ * This optimization ensures that the next block, which queries the system
+ * tables and builds the relation entry, runs only if a new publication was
+ * created.
+ */
+ if (!publications_valid && data->publications)
+ {
+ bool skipped_pub = false;
+ List    *publications;
+
+ publications = LoadPublications(data->publication_names, &skipped_pub);

The publications_valid flag indicates whether the publications cache
is valid or not; the flag is set to false for any invalidation in the
pg_publication catalog. I wonder whether, instead of reusing the same
flag, we could use a separate publications_skipped flag. If that works,
you don't even need to change the current location where we call
LoadPublications.

The dip is almost negligible with the approach suggested above; the
test results are given below (execution time is in milliseconds):
Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 10.25 | 15.85 | 65.53 | 569.15 | 9194.19
Patch | 10.25 | 15.84 | 65.91 | 571.75 | 9208.66
% diff | 0.00 | 0.06 | -0.58 | -0.46 | -0.16

There is a performance dip in the range of 0 to 0.58 percent.
The attached patch has the changes for the same. The test script used
is also attached.
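
For readers checking the numbers: the "% diff" row in the table above appears to be computed as (Head - Patch) / Head * 100, so a negative value means the patched build was slower. Below is a minimal C sketch of that arithmetic. The formula is inferred from the table rather than taken from the test script, and the function name percent_diff is hypothetical.

```c
/*
 * Relative difference of the patched timing against HEAD, in percent.
 * A negative result means the patched build was slower.
 *
 * Assumption: the formula (head - patch) / head * 100 is inferred from
 * the figures in the table; it is not taken from the test script.
 */
double
percent_diff(double head_ms, double patch_ms)
{
	return (head_ms - patch_ms) / head_ms * 100.0;
}
```

For instance, percent_diff(569.15, 571.75) comes out at roughly -0.46, which agrees with the 100000-record column.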

On further thought, I felt that the publications_updated variable is
not required; we can use publications_valid itself, which is cleared
whenever the publication system table is invalidated. Here is a patch
for the same.

Regards,
Vignesh

Attachments:

v6-0001-Fix-logical-replication-breakage-after-ALTER-SUBS.patch (text/x-patch)
From 856403e4943954267b0e9b4fa96ccc6955838210 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Mon, 3 Mar 2025 11:54:27 +0530
Subject: [PATCH v6] Fix logical replication breakage after ALTER SUBSCRIPTION
 ... SET PUBLICATION

Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION`
could cause logical replication to break under certain conditions. When
the apply worker restarts after executing SET PUBLICATION, it continues
using the existing replication slot and origin. If the replication origin
was not updated before the restart, the WAL start location could point to
a position prior to the existence of the specified publication, leading to
persistent start of apply worker and reporting errors.

This patch skips loading the publication if the publication does not exist
and loads the publication and updates the relation entry when the publication
gets created.

Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f656aa..4cfd2b02023 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1762,6 +1762,8 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet.
  */
 static List *
 LoadPublications(List *pubnames)
@@ -1772,9 +1774,12 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+			elog(WARNING, "skipped loading publication: %s", pubname);
 	}
 
 	return result;
@@ -2053,8 +2058,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
-	/* Validate the entry */
-	if (!entry->replicate_valid)
+	/*
+	 * Validate the entry if (a) the entry is not valid, or (b) there is a
+	 * change in the publications.
+	 */
+	if (!entry->replicate_valid || !publications_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
-- 
2.43.0

#11Amit Kapila
amit.kapila16@gmail.com
In reply to: vignesh C (#9)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, Mar 4, 2025 at 12:23 PM vignesh C <vignesh21@gmail.com> wrote:

The dip is almost negligible with the approach suggested above; the
test results are given below (execution time is in milliseconds):
Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 10.25 | 15.85 | 65.53 | 569.15 | 9194.19
Patch | 10.25 | 15.84 | 65.91 | 571.75 | 9208.66
% diff | 0.00 | 0.06 | -0.58 | -0.46 | -0.16

There is a performance dip in the range of 0 to 0.58 percent.
The attached patch has the changes for the same. The test script used
is also attached.

The patch still needs more review but the change has negligible
performance impact. The next step is to get more opinions on whether
we should add a new subscription option (say
skip_not_existant_publication) for this work. See patch v1-0002-* in
email [1]. The problem summary is explained in email [2] and in the
commit message of the 0001 patch in this thread. But still, let me
write briefly for the ease of others.

The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will
lead to restarting of apply worker, and after the restart, the apply
worker will use the existing slot and replication origin corresponding
to the subscription. Now, it is possible that before the restart, the
origin has not been updated, and the WAL start location points to a
location before where PUBLICATION pointed to by SET PUBLICATION
exists. This leads to an error: "ERROR: publication "pub1" does not
exist". Once this error occurs, apply worker will never be able to
proceed and will always return the same error. For users, this is a
problem because they would have created a publication before executing
ALTER SUBSCRIPTION ... SET PUBLICATION .. and now they have no way to
proceed.

The solution we came up with is to skip loading the publication if the
publication does not exist. We load the publication later and update
the relation entry when the publication gets created.

The two main concerns with this idea, as shared in email [3], are
performance implications of this change and the possibility of current
behaviour expectations from the users.

We came up with a solution where the performance impact is negligible,
as shown in the tests [4]. For that, we won't try to reload the
skipped/missing publication for each change but will attempt it only
when any new publication is created/dropped for a valid relation entry
in RelationSyncCache (maintained by pgoutput).
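
To make the "reload only after an invalidation" idea concrete, here is a minimal, self-contained C sketch. It does not use the pgoutput data structures: PubCatalog, PubCache, and every function name below are hypothetical stand-ins, and the load_calls counter exists only to show how rarely the catalog is consulted.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_PUBS 8

/* Hypothetical stand-in for the publications visible in the catalog. */
typedef struct PubCatalog
{
	const char *names[MAX_PUBS];
	size_t		count;
} PubCatalog;

/* Hypothetical stand-in for the cached list of loaded publications. */
typedef struct PubCache
{
	const char *loaded[MAX_PUBS];
	size_t		nloaded;
	bool		valid;			/* false after any catalog invalidation */
	int			load_calls;		/* how many times the catalog was read */
} PubCache;

/* Return true if the named publication is present in the catalog. */
bool
catalog_has(const PubCatalog *cat, const char *name)
{
	for (size_t i = 0; i < cat->count; i++)
	{
		if (strcmp(cat->names[i], name) == 0)
			return true;
	}
	return false;
}

/* Plays the role of an invalidation callback: just mark the cache stale. */
void
pub_cache_invalidate(PubCache *cache)
{
	cache->valid = false;
}

/*
 * Return the number of loaded publications. The catalog is consulted only
 * when the cache is stale; missing publications are skipped, not errors.
 */
size_t
pub_cache_get(PubCache *cache, const PubCatalog *cat,
			  const char **wanted, size_t nwanted)
{
	if (!cache->valid)
	{
		cache->nloaded = 0;
		cache->load_calls++;
		for (size_t i = 0; i < nwanted; i++)
		{
			if (catalog_has(cat, wanted[i]) && cache->nloaded < MAX_PUBS)
				cache->loaded[cache->nloaded++] = wanted[i];
		}
		cache->valid = true;
	}
	return cache->nloaded;
}
```

In this sketch, repeated calls to pub_cache_get() between invalidations never touch the catalog again, even if a publication was skipped; the skipped publication is picked up on the first call after pub_cache_invalidate().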

The new option skip_not_existant_publication is to address the second
concern "Imagine you have a subscriber using two publications p1 and
p2, and someone comes around and drops p1 by mistake. With the
proposed patch, the subscription will notice this, but it'll continue
sending data ignoring the missing publication. Yes, it will continue
working, but it's quite possible this breaks the subscriber and it'd
be better to fail and stop replicating.".

I see the point of adding such an option to avoid breaking the current
applications (if there are any) that are relying on current behaviour.
But OTOH, I am not sure if users expect us to fail explicitly in such
scenarios.

This is a long-standing behaviour for which we get reports from time
to time, and once analyzing a failure, Tom also looked at it and
agreed that we don't have much choice to avoid skipping non-existent
publications [5]. But we never concluded as to whether skipping should
be a default behavior or an optional one. So, we need more opinions on
it.

Thoughts?

[1]: /messages/by-id/CALDaNm0-n8FGAorM+bTxkzn+AOUyx5=L_XmnvOP6T24+-NcBKg@mail.gmail.com
[2]: /messages/by-id/CAA4eK1Lc=NDV1HrY2gNasFK90MtysnA575a+rd0p+POjXN+Spw@mail.gmail.com
[3]: /messages/by-id/dc08add3-10a8-738b-983a-191c7406707b@enterprisedb.com
[4]: /messages/by-id/CALDaNm2Xkm1M-ik2RLJZ9rMhW2zW2GRLL6ePyZJbXcAjOVwzXg@mail.gmail.com
[5]: /messages/by-id/631312.1707251789@sss.pgh.pa.us

--
With Regards,
Amit Kapila.

#12Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#11)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, Mar 4, 2025 at 9:04 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 4, 2025 at 12:23 PM vignesh C <vignesh21@gmail.com> wrote:

The dip is almost negligible with the approach suggested above; the
test results are given below (execution time is in milliseconds):
Branch/records | 100 | 1000 | 10000 | 100000 | 1000000
Head | 10.25 | 15.85 | 65.53 | 569.15 | 9194.19
Patch | 10.25 | 15.84 | 65.91 | 571.75 | 9208.66
% diff | 0.00 | 0.06 | -0.58 | -0.46 | -0.16

There is a performance dip in the range of 0 to 0.58 percent.
The attached patch has the changes for the same. The test script used
is also attached.

The patch still needs more review but the change has negligible
performance impact. The next step is to get more opinions on whether
we should add a new subscription option (say
skip_not_existant_publication) for this work. See patch v1-0002-* in
email [1]. The problem summary is explained in email [2] and in the
commit message of the 0001 patch in this thread. But still, let me
write briefly for the ease of others.

The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will
lead to restarting of apply worker, and after the restart, the apply
worker will use the existing slot and replication origin corresponding
to the subscription. Now, it is possible that before the restart, the
origin has not been updated, and the WAL start location points to a
location before where PUBLICATION pointed to by SET PUBLICATION
exists. This leads to an error: "ERROR: publication "pub1" does not
exist". Once this error occurs, apply worker will never be able to
proceed and will always return the same error. For users, this is a
problem because they would have created a publication before executing
ALTER SUBSCRIPTION ... SET PUBLICATION .. and now they have no way to
proceed.

The solution we came up with is to skip loading the publication if the
publication does not exist. We load the publication later and update
the relation entry when the publication gets created.

The two main concerns with this idea, as shared in email [3], are
performance implications of this change and the possibility of current
behaviour expectations from the users.

We came up with a solution where the performance impact is negligible,
as shown in the tests [4]. For that, we won't try to reload the
skipped/missing publication for each change but will attempt it only
when any new publication is created/dropped for a valid relation entry
in RelationSyncCache (maintained by pgoutput).

Thank you for summarizing the issue. That helps catch up a lot.

The new option skip_not_existant_publication is to address the second
concern "Imagine you have a subscriber using two publications p1 and
p2, and someone comes around and drops p1 by mistake. With the
proposed patch, the subscription will notice this, but it'll continue
sending data ignoring the missing publication. Yes, it will continue
working, but it's quite possible this breaks the subscriber and it'd
be better to fail and stop replicating.".

I think that in this particular situation the current behavior would
be likely to miss more changes than the patched behavior.

After the logical replication stops, the user would have to alter the
subscription to subscribe to only p1 by executing 'ALTER SUBSCRIPTION
... SET PUBLICATION p1' in order to resume the logical replication. In
any case, the publisher might be receiving further changes but the
subscriber would end up missing changes for tables associated with p2
generated while p2 doesn't exist. Even if the user re-creates the
publication p2 after that, it would be hard for users to re-alter the
subscription to get changes for tables associated with p1 and p2 from
the exact point of p2 being created. Therefore, the subscriber could
end up missing some changes that happened between 'CREATE PUBLICATION
p2' and 'ALTER SUBSCRIPTION ... SET PUBLICATION p1, p2'.

On the other hand, with the patch, the publisher can send the
changes to tables associated with p1 and p2 as soon as it decodes the
WAL record of the re-executed CREATE PUBLICATION p2.

I see the point of adding such an option to avoid breaking the current
applications (if there are any) that are relying on current behaviour.
But OTOH, I am not sure if users expect us to fail explicitly in such
scenarios.

As a side note, since the patch ignores missing publications, we will
be able to create a subscription with any publications, regardless of
whether they exist, like:

CREATE SUBSCRIPTION ... PUBLICATION pub1, pub2, pub3, pub4, pub5, ..., pub1000;

The walsender corresponding to such subscriber can stream changes as
soon as the listed publications are created on the publisher and
REFRESH PUBLICATION is executed.

This is a long-standing behaviour for which we get reports from time
to time, and once analyzing a failure, Tom also looked at it and
agreed that we don't have much choice to avoid skipping non-existent
publications [5]. But we never concluded as to whether skipping should
be a default behavior or an optional one. So, we need more opinions on
it.

I'm leaning toward making the skipping behavior a default as I could
not find a good benefit for the current behavior (i.e., stopping
logical replication due to missing publications).

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#13Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#12)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Sun, Mar 9, 2025 at 9:00 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Tue, Mar 4, 2025 at 9:04 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

I see the point of adding such an option to avoid breaking the current
applications (if there are any) that are relying on current behaviour.
But OTOH, I am not sure if users expect us to fail explicitly in such
scenarios.

As a side note, since the patch ignores missing publications, we will
be able to create a subscription with any publications, regardless of
whether they exist, like:

CREATE SUBSCRIPTION ... PUBLICATION pub1, pub2, pub3, pub4, pub5, ..., pub1000;

The walsender corresponding to such subscriber can stream changes as
soon as the listed publications are created on the publisher and
REFRESH PUBLICATION is executed.

Right, but OTOH, one can expect that the data should start replicating
as soon as one creates a publication on the publisher. However, the
data for tables that are part of the publication will start
replicating only from the point where the decoding process reaches the
WAL corresponding to CREATE PUBLICATION. I suggest adding something
about this to the docs unless it is already explained.

This is a long-standing behaviour for which we get reports from time
to time, and once analyzing a failure, Tom also looked at it and
agreed that we don't have much choice to avoid skipping non-existent
publications [5]. But we never concluded as to whether skipping should
be a default behavior or an optional one. So, we need more opinions on
it.

I'm leaning toward making the skipping behavior a default as I could
not find a good benefit for the current behavior (i.e., stopping
logical replication due to missing publications).

Sounds reasonable. We can always add the option at a later point if
required. Thanks for your input. We can continue reviewing and
committing the current patch.

--
With Regards,
Amit Kapila.

#14Amit Kapila
amit.kapila16@gmail.com
In reply to: vignesh C (#10)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, Mar 4, 2025 at 6:54 PM vignesh C <vignesh21@gmail.com> wrote:

On further thought, I felt that the publications_updated variable is
not required; we can use publications_valid itself, which is cleared
whenever the publication system table is invalidated. Here is a patch
for the same.

The patch relies on the fact that whenever a publication's data is
invalidated, it will also invalidate all the RelSyncEntries as per
publication_invalidation_cb. But note that we are discussing removing
that inefficiency in the thread [1]. So, we should try to rebuild the
entry when we have skipped the required publication previously.

Apart from this, please consider updating the docs, as mentioned in my
response to Sawada-San's email.

BTW, I am planning to commit this only on HEAD as this is a behavior
change. Please let me know if you guys think otherwise.

[1]: /messages/by-id/OSCPR01MB14966C09AA201EFFA706576A7F5C92@OSCPR01MB14966.jpnprd01.prod.outlook.com

--
With Regards,
Amit Kapila.

#15Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#14)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Mon, Mar 10, 2025 at 9:33 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 4, 2025 at 6:54 PM vignesh C <vignesh21@gmail.com> wrote:

On further thought, I felt that the publications_updated variable is
not required; we can use publications_valid itself, which is cleared
whenever the publication system table is invalidated. Here is a patch
for the same.

The patch relies on the fact that whenever a publication's data is
invalidated, it will also invalidate all the RelSyncEntries as per
publication_invalidation_cb. But note that we are discussing removing
that inefficiency in the thread [1]. So, we should try to rebuild the
entry when we have skipped the required publication previously.

Apart from this, please consider updating the docs, as mentioned in my
response to Sawada-San's email.

I'm not sure I fully understand it, but based on your previous email and
the initial email from Vignesh, if IIUC, the issue occurs when a
publication is created after a certain LSN. When ALTER SUBSCRIPTION ... SET
PUBLICATION is executed, the subscriber workers restart and request the
changes based on restart_lsn, which is at an earlier LSN in the WAL than
the LSN at which the publication was created. This leads to an error, and
we are addressing this behavior as part of the fix by skipping the changes
which are between the restart_lsn of subscriber and the lsn at which
publication is created and this behavior looks fine.

BTW, I am planning to commit this only on HEAD as this is a behavior
change. Please let me know if you guys think otherwise.

Somehow this looks like a bug fix which should be backported no? Am I
missing something?

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#16Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#15)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Mon, Mar 10, 2025 at 10:15 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Mar 10, 2025 at 9:33 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 4, 2025 at 6:54 PM vignesh C <vignesh21@gmail.com> wrote:

On further thought, I felt that the publications_updated variable is
not required; we can use publications_valid itself, which is cleared
whenever the publication system table is invalidated. Here is a patch
for the same.

The patch relies on the fact that whenever a publication's data is
invalidated, it will also invalidate all the RelSyncEntries as per
publication_invalidation_cb. But note that we are discussing removing
that inefficiency in the thread [1]. So, we should try to rebuild the
entry when we have skipped the required publication previously.

Apart from this, please consider updating the docs, as mentioned in my
response to Sawada-San's email.

I'm not sure I fully understand it, but based on your previous email and the initial email from Vignesh, if IIUC, the issue occurs when a publication is created after a certain LSN. When ALTER SUBSCRIPTION ... SET PUBLICATION is executed, the subscriber workers restart and request the changes based on restart_lsn, which is at an earlier LSN in the WAL than the LSN at which the publication was created. This leads to an error, and we are addressing this behavior as part of the fix by skipping the changes which are between the restart_lsn of subscriber and the lsn at which publication is created and this behavior looks fine.

Yes, your understanding is correct, but note that as such, the patch
is simply skipping the missing publication. The changes are skipped
because they were made to a table that is not part of any publication
according to the historic snapshot we have at that point in time.
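
The effect described above can be illustrated with a toy model of decoding. This is only a sketch under simplifying assumptions: a real decoder determines publication visibility from the catalog as of the historic snapshot, whereas here it is reduced to a boolean flipped by create/drop records. WalRecord, RecKind, and count_streamed_inserts are hypothetical names, not PostgreSQL APIs.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical record kinds for a toy WAL stream. */
typedef enum RecKind
{
	REC_INSERT,
	REC_CREATE_PUB,
	REC_DROP_PUB
} RecKind;

/* Hypothetical WAL record: a position and what happened there. */
typedef struct WalRecord
{
	unsigned long lsn;
	RecKind		kind;
} WalRecord;

/*
 * Walk the toy WAL from start_lsn and count the inserts that would be
 * streamed. An insert is streamed only if the publication is visible at
 * the time the record is decoded; a missing publication is skipped
 * silently instead of raising an error.
 */
size_t
count_streamed_inserts(const WalRecord *wal, size_t nrecords,
					   unsigned long start_lsn, bool pub_exists_at_start)
{
	bool		pub_visible = pub_exists_at_start;
	size_t		streamed = 0;

	for (size_t i = 0; i < nrecords; i++)
	{
		if (wal[i].lsn < start_lsn)
			continue;			/* before the point decoding resumes from */

		switch (wal[i].kind)
		{
			case REC_CREATE_PUB:
				pub_visible = true;
				break;
			case REC_DROP_PUB:
				pub_visible = false;
				break;
			case REC_INSERT:
				if (pub_visible)
					streamed++;
				break;
		}
	}
	return streamed;
}
```

With a stream of insert, create publication, insert, and decoding resuming before the publication exists, only the second insert is counted; the first one falls in the window between the start location and the creation of the publication.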

BTW, I am planning to commit this only on HEAD as this is a behavior
change. Please let me know if you guys think otherwise.

Somehow this looks like a bug fix which should be backported no? Am I missing something?

We can consider this a bug-fix and backpatch it, but I am afraid that
because this is a behavior change, some users may not like it. Also, I
don't remember seeing public reports for this behavior; that is
probably because it is hard to hit. FYI, we found this via BF
failures. So, I thought it would be better to get this field tested
via HEAD only at this point in time.

--
With Regards,
Amit Kapila.

#17Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#16)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Mon, Mar 10, 2025 at 10:54 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Mar 10, 2025 at 10:15 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Mar 10, 2025 at 9:33 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 4, 2025 at 6:54 PM vignesh C <vignesh21@gmail.com> wrote:

On further thought, I felt that the publications_updated variable is
not required; we can use publications_valid itself, which is cleared
whenever the publication system table is invalidated. Here is a patch
for the same.

The patch relies on the fact that whenever a publication's data is
invalidated, it will also invalidate all the RelSyncEntries as per
publication_invalidation_cb. But note that we are discussing removing
that inefficiency in the thread [1]. So, we should try to rebuild the
entry when we have skipped the required publication previously.

Apart from this, please consider updating the docs, as mentioned in my
response to Sawada-San's email.

I'm not sure I fully understand it, but based on your previous email and the initial email from Vignesh, if IIUC, the issue occurs when a publication is created after a certain LSN. When ALTER SUBSCRIPTION ... SET PUBLICATION is executed, the subscriber workers restart and request the changes based on restart_lsn, which is at an earlier LSN in the WAL than the LSN at which the publication was created. This leads to an error, and we are addressing this behavior as part of the fix by skipping the changes which are between the restart_lsn of subscriber and the lsn at which publication is created and this behavior looks fine.

Yes, your understanding is correct, but note that as such, the patch
is simply skipping the missing publication. The changes are skipped
because they were made to a table that is not part of any publication
according to the historic snapshot we have at that point in time.

So, it will skip loading the missing publication up to the LSN where
the publication is created and then load it from there, correct? Do we
have a test case for this? I couldn't find one in the latest patch or
in the email thread to demonstrate this behavior.

BTW, I am planning to commit this only on HEAD as this is a behavior
change. Please let me know if you guys think otherwise.

Somehow this looks like a bug fix which should be backported no? Am I missing something?

We can consider this a bug-fix and backpatch it, but I am afraid that
because this is a behavior change, some users may not like it. Also, I
don't remember seeing public reports for this behavior; that is
probably because it is hard to hit. FYI, we found this via BF
failures. So, I thought it would be better to get this field tested
via HEAD only at this point in time.

At the moment, I don't have a strong opinion on this. Since no one has
encountered or reported this issue, it might be the case that it's not
affecting anyone, and we could simply backpatch without causing any
dissatisfaction. However, I'm fine with whatever others decide.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#18vignesh C
vignesh21@gmail.com
In reply to: Amit Kapila (#14)
1 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Mon, 10 Mar 2025 at 09:33, Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 4, 2025 at 6:54 PM vignesh C <vignesh21@gmail.com> wrote:

On further thought, I felt that the publications_updated variable is
not required; we can use publications_valid itself, which is cleared
whenever the publication system table is invalidated. Here is a patch
for the same.

The patch relies on the fact that whenever a publication's data is
invalidated, it will also invalidate all the RelSyncEntries as per
publication_invalidation_cb. But note that we are discussing removing
that inefficiency in the thread [1]. So, we should try to rebuild the
entry when we have skipped the required publication previously.

Apart from this, please consider updating the docs, as mentioned in my
response to Sawada-San's email.

The CREATE SUBSCRIPTION documentation [1] already has "We allow
non-existent publications to be specified so that users can add those
later. This means pg_subscription can have non-existent publications.",
which should be enough. Let me know if we need to add more
documentation.

Apart from this, I have changed the level of the "skipped loading
publication" message to WARNING, since CREATE SUBSCRIPTION already logs
a similar warning: "WARNING: publications "pub2", "pub3" do not exist
on the publisher". I can change it to a different log level if you feel
this is not the right one.

Also, I have added a test case for Dilip's comment from [2].
The attached v7 version patch has the changes for the same.

[1]: https://www.postgresql.org/docs/devel/sql-createsubscription.html
[2]: /messages/by-id/CAFiTN-tgUR6QLSs3UHK7gx4VP7cURGNkufA_xkrQLw9eCnbGQw@mail.gmail.com

Regards,
Vignesh

Attachments:

v7-0001-Fix-logical-replication-breakage-after-ALTER-SUBS.patch (application/octet-stream)
From 1b838710fbbe16d412c5eff8a38d1aeb5590dd23 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Mon, 3 Mar 2025 11:54:27 +0530
Subject: [PATCH v7] Fix logical replication breakage after ALTER SUBSCRIPTION
 ... SET PUBLICATION

Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION`
could cause logical replication to break under certain conditions. When
the apply worker restarts after executing SET PUBLICATION, it continues
using the existing replication slot and origin. If the replication origin
was not updated before the restart, the WAL start location could point to
a position prior to the existence of the specified publication, leading to
persistent start of apply worker and reporting errors.

This patch skips loading the publication if the publication does not exist
and loads the publication and updates the relation entry when the publication
gets created.

Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 26 ++++++++++++++++--
 src/test/subscription/t/024_add_drop_pub.pl | 29 ++++++++++++++++++++-
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1df..991aa6f7282 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1762,6 +1762,8 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet.
  */
 static List *
 LoadPublications(List *pubnames)
@@ -1772,9 +1774,29 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+		{
+			/*
+			 * When executing 'ALTER SUBSCRIPTION ... SET PUBLICATION', the
+			 * apply worker continues using the existing replication slot and
+			 * origin after restarting. If the replication origin is not
+			 * updated before the restart, the WAL start location may point to
+			 * a position before the specified publication exists, causing
+			 * persistent apply worker restarts and errors.
+			 *
+			 * This ensures that the publication is skipped if it does not
+			 * exist and is loaded when the corresponding WAL record is
+			 * encountered.
+			 */
+			ereport(WARNING,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("skipped loading publication: %s", pubname),
+					errhint("If the publication already exists, ignore it as it will be loaded upon reaching the corresponding WAL record; otherwise, create it."));
+		}
 	}
 
 	return result;
diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl
index 4428a3413db..dfdc9835e0c 100644
--- a/src/test/subscription/t/024_add_drop_pub.pl
+++ b/src/test/subscription/t/024_add_drop_pub.pl
@@ -1,7 +1,9 @@
 
 # Copyright (c) 2021-2025, PostgreSQL Global Development Group
 
-# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION
+# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and
+# ensures that dropping a publication associated with a subscription does not
+# disrupt existing logical replication.
 use strict;
 use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
@@ -80,6 +82,31 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_1");
 is($result, qq(20|1|10), 'check initial data is copied to subscriber');
 
+# Ensure that dropping a publication associated with a subscription does not
+# disrupt existing logical replication. Instead, it should log a warning
+# while allowing replication to continue. Additionally, verify that replication
+# of the dropped publication resumes once the publication is recreated.
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_1;");
+
+my $offset = -s $node_publisher->logfile;
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_1 values(50)");
+
+# Verify that a warning is logged.
+$node_publisher->wait_for_log(
+	qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_1/, $offset);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_1 FOR TABLE tab_1");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_1 values(11)");
+
+# Verify that the insert operation gets replicated to subscriber after
+# re-creation.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_1");
+is($result, qq(21|1|11), 'check that the incremental data is replicated after the dropped publication is re-created');
+
 # shutdown
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
-- 
2.43.0

#19Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#17)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, Mar 11, 2025 at 9:48 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Mar 10, 2025 at 10:54 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

BTW, I am planning to commit this only on HEAD as this is a behavior
change. Please let me know if you guys think otherwise.

Somehow this looks like a bug fix which should be backported, no? Am I missing something?

We can consider this a bug-fix and backpatch it, but I am afraid that
because this is a behavior change, some users may not like it. Also, I
don't remember seeing public reports for this behavior; that is
probably because it is hard to hit. FYI, we found this via BF
failures. So, I thought it would be better to get this field tested
via HEAD only at this point in time.

At the moment, I don't have a strong opinion on this. Since no one has
encountered or reported this issue, it might be the case that it's not
affecting anyone, and we could simply backpatch without causing any
dissatisfaction. However, I'm fine with whatever others decide.

Sawada-San, others, do you have an opinion on whether to backpatch this change?

--
With Regards,
Amit Kapila.

#20Dilip Kumar
dilipbalaut@gmail.com
In reply to: vignesh C (#18)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, Mar 11, 2025 at 4:01 PM vignesh C <vignesh21@gmail.com> wrote:

On Mon, 10 Mar 2025 at 09:33, Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 4, 2025 at 6:54 PM vignesh C <vignesh21@gmail.com> wrote:

On further thought, I felt that the publications_updated variable
is not required; we can use publications_valid itself, which will be set
if the publication system table is invalidated. Here is a patch for
the same.

The patch relies on the fact that whenever a publication's data is
invalidated, it will also invalidate all the RelSyncEntries as per
publication_invalidation_cb. But note that we are discussing removing
that inefficiency in the thread [1]. So, we should try to rebuild the
entry when we have skipped the required publication previously.
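
The point about rebuilding a cached entry after a publication was skipped can be illustrated with a toy model. This is only a sketch under stated assumptions: RelEntry, DecoderState, publication_changed() and should_publish() are hypothetical names invented here, not pgoutput's actual structures or functions. It assumes a single publication and a single relation, and that reloading the publication list also forces the relation entry to be recomputed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-relation cache entry (loosely inspired by RelSyncEntry). */
typedef struct
{
	bool		valid;		/* is the publish decision up to date? */
	bool		publish;	/* should changes to this relation be sent? */
} RelEntry;

/* Hypothetical decoder state for one named publication. */
typedef struct
{
	bool		pub_exists;			/* does the publication exist right now? */
	bool		publications_valid; /* is the loaded publication list current? */
	bool		pub_loaded;			/* was the publication found at last load? */
} DecoderState;

/* Models a catalog invalidation: the publication was created or dropped. */
static void
publication_changed(DecoderState *st, bool exists_now)
{
	st->pub_exists = exists_now;
	st->publications_valid = false;
}

/* Decide whether a change to the relation should be published. */
static bool
should_publish(DecoderState *st, RelEntry *entry)
{
	assert(st != NULL && entry != NULL);

	if (!st->publications_valid)
	{
		/* Reload the list; a missing publication is skipped, not an error. */
		st->pub_loaded = st->pub_exists;
		st->publications_valid = true;

		/* The entry may have been computed without the publication. */
		entry->valid = false;
	}

	if (!entry->valid)
	{
		entry->publish = st->pub_loaded;
		entry->valid = true;
	}

	return entry->publish;
}
```

In this model, a change decoded while the publication is missing is not published. Once publication_changed() reports that the publication exists, the next call to should_publish() recomputes the entry and starts publishing.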

Apart from this, please consider updating the docs, as mentioned in my
response to Sawada-San's email.

The create subscription documentation already has "We allow
non-existent publications to be specified so that users can add those
later. This means pg_subscription can have non-existent publications."
and should be enough at [1]. Let me know if we need to add more
documentation.

Apart from this, I have changed the log level of the "skipped
loading publication" message to WARNING, since CREATE SUBSCRIPTION
already logs a similar warning: "WARNING: publications "pub2", "pub3"
do not exist on the publisher". I can change it to a
different log level in case you feel this is not the right level.

Also, I have added a test case for Dilip's comment from [2].
The attached v7 version patch has the changes for the same.

Thanks, Vignesh, for adding the test. I believe you've tested the
effect of DROP PUBLICATION. However, I think we should also test the
behavior of ALTER SUBSCRIPTION...SET PUBLICATION before creating the
PUBLICATION, and then create the PUBLICATION at a later stage.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#21Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#19)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Wed, Mar 12, 2025 at 3:34 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 11, 2025 at 9:48 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Mar 10, 2025 at 10:54 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

BTW, I am planning to commit this only on HEAD as this is a behavior
change. Please let me know if you guys think otherwise.

Somehow this looks like a bug fix which should be backported, no? Am I missing something?

We can consider this a bug-fix and backpatch it, but I am afraid that
because this is a behavior change, some users may not like it. Also, I
don't remember seeing public reports for this behavior; that is
probably because it is hard to hit. FYI, we found this via BF
failures. So, I thought it would be better to get this field tested
via HEAD only at this point in time.

At the moment, I don't have a strong opinion on this. Since no one has
encountered or reported this issue, it might be the case that it's not
affecting anyone, and we could simply backpatch without causing any
dissatisfaction. However, I'm fine with whatever others decide.

Sawada-San, others, do you have an opinion on whether to backpatch this change?

I'm also afraid of backpatching it, so I guess it would be better to
push it only to HEAD. If users encounter the issue and we see it
reported, we can consider backpatching again. If regression
tests on back branches continue to fail intermittently, we can probably
consider adding waits as in the patch Osumi-san proposed [1]?

Regards,

[1]: /messages/by-id/TYCPR01MB83737A68CD5D554EA82BD7B9EDD39@TYCPR01MB8373.jpnprd01.prod.outlook.com

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#22vignesh C
vignesh21@gmail.com
In reply to: Dilip Kumar (#20)
1 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Wed, 12 Mar 2025 at 16:15, Dilip Kumar <dilipbalaut@gmail.com> wrote:

Thanks, Vignesh, for adding the test. I believe you've tested the
effect of DROP PUBLICATION. However, I think we should also test the
behavior of ALTER SUBSCRIPTION...SET PUBLICATION before creating the
PUBLICATION, and then create the PUBLICATION at a later stage.

I felt that having only one test case for this is enough, so I have removed
the DROP PUBLICATION test and added the SET PUBLICATION test. The
attached v8 version patch has the changes for the same.

Regards,
Vignesh

Attachments:

v8-0001-Fix-logical-replication-breakage-after-ALTER-SUBS.patch (text/x-patch; charset=US-ASCII)
From c2f806ce866e3b0dfcebcd639683627a83e4e35a Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Mon, 3 Mar 2025 11:54:27 +0530
Subject: [PATCH v8] Fix logical replication breakage after ALTER SUBSCRIPTION
 ... SET PUBLICATION

Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION`
could cause logical replication to break under certain conditions. When
the apply worker restarts after executing SET PUBLICATION, it continues
using the existing replication slot and origin. If the replication origin
was not updated before the restart, the WAL start location could point to
a position prior to the existence of the specified publication, leading to
persistent apply worker restarts and errors.

This patch skips loading the publication if the publication does not exist
and loads the publication and updates the relation entry when the publication
gets created.

Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 26 +++++++++++-
 src/test/subscription/t/024_add_drop_pub.pl | 44 ++++++++++++++++++++-
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1df..991aa6f7282 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1762,6 +1762,8 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet.
  */
 static List *
 LoadPublications(List *pubnames)
@@ -1772,9 +1774,29 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+		{
+			/*
+			 * When executing 'ALTER SUBSCRIPTION ... SET PUBLICATION', the
+			 * apply worker continues using the existing replication slot and
+			 * origin after restarting. If the replication origin is not
+			 * updated before the restart, the WAL start location may point to
+			 * a position before the specified publication exists, causing
+			 * persistent apply worker restarts and errors.
+			 *
+			 * This ensures that the publication is skipped if it does not
+			 * exist and is loaded when the corresponding WAL record is
+			 * encountered.
+			 */
+			ereport(WARNING,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("skipped loading publication: %s", pubname),
+					errhint("If the publication already exists, ignore it as it will be loaded upon reaching the corresponding WAL record; otherwise, create it."));
+		}
 	}
 
 	return result;
diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl
index 4428a3413db..4eb17f45ec3 100644
--- a/src/test/subscription/t/024_add_drop_pub.pl
+++ b/src/test/subscription/t/024_add_drop_pub.pl
@@ -1,7 +1,9 @@
 
 # Copyright (c) 2021-2025, PostgreSQL Global Development Group
 
-# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION
+# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and
+# ensures that dropping a publication associated with a subscription does not
+# disrupt existing logical replication.
 use strict;
 use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
@@ -80,6 +82,46 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_1");
 is($result, qq(20|1|10), 'check initial data is copied to subscriber');
 
+# Ensure that setting a missing publication to the subscription does not
+# disrupt existing logical replication. Instead, it should log a warning
+# while allowing replication to continue. Additionally, verify that replication
+# resumes after the missing publication is created for the publication table.
+
+# Create table on publisher and subscriber
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
+
+# Set the subscription with a missing publication
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3");
+
+my $offset = -s $node_publisher->logfile;
+
+$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)");
+
+# Verify that a warning is logged.
+$node_publisher->wait_for_log(
+	qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub REFRESH  PUBLICATION");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Verify that the insert operation gets replicated to subscriber after
+# publication is created.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab_3");
+is($result, qq(1
+2), 'check that the incremental data is replicated after the publication is created');
+
 # shutdown
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
-- 
2.43.0

#23Dilip Kumar
dilipbalaut@gmail.com
In reply to: vignesh C (#22)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Thu, Mar 13, 2025 at 7:38 AM vignesh C <vignesh21@gmail.com> wrote:

On Wed, 12 Mar 2025 at 16:15, Dilip Kumar <dilipbalaut@gmail.com> wrote:

Thanks, Vignesh, for adding the test. I believe you've tested the
effect of DROP PUBLICATION. However, I think we should also test the
behavior of ALTER SUBSCRIPTION...SET PUBLICATION before creating the
PUBLICATION, and then create the PUBLICATION at a later stage.

I felt that having only one test case for this is enough, so I have removed
the DROP PUBLICATION test and added the SET PUBLICATION test. The
attached v8 version patch has the changes for the same.

Thanks looks good to me.

While looking at the patch, I have a few comments/questions

+ if (pub)
+ result = lappend(result, pub);
+ else
+ {
+ /*
+ * When executing 'ALTER SUBSCRIPTION ... SET PUBLICATION', the
+ * apply worker continues using the existing replication slot and
+ * origin after restarting. If the replication origin is not
+ * updated before the restart, the WAL start location may point to
+ * a position before the specified publication exists, causing
+ * persistent apply worker restarts and errors.
+ *
+ * This ensures that the publication is skipped if it does not
+ * exist and is loaded when the corresponding WAL record is
+ * encountered.
+ */
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipped loading publication: %s", pubname),
+ errhint("If the publication already exists, ignore it as it will be
loaded upon reaching the corresponding WAL record; otherwise, create
it."));
+ }

This comment focuses on a specific use case regarding the problem with
'ALTER SUBSCRIPTION ... SET PUBLICATION,' but in reality, we are
addressing a more general case where the user is trying to SET
PUBLICATION or even CREATE SUBSCRIPTION, and some publications are
missing. Wouldn't it be better to rephrase the comment?

2. + errhint("If the publication already exists, ignore it as it will
be loaded upon reaching the corresponding WAL record; otherwise,
create it."));

Is this hint correct? This is a question rather than a comment: When
we reach a particular WAL where the publication was created, will the
publication automatically load, or does the user need to REFRESH the
publications?

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#24vignesh C
vignesh21@gmail.com
In reply to: Dilip Kumar (#23)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Thu, 13 Mar 2025 at 09:18, Dilip Kumar <dilipbalaut@gmail.com> wrote:

Thanks looks good to me.

While looking at the patch, I have a few comments/questions

+ if (pub)
+ result = lappend(result, pub);
+ else
+ {
+ /*
+ * When executing 'ALTER SUBSCRIPTION ... SET PUBLICATION', the
+ * apply worker continues using the existing replication slot and
+ * origin after restarting. If the replication origin is not
+ * updated before the restart, the WAL start location may point to
+ * a position before the specified publication exists, causing
+ * persistent apply worker restarts and errors.
+ *
+ * This ensures that the publication is skipped if it does not
+ * exist and is loaded when the corresponding WAL record is
+ * encountered.
+ */
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipped loading publication: %s", pubname),
+ errhint("If the publication already exists, ignore it as it will be
loaded upon reaching the corresponding WAL record; otherwise, create
it."));
+ }

This comment focuses on a specific use case regarding the problem with
'ALTER SUBSCRIPTION ... SET PUBLICATION,' but in reality, we are
addressing a more general case where the user is trying to SET
PUBLICATION or even CREATE SUBSCRIPTION, and some publications are
missing. Wouldn't it be better to rephrase the comment?

How about a comment something like below:
/*
* In 'ALTER SUBSCRIPTION ... ADD/SET PUBLICATION' and
* 'CREATE SUBSCRIPTION', if the replication origin is not updated
* before the worker exits, the WAL start location might point to a
* position before the publication's WAL record. This can lead to
* persistent apply worker restarts and errors.
*
* Additionally, dropping a subscription's publication should not
* disrupt logical replication.
*
* This ensures that a missing publication is skipped and loaded
* when its corresponding WAL record is encountered.
*/
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipped loading publication: %s", pubname),
errhint("If the publication is missing, create and refresh it.
Otherwise, wait for the slot to reach the WAL record, then refresh"));

2. + errhint("If the publication already exists, ignore it as it will
be loaded upon reaching the corresponding WAL record; otherwise,
create it."));

Is this hint correct? This is a question rather than a comment: When
we reach a particular WAL where the publication was created, will the
publication automatically load, or does the user need to REFRESH the
publications?

Users need to refresh the publication in case the relation is not
already added to pg_subscription_rel and apply incremental changes.
How about an error hint like:
"If the publication is missing, create and refresh it. Otherwise, wait
for the slot to reach the WAL record for the created publication, then
refresh"
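
To make the need for a refresh concrete, here is a small toy model, written as a sketch rather than a description of the real apply worker. It assumes, as stated above, that the subscriber only applies incremental changes for relations it already tracks (the role pg_subscription_rel plays). Subscriber, refresh() and apply_change() are hypothetical names used purely for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_RELS 8

/* Hypothetical stand-in for the subscriber's list of tracked relations. */
typedef struct
{
	const char *rels[MAX_RELS];
	int			nrels;
} Subscriber;

static bool
is_tracked(const Subscriber *sub, const char *rel)
{
	for (int i = 0; i < sub->nrels; i++)
		if (strcmp(sub->rels[i], rel) == 0)
			return true;
	return false;
}

/* Models a refresh: learn the currently published tables and add new ones. */
static void
refresh(Subscriber *sub, const char **published, int n)
{
	for (int i = 0; i < n; i++)
	{
		if (!is_tracked(sub, published[i]))
		{
			assert(sub->nrels < MAX_RELS);
			sub->rels[sub->nrels++] = published[i];
		}
	}
}

/* An incoming change is applied only for relations the subscriber tracks. */
static bool
apply_change(const Subscriber *sub, const char *rel)
{
	return is_tracked(sub, rel);
}
```

In this model, creating the publication on the publisher is not enough on its own: a change for tab_3 is ignored until refresh() has added tab_3 to the tracked set, which mirrors the REFRESH PUBLICATION step in the test.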

Regards,
Vignesh

#25Dilip Kumar
dilipbalaut@gmail.com
In reply to: vignesh C (#24)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Thu, Mar 13, 2025 at 10:49 AM vignesh C <vignesh21@gmail.com> wrote:

On Thu, 13 Mar 2025 at 09:18, Dilip Kumar <dilipbalaut@gmail.com> wrote:

Thanks looks good to me.

While looking at the patch, I have a few comments/questions

+ if (pub)
+ result = lappend(result, pub);
+ else
+ {
+ /*
+ * When executing 'ALTER SUBSCRIPTION ... SET PUBLICATION', the
+ * apply worker continues using the existing replication slot and
+ * origin after restarting. If the replication origin is not
+ * updated before the restart, the WAL start location may point to
+ * a position before the specified publication exists, causing
+ * persistent apply worker restarts and errors.
+ *
+ * This ensures that the publication is skipped if it does not
+ * exist and is loaded when the corresponding WAL record is
+ * encountered.
+ */
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipped loading publication: %s", pubname),
+ errhint("If the publication already exists, ignore it as it will be
loaded upon reaching the corresponding WAL record; otherwise, create
it."));
+ }

This comment focuses on a specific use case regarding the problem with
'ALTER SUBSCRIPTION ... SET PUBLICATION,' but in reality, we are
addressing a more general case where the user is trying to SET
PUBLICATION or even CREATE SUBSCRIPTION, and some publications are
missing. Wouldn't it be better to rephrase the comment?

How about a comment something like below:
/*
* In 'ALTER SUBSCRIPTION ... ADD/SET PUBLICATION' and
* 'CREATE SUBSCRIPTION', if the replication origin is not updated
* before the worker exits, the WAL start location might point to a
* position before the publication's WAL record. This can lead to
* persistent apply worker restarts and errors.
*
* Additionally, dropping a subscription's publication should not
* disrupt logical replication.
*
* This ensures that a missing publication is skipped and loaded
* when its corresponding WAL record is encountered.
*/

Looks fine, shall we add the missing publication point as well
something like below

/*
* In operations like 'ALTER SUBSCRIPTION ... ADD/SET PUBLICATION' and
* 'CREATE SUBSCRIPTION', if the specified publication does not exist or
* if the replication origin is not updated before the worker exits,
* the WAL start location may point to a position prior to the publication's
* WAL record. This can cause persistent restarts and errors
* in the apply worker.
*
* Additionally, dropping a subscription's publication should not
* disrupt logical replication.
*
* This ensures that a missing publication is skipped and loaded
* when its corresponding WAL record is encountered.
*/

ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipped loading publication: %s", pubname),
errhint("If the publication is missing, create and refresh it.
Otherwise, wait for the slot to reach the WAL record, then refresh"));

2. + errhint("If the publication already exists, ignore it as it will
be loaded upon reaching the corresponding WAL record; otherwise,
create it."));

Is this hint correct? This is a question rather than a comment: When
we reach a particular WAL where the publication was created, will the
publication automatically load, or does the user need to REFRESH the
publications?

Users need to refresh the publication in case the relation is not
already added to pg_subscription_rel and apply incremental changes.
How about an error hint like:
"If the publication is missing, create and refresh it. Otherwise, wait
for the slot to reach the WAL record for the created publication, then
refresh"

+1

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#26Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#25)
1 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Thu, Mar 13, 2025 at 11:43 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Looks fine, shall we add the missing publication point as well
something like below

/*
* In operations like 'ALTER SUBSCRIPTION ... ADD/SET PUBLICATION' and
* 'CREATE SUBSCRIPTION', if the specified publication does not exist or
* if the replication origin is not updated before the worker exits,
* the WAL start location may point to a position prior to the publication's
* WAL record. This can cause persistent restarts and errors
* in the apply worker.
*

I think that is too much related to pub-sub model, and ideally,
pgoutput should not care about it. I have written a comment
considering somebody using pgoutput decoding module via APIs.

* Additionally, dropping a subscription's publication should not
* disrupt logical replication.

*

* This ensures that a missing publication is skipped and loaded
* when its corresponding WAL record is encountered.

*/

ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipped loading publication: %s", pubname),
errhint("If the publication is missing, create and refresh it.
Otherwise, wait for the slot to reach the WAL record, then refresh"));

2. + errhint("If the publication already exists, ignore it as it will
be loaded upon reaching the corresponding WAL record; otherwise,
create it."));

Is this hint correct? This is a question rather than a comment: When
we reach a particular WAL where the publication was created, will the
publication automatically load, or does the user need to REFRESH the
publications?

Users need to refresh the publication in case the relation is not
already added to pg_subscription_rel and apply incremental changes.
How about an error hint like:
"If the publication is missing, create and refresh it. Otherwise, wait
for the slot to reach the WAL record for the created publication, then
refresh"

I have tried to split this information into errdetail and errhint in
the attached. See and let me know what you think of the same.

--
With Regards,
Amit Kapila.

Attachments:

v9-0001-Fix-ALTER-SUBSCRIPTION-.-SET-PUBLICATION-.-comman.patch (application/octet-stream)
From de767940fe6f33bb8e12975428b827cfd183dcd5 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Mon, 3 Mar 2025 11:54:27 +0530
Subject: [PATCH v9] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command.

The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead
to restarting of apply worker and after the restart, the apply worker will
use the existing slot and replication origin corresponding to the
subscription. Now, it is possible that before the restart, the origin has
not been updated, and the WAL start location points to a location before
the publication named by SET PUBLICATION exists, and that
can lead to an error like: "ERROR:  publication "pub1" does not exist".
Once this error occurs, apply worker will never be able to proceed and
will always return the same error.

We decided to skip loading the publication if the publication does not
exist. The publication is loaded later and updates the relation entry when
the publication gets created.

We decided not to backpatch this as this is a behaviour change, and we don't
see field reports. This problem has been found by intermittent buildfarm
failures.

Author: vignesh C <vignesh21@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://postgr.es/m/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 16 +++++++-
 src/test/subscription/t/024_add_drop_pub.pl | 44 ++++++++++++++++++++-
 2 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ed806c54300..8357bf8b4c0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1764,6 +1764,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we skip the publications that don't exist yet. This will allow us
+ * to silently continue the replication in the absence of a missing publication.
+ * This is required because we allow the users to create publications after they
+ * have specified the required publications at the time of replication start.
  */
 static List *
 LoadPublications(List *pubnames)
@@ -1774,9 +1779,16 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+			ereport(WARNING,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("skipped loading publication: %s", pubname),
+					errdetail("The publication does not exist at this point in the WAL."),
+					errhint("Create the publication if it does not exist."));
 	}
 
 	return result;
diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl
index 4428a3413db..b594941c7cb 100644
--- a/src/test/subscription/t/024_add_drop_pub.pl
+++ b/src/test/subscription/t/024_add_drop_pub.pl
@@ -1,7 +1,9 @@
 
 # Copyright (c) 2021-2025, PostgreSQL Global Development Group
 
-# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION
+# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and
+# ensures that creating a publication associated with a subscription at a later
+# point of time does not break logical replication.
 use strict;
 use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
@@ -80,6 +82,46 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_1");
 is($result, qq(20|1|10), 'check initial data is copied to subscriber');
 
+# Ensure that setting a missing publication to the subscription does not
+# disrupt existing logical replication. Instead, it should log a warning
+# while allowing replication to continue. Additionally, verify that replication
+# resumes after the missing publication is created for the publication table.
+
+# Create table on publisher and subscriber
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
+
+# Set the subscription with a missing publication
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3");
+
+my $offset = -s $node_publisher->logfile;
+
+$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)");
+
+# Verify that a warning is logged.
+$node_publisher->wait_for_log(
+	qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub REFRESH  PUBLICATION");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Verify that the insert operation gets replicated to subscriber after
+# publication is created.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab_3");
+is($result, qq(1
+2), 'check that the incremental data is replicated after the publication is created');
+
 # shutdown
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
-- 
2.28.0.windows.1

#27Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#26)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Thu, Mar 13, 2025 at 3:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Mar 13, 2025 at 11:43 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Looks fine, shall we add the missing publication point as well
something like below

/*
* In operations like 'ALTER SUBSCRIPTION ... ADD/SET PUBLICATION' and
* 'CREATE SUBSCRIPTION', if the specified publication does not exist or
* if the replication origin is not updated before the worker exits,
* the WAL start location may point to a position prior to the publication's
* WAL record. This can cause persistent restarts and errors
* in the apply worker.
*

I think that is too much related to pub-sub model, and ideally,
pgoutput should not care about it. I have written a comment
considering somebody using pgoutput decoding module via APIs.

I agree; here we just need to talk about skipping the missing
publication, not the different scenarios where that can happen. This
comment looks much better.

* Additionally, dropping a subscription's publication should not
* disrupt logical replication.

*

* This ensures that a missing publication is skipped and loaded
* when its corresponding WAL record is encountered.

*/

ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipped loading publication: %s", pubname),
errhint("If the publication is missing, create and refresh it.
Otherwise, wait for the slot to reach the WAL record, then refresh"));

2. + errhint("If the publication already exists, ignore it as it will
be loaded upon reaching the corresponding WAL record; otherwise,
create it."));

Is this hint correct? This is a question rather than a comment: When
we reach a particular WAL where the publication was created, will the
publication automatically load, or does the user need to REFRESH the
publications?

Users need to refresh the publication in case the relation is not
already added to pg_subscription_rel and apply incremental changes.
How about an error hint like:
"If the publication is missing, create and refresh it. Otherwise, wait
for the slot to reach the WAL record for the created publication, then
refresh"

I have tried to split this information into errdetail and errhint in
the attached. See and let me know what you think of the same.

Yes, the errhint also makes sense to me.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#28Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#27)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Thu, Mar 13, 2025 at 7:26 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Thu, Mar 13, 2025 at 3:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

I think that is too much related to pub-sub model, and ideally,
pgoutput should not care about it. I have written a comment
considering somebody using pgoutput decoding module via APIs.

Thanks, Dilip and Sawada-San, for the inputs, and Vignesh for the
patch. I have pushed the change.

--
With Regards,
Amit Kapila.

#29Tom Lane
tgl@sss.pgh.pa.us
In reply to: Amit Kapila (#28)
Re: Add an option to skip loading missing publication to avoid logical replication failure

Amit Kapila <amit.kapila16@gmail.com> writes:

Thanks, Dilip and Sawada-San, for the inputs, and Vignesh for the
patch. I have pushed the change.

Xuneng Zhou pointed out on Discord that the test case added by
7c99dc587 has caused repeated failures in CI --- though oddly,
it's not failed in the buildfarm so far as I can find. The
failures look like

timed out waiting for match: (?^:WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3) at /tmp/cirrus-ci-build/src/test/subscription/t/024_add_drop_pub.pl line 103.

I suspect that what is happening is that the "skipped loading
publication" message comes out sooner than the 024 test script
expects, and thus that it could be fixed by moving the
"my $offset = -s $node_publisher->logfile;" line to be just
before the ALTER SUBSCRIPTION command instead of just after it.
Because the "skipped" message is from LoadPublications() which
is fundamentally invoked as a result of cache flushes, it's
hardly astonishing that its timing would be erratic.

However, I can't really prove this because I've been unable
to reproduce the failure locally, except by moving the
"my $offset" assignment further down which is surely cheating.

Thoughts?

regards, tom lane
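
The offset hypothesis above can be illustrated with a small sketch. It is not code from the TAP framework: log_contains_from() is a hypothetical helper, and the log is modelled as an in-memory string rather than a file. It only shows why a search that starts at a saved log size cannot see a message written before that size was captured.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: return 1 if `needle` occurs in `log` at or after
 * byte `offset`, else 0.  This mimics searching a log file starting from
 * a previously saved file size.
 */
int
log_contains_from(const char *log, size_t offset, const char *needle)
{
	size_t		len = strlen(log);

	assert(offset <= len);		/* an offset past the end is a caller bug */
	return strstr(log + offset, needle) != NULL;
}
```

If the offset is taken before the warning is written, the search finds it. If the offset is taken after the warning is already in the log, the same search returns 0 and a waiting test would eventually time out.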

#30Amit Kapila
amit.kapila16@gmail.com
In reply to: Tom Lane (#29)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Wed, Apr 30, 2025 at 11:22 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Xuneng Zhou pointed out on Discord that the test case added by
7c99dc587 has caused repeated failures in CI --- though oddly,
it's not failed in the buildfarm so far as I can find. The
failures look like

timed out waiting for match: (?^:WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3) at /tmp/cirrus-ci-build/src/test/subscription/t/024_add_drop_pub.pl line 103.

I analyzed the relevant publisher-side CI logs [1]:
...
2025-04-19 08:24:14.096 UTC [21961][client backend]
[024_add_drop_pub.pl][7/4:0] LOG: statement: INSERT INTO tab_3
values(1)
2025-04-19 08:24:14.098 UTC [21961][client backend]
[024_add_drop_pub.pl][:0] LOG: disconnection: session time:
0:00:00.003 user=postgres database=postgres host=[local]
2025-04-19 08:24:14.108 UTC [21797][walsender] [tap_sub][30/0:0] LOG:
released logical replication slot "tap_sub"
2025-04-19 08:24:14.108 UTC [21797][walsender] [tap_sub][:0] LOG:
disconnection: session time: 0:00:00.329 user=postgres
database=postgres host=[local]
2025-04-19 08:24:14.127 UTC [21979][not initialized] [[unknown]][:0]
LOG: connection received: host=[local]
2025-04-19 08:24:14.128 UTC [21979][walsender] [[unknown]][23/5:0]
LOG: connection authenticated: user="postgres" method=trust
(/tmp/cirrus-ci-build/build/testrun/subscription/024_add_drop_pub/data/t_024_add_drop_pub_publisher_data/pgdata/pg_hba.conf:117)
2025-04-19 08:24:14.128 UTC [21979][walsender] [[unknown]][23/5:0]
LOG: replication connection authorized: user=postgres
application_name=tap_sub
2025-04-19 08:24:14.129 UTC [21979][walsender] [tap_sub][23/6:0] LOG:
statement: SELECT pg_catalog.set_config('search_path', '', false);
2025-04-19 08:24:14.130 UTC [21979][walsender] [tap_sub][23/0:0] LOG:
received replication command: IDENTIFY_SYSTEM
2025-04-19 08:24:14.130 UTC [21979][walsender] [tap_sub][23/0:0]
STATEMENT: IDENTIFY_SYSTEM
2025-04-19 08:24:14.131 UTC [21979][walsender] [tap_sub][23/0:0] LOG:
received replication command: START_REPLICATION SLOT "tap_sub" LOGICAL
0/0 (proto_version '4', streaming 'parallel', origin 'any',
publication_names '"tap_pub_
...

This shows that the walsender restarts after the "INSERT INTO tab_3
values(1)" is processed by the previous walsender (the 'released logical
replication slot "tap_sub"' line comes after "INSERT INTO tab_3 values(1)").
So, it is possible that the old apply worker has sent the confirmation
of the WAL received location after the INSERT (due to keepalive message
handling). After the restart, the new walsender will then start
processing WAL after the INSERT, so the "skipped loading publication"
message is never logged and the test times out waiting for it.

Assuming the above theory is correct, after "ALTER SUBSCRIPTION
tap_sub SET PUBLICATION tap_pub_3", we should wait for the new
walsender to restart. We are already doing the same for a similar case
in 001_rep_changes.pl (see the comment "# check that change of
connection string and/or publication list causes restart of
subscription workers. We check the state along with application_name
to ensure that the walsender is (re)started.").

Unfortunately, I will be away for the rest of the week. In the
meantime, if you or someone else is able to reproduce and fix it, then
good; otherwise, I'll take care of it after I return.

[1]: https://api.cirrus-ci.com/v1/artifact/task/6561639182368768/testrun/build/testrun/subscription/024_add_drop_pub/log/024_add_drop_pub_publisher.log

--
With Regards,
Amit Kapila.

#31vignesh C
vignesh21@gmail.com
In reply to: Amit Kapila (#30)
1 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

I agree with your analysis. I was able to reproduce the issue by
delaying the invalidation of the subscription until the walsender
finished decoding the INSERT operation following the ALTER
SUBSCRIPTION through a debugger and using the lsn from the pg_waldump
of the INSERT after the ALTER SUBSCRIPTION. In this scenario, the
confirmed_flush_lsn ends up pointing to a location after the INSERT.
When the invalidation is eventually received and the apply
worker/walsender is restarted, the restarted walsender begins decoding
from that LSN—after the INSERT—which means the "skipped loading
publication" warning is never triggered, causing the test to fail.

Attached is a patch that ensures the walsender process is properly
restarted after ALTER SUBSCRIPTION, preventing this race condition.

Regards,
Vignesh

Attachments:

0001-Fix-race-condition-after-ALTER-SUBSCRIPTION-SET-PUBL.patch (text/x-patch; charset=US-ASCII)
From 26c8efbf59a456f4ad8a87b504180449efe1cd69 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Wed, 30 Apr 2025 21:38:33 +0530
Subject: [PATCH] Fix race condition after ALTER SUBSCRIPTION SET PUBLICATION

Previously, after executing ALTER SUBSCRIPTION tap_sub SET PUBLICATION, we
did not wait for the new walsender process to restart. As a result, an INSERT
executed immediately after the ALTER could be decoded and the confirmed flush
lsn is advanced. This could cause replication to resume from a point after the
INSERT. In such cases, we miss the expected warning about the missing
publication.

To fix this, we now ensure that the walsender has restarted before continuing
after ALTER SUBSCRIPTION.
---
 src/test/subscription/t/024_add_drop_pub.pl | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl
index b594941c7cb..e995d8b3839 100644
--- a/src/test/subscription/t/024_add_drop_pub.pl
+++ b/src/test/subscription/t/024_add_drop_pub.pl
@@ -91,10 +91,21 @@ is($result, qq(20|1|10), 'check initial data is copied to subscriber');
 $node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
 
+my $oldpid = $node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';"
+);
+
 # Set the subscription with a missing publication
 $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3");
 
+# Wait for the walsender to restart after altering the subscription
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';"
+  )
+  or die
+  "Timed out while waiting for apply worker to restart after altering the subscription";
+
 my $offset = -s $node_publisher->logfile;
 
 $node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)");
-- 
2.43.0

#32Tom Lane
tgl@sss.pgh.pa.us
In reply to: vignesh C (#31)
Re: Add an option to skip loading missing publication to avoid logical replication failure

vignesh C <vignesh21@gmail.com> writes:

I agree with your analysis. I was able to reproduce the issue by
delaying the invalidation of the subscription until the walsender
finished decoding the INSERT operation following the ALTER
SUBSCRIPTION through a debugger and using the lsn from the pg_waldump
of the INSERT after the ALTER SUBSCRIPTION.

Can you be a little more specific about how you reproduced this?
I tried inserting sleep() calls in various likely-looking spots
and could not get a failure that way.

regards, tom lane

#33Xuneng Zhou
xunengzhou@gmail.com
In reply to: Tom Lane (#32)
Re: Add an option to skip loading missing publication to avoid logical replication failure

+1. I was unable to reproduce this with lldb either, though I am not
sure whether my approach was appropriate.

#34vignesh C
vignesh21@gmail.com
In reply to: Tom Lane (#32)
1 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Fri, 2 May 2025 at 06:30, Tom Lane <tgl@sss.pgh.pa.us> wrote:

vignesh C <vignesh21@gmail.com> writes:

I agree with your analysis. I was able to reproduce the issue by
delaying the invalidation of the subscription until the walsender
finished decoding the INSERT operation following the ALTER
SUBSCRIPTION through a debugger and using the lsn from the pg_waldump
of the INSERT after the ALTER SUBSCRIPTION.

Can you be a little more specific about how you reproduced this?
I tried inserting sleep() calls in various likely-looking spots
and could not get a failure that way.

Test Steps:
1) Set up logical replication:
Create a publication on the publisher
Create a subscription on the subscriber
2) Create the following table on the publisher:
CREATE TABLE tab_3 (a int);
3) Create the same table on the subscriber:
CREATE TABLE tab_3 (a int);
4) On the subscriber, alter the subscription to refer to a
non-existent publication:
ALTER SUBSCRIPTION sub1 SET PUBLICATION tap_pub_3;
5) Insert data on the publisher:
INSERT INTO tab_3 VALUES (1);

As expected, the publisher logs the following warning in the normal case:
2025-05-02 08:56:45.350 IST [516197] WARNING: skipped loading
publication: tap_pub_3
2025-05-02 08:56:45.350 IST [516197] DETAIL: The publication does
not exist at this point in the WAL.
2025-05-02 08:56:45.350 IST [516197] HINT: Create the publication
if it does not exist.

To simulate a delay in subscription invalidation, I modified the
maybe_reread_subscription() function as follows:
diff --git a/src/backend/replication/logical/worker.c
b/src/backend/replication/logical/worker.c
index 4151a4b2a96..0831784aca3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3970,6 +3970,10 @@ maybe_reread_subscription(void)
     MemoryContext oldctx;
     Subscription *newsub;
     bool            started_tx = false;
+    bool            test = true;
+
+    if (test)
+        return;

This change delays the subscription invalidation logic, preventing the
apply worker from detecting the subscription change immediately.

With the patch applied, repeat steps 1–5.
Using pg_waldump, identify the LSN of the insert:
rmgr: Heap len (rec/tot): 59/ 59, tx: 756, lsn:
0/01711848, prev 0/01711810, desc: INSERT+INIT off: 1
rmgr: Transaction len (rec/tot): 46/ 46, tx: 756, lsn:
0/01711888, prev 0/01711848, desc: COMMIT 2025-05-02 09:06:09.400926
IST

Check the confirmed flush LSN from the walsender via gdb by attaching
it to the walsender process
(gdb) p *MyReplicationSlot
...
confirmed_flush = 24241928
(gdb) p /x 24241928
$4 = 0x171e708
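
The comparison implied by these values can be sketched as below. This is a simplified model, not server code: the Lsn typedef and the three function names are made up for illustration, and the exact skip test used by the decoder may differ in detail. The point is only that confirmed_flush (0x171E708) is numerically past the COMMIT record at 0/01711888.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* An LSN is treated here as a 64-bit byte position in the WAL stream. */
typedef uint64_t Lsn;

/* Build an LSN from the hi/lo halves printed by pg_waldump, e.g. 0/01711888. */
Lsn
lsn_from_parts(uint32_t hi, uint32_t lo)
{
	return ((Lsn) hi << 32) | lo;
}

/* Format an LSN as hi/lo hex; buf must hold at least 18 bytes. */
void
lsn_format(Lsn lsn, char *buf, size_t buflen)
{
	snprintf(buf, buflen, "%" PRIX32 "/%08" PRIX32,
			 (uint32_t) (lsn >> 32), (uint32_t) lsn);
}

/*
 * Simplified assumption: a transaction whose commit record lies before
 * the slot's confirmed_flush position is not decoded again on restart.
 */
int
commit_already_confirmed(Lsn confirmed_flush, Lsn commit_lsn)
{
	return commit_lsn < confirmed_flush;
}
```

With the values from this session, commit_already_confirmed(24241928, lsn_from_parts(0, 0x01711888)) is true, which matches the observation that a restarted walsender would not revisit the INSERT.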

Now attach to the apply worker, set a breakpoint at
maybe_reread_subscription, and continue execution. Once control
reaches the function, set test = false. Now it will identify that
subscription is invalidated and restart the apply worker.

Because the walsender's confirmed_flush position is already after the
insert, the newly started apply worker misses the inserted row
entirely. This leads to the CI failure. This issue can arise when
the walsender advances more quickly than the apply worker is able to
detect and react to the subscription change.

I could not find a simpler way to reproduce this.

Regards,
Vignesh

Attachments:

skip_subscription_invalidation.patch (text/x-patch; charset=US-ASCII)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..0831784aca3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3970,6 +3970,10 @@ maybe_reread_subscription(void)
 	MemoryContext oldctx;
 	Subscription *newsub;
 	bool		started_tx = false;
+	bool		test = true;
+
+	if (test)
+		return;
 
 	/* When cache state is valid there is nothing to do here. */
 	if (MySubscriptionValid)
#35Xuneng Zhou
xunengzhou@gmail.com
In reply to: vignesh C (#31)
Re: Add an option to skip loading missing publication to avoid logical replication failure

Hi,
Is this an expected behavior?

A race between subscriber LSN feedback and publisher subscription change
processing allows the walsender to restart decoding past relevant WAL
records, bypassing the updated subscription rules for those records.

#36Tom Lane
tgl@sss.pgh.pa.us
In reply to: Xuneng Zhou (#35)
Re: Add an option to skip loading missing publication to avoid logical replication failure

Xuneng Zhou <xunengzhou@gmail.com> writes:

Is this an expected behavior?

I'm wondering that too. I don't see how the repro method Vignesh
describes could correspond to a simple timing issue. It smells
like there's a bug here somewhere.

regards, tom lane

#37vignesh C
vignesh21@gmail.com
In reply to: Xuneng Zhou (#35)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Fri, 2 May 2025 at 10:11, Xuneng Zhou <xunengzhou@gmail.com> wrote:

Hi,
Is this an expected behavior?

A race between subscriber LSN feedback and publisher subscription change processing allows the walsender to restart decoding past relevant WAL records, bypassing the updated subscription rules for those records.

We have three processes involved in this scenario:
1) A walsender process on the publisher, responsible for decoding and
sending WAL changes.
2) An apply worker process on the subscriber, which applies the changes.
3) A session executing the ALTER SUBSCRIPTION command.

Due to the asynchronous nature of these processes, the ALTER
SUBSCRIPTION command may not be immediately observed by the apply
worker. Meanwhile, the walsender may process and decode an INSERT
statement.

If the insert targets a table (e.g., tab_3) that does not belong to
the current publication (pub1), the walsender silently skips
replicating the record and advances its decoding position. This
position is sent in a keepalive message to the subscriber, and since
there are no pending transactions to flush, the apply worker reports
it as the latest received LSN.

Later, when the apply worker eventually detects the subscription
change, it restarts, but by then the insert has already been skipped
and is no longer eligible for replay, as the table was not part of the
publication (pub1) at the time of decoding.

This race condition arises because the three processes run
independently and may progress at different speeds due to CPU
scheduling or system load.

Thoughts?

Regards,
Vignesh
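
The interleaving described in this message can be modelled with a toy sketch. Nothing here is PostgreSQL code: Slot, Walsender, decode_insert() and count_warnings() are hypothetical names, and the LSN values 50, 100 and 140 are arbitrary. The model assumes a session whose publication is missing logs one warning when it decodes the INSERT, and that either session's decoding lets the slot advance past the INSERT.

```c
#include <assert.h>
#include <stdint.h>

/* Slot state kept on the publisher (toy model). */
typedef struct
{
	uint64_t	confirmed_flush;
} Slot;

/* One walsender session and the publication list it was started with. */
typedef struct
{
	int			pubs_exist;		/* 1 if every requested publication exists */
	int			warnings;		/* "skipped loading publication" count */
} Walsender;

/* Decode the INSERT into tab_3 that commits at commit_lsn. */
void
decode_insert(Walsender *ws, Slot *slot, uint64_t commit_lsn, uint64_t end_lsn)
{
	assert(commit_lsn <= end_lsn);

	/* Already confirmed by the subscriber: never decoded again. */
	if (commit_lsn < slot->confirmed_flush)
		return;

	/* A missing publication is skipped with a warning. */
	if (!ws->pubs_exist)
		ws->warnings++;

	/* Nothing is sent for tab_3; keepalive plus feedback advance the slot. */
	slot->confirmed_flush = end_lsn;
}

/* Return how many warnings are logged for the given interleaving. */
int
count_warnings(int worker_restarts_before_insert)
{
	Slot		slot = {50};
	Walsender	old_ws = {1, 0};	/* old session: publication exists */
	Walsender	new_ws = {0, 0};	/* new session: tap_pub_3 is missing */

	if (!worker_restarts_before_insert)
		decode_insert(&old_ws, &slot, 100, 140);	/* old session wins */

	decode_insert(&new_ws, &slot, 100, 140);
	return old_ws.warnings + new_ws.warnings;
}
```

In this model count_warnings(1) yields one warning, while count_warnings(0) yields none, which is the situation the failing test ran into.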

#38Xuneng Zhou
xunengzhou@gmail.com
In reply to: vignesh C (#37)
Re: Add an option to skip loading missing publication to avoid logical replication failure

Yes, thanks for the clarification. I have a basic understanding of it
now. What I mean is: is this considered a bug or a design defect in the
codebase? If so, should we prevent it from occurring in general, not
just for this specific test?

#39Tom Lane
tgl@sss.pgh.pa.us
In reply to: vignesh C (#37)
Re: Add an option to skip loading missing publication to avoid logical replication failure

vignesh C <vignesh21@gmail.com> writes:

Due to the asynchronous nature of these processes, the ALTER
SUBSCRIPTION command may not be immediately observed by the apply
worker. Meanwhile, the walsender may process and decode an INSERT
statement.
If the insert targets a table (e.g., tab_3) that does not belong to
the current publication (pub1), the walsender silently skips
replicating the record and advances its decoding position. This
position is sent in a keepalive message to the subscriber, and since
there are no pending transactions to flush, the apply worker reports
it as the latest received LSN.

So this theory presumes that the apply worker receives and reacts to
the keepalive message, yet it has not observed a relevant
subscriber-side catalog update that surely committed before the
keepalive was generated. It's fairly hard to see how that is okay,
because it's at least adjacent to something that must be considered a
bug: applying transmitted data without having observed DDL updates to
the target table. Why is the processing of keepalives laxer than the
processing of data messages?

regards, tom lane

#40vignesh C
vignesh21@gmail.com
In reply to: vignesh C (#34)
1 attachment(s)
Re: Add an option to skip loading missing publication to avoid logical replication failure

A simpler way to consistently reproduce the issue is to add a 1-second
sleep in the LogicalRepApplyLoop function, just before the call to
WaitLatchOrSocket. This reproduces the test failure consistently for
me. The failure reason is the same as in [1].

[1]: /messages/by-id/CALDaNm2Q_pfwiCkaV920iXEbh4D=5MmD_tNQm_GRGX6-MsLxoQ@mail.gmail.com

Regards,
Vignesh

Attachments:

ci_failure_reproduce.patch (text/x-patch; charset=US-ASCII)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..d0056f5655c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3702,6 +3702,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 						if (last_received < end_lsn)
 							last_received = end_lsn;
 
+						elog(LOG, "Send feedback from 1");
 						send_feedback(last_received, reply_requested, false);
 						UpdateWorkerStats(last_received, timestamp, true);
 					}
@@ -3714,6 +3715,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			}
 		}
 
+		elog(LOG, "Send feedback from 2");
 		/* confirm all writes so far */
 		send_feedback(last_received, false, false);
 
@@ -3739,6 +3741,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (endofstream)
 			break;
 
+		sleep(1);
+
 		/*
 		 * Wait for more data or latch.  If we have unflushed transactions,
 		 * wake up after WalWriterDelay to see if they've been flushed yet (in
@@ -3812,6 +3816,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				}
 			}
 
+			elog(LOG, "Send feedback from 3");
 			send_feedback(last_received, requestReply, requestReply);
 
 			/*
@@ -3910,7 +3915,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	pq_sendint64(reply_message, now);	/* sendTime */
 	pq_sendbyte(reply_message, requestReply);	/* replyRequested */
 
-	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+	elog(LOG, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
 		 force,
 		 LSN_FORMAT_ARGS(recvpos),
 		 LSN_FORMAT_ARGS(writepos),
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..9896a8d74d5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -4062,7 +4062,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 static void
 WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
 {
-	elog(DEBUG2, "sending replication keepalive");
+	elog(LOG, "sending replication keepalive - writePtr %X/%X",
+		 LSN_FORMAT_ARGS(writePtr));
 
 	/* construct the message... */
 	resetStringInfo(&output_message);
#41Amit Kapila
amit.kapila16@gmail.com
In reply to: Tom Lane (#39)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Fri, May 2, 2025 at 10:24 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:

vignesh C <vignesh21@gmail.com> writes:

Due to the asynchronous nature of these processes, the ALTER
SUBSCRIPTION command may not be immediately observed by the apply
worker. Meanwhile, the walsender may process and decode an INSERT
statement.
If the insert targets a table (e.g., tab_3) that does not belong to
the current publication (pub1), the walsender silently skips
replicating the record and advances its decoding position. This
position is sent in a keepalive message to the subscriber, and since
there are no pending transactions to flush, the apply worker reports
it as the latest received LSN.

So this theory presumes that the apply worker receives and reacts to
the keepalive message, yet it has not observed a relevant
subscriber-side catalog update that surely committed before the
keepalive was generated. It's fairly hard to see how that is okay,
because it's at least adjacent to something that must be considered a
bug: applying transmitted data without having observed DDL updates to
the target table. Why is the processing of keepalives laxer than the
processing of data messages?

Valid question. As of now, we don't have a specific rule about
ordering the processing of keepalives or invalidation messages. The
effect of invalidation messages is realized by calling
maybe_reread_subscription at three different points after accepting
invalidation messages: (a) after starting a transaction in
begin_replication_step, (b) in the commit message handling if no
data modification happened in that transaction, and (c) when we
don't get any transactions for a while.

Point (a) ensures we consume any target table change before applying a
new transaction. The other two places ensure that we keep consuming
invalidation messages from time to time.

Now, we can consume invalidation messages during keepalive message
handling and/or at some other places, to ensure that we never process
any remote message before consuming an invalidation message. However,
it is not clear to me whether this is a must. We can provide
strict guarantees for ordering of messages from any one of the
servers, but providing it across nodes doesn't sound like a
must-have criterion.

--
With Regards,
Amit Kapila.

#42Xuneng Zhou
xunengzhou@gmail.com
In reply to: Amit Kapila (#41)
Re: Add an option to skip loading missing publication to avoid logical replication failure

Hi,

A clear benefit of addressing this in code is to ensure that the user sees
the log message, which can be valuable for trouble-shooting—even under race
conditions.

    ereport(WARNING,
            errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
            errmsg("skipped loading publication: %s", pubname),
            errdetail("The publication does not exist at this point in the WAL."),
            errhint("Create the publication if it does not exist."));

The performance impact appears low, assuming the
AcceptInvalidationMessages and maybe_reread_subscription check are
introduced only in the code path that handles keepalive messages requiring
a reply.
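
As a rough illustration of that idea, the sketch below is a toy model
in C, not PostgreSQL code. ToyWorker, toy_reread_subscription and
toy_handle_keepalive are hypothetical names, and the early return
stands in for the worker exiting when it notices a subscription change.
It only shows how checking for invalidations before handling a
reply-requesting keepalive would keep the reported position from
advancing past the change.

```c
#include <stdbool.h>
#include <stdint.h>

/* Toy stand-in for the apply worker's state; not PostgreSQL's real structs. */
typedef struct ToyWorker
{
	bool		invalidation_pending;	/* ALTER SUBSCRIPTION not yet seen */
	bool		restart_requested;		/* worker decided to restart */
	uint64_t	last_received;			/* LSN that feedback would report */
} ToyWorker;

/* Stand-in for AcceptInvalidationMessages() + maybe_reread_subscription(). */
static void
toy_reread_subscription(ToyWorker *w)
{
	if (w->invalidation_pending)
	{
		w->invalidation_pending = false;
		w->restart_requested = true;
	}
}

/*
 * Handle a keepalive carrying the walsender's position.  When check_on_reply
 * is set, pending invalidations are consumed first, but only for keepalives
 * that request a reply.  If that triggers a restart, the position is left
 * untouched (assumption: the real worker would exit before sending feedback).
 */
static void
toy_handle_keepalive(ToyWorker *w, uint64_t end_lsn,
					 bool reply_requested, bool check_on_reply)
{
	if (check_on_reply && reply_requested)
		toy_reread_subscription(w);

	if (w->restart_requested)
		return;

	if (w->last_received < end_lsn)
		w->last_received = end_lsn;
}
```

Without the check, a worker with a pending invalidation still advances
last_received to the keepalive's position; with it, the worker asks
for a restart and the old position is kept.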


#43Amit Kapila
amit.kapila16@gmail.com
In reply to: Xuneng Zhou (#42)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, May 6, 2025 at 3:33 PM Xuneng Zhou <xunengzhou@gmail.com> wrote:

A clear benefit of addressing this in code is to ensure that the user sees the log message, which can be valuable for trouble-shooting—even under race conditions.

I don't think we can take that guarantee because if the Insert is
concurrent or slightly before the Alter Subscription command, then
there won't be a guarantee that users will see the skipped LOG
message.

--
With Regards,
Amit Kapila.

#44Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#43)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Tue, May 6, 2025 at 5:17 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, May 6, 2025 at 3:33 PM Xuneng Zhou <xunengzhou@gmail.com> wrote:

A clear benefit of addressing this in code is to ensure that the user sees the log message, which can be valuable for trouble-shooting—even under race conditions.

I don't think we can take that guarantee because if the Insert is
concurrent or slightly before the Alter Subscription command, then
there won't be a guarantee that users will see the skipped LOG
message.

I am planning to proceed with the test-fix proposed by Vignesh [1]
early next week unless we want to discuss more on this issue.

[1]: /messages/by-id/CALDaNm3TH3J8fwj+NcdjdN8DZCdrnmm1kzBsHBW2nkN+h6up3A@mail.gmail.com

--
With Regards,
Amit Kapila.

#45Xuneng Zhou
xunengzhou@gmail.com
In reply to: vignesh C (#40)
Re: Add an option to skip loading missing publication to avoid logical replication failure

Hi, I was able to reproduce the failure by adding a 1-second sleep in the
LogicalRepApplyLoop function.
However, I noticed that the tests under src/test/subscription run
significantly slower. Is this normal?

Also, it looks like the patch mentioned in this thread addresses the issue:
/messages/by-id/CALDaNm2Q_pfwiCkaV920iXEbh4D=5MmD_tNQm_GRGX6-MsLxoQ@mail.gmail.com


#46Amit Kapila
amit.kapila16@gmail.com
In reply to: Xuneng Zhou (#45)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Sun, May 11, 2025 at 8:49 PM Xuneng Zhou <xunengzhou@gmail.com> wrote:

Hi, I was able to reproduce the failure by adding a 1-second sleep in the LogicalRepApplyLoop function
However, I noticed that the tests under src/test/subscription run significantly slower— is this normal?

Yes, because you made apply slower by adding a sleep.

Also, it looks like the patch mentioned in this thread addresses the issue:
/messages/by-id/CALDaNm2Q_pfwiCkaV920iXEbh4D=5MmD_tNQm_GRGX6-MsLxoQ@mail.gmail.com

So you are okay with a test-only fix?

With Regards,
Amit Kapila.

#47Xuneng Zhou
xunengzhou@gmail.com
In reply to: Amit Kapila (#46)
Re: Add an option to skip loading missing publication to avoid logical replication failure

If the presumed theory regarding the cause of the issue is correct, as
outlined in this email
</messages/by-id/CALDaNm2Pmbc-7KM3nRgZcq1EBhbdvWJSTie-st57oGuKP4O44w@mail.gmail.com>,
and no data replication occurs in this scenario
</messages/by-id/CAA4eK1Jz20hnPRDtPDo2BbSt0Xf8u2zY4Tc84R0OAQN8M=9iCQ@mail.gmail.com>,
then the proposed fix seems ok to me.
But I don’t have the expertise to fully assess the trade-offs between
enforcing strict ordering across nodes and maintaining the current behavior.


#48Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#44)
Re: Add an option to skip loading missing publication to avoid logical replication failure

On Sat, May 10, 2025 at 5:15 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

I am planning to proceed with the test-fix proposed by Vignesh [1]
early next week unless we want to discuss more on this issue.

Pushed.

[1] - /messages/by-id/CALDaNm3TH3J8fwj+NcdjdN8DZCdrnmm1kzBsHBW2nkN+h6up3A@mail.gmail.com

--
With Regards,
Amit Kapila.