Patch for Delta Materialized View Refreshes

Started by John Dent about 7 years ago, 2 messages
#1 John Dent
denty@QQdd.eu
1 attachment(s)

Hi folks,

I failed to post a patch on the thread “Delta Materialized View Refreshes?” (Message-ID 1541368916681-0.post@n3.nabble.com), so I figured I’d try again and post directly this time. Hopefully this time, it’ll make it through. Thanks for your patience.

(Original message follows…)

Hi folks,

I had a crack at this, and it was pretty simple to get something working to play around with, and it seems like it might be useful.

I developed it against 10.1, as that's what I happened to be working with at the time. The patch is pretty small, and I hoped it would apply cleanly
against 11. Unfortunately it doesn't, but I doubt the issues are substantial. If there is interest in moving this forward, I'll update and re-share.

The patch enables pretty much exactly what Jeremy suggests — something like “refresh materialized view concurrently testview where type = 'main';” — with fairly obvious semantics.

Comments on the patch or approach are welcome.

denty.

Attachments:

refresh-mv-where-clause.diff (application/octet-stream; name=refresh-mv-where-clause.diff; x-unix-mode=0644) [Download]
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 7d57f97..5f76262 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -29,6 +29,9 @@
 #include "executor/executor.h"
 #include "executor/spi.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/clauses.h"
+#include "parser/parse_clause.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
@@ -38,6 +41,7 @@
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
+#include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/typcache.h"
@@ -61,13 +65,13 @@ static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
 static void transientrel_shutdown(DestReceiver *self);
 static void transientrel_destroy(DestReceiver *self);
 static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
-						 const char *queryString);
+						 const char *queryString, Node *filter_clause);
 
 static char *make_temptable_name_n(char *tempname, int n);
 static void mv_GenerateOper(StringInfo buf, Oid opoid);
 
 static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
-					   int save_sec_context);
+					   int save_sec_context, const char *filter_clause_str);
 static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
 
 static void OpenMatViewIncrementalMaintenance(void);
@@ -113,6 +117,64 @@ SetMatViewPopulatedState(Relation relation, bool newstate)
 	CommandCounterIncrement();
 }
 
+/*
+ * Wrap "subquery" in a new SELECT with "filter_clause" as its WHERE, i.e.
+ * SELECT * FROM (<subquery>) tab WHERE <filter clause>.  NOTE(review): the
+ * clause's Vars presumably must use varno 1 (the subquery's RTE) -- confirm.
+ */
+static Query *
+mv_refresh_build_subquery_with_filter (Query *subquery,
+								Node *filter_clause)
+{
+	Query *scan_query = makeNode (Query);
+	ParseState *pstate;
+	RangeTblEntry *rte;
+	RangeTblRef *rtr;
+
+	scan_query->commandType = CMD_SELECT;
+	scan_query->querySource = QSRC_ORIGINAL;
+	scan_query->resultRelation = 0;
+	scan_query->hasAggs = false;
+	scan_query->hasWindowFuncs = false;
+	scan_query->hasTargetSRFs = false;
+	scan_query->hasSubLinks = false;
+	scan_query->hasDistinctOn = false;
+	scan_query->hasRecursive = false;
+	scan_query->hasModifyingCTE = false;
+	scan_query->hasForUpdate = false;
+	/* FIXME: is this correct, or should we inherit from the subquery? */
+	scan_query->hasRowSecurity = false;
+	scan_query->cteList = NIL;
+
+	/* Create a dummy ParseState for addRangeTableEntryForSubquery */
+	pstate = make_parsestate(NULL);
+	rte = addRangeTableEntryForSubquery (pstate, subquery, makeAlias ("tab", NIL), false, false);
+	scan_query->rtable = list_make1 (rte);
+
+	/* The subquery is the sole FROM item, so it is range-table entry 1. */
+	rtr = makeNode (RangeTblRef);
+	rtr->rtindex = 1;
+	scan_query->jointree = makeNode (FromExpr);
+	scan_query->jointree->fromlist = list_make1 (rtr);
+	if (filter_clause != NULL)
+		scan_query->jointree->quals = filter_clause;
+
+	/* SELECT *: one target entry per output column of the subquery. */
+	scan_query->targetList = expandRelAttrs (pstate, rte, 1, 0, -1);
+	scan_query->override = OVERRIDING_NOT_SET;
+	scan_query->returningList = NIL;
+	scan_query->rowMarks = NIL;
+	scan_query->setOperations = NULL;
+	scan_query->constraintDeps = NIL;
+	scan_query->withCheckOptions = NIL;
+
+	scan_query->stmt_location = -1;
+	scan_query->stmt_len = -1;
+
+	return scan_query;
+}
+
+
 /*
  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
  *
@@ -260,6 +322,43 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
 					 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
 	}
 
+	/* Construct a filter clause, if one was supplied. */
+	char *filter_clause_str = NULL;
+	Node *filter_clause = NULL;
+	if (stmt->whereClause != NULL)
+	{
+		if (!concurrent)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("cannot partially refresh materialized view \"%s\" that cannot be refreshed concurrently",
+							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
+													   RelationGetRelationName(matviewRel))),
+					 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
+		
+		ParseState *pstate = make_parsestate(NULL);
+
+		RangeTblEntry *rte = addRangeTableEntryForRelation(pstate, matviewRel,
+														   NULL, false, false);
+		addRTEtoQuery(pstate, rte, false, true, true);
+		
+		filter_clause = transformWhereClause(pstate,
+										  copyObject(stmt->whereClause),
+										  EXPR_KIND_WHERE,
+										  "WHERE");
+
+		if (contain_volatile_functions(filter_clause))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+					 errmsg("cannot partially refresh materialized view \"%s\" using volatile expresions in the WHERE clause",
+							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
+													   RelationGetRelationName(matviewRel))),
+					 errhint("refresh the materialized view without a WHERE clause, or adjust the expressions or function definitions.")));
+
+		filter_clause_str = deparse_expression (filter_clause,
+												deparse_context_for ("tab", matviewOid),
+												true, false);
+	}
+	
 	/*
 	 * The stored query was rewritten at the time of the MV definition, but
 	 * has not been scribbled on by the planner.
@@ -324,7 +423,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
 
 	/* Generate the data, if wanted. */
 	if (!stmt->skipData)
