From 4711ee538a35c7a5c4eb4f23258e3bd8a3ab0bb4 Mon Sep 17 00:00:00 2001 From: rahila Date: Mon, 7 Jun 2021 16:27:21 +0530 Subject: [PATCH] Add column filtering to logical replication Add capability to specifiy column names while linking the table to a publication, at the time of CREATE or ALTER publication. This will allow replicating only the specified columns. Rest of the columns on the subscriber will be populated locally. This facilitates replication to a table on subscriber containing only the subscribed/filtered columns. If column names are not specified, all the columns are replicated. REPLICA IDENTITY columns are always replicated irrespective of the column filters. Add a tap test for the same in src/test/subscription. --- src/backend/catalog/pg_publication.c | 20 +++- src/backend/commands/copyfromparse.c | 1 - src/backend/commands/publicationcmds.c | 50 +++++--- src/backend/nodes/copyfuncs.c | 13 +++ src/backend/nodes/equalfuncs.c | 12 ++ src/backend/nodes/outfuncs.c | 12 ++ src/backend/nodes/readfuncs.c | 16 +++ src/backend/parser/gram.y | 27 ++++- src/backend/replication/logical/proto.c | 86 +++++++++++--- src/backend/replication/logical/relation.c | 1 - src/backend/replication/logical/tablesync.c | 96 ++++++++++++++- src/backend/replication/pgoutput/pgoutput.c | 95 ++++++++++++--- src/include/catalog/pg_publication.h | 9 +- src/include/catalog/pg_publication_rel.h | 4 + src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 6 + src/include/replication/logicalproto.h | 6 +- src/test/subscription/t/021_column_filter.pl | 116 +++++++++++++++++++ 18 files changed, 499 insertions(+), 72 deletions(-) create mode 100644 src/test/subscription/t/021_column_filter.pl diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 2a2fe03c13..ad04ffe04b 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -141,18 +141,20 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) * Insert new publication / relation mapping. */ ObjectAddress -publication_add_relation(Oid pubid, Relation targetrel, +publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel, bool if_not_exists) { Relation rel; HeapTuple tup; Datum values[Natts_pg_publication_rel]; bool nulls[Natts_pg_publication_rel]; - Oid relid = RelationGetRelid(targetrel); + Oid relid = RelationGetRelid(targetrel->relation); Oid prrelid; Publication *pub = GetPublication(pubid); ObjectAddress myself, referenced; + ListCell *lc; + List *target_cols = NIL; rel = table_open(PublicationRelRelationId, RowExclusiveLock); @@ -172,10 +174,10 @@ publication_add_relation(Oid pubid, Relation targetrel, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("relation \"%s\" is already member of publication \"%s\"", - RelationGetRelationName(targetrel), pub->name))); + RelationGetRelationName(targetrel->relation), pub->name))); } - check_publication_add_relation(targetrel); + check_publication_add_relation(targetrel->relation); /* Form a tuple. */ memset(values, 0, sizeof(values)); @@ -188,6 +190,14 @@ publication_add_relation(Oid pubid, Relation targetrel, ObjectIdGetDatum(pubid); values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + foreach(lc, targetrel->columns) + { + char *colname; + colname = strVal(lfirst(lc)); + target_cols = lappend(target_cols, colname); + } + values[Anum_pg_publication_rel_prattrs - 1] = + PointerGetDatum(strlist_to_textarray(target_cols)); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -209,7 +219,7 @@ publication_add_relation(Oid pubid, Relation targetrel, table_close(rel, RowExclusiveLock); /* Invalidate relcache so that publication info is rebuilt. */ - CacheInvalidateRelcache(targetrel); + CacheInvalidateRelcache(targetrel->relation); return myself; } diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index fdf57f1556..515728df67 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -839,7 +839,6 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("extra data after last expected column"))); - fieldno = 0; /* Loop to read the user attributes on the line. */ diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 8487eeb7e6..aee5645e31 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -393,7 +393,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, foreach(newlc, rels) { - Relation newrel = (Relation) lfirst(newlc); + PublicationRelationInfo *newpubrel = (PublicationRelationInfo *) lfirst(newlc); + Relation newrel = newpubrel->relation; if (RelationGetRelid(newrel) == oldrelid) { @@ -401,13 +402,20 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, break; } } - + /* Not yet in the list, open it and add to the list */ if (!found) { Relation oldrel = table_open(oldrelid, ShareUpdateExclusiveLock); - - delrels = lappend(delrels, oldrel); + /* + * Wrap relation into PublicationRelationInfo + */ + PublicationRelationInfo *pubrel = palloc(sizeof(PublicationRelationInfo)); + pubrel->relation = oldrel; + pubrel->relid = oldrelid; + /* This is not needed to delete a table */ + pubrel->columns = NIL; + delrels = lappend(delrels, pubrel); } } @@ -498,9 +506,9 @@ RemovePublicationRelById(Oid proid) } /* - * Open relations specified by a RangeVar list. - * The returned tables are locked in ShareUpdateExclusiveLock mode in order to - * add them to a publication. + * Open relations specified by a PublicationTable list. + * In the returned list of PublicationRelationInfo, tables are locked + * in ShareUpdateExclusiveLock mode in order to add them to a publication. */ static List * OpenTableList(List *tables) @@ -514,10 +522,12 @@ OpenTableList(List *tables) */ foreach(lc, tables) { - RangeVar *rv = lfirst_node(RangeVar, lc); + PublicationTable *t = lfirst(lc); + RangeVar *rv = castNode(RangeVar, t->relation); bool recurse = rv->inh; Relation rel; Oid myrelid; + PublicationRelationInfo *pub_rel; /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); @@ -538,7 +548,11 @@ OpenTableList(List *tables) continue; } - rels = lappend(rels, rel); + pub_rel = palloc(sizeof(PublicationRelationInfo)); + pub_rel->relation = rel; + pub_rel->relid = myrelid; + pub_rel->columns = t->columns; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); /* @@ -571,7 +585,11 @@ OpenTableList(List *tables) /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); - rels = lappend(rels, rel); + pub_rel = palloc(sizeof(PublicationRelationInfo)); + pub_rel->relation = rel; + pub_rel->relid = childrelid; + pub_rel->columns = t->columns; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); } } @@ -592,9 +610,9 @@ CloseTableList(List *rels) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationInfo *pub_rel = (PublicationRelationInfo *)lfirst(lc); - table_close(rel, NoLock); + table_close(pub_rel->relation, NoLock); } } @@ -611,7 +629,8 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationInfo *pub_rel = (PublicationRelationInfo *)lfirst(lc); + Relation rel = pub_rel->relation; ObjectAddress obj; /* Must be owner of the table or superuser. */ @@ -619,7 +638,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), RelationGetRelationName(rel)); - obj = publication_add_relation(pubid, rel, if_not_exists); + obj = publication_add_relation(pubid, pub_rel, if_not_exists); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, @@ -643,7 +662,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationInfo *pubrel = (PublicationRelationInfo *) lfirst(lc); + Relation rel = pubrel->relation; Oid relid = RelationGetRelid(rel); prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 29020c908e..0763802502 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4951,6 +4951,16 @@ _copyForeignKeyCacheInfo(const ForeignKeyCacheInfo *from) return newnode; } +static PublicationTable * +_copyPublicationTable(const PublicationTable *from) +{ + PublicationTable *newnode = makeNode(PublicationTable); + + COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(columns); + + return newnode; +} /* * copyObjectImpl -- implementation of copyObject(); see nodes/nodes.h @@ -5866,6 +5876,9 @@ copyObjectImpl(const void *from) case T_PartitionCmd: retval = _copyPartitionCmd(from); break; + case T_PublicationTable: + retval = _copyPublicationTable(from); + break; /* * MISCELLANEOUS NODES diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 8a1762000c..b0f37b2ceb 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -3114,6 +3114,15 @@ _equalValue(const Value *a, const Value *b) return true; } +static bool +_equalPublicationTable(const PublicationTable *a, const PublicationTable *b) +{ + COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(columns); + + return true; +} + /* * equal * returns whether two nodes are equal @@ -3862,6 +3871,9 @@ equal(const void *a, const void *b) case T_PartitionCmd: retval = _equalPartitionCmd(a, b); break; + case T_PublicationTable: + retval = _equalPublicationTable(a, b); + break; default: elog(ERROR, "unrecognized node type: %d", diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 87561cbb6f..f04eb536c9 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -3821,6 +3821,15 @@ _outPartitionRangeDatum(StringInfo str, const PartitionRangeDatum *node) WRITE_LOCATION_FIELD(location); } +static void +_outPublicationTable(StringInfo str, const PublicationTable *node) +{ + WRITE_NODE_TYPE("PUBLICATIONTABLE"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(columns); +} + /* * outNode - * converts a Node into ascii string and append it to 'str' @@ -4520,6 +4529,9 @@ outNode(StringInfo str, const void *obj) case T_PartitionRangeDatum: _outPartitionRangeDatum(str, obj); break; + case T_PublicationTable: + _outPublicationTable(str, obj); + break; default: diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 77d082d8b4..6b2d8efb01 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2702,6 +2702,20 @@ _readPartitionRangeDatum(void) READ_DONE(); } +/* + * _readPublicationTable + */ +static PublicationTable * +_readPublicationTable(void) +{ + READ_LOCALS(PublicationTable); + + READ_NODE_FIELD(relation); + READ_NODE_FIELD(columns); + + READ_DONE(); +} + /* * parseNodeString * @@ -2973,6 +2987,8 @@ parseNodeString(void) return_value = _readPartitionBoundSpec(); else if (MATCH("PARTITIONRANGEDATUM", 19)) return_value = _readPartitionRangeDatum(); + else if (MATCH("PUBLICATIONTABLE", 16)) + return_value = _readPublicationTable(); else { elog(ERROR, "badly formatted node string \"%.32s\"...", token); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 39a2849eba..2c9af95db8 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -426,14 +426,14 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list + drop_option_list publication_table_list %type opt_routine_body %type group_clause %type group_by_list %type group_by_item empty_grouping_set rollup_clause cube_clause %type grouping_sets_clause -%type opt_publication_for_tables publication_for_tables +%type opt_publication_for_tables publication_for_tables publication_table %type opt_fdw_options fdw_options %type fdw_option @@ -9620,7 +9620,7 @@ opt_publication_for_tables: ; publication_for_tables: - FOR TABLE relation_expr_list + FOR TABLE publication_table_list { $$ = (Node *) $3; } @@ -9630,6 +9630,21 @@ publication_for_tables: } ; +publication_table_list: + publication_table + { $$ = list_make1($1); } + | publication_table_list ',' publication_table + { $$ = lappend($1, $3); } + ; + +publication_table: relation_expr opt_column_list + { + PublicationTable *n = makeNode(PublicationTable); + n->relation = $1; + n->columns = $2; + $$ = (Node *) n; + } + ; /***************************************************************************** * @@ -9651,7 +9666,7 @@ AlterPublicationStmt: n->options = $5; $$ = (Node *)n; } - | ALTER PUBLICATION name ADD_P TABLE relation_expr_list + | ALTER PUBLICATION name ADD_P TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9659,7 +9674,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_ADD; $$ = (Node *)n; } - | ALTER PUBLICATION name SET TABLE relation_expr_list + | ALTER PUBLICATION name SET TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9667,7 +9682,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_SET; $$ = (Node *)n; } - | ALTER PUBLICATION name DROP TABLE relation_expr_list + | ALTER PUBLICATION name DROP TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 52b65e9572..8bfecf44ca 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -29,9 +29,9 @@ #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) -static void logicalrep_write_attrs(StringInfo out, Relation rel); +static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); + HeapTuple tuple, bool binary, Bitmapset *att_map); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - HeapTuple newtuple, bool binary) + HeapTuple newtuple, bool binary, Bitmapset *att_map) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, att_map); } /* @@ -442,7 +442,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + HeapTuple oldtuple, HeapTuple newtuple, bool binary, Bitmapset *att_map) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -463,11 +463,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, att_map); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, att_map); } /* @@ -536,7 +536,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, NULL); } /* @@ -651,7 +651,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *att_map) { char *relname; @@ -673,7 +673,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel); + logicalrep_write_attrs(out, rel, att_map); } /* @@ -749,20 +749,37 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary, + Bitmapset *att_map) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; bool isnull[MaxTupleAttributeNumber]; int i; uint16 nliveatts = 0; + Bitmapset *idattrs = NULL; + bool replidentfull; + Form_pg_attribute att; desc = RelationGetDescr(rel); + replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + if (!replidentfull) + idattrs = RelationGetIdentityKeyBitmap(rel); + for (i = 0; i < desc->natts; i++) { + att = TupleDescAttr(desc, i); if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) continue; + /* + * Do not increment count of attributes if not a part of column filters + * except for replica identity columns or if replica identity is full. + */ + if (att_map != NULL && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, att_map) + && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, idattrs) + && !replidentfull) + continue; nliveatts++; } pq_sendint16(out, nliveatts); @@ -800,6 +817,16 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar continue; } + /* + * Do not send attribute data if it is not a part of column filters, + * except if it is a part of REPLICA IDENTITY or REPLICA IDENTITY is + * full, send the data. + */ + if (att_map != NULL && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, att_map) + && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, idattrs) + && !replidentfull) + continue; + typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid)); if (!HeapTupleIsValid(typtup)) elog(ERROR, "cache lookup failed for type %u", att->atttypid); @@ -904,7 +931,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. */ static void -logicalrep_write_attrs(StringInfo out, Relation rel) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map) { TupleDesc desc; int i; @@ -914,20 +941,34 @@ logicalrep_write_attrs(StringInfo out, Relation rel) desc = RelationGetDescr(rel); + /* fetch bitmap of REPLICATION IDENTITY attributes */ + replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + if (!replidentfull) + idattrs = RelationGetIdentityKeyBitmap(rel); + /* send number of live attributes */ for (i = 0; i < desc->natts; i++) { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ + if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs)) + { + nliveatts++; + continue; + } + /* Skip sending if not a part of column filter */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map)) continue; nliveatts++; } pq_sendint16(out, nliveatts); - /* fetch bitmap of REPLICATION IDENTITY attributes */ - replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); - if (!replidentfull) - idattrs = RelationGetIdentityKeyBitmap(rel); - /* send the attributes */ for (i = 0; i < desc->natts; i++) { @@ -937,6 +978,13 @@ logicalrep_write_attrs(StringInfo out, Relation rel) if (att->attisdropped || att->attgenerated) continue; + /* Exlude filtered columns, REPLICA IDENTITY COLUMNS CAN'T BE EXCLUDED */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map) && !bms_is_member(att->attnum + - FirstLowInvalidHeapAttributeNumber, idattrs) + && !replidentfull) + continue; /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, @@ -944,7 +992,6 @@ logicalrep_write_attrs(StringInfo out, Relation rel) flags |= LOGICALREP_IS_REPLICA_IDENTITY; pq_sendbyte(out, flags); - /* attribute name */ pq_sendstring(out, NameStr(att->attname)); @@ -953,6 +1000,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel) /* attribute mode */ pq_sendint32(out, att->atttypmod); + } bms_free(idattrs); diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index c37e2a7e29..d7a7b00841 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -354,7 +354,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) attnum = logicalrep_rel_att_by_name(remoterel, NameStr(attr->attname)); - entry->attrmap->attnums[i] = attnum; if (attnum >= 0) missingatts = bms_del_member(missingatts, attnum); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f07983a43c..f336a384a1 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -111,6 +111,7 @@ #include "replication/origin.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -695,19 +696,27 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel) { WalRcvExecResult *res; + WalRcvExecResult *res_pub; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; + TupleTableSlot *slot_pub; + Oid tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID}; Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid pubRow[] = {TEXTARRAYOID}; bool isnull; - int natt; + int natt,i; + Datum *elems; + int nelems; + List *pub_columns = NIL; + ListCell *lc; + bool am_partition = false; lrel->nspname = nspname; lrel->relname = relname; /* First fetch Oid and replica identity. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" @@ -737,6 +746,7 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); Assert(!isnull); + am_partition = DatumGetChar(slot_getattr(slot, 4, &isnull)); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -774,11 +784,78 @@ fetch_remote_table_info(char *nspname, char *relname, natt = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + + /* + * Now, fetch the values of publications' column filters + * For a partition, use pg_inherit to find the parent, + * as the pg_publication_rel contains only the topmost parent + * table entry in case of table partitioning. + * + * XXX Modify the join query to be able to fetch topmost parent, + * Currently it fetches immediate parent of the partition. + */ + resetStringInfo(&cmd); + if (!am_partition) + appendStringInfo(&cmd, "SELECT prattrs from pg_publication_rel" + " WHERE prrelid = %u", lrel->remoteid); + else + appendStringInfo(&cmd, "SELECT prattrs from pg_publication_rel pb, pg_inherits pinh" + " WHERE pb.prrelid = pinh.inhparent AND pinh.inhrelid = %u", lrel->remoteid); + + res_pub = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(pubRow), pubRow); + + if (res_pub->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch published columns info for table \"%s.%s\" from publisher: %s", + nspname, relname, res_pub->err))); + slot_pub = MakeSingleTupleTableSlot(res_pub->tupledesc, &TTSOpsMinimalTuple); + + while (tuplestore_gettupleslot(res_pub->tuplestore, true, false, slot_pub)) + { + deconstruct_array(DatumGetArrayTypePCopy(slot_getattr(slot_pub, 1, &isnull)), + TEXTOID, -1, false, 'i', + &elems, NULL, &nelems); + for (i = 0; i < nelems; i++) + pub_columns = lappend(pub_columns, TextDatumGetCString(elems[i])); + ExecClearTuple(slot_pub); + } + ExecDropSingleTupleTableSlot(slot_pub); + walrcv_clear_result(res_pub); + + /* + * Store the column names only if they are contained in column filter + * LogicalRepRelation will only contain attributes corresponding + * to those specficied in column filters. + */ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = + char * rel_colname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + bool found = false; Assert(!isnull); + if (pub_columns != NIL) + { + foreach(lc, pub_columns) + { + char *pub_colname = lfirst(lc); + if(!strcmp(pub_colname, rel_colname)) + { + found = true; + lrel->attnames[natt] = rel_colname; + break; + } + } + } + else + { + found = true; + lrel->attnames[natt] = rel_colname; + } + if (!found) + continue; + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); Assert(!isnull); if (DatumGetBool(slot_getattr(slot, 3, &isnull))) @@ -829,8 +906,17 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); if (lrel.relkind == RELKIND_RELATION) - appendStringInfo(&cmd, "COPY %s TO STDOUT", + { + appendStringInfo(&cmd, "COPY %s (", quote_qualified_identifier(lrel.nspname, lrel.relname)); + for (int i = 0; i < lrel.natts; i++) + { + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + if (i < lrel.natts - 1) + appendStringInfoString(&cmd, ", "); + } + appendStringInfo(&cmd, ") TO STDOUT"); + } else { /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 14d737fd93..033f36e00c 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,12 +15,14 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel_d.h" #include "commands/defrem.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/lsyscache.h" @@ -81,10 +83,12 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx); + LogicalDecodingContext *ctx, + Bitmapset *att_map); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); +static Bitmapset* get_table_columnset(Oid relid, List *columns, Bitmapset *att_map); /* * Entry in the map used to remember which relation schemas we sent. @@ -130,6 +134,7 @@ typedef struct RelationSyncEntry * having identical TupleDesc. */ TupleConversionMap *map; + Bitmapset *att_map; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -570,11 +575,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, } MemoryContextSwitchTo(oldctx); - send_relation_and_attrs(ancestor, xid, ctx); + send_relation_and_attrs(ancestor, xid, ctx, relentry->att_map); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx); + send_relation_and_attrs(relation, xid, ctx, relentry->att_map); if (in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -587,7 +592,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, */ static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx) + LogicalDecodingContext *ctx, + Bitmapset *att_map) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -609,14 +615,24 @@ send_relation_and_attrs(Relation relation, TransactionId xid, if (att->atttypid < FirstGenbkiObjectId) continue; - + /* + * Do not send type information if attribute is + * not present in column filter. + * XXX Allow sending type information for REPLICA + * IDENTITY COLUMNS with user created type. + * even when they are not mentioned in column filters. + */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map)) + continue; OutputPluginPrepareWrite(ctx, false); logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation); + logicalrep_write_rel(ctx->out, xid, relation, att_map); OutputPluginWrite(ctx, false); } @@ -690,10 +706,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (relentry->map) tuple = execute_attr_map_tuple(tuple, relentry->map); } - OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, relation, tuple, - data->binary); + data->binary, relentry->att_map); OutputPluginWrite(ctx, true); break; } @@ -719,10 +734,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry->map); } } - OutputPluginPrepareWrite(ctx, true); logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + newtuple, data->binary, relentry->att_map); OutputPluginWrite(ctx, true); break; } @@ -1119,6 +1133,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); bool found; + Oid ancestor_id; MemoryContext oldctx; Assert(RelationSyncCache != NULL); @@ -1139,8 +1154,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->publish_as_relid = InvalidOid; - entry->map = NULL; /* will be set by maybe_send_schema() if - * needed */ + entry->att_map = NULL; + entry->map = NULL; /* will be set by maybe_send_schema() if needed */ } /* Validate the entry */ @@ -1171,6 +1186,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { Publication *pub = lfirst(lc); bool publish = false; + bool ancestor_published = false; if (pub->alltables) { @@ -1181,7 +1197,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (!publish) { - bool ancestor_published = false; /* * For a partition, check if any of the ancestors are @@ -1206,6 +1221,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) pub->oid)) { ancestor_published = true; + ancestor_id = ancestor; if (pub->pubviaroot) publish_as_relid = ancestor; } @@ -1224,15 +1240,41 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (publish && (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)) { + int nelems, i; + bool isnull; + Datum *elems; + HeapTuple pub_rel_tuple; + Datum pub_rel_cols; + List *columns = NIL; + + if (ancestor_published) + pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(ancestor_id), + ObjectIdGetDatum(pub->oid)); + else + pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), + ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(pub_rel_tuple)) + { + pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP, pub_rel_tuple, Anum_pg_publication_rel_prattrs, &isnull); + if (!isnull) + { + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + deconstruct_array(DatumGetArrayTypePCopy(pub_rel_cols), + TEXTOID, -1, false, 'i', + &elems, NULL, &nelems); + for (i = 0; i < nelems; i++) + columns = lappend(columns, TextDatumGetCString(elems[i])); + entry->att_map = get_table_columnset(publish_as_relid, columns, entry->att_map); + MemoryContextSwitchTo(oldctx); + } + ReleaseSysCache(pub_rel_tuple); + } entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; } list_free(pubids); @@ -1244,6 +1286,25 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) return entry; } +/* + * Return a bitmapset of attributes given the list of column names + */ +static Bitmapset* +get_table_columnset(Oid relid, List *columns, Bitmapset *att_map) +{ + ListCell *cell; + foreach(cell, columns) + { + const char *attname = lfirst(cell); + int attnum = get_attnum(relid, attname); + + if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, att_map)) + att_map = bms_add_member(att_map, + attnum - FirstLowInvalidHeapAttributeNumber); + } + return att_map; +} + /* * Cleanup list of streamed transactions and update the schema_sent flag. * @@ -1328,6 +1389,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) entry->schema_sent = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; + bms_free(entry->att_map); + entry->att_map = NULL; if (entry->map) { /* diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index f332bad4d4..7bdc9bb9b8 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -83,6 +83,13 @@ typedef struct Publication PublicationActions pubactions; } Publication; +typedef struct PublicationRelationInfo +{ + Oid relid; + Relation relation; + List *columns; +} PublicationRelationInfo; + extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); @@ -108,7 +115,7 @@ extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); extern bool is_publishable_relation(Relation rel); -extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, +extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel, bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index b5d5504cbb..d1d4eec2c0 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ +#ifdef CATALOG_VARLEN + text prattrs[1]; /* Variable length field starts here */ +#endif } FormData_pg_publication_rel; /* ---------------- @@ -40,6 +43,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) */ typedef FormData_pg_publication_rel *Form_pg_publication_rel; +DECLARE_TOAST(pg_publication_rel, 8895, 8896); DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops)); DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops)); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 6a4d82f0a8..56d13ff022 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -490,6 +490,7 @@ typedef enum NodeTag T_PartitionRangeDatum, T_PartitionCmd, T_VacuumRelation, + T_PublicationTable, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e28248af32..bbdfaa2f45 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3624,6 +3624,12 @@ typedef struct AlterTSConfigurationStmt bool missing_ok; /* for DROP - skip error if missing? */ } AlterTSConfigurationStmt; +typedef struct PublicationTable +{ + NodeTag type; + RangeVar *relation; /* relation to be published */ + List *columns; /* List of columns in a publication table */ +} PublicationTable; typedef struct CreatePublicationStmt { diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 2e29513151..cb47341b6c 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, - bool binary); + bool binary, Bitmapset *att_map); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary); + HeapTuple newtuple, bool binary, Bitmapset *att_map); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); @@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in, extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel); + Relation rel, Bitmapset *att_map); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/test/subscription/t/021_column_filter.pl b/src/test/subscription/t/021_column_filter.pl new file mode 100644 index 0000000000..f78fdbf52f --- /dev/null +++ b/src/test/subscription/t/021_column_filter.pl @@ -0,0 +1,116 @@ +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test TRUNCATE +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 7; + +# setup + +my $node_publisher = PostgresNode->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = PostgresNode->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_logical_replication_workers = 6)); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int)"); +# Test with weird column names +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)"); + +#Test create publication with column filtering +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(b)"); + +my $result = $node_publisher->safe_psql('postgres', + "select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;"); +is($result, qq(tab1|{a,B} +tab3|{a',c'} +test_part|{b}), 'publication relation updated'); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); +#Initial sync +$node_publisher->wait_for_catchup('sub1'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1,2,3)"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab3 VALUES (1,2,3)"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab1"); +is($result, qq(1|2|), 'insert on column c is not replicated'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab3"); +is($result, qq(1|3), 'insert on column b is not replicated'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_part"); +is($result, qq(1|abc\n2|bcd), 'insert on all columns is replicated'); + +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET c = 5 where a = 1"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab1"); +is($result, qq(1|2|), 'update on column c is not replicated'); + +#Test alter publication with column filtering +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION" +); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1,'abc',3)"); +#sleep(5); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab2"); +is($result, qq(1|abc), 'insert on column c is not replicated'); + +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET c = 5 where a = 1"); +is($result, qq(1|abc), 'update on column c is not replicated'); -- 2.17.2 (Apple Git-113)