From 78b8e3e959d83e898ad5befaf55fe46c6fdd6f20 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 4 Aug 2021 15:33:15 +0800 Subject: [PATCH] fix-ALTER-SUB-ADD-DROP-PUBLICATION --- src/backend/commands/subscriptioncmds.c | 76 +++++++++++++++------- src/test/subscription/t/001_rep_changes.pl | 21 +++++- 2 files changed, 71 insertions(+), 26 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5157f44058..68a128ced0 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -606,7 +606,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, } static void -AlterSubscription_refresh(Subscription *sub, bool copy_data) +AlterSubscription_refresh(Subscription *sub, bool copy_data, + AlterSubscriptionType type) { char *err; List *pubrel_names; @@ -617,6 +618,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) int off; int remove_rel_len; Relation rel = NULL; + int ntables_to_drop = 0; typedef struct SubRemoveRels { Oid relid; @@ -625,6 +627,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) SubRemoveRels *sub_remove_rels; WalReceiverConn *wrconn; + Assert(type == ALTER_SUBSCRIPTION_SET_PUBLICATION || + type == ALTER_SUBSCRIPTION_ADD_PUBLICATION || + type == ALTER_SUBSCRIPTION_DROP_PUBLICATION || + type == ALTER_SUBSCRIPTION_REFRESH); + /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); @@ -656,8 +663,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) subrel_local_oids[off++] = relstate->relid; } - qsort(subrel_local_oids, list_length(subrel_states), - sizeof(Oid), oid_cmp); + + if (type != ALTER_SUBSCRIPTION_DROP_PUBLICATION) + qsort(subrel_local_oids, list_length(subrel_states), + sizeof(Oid), oid_cmp); /* * Rels that we want to remove from subscription and drop any slots @@ -681,22 +690,25 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) Oid relid; relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); - pubrel_local_oids[off++] = relid; - if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + if (type != ALTER_SUBSCRIPTION_DROP_PUBLICATION) { - AddSubscriptionRelState(sub->oid, relid, - copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr); - ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + if (!bsearch(&relid, subrel_local_oids, + list_length(subrel_states), sizeof(Oid), oid_cmp)) + { + AddSubscriptionRelState(sub->oid, relid, + copy_data ? SUBREL_STATE_INIT : + SUBREL_STATE_READY, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", + rv->schemaname, rv->relname, sub->name))); + } } } @@ -704,16 +716,30 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) * Next remove state for tables we should not care about anymore using * the data we collected above */ - qsort(pubrel_local_oids, list_length(pubrel_names), - sizeof(Oid), oid_cmp); + if (type != ALTER_SUBSCRIPTION_ADD_PUBLICATION) + { + qsort(pubrel_local_oids, list_length(pubrel_names), + sizeof(Oid), oid_cmp); + ntables_to_drop = list_length(subrel_states); + } remove_rel_len = 0; - for (off = 0; off < list_length(subrel_states); off++) + for (off = 0; off < ntables_to_drop; off++) { Oid relid = subrel_local_oids[off]; - - if (!bsearch(&relid, pubrel_local_oids, - list_length(pubrel_names), sizeof(Oid), oid_cmp)) + bool drop_table = false; + + if (type == ALTER_SUBSCRIPTION_SET_PUBLICATION || + type == ALTER_SUBSCRIPTION_REFRESH) + drop_table = !bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), + sizeof(Oid), oid_cmp); + else if (type == ALTER_SUBSCRIPTION_DROP_PUBLICATION) + drop_table = bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), + sizeof(Oid), oid_cmp); + + if (drop_table) { char state; XLogRecPtr statelsn; @@ -994,7 +1020,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, stmt->kind); } break; @@ -1045,7 +1071,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Only refresh the added/dropped list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, stmt->kind); } break; @@ -1087,7 +1113,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, stmt->kind); break; } diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 0c84d87873..f04df8b8d9 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 32; +use Test::More tests => 34; # Initialize publisher node my $node_publisher = PostgresNode->new('publisher'); @@ -262,6 +262,25 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM temp1"); is($result, qq(1), 'check rows on subscriber with multiple publications'); +# Test changing the list of subscribed publications +# Removes publications from the list of publications +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_temp1 DROP PUBLICATION tap_pub_temp1"); + +# pg_subscription_rel should only have one row +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel t1, pg_subscription t2 WHERE t1.srsubid = t2.oid AND t2.subname = 'tap_sub_temp1'"); +is($result, qq(1), + 'check one relation was removed from subscribed list'); + +# Add additional publications +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_temp1 ADD PUBLICATION tap_pub_temp1"); + +# pg_subscription_rel should only have two rows +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel t1, pg_subscription t2 WHERE t1.srsubid = t2.oid AND t2.subname = 'tap_sub_temp1'"); +is($result, qq(2), + 'check one more relation was subscribed'); + # Drop subscription as we don't need it anymore $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_temp1"); -- 2.27.0