diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 42c06af239..376409b667 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -82,7 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, static bool publications_valid; static bool in_streaming; -static List *LoadPublications(List *pubnames); +static List *LoadPublications(List *pubnames, bool *skipped); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, @@ -1726,9 +1726,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we just skip the publications that don't exist yet. 'skipped' + * will be true if we find any publication from the given list that doesn't + * exist. */ static List * -LoadPublications(List *pubnames) +LoadPublications(List *pubnames, bool *skipped) { List *result = NIL; ListCell *lc; @@ -1736,9 +1740,12 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + *skipped = true; } return result; @@ -1993,7 +2000,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) } /* Validate the entry */ - if (!entry->replicate_valid) + if (!entry->replicate_valid || !publications_valid) { Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); @@ -2010,6 +2017,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); List *rel_publications = NIL; + bool skipped_pub = false; /* Reload publications if needed before use. */ if (!publications_valid) @@ -2020,9 +2028,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) list_free_deep(data->publications); data->publications = NIL; } - data->publications = LoadPublications(data->publication_names); + data->publications = LoadPublications(data->publication_names, &skipped_pub); MemoryContextSwitchTo(oldctx); - publications_valid = true; + + /* + * We don't consider the publications to be valid till we have + * information of all the publications. + */ + if (!skipped_pub) + publications_valid = true; } /*