diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index ff8513e2d29..41c5e3413f6 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -33,7 +33,9 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel,
 								   Bitmapset *columns);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
 								   TupleTableSlot *slot,
-								   bool binary, Bitmapset *columns);
+								   bool binary,
+								   Bitmapset *schema_columns,
+								   Bitmapset *columns);
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
 
@@ -412,7 +414,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						TupleTableSlot *newslot, bool binary, Bitmapset *columns)
+						TupleTableSlot *newslot, bool binary,
+						Bitmapset *schema_columns, Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -424,7 +427,8 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary,
+						   schema_columns, columns);
 }
 
 /*
@@ -457,7 +461,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 						TupleTableSlot *oldslot, TupleTableSlot *newslot,
-						bool binary, Bitmapset *columns)
+						bool binary, Bitmapset *schema_columns,
+						Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -478,11 +483,12 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
+		logicalrep_write_tuple(out, rel, oldslot, binary, NULL, NULL);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary,
+						   schema_columns, columns);
 }
 
 /*
@@ -551,7 +557,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
+	logicalrep_write_tuple(out, rel, oldslot, binary, NULL, NULL);
 }
 
 /*
@@ -766,7 +772,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  */
 static void
 logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
-					   bool binary, Bitmapset *columns)
+					   bool binary,
+					   Bitmapset *schema_columns, Bitmapset *columns)
 {
 	TupleDesc	desc;
 	Datum	   *values;
@@ -783,7 +790,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
-		if (!column_in_column_list(att->attnum, columns))
+		if (!column_in_column_list(att->attnum, schema_columns))
 			continue;
 
 		nliveatts++;
@@ -804,10 +811,23 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
-		if (!column_in_column_list(att->attnum, columns))
+		/*
+		 * Columns that are not in schema (union of column lists) should
+		 * be skipped entirely.
+		 */
+		if (!column_in_column_list(att->attnum, schema_columns))
 			continue;
 
-		if (isnull[i])
+		/*
+		 * For columns not in the column list (derived considering row filters)
+		 * we just send NULL.
+		 *
+		 * XXX Not sure this is quite correct, though. Imagine you replicate
+		 * values for columns (A,B), but it changes the row filter. Can we
+		 * send NULL that would overwrite "proper" value replicated earlier?
+		 */
+		if (isnull[i] ||
+			!column_in_column_list(att->attnum, columns))
 		{
 			pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
 			continue;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 49ceec3bdc8..fd547e16f4a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -126,6 +126,20 @@ static bool FetchTableStates(bool *started_tx);
 
 StringInfo	copybuf = NULL;
 
+/*
+ * Info used to track and evaluate row filters for each publication the relation
+ * is included in, and to calculate the column list.
+ */
+typedef struct PublicationInfo {
+
+	/* row filter (expression state) */
+	Node	   *rowfilter;
+
+	/* column list */
+	Bitmapset  *columns;
+
+} PublicationInfo;
+
 /*
  * Exit routine for synchronization worker.
  */
@@ -696,14 +710,14 @@ copy_read_data(void *outbuf, int minread, int maxread)
  */
 static void
 fetch_remote_table_info(char *nspname, char *relname,
-						LogicalRepRelation *lrel, List **qual)
+						LogicalRepRelation *lrel, List **pubinfos)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
 	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
-	Oid			qualRow[] = {TEXTOID};
+	Oid			qualRow[] = {TEXTOID, INT2VECTOROID};
 	bool		isnull;
 	int			natt;
 	ListCell   *lc;
@@ -878,6 +892,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	/* We don't know the number of rows coming, so allocate enough space. */
 	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
 	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
+	lrel->attnums = palloc0(MaxTupleAttributeNumber * sizeof(int16));
 	lrel->attkeys = NULL;
 
 	/*
@@ -905,6 +920,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		Assert(!isnull);
 
 		lrel->attnames[natt] = rel_colname;
+		lrel->attnums[natt] = attnum;
 		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
 		Assert(!isnull);
 
@@ -943,6 +959,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	 * 3) one of the subscribed publications is declared as ALL TABLES IN
 	 * SCHEMA that includes this relation
 	 */
+	*pubinfos = NIL;
 	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
 	{
 		StringInfoData pub_names;
@@ -965,7 +982,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		/* Check for row filters. */
 		resetStringInfo(&cmd);
 		appendStringInfo(&cmd,
-						 "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)"
+						 "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid), pr.prattrs"
 						 "  FROM pg_publication p"
 						 "  LEFT OUTER JOIN pg_publication_rel pr"
 						 "       ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
@@ -976,7 +993,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 						 lrel->remoteid,
 						 pub_names.data);
 
-		res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
+		res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, lengthof(qualRow), qualRow);
 
 		if (res->status != WALRCV_OK_TUPLES)
 			ereport(ERROR,
@@ -993,21 +1010,31 @@ fetch_remote_table_info(char *nspname, char *relname,
 		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 		{
+			PublicationInfo *pubinfo;
 			Datum		rf = slot_getattr(slot, 1, &isnull);
+			Datum		cl;
+
+			pubinfo = (PublicationInfo	*) palloc0(sizeof(PublicationInfo));
+
+			if (!isnull)
+				pubinfo->rowfilter = (Node *) makeString(TextDatumGetCString(rf));
+
+			cl = slot_getattr(slot, 2, &isnull);
 
 			if (!isnull)
-				*qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
-			else
 			{
-				/* Ignore filters and cleanup as necessary. */
-				if (*qual)
+				int i;
+				int2vector *prattrs = (int2vector *) cl;
+
+				for (i = 0; i < prattrs->dim1; i++)
 				{
-					list_free_deep(*qual);
-					*qual = NIL;
+					pubinfo->columns = bms_add_member(pubinfo->columns,
+													  prattrs->values[i]);
 				}
-				break;
 			}
 
+			*pubinfos = lappend(*pubinfos, pubinfo);
+
 			ExecClearTuple(slot);
 		}
 		ExecDropSingleTupleTableSlot(slot);
@@ -1028,7 +1055,7 @@ copy_table(Relation rel)
 {
 	LogicalRepRelMapEntry *relmapentry;
 	LogicalRepRelation lrel;
-	List	   *qual = NIL;
+	List	   *pubinfos = NIL;
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	CopyFromState cstate;
@@ -1037,7 +1064,7 @@ copy_table(Relation rel)
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel, &qual);
+							RelationGetRelationName(rel), &lrel, &pubinfos);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -1050,7 +1077,9 @@ copy_table(Relation rel)
 	initStringInfo(&cmd);
 
 	/* Regular table with no row filter */
-	if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+	/* FIXME pubinfos is never NULL now, need to detect absence of row filters
+	 * in a different way */
+	if (lrel.relkind == RELKIND_RELATION && pubinfos == NIL)
 	{
 		appendStringInfo(&cmd, "COPY %s (",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
@@ -1076,11 +1105,55 @@ copy_table(Relation rel)
 		 * (SELECT ...), but we can't just do SELECT * because we need to not
 		 * copy generated columns. For tables with any row filters, build a
 		 * SELECT query with OR'ed row filters for COPY.
+		 *
+		 * FIXME can be simplified if all publications have the same column
+		 * list (or no column list), in which case we don't need the CASE
+		 * expressions at all.
 		 */
 		appendStringInfoString(&cmd, "COPY (SELECT ");
 		for (int i = 0; i < lrel.natts; i++)
 		{
-			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			ListCell   *lc;
+			StringInfoData qual;
+			bool		no_filter = false;
+			bool		is_first = true;
+
+			initStringInfo(&qual);
+
+			/* find all row filters for the column, combine them using OR */
+			foreach (lc, pubinfos)
+			{
+				PublicationInfo *pubinfo = (PublicationInfo *) lfirst(lc);
+
+				/* not included in this publication column list */
+				if (pubinfo->columns != NULL &&
+					!bms_is_member(lrel.attnums[i], pubinfo->columns))
+					continue;
+
+				/* covered by this publication, is there an expression? */
+				if (pubinfo->rowfilter == NULL)
+				{
+					no_filter = true;
+					break;
+				}
+
+				if (is_first)
+				{
+					appendStringInfo(&qual, "%s", strVal(pubinfo->rowfilter));
+					is_first = false;
+				}
+				else
+					appendStringInfo(&qual, " OR %s", strVal(pubinfo->rowfilter));
+			}
+
+			if (no_filter)
+				appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			else
+				appendStringInfo(&cmd, "(CASE WHEN (%s) THEN %s ELSE NULL END) AS %s",
+								 qual.data,
+								 quote_identifier(lrel.attnames[i]),
+								 quote_identifier(lrel.attnames[i]));
+			
 			if (i < lrel.natts - 1)
 				appendStringInfoString(&cmd, ", ");
 		}
@@ -1095,6 +1168,24 @@ copy_table(Relation rel)
 			appendStringInfoString(&cmd, "ONLY ");
 
 		appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
+
+		{
+			List *qual = NIL;
+			ListCell *lc;
+
+			foreach (lc, pubinfos)
+			{
+				PublicationInfo *pubinfo = (PublicationInfo *) lfirst(lc);
+
+				if (pubinfo->rowfilter == NULL)
+				{
+					qual = NIL;
+					break;
+				}
+
+				qual = lappend(qual, pubinfo->rowfilter);
+			}
+
 		/* list of OR'ed filters */
 		if (qual != NIL)
 		{
@@ -1110,6 +1201,8 @@ copy_table(Relation rel)
 			list_free_deep(qual);
 		}
 
+		}
+
 		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b197bfd565d..85456e6d9f5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -137,13 +137,13 @@ typedef struct RelationSyncEntry
 	PublicationActions pubactions;
 
 	/*
-	 * ExprState array for row filter. Different publication actions don't
-	 * allow multiple expressions to always be combined into one, because
-	 * updates or deletes restrict the column in expression to be part of the
-	 * replica identity index whereas inserts do not have this restriction, so
-	 * there is one ExprState per publication action.
+	 * Info about row filters and column lists for each publication this
+	 * relation is included in. We keep a list with per-publication info in
+	 * order to calculate appropriate column list depending on which row
+	 * filter(s) match. Each element contains an ExprState for the filter
+	 * and column list.
 	 */
-	ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
+	dlist_head	pubinfos;		/* one per publication */
 	EState	   *estate;			/* executor state used for row filter */
 	TupleTableSlot *new_slot;	/* slot for storing new tuple */
 	TupleTableSlot *old_slot;	/* slot for storing old tuple */
@@ -208,6 +208,29 @@ typedef struct PGOutputTxnData
 								 * been sent */
 }		PGOutputTxnData;
 
+/*
+ * Info used to track and evaluate row filters for each publication the relation
+ * is included in, and to calculate the column list.
+ */
+typedef struct PublicationInfo {
+
+	/* doubly-linked list */
+	dlist_node	node;
+
+	/* publication OID (XXX not really needed) */
+	Oid			oid;
+
+	/* row filter (expression state) */
+	ExprState  *rowfilter;
+
+	/* column list */
+	Bitmapset  *columns;
+
+	/* actions published by the publication */
+	PublicationActions	pubactions;
+
+} PublicationInfo;
+
 /* Map used to remember which relation schemas we sent. */
 static HTAB *RelationSyncCache = NULL;
 
@@ -235,12 +258,8 @@ static bool pgoutput_row_filter_exec_expr(ExprState *state,
 static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 								TupleTableSlot **new_slot_ptr,
 								RelationSyncEntry *entry,
-								ReorderBufferChangeType *action);
-
-/* column list routines */
-static void pgoutput_column_list_init(PGOutputData *data,
-									  List *publications,
-									  RelationSyncEntry *entry);
+								ReorderBufferChangeType *action,
+								Bitmapset **column_list);
 
 /*
  * Specify output plugin callbacks
@@ -822,18 +841,24 @@ pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
 }
 
 /*
- * Initialize the row filter.
+ * Initialize the row filter and column list.
+ *
+ * Prepare information (ExprState, etc) used to evaluate per-publication row
+ * filters, and column lists.
+ *
+ * We also calculate a "total" column list as a union of all per-publication
+ * column lists, irrespective of row filters. This is used to send schema
+ * for the relsync entry, etc.
  */
 static void
 pgoutput_row_filter_init(PGOutputData *data, List *publications,
 						 RelationSyncEntry *entry)
 {
 	ListCell   *lc;
-	List	   *rfnodes[] = {NIL, NIL, NIL};	/* One per pubaction */
-	bool		no_filter[] = {false, false, false};	/* One per pubaction */
 	MemoryContext oldctx;
-	int			idx;
-	bool		has_filter = true;
+	bool		all_columns = false;
+
+	dlist_init(&entry->pubinfos);
 
 	/*
 	 * Find if there are any row filters for this relation. If there are, then
@@ -855,7 +880,12 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
 		Publication *pub = lfirst(lc);
 		HeapTuple	rftuple = NULL;
 		Datum		rfdatum = 0;
+		Datum		cfdatum = 0;
 		bool		pub_no_filter = false;
+		bool		pub_no_list = false;
+		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
+
+		PublicationInfo *pubinfo = NULL;
 
 		if (pub->alltables)
 		{
@@ -865,6 +895,7 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
 			 * publications it does).
 			 */
 			pub_no_filter = true;
+			pub_no_list = true;
 		}
 		else
 		{
@@ -881,191 +912,75 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
 				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
 										  Anum_pg_publication_rel_prqual,
 										  &pub_no_filter);
+
+				/*
+				 * Lookup the column list attribute.
+				 *
+				 * Null indicates no list.
+				 */
+				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+										  Anum_pg_publication_rel_prattrs,
+										  &pub_no_list);
 			}
 			else
 			{
 				pub_no_filter = true;
+				pub_no_list = true;
 			}
 		}
 