-		processed = refresh_matview_datafill(dest, dataQuery, queryString);
+		processed = refresh_matview_datafill(dest, dataQuery, queryString, filter_clause);
 
 	/* Make the matview match the newly generated data. */
 	if (concurrent)
@@ -334,7 +433,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
 		PG_TRY();
 		{
 			refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
-								   save_sec_context);
+								   save_sec_context, filter_clause_str);
 		}
 		PG_CATCH();
 		{
@@ -382,7 +481,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
  */
 static uint64
 refresh_matview_datafill(DestReceiver *dest, Query *query,
-						 const char *queryString)
+						 const char *queryString, Node *filter_clause)
 {
 	List	   *rewritten;
 	PlannedStmt *plan;
@@ -402,6 +501,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 
 	/* Check for user-requested abort. */
 	CHECK_FOR_INTERRUPTS();
+	
+	/* Construct as subquery and apply filter, if filter is provided. */
+	if (filter_clause != NULL)
+	{
+		query = mv_refresh_build_subquery_with_filter (query, filter_clause);
+	}
 
 	/* Plan the query which will generate data for the refresh. */
 	plan = pg_plan_query(query, 0, NULL);
@@ -610,7 +715,7 @@ mv_GenerateOper(StringInfo buf, Oid opoid)
  */
 static void
 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
-					   int save_sec_context)
+					   int save_sec_context, const char *filter_clause_str)
 {
 	StringInfoData querybuf;
 	Relation	matviewRel;
@@ -689,8 +794,12 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 	appendStringInfo(&querybuf,
 					 "CREATE TEMP TABLE %s AS "
 					 "SELECT mv.ctid AS tid, newdata "
-					 "FROM %s mv FULL JOIN %s newdata ON (",
-					 diffname, matviewname, tempname);
+					 "FROM ("
+					 "  SELECT tab.ctid, tab.*, row (tab.*) tab_row FROM %s tab %s %s" // FIXME: rename column vars to avoid conflit with
+					 ") mv FULL JOIN %s newdata ON (",
+					 diffname, matviewname,
+					 filter_clause_str != NULL ? "WHERE" : "", filter_clause_str,
+					 tempname);
 
 	/*
 	 * Get the list of index OIDs for the table from the relcache, and look up
@@ -772,7 +881,7 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 	Assert(foundUniqueIndex);
 
 	appendStringInfoString(&querybuf,
-						   " AND newdata OPERATOR(pg_catalog.*=) mv) "
+						   " AND newdata OPERATOR(pg_catalog.*=) mv.tab_row) "
 						   "WHERE newdata IS NULL OR mv IS NULL "
 						   "ORDER BY tid");
 
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 4d67070..c4a5114 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3803,6 +3803,7 @@ _copyRefreshMatViewStmt(const RefreshMatViewStmt *from)
 	COPY_SCALAR_FIELD(concurrent);
 	COPY_SCALAR_FIELD(skipData);
 	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(whereClause);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 8d92c03..e9ab121 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1696,6 +1696,7 @@ _equalRefreshMatViewStmt(const RefreshMatViewStmt *a, const RefreshMatViewStmt *
 	COMPARE_SCALAR_FIELD(concurrent);
 	COMPARE_SCALAR_FIELD(skipData);
 	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(whereClause);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7d0de99..358dfd4 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -3972,12 +3972,13 @@ OptNoLog:	UNLOGGED					{ $$ = RELPERSISTENCE_UNLOGGED; }
  *****************************************************************************/
 
 RefreshMatViewStmt:
-			REFRESH MATERIALIZED VIEW opt_concurrently qualified_name opt_with_data
+			REFRESH MATERIALIZED VIEW opt_concurrently qualified_name opt_with_data where_clause
 				{
 					RefreshMatViewStmt *n = makeNode(RefreshMatViewStmt);
 					n->concurrent = $4;
 					n->relation = $5;
 					n->skipData = !($6);
+					n->whereClause = $7;
 					$$ = (Node *) n;
 				}
 		;
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index ef6753e..3b7091b 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3150,6 +3150,7 @@ typedef struct RefreshMatViewStmt
 	NodeTag		type;
 	bool		concurrent;		/* allow concurrent access? */
 	bool		skipData;		/* true for WITH NO DATA */
+	Node	   *whereClause;	/* WHERE qualification */
 	RangeVar   *relation;		/* relation to insert into */
 } RefreshMatViewStmt;
 
#2 John Dent
denty@QQdd.eu
In reply to: John Dent (#1)
1 attachment(s)
Re: Patch for Delta Materialized View Refreshes

Hi folks,

I’ve updated this patch against 11.0, and tidied up a few loose ends.

Attachments:

refresh-mv-where-clause-#2.diff (application/octet-stream; name=refresh-mv-where-clause-#2.diff; x-unix-mode=0644) [Download]
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index e1eb7c3..da525d4 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -31,6 +31,9 @@
 #include "executor/executor.h"
 #include "executor/spi.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/clauses.h"
+#include "parser/parse_clause.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
@@ -40,6 +43,7 @@
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
+#include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
@@ -62,10 +66,10 @@ static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
 static void transientrel_shutdown(DestReceiver *self);
 static void transientrel_destroy(DestReceiver *self);
 static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
-						 const char *queryString);
+						 const char *queryString, Node *filterClause);
 static char *make_temptable_name_n(char *tempname, int n);
 static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
-					   int save_sec_context);
+					   int save_sec_context, const char *filterClauseStr);
 static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
 static bool is_usable_unique_index(Relation indexRel);
 static void OpenMatViewIncrementalMaintenance(void);
@@ -111,6 +115,61 @@ SetMatViewPopulatedState(Relation relation, bool newstate)
 	CommandCounterIncrement();
 }
 
