From 856403e4943954267b0e9b4fa96ccc6955838210 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Mon, 3 Mar 2025 11:54:27 +0530
Subject: [PATCH v6] Fix logical replication breakage after ALTER SUBSCRIPTION
 ... SET PUBLICATION

Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION`
could cause logical replication to break under certain conditions. When
the apply worker restarts after executing SET PUBLICATION, it continues
using the existing replication slot and origin. If the replication origin
was not updated before the restart, the WAL start location could point to
a position prior to the existence of the specified publication, leading to
persistent start of apply worker and reporting errors.

This patch skips loading the publication if the publication does not exist
and loads the publication and updates the relation entry when the publication
gets created.

Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f656aa..4cfd2b02023 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1762,6 +1762,8 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet.
  */
 static List *
 LoadPublications(List *pubnames)
@@ -1772,9 +1774,12 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+			elog(WARNING, "skipped loading publication: %s", pubname);
 	}
 
 	return result;
@@ -2053,8 +2058,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
-	/* Validate the entry */
-	if (!entry->replicate_valid)
+	/*
+	 * Validate the entry if (a) the entry is not valid, or (b) there is a
+	 * change in the publications.
+	 */
+	if (!entry->replicate_valid || !publications_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
-- 
2.43.0

