From 6567e49f95823532bc2ceccf87ed570ca4ce398d Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler@timbira.com.br>
Date: Tue, 27 Feb 2018 04:03:13 +0000
Subject: [PATCH 3/3] Row filtering for logical replication

When you define or modify a publication, you can optionally filter the
rows to be published using a WHERE condition. The condition can be any
expression that evaluates to boolean. Only rows that satisfy the WHERE
condition are sent to subscribers.
---
 doc/src/sgml/ref/alter_publication.sgml     |  9 ++-
 doc/src/sgml/ref/create_publication.sgml    | 14 ++++-
 src/backend/catalog/pg_publication.c        | 45 +++++++++++--
 src/backend/commands/publicationcmds.c      | 69 ++++++++++++++------
 src/backend/parser/gram.y                   | 26 ++++++--
 src/backend/parser/parse_agg.c              | 10 +++
 src/backend/parser/parse_expr.c             |  5 ++
 src/backend/parser/parse_func.c             |  3 +
 src/backend/replication/logical/worker.c    |  2 +-
 src/backend/replication/pgoutput/pgoutput.c | 98 ++++++++++++++++++++++++++++-
 src/include/catalog/pg_publication.h        |  8 ++-
 src/include/catalog/pg_publication_rel.h    | 11 +++-
 src/include/nodes/nodes.h                   |  1 +
 src/include/nodes/parsenodes.h              | 11 +++-
 src/include/parser/parse_node.h             |  3 +-
 src/include/replication/logicalrelation.h   |  2 +
 src/test/subscription/t/001_rep_changes.pl  | 29 ++++++++-
 17 files changed, 299 insertions(+), 47 deletions(-)

diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index 534e598..dc579b2 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -21,8 +21,8 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE <replaceable class="parameter">condition</replaceable> ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE <replaceable class="parameter">condition</replaceable> ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_USER | SESSION_USER }
@@ -91,7 +91,10 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       table name, only that table is affected.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
-      name to explicitly indicate that descendant tables are included.
+      name to explicitly indicate that descendant tables are included. If the
+      optional <literal>WHERE</literal> clause is specified, rows that do not
+      satisfy the <replaceable class="parameter">condition</replaceable> will
+      not be published.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index bfe12d5..e42f3d4 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
-    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE <replaceable class="parameter">condition</replaceable> ] [, ...]
       | FOR ALL TABLES ]
     [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 
@@ -68,7 +68,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       that table is added to the publication.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are added.
       Optionally, <literal>*</literal> can be specified after the table name to
-      explicitly indicate that descendant tables are included.
+      explicitly indicate that descendant tables are included. If the optional
+      <literal>WHERE</literal> clause is specified, rows that do not satisfy
+      the <replaceable class="parameter">condition</replaceable> will not be
+      published.
      </para>
 
      <para>
@@ -184,6 +187,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
   </para>
 
   <para>
+   Create a publication that publishes all changes from active departments:
+<programlisting>
+CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
+</programlisting>
+  </para>
+
+  <para>
    Create a publication that publishes all changes in all tables:
 <programlisting>
 CREATE PUBLICATION alltables FOR ALL TABLES;
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index ba18258..43d754d 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -34,6 +34,10 @@
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
 
+#include "parser/parse_clause.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_relation.h"
+
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -142,18 +146,21 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, Relation targetrel,
+publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
 						 bool if_not_exists)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
-	Oid			relid = RelationGetRelid(targetrel);
+	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ParseState		*pstate;
+	RangeTblEntry	*rte;
+	Node			*whereclause;
 
 	rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -173,10 +180,26 @@ publication_add_relation(Oid pubid, Relation targetrel,
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel), pub->name)));
+						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel);
+	check_publication_add_relation(targetrel->relation);
+
+	/* Set up a pstate to parse with */
+	pstate = make_parsestate(NULL);
+	pstate->p_sourcetext = nodeToString(targetrel->whereClause);
+
+	rte = addRangeTableEntryForRelation(pstate, targetrel->relation,
+										NULL, false, false);
+	addRTEtoQuery(pstate, rte, false, true, true);
+
+	whereclause = transformWhereClause(pstate,
+								copyObject(targetrel->whereClause),
+								EXPR_KIND_PUBLICATION_WHERE,
+								"PUBLICATION");
+
+	/* Fix up collation information */
+	assign_expr_collations(pstate, whereclause);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -187,6 +210,12 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
 