+/*
+ * Wrap "subquery" in a new SELECT filtered by "filterClause", equivalent to
+ * SELECT * FROM (<subquery>) tab WHERE <filter clause>
+ */
+static Query *
+mv_refresh_build_subquery_with_filter (Query *subquery,
+								Node *filterClause)
+{
+	Query *scan_query = makeNode (Query);
+	ParseState *pstate;
+	RangeTblEntry *rte;
+	RangeTblRef *rtr;
+
+	scan_query->commandType = CMD_SELECT;
+	scan_query->querySource = QSRC_ORIGINAL;
+	scan_query->resultRelation = 0;
+	scan_query->hasAggs = false;
+	scan_query->hasWindowFuncs = false;
+	scan_query->hasTargetSRFs = false;
+	scan_query->hasSubLinks = false;
+	scan_query->hasDistinctOn = false;
+	scan_query->hasRecursive = false;
+	scan_query->hasModifyingCTE = false;
+	scan_query->hasForUpdate = false;
+	/* FIXME: is this correct, or should we inherit from the subquery? */
+	scan_query->hasRowSecurity = false;
+	scan_query->cteList = NIL;
+
+	/* Create a dummy ParseState for addRangeTableEntryForSubquery */
+	pstate = make_parsestate(NULL);
+	rte = addRangeTableEntryForSubquery (pstate, subquery, makeAlias ("tab", NIL), false, false);
+	scan_query->rtable = list_make1 (rte);
+
+	/* Sole FROM item is RTE 1; NOTE(review): filterClause Vars presumably use varno 1 too. */
+	rtr = makeNode (RangeTblRef);
+	rtr->rtindex = 1;
+	scan_query->jointree = makeNode (FromExpr);
+	scan_query->jointree->fromlist = list_make1 (rtr);
+	scan_query->jointree->quals = filterClause;
+
+	/* SELECT *: one target entry per output column of the subquery. */
+	scan_query->targetList = expandRelAttrs (pstate, rte, 1, 0, -1);
+	scan_query->override = OVERRIDING_NOT_SET;
+	scan_query->returningList = NIL;
+	scan_query->rowMarks = NIL;
+	scan_query->setOperations = NULL;
+	scan_query->constraintDeps = NIL;
+	scan_query->withCheckOptions = NIL;
+	scan_query->stmt_location = -1;
+	scan_query->stmt_len = -1;
+
+	return scan_query;
+}
+
+
 /*
  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
  *
@@ -247,6 +306,43 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
 					 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
 	}
 
+	/* Construct a filter clause, if one was supplied. */
+	char *filterClauseStr = NULL;
+	Node *filterClause = NULL;
+	if (stmt->whereClause != NULL)
+	{
+		if (!concurrent)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("cannot partially refresh materialized view \"%s\" that is not refreshed CONCURRENTLY",
+							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
+													   RelationGetRelationName(matviewRel))),
+					 errhint("refresh the materialized view CONCURRENTLY.")));
+		
+		ParseState *pstate = make_parsestate(NULL);
+
+		RangeTblEntry *rte = addRangeTableEntryForRelation(pstate, matviewRel,
+														   NULL, false, false);
+		addRTEtoQuery(pstate, rte, false, true, true);
+		
+		filterClause = transformWhereClause(pstate,
+										  copyObject(stmt->whereClause),
+										  EXPR_KIND_WHERE,
+										  "WHERE");
+
+		if (contain_volatile_functions(filterClause))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+					 errmsg("cannot partially refresh materialized view \"%s\" with volatile expresions in the WHERE clause",
+							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
+													   RelationGetRelationName(matviewRel))),
+					 errhint("refresh the materialized view without a WHERE clause, or adjust the expressions or function definitions.")));
+
+		filterClauseStr = deparse_expression (filterClause,
+												deparse_context_for ("tab", matviewOid),
+												true, false);
+	}
+	
 	/*
 	 * The stored query was rewritten at the time of the MV definition, but
 	 * has not been scribbled on by the planner.
@@ -311,7 +407,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
 
 	/* Generate the data, if wanted. */
 	if (!stmt->skipData)
-		processed = refresh_matview_datafill(dest, dataQuery, queryString);
+		processed = refresh_matview_datafill(dest, dataQuery, queryString, filterClause);
 
 	/* Make the matview match the newly generated data. */
 	if (concurrent)