-		if (pub_no_filter)
-		{
-			if (rftuple)
-				ReleaseSysCache(rftuple);
-
-			no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
-			no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
-			no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
-
-			/*
-			 * Quick exit if all the DML actions are publicized via this
-			 * publication.
-			 */
-			if (no_filter[PUBACTION_INSERT] &&
-				no_filter[PUBACTION_UPDATE] &&
-				no_filter[PUBACTION_DELETE])
-			{
-				has_filter = false;
-				break;
-			}
-
-			/* No additional work for this publication. Next one. */
-			continue;
-		}
-
-		/* Form the per pubaction row filter lists. */
-		if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
-			rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
-												TextDatumGetCString(rfdatum));
-		if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
-			rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
-												TextDatumGetCString(rfdatum));
-		if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
-			rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
-												TextDatumGetCString(rfdatum));
-
-		ReleaseSysCache(rftuple);
-	}							/* loop all subscribed publications */
-
-	/* Clean the row filter */
-	for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
-	{
-		if (no_filter[idx])
-		{
-			list_free_deep(rfnodes[idx]);
-			rfnodes[idx] = NIL;
-		}
-	}
-
-	if (has_filter)
-	{
-		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
-
 		pgoutput_ensure_entry_cxt(data, entry);
 
-		/*
-		 * Now all the filters for all pubactions are known. Combine them when
-		 * their pubactions are the same.
-		 */
 		oldctx = MemoryContextSwitchTo(entry->entry_cxt);
-		entry->estate = create_estate_for_relation(relation);
-		for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
-		{
-			List	   *filters = NIL;
-			Expr	   *rfnode;
 
-			if (rfnodes[idx] == NIL)
-				continue;
-
-			foreach(lc, rfnodes[idx])
-				filters = lappend(filters, stringToNode((char *) lfirst(lc)));
-
-			/* combine the row filter and cache the ExprState */
-			rfnode = make_orclause(filters);
-			entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
-		}						/* for each pubaction */
-		MemoryContextSwitchTo(oldctx);
-
-		RelationClose(relation);
-	}
-}
+		pubinfo = (PublicationInfo *) palloc0(sizeof(PublicationInfo));
 
