From ac986d987b1eae6dae320deb76686cac2d904bf2 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 5 Aug 2021 18:29:26 +0800 Subject: [PATCH] 0001-fix-ALTER-SUB-ADD-DROP-PUBLICATION Currently, AlterSubscription_refresh() compares the tables in the target publications with the tables recorded in the local pg_subscription_rel. It then adds to pg_subscription_rel the tables that do not exist there yet, and deletes from pg_subscription_rel the tables that do not exist in the target publications. However, for ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION, the code passes only the added or dropped publications as the target publications to AlterSubscription_refresh(), which can result in wrong records in pg_subscription_rel and unexpected table synchronization. Fix the use of AlterSubscription_refresh() by passing the correct publications to it and by distinguishing the refresh logic of ADD and DROP. --- src/backend/commands/subscriptioncmds.c | 71 +++++++---- src/test/subscription/t/024_alter_sub_pub.pl | 124 +++++++++++++++++++ 2 files changed, 171 insertions(+), 24 deletions(-) create mode 100644 src/test/subscription/t/024_alter_sub_pub.pl diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5157f44058..6b14ef1fbc 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -606,7 +606,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, } static void -AlterSubscription_refresh(Subscription *sub, bool copy_data) +AlterSubscription_refresh(Subscription *sub, bool copy_data, + AlterSubscriptionType type) { char *err; List *pubrel_names; @@ -617,6 +618,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) int off; int remove_rel_len; Relation rel = NULL; + int ntables_to_drop = 0; typedef struct SubRemoveRels { Oid relid; @@ -625,6 +627,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) SubRemoveRels *sub_remove_rels; WalReceiverConn *wrconn; + Assert(type == 
ALTER_SUBSCRIPTION_SET_PUBLICATION || + type == ALTER_SUBSCRIPTION_ADD_PUBLICATION || + type == ALTER_SUBSCRIPTION_DROP_PUBLICATION || + type == ALTER_SUBSCRIPTION_REFRESH); + /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); @@ -656,8 +663,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) subrel_local_oids[off++] = relstate->relid; } - qsort(subrel_local_oids, list_length(subrel_states), - sizeof(Oid), oid_cmp); + + /* + * Don't need to lookup oid in subrel_local_oids if we are DROPing + * PUBLICATION, so skip the sort. + */ + if (type != ALTER_SUBSCRIPTION_DROP_PUBLICATION) + qsort(subrel_local_oids, list_length(subrel_states), + sizeof(Oid), oid_cmp); /* * Rels that we want to remove from subscription and drop any slots @@ -681,34 +694,41 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) Oid relid; relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); - pubrel_local_oids[off++] = relid; - if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + if (type != ALTER_SUBSCRIPTION_DROP_PUBLICATION) { - AddSubscriptionRelState(sub->oid, relid, - copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr); - ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + if (!bsearch(&relid, subrel_local_oids, + list_length(subrel_states), sizeof(Oid), oid_cmp)) + { + AddSubscriptionRelState(sub->oid, relid, + copy_data ? 
SUBREL_STATE_INIT : + SUBREL_STATE_READY, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", + rv->schemaname, rv->relname, sub->name))); + } } } /* * Next remove state for tables we should not care about anymore using - * the data we collected above + * the data we collected above, if we are not ADDing PUBLICATION. */ - qsort(pubrel_local_oids, list_length(pubrel_names), - sizeof(Oid), oid_cmp); + if (type != ALTER_SUBSCRIPTION_ADD_PUBLICATION) + { + qsort(pubrel_local_oids, list_length(pubrel_names), + sizeof(Oid), oid_cmp); + ntables_to_drop = list_length(subrel_states); + } remove_rel_len = 0; - for (off = 0; off < list_length(subrel_states); off++) + for (off = 0; off < ntables_to_drop; off++) { Oid relid = subrel_local_oids[off]; @@ -994,7 +1014,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, stmt->kind); } break; @@ -1043,9 +1063,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); /* Only refresh the added/dropped list of publications. */ - sub->publications = stmt->publication; + if (stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION) + sub->publications = stmt->publication; + else + sub->publications = publist; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, stmt->kind); } break; @@ -1087,7 +1110,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... 
REFRESH"); - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, stmt->kind); break; } diff --git a/src/test/subscription/t/024_alter_sub_pub.pl b/src/test/subscription/t/024_alter_sub_pub.pl new file mode 100644 index 0000000000..974182aebe --- /dev/null +++ b/src/test/subscription/t/024_alter_sub_pub.pl @@ -0,0 +1,124 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# This test checks ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION on the subscriber +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 8; + +# Initialize publisher node +my $node_publisher = PostgresNode->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgresNode->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create tables on publisher +$node_publisher->safe_psql('postgres', "CREATE TABLE temp1 (a int)"); +$node_publisher->safe_psql('postgres', "CREATE TABLE temp2 (a int)"); +$node_publisher->safe_psql('postgres', "CREATE TABLE temp3 (a int)"); + +# Create tables on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE temp1 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE temp2 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE temp3 (a int)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_temp1 FOR TABLE temp1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_temp2 FOR TABLE temp2"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_temp3 FOR TABLE temp3, temp2"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2, tap_pub_temp3" +); + +# check initial pg_subscription_rel +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(3), + 'three relation was subscribed'); + +# Test changing the list of subscribed publications +# Removes table from publication +$node_publisher->safe_psql('postgres', "ALTER PUBLICATION tap_pub_temp1 DROP TABLE temp1"); + +# Remove publications from the list of publications +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_temp1 DROP PUBLICATION tap_pub_temp1"); + +# temp1 should have been deleted from pg_subscription_rel +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); + +is($result, qq(2), + 'check one relation was removed from subscribed list'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel where srrelid::regclass::text = 'temp1'"); +is($result, qq(0), + 'check relation temp1 was removed from subscribed list'); + + +# Removes publications from the list of publications +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_temp1 DROP PUBLICATION tap_pub_temp3"); + +# one row should have been deleted from pg_subscription_rel +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(1), + 'check one relation was removed from subscribed list'); + +# temp3 should have been deleted from pg_subscription_rel +$result = 
$node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel where srrelid::regclass::text = 'temp3'"); +is($result, qq(0), + 'check relation temp1 was removed from subscribed list'); + +# temp2 should still exist +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel where srrelid::regclass::text = 'temp2'"); +is($result, qq(1), + 'check relation temp1 was removed from subscribed list'); + +# Re-add table to publication +$node_publisher->safe_psql('postgres', "ALTER PUBLICATION tap_pub_temp1 ADD TABLE temp1"); + +# Add additional publications +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_temp1 ADD PUBLICATION tap_pub_temp1"); + +# pg_subscription_rel should have two rows (temp1, temp2) +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(2), + 'check one more relation was subscribed'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel where srrelid::regclass::text = 'temp1'"); +is($result, qq(1), + 'check relation temp1 was added to subscribed list'); + +# Drop subscription as we don't need it anymore +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_temp1"); + +# Drop publications as we don't need them anymore +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_temp1"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_temp2"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_temp3"); + +# Clean up the tables on both publisher and subscriber as we don't need them +$node_publisher->safe_psql('postgres', "DROP TABLE temp1"); +$node_publisher->safe_psql('postgres', "DROP TABLE temp2"); +$node_publisher->safe_psql('postgres', "DROP TABLE temp3"); +$node_subscriber->safe_psql('postgres', "DROP TABLE temp1"); +$node_subscriber->safe_psql('postgres', "DROP TABLE temp2"); +$node_subscriber->safe_psql('postgres', "DROP 
TABLE temp3"); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 2.27.0