@@ -321,7 +417,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
 		PG_TRY();
 		{
 			refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
-								   save_sec_context);
+								   save_sec_context, filterClauseStr);
 		}
 		PG_CATCH();
 		{
@@ -365,11 +461,15 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
  * Execute the given query, sending result rows to "dest" (which will
  * insert them into the target matview).
  *
+ * If filterClause is provided, the query is executed as a subquery,
+ * and the filterClause is applied as a WHERE clause at the top
+ * level.
+ *
  * Returns number of rows inserted.
  */
 static uint64
 refresh_matview_datafill(DestReceiver *dest, Query *query,
-						 const char *queryString)
+						 const char *queryString, Node *filterClause)
 {
 	List	   *rewritten;
 	PlannedStmt *plan;
@@ -389,6 +489,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 
 	/* Check for user-requested abort. */
 	CHECK_FOR_INTERRUPTS();
+	
+	/* Construct as subquery and apply filter, if filter is provided. */
+	if (filterClause != NULL)
+	{
+		query = mv_refresh_build_subquery_with_filter (query, filterClause);
+	}
 
 	/* Plan the query which will generate data for the refresh. */
 	plan = pg_plan_query(query, 0, NULL);
@@ -575,10 +681,13 @@ make_temptable_name_n(char *tempname, int n)
  * incremental maintenance.  It also doesn't seem reasonable or safe to allow
  * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
  * this command.
+ *
+ * If filterClauseStr is provided, the join is executed against the contents
+ * of the MV, filtered by the supplied clause.
  */
 static void
 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
-					   int save_sec_context)
+					   int save_sec_context, const char *filterClauseStr)
 {
 	StringInfoData querybuf;
 	Relation	matviewRel;
@@ -656,8 +765,38 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 	appendStringInfo(&querybuf,
 					 "CREATE TEMP TABLE %s AS "
 					 "SELECT mv.ctid AS tid, newdata "
-					 "FROM %s mv FULL JOIN %s newdata ON (",
-					 diffname, matviewname, tempname);
+					 "FROM ("
+					 "  SELECT tab.ctid, row (tab.*) tab_row",
+					 diffname);
+
+	/*
+	 * Append each of the columns of the table, but name them "attrN" in order
+	 * that the name does not conflict with either ctid (which won't happen) or
+	 * tab_row (which might).
+	 */
+	 tupdesc = RelationGetDescr (matviewRel);
+	 for (int attrnum = 1; attrnum <= RelationGetNumberOfAttributes (matviewRel); attrnum++)
+	 {
+		 Form_pg_attribute attr = TupleDescAttr (tupdesc, attrnum - 1);
+		 appendStringInfo(&querybuf,
+						  ", %s AS attr%d",
+						  quote_qualified_identifier("tab", NameStr(attr->attname)),
+						  attrnum);
+	 }
+
+	appendStringInfo(&querybuf,
+					 " FROM %s tab",
+					 matviewname);
+
+	/* Append the filter string, if one was supplied. */
+	if (filterClauseStr != NULL)
+	{
+		appendStringInfo(&querybuf, " WHERE %s", filterClauseStr);
+	}
+	
+	appendStringInfo(&querybuf,
+					 ") mv FULL JOIN %s newdata ON (",
+					 tempname);
 
 	/*
 	 * Get the list of index OIDs for the table from the relcache, and look up
@@ -665,7 +804,6 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 	 * columns present in all unique indexes which only reference columns and
 	 * include all rows.
 	 */
-	tupdesc = matviewRel->rd_att;
 	opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
 	foundUniqueIndex = false;
 
@@ -751,8 +889,8 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 
 				leftop = quote_qualified_identifier("newdata",
 													NameStr(attr->attname));
-				rightop = quote_qualified_identifier("mv",
-													 NameStr(attr->attname));
+				/* Name the right operand according to its attribute number. */
+				rightop = psprintf ("mv.attr%d", attnum);
 
 				generate_operator_clause(&querybuf,
 										 leftop, attrtype,
@@ -779,7 +917,7 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 	Assert(foundUniqueIndex);
 
 	appendStringInfoString(&querybuf,
-						   " AND newdata OPERATOR(pg_catalog.*=) mv) "
+						   " AND newdata OPERATOR(pg_catalog.*=) mv.tab_row) "
 						   "WHERE newdata IS NULL OR mv IS NULL "
 						   "ORDER BY tid");
 
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 648758d..098c6b3 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3900,6 +3900,7 @@ _copyRefreshMatViewStmt(const RefreshMatViewStmt *from)
 	COPY_SCALAR_FIELD(concurrent);
 	COPY_SCALAR_FIELD(skipData);
 	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(whereClause);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 6a971d0..c12abb6 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1710,6 +1710,7 @@ _equalRefreshMatViewStmt(const RefreshMatViewStmt *a, const RefreshMatViewStmt *
 	COMPARE_SCALAR_FIELD(concurrent);
 	COMPARE_SCALAR_FIELD(skipData);
 	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(whereClause);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 02b500e..e01b785 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -4135,12 +4135,13 @@ OptNoLog:	UNLOGGED					{ $$ = RELPERSISTENCE_UNLOGGED; }
  *****************************************************************************/
 
 RefreshMatViewStmt:
-			REFRESH MATERIALIZED VIEW opt_concurrently qualified_name opt_with_data
+			REFRESH MATERIALIZED VIEW opt_concurrently qualified_name opt_with_data where_clause
 				{
 					RefreshMatViewStmt *n = makeNode(RefreshMatViewStmt);
 					n->concurrent = $4;
 					n->relation = $5;
 					n->skipData = !($6);
+					n->whereClause = $7;
 					$$ = (Node *) n;
 				}
 		;
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index a49b013..eebeee8 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3211,6 +3211,7 @@ typedef struct RefreshMatViewStmt
 	NodeTag		type;
 	bool		concurrent;		/* allow concurrent access? */
 	bool		skipData;		/* true for WITH NO DATA */
+	Node	   *whereClause;	/* WHERE qualification */
 	RangeVar   *relation;		/* relation to insert into */
 } RefreshMatViewStmt;