-/*
- * Initialize the column list.
- */
-static void
-pgoutput_column_list_init(PGOutputData *data, List *publications,
-						  RelationSyncEntry *entry)
-{
-	ListCell   *lc;
+		pubinfo->oid = pub->oid;
+		pubinfo->pubactions = pub->pubactions;
 
-	/*
-	 * Find if there are any column lists for this relation. If there are,
-	 * build a bitmap merging all the column lists.
-	 *
-	 * All the given publication-table mappings must be checked.
-	 *
-	 * Multiple publications might have multiple column lists for this relation.
-	 *
-	 * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column
-	 * list" so it takes precedence.
-	 */
-	foreach(lc, publications)
-	{
-		Publication *pub = lfirst(lc);
-		HeapTuple	cftuple = NULL;
-		Datum		cfdatum = 0;
+		if (!pub_no_filter)
+		{
+			entry->estate = create_estate_for_relation(relation);
 
-		/*
-		 * Assume there's no column list. Only if we find pg_publication_rel
-		 * entry with a column list we'll switch it to false.
-		 */
-		bool		pub_no_list = true;
+			pubinfo->rowfilter
+				= ExecPrepareExpr(stringToNode(TextDatumGetCString(rfdatum)),
+								  entry->estate);
+		}
 
 		/*
-		 * If the publication is FOR ALL TABLES then it is treated the same as if
-		 * there are no column lists (even if other publications have a list).
+		 * Build the column list bitmap in the per-entry context.
+		 *
+		 * We need to merge column lists from all publications, so we
+		 * update the same bitmapset. If the column list is null, we
+		 * interpret it as replicating all columns.
 		 */
-		if (!pub->alltables)
+		pubinfo->columns = NULL;
+		if (!pub_no_list)	/* when not null */
 		{
-			/*
-			 * Check for the presence of a column list in this publication.
-			 *
-			 * Note: If we find no pg_publication_rel row, it's a publication
-			 * defined for a whole schema, so it can't have a column list, just
-			 * like a FOR ALL TABLES publication.
-			 */
-			cftuple = SearchSysCache2(PUBLICATIONRELMAP,
-									  ObjectIdGetDatum(entry->publish_as_relid),
-									  ObjectIdGetDatum(pub->oid));
-
-			if (HeapTupleIsValid(cftuple))
-			{
-				/*
-				 * Lookup the column list attribute.
-				 *
-				 * Note: We update the pub_no_list value directly, because if
-				 * the value is NULL, we have no list (and vice versa).
-				 */
-				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
-										  Anum_pg_publication_rel_prattrs,
-										  &pub_no_list);
-
-				/*
-				 * Build the column list bitmap in the per-entry context.
-				 *
-				 * We need to merge column lists from all publications, so we
-				 * update the same bitmapset. If the column list is null, we
-				 * interpret it as replicating all columns.
-				 */
-				if (!pub_no_list)	/* when not null */
-				{
-					pgoutput_ensure_entry_cxt(data, entry);
+			pubinfo->columns = pub_collist_to_bitmapset(pubinfo->columns,
+														cfdatum,
+														entry->entry_cxt);
 
-					entry->columns = pub_collist_to_bitmapset(entry->columns,
-															  cfdatum,
-															  entry->entry_cxt);
-				}
-			}
+			entry->columns = pub_collist_to_bitmapset(entry->columns,
+													  cfdatum,
+													  entry->entry_cxt);
 		}
+		else
+			all_columns = true;
 
-		/*
-		 * Found a publication with no column list, so we're done. But first
-		 * discard column list we might have from preceding publications.
-		 */
-		if (pub_no_list)
-		{
-			if (cftuple)
-				ReleaseSysCache(cftuple);
+		if (HeapTupleIsValid(rftuple))
+			ReleaseSysCache(rftuple);
 
-			bms_free(entry->columns);
-			entry->columns = NULL;
+		MemoryContextSwitchTo(oldctx);
+		RelationClose(relation);
 
-			break;
-		}
+		dlist_push_tail(&entry->pubinfos, &pubinfo->node);
 
-		ReleaseSysCache(cftuple);
 	}							/* loop all subscribed publications */
