From 411cf2a1af7960b7604b4120e5f5061734f3c796 Mon Sep 17 00:00:00 2001 From: Houzhijie Date: Mon, 2 Aug 2021 20:09:29 +0800 Subject: [PATCH] fix ALTER SUB ADD DROP PUBLICATION --- src/backend/commands/subscriptioncmds.c | 76 +++++++++++++++++-------- 1 file changed, 51 insertions(+), 25 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 22ae982328..9addfdd869 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -625,7 +625,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, } static void -AlterSubscription_refresh(Subscription *sub, bool copy_data) +AlterSubscription_refresh(Subscription *sub, bool copy_data, + AlterSubscriptionType type) { char *err; List *pubrel_names; @@ -636,6 +637,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) int off; int remove_rel_len; Relation rel = NULL; + int ntables_to_drop = 0; typedef struct SubRemoveRels { Oid relid; @@ -644,6 +646,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) SubRemoveRels *sub_remove_rels; WalReceiverConn *wrconn; + Assert(type == ALTER_SUBSCRIPTION_SET_PUBLICATION || + type == ALTER_SUBSCRIPTION_ADD_PUBLICATION || + type == ALTER_SUBSCRIPTION_DROP_PUBLICATION || + type == ALTER_SUBSCRIPTION_REFRESH); + /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); @@ -675,8 +682,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) subrel_local_oids[off++] = relstate->relid; } - qsort(subrel_local_oids, list_length(subrel_states), - sizeof(Oid), oid_cmp); + + if (type != ALTER_SUBSCRIPTION_DROP_PUBLICATION) + qsort(subrel_local_oids, list_length(subrel_states), + sizeof(Oid), oid_cmp); /* * Rels that we want to remove from subscription and drop any slots @@ -700,22 +709,25 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) Oid relid; relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); - pubrel_local_oids[off++] = relid; - if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + if (type != ALTER_SUBSCRIPTION_DROP_PUBLICATION) { - AddSubscriptionRelState(sub->oid, relid, - copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr); - ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + if (!bsearch(&relid, subrel_local_oids, + list_length(subrel_states), sizeof(Oid), oid_cmp)) + { + AddSubscriptionRelState(sub->oid, relid, + copy_data ? SUBREL_STATE_INIT : + SUBREL_STATE_READY, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", + rv->schemaname, rv->relname, sub->name))); + } } } @@ -723,16 +735,30 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) * Next remove state for tables we should not care about anymore using * the data we collected above */ - qsort(pubrel_local_oids, list_length(pubrel_names), - sizeof(Oid), oid_cmp); + if (type != ALTER_SUBSCRIPTION_ADD_PUBLICATION) + { + qsort(pubrel_local_oids, list_length(pubrel_names), + sizeof(Oid), oid_cmp); + ntables_to_drop = list_length(subrel_states); + } remove_rel_len = 0; - for (off = 0; off < list_length(subrel_states); off++) + for (off = 0; off < ntables_to_drop; off++) { Oid relid = subrel_local_oids[off]; - - if (!bsearch(&relid, pubrel_local_oids, - list_length(pubrel_names), sizeof(Oid), oid_cmp)) + bool drop_table = false; + + if (type == ALTER_SUBSCRIPTION_SET_PUBLICATION || + type == ALTER_SUBSCRIPTION_REFRESH) + drop_table = !bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), + sizeof(Oid), oid_cmp); + else if (type == ALTER_SUBSCRIPTION_DROP_PUBLICATION) + drop_table = bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), + sizeof(Oid), oid_cmp); + + if (drop_table) { char state; XLogRecPtr statelsn; @@ -1019,7 +1045,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, stmt->kind); } break; @@ -1070,7 +1096,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Only refresh the added/dropped list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, stmt->kind); } break; @@ -1112,7 +1138,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, stmt->kind); break; } -- 2.28.0.windows.1