+	/* Add row filter, if available */
+	if (whereclause)
+		values[Anum_pg_publication_rel_prrowfilter - 1] = CStringGetTextDatum(nodeToString(whereclause));
+	else
+		nulls[Anum_pg_publication_rel_prrowfilter - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -203,11 +232,17 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	ObjectAddressSet(referenced, RelationRelationId, relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/* Add dependency on the objects mentioned in the row filter expression */
+	if (whereclause)
+		recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL);
+
+	free_parsestate(pstate);
+
 	/* Close the table. */
 	heap_close(rel, RowExclusiveLock);
 
 	/* Invalidate relcache so that publication info is rebuilt. */
-	CacheInvalidateRelcache(targetrel);
+	CacheInvalidateRelcache(targetrel->relation);
 
 	return myself;
 }
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 9c5aa9e..96347bb 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -324,6 +324,27 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
+	/*
+	 * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause.  Use
+	 * publication_table_list node (that accepts a WHERE clause) but forbid the
+	 * WHERE clause in it.  The use of relation_expr_list node just for the
+	 * DROP TABLE part is not worth the trouble.
+	 */
+	if (stmt->tableAction == DEFELEM_DROP)
+	{
+		ListCell	*lc;
+
+		foreach(lc, stmt->tables)
+		{
+			PublicationTable *t = lfirst(lc);
+			if (t->whereClause)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("cannot use a WHERE clause for removing table from publication \"%s\"",
+								NameStr(pubform->pubname))));
+		}
+	}
+
 	rels = OpenTableList(stmt->tables);
 
 	if (stmt->tableAction == DEFELEM_ADD)
@@ -345,9 +366,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 			foreach(newlc, rels)
 			{
-				Relation	newrel = (Relation) lfirst(newlc);
+				PublicationRelationQual	*newrel = (PublicationRelationQual *) lfirst(newlc);
 
-				if (RelationGetRelid(newrel) == oldrelid)
+				if (RelationGetRelid(newrel->relation) == oldrelid)
 				{
 					found = true;
 					break;
@@ -356,7 +377,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 			if (!found)
 			{
-				Relation	oldrel = heap_open(oldrelid,
+				PublicationRelationQual *oldrel = palloc0(sizeof(PublicationRelationQual));	/* zeroed: no row filter */
+				oldrel->relation = heap_open(oldrelid,
 											   ShareUpdateExclusiveLock);
 
 				delrels = lappend(delrels, oldrel);
@@ -479,16 +501,18 @@ OpenTableList(List *tables)
 	List	   *relids = NIL;
 	List	   *rels = NIL;
 	ListCell   *lc;
+	PublicationRelationQual	*relqual;
 
 	/*
 	 * Open, share-lock, and check all the explicitly-specified relations
 	 */
 	foreach(lc, tables)
 	{
-		RangeVar   *rv = lfirst(lc);
-		Relation	rel;
-		bool		recurse = rv->inh;
-		Oid			myrelid;
+		PublicationTable	*t = lfirst(lc);
+		RangeVar  			*rv = t->relation;
+		Relation			rel;
+		bool				recurse = rv->inh;
+		Oid					myrelid;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -507,7 +531,10 @@ OpenTableList(List *tables)
 			heap_close(rel, ShareUpdateExclusiveLock);
 			continue;
 		}
-		rels = lappend(rels, rel);
+		relqual = palloc(sizeof(PublicationRelationQual));
+		relqual->relation = rel;
+		relqual->whereClause = t->whereClause;
+		rels = lappend(rels, relqual);
 		relids = lappend_oid(relids, myrelid);
 
 		if (recurse)
@@ -537,7 +564,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = heap_open(childrelid, NoLock);
-				rels = lappend(rels, rel);
+				relqual = palloc(sizeof(PublicationRelationQual));
+				relqual->relation = rel;
+				/* child inherits WHERE clause from parent */
+				relqual->whereClause = t->whereClause;
+				rels = lappend(rels, relqual);
 				relids = lappend_oid(relids, childrelid);
 			}
 		}
@@ -558,10 +589,12 @@ CloseTableList(List *rels)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationQual	*rel = (PublicationRelationQual *) lfirst(lc);
 
-		heap_close(rel, NoLock);
+		heap_close(rel->relation, NoLock);
 	}
+
+	list_free_deep(rels);
 }
 
 /*
@@ -577,13 +610,13 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationQual	*rel = (PublicationRelationQual *) lfirst(lc);
 		ObjectAddress obj;
 
 		/* Must be owner of the table or superuser. */