+
+	/* any of the publications replicates all columns */
+	if (all_columns)
+		entry->columns = NULL;
 }
 
 /*
@@ -1115,7 +1030,8 @@ init_tuple_slot(PGOutputData *data, Relation relation,
 }
 
 /*
- * Change is checked against the row filter if any.
+ * Change is checked against the row filter if any, and calculate the column
+ * list applicable to the operation (with respect to matching row filters).
  *
  * Returns true if the change is to be replicated, else false.
  *
@@ -1136,6 +1052,8 @@ init_tuple_slot(PGOutputData *data, Relation relation,
  *
  * The new action is updated in the action parameter.
  *
+ * The calculated column list is returned in the column_list parameter.
+ *
  * The new slot could be updated when transforming the UPDATE into INSERT,
  * because the original new tuple might not have column values from the replica
  * identity.
@@ -1167,17 +1085,21 @@ init_tuple_slot(PGOutputData *data, Relation relation,
 static bool
 pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 					TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
-					ReorderBufferChangeType *action)
+					ReorderBufferChangeType *action, Bitmapset **column_list)
 {
 	TupleDesc	desc;
 	int			i;
-	bool		old_matched,
-				new_matched,
+	bool		old_matched_any = false,
+				new_matched_any = false,
 				result;
-	TupleTableSlot *tmp_new_slot;
+	TupleTableSlot *tmp_new_slot = NULL;
 	TupleTableSlot *new_slot = *new_slot_ptr;
-	ExprContext *ecxt;
-	ExprState  *filter_exprstate;
+	dlist_iter	iter;
+	bool		matching = false;
+
+	/* Column list calculated from publications matching the row filter. */
+	Bitmapset  *columns = NULL;
+	bool		all_columns = false;
 
 	/*
 	 * We need this map to avoid relying on ReorderBufferChangeType enums
@@ -1195,115 +1117,191 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 
 	Assert(new_slot || old_slot);
 
-	/* Get the corresponding row filter */
-	filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
+	dlist_foreach(iter, &entry->pubinfos)
+	{
+		ExprContext *ecxt;
+		bool		old_matched,
+					new_matched;
 
-	/* Bail out if there is no row filter */
-	if (!filter_exprstate)
-		return true;
+		PublicationInfo  *pubinfo
+			= dlist_container(PublicationInfo, node, iter.cur);
 
-	elog(DEBUG3, "table \"%s.%s\" has row filter",
-		 get_namespace_name(RelationGetNamespace(relation)),
-		 RelationGetRelationName(relation));
+		/* ignore publications not replicating this action */
+		if ((*action == REORDER_BUFFER_CHANGE_INSERT) &&
+			(!pubinfo->pubactions.pubinsert))
+			continue;
+		else if ((*action == REORDER_BUFFER_CHANGE_UPDATE) &&
+				 (!pubinfo->pubactions.pubupdate))
+			continue;
+		else if ((*action == REORDER_BUFFER_CHANGE_DELETE) &&
+			(!pubinfo->pubactions.pubdelete))
+			continue;
 
-	ResetPerTupleExprContext(entry->estate);
+		if (!pubinfo->rowfilter)
+		{
+			matching = true;
 
-	ecxt = GetPerTupleExprContext(entry->estate);
+			/*
+			 * Update/merge the column list.
+			 *
+			 * If the publication has no column list, we interpret it as a list
+			 * with all columns. Otherwise we just add it to the bitmap.
+			 *
+			 * FIXME This is repeated in three places. Maybe refactor?
+			 */
+			if (!pubinfo->columns)
+			{
+				all_columns = true;
+				bms_free(columns);
+				columns = NULL;
+			}
+			else if (!all_columns)
+				columns = bms_union(columns, pubinfo->columns);
 
-	/*
-	 * For the following occasions where there is only one tuple, we can
-	 * evaluate the row filter for that tuple and return.
-	 *
-	 * For inserts, we only have the new tuple.
-	 *
-	 * For updates, we can have only a new tuple when none of the replica
-	 * identity columns changed and none of those columns have external data
-	 * but we still need to evaluate the row filter for the new tuple as the
-	 * existing values of those columns might not match the filter. Also, users
-	 * can use constant expressions in the row filter, so we anyway need to
-	 * evaluate it for the new tuple.
-	 *
-	 * For deletes, we only have the old tuple.
-	 */
-	if (!new_slot || !old_slot)
-	{
-		ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
-		result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+			continue;
+		}
 
-		return result;
-	}
+		elog(DEBUG3, "table \"%s.%s\" has row filter",
+			get_namespace_name(RelationGetNamespace(relation)),
+			RelationGetRelationName(relation));
 
-	/*
-	 * Both the old and new tuples must be valid only for updates and need to
-	 * be checked against the row filter.
-	 */
-	Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+		ResetPerTupleExprContext(entry->estate);
+
+		ecxt = GetPerTupleExprContext(entry->estate);
 
-	slot_getallattrs(new_slot);
-	slot_getallattrs(old_slot);
+		/*
+		 * For the following occasions where there is only one tuple, we can
+		 * evaluate the row filter for that tuple and return.
+		 *
+		 * For inserts, we only have the new tuple.
+		 *
+		 * For updates, we can have only a new tuple when none of the replica
+		 * identity columns changed and none of those columns have external data
+		 * but we still need to evaluate the row filter for the new tuple as the
+		 * existing values of those columns might not match the filter. Also, users
+		 * can use constant expressions in the row filter, so we anyway need to
+		 * evaluate it for the new tuple.
+		 *
+		 * For deletes, we only have the old tuple.
+		 */
+		if (!new_slot || !old_slot)
+		{
+			ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
+			result = pgoutput_row_filter_exec_expr(pubinfo->rowfilter, ecxt);
 
-	tmp_new_slot = NULL;
-	desc = RelationGetDescr(relation);
+			matching |= result;
 
-	/*
-	 * The new tuple might not have all the replica identity columns, in which
-	 * case it needs to be copied over from the old tuple.
-	 */
-	for (i = 0; i < desc->natts; i++)
-	{
-		Form_pg_attribute att = TupleDescAttr(desc, i);
+			/*
+			 * FIXME refactor to reuse this code in multiple places
+			 * 
+			 * XXX Should this only update column list when using new slot?
+			 * If evaluating old slot, that's delete, no?
+			 */
+			if (result)
+			{
+				if (!pubinfo->columns)
+				{
+					all_columns = true;
+					bms_free(columns);
+					columns = NULL;
+				}
+				else if (!all_columns)
+				{
+					columns = bms_union(columns, pubinfo->columns);
+				}
+			}
+
+			continue;
+		}
 
 		/*
-		 * if the column in the new tuple or old tuple is null, nothing to do
+		 * Both the old and new tuples must be valid only for updates and need to
+		 * be checked against the row filter.
 		 */
-		if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
-			continue;
+		Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+
+		slot_getallattrs(new_slot);
+		slot_getallattrs(old_slot);
+
+		tmp_new_slot = NULL;
+		desc = RelationGetDescr(relation);
 
 		/*
-		 * Unchanged toasted replica identity columns are only logged in the
-		 * old tuple. Copy this over to the new tuple. The changed (or WAL
-		 * Logged) toast values are always assembled in memory and set as
-		 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+		 * The new tuple might not have all the replica identity columns, in which
+		 * case it needs to be copied over from the old tuple.
 		 */
-		if (att->attlen == -1 &&
-			VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
-			!VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
+		for (i = 0; i < desc->natts; i++)
 		{
-			if (!tmp_new_slot)
+			Form_pg_attribute att = TupleDescAttr(desc, i);
+
+			/*
+			 * if the column in the new tuple or old tuple is null, nothing to do
+			 */
+			if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
+				continue;
+
+			/*
+			 * Unchanged toasted replica identity columns are only logged in the
+			 * old tuple. Copy this over to the new tuple. The changed (or WAL
+			 * Logged) toast values are always assembled in memory and set as
+			 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+			 */
+			if (att->attlen == -1 &&
+				VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
+				!VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
 			{
-				tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
-				ExecClearTuple(tmp_new_slot);
+				if (!tmp_new_slot)
+				{
+					tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
+					ExecClearTuple(tmp_new_slot);
 
-				memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
-					   desc->natts * sizeof(Datum));
-				memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
-					   desc->natts * sizeof(bool));
-			}
+					memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
+						   desc->natts * sizeof(Datum));
+					memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
+						   desc->natts * sizeof(bool));
+				}
 
-			tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
-			tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+				tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
+				tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+			}
 		}
