From f8301073b332c896c8d356bdea5d5d1534c21a6f Mon Sep 17 00:00:00 2001 From: amit Date: Thu, 7 Nov 2019 18:10:44 +0900 Subject: [PATCH 2/2] Support adding partitioned tables to publication --- doc/src/sgml/logical-replication.sgml | 23 +++++++++---- doc/src/sgml/ref/create_publication.sgml | 13 ++++---- src/backend/catalog/pg_publication.c | 51 ++++++++++++++++------------- src/backend/commands/publicationcmds.c | 12 +++++-- src/backend/commands/subscriptioncmds.c | 27 +++++++++++++-- src/backend/executor/execReplication.c | 19 +++++------ src/backend/replication/logical/relation.c | 1 + src/backend/replication/logical/tablesync.c | 24 +++++++++++--- src/include/replication/logicalproto.h | 1 + src/test/regress/expected/publication.out | 3 -- 10 files changed, 115 insertions(+), 59 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f657d1d06e..d67015e160 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -402,13 +402,22 @@ - Replication is only possible from base tables to base tables. That is, - the tables on the publication and on the subscription side must be normal - tables, not views, materialized views, partition root tables, or foreign - tables. In the case of partitions, you can therefore replicate a - partition hierarchy one-to-one, but you cannot currently replicate to a - differently partitioned setup. Attempts to replicate tables other than - base tables will result in an error. + Replication is only supported for regular and partitioned tables. + Attempts to replicate relations other than regular and partitioned tables, + such as views, materialized views, or foreign tables, will result in an + error. However, note that replicating from a regular table to a partitioned + table or vice versa is not supported. 
+ + + + When a partitioned table is added to a publication, all of its existing + and future partitions are implicitly considered to be part of the + publication. Any changes made to the leaf partitions are sent to the + subscription server, which must contain a partitioned table with a partition + hierarchy matching one-to-one with the publication-side partitioned + table. For partitioned tables on the two sides to match one-to-one, each + partition with a given partition constraint must have the same name on + both sides. diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 99f87ca393..5e11868989 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -68,15 +68,16 @@ CREATE PUBLICATION name that table is added to the publication. If ONLY is not specified, the table and all its descendant tables (if any) are added. Optionally, * can be specified after the table name to - explicitly indicate that descendant tables are included. + explicitly indicate that descendant tables are included. However, adding + a partitioned table to a publication never explicitly adds its partitions, + because partitions are implicitly published due to the partitioned table + being added to the publication. - Only persistent base tables can be part of a publication. Temporary - tables, unlogged tables, foreign tables, materialized views, regular - views, and partitioned tables cannot be part of a publication. To - replicate a partitioned table, add the individual partitions to the - publication. + Only persistent base tables and partitioned tables can be part of a + publication. Temporary tables, unlogged tables, foreign tables, + materialized views, and regular views cannot be part of a publication. 
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 80b98e2c3c..d4cc805499 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -30,6 +30,7 @@ #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/partition.h" #include "catalog/pg_type.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" @@ -50,17 +51,9 @@ static void check_publication_add_relation(Relation targetrel) { - /* Give more specific error for partitioned tables */ - if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("\"%s\" is a partitioned table", - RelationGetRelationName(targetrel)), - errdetail("Adding partitioned tables to publications is not supported."), - errhint("You can add the table partitions individually."))); - - /* Must be table */ - if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION) + /* Must be a regular or partitioned table */ + if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && + RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("\"%s\" is not a table", @@ -106,7 +99,8 @@ check_publication_add_relation(Relation targetrel) static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { - return reltuple->relkind == RELKIND_RELATION && + return (reltuple->relkind == RELKIND_RELATION || + reltuple->relkind == RELKIND_PARTITIONED_TABLE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; @@ -224,7 +218,8 @@ publication_add_relation(Oid pubid, Relation targetrel, /* - * Finds all publications associated with the relation. + * Finds all publications associated with the relation and if it's a + * partition, also with any of its ancestors. 
*/ List * GetRelationPublications(Relation rel) @@ -233,20 +228,32 @@ GetRelationPublications(Relation rel) List *result = NIL; CatCList *pubrellist; int i; + ListCell *lc; + List *target_rels = NIL; - /* Find all publications associated with the relation. */ - pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP, - ObjectIdGetDatum(relid)); - for (i = 0; i < pubrellist->n_members; i++) + /* For a partition, include its ancestors' publications, if any. */ + if (rel->rd_rel->relispartition) + target_rels = get_partition_ancestors(RelationGetRelid(rel)); + + target_rels = lappend_oid(target_rels, relid); + + foreach(lc, target_rels) { - HeapTuple tup = &pubrellist->members[i]->tuple; - Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid; + Oid relid = lfirst_oid(lc); - result = lappend_oid(result, pubid); + pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid)); + for (i = 0; i < pubrellist->n_members; i++) + { + HeapTuple tup = &pubrellist->members[i]->tuple; + Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid; + + result = lappend_oid(result, pubid); + } + + ReleaseSysCacheList(pubrellist); } - ReleaseSysCacheList(pubrellist); - return result; } diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index f115d4bf80..db17c47495 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -502,7 +502,8 @@ RemovePublicationRelById(Oid proid) /* * Open relations specified by a RangeVar list. - * The returned tables are locked in ShareUpdateExclusiveLock mode. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. 
*/ static List * OpenTableList(List *tables) @@ -543,8 +544,13 @@ OpenTableList(List *tables) rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); - /* Add children of this rel, if requested */ - if (recurse) + /* + * Add children of this rel, if requested, so that they too are added + * to the publication. A partitioned table can't have any inheritance + * children other than its partitions, which need not be explicitly + * added to the publication. + */ + if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) { List *children; ListCell *child; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 11c0f305ff..eb555f1d64 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1149,11 +1149,20 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) Assert(list_length(publications) > 0); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, c.relkind\n" + appendStringInfoString(&cmd, "SELECT s.schemaname, s.tablename, s.relkind FROM (\n" + " SELECT DISTINCT t.pubname, t.schemaname, t.tablename, c.relkind\n" " FROM pg_catalog.pg_publication_tables t\n" " JOIN pg_class c ON t.schemaname = c.relnamespace::regnamespace::name\n" " AND t.tablename = c.relname\n" - " WHERE t.pubname IN ("); + " UNION\n" + " SELECT DISTINCT t.pubname, s.schemaname, s.tablename, s.relkind\n" + " FROM pg_catalog.pg_publication_tables t,\n" + " LATERAL (SELECT c.relnamespace::regnamespace::name, c.relname, c.relkind\n" + " FROM pg_class c\n" + " JOIN pg_partition_tree(t.schemaname || '.' 
|| t.tablename) p\n" + " ON p.relid = c.oid\n" + " WHERE p.level > 0) AS s(schemaname, tablename, relkind)) s\n" + " WHERE s.pubname IN ("); first = true; foreach(lc, publications) @@ -1224,5 +1233,19 @@ ValidateSubscriptionRel(PublicationTable *pt) local_relkind = get_rel_relkind(relid); CheckSubscriptionRelkind(local_relkind, rv->schemaname, rv->relname); + /* + * Cannot replicate from a regular to a partitioned table or vice + * versa. + */ + if (local_relkind != pt->relkind) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot use relation \"%s.%s\" as logical replication target", + rv->schemaname, rv->relname), + errdetail("\"%s.%s\" is a %s on subscription side whereas a %s on publication side", + rv->schemaname, rv->relname, + local_relkind == RELKIND_RELATION ? "regular table" : "partitioned table", + pt->relkind == RELKIND_RELATION ? "regular table" : "partitioned table"))); + return relid; } diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 95e027c970..f05f44c99f 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -591,17 +591,10 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { /* - * We currently only support writing to regular tables. However, give a - * more specific error for partitioned and foreign tables. + * We currently only support writing to regular and partitioned tables. + * However, give a more specific error for foreign tables. 
*/ - if (relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot use relation \"%s.%s\" as logical replication target", - nspname, relname), - errdetail("\"%s.%s\" is a partitioned table.", - nspname, relname))); - else if (relkind == RELKIND_FOREIGN_TABLE) + if (relkind == RELKIND_FOREIGN_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", @@ -609,7 +602,11 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, errdetail("\"%s.%s\" is a foreign table.", nspname, relname))); - if (relkind != RELKIND_RELATION) + /* + * Subscriptions for partitioned tables are really placeholder objects, as + * replication itself occurs at the individual partition level. + */ + if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index f938d1fa48..6a10593e79 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -177,6 +177,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) entry->remoterel.remoteid = remoterel->remoteid; entry->remoterel.nspname = pstrdup(remoterel->nspname); entry->remoterel.relname = pstrdup(remoterel->relname); + entry->remoterel.relkind = remoterel->relkind; entry->remoterel.natts = remoterel->natts; entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 7881079e96..fa469c8c67 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -637,7 +637,8 @@ copy_read_data(void *outbuf, int 
minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. XXX - while we fetch relkind too + * here, the RELATION message doesn't provide it */ static void fetch_remote_table_info(char *nspname, char *relname, @@ -646,7 +647,7 @@ fetch_remote_table_info(char *nspname, char *relname, WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {OIDOID, CHAROID}; + Oid tableRow[3] = {OIDOID, CHAROID, CHAROID}; Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; bool isnull; int natt; @@ -656,16 +657,16 @@ fetch_remote_table_info(char *nspname, char *relname, /* First fetch Oid and replica identity. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" " WHERE n.nspname = %s" " AND c.relname = %s" - " AND c.relkind = 'r'", + " AND pg_relation_is_publishable(c.oid)", quote_literal_cstr(nspname), quote_literal_cstr(relname)); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 3, tableRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -682,6 +683,8 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -769,6 +772,17 @@ copy_table(Relation rel) relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); + /* + * If either table is partitioned, skip copying. Individual partitions + * will be copied instead. 
+ */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE || + lrel.relkind == RELKIND_PARTITIONED_TABLE) + { + logicalrep_rel_close(relmapentry, NoLock); + return; + } + /* Start copy on the publisher. */ initStringInfo(&cmd); appendStringInfo(&cmd, "COPY %s TO STDOUT", diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 3fc430af01..0fea368d99 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -45,6 +45,7 @@ typedef struct LogicalRepRelation LogicalRepRelId remoteid; /* unique id of the relation */ char *nspname; /* schema name */ char *relname; /* relation name */ + char relkind; /* relation kind */ int natts; /* number of columns */ char **attnames; /* column names */ Oid *atttyps; /* column types */ diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index feb51e4add..ee0db9b07b 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -144,9 +144,6 @@ ERROR: "testpub_view" is not a table DETAIL: Only tables can be added to publications. -- fail - partitioned table ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; -ERROR: "testpub_parted" is a partitioned table -DETAIL: Adding partitioned tables to publications is not supported. -HINT: You can add the table partitions individually. ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk; -- 2.11.0