-		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
-			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
-						   RelationGetRelationName(rel));
+		if (!pg_class_ownercheck(RelationGetRelid(rel->relation), GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->relation->rd_rel->relkind),
+						   RelationGetRelationName(rel->relation));
 
 		obj = publication_add_relation(pubid, rel, if_not_exists);
 		if (stmt)
@@ -609,8 +642,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
-		Oid			relid = RelationGetRelid(rel);
+		PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
+		Oid			relid = RelationGetRelid(rel->relation);
 
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
 							   ObjectIdGetDatum(pubid));
@@ -622,7 +655,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 			ereport(ERROR,
 					(errcode(ERRCODE_UNDEFINED_OBJECT),
 					 errmsg("relation \"%s\" is not part of the publication",
-							RelationGetRelationName(rel))));
+							RelationGetRelationName(rel->relation))));
 		}
 
 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index bf32362..94cdd7d 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -396,13 +396,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				relation_expr_list dostmt_opt_list
 				transform_element_list transform_type_list
 				TriggerTransitions TriggerReferencing
-				publication_name_list
+				publication_name_list publication_table_list
 				vacuum_relation_list opt_vacuum_relation_list
 
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
-%type <node>	opt_publication_for_tables publication_for_tables
+%type <node>	opt_publication_for_tables publication_for_tables publication_table_elem
 %type <value>	publication_name_item
 
 %type <list>	opt_fdw_options fdw_options
@@ -9520,7 +9520,7 @@ opt_publication_for_tables:
 		;
 
 publication_for_tables:
-			FOR TABLE relation_expr_list
+			FOR TABLE publication_table_list
 				{
 					$$ = (Node *) $3;
 				}
@@ -9551,7 +9551,7 @@ AlterPublicationStmt:
 					n->options = $5;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
+			| ALTER PUBLICATION name ADD_P TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9559,7 +9559,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_ADD;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name SET TABLE relation_expr_list
+			| ALTER PUBLICATION name SET TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9567,7 +9567,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_SET;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name DROP TABLE relation_expr_list
+			| ALTER PUBLICATION name DROP TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9577,6 +9577,20 @@ AlterPublicationStmt:
 				}
 		;
 
+publication_table_list:
+			publication_table_elem									{ $$ = list_make1($1); }
+			| publication_table_list ',' publication_table_elem		{ $$ = lappend($1, $3); }
+		;
+
+publication_table_elem: relation_expr OptWhereClause
+				{
+					PublicationTable *n = makeNode(PublicationTable);
+					n->relation = $1;
+					n->whereClause = $2;
+					$$ = (Node *) n;
+				}
+		;
+
 /*****************************************************************************
  *
  * CREATE SUBSCRIPTION name ...
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 377a7ed..7e1c3d8 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -522,6 +522,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr)
 				err = _("grouping operations are not allowed in CALL arguments");
 
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			if (isAgg)
+				err = _("aggregate functions are not allowed in publication WHERE conditions");
+			else
+				err = _("grouping operations are not allowed in publication WHERE conditions");
+
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
@@ -902,6 +909,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("window functions are not allowed in CALL arguments");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("window functions are not allowed in publication WHERE conditions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index 385e54a..7bd1695 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -1849,6 +1849,9 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("cannot use subquery in CALL argument");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("cannot use subquery in publication WHERE condition");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
@@ -3475,6 +3478,8 @@ ParseExprKindName(ParseExprKind exprKind)
 			return "PARTITION BY";
 		case EXPR_KIND_CALL_ARGUMENT:
 			return "CALL";
+		case EXPR_KIND_PUBLICATION_WHERE:
+			return "publication WHERE";
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 2a4ac09..8e9cc58 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -2293,6 +2293,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location)
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("set-returning functions are not allowed in CALL arguments");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("set-returning functions are not allowed in publication WHERE conditions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6820c1a..fe0a6ca 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -182,7 +182,7 @@ ensure_transaction(void)
  *
  * This is based on similar code in copy.c
  */