-	}
 
-	ecxt->ecxt_scantuple = old_slot;
-	old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+		ecxt->ecxt_scantuple = old_slot;
 
-	if (tmp_new_slot)
-	{
-		ExecStoreVirtualTuple(tmp_new_slot);
-		ecxt->ecxt_scantuple = tmp_new_slot;
-	}
-	else
-		ecxt->ecxt_scantuple = new_slot;
+		old_matched = pgoutput_row_filter_exec_expr(pubinfo->rowfilter, ecxt);
+		old_matched_any |= old_matched;
+
+		if (tmp_new_slot)
+		{
+			ExecStoreVirtualTuple(tmp_new_slot);
+			ecxt->ecxt_scantuple = tmp_new_slot;
+		}
+		else
+			ecxt->ecxt_scantuple = new_slot;
 
-	new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+		new_matched = pgoutput_row_filter_exec_expr(pubinfo->rowfilter, ecxt);
+		new_matched_any |= new_matched;
 
-	/*
-	 * Case 1: if both tuples don't match the row filter, bailout. Send
-	 * nothing.
-	 */
-	if (!old_matched && !new_matched)
-		return false;
+		/*
+		 * Case 1: if both tuples don't match the row filter, bailout. Send
+		 * nothing.
+		 */
+		if (!old_matched && !new_matched)
+			continue;	/* continue with the next row filter */
+
+		/*
+		 * Case 4: if both tuples match the row filter, transformation isn't
+		 * required. (*action is default UPDATE).
+		 */
+		if (!pubinfo->columns)
+		{
+			all_columns = true;
+			bms_free(columns);
+			columns = NULL;
+		}
+		else if (!all_columns && new_matched)
+			columns = bms_union(columns, pubinfo->columns);
+	}
 
 	/*
 	 * Case 2: if the old tuple doesn't satisfy the row filter but the new
@@ -1314,9 +1312,10 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 	 * while inserting the tuple in the downstream node, we have all the
 	 * required column values.
 	 */
