From 182a78d1b14616479421a433f1784f401e2ac294 Mon Sep 17 00:00:00 2001 From: rahila Date: Mon, 7 Jun 2021 16:27:21 +0530 Subject: [PATCH] Add column filtering to logical replication Add capability to specify column names while linking the table to a publication, at the time of CREATE or ALTER publication. This will allow replicating only the specified columns. Other columns, if any, on the subscriber will be populated locally or NULL will be inserted if no value is supplied for the column by the upstream during INSERT. This facilitates replication to a table on subscriber containing only the subscribed/filtered columns. If no filter is specified, all the columns are replicated. REPLICA IDENTITY columns are always replicated. Thus, prohibit adding relation to publication, if column filters do not contain REPLICA IDENTITY. Add a tap test for the same in src/test/subscription. --- src/backend/access/common/relation.c | 21 +++ src/backend/catalog/pg_publication.c | 65 ++++++++- src/backend/commands/copyfromparse.c | 1 - src/backend/commands/publicationcmds.c | 50 +++++-- src/backend/nodes/copyfuncs.c | 13 ++ src/backend/nodes/equalfuncs.c | 12 ++ src/backend/nodes/outfuncs.c | 12 ++ src/backend/nodes/readfuncs.c | 16 +++ src/backend/parser/gram.y | 27 +++- src/backend/replication/logical/proto.c | 86 ++++++++--- src/backend/replication/logical/tablesync.c | 97 ++++++++++++- src/backend/replication/pgoutput/pgoutput.c | 75 ++++++++-- src/include/catalog/pg_publication.h | 9 +- src/include/catalog/pg_publication_rel.h | 4 + src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 6 + src/include/replication/logicalproto.h | 6 +- src/include/utils/rel.h | 1 + src/test/subscription/t/021_column_filter.pl | 143 +++++++++++++++++++ 19 files changed, 574 insertions(+), 71 deletions(-) create mode 100644 src/test/subscription/t/021_column_filter.pl diff --git a/src/backend/access/common/relation.c b/src/backend/access/common/relation.c index 632d13c1ea..59c1136f2e 
100644 --- a/src/backend/access/common/relation.c +++ b/src/backend/access/common/relation.c @@ -21,12 +21,14 @@ #include "postgres.h" #include "access/relation.h" +#include "access/sysattr.h" #include "access/xact.h" #include "catalog/namespace.h" #include "miscadmin.h" #include "pgstat.h" #include "storage/lmgr.h" #include "utils/inval.h" +#include "utils/lsyscache.h" #include "utils/syscache.h" @@ -215,3 +217,22 @@ relation_close(Relation relation, LOCKMODE lockmode) if (lockmode != NoLock) UnlockRelationId(&relid, lockmode); } + +/* + * Return a bitmapset of attributes given the list of column names + */ +Bitmapset* +get_table_columnset(Oid relid, List *columns, Bitmapset *att_map) +{ + ListCell *cell; + foreach(cell, columns) + { + const char *attname = lfirst(cell); + int attnum = get_attnum(relid, attname); + + if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, att_map)) + att_map = bms_add_member(att_map, + attnum - FirstLowInvalidHeapAttributeNumber); + } + return att_map; +} diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 2a2fe03c13..6687fcb12d 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -47,8 +47,12 @@ * error if not. 
*/ static void -check_publication_add_relation(Relation targetrel) +check_publication_add_relation(Relation targetrel, List *targetcols) { + bool replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + Oid relid = RelationGetRelid(targetrel); + Bitmapset *idattrs; + /* Must be a regular or partitioned table */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) @@ -73,6 +77,35 @@ check_publication_add_relation(Relation targetrel) errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("Temporary and unlogged relations cannot be replicated."))); + + /* + * Cannot specify column filter when REPLICA IDENTITY IS FULL + * or if column filter does not contain REPLICA IDENTITY columns + */ + if (targetcols != NIL) + { + if (replidentfull) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s\" to publication", + RelationGetRelationName(targetrel)), + errdetail("Cannot have column filter with REPLICA IDENTITY FULL"))); + else + { + Bitmapset *filtermap = NULL; + idattrs = RelationGetIndexAttrBitmap(targetrel, INDEX_ATTR_BITMAP_IDENTITY_KEY); + + filtermap = get_table_columnset(relid, targetcols, filtermap); + if (!bms_is_subset(idattrs, filtermap)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s\" to publication", + RelationGetRelationName(targetrel)), + errdetail("Column filter must include REPLICA IDENTITY columns"))); + } + } + } } /* @@ -141,18 +174,20 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) * Insert new publication / relation mapping. 
*/ ObjectAddress -publication_add_relation(Oid pubid, Relation targetrel, +publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel, bool if_not_exists) { Relation rel; HeapTuple tup; Datum values[Natts_pg_publication_rel]; bool nulls[Natts_pg_publication_rel]; - Oid relid = RelationGetRelid(targetrel); + Oid relid = RelationGetRelid(targetrel->relation); Oid prrelid; Publication *pub = GetPublication(pubid); ObjectAddress myself, referenced; + ListCell *lc; + List *target_cols = NIL; rel = table_open(PublicationRelRelationId, RowExclusiveLock); @@ -172,10 +207,18 @@ publication_add_relation(Oid pubid, Relation targetrel, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("relation \"%s\" is already member of publication \"%s\"", - RelationGetRelationName(targetrel), pub->name))); + RelationGetRelationName(targetrel->relation), pub->name))); } - check_publication_add_relation(targetrel); + foreach(lc, targetrel->columns) + { + char *colname; + + colname = strVal(lfirst(lc)); + target_cols = lappend(target_cols, colname); + + } + check_publication_add_relation(targetrel->relation, target_cols); /* Form a tuple. 
*/ memset(values, 0, sizeof(values)); @@ -188,6 +231,8 @@ publication_add_relation(Oid pubid, Relation targetrel, ObjectIdGetDatum(pubid); values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + values[Anum_pg_publication_rel_prattrs - 1] = + PointerGetDatum(strlist_to_textarray(target_cols)); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -196,7 +241,15 @@ publication_add_relation(Oid pubid, Relation targetrel, heap_freetuple(tup); ObjectAddressSet(myself, PublicationRelRelationId, prrelid); + foreach(lc, target_cols) + { + int attnum; + attnum = get_attnum(relid, lfirst(lc)); + /* Add dependency on the column */ + ObjectAddressSubSet(referenced, RelationRelationId, relid, attnum); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + } /* Add dependency on the publication */ ObjectAddressSet(referenced, PublicationRelationId, pubid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); @@ -209,7 +262,7 @@ publication_add_relation(Oid pubid, Relation targetrel, table_close(rel, RowExclusiveLock); /* Invalidate relcache so that publication info is rebuilt. */ - CacheInvalidateRelcache(targetrel); + CacheInvalidateRelcache(targetrel->relation); return myself; } diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index fdf57f1556..515728df67 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -839,7 +839,6 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("extra data after last expected column"))); - fieldno = 0; /* Loop to read the user attributes on the line. 
*/ diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 8487eeb7e6..aee5645e31 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -393,7 +393,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, foreach(newlc, rels) { - Relation newrel = (Relation) lfirst(newlc); + PublicationRelationInfo *newpubrel = (PublicationRelationInfo *) lfirst(newlc); + Relation newrel = newpubrel->relation; if (RelationGetRelid(newrel) == oldrelid) { @@ -401,13 +402,20 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, break; } } - + /* Not yet in the list, open it and add to the list */ if (!found) { Relation oldrel = table_open(oldrelid, ShareUpdateExclusiveLock); - - delrels = lappend(delrels, oldrel); + /* + * Wrap relation into PublicationRelationInfo + */ + PublicationRelationInfo *pubrel = palloc(sizeof(PublicationRelationInfo)); + pubrel->relation = oldrel; + pubrel->relid = oldrelid; + /* This is not needed to delete a table */ + pubrel->columns = NIL; + delrels = lappend(delrels, pubrel); } } @@ -498,9 +506,9 @@ RemovePublicationRelById(Oid proid) } /* - * Open relations specified by a RangeVar list. - * The returned tables are locked in ShareUpdateExclusiveLock mode in order to - * add them to a publication. + * Open relations specified by a PublicationTable list. + * In the returned list of PublicationRelationInfo, tables are locked + * in ShareUpdateExclusiveLock mode in order to add them to a publication. 
*/ static List * OpenTableList(List *tables) @@ -514,10 +522,12 @@ OpenTableList(List *tables) */ foreach(lc, tables) { - RangeVar *rv = lfirst_node(RangeVar, lc); + PublicationTable *t = lfirst(lc); + RangeVar *rv = castNode(RangeVar, t->relation); bool recurse = rv->inh; Relation rel; Oid myrelid; + PublicationRelationInfo *pub_rel; /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); @@ -538,7 +548,11 @@ OpenTableList(List *tables) continue; } - rels = lappend(rels, rel); + pub_rel = palloc(sizeof(PublicationRelationInfo)); + pub_rel->relation = rel; + pub_rel->relid = myrelid; + pub_rel->columns = t->columns; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); /* @@ -571,7 +585,11 @@ OpenTableList(List *tables) /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); - rels = lappend(rels, rel); + pub_rel = palloc(sizeof(PublicationRelationInfo)); + pub_rel->relation = rel; + pub_rel->relid = childrelid; + pub_rel->columns = t->columns; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); } } @@ -592,9 +610,9 @@ CloseTableList(List *rels) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationInfo *pub_rel = (PublicationRelationInfo *)lfirst(lc); - table_close(rel, NoLock); + table_close(pub_rel->relation, NoLock); } } @@ -611,7 +629,8 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationInfo *pub_rel = (PublicationRelationInfo *)lfirst(lc); + Relation rel = pub_rel->relation; ObjectAddress obj; /* Must be owner of the table or superuser. 
*/ @@ -619,7 +638,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), RelationGetRelationName(rel)); - obj = publication_add_relation(pubid, rel, if_not_exists); + obj = publication_add_relation(pubid, pub_rel, if_not_exists); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, @@ -643,7 +662,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationInfo *pubrel = (PublicationRelationInfo *) lfirst(lc); + Relation rel = pubrel->relation; Oid relid = RelationGetRelid(rel); prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 38251c2b8e..44f1a4e7e6 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4939,6 +4939,16 @@ _copyForeignKeyCacheInfo(const ForeignKeyCacheInfo *from) return newnode; } +static PublicationTable * +_copyPublicationTable(const PublicationTable *from) +{ + PublicationTable *newnode = makeNode(PublicationTable); + + COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(columns); + + return newnode; +} /* * copyObjectImpl -- implementation of copyObject(); see nodes/nodes.h @@ -5854,6 +5864,9 @@ copyObjectImpl(const void *from) case T_PartitionCmd: retval = _copyPartitionCmd(from); break; + case T_PublicationTable: + retval = _copyPublicationTable(from); + break; /* * MISCELLANEOUS NODES diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 8a1762000c..b0f37b2ceb 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -3114,6 +3114,15 @@ _equalValue(const Value *a, const Value *b) return true; } +static bool +_equalPublicationTable(const PublicationTable *a, const PublicationTable *b) +{ + COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(columns); + + return true; +} + /* 
* equal * returns whether two nodes are equal @@ -3862,6 +3871,9 @@ equal(const void *a, const void *b) case T_PartitionCmd: retval = _equalPartitionCmd(a, b); break; + case T_PublicationTable: + retval = _equalPublicationTable(a, b); + break; default: elog(ERROR, "unrecognized node type: %d", diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 87561cbb6f..f04eb536c9 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -3821,6 +3821,15 @@ _outPartitionRangeDatum(StringInfo str, const PartitionRangeDatum *node) WRITE_LOCATION_FIELD(location); } +static void +_outPublicationTable(StringInfo str, const PublicationTable *node) +{ + WRITE_NODE_TYPE("PUBLICATIONTABLE"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(columns); +} + /* * outNode - * converts a Node into ascii string and append it to 'str' @@ -4520,6 +4529,9 @@ outNode(StringInfo str, const void *obj) case T_PartitionRangeDatum: _outPartitionRangeDatum(str, obj); break; + case T_PublicationTable: + _outPublicationTable(str, obj); + break; default: diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 0dd1ad7dfc..0be46165b4 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2702,6 +2702,20 @@ _readPartitionRangeDatum(void) READ_DONE(); } +/* + * _readPublicationTable + */ +static PublicationTable * +_readPublicationTable(void) +{ + READ_LOCALS(PublicationTable); + + READ_NODE_FIELD(relation); + READ_NODE_FIELD(columns); + + READ_DONE(); +} + /* * parseNodeString * @@ -2973,6 +2987,8 @@ parseNodeString(void) return_value = _readPartitionBoundSpec(); else if (MATCH("PARTITIONRANGEDATUM", 19)) return_value = _readPartitionRangeDatum(); + else if (MATCH("PUBLICATIONTABLE", 16)) + return_value = _readPublicationTable(); else { elog(ERROR, "badly formatted node string \"%.32s\"...", token); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 39a2849eba..2c9af95db8 100644 --- 
a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -426,14 +426,14 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list + drop_option_list publication_table_list %type opt_routine_body %type group_clause %type group_by_list %type group_by_item empty_grouping_set rollup_clause cube_clause %type grouping_sets_clause -%type opt_publication_for_tables publication_for_tables +%type opt_publication_for_tables publication_for_tables publication_table %type opt_fdw_options fdw_options %type fdw_option @@ -9620,7 +9620,7 @@ opt_publication_for_tables: ; publication_for_tables: - FOR TABLE relation_expr_list + FOR TABLE publication_table_list { $$ = (Node *) $3; } @@ -9630,6 +9630,21 @@ publication_for_tables: } ; +publication_table_list: + publication_table + { $$ = list_make1($1); } + | publication_table_list ',' publication_table + { $$ = lappend($1, $3); } + ; + +publication_table: relation_expr opt_column_list + { + PublicationTable *n = makeNode(PublicationTable); + n->relation = $1; + n->columns = $2; + $$ = (Node *) n; + } + ; /***************************************************************************** * @@ -9651,7 +9666,7 @@ AlterPublicationStmt: n->options = $5; $$ = (Node *)n; } - | ALTER PUBLICATION name ADD_P TABLE relation_expr_list + | ALTER PUBLICATION name ADD_P TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9659,7 +9674,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_ADD; $$ = (Node *)n; } - | ALTER PUBLICATION name SET TABLE relation_expr_list + | ALTER PUBLICATION name SET TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9667,7 +9682,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_SET; $$ = (Node *)n; } - | ALTER PUBLICATION name DROP 
TABLE relation_expr_list + | ALTER PUBLICATION name DROP TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9f5bf4b639..e5712d4f64 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -29,9 +29,9 @@ #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) -static void logicalrep_write_attrs(StringInfo out, Relation rel); +static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); + HeapTuple tuple, bool binary, Bitmapset *att_map); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - HeapTuple newtuple, bool binary) + HeapTuple newtuple, bool binary, Bitmapset *att_map) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, att_map); } /* @@ -442,7 +442,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + HeapTuple oldtuple, HeapTuple newtuple, bool binary, Bitmapset *att_map) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -463,11 +463,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old 
tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, att_map); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, att_map); } /* @@ -536,7 +536,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, NULL); } /* @@ -651,7 +651,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *att_map) { char *relname; @@ -673,7 +673,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel); + logicalrep_write_attrs(out, rel, att_map); } /* @@ -749,20 +749,37 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. 
*/ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary, + Bitmapset *att_map) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; bool isnull[MaxTupleAttributeNumber]; int i; uint16 nliveatts = 0; + Bitmapset *idattrs = NULL; + bool replidentfull; + Form_pg_attribute att; desc = RelationGetDescr(rel); + replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + if (!replidentfull) + idattrs = RelationGetIdentityKeyBitmap(rel); + for (i = 0; i < desc->natts; i++) { + att = TupleDescAttr(desc, i); if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) continue; + /* + * Do not increment count of attributes if not a part of column filters + * except for replica identity columns or if replica identity is full. + */ + if (att_map != NULL && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, att_map) + && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, idattrs) + && !replidentfull) + continue; nliveatts++; } pq_sendint16(out, nliveatts); @@ -800,6 +817,16 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar continue; } + /* + * Do not send attribute data if it is not a part of column filters, + * except if it is a part of REPLICA IDENTITY or REPLICA IDENTITY is + * full, send the data. + */ + if (att_map != NULL && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, att_map) + && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, idattrs) + && !replidentfull) + continue; + typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid)); if (!HeapTupleIsValid(typtup)) elog(ERROR, "cache lookup failed for type %u", att->atttypid); @@ -904,7 +931,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. 
*/ static void -logicalrep_write_attrs(StringInfo out, Relation rel) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map) { TupleDesc desc; int i; @@ -914,20 +941,34 @@ logicalrep_write_attrs(StringInfo out, Relation rel) desc = RelationGetDescr(rel); + /* fetch bitmap of REPLICATION IDENTITY attributes */ + replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + if (!replidentfull) + idattrs = RelationGetIdentityKeyBitmap(rel); + /* send number of live attributes */ for (i = 0; i < desc->natts; i++) { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ + if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs)) + { + nliveatts++; + continue; + } + /* Skip sending if not a part of column filter */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map)) continue; nliveatts++; } pq_sendint16(out, nliveatts); - /* fetch bitmap of REPLICATION IDENTITY attributes */ - replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); - if (!replidentfull) - idattrs = RelationGetIdentityKeyBitmap(rel); - /* send the attributes */ for (i = 0; i < desc->natts; i++) { @@ -937,6 +978,13 @@ logicalrep_write_attrs(StringInfo out, Relation rel) if (att->attisdropped || att->attgenerated) continue; + /* Exclude filtered columns, REPLICA IDENTITY COLUMNS CAN'T BE EXCLUDED */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map) && !bms_is_member(att->attnum + - FirstLowInvalidHeapAttributeNumber, idattrs) + && !replidentfull) + continue; /* REPLICA IDENTITY FULL means all columns are sent as part of key. 
*/ if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, @@ -944,7 +992,6 @@ logicalrep_write_attrs(StringInfo out, Relation rel) flags |= LOGICALREP_IS_REPLICA_IDENTITY; pq_sendbyte(out, flags); - /* attribute name */ pq_sendstring(out, NameStr(att->attname)); @@ -953,6 +1000,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel) /* attribute mode */ pq_sendint32(out, att->atttypmod); + } bms_free(idattrs); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f07983a43c..9bd834914b 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -111,6 +111,7 @@ #include "replication/origin.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -695,19 +696,27 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel) { WalRcvExecResult *res; + WalRcvExecResult *res_pub; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; + TupleTableSlot *slot_pub; + Oid tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID}; Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid pubRow[] = {TEXTARRAYOID}; bool isnull; - int natt; + int natt,i; + Datum *elems; + int nelems; + List *pub_columns = NIL; + ListCell *lc; + bool am_partition = false; lrel->nspname = nspname; lrel->relname = relname; /* First fetch Oid and replica identity. 
*/ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" @@ -737,6 +746,7 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); Assert(!isnull); + am_partition = DatumGetChar(slot_getattr(slot, 4, &isnull)); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -774,11 +784,79 @@ fetch_remote_table_info(char *nspname, char *relname, natt = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + + /* + * Now, fetch the values of publications' column filters + * For a partition, use pg_inherits to find the parent, + * as the pg_publication_rel contains only the topmost parent + * table entry in case the table is partitioned. + * Run a recursive query to iterate through all the parents + * of the partition and retrieve the record for the parent + * that exists in pg_publication_rel. 
+ */ + resetStringInfo(&cmd); + if (!am_partition) + appendStringInfo(&cmd, "SELECT prattrs from pg_publication_rel" + " WHERE prrelid = %u", lrel->remoteid); + else + appendStringInfo(&cmd, "WITH RECURSIVE t(inhparent) AS ( SELECT inhparent from pg_inherits where inhrelid = %u" + " UNION SELECT pg.inhparent from pg_inherits pg, t where inhrelid = t.inhparent)" + " SELECT prattrs from pg_publication_rel WHERE prrelid IN (SELECT inhparent from t)", lrel->remoteid); + + res_pub = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(pubRow), pubRow); + + if (res_pub->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch published columns info for table \"%s.%s\" from publisher: %s", + nspname, relname, res_pub->err))); + slot_pub = MakeSingleTupleTableSlot(res_pub->tupledesc, &TTSOpsMinimalTuple); + + while (tuplestore_gettupleslot(res_pub->tuplestore, true, false, slot_pub)) + { + deconstruct_array(DatumGetArrayTypePCopy(slot_getattr(slot_pub, 1, &isnull)), + TEXTOID, -1, false, 'i', + &elems, NULL, &nelems); + for (i = 0; i < nelems; i++) + pub_columns = lappend(pub_columns, TextDatumGetCString(elems[i])); + ExecClearTuple(slot_pub); + } + ExecDropSingleTupleTableSlot(slot_pub); + walrcv_clear_result(res_pub); + + /* + * Store the column names only if they are contained in column filter + * LogicalRepRelation will only contain attributes corresponding + * to those specified in column filters. 
+ */ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = + char * rel_colname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + bool found = false; Assert(!isnull); + if (pub_columns != NIL) + { + foreach(lc, pub_columns) + { + char *pub_colname = lfirst(lc); + if(!strcmp(pub_colname, rel_colname)) + { + found = true; + lrel->attnames[natt] = rel_colname; + break; + } + } + } + else + { + found = true; + lrel->attnames[natt] = rel_colname; + } + if (!found) + continue; + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); Assert(!isnull); if (DatumGetBool(slot_getattr(slot, 3, &isnull))) @@ -829,8 +907,17 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); if (lrel.relkind == RELKIND_RELATION) - appendStringInfo(&cmd, "COPY %s TO STDOUT", + { + appendStringInfo(&cmd, "COPY %s (", quote_qualified_identifier(lrel.nspname, lrel.relname)); + for (int i = 0; i < lrel.natts; i++) + { + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + if (i < lrel.natts - 1) + appendStringInfoString(&cmd, ", "); + } + appendStringInfo(&cmd, ") TO STDOUT"); + } else { /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 14d737fd93..aa2a46a503 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,16 +15,19 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel_d.h" #include "commands/defrem.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -81,11 +84,11 
@@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx); + LogicalDecodingContext *ctx, + Bitmapset *att_map); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); - /* * Entry in the map used to remember which relation schemas we sent. * @@ -130,6 +133,7 @@ typedef struct RelationSyncEntry * having identical TupleDesc. */ TupleConversionMap *map; + Bitmapset *att_map; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -570,11 +574,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, } MemoryContextSwitchTo(oldctx); - send_relation_and_attrs(ancestor, xid, ctx); + send_relation_and_attrs(ancestor, xid, ctx, relentry->att_map); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx); + send_relation_and_attrs(relation, xid, ctx, relentry->att_map); if (in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -587,7 +591,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, */ static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx) + LogicalDecodingContext *ctx, + Bitmapset *att_map) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -609,14 +614,24 @@ send_relation_and_attrs(Relation relation, TransactionId xid, if (att->atttypid < FirstGenbkiObjectId) continue; - + /* + * Do not send type information if attribute is + * not present in column filter. + * XXX Allow sending type information for REPLICA + * IDENTITY columns with user-created types, + * even when they are not mentioned in column filters. 
+ */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map)) + continue; OutputPluginPrepareWrite(ctx, false); logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation); + logicalrep_write_rel(ctx->out, xid, relation, att_map); OutputPluginWrite(ctx, false); } @@ -693,7 +708,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, relation, tuple, - data->binary); + data->binary, relentry->att_map); OutputPluginWrite(ctx, true); break; } @@ -722,7 +737,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + newtuple, data->binary, relentry->att_map); OutputPluginWrite(ctx, true); break; } @@ -1119,6 +1134,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); bool found; + Oid ancestor_id; MemoryContext oldctx; Assert(RelationSyncCache != NULL); @@ -1139,8 +1155,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->publish_as_relid = InvalidOid; - entry->map = NULL; /* will be set by maybe_send_schema() if - * needed */ + entry->att_map = NULL; + entry->map = NULL; /* will be set by maybe_send_schema() if needed */ } /* Validate the entry */ @@ -1171,6 +1187,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { Publication *pub = lfirst(lc); bool publish = false; + bool ancestor_published = false; if (pub->alltables) { @@ -1181,7 +1198,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (!publish) { - bool ancestor_published = false; /* * For a 
partition, check if any of the ancestors are @@ -1206,6 +1222,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) pub->oid)) { ancestor_published = true; + ancestor_id = ancestor; if (pub->pubviaroot) publish_as_relid = ancestor; } @@ -1224,15 +1241,41 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (publish && (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)) { + int nelems, i; + bool isnull; + Datum *elems; + HeapTuple pub_rel_tuple; + Datum pub_rel_cols; + List *columns = NIL; + + if (ancestor_published) + pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(ancestor_id), + ObjectIdGetDatum(pub->oid)); + else + pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), + ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(pub_rel_tuple)) + { + pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP, pub_rel_tuple, Anum_pg_publication_rel_prattrs, &isnull); + if (!isnull) + { + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + deconstruct_array(DatumGetArrayTypePCopy(pub_rel_cols), + TEXTOID, -1, false, 'i', + &elems, NULL, &nelems); + for (i = 0; i < nelems; i++) + columns = lappend(columns, TextDatumGetCString(elems[i])); + entry->att_map = get_table_columnset(publish_as_relid, columns, entry->att_map); + MemoryContextSwitchTo(oldctx); + } + ReleaseSysCache(pub_rel_tuple); + } entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; } list_free(pubids); @@ -1328,6 +1371,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) entry->schema_sent = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; + bms_free(entry->att_map); + entry->att_map = NULL; if (entry->map) { /* 
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index f332bad4d4..7bdc9bb9b8 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -83,6 +83,13 @@ typedef struct Publication PublicationActions pubactions; } Publication; +typedef struct PublicationRelationInfo +{ + Oid relid; + Relation relation; + List *columns; +} PublicationRelationInfo; + extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); @@ -108,7 +115,7 @@ extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); extern bool is_publishable_relation(Relation rel); -extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, +extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel, bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index b5d5504cbb..d1d4eec2c0 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ +#ifdef CATALOG_VARLEN + text prattrs[1]; /* Variable length field starts here */ +#endif } FormData_pg_publication_rel; /* ---------------- @@ -40,6 +43,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) */ typedef FormData_pg_publication_rel *Form_pg_publication_rel; +DECLARE_TOAST(pg_publication_rel, 8895, 8896); DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops)); 
DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops)); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 6a4d82f0a8..56d13ff022 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -490,6 +490,7 @@ typedef enum NodeTag T_PartitionRangeDatum, T_PartitionCmd, T_VacuumRelation, + T_PublicationTable, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 7af13dee43..a9660e405c 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3624,6 +3624,12 @@ typedef struct AlterTSConfigurationStmt bool missing_ok; /* for DROP - skip error if missing? */ } AlterTSConfigurationStmt; +typedef struct PublicationTable +{ + NodeTag type; + RangeVar *relation; /* relation to be published */ + List *columns; /* List of columns in a publication table */ +} PublicationTable; typedef struct CreatePublicationStmt { diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 83741dcf42..709b4be916 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, - bool binary); + bool binary, Bitmapset *att_map); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary); + HeapTuple newtuple, bool binary, Bitmapset *att_map); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, 
LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); @@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in, extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel); + Relation rel, Bitmapset *att_map); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index b4faa1c123..b4c49fa32f 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -680,5 +680,6 @@ RelationGetSmgr(Relation rel) /* routines in utils/cache/relcache.c */ extern void RelationIncrementReferenceCount(Relation rel); extern void RelationDecrementReferenceCount(Relation rel); +extern Bitmapset* get_table_columnset(Oid relid, List *columns, Bitmapset *att_map); #endif /* REL_H */ diff --git a/src/test/subscription/t/021_column_filter.pl b/src/test/subscription/t/021_column_filter.pl new file mode 100644 index 0000000000..334e14da95 --- /dev/null +++ b/src/test/subscription/t/021_column_filter.pl @@ -0,0 +1,143 @@ +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test column filtering in logical replication +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 9; + +# setup + +my $node_publisher = PostgresNode->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = PostgresNode->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_logical_replication_workers = 6)); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int)"); +# Test with weird column names +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)"); +#Test replication with multi-level partition +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)"); + +#Test create publication with column filtering +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(a,b)"); + +my $result = $node_publisher->safe_psql('postgres', + "select relname, prattrs 
from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;"); +is($result, qq(tab1|{a,B} +tab3|{a',c'} +test_part|{a,b}), 'publication relation updated'); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); +#Initial sync +$node_publisher->wait_for_catchup('sub1'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1,2,3)"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab3 VALUES (1,2,3)"); +#Test for replication of partition data +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')"); +#Test for replication of multi-level partition data +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (4,'abc', '2021-07-04 12:00:00')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (5,'bcd', '2021-07-03 11:12:13')"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab1"); +is($result, qq(1|2|), 'insert on column c is not replicated'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab3"); +is($result, qq(1|3), 'insert on column b is not replicated'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_part"); +is($result, qq(1|abc\n2|bcd\n4|abc\n5|bcd), 'insert on all columns is replicated'); + +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET c = 5 where a = 1"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab1"); +is($result, qq(1|2|), 'update on column c is not replicated'); + +#Test alter publication with column filtering +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION" +); + 
+$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1,'abc',3)"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab2"); +is($result, qq(1|abc), 'insert on column c is not replicated'); + +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET c = 5 where a = 1"); +is($result, qq(1|abc), 'update on column c is not replicated'); + +#Test error conditions +my ($psql_rc, $psql_out, $psql_err) = $node_publisher->psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE test_part(b)"); +like($psql_err, qr/Column filter must include REPLICA IDENTITY columns/, 'Error when column filter does not contain REPLICA IDENTITY'); + +$node_publisher->safe_psql('postgres', + "ALTER TABLE test_part DROP COLUMN b"); +$result = $node_publisher->safe_psql('postgres', + "select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;"); +is($result, qq(tab1|{a,B} +tab2|{a,b} +tab3|{a',c'}), 'publication relation test_part removed'); -- 2.17.2 (Apple Git-113)