-static EState *
+EState *
 create_estate_for_relation(Relation rel)
 {
 	EState	   *estate;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index d538f25..30bdefa 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,13 +12,23 @@
  */
 #include "postgres.h"
 
+#include "catalog/pg_type.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
+
+#include "executor/executor.h"
+#include "nodes/execnodes.h"
+#include "nodes/nodeFuncs.h"
+#include "optimizer/planner.h"
+#include "parser/parse_coerce.h"
 
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 
+#include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
 #include "utils/lsyscache.h"
@@ -56,6 +66,7 @@ typedef struct RelationSyncEntry
 	bool		schema_sent;	/* did we send the schema? */
 	bool		replicate_valid;
 	PublicationActions pubactions;
+	List		*row_filter;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -286,6 +297,62 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/* ... then check row filter */
+	if (list_length(relentry->row_filter) > 0)
+	{
+		HeapTuple		old_tuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
+		HeapTuple		new_tuple = change->data.tp.newtuple ? &change->data.tp.newtuple->tuple : NULL;
+		TupleDesc		tupdesc = RelationGetDescr(relation);
+		EState			*estate = create_estate_for_relation(relation);
+		ExprContext		*ecxt;
+		MemoryContext	oldcxt;
+		ListCell		*lc;
+		bool			matched = true;
+
+		/* prepare context per tuple */
+		ecxt = GetPerTupleExprContext(estate);
+		oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
+		ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc);
+		MemoryContextSwitchTo(oldcxt);
+
+		ExecStoreTuple(new_tuple ? new_tuple : old_tuple, ecxt->ecxt_scantuple, InvalidBuffer, false);
+
+		foreach (lc, relentry->row_filter)
+		{
+			Node		*row_filter = (Node *) lfirst(lc);
+			ExprState	*expr_state;
+			Expr		*expr;
+			Datum		res;
+			bool		isnull;
+			char		*s;
+
+			/* coerce the filter to boolean, plan it and evaluate it against the tuple */
+			expr = (Expr *) coerce_to_target_type(NULL, row_filter, exprType(row_filter), BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
+			expr = expression_planner(expr);
+			expr_state = ExecInitExpr(expr, NULL);
+			res = ExecEvalExpr(expr_state, ecxt, &isnull);
+
+			/* NULL or false means the tuple does not match; check isnull before the datum */
+			if (isnull || !DatumGetBool(res))
+			{
+				matched = false;
+				break;
+			}
+
+			s = nodeToString(row_filter);
+			elog(DEBUG2, "filter \"%s\" was matched", s);
+			pfree(s);
+		}
+
+		/*
+		 * Clean up on every path so filtered-out rows do not leak the EState.
+		 */
+		ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple);
+		FreeExecutorState(estate);
+		if (!matched)
+			return;
+	}
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -506,10 +573,14 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		 */
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = false;
+		entry->row_filter = NIL;
 
 		foreach(lc, data->publications)
 		{
 			Publication *pub = lfirst(lc);
+			HeapTuple	rf_tuple;
+			Datum		rf_datum;
+			bool		rf_isnull;
 
 			/*
 			 * Skip tables that look like they are from a heap rewrite (see
@@ -543,9 +614,25 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete)
-				break;
+			/* Cache row filters, if available */
+			rf_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pub->oid));
+			if (HeapTupleIsValid(rf_tuple))
+			{
+				rf_datum = SysCacheGetAttr(PUBLICATIONRELMAP, rf_tuple, Anum_pg_publication_rel_prrowfilter, &rf_isnull);
+
+				if (!rf_isnull)
+				{
+					/* only the node tree must live in CacheMemoryContext; the string is transient */
+					char	*s = TextDatumGetCString(rf_datum);
+					MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+					entry->row_filter = lappend(entry->row_filter, stringToNode(s));
+					MemoryContextSwitchTo(oldctx);
+					elog(DEBUG2, "row filter \"%s\" found for publication \"%s\" and relation \"%s\"", s, pub->name, get_rel_name(relid));
+					pfree(s);
+				}
+
+				ReleaseSysCache(rf_tuple);
+			}
 		}
 
 		list_free(pubids);
@@ -620,5 +707,10 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
 		entry->replicate_valid = false;
+		if (list_length(entry->row_filter) > 0)
+			list_free(entry->row_filter);
+		entry->row_filter = NIL;
+	}
 }
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 37e77b8..28962e6 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,12 @@ typedef struct Publication
 	PublicationActions pubactions;
 } Publication;
 
+typedef struct PublicationRelationQual
+{
+	Relation	relation;		/* opened relation this entry refers to */
+	Node		*whereClause;	/* untransformed WHERE expression, if any */
+} PublicationRelationQual;
+
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
@@ -94,7 +100,7 @@ extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(void);
 
 extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
 						 bool if_not_exists);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index 033b600..585f855 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -29,8 +29,12 @@
 
 CATALOG(pg_publication_rel,6106)
 {
-	Oid			prpubid;		/* Oid of the publication */
-	Oid			prrelid;		/* Oid of the relation */
+	Oid				prpubid;		/* Oid of the publication */
+	Oid				prrelid;		/* Oid of the relation */
+
+#ifdef	CATALOG_VARLEN				/* variable-length fields start here */
+	pg_node_tree	prrowfilter;	/* nodeToString representation of row filter */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
@@ -45,8 +49,9 @@ typedef FormData_pg_publication_rel *Form_pg_publication_rel;
  * ----------------
  */
 
-#define Natts_pg_publication_rel				2
+#define Natts_pg_publication_rel				3
 #define Anum_pg_publication_rel_prpubid			1
 #define Anum_pg_publication_rel_prrelid			2
+#define	Anum_pg_publication_rel_prrowfilter		3
 
 #endif							/* PG_PUBLICATION_REL_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 74b094a..499d839 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -471,6 +471,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_PublicationTable,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index ac292bc..9800acf 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3419,12 +3419,19 @@ typedef struct AlterTSConfigurationStmt
 } AlterTSConfigurationStmt;
 
 