-	if (!old_matched && new_matched)
+	if (!old_matched_any && new_matched_any)
 	{
 		*action = REORDER_BUFFER_CHANGE_INSERT;
+		matching = true;
 
 		if (tmp_new_slot)
 			*new_slot_ptr = tmp_new_slot;
@@ -1329,15 +1328,18 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 	 * This transformation does not require another tuple. The Old tuple will
 	 * be used for DELETE.
 	 */
-	else if (old_matched && !new_matched)
+	else if (old_matched_any && !new_matched_any)
+	{
 		*action = REORDER_BUFFER_CHANGE_DELETE;
+		matching = true;
+	}
+	else if (old_matched_any && new_matched_any)
+		matching = true;
 
-	/*
-	 * Case 4: if both tuples match the row filter, transformation isn't
-	 * required. (*action is default UPDATE).
-	 */
+	if (column_list)
+		*column_list = columns;
 
-	return true;
+	return matching;
 }
 
 /*
@@ -1359,6 +1361,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
+	Bitmapset	   *columns = NULL;
 
 	if (!is_publishable_relation(relation))
 		return;
@@ -1423,7 +1426,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 			/* Check row filter */
 			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
+									 &action, &columns))
 				break;
 
 			/*
@@ -1444,7 +1447,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 			OutputPluginPrepareWrite(ctx, true);
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
+									data->binary,
+									relentry->columns, columns);
 			OutputPluginWrite(ctx, true);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -1483,7 +1487,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 			/* Check row filter */
 			if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-									 relentry, &action))