+typedef struct PublicationTable
+{
+	NodeTag		type;
+	RangeVar	*relation;		/* relation to be published */
+	Node		*whereClause;	/* optional WHERE condition (row filter) */
+} PublicationTable;
+
 typedef struct CreatePublicationStmt
 {
 	NodeTag		type;
 	char	   *pubname;		/* Name of of the publication */
 	List	   *options;		/* List of DefElem nodes */
-	List	   *tables;			/* Optional list of tables to add */
+	List	   *tables;			/* Optional list of PublicationTable to add */
 	bool		for_all_tables; /* Special publication for all tables in db */
 } CreatePublicationStmt;
 
@@ -3437,7 +3444,7 @@ typedef struct AlterPublicationStmt
 	List	   *options;		/* List of DefElem nodes */
 
 	/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
-	List	   *tables;			/* List of tables to add/drop */
+	List	   *tables;			/* List of PublicationTable to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
 	DefElemAction tableAction;	/* What action to perform with the tables */
 } AlterPublicationStmt;
diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h
index 0230543..8e3c735 100644
--- a/src/include/parser/parse_node.h
+++ b/src/include/parser/parse_node.h
@@ -69,7 +69,8 @@ typedef enum ParseExprKind
 	EXPR_KIND_TRIGGER_WHEN,		/* WHEN condition in CREATE TRIGGER */
 	EXPR_KIND_POLICY,			/* USING or WITH CHECK expr in policy */
 	EXPR_KIND_PARTITION_EXPRESSION, /* PARTITION BY expression */
-	EXPR_KIND_CALL_ARGUMENT		/* procedure argument in CALL */
+	EXPR_KIND_CALL_ARGUMENT,	/* procedure argument in CALL */
+	EXPR_KIND_PUBLICATION_WHERE	/* WHERE condition for a table in PUBLICATION */
 } ParseExprKind;
 
 
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index d4250c2..32f1312 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -39,4 +39,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
 extern Oid	logicalrep_typmap_getid(Oid remoteid);
 
+extern EState *create_estate_for_relation(Relation rel);
+
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index e0104cd..d40ae03 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 17;
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -31,6 +31,8 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_mixed (a int primary key, b text)");
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_mixed (a, b) VALUES (1, 'foo')");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter (a int primary key, b text)");
 
 # Setup structure on subscriber
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)");
@@ -39,6 +41,8 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
 $node_subscriber->safe_psql('postgres',
 	"CREATE TABLE tab_rep (a int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter (a int primary key, b text)");
 
 # different column count and order than on publisher
 $node_subscriber->safe_psql('postgres',
@@ -54,10 +58,12 @@ $node_publisher->safe_psql('postgres',
 );
 $node_publisher->safe_psql('postgres',
 	"ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_row_filter FOR TABLE tab_rowfilter WHERE (a > 1000 AND b <> 'filtered')");
 
 my $appname = 'tap_sub';
 $node_subscriber->safe_psql('postgres',
-"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub, tap_pub_ins_only"
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub, tap_pub_ins_only, tap_pub_row_filter"
 );
 
 $node_publisher->wait_for_catchup($appname);
@@ -76,6 +82,25 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
 is($result, qq(1002), 'check initial data was copied to subscriber');
 
+# row filter: only rows with a > 1000 AND b <> 'filtered' must be replicated
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter (a, b) VALUES (1, 'not replicated')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter (a, b) VALUES (1500, 'filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter (a, b) VALUES (1980, 'not filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter (a, b) SELECT x, 'test ' || x FROM generate_series(990,1003) x");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab_rowfilter ORDER BY a");
+is($result, qq(1001|test 1001
+1002|test 1002
+1003|test 1003
+1980|not filtered), 'check filtered data was copied to subscriber');
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
-- 
2.7.4