+									 relentry, &action, &columns))
 				break;
 
 			/* Send BEGIN if we haven't yet */
@@ -1503,12 +1507,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INSERT:
 					logicalrep_write_insert(ctx->out, xid, targetrel,
 											new_slot, data->binary,
-											relentry->columns);
+											relentry->columns, columns);
 					break;
 				case REORDER_BUFFER_CHANGE_UPDATE:
 					logicalrep_write_update(ctx->out, xid, targetrel,
 											old_slot, new_slot, data->binary,
-											relentry->columns);
+											relentry->columns, columns);
 					break;
 				case REORDER_BUFFER_CHANGE_DELETE:
 					logicalrep_write_delete(ctx->out, xid, targetrel,
@@ -1547,7 +1551,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				/* Check row filter */
 				if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-										 relentry, &action))
+										 relentry, &action, NULL))
 					break;
 
 				/* Send BEGIN if we haven't yet */
@@ -1977,7 +1981,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->new_slot = NULL;
 		entry->old_slot = NULL;
-		memset(entry->exprstate, 0, sizeof(entry->exprstate));
+		dlist_init(&entry->pubinfos);
 		entry->entry_cxt = NULL;
 		entry->publish_as_relid = InvalidOid;
 		entry->columns = NULL;
@@ -2056,7 +2060,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 
 		entry->entry_cxt = NULL;
 		entry->estate = NULL;
-		memset(entry->exprstate, 0, sizeof(entry->exprstate));
+		dlist_init(&entry->pubinfos);
 
 		/*
 		 * Build publication cache. We can't use one provided by relcache as
@@ -2192,11 +2196,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			/* Initialize the tuple slot and map */
 			init_tuple_slot(data, relation, entry);
 
-			/* Initialize the row filter */
+			/* Initialize the row filter and column list info */
 			pgoutput_row_filter_init(data, rel_publications, entry);
-
-			/* Initialize the column list */
-			pgoutput_column_list_init(data, rel_publications, entry);
 		}
 
 		list_free(pubids);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a771ab8ff33..1a3f4f34bf4 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -104,6 +104,7 @@ typedef struct LogicalRepRelation
 	char	   *relname;		/* relation name */
 	int			natts;			/* number of columns */
 	char	  **attnames;		/* column names */
+	int16	   *attnums;		/* column attnums */
 	Oid		   *atttyps;		/* column types */
 	char		replident;		/* replica identity */
 	char		relkind;		/* remote relation kind */
@@ -209,12 +210,14 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
-									bool binary, Bitmapset *columns);
+									bool binary, Bitmapset *schema_columns,
+									Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *oldslot,
-									TupleTableSlot *newslot, bool binary, Bitmapset *columns);
+									TupleTableSlot *newslot, bool binary,
+									Bitmapset *schema_columns, Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
