Proof of concept: auto updatable views
Hi,
I've been playing around with the idea of supporting automatically
updatable views, and I have a working proof of concept. I've taken a
different approach than the previous attempts to implement this
feature (e.g., http://archives.postgresql.org/pgsql-hackers/2009-01/msg01746.php),
instead doing all the work in the rewriter, substituting the view for
its base relation rather than attempting to auto-generate any rules or
triggers.
Basically what it does is this: in the first stage of query rewriting,
just after any non-SELECT rules are applied, the new code kicks in -
if the target relation is a view, and there were no unqualified
INSTEAD rules, and there are no INSTEAD OF triggers, it tests if the
view is simply updatable. If so, the target view is replaced by its
base relation and columns are re-mapped. Then the remainder of the
rewriting process continues, recursively handling any further
non-SELECT rules or additional simply updatable views. This handles
the case of views on top of views, with or without rules and/or
triggers.
Here's a simple example:
CREATE TABLE my_table(id int primary key, val text);
CREATE VIEW my_view AS SELECT * FROM my_table WHERE id > 0;
then any modifications to the view get redirected to underlying table:
EXPLAIN ANALYSE INSERT INTO my_view VALUES(1, 'Test row');
QUERY PLAN
------------------------------------------------------------------------------------------------
Insert on my_table (cost=0.00..0.01 rows=1 width=0) (actual
time=0.208..0.208 rows=0 loops=1)
-> Result (cost=0.00..0.01 rows=1 width=0) (actual
time=0.004..0.004 rows=1 loops=1)
Total runtime: 0.327 ms
(3 rows)
EXPLAIN ANALYSE UPDATE my_view SET val='Updated' WHERE id=1;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------
Update on my_table (cost=0.00..8.27 rows=1 width=10) (actual
time=0.039..0.039 rows=0 loops=1)
-> Index Scan using my_table_pkey on my_table (cost=0.00..8.27
rows=1 width=10) (actual time=0.014..0.015 rows=1 loops=1)
Index Cond: ((id > 0) AND (id = 1))
Total runtime: 0.090 ms
(4 rows)
EXPLAIN ANALYSE DELETE FROM my_view;
QUERY PLAN
--------------------------------------------------------------------------------------------------------
Delete on my_table (cost=0.00..1.01 rows=1 width=6) (actual
time=0.030..0.030 rows=0 loops=1)
-> Seq Scan on my_table (cost=0.00..1.01 rows=1 width=6) (actual
time=0.015..0.016 rows=1 loops=1)
Filter: (id > 0)
Total runtime: 0.063 ms
(4 rows)
The patch is currently very strict about what kinds of views can be
updated (based on SQL-92), and there is no support for WITH CHECK
OPTION, because I wanted to keep this patch as simple as possible.
The consensus last time seemed to be that backwards compatibility
should be offered through a new GUC variable to allow this feature to
be disabled globally, which I've not implemented yet.
I'm also aware that my new function ChangeVarAttnos() is almost
identical to the function map_variable_attnos() that Tom recently
added, but I couldn't immediately see a neat way to merge the two. My
function handles whole-row references to the view by mapping them to a
generic RowExpr based on the view definition. I don't think a
ConvertRowtypeExpr can be used in this case, because the column names
don't necessarily match.
Obviously there's still more work to do but the early signs seem to be
encouraging.
Thoughts?
Regards,
Dean
Attachments:
auto-update-views.patchapplication/octet-stream; name=auto-update-views.patchDownload
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
new file mode 100644
index 8f75948..f942767
*** a/src/backend/rewrite/rewriteHandler.c
--- b/src/backend/rewrite/rewriteHandler.c
*************** fireRules(Query *parsetree,
*** 1829,1834 ****
--- 1829,2218 ----
/*
+ * get_view_query - get the query from a view's _RETURN rule.
+ */
+ static Query *
+ get_view_query(Relation view)
+ {
+ int i;
+
+ Assert(view->rd_rel->relkind == RELKIND_VIEW);
+
+ for (i = 0; i < view->rd_rules->numLocks; i++)
+ {
+ RewriteRule *rule = view->rd_rules->rules[i];
+
+ if (rule->event == CMD_SELECT)
+ {
+ /* A _RETURN rule should have only one action */
+ if (list_length(rule->actions) != 1)
+ elog(ERROR, "invalid _RETURN rule action specification");
+
+ return linitial(rule->actions);
+ }
+ }
+
+ elog(ERROR, "failed to find _RETURN rule for view");
+ return NULL; /* keep compiler quiet */
+ }
+
+
+ /*
+ * test_auto_update_view -
+ * Test if the specified view can be automatically updated. This will either
+ * return NULL (if the view can be updated) or the reason that it cannot.
+ *
+ * Note that the checks performed here are local to this view. It does not
+ * check that the view's underlying base relation is updatable.
+ */
+ static const char *
+ test_auto_update_view(Relation view)
+ {
+ Query *viewquery;
+ RangeTblRef *rtr;
+ RangeTblEntry *base_rte;
+ Bitmapset *bms;
+ ListCell *cell;
+
+ /*
+ * Check if the view is simply updatable. According to SQL-92 this means:
+ * - No DISTINCT clauses.
+ * - Every TLE is a column reference, and appears at most once.
+ * - Refers to a single base relation.
+ * - No GROUP BY or HAVING clauses.
+ * - No set operations (UNION, INTERSECT or EXCEPT).
+ * - No sub-queries in the WHERE clause.
+ *
+ * We relax this last restriction since it would actually be more work to
+ * enforce it than to simply allow it. In addition (for now) we impose
+ * the following additional constraints, based on features that are not
+ * part of SQL-92:
+ * - No DISTINCT ON clauses.
+ * - No window functions.
+ * - No CTEs (WITH or WITH RECURSIVE).
+ * - No OFFSET or LIMIT clauses.
+ * - No system columns.
+ *
+ * Note that we do these checks without recursively expanding the view.
+ * The base relation may be a view, and it might have INSTEAD OF triggers
+ * or rules, in which case it need not satisfy these constraints.
+ */
+ viewquery = get_view_query(view);
+
+ if (viewquery->distinctClause != NIL ||
+ viewquery->hasDistinctOn)
+ return "Views containing DISTINCT are not updatable";
+
+ if (viewquery->groupClause != NIL)
+ return "Views containing GROUP BY are not updatable";
+
+ if (viewquery->havingQual != NULL)
+ return "Views containing HAVING are not updatable";
+
+ if (viewquery->hasAggs)
+ return "Views containing aggregates are not updatable";
+
+ if (viewquery->hasWindowFuncs)
+ return "Views containing window functions are not updatable";
+
+ if (viewquery->setOperations != NULL)
+ return "Views containing UNION, INTERSECT or EXCEPT are not updatable";
+
+ if (viewquery->hasRecursive ||
+ viewquery->cteList != NIL)
+ return "Views containing WITH [RECURSIVE] are not updatable";
+
+ if (viewquery->limitOffset != NULL ||
+ viewquery->limitCount != NULL)
+ return "Views containing OFFSET or LIMIT are not updatable";
+
+ /*
+ * The view query should select from a single base relation, which must be
+ * a table or another view.
+ */
+ if (list_length(viewquery->jointree->fromlist) == 0)
+ return "Views with no base relations are not updatable";
+
+ if (list_length(viewquery->jointree->fromlist) > 1)
+ return "Views with multiple base relations are not updatable";
+
+ rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist);
+ if (!IsA(rtr, RangeTblRef))
+ return "Views that are not based on tables or views are not updatable";
+
+ base_rte = rt_fetch(rtr->rtindex, viewquery->rtable);
+ if (base_rte->rtekind != RTE_RELATION)
+ return "Views that are not based on tables or views are not updatable";
+
+ /*
+ * The view's targetlist entries should all be Vars referring to columns
+ * in the base relation, and no 2 should refer to the same base column.
+ *
+ * Note that we don't bother freeing resources allocated here if we
+ * return early, since that is an error condition.
+ */
+ bms = NULL;
+ foreach(cell, viewquery->targetList)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(cell);
+ Var *var;
+
+ if (!IsA(tle->expr, Var))
+ return "Views with columns that are not simple references to columns in the base relation are not updatable";
+
+ var = (Var *) tle->expr;
+ if (var->varattno < 0)
+ return "Views that refer to system columns are not updatable";
+
+ if (var->varattno == 0)
+ return "Views that refer to whole rows from the base relation are not updatable";
+
+ if (bms_is_member(var->varattno, bms))
+ return "Views that refer to the same column more than once are not updatable";
+
+ bms = bms_add_member(bms, var->varattno);
+ }
+
+ bms_free(bms);
+ return NULL; /* the view is simply updatable */
+ }
+
+
+ /*
+ * rewriteTargetView -
+ * attempt to rewrite a query where the target relation is a view, so that
+ * the view's base relation becomes the target relation.
+ *
+ * This only handles the case where there are no INSTEAD OF triggers to update
+ * the view. If there are INSTEAD OF triggers the view is expanded as part
+ * of fireRIRrules, which has dedicated code to keep both the original view
+ * target and the expanded rule actions to act as a source of data.
+ *
+ * Note that the base relation here may itself be a view, which may or may not
+ * have INSTEAD OF triggers or rules to handle the update. That case is
+ * handled by recursion in RewriteQuery.
+ */
+ static Query *
+ rewriteTargetView(Query *parsetree, Relation view)
+ {
+ TriggerDesc *trigDesc = view->trigdesc;
+ const char *auto_update_detail;
+ Query *viewquery;
+ RangeTblRef *rtr;
+ RangeTblEntry *base_rte;
+ int natt;
+ AttrNumber *base_attno;
+ bool reorder_atts;
+ int attno;
+ ListCell *cell;
+ RangeTblEntry *view_rte;
+ int rt_index;
+ List *new_rtable;
+
+ /* The view must have INSTEAD OF triggers or be simply updatable */
+ switch (parsetree->commandType)
+ {
+ case CMD_INSERT:
+ if (trigDesc && trigDesc->trig_insert_instead_row)
+ return NULL; /* INSTEAD OF INSERT trigger will be used */
+
+ auto_update_detail = test_auto_update_view(view);
+ if (auto_update_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot insert into view \"%s\"",
+ RelationGetRelationName(view)),
+ errdetail("%s.", auto_update_detail),
+ errhint("You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger.")));
+ break;
+ case CMD_UPDATE:
+ if (trigDesc && trigDesc->trig_update_instead_row)
+ return NULL; /* INSTEAD OF UPDATE trigger will be used */
+
+ auto_update_detail = test_auto_update_view(view);
+ if (auto_update_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot update view \"%s\"",
+ RelationGetRelationName(view)),
+ errdetail("%s.", auto_update_detail),
+ errhint("You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger.")));
+ break;
+ case CMD_DELETE:
+ if (trigDesc && trigDesc->trig_delete_instead_row)
+ return NULL; /* INSTEAD OF DELETE trigger will be used */
+
+ auto_update_detail = test_auto_update_view(view);
+ if (auto_update_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot delete from view \"%s\"",
+ RelationGetRelationName(view)),
+ errdetail("%s.", auto_update_detail),
+ errhint("You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.")));
+ break;
+ default:
+ elog(ERROR, "unrecognized CmdType: %d", (int) parsetree->commandType);
+ break;
+ }
+
+ /*
+ * The view should have a single base relation.
+ */
+ viewquery = get_view_query(view);
+ Assert(list_length(viewquery->jointree->fromlist) == 1);
+
+ rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist);
+ Assert(IsA(rtr, RangeTblRef));
+
+ base_rte = rt_fetch(rtr->rtindex, viewquery->rtable);
+ Assert(base_rte->rtekind == RTE_RELATION);
+
+ /*
+ * The view's targetlist entries should all be Vars referring to columns
+ * in the base relation, and no 2 should refer to the same base column.
+ * Build a mapping from view column number to base relation column number,
+ * and catch the case where they are in the same order.
+ */
+ natt = list_length(viewquery->targetList);
+ base_attno = (AttrNumber *) palloc(sizeof(AttrNumber) * natt);
+ reorder_atts = false;
+
+ attno = 1;
+ foreach(cell, viewquery->targetList)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(cell);
+ Var *var = (Var *) tle->expr;
+
+ Assert(var->varattno > 0);
+ base_attno[attno - 1] = var->varattno;
+
+ if (var->varattno != attno)
+ reorder_atts = true; /* view columns in different order */
+
+ attno++;
+ }
+
+ /*
+ * The view is simply updatable, provided that its base relation is
+ * updatable, which will be checked later. To avoid having to adjust
+ * varnos, we replace the old target RTE with the base relation RTE.
+ *
+ * Note that we also need to keep the original view RTE even though it
+ * will not be used so that the correct permissions checks are done
+ * against it.
+ */
+ rt_index = 1;
+ new_rtable = NIL;
+ foreach(cell, parsetree->rtable)
+ {
+ RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell);
+
+ if (rt_index == parsetree->resultRelation)
+ {
+ /*
+ * Create the new target RTE using the base relation. Copy any
+ * permission checks from the original view target RTE, adjusting
+ * the column numbers if necessary, and performing the checks as
+ * the view's owner.
+ */
+ RangeTblEntry *newrte = (RangeTblEntry *) copyObject(base_rte);
+ newrte->requiredPerms = rte->requiredPerms;
+ newrte->checkAsUser = view->rd_rel->relowner;
+ newrte->selectedCols = bms_copy(rte->selectedCols);
+ newrte->modifiedCols = bms_copy(rte->modifiedCols);
+
+ if (reorder_atts)
+ {
+ newrte->selectedCols = adjust_col_privs(newrte->selectedCols,
+ natt, base_attno);
+ newrte->modifiedCols = adjust_col_privs(newrte->modifiedCols,
+ natt, base_attno);
+ }
+
+ /*
+ * Keep the original view RTE so that executor also checks that
+ * the current user has the required permissions on the view
+ */
+ view_rte = rte;
+ rte = newrte;
+ }
+ new_rtable = lappend(new_rtable, rte);
+ rt_index++;
+ }
+ parsetree->rtable = lappend(new_rtable, view_rte);
+
+ /*
+ * For UPDATE/DELETE, rewriteTargetListUD will have added a wholerow junk
+ * TLE for the view to the end of the targetlist which we no longer need.
+ * We remove this now to avoid unnecessary work when we process the
+ * targetlist. Note that when we recurse through rewriteQuery a new junk
+ * RTE will be added to allow the executor to find the row in the base
+ * relation.
+ */
+ if (parsetree->commandType != CMD_INSERT)
+ {
+ TargetEntry *tle = (TargetEntry *)
+ lfirst(list_tail(parsetree->targetList));
+
+ Assert(IsA(tle->expr, Var) && ((Var *) tle->expr)->varattno == 0);
+ parsetree->targetList = list_delete_ptr(parsetree->targetList, tle);
+ }
+
+ /*
+ * If the view's columns are in a different order, we need to update the
+ * target list to point to the new columns in the base relation, and
+ * similarly adjust any Vars in the query that refer to the result
+ * relation.
+ */
+ if (reorder_atts)
+ {
+ /* Update target list entries */
+ foreach(cell, parsetree->targetList)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(cell);
+
+ if (!tle->resjunk)
+ {
+ if (tle->resno > 0 && tle->resno <= natt)
+ tle->resno = base_attno[tle->resno - 1];
+ }
+ }
+
+ /* Update any Var attribute numbers for the result relation */
+ parsetree =
+ (Query *) ChangeVarAttnos((Node *) parsetree,
+ parsetree->resultRelation,
+ natt, base_attno,
+ viewquery->targetList,
+ view_rte->eref->colnames, 0);
+ }
+
+ /*
+ * For UPDATE/DELETE, add any quals from the view's jointree to the main
+ * jointree to filter rows. We know that there is just one relation in
+ * the view, so any of its quals that refer to it need to be adjusted to
+ * refer to it's new location in the main query.
+ *
+ * For INSERT the view's quals are not needed for now. When we implement
+ * WITH CHECK OPTION, this might be a good time to collect them.
+ */
+ if (parsetree->commandType != CMD_INSERT)
+ {
+ Node *viewqual = copyObject(viewquery->jointree->quals);
+
+ ChangeVarNodes(viewqual, rtr->rtindex, parsetree->resultRelation, 0);
+ AddQual(parsetree, (Node *) viewqual);
+ }
+
+ /* Tidy up */
+ pfree(base_attno);
+
+ return parsetree;
+ }
+
+
+ /*
* RewriteQuery -
* rewrites the query and apply the rules again on the queries rewritten
*
*************** RewriteQuery(Query *parsetree, List *rew
*** 1842,1847 ****
--- 2226,2232 ----
bool instead = false;
bool returning = false;
Query *qual_product = NULL;
+ List *product_queries = NIL;
List *rewritten = NIL;
ListCell *lc1;
*************** RewriteQuery(Query *parsetree, List *rew
*** 1993,2001 ****
result_relation, parsetree);
if (locks != NIL)
- {
- List *product_queries;
-
product_queries = fireRules(parsetree,
result_relation,
event,
--- 2378,2383 ----
*************** RewriteQuery(Query *parsetree, List *rew
*** 2004,2009 ****
--- 2386,2416 ----
&returning,
&qual_product);
+ /*
+ * If there are no unqualified INSTEAD rules, and the target relation
+ * is a view without any INSTEAD OF triggers, then we have a problem
+ * unless it can be automatically updated.
+ *
+ * If we can automatically update the view, then we do so here and add
+ * the resulting query to the product queries list, so that it gets
+ * recursively rewritten if necessary. We mark this as an unqualified
+ * INSTEAD to prevent the original query from being executed. Note
+ * also that if this succeeds it will have rewritten any returning
+ * list too.
+ */
+ if (!instead && rt_entry_relation->rd_rel->relkind == RELKIND_VIEW)
+ {
+ Query *query = qual_product ? qual_product : parsetree;
+ Query *newquery = rewriteTargetView(query, rt_entry_relation);
+
+ if (newquery != NULL)
+ {
+ product_queries = lappend(product_queries, newquery);
+ instead = true;
+ returning = true;
+ }
+ }
+
/*
* If we got any product queries, recursively rewrite them --- but
* first check for recursion!
*************** RewriteQuery(Query *parsetree, List *rew
*** 2040,2046 ****
rewrite_events = list_delete_first(rewrite_events);
}
- }
/*
* If there is an INSTEAD, and the original query has a RETURNING, we
--- 2447,2452 ----
diff --git a/src/backend/rewrite/rewriteManip.c b/src/backend/rewrite/rewriteManip.c
new file mode 100644
index 9c778ef..0ed5666
*** a/src/backend/rewrite/rewriteManip.c
--- b/src/backend/rewrite/rewriteManip.c
***************
*** 13,18 ****
--- 13,19 ----
*/
#include "postgres.h"
+ #include "access/sysattr.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
*************** ResolveNew(Node *node, int target_varno,
*** 1451,1453 ****
--- 1452,1599 ----
(void *) &context,
outer_hasSubLinks);
}
+
+
+ /*
+ * Adjust the columns in a set needing particular privileges, using the
+ * specified attribute number mappings. This is used for simply updatable
+ * views to map permissions checks on the view columns onto the matching
+ * columns in the underlying base relation.
+ */
+ Bitmapset *
+ adjust_col_privs(Bitmapset *cols, int natt, AttrNumber *new_attno)
+ {
+ Bitmapset *result = NULL;
+ Bitmapset *tmpcols;
+ AttrNumber col;
+
+ tmpcols = bms_copy(cols);
+ while ((col = bms_first_member(tmpcols)) >= 0)
+ {
+ AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
+
+ if (attno <= 0)
+ result = bms_add_member(result, col); /* system column */
+ else if (attno <= natt)
+ result = bms_add_member(result,
+ new_attno[attno - 1] - FirstLowInvalidHeapAttributeNumber);
+ }
+ bms_free(tmpcols);
+
+ return result;
+ }
+
+
+ /*
+ * ChangeVarAttnos - Adjust the attribute numbers in Var nodes
+ *
+ * Find all Var nodes in the given tree belonging to a specific relation
+ * (identified by sublevels_up and rt_index), and change their varattno
+ * fields using the new attribute numbers specified. This is used for simply
+ * updatable views to map references to view columns onto references to
+ * columns in the underlying base relation.
+ *
+ * Any whole-row references to the view are mapped onto ROW expressions
+ * constructed from the view's targetlist.
+ */
+
+ typedef struct
+ {
+ int rt_index;
+ int natt;
+ AttrNumber *new_attno;
+ List *view_tlist;
+ List *view_colnames;
+ int sublevels_up;
+ } ChangeVarAttnos_context;
+
+ static Node *
+ ChangeVarAttnos_mutator(Node *node, ChangeVarAttnos_context *context)
+ {
+ if (node == NULL)
+ return NULL;
+ if (IsA(node, Var))
+ {
+ Var *var = (Var *) node;
+
+ if (var->varlevelsup == context->sublevels_up &&
+ var->varno == context->rt_index)
+ {
+ if (var->varattno > 0)
+ {
+ /*
+ * A regular column in the view is mapped to the corresponding
+ * column in the base relation
+ */
+ Var *newvar;
+
+ if (var->varattno > context->natt)
+ elog(ERROR, "varattno %d too large for view",
+ var->varattno);
+
+ newvar = (Var *) copyObject(var);
+ newvar->varattno = context->new_attno[var->varattno - 1];
+ return (Node *) newvar;
+ }
+ else if (var->varattno == 0)
+ {
+ /*
+ * For a wholerow variable in the view, we construct a RowExpr
+ * from the view's targetlist
+ */
+ RowExpr *row = makeNode(RowExpr);
+ ListCell *cell;
+
+ row->args = NIL;
+ foreach(cell, context->view_tlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(cell);
+ Var *view_var = (Var *) copyObject(tle->expr);
+
+ view_var->varno = var->varno;
+ row->args = lappend(row->args, view_var);
+ }
+
+ row->row_typeid = var->vartype;
+ row->row_format = COERCE_IMPLICIT_CAST;
+ row->colnames = copyObject(context->view_colnames);
+ row->location = -1;
+ return (Node *) row;
+ }
+ }
+ }
+ if (IsA(node, Query))
+ {
+ /* Recurse into subselects */
+ Query *result;
+
+ context->sublevels_up++;
+ result = query_tree_mutator((Query *) node, ChangeVarAttnos_mutator,
+ (void *) context, 0);
+ context->sublevels_up--;
+ return (Node *) result;
+ }
+ return expression_tree_mutator(node, ChangeVarAttnos_mutator,
+ (void *) context);
+ }
+
+ Node *
+ ChangeVarAttnos(Node *node, int rt_index, int natt, AttrNumber *new_attno,
+ List *view_tlist, List *view_colnames, int sublevels_up)
+ {
+ ChangeVarAttnos_context context;
+
+ context.rt_index = rt_index;
+ context.natt = natt;
+ context.new_attno = new_attno;
+ context.view_tlist = view_tlist;
+ context.view_colnames = view_colnames;
+ context.sublevels_up = sublevels_up;
+
+ /*
+ * Must be prepared to start with a Query or a bare expression tree; if
+ * it's a Query, we don't want to increment sublevels_up.
+ */
+ return query_or_expression_tree_mutator(node, ChangeVarAttnos_mutator,
+ (void *) &context, 0);
+ }
diff --git a/src/include/rewrite/rewriteManip.h b/src/include/rewrite/rewriteManip.h
new file mode 100644
index 6f57b37..560305e
*** a/src/include/rewrite/rewriteManip.h
--- b/src/include/rewrite/rewriteManip.h
*************** extern Node *ResolveNew(Node *node, int
*** 75,78 ****
--- 75,85 ----
List *targetlist, int event, int update_varno,
bool *outer_hasSubLinks);
+ extern Bitmapset *adjust_col_privs(Bitmapset *cols, int natt,
+ AttrNumber *new_attno);
+
+ extern Node *ChangeVarAttnos(Node *node, int rt_index,
+ int natt, AttrNumber *new_attno,
+ List *view_tlist, List *view_colnames, int sublevels_up);
+
#endif /* REWRITEMANIP_H */
My thoughts on this is that it would be a very valuable feature to have, and
would make Postgres views behave more like they always were intended to behave,
which is indistinguishible to users from tables in behavior where all possible,
and that the reverse mapping would be automatic with the DBMS being given only
the view-defining SELECT, where possible. -- Darren Duncan
Dean Rasheed wrote:
I've been playing around with the idea of supporting automatically
updatable views, and I have a working proof of concept. I've taken a
different approach than the previous attempts to implement this
feature (e.g., http://archives.postgresql.org/pgsql-hackers/2009-01/msg01746.php),
instead doing all the work in the rewriter, substituting the view for
its base relation rather than attempting to auto-generate any rules or
triggers.Basically what it does is this: in the first stage of query rewriting,
just after any non-SELECT rules are applied, the new code kicks in -
if the target relation is a view, and there were no unqualified
INSTEAD rules, and there are no INSTEAD OF triggers, it tests if the
view is simply updatable. If so, the target view is replaced by its
base relation and columns are re-mapped. Then the remainder of the
rewriting process continues, recursively handling any further
non-SELECT rules or additional simply updatable views. This handles
the case of views on top of views, with or without rules and/or
triggers.
<snip>
Show quoted text
Obviously there's still more work to do but the early signs seem to be
encouraging.Thoughts?
On Sun, Jul 1, 2012 at 6:35 PM, Dean Rasheed <dean.a.rasheed@gmail.com> wrote:
I've been playing around with the idea of supporting automatically
updatable views, and I have a working proof of concept. I've taken a
different approach than the previous attempts to implement this
feature (e.g., http://archives.postgresql.org/pgsql-hackers/2009-01/msg01746.php),
instead doing all the work in the rewriter, substituting the view for
its base relation rather than attempting to auto-generate any rules or
triggers.Basically what it does is this: in the first stage of query rewriting,
just after any non-SELECT rules are applied, the new code kicks in -
if the target relation is a view, and there were no unqualified
INSTEAD rules, and there are no INSTEAD OF triggers, it tests if the
view is simply updatable. If so, the target view is replaced by its
base relation and columns are re-mapped. Then the remainder of the
rewriting process continues, recursively handling any further
non-SELECT rules or additional simply updatable views. This handles
the case of views on top of views, with or without rules and/or
triggers.
Regrettably, I don't have time to look at this in detail right now,
but please add it to the next CommitFest so it gets looked at. It
sounds like it could be very cool.
The consensus last time seemed to be that backwards compatibility
should be offered through a new GUC variable to allow this feature to
be disabled globally, which I've not implemented yet.
I think the backward-compatibility concerns with this approach would
be much less than with the previously-proposed approach, so I'm not
100% sure we need a backward compatibility knob. If we're going to
have one, a reloption would probably be a better fit than a GUC, now
that views support reloptions.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Robert Haas <robertmhaas@gmail.com> writes:
On Sun, Jul 1, 2012 at 6:35 PM, Dean Rasheed <dean.a.rasheed@gmail.com> wrote:
Basically what it does is this: in the first stage of query rewriting,
just after any non-SELECT rules are applied, the new code kicks in -
if the target relation is a view, and there were no unqualified
INSTEAD rules, and there are no INSTEAD OF triggers, it tests if ...
The consensus last time seemed to be that backwards compatibility
should be offered through a new GUC variable to allow this feature to
be disabled globally, which I've not implemented yet.
I think the backward-compatibility concerns with this approach would
be much less than with the previously-proposed approach, so I'm not
100% sure we need a backward compatibility knob.
If the above description is correct, the behavior is changed only in
cases that previously threw errors, so I tend to agree that no
"backwards compatibility knob" is needed.
regards, tom lane
On 2 July 2012 21:34, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Robert Haas <robertmhaas@gmail.com> writes:
On Sun, Jul 1, 2012 at 6:35 PM, Dean Rasheed <dean.a.rasheed@gmail.com> wrote:
Basically what it does is this: in the first stage of query rewriting,
just after any non-SELECT rules are applied, the new code kicks in -
if the target relation is a view, and there were no unqualified
INSTEAD rules, and there are no INSTEAD OF triggers, it tests if ...The consensus last time seemed to be that backwards compatibility
should be offered through a new GUC variable to allow this feature to
be disabled globally, which I've not implemented yet.I think the backward-compatibility concerns with this approach would
be much less than with the previously-proposed approach, so I'm not
100% sure we need a backward compatibility knob.If the above description is correct, the behavior is changed only in
cases that previously threw errors, so I tend to agree that no
"backwards compatibility knob" is needed.
Yeah, I think you're right - the default ACLs will typically give
sensible behaviour. So unless users have been cavalier with the use of
GRANT ALL, their existing views are only going to start becoming
updatable to the owners of the views (and only then if the view owner
already has write permission on the underlying table).
Regards,
Dean
I've been looking at this further and I have made some improvements,
but also found a problem.
On 1 July 2012 23:35, Dean Rasheed <dean.a.rasheed@gmail.com> wrote:
I'm also aware that my new function ChangeVarAttnos() is almost
identical to the function map_variable_attnos() that Tom recently
added, but I couldn't immediately see a neat way to merge the two. My
function handles whole-row references to the view by mapping them to a
generic RowExpr based on the view definition. I don't think a
ConvertRowtypeExpr can be used in this case, because the column names
don't necessarily match.
I improved on this by reusing the existing function ResolveNew() which
reduced the size of the patch.
The problem, however, is that the original patch is not safe for
UPDATE/DELETE on security barrier views, because it mixes the user's
quals with those on the view. For example a query like
UPDATE some_view SET col=... WHERE user_quals
will get rewritten as
UPDATE base_table SET base_col=... WHERE user_quals AND view_quals
which potentially leaks data hidden by the view's quals.
So I think that it needs to use a subquery to isolate user_quals from
view_quals. The least invasive way I could see to do that was to
record the view quals in a new security barrier quals field on the
RTE, and then turn that into a subquery at the end of the rewriting
process. This approach avoids the extensive changes to the rewriter
that I think would otherwise be needed if the RTE were changed into a
subquery near the start of the rewriting process. It is also possible
that this code might be reusable in the row-level security patch by
Kohei KaiGai to handle modifications to tables with RLS quals,
although I haven't looked too closely at that patch yet.
Another complication is that the executor appears to have no
rechecking code for subqueries in nodeSubqueryscan.c, so it looks like
I need to lock rows coming from the base relation in the subquery. So
for example, given the following setup
CREATE TABLE private_t(a int, b text);
INSERT INTO private_t VALUES (1, 'Private');
INSERT INTO private_t
SELECT i, 'Public '||i FROM generate_series(2,10000) g(i);
CREATE INDEX private_t_a_idx ON private_t(a);
ANALYSE private_t;
CREATE VIEW public_v WITH (security_barrier=true)
AS SELECT b AS bb, a AS aa FROM private_t WHERE a != 1;
CREATE OR REPLACE FUNCTION snoop(b text)
RETURNS boolean AS
$$
BEGIN
RAISE NOTICE 'b=%', b;
RETURN false;
END;
$$
LANGUAGE plpgsql STRICT IMMUTABLE COST 0.000001;
an update on the view will get rewritten as an update on the base
table from a subquery scan as follows:
EXPLAIN (costs off) UPDATE public_v SET aa=aa WHERE snoop(bb) AND aa<=2;
Update on private_t
-> Subquery Scan on private_t
Filter: snoop(private_t.b)
-> LockRows
-> Index Scan using private_t_a_idx on private_t
Index Cond: (a <= 2)
Filter: (a <> 1)
The LockRows node appears to give the expected behaviour for
concurrently modified rows, but I'm not really sure if that's the
right approach.
None of this new code kicks in for non-security barrier views, so the
kinds of plans I posted upthread remain unchanged in that case. But
now a significant fraction of the patch is code added to handle
security barrier views. Of course we could simply say that such views
aren't updatable, but that seems like an annoying limitation if there
is a feasible way round it.
Thoughts?
Regards,
Dean
Attachments:
auto-update-views.patchapplication/octet-stream; name=auto-update-views.patchDownload
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
new file mode 100644
index 71d5323..603ed16
*** a/src/backend/nodes/copyfuncs.c
--- b/src/backend/nodes/copyfuncs.c
*************** _copyRangeTblEntry(const RangeTblEntry *
*** 1980,1985 ****
--- 1980,1986 ----
COPY_SCALAR_FIELD(checkAsUser);
COPY_BITMAPSET_FIELD(selectedCols);
COPY_BITMAPSET_FIELD(modifiedCols);
+ COPY_NODE_FIELD(securityQuals);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
new file mode 100644
index d690ca7..3d1ba98
*** a/src/backend/nodes/equalfuncs.c
--- b/src/backend/nodes/equalfuncs.c
*************** _equalRangeTblEntry(const RangeTblEntry
*** 2296,2301 ****
--- 2296,2302 ----
COMPARE_SCALAR_FIELD(checkAsUser);
COMPARE_BITMAPSET_FIELD(selectedCols);
COMPARE_BITMAPSET_FIELD(modifiedCols);
+ COMPARE_NODE_FIELD(securityQuals);
return true;
}
diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c
new file mode 100644
index b130902..4d461b2
*** a/src/backend/nodes/nodeFuncs.c
--- b/src/backend/nodes/nodeFuncs.c
*************** range_table_walker(List *rtable,
*** 1942,1947 ****
--- 1942,1950 ----
return true;
break;
}
+
+ if (walker(rte->securityQuals, context))
+ return true;
}
return false;
}
*************** range_table_mutator(List *rtable,
*** 2653,2658 ****
--- 2656,2662 ----
MUTATE(newrte->values_lists, rte->values_lists, List *);
break;
}
+ MUTATE(newrte->securityQuals, rte->securityQuals, List *);
newrt = lappend(newrt, newrte);
}
return newrt;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
new file mode 100644
index 9dee041..b98c2c9
*** a/src/backend/nodes/outfuncs.c
--- b/src/backend/nodes/outfuncs.c
*************** _outRangeTblEntry(StringInfo str, const
*** 2369,2374 ****
--- 2369,2375 ----
WRITE_OID_FIELD(checkAsUser);
WRITE_BITMAPSET_FIELD(selectedCols);
WRITE_BITMAPSET_FIELD(modifiedCols);
+ WRITE_NODE_FIELD(securityQuals);
}
static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
new file mode 100644
index 1eb7582..bb4009c
*** a/src/backend/nodes/readfuncs.c
--- b/src/backend/nodes/readfuncs.c
*************** _readRangeTblEntry(void)
*** 1229,1234 ****
--- 1229,1235 ----
READ_OID_FIELD(checkAsUser);
READ_BITMAPSET_FIELD(selectedCols);
READ_BITMAPSET_FIELD(modifiedCols);
+ READ_NODE_FIELD(securityQuals);
READ_DONE();
}
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
new file mode 100644
index ccd69fc..3cb0a65
*** a/src/backend/optimizer/plan/setrefs.c
--- b/src/backend/optimizer/plan/setrefs.c
*************** set_plan_references(PlannerInfo *root, P
*** 224,229 ****
--- 224,230 ----
newrte->ctecoltypes = NIL;
newrte->ctecoltypmods = NIL;
newrte->ctecolcollations = NIL;
+ newrte->securityQuals = NIL;
glob->finalrtable = lappend(glob->finalrtable, newrte);
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
new file mode 100644
index 8f75948..b087715
*** a/src/backend/rewrite/rewriteHandler.c
--- b/src/backend/rewrite/rewriteHandler.c
***************
*** 14,19 ****
--- 14,20 ----
#include "postgres.h"
#include "access/sysattr.h"
+ #include "catalog/heap.h"
#include "catalog/pg_type.h"
#include "commands/trigger.h"
#include "nodes/makefuncs.h"
*************** static List *matchLocks(CmdType event, R
*** 60,65 ****
--- 61,67 ----
int varno, Query *parsetree);
static Query *fireRIRrules(Query *parsetree, List *activeRIRs,
bool forUpdatePushedDown);
+ static Query *rewriteSecurityQuals(Query *parsetree);
/*
*************** fireRules(Query *parsetree,
*** 1829,1834 ****
--- 1831,2652 ----
/*
+ * get_view_query - get the query from a view's _RETURN rule.
+ */
+ static Query *
+ get_view_query(Relation view)
+ {
+ int i;
+
+ Assert(view->rd_rel->relkind == RELKIND_VIEW);
+
+ for (i = 0; i < view->rd_rules->numLocks; i++)
+ {
+ RewriteRule *rule = view->rd_rules->rules[i];
+
+ if (rule->event == CMD_SELECT)
+ {
+ /* A _RETURN rule should have only one action */
+ if (list_length(rule->actions) != 1)
+ elog(ERROR, "invalid _RETURN rule action specification");
+
+ return linitial(rule->actions);
+ }
+ }
+
+ elog(ERROR, "failed to find _RETURN rule for view");
+ return NULL; /* keep compiler quiet */
+ }
+
+
+ /*
+ * test_auto_update_view -
+ * Test if the specified view can be automatically updated. This will
+ * either return NULL (if the view can be updated) or the reason that it
+ * cannot.
+ *
+ * Note that the checks performed here are local to this view. It does not
+ * check that the view's underlying base relation is updatable.
+ */
+ static const char *
+ test_auto_update_view(Relation view)
+ {
+ Query *viewquery;
+ RangeTblRef *rtr;
+ RangeTblEntry *base_rte;
+ Bitmapset *bms;
+ ListCell *cell;
+
+ /*
+ * Check if the view is simply updatable. According to SQL-92 this means:
+ * - No DISTINCT clauses.
+ * - Every TLE is a column reference, and appears at most once.
+ * - Refers to a single base relation.
+ * - No GROUP BY or HAVING clauses.
+ * - No set operations (UNION, INTERSECT or EXCEPT).
+ * - No sub-queries in the WHERE clause.
+ *
+ * We relax this last restriction since it would actually be more work to
+ * enforce it than to simply allow it. In addition (for now) we impose
+ * the following additional constraints, based on features that are not
+ * part of SQL-92:
+ * - No DISTINCT ON clauses.
+ * - No window functions.
+ * - No CTEs (WITH or WITH RECURSIVE).
+ * - No OFFSET or LIMIT clauses.
+ * - No system columns.
+ *
+ * Note that we do these checks without recursively expanding the view.
+ * The base relation may be a view, and it might have INSTEAD OF triggers
+ * or rules, in which case it need not satisfy these constraints.
+ */
+ viewquery = get_view_query(view);
+
+ if (viewquery->distinctClause != NIL ||
+ viewquery->hasDistinctOn)
+ return "Views containing DISTINCT are not updatable";
+
+ if (viewquery->groupClause != NIL)
+ return "Views containing GROUP BY are not updatable";
+
+ if (viewquery->havingQual != NULL)
+ return "Views containing HAVING are not updatable";
+
+ if (viewquery->hasAggs)
+ return "Views containing aggregates are not updatable";
+
+ if (viewquery->hasWindowFuncs)
+ return "Views containing window functions are not updatable";
+
+ if (viewquery->setOperations != NULL)
+ return "Views containing UNION, INTERSECT or EXCEPT are not updatable";
+
+ if (viewquery->hasRecursive ||
+ viewquery->cteList != NIL)
+ return "Views containing WITH [RECURSIVE] are not updatable";
+
+ if (viewquery->limitOffset != NULL ||
+ viewquery->limitCount != NULL)
+ return "Views containing OFFSET or LIMIT are not updatable";
+
+ /*
+ * The view query should select from a single base relation, which must be
+ * a table or another view.
+ */
+ if (list_length(viewquery->jointree->fromlist) == 0)
+ return "Views with no base relations are not updatable";
+
+ if (list_length(viewquery->jointree->fromlist) > 1)
+ return "Views with multiple base relations are not updatable";
+
+ rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist);
+ if (!IsA(rtr, RangeTblRef))
+ return "Views that are not based on tables or views are not updatable";
+
+ base_rte = rt_fetch(rtr->rtindex, viewquery->rtable);
+ if (base_rte->rtekind != RTE_RELATION)
+ return "Views that are not based on tables or views are not updatable";
+
+ /*
+ * The view's targetlist entries should all be Vars referring to columns
+ * in the base relation, and no 2 should refer to the same base column.
+ *
+ * Note that we don't bother freeing resources allocated here if we
+ * return early, since that is an error condition.
+ */
+ bms = NULL;
+ foreach(cell, viewquery->targetList)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(cell);
+ Var *var = (Var *) tle->expr;
+
+ if (!IsA(var, Var))
+ return "Views with columns that are not simple references to columns in the base relation are not updatable";
+
+ if (var->varattno < 0)
+ return "Views that refer to system columns are not updatable";
+
+ if (var->varattno == 0)
+ return "Views that refer to whole rows from the base relation are not updatable";
+
+ if (bms_is_member(var->varattno, bms))
+ return "Views that refer to the same column more than once are not updatable";
+
+ bms = bms_add_member(bms, var->varattno);
+ }
+
+ bms_free(bms);
+ return NULL; /* the view is simply updatable */
+ }
+
+
+ /*
+ * rewriteTargetView -
+ * attempt to rewrite a query where the target relation is a view, so that
+ * the view's base relation becomes the target relation.
+ *
+ * This only handles the case where there are no INSTEAD OF triggers to update
+ * the view. If there are INSTEAD OF triggers, this function will return NULL
+ * and the view modification is handled later by fireRIRrules.
+ *
+ * Note that the base relation here may itself be a view, which may or may not
+ * have INSTEAD OF triggers or rules to handle the update. That is handled by
+ * the recursion in RewriteQuery.
+ */
+ static Query *
+ rewriteTargetView(Query *parsetree, Relation view)
+ {
+ TriggerDesc *trigDesc = view->trigdesc;
+ const char *auto_update_detail;
+ Query *viewquery;
+ RangeTblRef *rtr;
+ int base_rt_index;
+ RangeTblEntry *base_rte;
+ List *view_targetlist;
+ bool same_cols;
+ ListCell *cell;
+ int rt_index;
+ List *new_rtable;
+ RangeTblEntry *view_rte;
+
+ /* The view must have INSTEAD OF triggers or be simply updatable */
+ switch (parsetree->commandType)
+ {
+ case CMD_INSERT:
+ if (trigDesc && trigDesc->trig_insert_instead_row)
+ return NULL; /* INSTEAD OF INSERT trigger will be used */
+
+ auto_update_detail = test_auto_update_view(view);
+ if (auto_update_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot insert into view \"%s\"",
+ RelationGetRelationName(view)),
+ errdetail("%s.", auto_update_detail),
+ errhint("You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger.")));
+ break;
+ case CMD_UPDATE:
+ if (trigDesc && trigDesc->trig_update_instead_row)
+ return NULL; /* INSTEAD OF UPDATE trigger will be used */
+
+ auto_update_detail = test_auto_update_view(view);
+ if (auto_update_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot update view \"%s\"",
+ RelationGetRelationName(view)),
+ errdetail("%s.", auto_update_detail),
+ errhint("You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger.")));
+ break;
+ case CMD_DELETE:
+ if (trigDesc && trigDesc->trig_delete_instead_row)
+ return NULL; /* INSTEAD OF DELETE trigger will be used */
+
+ auto_update_detail = test_auto_update_view(view);
+ if (auto_update_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot delete from view \"%s\"",
+ RelationGetRelationName(view)),
+ errdetail("%s.", auto_update_detail),
+ errhint("You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.")));
+ break;
+ default:
+ elog(ERROR, "unrecognized CmdType: %d", (int) parsetree->commandType);
+ break;
+ }
+
+ /*
+ * The view is simply updatable, which means that it should have a single
+ * base relation.
+ */
+ viewquery = get_view_query(view);
+ Assert(list_length(viewquery->jointree->fromlist) == 1);
+
+ rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist);
+ Assert(IsA(rtr, RangeTblRef));
+
+ base_rt_index = rtr->rtindex;
+ base_rte = rt_fetch(base_rt_index, viewquery->rtable);
+ Assert(base_rte->rtekind == RTE_RELATION);
+
+ /*
+ * Make a copy of the view's targetlist, adjusting its Vars to allow it
+ * to be plugged into the main query.
+ */
+ view_targetlist = copyObject(viewquery->targetList);
+
+ ChangeVarNodes((Node *) view_targetlist,
+ base_rt_index, parsetree->resultRelation, 0);
+
+ /**
+ * Test for the simple case where the view has all the same columns as the
+ * base relation, in the same order.
+ */
+ same_cols = true;
+ foreach(cell, view_targetlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(cell);
+ Var *var = (Var *) tle->expr;
+
+ if (var->varattno != tle->resno)
+ {
+ /* view columns are in a different order */
+ same_cols = false;
+ break;
+ }
+ }
+
+ /*
+ * Replace the old view target RTE with a new RTE referring to the base
+ * relation. Since we are not changing the result relation index, there
+ * is no need to update any varnos in the original query.
+ *
+ * Note that we need to keep the original view RTE, even though it will
+ * not be used directly in the query, so that the correct permissions
+ * checks are still done against it. This is appended to the end of the
+ * rangetable.
+ */
+ rt_index = 1;
+ new_rtable = NIL;
+ foreach(cell, parsetree->rtable)
+ {
+ RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell);
+
+ if (rt_index == parsetree->resultRelation)
+ {
+ /*
+ * Create the new target RTE using the base relation. Copy any
+ * permissions checks from the original view target RTE, adjusting
+ * the column numbers if necessary, and performing the checks as
+ * the view's owner.
+ */
+ RangeTblEntry *newrte = (RangeTblEntry *) copyObject(base_rte);
+ newrte->requiredPerms = rte->requiredPerms;
+ newrte->checkAsUser = view->rd_rel->relowner;
+ newrte->selectedCols = bms_copy(rte->selectedCols);
+ newrte->modifiedCols = bms_copy(rte->modifiedCols);
+
+ if (!same_cols)
+ {
+ newrte->selectedCols = adjust_column_set(newrte->selectedCols,
+ view_targetlist);
+ newrte->modifiedCols = adjust_column_set(newrte->modifiedCols,
+ view_targetlist);
+ }
+
+ /*
+ * Move any security barrier quals from the view RTE onto the new
+ * base relation RTE, since the original view RTE will not be
+ * referenced in the final query.
+ */
+ newrte->securityQuals = rte->securityQuals;
+ rte->securityQuals = NIL;
+
+ /*
+ * Keep the original view RTE so that executor also checks that
+ * the current user has the required permissions on the view
+ */
+ view_rte = rte;
+ rte = newrte;
+ }
+ new_rtable = lappend(new_rtable, rte);
+ rt_index++;
+ }
+ parsetree->rtable = lappend(new_rtable, view_rte);
+
+ /*
+ * For UPDATE/DELETE, rewriteTargetListUD will have added a wholerow junk
+ * TLE for the view to the end of the targetlist which we no longer need.
+ * We remove this now to avoid unnecessary work when we process the
+ * targetlist. Note that when we recurse through rewriteQuery a new junk
+ * TLE will be added to allow the executor to find the row in the base
+ * relation.
+ */
+ if (parsetree->commandType != CMD_INSERT)
+ {
+ TargetEntry *tle = (TargetEntry *)
+ lfirst(list_tail(parsetree->targetList));
+
+ Assert(IsA(tle->expr, Var) && ((Var *) tle->expr)->varattno == 0);
+ parsetree->targetList = list_delete_ptr(parsetree->targetList, tle);
+ }
+
+ /*
+ * If the view's columns don't match the base relation's columns, we need
+ * to update the targetlist to point to the columns in the base relation,
+ * and similarly update any Vars in the query that refer to the result
+ * relation.
+ *
+ * Note that this destroys the resno ordering of the targetlist, but that
+ * will be restored when we recurse through rewriteQuery which will invoke
+ * rewriteTargetListIU on this updated targetlist.
+ */
+ if (!same_cols)
+ {
+ /* Update the main query's targetlist */
+ foreach(cell, parsetree->targetList)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(cell);
+ TargetEntry *view_tle;
+
+ /* Ignore system columns */
+ if (tle->resjunk || tle->resno <= 0)
+ continue;
+
+ view_tle = get_tle_by_resno(view_targetlist, tle->resno);
+ if (view_tle == NULL)
+ elog(ERROR, "View column %d not found", tle->resno);
+
+ tle->resno = ((Var *) view_tle->expr)->varattno;
+ }
+
+ /* Update any Vars referring to the result relation. */
+ parsetree =
+ (Query *) ResolveNew((Node *) parsetree,
+ parsetree->resultRelation,
+ 0,
+ view_rte,
+ view_targetlist,
+ parsetree->commandType,
+ parsetree->resultRelation,
+ &parsetree->hasSubLinks);
+ }
+
+ /*
+ * For UPDATE/DELETE, deal with any quals from the view's jointree. We
+ * know that there is just one relation in the view, so any references to
+ * that relation in the view's quals need to be updated to refer to the
+ * corresponding relation in the main query (the result relation).
+ *
+ * If the view is not a security barrier then we simply add the view quals
+ * to the main query. However, if the view is a security barrier then its
+ * quals must take precedence over any of the user's quals. Here we just
+ * make a note of such quals on the relevant RTE so that we can deal with
+ * them later in rewriteSecurityQuals, which will turn the RTE into a
+ * subquery.
+ *
+ * For INSERT the view's quals are not needed for now. When we implement
+ * WITH CHECK OPTION, this might be a good time to collect them.
+ */
+ if (parsetree->commandType != CMD_INSERT &&
+ viewquery->jointree->quals != NULL)
+ {
+ Node *viewqual = copyObject(viewquery->jointree->quals);
+
+ ChangeVarNodes(viewqual, base_rt_index, parsetree->resultRelation, 0);
+
+ if (RelationIsSecurityView(view))
+ {
+ base_rte = rt_fetch(parsetree->resultRelation, parsetree->rtable);
+ base_rte->securityQuals = lcons(viewqual,
+ base_rte->securityQuals);
+ }
+ else
+ AddQual(parsetree, (Node *) viewqual);
+ }
+
+ return parsetree;
+ }
+
+
+ /*
+ * rewriteSecurityQual -
+ * rewrite the specified security barrier qual on a query RTE, turning the
+ * RTE into a subquery.
+ */
+ static Query *
+ rewriteSecurityQual(Query *parsetree, int rt_index,
+ bool is_result_relation, Node *qual)
+ {
+ RangeTblEntry *rte;
+ List *targetlist = NIL;
+ List *inverse_targetlist = NIL;
+ List *colnames = NIL;
+ Relation relation;
+ AttrNumber attno;
+ Var *var;
+ TargetEntry *tle;
+ Query *subquery;
+ RangeTblEntry *subrte;
+ RangeTblRef *subrtr;
+ ListCell *cell;
+
+ rte = rt_fetch(rt_index, parsetree->rtable);
+
+ /*
+ * There are 2 possible cases:
+ *
+ * 1). A relation RTE, which we turn into a subquery RTE containing all
+ * referenced columns.
+ *
+ * 2). A subquery RTE (either from a prior call to this function or from
+ * an expanded view). In this case we build a new subquery on top of it
+ * to isolate this security barrier qual from any other quals.
+ */
+ switch (rte->rtekind)
+ {
+ case RTE_RELATION:
+ /*
+ * Build the subquery targetlist from the columns used from the
+ * underlying table and an inverse targetlist to map varattnos in
+ * the main query onto the new subquery columns.
+ */
+ relation = heap_open(rte->relid, NoLock);
+
+ for (attno = FirstLowInvalidHeapAttributeNumber;
+ attno <= relation->rd_att->natts; attno++)
+ {
+ Form_pg_attribute att_tup;
+ char *attname;
+
+ /* Ignore columns that aren't used */
+ if (!attribute_used((Node *) parsetree, rt_index, attno, 0))
+ continue;
+
+ if (attno == InvalidAttrNumber)
+ {
+ /* whole-row attribute */
+ var = makeWholeRowVar(rte, 1, 0, false);
+ attname = "wholerow";
+ }
+ else
+ {
+ /* regular table column or system attribute */
+ if (attno >= 1)
+ att_tup = relation->rd_att->attrs[attno - 1];
+ else
+ att_tup = SystemAttributeDefinition(attno,
+ relation->rd_rel->relhasoids);
+
+ var = makeVar(1,
+ attno,
+ att_tup->atttypid,
+ att_tup->atttypmod,
+ att_tup->attcollation,
+ 0);
+ attname = NameStr(att_tup->attname);
+ }
+
+ /* target entry for new subquery targetlist */
+ tle = makeTargetEntry((Expr *) var,
+ list_length(targetlist) + 1,
+ pstrdup(attname),
+ false);
+ targetlist = lappend(targetlist, tle);
+
+ /* inverse target entry for rewriting the main query */
+ var = copyObject(var);
+ var->varattno = list_length(targetlist);
+ tle = makeTargetEntry((Expr *) var,
+ attno,
+ pstrdup(attname),
+ false);
+ inverse_targetlist = lappend(inverse_targetlist, tle);
+
+ colnames = lappend(colnames, makeString(pstrdup(attname)));
+ }
+ heap_close(relation, NoLock);
+
+ /*
+ * Turn the main relation RTE into a security barrier subquery
+ * RTE, moving all permissions checks and rowMarks down into the
+ * subquery.
+ */
+ subquery = makeNode(Query);
+ subquery->commandType = CMD_SELECT;
+ subquery->querySource = QSRC_INSTEAD_RULE;
+ subquery->targetList = targetlist;
+
+ subrte = copyObject(rte);
+ subrte->inFromCl = true;
+ subrte->securityQuals = NIL;
+ subquery->rtable = list_make1(subrte);
+
+ subrtr = makeNode(RangeTblRef);
+ subrtr->rtindex = 1;
+ subquery->jointree = makeFromExpr(list_make1(subrtr), qual);
+ subquery->hasSubLinks = checkExprHasSubLink(qual);
+
+ foreach(cell, parsetree->rowMarks)
+ {
+ RowMarkClause *rc = (RowMarkClause *) lfirst(cell);
+ if (rc->rti == rt_index)
+ {
+ parsetree->rowMarks = list_delete(parsetree->rowMarks,
+ rc);
+ rc->rti = 1;
+ subquery->rowMarks = list_make1(rc);
+ subquery->hasForUpdate = rc->forUpdate;
+ break;
+ }
+ }
+
+ /*
+ * If this RTE was the result relation, then we need to lock the
+ * rows coming from it. Note that by the time we get here, this
+ * RTE will no longer be the result relation, so we have to rely
+ * on the flag passed in.
+ */
+ if (is_result_relation)
+ applyLockingClause(subquery, 1, true, false, true);
+
+ rte->rtekind = RTE_SUBQUERY;
+ rte->relid = InvalidOid;
+ rte->subquery = subquery;
+ rte->security_barrier = true;
+ rte->eref = makeAlias(rte->eref->aliasname, colnames);
+ rte->inh = false; /* must not be set for a subquery */
+
+ /* the permissions checks have now been move down */
+ rte->requiredPerms = 0;
+ rte->checkAsUser = InvalidOid;
+ rte->selectedCols = NULL;
+ rte->modifiedCols = NULL;
+
+ /*
+ * Update any varattnos in the main query that refer to this RTE,
+ * using the entries from the inverse targetlist.
+ */
+ return (Query *) ResolveNew((Node *) parsetree,
+ rt_index,
+ 0,
+ rte,
+ inverse_targetlist,
+ parsetree->commandType,
+ rt_index,
+ &parsetree->hasSubLinks);
+
+ case RTE_SUBQUERY:
+ /*
+ * Build a new subquery that includes all the same columns as the
+ * original subquery.
+ */
+ foreach(cell, rte->subquery->targetList)
+ {
+ tle = (TargetEntry *) lfirst(cell);
+ var = makeVarFromTargetEntry(1, tle);
+
+ tle = makeTargetEntry((Expr *) var,
+ list_length(targetlist) + 1,
+ pstrdup(tle->resname),
+ tle->resjunk);
+ targetlist = lappend(targetlist, tle);
+ }
+
+ subquery = makeNode(Query);
+ subquery->commandType = CMD_SELECT;
+ subquery->querySource = QSRC_INSTEAD_RULE;
+ subquery->targetList = targetlist;
+
+ subrte = makeNode(RangeTblEntry);
+ subrte->rtekind = RTE_SUBQUERY;
+ subrte->subquery = rte->subquery;
+ subrte->security_barrier = rte->security_barrier;
+ subrte->eref = copyObject(rte->eref);
+ subrte->inFromCl = true;
+ subquery->rtable = list_make1(subrte);
+
+ subrtr = makeNode(RangeTblRef);
+ subrtr->rtindex = 1;
+ subquery->jointree = makeFromExpr(list_make1(subrtr), qual);
+ subquery->hasSubLinks = checkExprHasSubLink(qual);
+
+ rte->subquery = subquery;
+ rte->security_barrier = true;
+
+ return parsetree;
+
+ default:
+ elog(ERROR, "invalid range table entry for security barrier qual");
+ }
+
+ return NULL;
+ }
+
+
+ /*
+ * rewriteSecurityQualsOnSubLink -
+ * Apply rewriteSecurityQuals() to each SubLink (subselect in expression)
+ * found in the given tree.
+ *
+ * NOTE: although this has the form of a walker, we cheat and modify the
+ * SubLink nodes in-place. This is safe because we are not descending into
+ * subqueries, so no parts of the tree we are modifying are being traversed.
+ *
+ * Each SubLink subselect is replaced with a possibly-rewritten subquery.
+ */
+ static bool
+ rewriteSecurityQualsOnSubLink(Node *node, void *ctx)
+ {
+ if (node == NULL)
+ return false;
+ if (IsA(node, SubLink))
+ {
+ SubLink *sub = (SubLink *) node;
+
+ sub->subselect = (Node *)
+ rewriteSecurityQuals((Query *) sub->subselect);
+ /* Fall through to process lefthand args of SubLink */
+ }
+
+ /*
+ * Do NOT recurse into Query nodes, because rewriteSecurityQuals already
+ * processed all subqueries in the rtable and cteList.
+ */
+ return expression_tree_walker(node, rewriteSecurityQualsOnSubLink, ctx);
+ }
+
+
+ /*
+ * rewriteSecurityQuals -
+ * rewrites any security barrier quals on RTEs in the query, turning them
+ * into subqueries to allow the planner to enforce them before any user
+ * quals where necessary. Currently such security barrier quals can only
+ * have come from automatically updatable security barrier views.
+ *
+ * We do this at the end of the rewriting process (after any SELECT rules
+ * have been applied) so that the new security barrier subqueries wrap any
+ * remaining views after they are expanded.
+ *
+ * Any given RTE may have multiple security barrier quals in a list, from
+ * which we create a set of nested subqueries to isolate each security barrier
+ * from the others, providing protection against malicious user security
+ * barriers. The first item in the list represents the innermost subquery.
+ */
+ static Query *
+ rewriteSecurityQuals(Query *parsetree)
+ {
+ ListCell *l;
+ int rt_index;
+
+ /*
+ * Process each RTE in the rtable list. Security barrier quals are
+ * initially only added to the result relation, but subsequent rules
+ * may change that, so they may be anywhere.
+ *
+ * Note that this is deliberately not a foreach loop, since the whole
+ * parsetree may be mutated each time through the loop.
+ */
+ rt_index = 0;
+ while (rt_index < list_length(parsetree->rtable))
+ {
+ RangeTblEntry *rte;
+ bool is_result_relation;
+ int qual_idx;
+
+ ++rt_index;
+ rte = rt_fetch(rt_index, parsetree->rtable);
+
+ if (rte->securityQuals == NIL)
+ continue;
+
+ /*
+ * Ignore any RTEs that aren't used in the query (such RTEs may be
+ * present for permissions checks).
+ */
+ if (rt_index != parsetree->resultRelation &&
+ !rangeTableEntry_used((Node *) parsetree, rt_index, 0))
+ continue;
+
+ /*
+ * Recursively process any security barrier quals in subquery RTEs
+ * before processing any at this query level.
+ */
+ if (rte->rtekind == RTE_SUBQUERY)
+ rte->subquery = rewriteSecurityQuals(rte->subquery);
+
+ /*
+ * If this RTE is the target then we need to make a copy of it before
+ * expanding it. The unexpanded copy will become the new target, and
+ * the expanded RTE will be the source of rows to update/delete.
+ */
+ is_result_relation = rt_index == parsetree->resultRelation;
+ if (is_result_relation)
+ {
+ RangeTblEntry *newrte = copyObject(rte);
+ parsetree->rtable = lappend(parsetree->rtable, newrte);
+ parsetree->resultRelation = list_length(parsetree->rtable);
+
+ /*
+ * Wipe out any copied security quals on the new target to prevent
+ * infinite recursion.
+ */
+ newrte->securityQuals = NIL;
+
+ /*
+ * There's no need to do permissions checks twice, so wipe out the
+ * permissions info for the original RTE (we prefer to keep the
+ * bits set on the result RTE).
+ */
+ rte->requiredPerms = 0;
+ rte->checkAsUser = InvalidOid;
+ rte->selectedCols = NULL;
+ rte->modifiedCols = NULL;
+
+ /*
+ * For the most part, Vars referencing the original relation
+ * should remain as as they are, meaning that they implicitly
+ * represent OLD values. But in the RETURNING list if any, we
+ * want such Vars to represent NEW values, so change them to
+ * reference the new RTE.
+ *
+ * Since ChangeVarNodes scribbles on the tree in-place, copy the
+ * RETURNING list first for safety.
+ */
+ parsetree->returningList = copyObject(parsetree->returningList);
+ ChangeVarNodes((Node *) parsetree->returningList, rt_index,
+ parsetree->resultRelation, 0);
+ }
+
+ /*
+ * Process each security qual in turn, starting with the innermost
+ * one and working outwards.
+ *
+ * Note that we can't use a foreach loop here because the whole
+ * parsetree may be mutated each time through the loop. For the same
+ * reason we must re-fetch the RTE each time.
+ */
+ qual_idx = 0;
+ while (qual_idx < list_length(rte->securityQuals))
+ {
+ Node *qual = (Node *) list_nth(rte->securityQuals, qual_idx);
+
+ parsetree = rewriteSecurityQual(parsetree, rt_index,
+ is_result_relation, qual);
+
+ /* re-fetch the RTE in case it has been re-written */
+ rte = rt_fetch(rt_index, parsetree->rtable);
+
+ qual_idx++;
+ }
+ rte->securityQuals = NIL;
+ }
+
+ /* Recurse into subqueries in WITH */
+ foreach(l, parsetree->cteList)
+ {
+ CommonTableExpr *cte = (CommonTableExpr *) lfirst(l);
+
+ cte->ctequery = (Node *)
+ rewriteSecurityQuals((Query *) cte->ctequery);
+ }
+
+ /*
+ * Recurse into sublink subqueries too, without descending into the rtable
+ * or the cteList, which we have already processed.
+ *
+ * XXX: Can such SubLink subselects ever actually have security quals?
+ */
+ if (parsetree->hasSubLinks)
+ query_tree_walker(parsetree, rewriteSecurityQualsOnSubLink, NULL,
+ QTW_IGNORE_RC_SUBQUERIES);
+
+ return parsetree;
+ }
+
+
+ /*
* RewriteQuery -
* rewrites the query and apply the rules again on the queries rewritten
*
*************** RewriteQuery(Query *parsetree, List *rew
*** 1842,1847 ****
--- 2660,2666 ----
bool instead = false;
bool returning = false;
Query *qual_product = NULL;
+ List *product_queries = NIL;
List *rewritten = NIL;
ListCell *lc1;
*************** RewriteQuery(Query *parsetree, List *rew
*** 1993,2001 ****
result_relation, parsetree);
if (locks != NIL)
- {
- List *product_queries;
-
product_queries = fireRules(parsetree,
result_relation,
event,
--- 2812,2817 ----
*************** RewriteQuery(Query *parsetree, List *rew
*** 2004,2009 ****
--- 2820,2850 ----
&returning,
&qual_product);
+ /*
+ * If there are no unqualified INSTEAD rules, and the target relation
+ * is a view without any INSTEAD OF triggers, then we have a problem
+ * unless it can be automatically updated.
+ *
+ * If we can automatically update the view, then we do so here and add
+ * the resulting query to the product queries list, so that it gets
+ * recursively rewritten if necessary. We mark this as an unqualified
+ * INSTEAD to prevent the original query from being executed. Note
+ * also that if this succeeds it will have rewritten any returning
+ * list too.
+ */
+ if (!instead && rt_entry_relation->rd_rel->relkind == RELKIND_VIEW)
+ {
+ Query *query = qual_product ? qual_product : parsetree;
+ Query *newquery = rewriteTargetView(query, rt_entry_relation);
+
+ if (newquery != NULL)
+ {
+ product_queries = lappend(product_queries, newquery);
+ instead = true;
+ returning = true;
+ }
+ }
+
/*
* If we got any product queries, recursively rewrite them --- but
* first check for recursion!
*************** RewriteQuery(Query *parsetree, List *rew
*** 2040,2046 ****
rewrite_events = list_delete_first(rewrite_events);
}
- }
/*
* If there is an INSTEAD, and the original query has a RETURNING, we
--- 2881,2886 ----
*************** QueryRewrite(Query *parsetree)
*** 2181,2187 ****
/*
* Step 2
*
! * Apply all the RIR rules on each query
*
* This is also a handy place to mark each query with the original queryId
*/
--- 3021,3028 ----
/*
* Step 2
*
! * Apply all the RIR rules on each query, and then expand any security
! * quals that apply to RTEs in the query.
*
* This is also a handy place to mark each query with the original queryId
*/
*************** QueryRewrite(Query *parsetree)
*** 2191,2196 ****
--- 3032,3038 ----
Query *query = (Query *) lfirst(l);
query = fireRIRrules(query, NIL, false);
+ query = rewriteSecurityQuals(query);
query->queryId = input_query_id;
diff --git a/src/backend/rewrite/rewriteManip.c b/src/backend/rewrite/rewriteManip.c
new file mode 100644
index 5fcf274..7fe2be5
*** a/src/backend/rewrite/rewriteManip.c
--- b/src/backend/rewrite/rewriteManip.c
***************
*** 13,18 ****
--- 13,19 ----
*/
#include "postgres.h"
+ #include "access/sysattr.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
*************** ResolveNew(Node *node, int target_varno,
*** 1440,1442 ****
--- 1441,1483 ----
(void *) &context,
outer_hasSubLinks);
}
+
+
+ /*
+ * adjust_column_set - replace columns in a set with entries from a targetlist
+ *
+ * Non-system columns in the set are replaced by the entry with matching resno
+ * from targetlist, and system columns are left unchanged. This is used for
+ * simply updatable views to map permissions checks on the view columns onto
+ * the matching columns in the underlying base relation.
+ */
+ Bitmapset *
+ adjust_column_set(Bitmapset *cols, List *targetlist)
+ {
+ Bitmapset *tmpcols = bms_copy(cols);
+ Bitmapset *result = NULL;
+ AttrNumber col;
+
+ while ((col = bms_first_member(tmpcols)) >= 0)
+ {
+ AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
+
+ if (attno <= 0)
+ /* system column - leave it unchanged */
+ result = bms_add_member(result, col);
+ else if (attno <= list_length(targetlist))
+ {
+ /* non-system column - update it from the targetlist */
+ TargetEntry *tle = get_tle_by_resno(targetlist, attno);
+ if (tle != NULL && IsA(tle->expr, Var))
+ {
+ Var *var = (Var *) tle->expr;
+ result = bms_add_member(result,
+ var->varattno - FirstLowInvalidHeapAttributeNumber);
+ }
+ }
+ }
+ bms_free(tmpcols);
+
+ return result;
+ }
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
new file mode 100644
index 82b6c0c..0b4e0b5
*** a/src/include/catalog/catversion.h
--- b/src/include/catalog/catversion.h
***************
*** 53,58 ****
*/
/* yyyymmddN */
! #define CATALOG_VERSION_NO 201208071
#endif
--- 53,58 ----
*/
/* yyyymmddN */
! #define CATALOG_VERSION_NO 201208111
#endif
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
new file mode 100644
index f433166..217134e
*** a/src/include/nodes/parsenodes.h
--- b/src/include/nodes/parsenodes.h
*************** typedef struct RangeTblEntry
*** 765,770 ****
--- 765,771 ----
Oid checkAsUser; /* if valid, check access as this role */
Bitmapset *selectedCols; /* columns needing SELECT permission */
Bitmapset *modifiedCols; /* columns needing INSERT/UPDATE permission */
+ List *securityQuals; /* any security barrier quals to apply */
} RangeTblEntry;
/*
diff --git a/src/include/rewrite/rewriteManip.h b/src/include/rewrite/rewriteManip.h
new file mode 100644
index e13331d..4e0b867
*** a/src/include/rewrite/rewriteManip.h
--- b/src/include/rewrite/rewriteManip.h
*************** extern Node *ResolveNew(Node *node, int
*** 74,77 ****
--- 74,79 ----
List *targetlist, int event, int update_varno,
bool *outer_hasSubLinks);
+ extern Bitmapset *adjust_column_set(Bitmapset *cols, List *targetlist);
+
#endif /* REWRITEMANIP_H */
On 12 August 2012 22:14, Dean Rasheed <dean.a.rasheed@gmail.com> wrote:
I've been looking at this further and I have made some improvements...
Here's an updated WIP patch which I'll add to the next commitfest.
I've added information schema updates and some new regression tests. I
still need to work on some doc updates.
Regards,
Dean
Attachments:
auto-update-views.patchapplication/octet-stream; name=auto-update-views.patchDownload
diff --git a/src/backend/catalog/information_schema.sql b/src/backend/catalog/information_schema.sql
new file mode 100644
index 4bd942f..440d848
*** a/src/backend/catalog/information_schema.sql
--- b/src/backend/catalog/information_schema.sql
*************** CREATE VIEW views AS
*** 2497,2510 ****
CAST('NONE' AS character_data) AS check_option,
CAST(
! CASE WHEN EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = '2' AND is_instead)
! AND EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = '4' AND is_instead)
! THEN 'YES' ELSE 'NO' END
AS yes_or_no) AS is_updatable,
CAST(
! CASE WHEN EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = '3' AND is_instead)
! THEN 'YES' ELSE 'NO' END
AS yes_or_no) AS is_insertable_into,
CAST(
--- 2497,2507 ----
CAST('NONE' AS character_data) AS check_option,
CAST(
! CASE WHEN pg_is_view_updatable(c.oid) THEN 'YES' ELSE 'NO' END
AS yes_or_no) AS is_updatable,
CAST(
! CASE WHEN pg_is_view_insertable(c.oid) THEN 'YES' ELSE 'NO' END
AS yes_or_no) AS is_insertable_into,
CAST(
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
new file mode 100644
index f34f704..f4cb6dc
*** a/src/backend/nodes/copyfuncs.c
--- b/src/backend/nodes/copyfuncs.c
*************** _copyRangeTblEntry(const RangeTblEntry *
*** 1994,1999 ****
--- 1994,2000 ----
COPY_SCALAR_FIELD(checkAsUser);
COPY_BITMAPSET_FIELD(selectedCols);
COPY_BITMAPSET_FIELD(modifiedCols);
+ COPY_NODE_FIELD(securityQuals);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
new file mode 100644
index b4b1c22..51c73b5
*** a/src/backend/nodes/equalfuncs.c
--- b/src/backend/nodes/equalfuncs.c
*************** _equalRangeTblEntry(const RangeTblEntry
*** 2305,2310 ****
--- 2305,2311 ----
COMPARE_SCALAR_FIELD(checkAsUser);
COMPARE_BITMAPSET_FIELD(selectedCols);
COMPARE_BITMAPSET_FIELD(modifiedCols);
+ COMPARE_NODE_FIELD(securityQuals);
return true;
}
diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c
new file mode 100644
index b130902..4d461b2
*** a/src/backend/nodes/nodeFuncs.c
--- b/src/backend/nodes/nodeFuncs.c
*************** range_table_walker(List *rtable,
*** 1942,1947 ****
--- 1942,1950 ----
return true;
break;
}
+
+ if (walker(rte->securityQuals, context))
+ return true;
}
return false;
}
*************** range_table_mutator(List *rtable,
*** 2653,2658 ****
--- 2656,2662 ----
MUTATE(newrte->values_lists, rte->values_lists, List *);
break;
}
+ MUTATE(newrte->securityQuals, rte->securityQuals, List *);
newrt = lappend(newrt, newrte);
}
return newrt;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
new file mode 100644
index 01f381e..eed6869
*** a/src/backend/nodes/outfuncs.c
--- b/src/backend/nodes/outfuncs.c
*************** _outRangeTblEntry(StringInfo str, const
*** 2382,2387 ****
--- 2382,2388 ----
WRITE_OID_FIELD(checkAsUser);
WRITE_BITMAPSET_FIELD(selectedCols);
WRITE_BITMAPSET_FIELD(modifiedCols);
+ WRITE_NODE_FIELD(securityQuals);
}
static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
new file mode 100644
index 1eb7582..bb4009c
*** a/src/backend/nodes/readfuncs.c
--- b/src/backend/nodes/readfuncs.c
*************** _readRangeTblEntry(void)
*** 1229,1234 ****
--- 1229,1235 ----
READ_OID_FIELD(checkAsUser);
READ_BITMAPSET_FIELD(selectedCols);
READ_BITMAPSET_FIELD(modifiedCols);
+ READ_NODE_FIELD(securityQuals);
READ_DONE();
}
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
new file mode 100644
index ccd69fc..3cb0a65
*** a/src/backend/optimizer/plan/setrefs.c
--- b/src/backend/optimizer/plan/setrefs.c
*************** set_plan_references(PlannerInfo *root, P
*** 224,229 ****
--- 224,230 ----
newrte->ctecoltypes = NIL;
newrte->ctecoltypmods = NIL;
newrte->ctecolcollations = NIL;
+ newrte->securityQuals = NIL;
glob->finalrtable = lappend(glob->finalrtable, newrte);
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
new file mode 100644
index 8f75948..2f9a118
*** a/src/backend/rewrite/rewriteHandler.c
--- b/src/backend/rewrite/rewriteHandler.c
***************
*** 14,19 ****
--- 14,20 ----
#include "postgres.h"
#include "access/sysattr.h"
+ #include "catalog/heap.h"
#include "catalog/pg_type.h"
#include "commands/trigger.h"
#include "nodes/makefuncs.h"
*************** static List *matchLocks(CmdType event, R
*** 60,65 ****
--- 61,67 ----
int varno, Query *parsetree);
static Query *fireRIRrules(Query *parsetree, List *activeRIRs,
bool forUpdatePushedDown);
+ static Query *rewriteSecurityQuals(Query *parsetree);
/*
*************** fireRules(Query *parsetree,
*** 1829,1834 ****
--- 1831,2650 ----
/*
+ * get_view_query - get the query from a view's _RETURN rule.
+ */
+ static Query *
+ get_view_query(Relation view)
+ {
+ int i;
+
+ Assert(view->rd_rel->relkind == RELKIND_VIEW);
+
+ for (i = 0; i < view->rd_rules->numLocks; i++)
+ {
+ RewriteRule *rule = view->rd_rules->rules[i];
+
+ if (rule->event == CMD_SELECT)
+ {
+ /* A _RETURN rule should have only one action */
+ if (list_length(rule->actions) != 1)
+ elog(ERROR, "invalid _RETURN rule action specification");
+
+ return linitial(rule->actions);
+ }
+ }
+
+ elog(ERROR, "failed to find _RETURN rule for view");
+ return NULL; /* keep compiler quiet */
+ }
+
+
+ /*
+ * test_auto_update_view -
+ * Test if the specified view can be automatically updated. This will
+ * either return NULL (if the view can be updated) or the reason that it
+ * cannot.
+ *
+ * Note that the checks performed here are local to this view. It does not
+ * check that the view's underlying base relation is updatable.
+ */
+ static const char *
+ test_auto_update_view(Relation view)
+ {
+ Query *viewquery;
+ RangeTblRef *rtr;
+ RangeTblEntry *base_rte;
+ Bitmapset *bms;
+ ListCell *cell;
+
+ /*
+ * Check if the view is simply updatable. According to SQL-92 this means:
+ * - No DISTINCT clauses.
+ * - Every TLE is a column reference, and appears at most once.
+ * - Refers to a single base relation.
+ * - No GROUP BY or HAVING clauses.
+ * - No set operations (UNION, INTERSECT or EXCEPT).
+ * - No sub-queries in the WHERE clause.
+ *
+ * We relax this last restriction since it would actually be more work to
+ * enforce it than to simply allow it. In addition (for now) we impose
+ * the following additional constraints, based on features that are not
+ * part of SQL-92:
+ * - No DISTINCT ON clauses.
+ * - No window functions.
+ * - No CTEs (WITH or WITH RECURSIVE).
+ * - No OFFSET or LIMIT clauses.
+ * - No system columns.
+ *
+ * Note that we do these checks without recursively expanding the view.
+ * The base relation may be a view, and it might have INSTEAD OF triggers
+ * or rules, in which case it need not satisfy these constraints.
+ */
+ viewquery = get_view_query(view);
+
+ if (viewquery->distinctClause != NIL ||
+ viewquery->hasDistinctOn)
+ return "Views containing DISTINCT are not updatable";
+
+ if (viewquery->groupClause != NIL)
+ return "Views containing GROUP BY are not updatable";
+
+ if (viewquery->havingQual != NULL)
+ return "Views containing HAVING are not updatable";
+
+ if (viewquery->hasAggs)
+ return "Views containing aggregates are not updatable";
+
+ if (viewquery->hasWindowFuncs)
+ return "Views containing window functions are not updatable";
+
+ if (viewquery->setOperations != NULL)
+ return "Views containing UNION, INTERSECT or EXCEPT are not updatable";
+
+ if (viewquery->hasRecursive ||
+ viewquery->cteList != NIL)
+ return "Views containing WITH [RECURSIVE] are not updatable";
+
+ if (viewquery->limitOffset != NULL ||
+ viewquery->limitCount != NULL)
+ return "Views containing OFFSET or LIMIT are not updatable";
+
+ /*
+ * The view query should select from a single base relation, which must be
+ * a table or another view.
+ */
+ if (list_length(viewquery->jointree->fromlist) == 0)
+ return "Views with no base relations are not updatable";
+
+ if (list_length(viewquery->jointree->fromlist) > 1)
+ return "Views with multiple base relations are not updatable";
+
+ rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist);
+ if (!IsA(rtr, RangeTblRef))
+ return "Views that are not based on tables or views are not updatable";
+
+ base_rte = rt_fetch(rtr->rtindex, viewquery->rtable);
+ if (base_rte->rtekind != RTE_RELATION)
+ return "Views that are not based on tables or views are not updatable";
+
+ /*
+ * The view's targetlist entries should all be Vars referring to columns
+ * in the base relation, and no 2 should refer to the same base column.
+ *
+ * Note that we don't bother freeing resources allocated here if we
+ * return early, since that is an error condition.
+ */
+ bms = NULL;
+ foreach(cell, viewquery->targetList)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(cell);
+ Var *var = (Var *) tle->expr;
+
+ if (!IsA(var, Var))
+ return "Views with columns that are not simple references to columns in the base relation are not updatable";
+
+ if (var->varattno < 0)
+ return "Views that refer to system columns are not updatable";
+
+ if (var->varattno == 0)
+ return "Views that refer to whole rows from the base relation are not updatable";
+
+ if (bms_is_member(var->varattno, bms))
+ return "Views that refer to the same column more than once are not updatable";
+
+ bms = bms_add_member(bms, var->varattno);
+ }
+
+ bms_free(bms);
+ return NULL; /* the view is simply updatable */
+ }
+
+
+ /*
+ * rewriteTargetView -
+ * attempt to rewrite a query where the target relation is a view, so that
+ * the view's base relation becomes the target relation.
+ *
+ * This only handles the case where there are no INSTEAD OF triggers to update
+ * the view. If there are INSTEAD OF triggers, this function will return NULL
+ * and the view modification is handled later by fireRIRrules.
+ *
+ * Note that the base relation here may itself be a view, which may or may not
+ * have INSTEAD OF triggers or rules to handle the update. That is handled by
+ * the recursion in RewriteQuery.
+ */
+ static Query *
+ rewriteTargetView(Query *parsetree, Relation view)
+ {
+ TriggerDesc *trigDesc = view->trigdesc;
+ const char *auto_update_detail;
+ Query *viewquery;
+ RangeTblRef *rtr;
+ int base_rt_index;
+ RangeTblEntry *base_rte;
+ List *view_targetlist;
+ bool same_cols;
+ ListCell *cell;
+ int rt_index;
+ List *new_rtable;
+ RangeTblEntry *view_rte;
+
+ /* The view must have INSTEAD OF triggers or be simply updatable */
+ switch (parsetree->commandType)
+ {
+ case CMD_INSERT:
+ if (trigDesc && trigDesc->trig_insert_instead_row)
+ return NULL; /* INSTEAD OF INSERT trigger will be used */
+
+ auto_update_detail = test_auto_update_view(view);
+ if (auto_update_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot insert into view \"%s\"",
+ RelationGetRelationName(view)),
+ errdetail("%s.", auto_update_detail),
+ errhint("You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger.")));
+ break;
+ case CMD_UPDATE:
+ if (trigDesc && trigDesc->trig_update_instead_row)
+ return NULL; /* INSTEAD OF UPDATE trigger will be used */
+
+ auto_update_detail = test_auto_update_view(view);
+ if (auto_update_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot update view \"%s\"",
+ RelationGetRelationName(view)),
+ errdetail("%s.", auto_update_detail),
+ errhint("You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger.")));
+ break;
+ case CMD_DELETE:
+ if (trigDesc && trigDesc->trig_delete_instead_row)
+ return NULL; /* INSTEAD OF DELETE trigger will be used */
+
+ auto_update_detail = test_auto_update_view(view);
+ if (auto_update_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot delete from view \"%s\"",
+ RelationGetRelationName(view)),
+ errdetail("%s.", auto_update_detail),
+ errhint("You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.")));
+ break;
+ default:
+ elog(ERROR, "unrecognized CmdType: %d", (int) parsetree->commandType);
+ break;
+ }
+
+ /*
+ * The view is simply updatable, which means that it should have a single
+ * base relation.
+ */
+ viewquery = get_view_query(view);
+ Assert(list_length(viewquery->jointree->fromlist) == 1);
+
+ rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist);
+ Assert(IsA(rtr, RangeTblRef));
+
+ base_rt_index = rtr->rtindex;
+ base_rte = rt_fetch(base_rt_index, viewquery->rtable);
+ Assert(base_rte->rtekind == RTE_RELATION);
+
+ /*
+ * Make a copy of the view's targetlist, adjusting its Vars to allow it
+ * to be plugged into the main query.
+ */
+ view_targetlist = copyObject(viewquery->targetList);
+
+ ChangeVarNodes((Node *) view_targetlist,
+ base_rt_index, parsetree->resultRelation, 0);
+
+ /**
+ * Test for the simple case where the view has all the same columns as the
+ * base relation, in the same order.
+ */
+ same_cols = true;
+ foreach(cell, view_targetlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(cell);
+ Var *var = (Var *) tle->expr;
+
+ if (var->varattno != tle->resno)
+ {
+ /* view columns are in a different order */
+ same_cols = false;
+ break;
+ }
+ }
+
+ /*
+ * Replace the old view target RTE with a new RTE referring to the base
+ * relation. Since we are not changing the result relation index, there
+ * is no need to update any varnos in the original query.
+ *
+ * Note that we need to keep the original view RTE, even though it will
+ * not be used directly in the query, so that the correct permissions
+ * checks are still done against it. This is appended to the end of the
+ * rangetable.
+ */
+ rt_index = 1;
+ new_rtable = NIL;
+ foreach(cell, parsetree->rtable)
+ {
+ RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell);
+
+ if (rt_index == parsetree->resultRelation)
+ {
+ /*
+ * Create the new target RTE using the base relation. Copy any
+ * permissions checks from the original view target RTE, adjusting
+ * the column numbers if necessary, and performing the checks as
+ * the view's owner.
+ */
+ RangeTblEntry *newrte = (RangeTblEntry *) copyObject(base_rte);
+ newrte->requiredPerms = rte->requiredPerms;
+ newrte->checkAsUser = view->rd_rel->relowner;
+ newrte->selectedCols = bms_copy(rte->selectedCols);
+ newrte->modifiedCols = bms_copy(rte->modifiedCols);
+
+ if (!same_cols)
+ {
+ newrte->selectedCols = adjust_column_set(newrte->selectedCols,
+ view_targetlist);
+ newrte->modifiedCols = adjust_column_set(newrte->modifiedCols,
+ view_targetlist);
+ }
+
+ /*
+ * Move any security barrier quals from the view RTE onto the new
+ * base relation RTE, since the original view RTE will not be
+ * referenced in the final query.
+ */
+ newrte->securityQuals = rte->securityQuals;
+ rte->securityQuals = NIL;
+
+ /*
+ * Keep the original view RTE so that executor also checks that
+ * the current user has the required permissions on the view
+ */
+ view_rte = rte;
+ rte = newrte;
+ }
+ new_rtable = lappend(new_rtable, rte);
+ rt_index++;
+ }
+ parsetree->rtable = lappend(new_rtable, view_rte);
+
+ /*
+ * For UPDATE/DELETE, rewriteTargetListUD will have added a wholerow junk
+ * TLE for the view to the end of the targetlist which we no longer need.
+ * We remove this now to avoid unnecessary work when we process the
+ * targetlist. Note that when we recurse through rewriteQuery a new junk
+ * TLE will be added to allow the executor to find the row in the base
+ * relation.
+ */
+ if (parsetree->commandType != CMD_INSERT)
+ {
+ TargetEntry *tle = (TargetEntry *)
+ lfirst(list_tail(parsetree->targetList));
+
+ Assert(IsA(tle->expr, Var) && ((Var *) tle->expr)->varattno == 0);
+ parsetree->targetList = list_delete_ptr(parsetree->targetList, tle);
+ }
+
+ /*
+ * If the view's columns don't match the base relation's columns, we need
+ * to update the targetlist to point to the columns in the base relation,
+ * and similarly update any Vars in the query that refer to the result
+ * relation.
+ *
+ * Note that this destroys the resno ordering of the targetlist, but that
+ * will be restored when we recurse through rewriteQuery which will invoke
+ * rewriteTargetListIU on this updated targetlist.
+ */
+ if (!same_cols)
+ {
+ /* Update the main query's targetlist */
+ foreach(cell, parsetree->targetList)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(cell);
+ TargetEntry *view_tle;
+
+ /* Ignore system columns */
+ if (tle->resjunk || tle->resno <= 0)
+ continue;
+
+ view_tle = get_tle_by_resno(view_targetlist, tle->resno);
+ if (view_tle == NULL)
+ elog(ERROR, "View column %d not found", tle->resno);
+
+ tle->resno = ((Var *) view_tle->expr)->varattno;
+ }
+
+ /* Update any Vars referring to the result relation. */
+ parsetree =
+ (Query *) ResolveNew((Node *) parsetree,
+ parsetree->resultRelation,
+ 0,
+ view_rte,
+ view_targetlist,
+ parsetree->commandType,
+ parsetree->resultRelation,
+ &parsetree->hasSubLinks);
+ }
+
+ /*
+ * For UPDATE/DELETE, deal with any quals from the view's jointree. We
+ * know that there is just one relation in the view, so any references to
+ * that relation in the view's quals need to be updated to refer to the
+ * corresponding relation in the main query (the result relation).
+ *
+ * If the view is not a security barrier then we simply add the view quals
+ * to the main query. However, if the view is a security barrier then its
+ * quals must take precedence over any of the user's quals. Here we just
+ * make a note of such quals on the relevant RTE so that we can deal with
+ * them later in rewriteSecurityQuals, which will turn the RTE into a
+ * subquery.
+ *
+ * For INSERT the view's quals are not needed for now. When we implement
+ * WITH CHECK OPTION, this might be a good time to collect them.
+ */
+ if (parsetree->commandType != CMD_INSERT &&
+ viewquery->jointree->quals != NULL)
+ {
+ Node *viewqual = copyObject(viewquery->jointree->quals);
+
+ ChangeVarNodes(viewqual, base_rt_index, parsetree->resultRelation, 0);
+
+ if (RelationIsSecurityView(view))
+ {
+ base_rte = rt_fetch(parsetree->resultRelation, parsetree->rtable);
+ base_rte->securityQuals = lcons(viewqual,
+ base_rte->securityQuals);
+ }
+ else
+ AddQual(parsetree, (Node *) viewqual);
+ }
+
+ return parsetree;
+ }
+
+
+ /*
+ * rewriteSecurityQual -
+ * rewrite the specified security barrier qual on a query RTE, turning the
+ * RTE into a subquery.
+ */
+ static Query *
+ rewriteSecurityQual(Query *parsetree, int rt_index,
+ bool is_result_relation, Node *qual)
+ {
+ RangeTblEntry *rte;
+ List *targetlist = NIL;
+ List *inverse_targetlist = NIL;
+ List *colnames = NIL;
+ Relation relation;
+ AttrNumber attno;
+ Var *var;
+ TargetEntry *tle;
+ Query *subquery;
+ RangeTblEntry *subrte;
+ RangeTblRef *subrtr;
+ ListCell *cell;
+
+ rte = rt_fetch(rt_index, parsetree->rtable);
+
+ /*
+ * There are 2 possible cases:
+ *
+ * 1). A relation RTE, which we turn into a subquery RTE containing all
+ * referenced columns.
+ *
+ * 2). A subquery RTE (either from a prior call to this function or from
+ * an expanded view). In this case we build a new subquery on top of it
+ * to isolate this security barrier qual from any other quals.
+ */
+ switch (rte->rtekind)
+ {
+ case RTE_RELATION:
+ /*
+ * Build the subquery targetlist from the columns used from the
+ * underlying table and an inverse targetlist to map varattnos in
+ * the main query onto the new subquery columns.
+ */
+ relation = heap_open(rte->relid, NoLock);
+
+ for (attno = FirstLowInvalidHeapAttributeNumber;
+ attno <= relation->rd_att->natts; attno++)
+ {
+ Form_pg_attribute att_tup;
+ char *attname;
+
+ /* Ignore columns that aren't used */
+ if (!attribute_used((Node *) parsetree, rt_index, attno, 0))
+ continue;
+
+ if (attno == InvalidAttrNumber)
+ {
+ /* whole-row attribute */
+ var = makeWholeRowVar(rte, 1, 0, false);
+ attname = "wholerow";
+ }
+ else
+ {
+ /* regular table column or system attribute */
+ if (attno >= 1)
+ att_tup = relation->rd_att->attrs[attno - 1];
+ else
+ att_tup = SystemAttributeDefinition(attno,
+ relation->rd_rel->relhasoids);
+
+ var = makeVar(1,
+ attno,
+ att_tup->atttypid,
+ att_tup->atttypmod,
+ att_tup->attcollation,
+ 0);
+ attname = NameStr(att_tup->attname);
+ }
+
+ /* target entry for new subquery targetlist */
+ tle = makeTargetEntry((Expr *) var,
+ list_length(targetlist) + 1,
+ pstrdup(attname),
+ false);
+ targetlist = lappend(targetlist, tle);
+
+ /* inverse target entry for rewriting the main query */
+ var = copyObject(var);
+ var->varattno = list_length(targetlist);
+ tle = makeTargetEntry((Expr *) var,
+ attno,
+ pstrdup(attname),
+ false);
+ inverse_targetlist = lappend(inverse_targetlist, tle);
+
+ colnames = lappend(colnames, makeString(pstrdup(attname)));
+ }
+ heap_close(relation, NoLock);
+
+ /*
+ * Turn the main relation RTE into a security barrier subquery
+ * RTE, moving all permissions checks and rowMarks down into the
+ * subquery.
+ */
+ subquery = makeNode(Query);
+ subquery->commandType = CMD_SELECT;
+ subquery->querySource = QSRC_INSTEAD_RULE;
+ subquery->targetList = targetlist;
+
+ subrte = copyObject(rte);
+ subrte->inFromCl = true;
+ subrte->securityQuals = NIL;
+ subquery->rtable = list_make1(subrte);
+
+ subrtr = makeNode(RangeTblRef);
+ subrtr->rtindex = 1;
+ subquery->jointree = makeFromExpr(list_make1(subrtr), qual);
+ subquery->hasSubLinks = checkExprHasSubLink(qual);
+
+ foreach(cell, parsetree->rowMarks)
+ {
+ RowMarkClause *rc = (RowMarkClause *) lfirst(cell);
+ if (rc->rti == rt_index)
+ {
+ parsetree->rowMarks = list_delete(parsetree->rowMarks,
+ rc);
+ rc->rti = 1;
+ subquery->rowMarks = list_make1(rc);
+ subquery->hasForUpdate = rc->forUpdate;
+ break;
+ }
+ }
+
+ /*
+ * If this RTE was the result relation, then we need to lock the
+ * rows coming from it. Note that by the time we get here, this
+ * RTE will no longer be the result relation, so we have to rely
+ * on the flag passed in.
+ */
+ if (is_result_relation)
+ applyLockingClause(subquery, 1, true, false, true);
+
+ rte->rtekind = RTE_SUBQUERY;
+ rte->relid = InvalidOid;
+ rte->subquery = subquery;
+ rte->security_barrier = true;
+ rte->eref = makeAlias(rte->eref->aliasname, colnames);
+ rte->inh = false; /* must not be set for a subquery */
+
+ /* the permissions checks have now been move down */
+ rte->requiredPerms = 0;
+ rte->checkAsUser = InvalidOid;
+ rte->selectedCols = NULL;
+ rte->modifiedCols = NULL;
+
+ /*
+ * Update any varattnos in the main query that refer to this RTE,
+ * using the entries from the inverse targetlist.
+ */
+ return (Query *) ResolveNew((Node *) parsetree,
+ rt_index,
+ 0,
+ rte,
+ inverse_targetlist,
+ parsetree->commandType,
+ rt_index,
+ &parsetree->hasSubLinks);
+
+ case RTE_SUBQUERY:
+ /*
+ * Build a new subquery that includes all the same columns as the
+ * original subquery.
+ */
+ foreach(cell, rte->subquery->targetList)
+ {
+ tle = (TargetEntry *) lfirst(cell);
+ var = makeVarFromTargetEntry(1, tle);
+
+ tle = makeTargetEntry((Expr *) var,
+ list_length(targetlist) + 1,
+ pstrdup(tle->resname),
+ tle->resjunk);
+ targetlist = lappend(targetlist, tle);
+ }
+
+ subquery = makeNode(Query);
+ subquery->commandType = CMD_SELECT;
+ subquery->querySource = QSRC_INSTEAD_RULE;
+ subquery->targetList = targetlist;
+
+ subrte = makeNode(RangeTblEntry);
+ subrte->rtekind = RTE_SUBQUERY;
+ subrte->subquery = rte->subquery;
+ subrte->security_barrier = rte->security_barrier;
+ subrte->eref = copyObject(rte->eref);
+ subrte->inFromCl = true;
+ subquery->rtable = list_make1(subrte);
+
+ subrtr = makeNode(RangeTblRef);
+ subrtr->rtindex = 1;
+ subquery->jointree = makeFromExpr(list_make1(subrtr), qual);
+ subquery->hasSubLinks = checkExprHasSubLink(qual);
+
+ rte->subquery = subquery;
+ rte->security_barrier = true;
+
+ return parsetree;
+
+ default:
+ elog(ERROR, "invalid range table entry for security barrier qual");
+ }
+
+ return NULL;
+ }
+
+
+ /*
+ * rewriteSecurityQualsOnSubLink -
+ * Apply rewriteSecurityQuals() to each SubLink (subselect in expression)
+ * found in the given tree.
+ *
+ * NOTE: although this has the form of a walker, we cheat and modify the
+ * SubLink nodes in-place. This is safe because we are not descending into
+ * subqueries, so no parts of the tree we are modifying are being traversed.
+ *
+ * Each SubLink subselect is replaced with a possibly-rewritten subquery.
+ */
+ static bool
+ rewriteSecurityQualsOnSubLink(Node *node, void *ctx)
+ {
+ if (node == NULL)
+ return false;
+ if (IsA(node, SubLink))
+ {
+ SubLink *sub = (SubLink *) node;
+
+ sub->subselect = (Node *)
+ rewriteSecurityQuals((Query *) sub->subselect);
+ /* Fall through to process lefthand args of SubLink */
+ }
+
+ /*
+ * Do NOT recurse into Query nodes, because rewriteSecurityQuals already
+ * processed all subqueries in the rtable and cteList.
+ */
+ return expression_tree_walker(node, rewriteSecurityQualsOnSubLink, ctx);
+ }
+
+
+ /*
+ * rewriteSecurityQuals -
+ * rewrites any security barrier quals on RTEs in the query, turning them
+ * into subqueries to allow the planner to enforce them before any user
+ * quals where necessary. Currently such security barrier quals can only
+ * have come from automatically updatable security barrier views.
+ *
+ * We do this at the end of the rewriting process (after any SELECT rules
+ * have been applied) so that the new security barrier subqueries wrap any
+ * remaining views after they are expanded.
+ *
+ * Any given RTE may have multiple security barrier quals in a list, from
+ * which we create a set of nested subqueries to isolate each security barrier
+ * from the others, providing protection against malicious user security
+ * barriers. The first item in the list represents the innermost subquery.
+ */
+ static Query *
+ rewriteSecurityQuals(Query *parsetree)
+ {
+ ListCell *l;
+ int rt_index;
+
+ /*
+ * Process each RTE in the rtable list. Security barrier quals are
+ * initially only added to the result relation, but subsequent rules
+ * may change that, so they may be anywhere.
+ *
+ * Note that this is deliberately not a foreach loop, since the whole
+ * parsetree may be mutated each time through the loop.
+ */
+ rt_index = 0;
+ while (rt_index < list_length(parsetree->rtable))
+ {
+ RangeTblEntry *rte;
+ bool is_result_relation;
+ int qual_idx;
+
+ ++rt_index;
+ rte = rt_fetch(rt_index, parsetree->rtable);
+
+ if (rte->securityQuals == NIL)
+ continue;
+
+ /*
+ * Ignore any RTEs that aren't used in the query (such RTEs may be
+ * present for permissions checks).
+ */
+ if (rt_index != parsetree->resultRelation &&
+ !rangeTableEntry_used((Node *) parsetree, rt_index, 0))
+ continue;
+
+ /*
+ * Recursively process any security barrier quals in subquery RTEs
+ * before processing any at this query level.
+ */
+ if (rte->rtekind == RTE_SUBQUERY)
+ rte->subquery = rewriteSecurityQuals(rte->subquery);
+
+ /*
+ * If this RTE is the target then we need to make a copy of it before
+ * expanding it. The unexpanded copy will become the new target, and
+ * the expanded RTE will be the source of rows to update/delete.
+ */
+ is_result_relation = rt_index == parsetree->resultRelation;
+ if (is_result_relation)
+ {
+ RangeTblEntry *newrte = copyObject(rte);
+ parsetree->rtable = lappend(parsetree->rtable, newrte);
+ parsetree->resultRelation = list_length(parsetree->rtable);
+
+ /*
+ * Wipe out any copied security quals on the new target to prevent
+ * infinite recursion.
+ */
+ newrte->securityQuals = NIL;
+
+ /*
+ * There's no need to do permissions checks twice, so wipe out the
+ * permissions info for the original RTE (we prefer to keep the
+ * bits set on the result RTE).
+ */
+ rte->requiredPerms = 0;
+ rte->checkAsUser = InvalidOid;
+ rte->selectedCols = NULL;
+ rte->modifiedCols = NULL;
+
+ /*
+ * For the most part, Vars referencing the original relation
+ * should remain as as they are, meaning that they implicitly
+ * represent OLD values. But in the RETURNING list if any, we
+ * want such Vars to represent NEW values, so change them to
+ * reference the new RTE.
+ *
+ * Since ChangeVarNodes scribbles on the tree in-place, copy the
+ * RETURNING list first for safety.
+ */
+ parsetree->returningList = copyObject(parsetree->returningList);
+ ChangeVarNodes((Node *) parsetree->returningList, rt_index,
+ parsetree->resultRelation, 0);
+ }
+
+ /*
+ * Process each security qual in turn, starting with the innermost
+ * one and working outwards.
+ *
+ * Note that we can't use a foreach loop here because the whole
+ * parsetree may be mutated each time through the loop. For the same
+ * reason we must re-fetch the RTE each time.
+ */
+ qual_idx = 0;
+ while (qual_idx < list_length(rte->securityQuals))
+ {
+ Node *qual = (Node *) list_nth(rte->securityQuals, qual_idx);
+
+ parsetree = rewriteSecurityQual(parsetree, rt_index,
+ is_result_relation, qual);
+
+ /* re-fetch the RTE in case it has been re-written */
+ rte = rt_fetch(rt_index, parsetree->rtable);
+
+ qual_idx++;
+ }
+ rte->securityQuals = NIL;
+ }
+
+ /* Recurse into subqueries in WITH */
+ foreach(l, parsetree->cteList)
+ {
+ CommonTableExpr *cte = (CommonTableExpr *) lfirst(l);
+
+ cte->ctequery = (Node *)
+ rewriteSecurityQuals((Query *) cte->ctequery);
+ }
+
+ /*
+ * Recurse into sublink subqueries too, without descending into the rtable
+ * or the cteList, which we have already processed.
+ */
+ if (parsetree->hasSubLinks)
+ query_tree_walker(parsetree, rewriteSecurityQualsOnSubLink, NULL,
+ QTW_IGNORE_RC_SUBQUERIES);
+
+ return parsetree;
+ }
+
+
+ /*
* RewriteQuery -
* rewrites the query and apply the rules again on the queries rewritten
*
*************** RewriteQuery(Query *parsetree, List *rew
*** 1842,1847 ****
--- 2658,2664 ----
bool instead = false;
bool returning = false;
Query *qual_product = NULL;
+ List *product_queries = NIL;
List *rewritten = NIL;
ListCell *lc1;
*************** RewriteQuery(Query *parsetree, List *rew
*** 1993,2001 ****
result_relation, parsetree);
if (locks != NIL)
- {
- List *product_queries;
-
product_queries = fireRules(parsetree,
result_relation,
event,
--- 2810,2815 ----
*************** RewriteQuery(Query *parsetree, List *rew
*** 2004,2009 ****
--- 2818,2848 ----
&returning,
&qual_product);
+ /*
+ * If there are no unqualified INSTEAD rules, and the target relation
+ * is a view without any INSTEAD OF triggers, then we have a problem
+ * unless it can be automatically updated.
+ *
+ * If we can automatically update the view, then we do so here and add
+ * the resulting query to the product queries list, so that it gets
+ * recursively rewritten if necessary. We mark this as an unqualified
+ * INSTEAD to prevent the original query from being executed. Note
+ * also that if this succeeds it will have rewritten any returning
+ * list too.
+ */
+ if (!instead && rt_entry_relation->rd_rel->relkind == RELKIND_VIEW)
+ {
+ Query *query = qual_product ? qual_product : parsetree;
+ Query *newquery = rewriteTargetView(query, rt_entry_relation);
+
+ if (newquery != NULL)
+ {
+ product_queries = lappend(product_queries, newquery);
+ instead = true;
+ returning = true;
+ }
+ }
+
/*
* If we got any product queries, recursively rewrite them --- but
* first check for recursion!
*************** RewriteQuery(Query *parsetree, List *rew
*** 2040,2046 ****
rewrite_events = list_delete_first(rewrite_events);
}
- }
/*
* If there is an INSTEAD, and the original query has a RETURNING, we
--- 2879,2884 ----
*************** QueryRewrite(Query *parsetree)
*** 2181,2187 ****
/*
* Step 2
*
! * Apply all the RIR rules on each query
*
* This is also a handy place to mark each query with the original queryId
*/
--- 3019,3026 ----
/*
* Step 2
*
! * Apply all the RIR rules on each query, and then expand any security
! * quals that apply to RTEs in the query.
*
* This is also a handy place to mark each query with the original queryId
*/
*************** QueryRewrite(Query *parsetree)
*** 2191,2196 ****
--- 3030,3036 ----
Query *query = (Query *) lfirst(l);
query = fireRIRrules(query, NIL, false);
+ query = rewriteSecurityQuals(query);
query->queryId = input_query_id;
*************** QueryRewrite(Query *parsetree)
*** 2245,2247 ****
--- 3085,3212 ----
return results;
}
+
+
+ /*
+ * is_relation_updatable - test if the specified relation is updatable.
+ *
+ * This is used for the information schema views, which have separate concepts
+ * of "updatable" and "trigger updatable" although their precise definitions
+ * are not entirely clear in the SQL standard. We take "updatable" to mean
+ * that the relation can be updated without the need for triggers (either
+ * because it has a suitable update rule, or because it is simple enough to be
+ * automatically updated), and "trigger updatable" means it has a suitable
+ * INSTEAD OF trigger.
+ *
+ * In the case of an automatically updatable view, the base relation must also
+ * be updatable, which we take to mean the base relation is either updatable
+ * or trigger updatable.
+ */
+ static bool
+ is_relation_updatable(Oid oid, CmdType event, bool includeTriggers)
+ {
+ Relation rel;
+ RuleLock *rulelocks;
+
+ rel = heap_open(oid, AccessShareLock);
+
+ /* Look for an unconditional DO INSTEAD rule */
+ rulelocks = rel->rd_rules;
+ if (rulelocks != NULL)
+ {
+ int i;
+
+ for (i = 0; i < rulelocks->numLocks; i++)
+ {
+ if (rulelocks->rules[i]->event == event &&
+ rulelocks->rules[i]->isInstead &&
+ rulelocks->rules[i]->qual == NULL)
+ {
+ heap_close(rel, AccessShareLock);
+ return true;
+ }
+ }
+ }
+
+ /* Maybe also check for an INSTEAD OF trigger */
+ if (includeTriggers)
+ {
+ TriggerDesc *trigDesc = rel->trigdesc;
+ bool updatable = false;
+
+ switch (event)
+ {
+ case CMD_INSERT:
+ updatable = trigDesc && trigDesc->trig_insert_instead_row;
+ break;
+
+ case CMD_UPDATE:
+ updatable = trigDesc && trigDesc->trig_update_instead_row;
+ break;
+
+ case CMD_DELETE:
+ updatable = trigDesc && trigDesc->trig_delete_instead_row;
+ break;
+
+ default:
+ elog(ERROR, "unrecognized CmdType: %d", (int) event);
+ break;
+ }
+
+ if (updatable)
+ {
+ heap_close(rel, AccessShareLock);
+ return true;
+ }
+ }
+
+ /* Check if this is an automatically updatable view */
+ if (rel->rd_rel->relkind == RELKIND_VIEW &&
+ test_auto_update_view(rel) == NULL)
+ {
+ Query *viewquery;
+ RangeTblRef *rtr;
+ RangeTblEntry *base_rte;
+ Oid baseoid;
+
+ /* The base relation must also be updatable */
+ viewquery = get_view_query(rel);
+ rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist);
+ base_rte = rt_fetch(rtr->rtindex, viewquery->rtable);
+
+ if (base_rte->relkind == RELKIND_RELATION)
+ {
+ /* Tables are always updatable */
+ heap_close(rel, AccessShareLock);
+ return true;
+ }
+ else
+ {
+ /* Do a recursive check for any other kind of base relation */
+ baseoid = base_rte->relid;
+ heap_close(rel, AccessShareLock);
+ return is_relation_updatable(baseoid, event, true);
+ }
+ }
+
+ /* If we reach here, the relation is not updatable */
+ heap_close(rel, AccessShareLock);
+ return false;
+ }
+
+ Datum
+ pg_is_view_updatable(PG_FUNCTION_ARGS)
+ {
+ Oid viewoid = PG_GETARG_OID(0);
+
+ PG_RETURN_BOOL(is_relation_updatable(viewoid, CMD_UPDATE, false) &&
+ is_relation_updatable(viewoid, CMD_DELETE, false));
+ }
+
+ Datum
+ pg_is_view_insertable(PG_FUNCTION_ARGS)
+ {
+ Oid viewoid = PG_GETARG_OID(0);
+
+ PG_RETURN_BOOL(is_relation_updatable(viewoid, CMD_INSERT, false));
+ }
diff --git a/src/backend/rewrite/rewriteManip.c b/src/backend/rewrite/rewriteManip.c
new file mode 100644
index ef04c34..d9bd5b4
*** a/src/backend/rewrite/rewriteManip.c
--- b/src/backend/rewrite/rewriteManip.c
***************
*** 13,18 ****
--- 13,19 ----
*/
#include "postgres.h"
+ #include "access/sysattr.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
*************** ResolveNew(Node *node, int target_varno,
*** 1443,1445 ****
--- 1444,1486 ----
(void *) &context,
outer_hasSubLinks);
}
+
+
+ /*
+ * adjust_column_set - replace columns in a set with entries from a targetlist
+ *
+ * Non-system columns in the set are replaced by the entry with matching resno
+ * from targetlist, and system columns are left unchanged. This is used for
+ * simply updatable views to map permissions checks on the view columns onto
+ * the matching columns in the underlying base relation.
+ */
+ Bitmapset *
+ adjust_column_set(Bitmapset *cols, List *targetlist)
+ {
+ Bitmapset *tmpcols = bms_copy(cols);
+ Bitmapset *result = NULL;
+ AttrNumber col;
+
+ while ((col = bms_first_member(tmpcols)) >= 0)
+ {
+ AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
+
+ if (attno <= 0)
+ /* system column - leave it unchanged */
+ result = bms_add_member(result, col);
+ else if (attno <= list_length(targetlist))
+ {
+ /* non-system column - update it from the targetlist */
+ TargetEntry *tle = get_tle_by_resno(targetlist, attno);
+ if (tle != NULL && IsA(tle->expr, Var))
+ {
+ Var *var = (Var *) tle->expr;
+ result = bms_add_member(result,
+ var->varattno - FirstLowInvalidHeapAttributeNumber);
+ }
+ }
+ }
+ bms_free(tmpcols);
+
+ return result;
+ }
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
new file mode 100644
index 77a3b41..c0673b7
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 2232 ( pg_get_functio
*** 1958,1963 ****
--- 1958,1967 ----
DESCR("identity argument list of a function");
DATA(insert OID = 2165 ( pg_get_function_result PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 25 "26" _null_ _null_ _null_ _null_ pg_get_function_result _null_ _null_ _null_ ));
DESCR("result type of a function");
+ DATA(insert OID = 3170 ( pg_is_view_updatable PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ pg_is_view_updatable _null_ _null_ _null_ ));
+ DESCR("whether or not a view is updatable");
+ DATA(insert OID = 3171 ( pg_is_view_insertable PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ pg_is_view_insertable _null_ _null_ _null_ ));
+ DESCR("whether or not a view is insertable");
DATA(insert OID = 1686 ( pg_get_keywords PGNSP PGUID 12 10 400 0 0 f f f f t t s 0 0 2249 "" "{25,18,25}" "{o,o,o}" "{word,catcode,catdesc}" _null_ pg_get_keywords _null_ _null_ _null_ ));
DESCR("list of SQL keywords");
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
new file mode 100644
index 19178b5..08eebd5
*** a/src/include/nodes/parsenodes.h
--- b/src/include/nodes/parsenodes.h
*************** typedef struct RangeTblEntry
*** 765,770 ****
--- 765,771 ----
Oid checkAsUser; /* if valid, check access as this role */
Bitmapset *selectedCols; /* columns needing SELECT permission */
Bitmapset *modifiedCols; /* columns needing INSERT/UPDATE permission */
+ List *securityQuals; /* any security barrier quals to apply */
} RangeTblEntry;
/*
diff --git a/src/include/rewrite/rewriteHandler.h b/src/include/rewrite/rewriteHandler.h
new file mode 100644
index 50625d4..9879c5c
*** a/src/include/rewrite/rewriteHandler.h
--- b/src/include/rewrite/rewriteHandler.h
***************
*** 20,24 ****
--- 20,26 ----
extern List *QueryRewrite(Query *parsetree);
extern void AcquireRewriteLocks(Query *parsetree, bool forUpdatePushedDown);
extern Node *build_column_default(Relation rel, int attrno);
+ extern Datum pg_is_view_updatable(PG_FUNCTION_ARGS);
+ extern Datum pg_is_view_insertable(PG_FUNCTION_ARGS);
#endif /* REWRITEHANDLER_H */
diff --git a/src/include/rewrite/rewriteManip.h b/src/include/rewrite/rewriteManip.h
new file mode 100644
index e13331d..4e0b867
*** a/src/include/rewrite/rewriteManip.h
--- b/src/include/rewrite/rewriteManip.h
*************** extern Node *ResolveNew(Node *node, int
*** 74,77 ****
--- 74,79 ----
List *targetlist, int event, int update_varno,
bool *outer_hasSubLinks);
+ extern Bitmapset *adjust_column_set(Bitmapset *cols, List *targetlist);
+
#endif /* REWRITEMANIP_H */
diff --git a/src/test/regress/expected/triggers.out b/src/test/regress/expected/triggers.out
new file mode 100644
index b5af066..5439120
*** a/src/test/regress/expected/triggers.out
--- b/src/test/regress/expected/triggers.out
*************** DROP TABLE min_updates_test_oids;
*** 820,839 ****
-- Test triggers on views
--
CREATE VIEW main_view AS SELECT a, b FROM main_table;
- -- Updates should fail without rules or triggers
- INSERT INTO main_view VALUES (1,2);
- ERROR: cannot insert into view "main_view"
- HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger.
- UPDATE main_view SET b = 20 WHERE a = 50;
- ERROR: cannot update view "main_view"
- HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger.
- DELETE FROM main_view WHERE a = 50;
- ERROR: cannot delete from view "main_view"
- HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.
- -- Should fail even when there are no matching rows
- DELETE FROM main_view WHERE a = 51;
- ERROR: cannot delete from view "main_view"
- HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.
-- VIEW trigger function
CREATE OR REPLACE FUNCTION view_trigger() RETURNS trigger
LANGUAGE plpgsql AS $$
--- 820,825 ----
diff --git a/src/test/regress/expected/updatable_views.out b/src/test/regress/expected/updatable_views.out
new file mode 100644
index ...3c0a424
*** a/src/test/regress/expected/updatable_views.out
--- b/src/test/regress/expected/updatable_views.out
***************
*** 0 ****
--- 1,744 ----
+ --
+ -- UPDATABLE VIEWS
+ --
+ CREATE TABLE base_tbl (a int PRIMARY KEY, b text DEFAULT 'Unspecified');
+ INSERT INTO base_tbl VALUES (1, 'Row 1');
+ INSERT INTO base_tbl VALUES (2, 'Row 2');
+ -- non-updatable views
+ CREATE VIEW ro_view1 AS SELECT DISTINCT a, b FROM base_tbl; -- DISTINCT not supported
+ CREATE VIEW ro_view2 AS SELECT a, b FROM base_tbl GROUP BY a, b; -- GROUP BY not supported
+ CREATE VIEW ro_view3 AS SELECT 1 FROM base_tbl HAVING max(a) > 0; -- HAVING not supported
+ CREATE VIEW ro_view4 AS SELECT count(*) FROM base_tbl; -- Aggregate functions not supported
+ CREATE VIEW ro_view5 AS SELECT a, rank() OVER() FROM base_tbl; -- Window functions not supported
+ CREATE VIEW ro_view6 AS SELECT a, b FROM base_tbl UNION SELECT -a, b FROM base_tbl; -- Set ops not supported
+ CREATE VIEW ro_view7 AS WITH t AS (SELECT a, b FROM base_tbl) SELECT * FROM t; -- WITH [RECURSIVE] not supported
+ CREATE VIEW ro_view8 AS SELECT a, b FROM base_tbl ORDER BY a OFFSET 1; -- OFFSET not supported
+ CREATE VIEW ro_view9 AS SELECT a, b FROM base_tbl ORDER BY a LIMIT 1; -- LIMIT not supported
+ CREATE VIEW ro_view10 AS SELECT 1 AS a; -- No base relations
+ CREATE VIEW ro_view11 AS SELECT b1.a, b2.b FROM base_tbl b1, base_tbl b2; -- Multiple base relations
+ CREATE VIEW ro_view12 AS SELECT * FROM generate_series(1, 10) AS g(a); -- SRF in rangetable
+ CREATE VIEW ro_view13 AS SELECT a, b FROM (SELECT * FROM base_tbl) AS t; -- Subselect in rangetable
+ CREATE VIEW ro_view14 AS SELECT ctid FROM base_tbl; -- System columns not supported
+ CREATE VIEW ro_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function in targetlist
+ CREATE VIEW ro_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column
+ CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'ro_view%'
+ ORDER BY table_name;
+ table_name | is_updatable | is_insertable_into
+ ------------+--------------+--------------------
+ ro_view1 | NO | NO
+ ro_view10 | NO | NO
+ ro_view11 | NO | NO
+ ro_view12 | NO | NO
+ ro_view13 | NO | NO
+ ro_view14 | NO | NO
+ ro_view15 | NO | NO
+ ro_view16 | NO | NO
+ ro_view17 | NO | NO
+ ro_view2 | NO | NO
+ ro_view3 | NO | NO
+ ro_view4 | NO | NO
+ ro_view5 | NO | NO
+ ro_view6 | NO | NO
+ ro_view7 | NO | NO
+ ro_view8 | NO | NO
+ ro_view9 | NO | NO
+ (17 rows)
+
+ DELETE FROM ro_view1;
+ ERROR: cannot delete from view "ro_view1"
+ DETAIL: Views containing DISTINCT are not updatable.
+ HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.
+ DELETE FROM ro_view2;
+ ERROR: cannot delete from view "ro_view2"
+ DETAIL: Views containing GROUP BY are not updatable.
+ HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.
+ DELETE FROM ro_view3;
+ ERROR: cannot delete from view "ro_view3"
+ DETAIL: Views containing HAVING are not updatable.
+ HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.
+ DELETE FROM ro_view4;
+ ERROR: cannot delete from view "ro_view4"
+ DETAIL: Views containing aggregates are not updatable.
+ HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.
+ DELETE FROM ro_view5;
+ ERROR: cannot delete from view "ro_view5"
+ DETAIL: Views containing window functions are not updatable.
+ HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.
+ DELETE FROM ro_view6;
+ ERROR: cannot delete from view "ro_view6"
+ DETAIL: Views containing UNION, INTERSECT or EXCEPT are not updatable.
+ HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger.
+ UPDATE ro_view7 SET a=a+1;
+ ERROR: cannot update view "ro_view7"
+ DETAIL: Views containing WITH [RECURSIVE] are not updatable.
+ HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger.
+ UPDATE ro_view8 SET a=a+1;
+ ERROR: cannot update view "ro_view8"
+ DETAIL: Views containing OFFSET or LIMIT are not updatable.
+ HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger.
+ UPDATE ro_view9 SET a=a+1;
+ ERROR: cannot update view "ro_view9"
+ DETAIL: Views containing OFFSET or LIMIT are not updatable.
+ HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger.
+ UPDATE ro_view10 SET a=a+1;
+ ERROR: cannot update view "ro_view10"
+ DETAIL: Views with no base relations are not updatable.
+ HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger.
+ UPDATE ro_view11 SET a=a+1;
+ ERROR: cannot update view "ro_view11"
+ DETAIL: Views with multiple base relations are not updatable.
+ HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger.
+ UPDATE ro_view12 SET a=a+1;
+ ERROR: cannot update view "ro_view12"
+ DETAIL: Views that are not based on tables or views are not updatable.
+ HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger.
+ INSERT INTO ro_view13 VALUES (3, 'Row 3');
+ ERROR: cannot insert into view "ro_view13"
+ DETAIL: Views that are not based on tables or views are not updatable.
+ HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger.
+ INSERT INTO ro_view14 VALUES (null);
+ ERROR: cannot insert into view "ro_view14"
+ DETAIL: Views that refer to system columns are not updatable.
+ HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger.
+ INSERT INTO ro_view15 VALUES (3, 'ROW 3');
+ ERROR: cannot insert into view "ro_view15"
+ DETAIL: Views with columns that are not simple references to columns in the base relation are not updatable.
+ HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger.
+ INSERT INTO ro_view16 VALUES (3, 'Row 3', 3);
+ ERROR: cannot insert into view "ro_view16"
+ DETAIL: Views that refer to the same column more than once are not updatable.
+ HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger.
+ INSERT INTO ro_view17 VALUES (3, 'ROW 3');
+ ERROR: cannot insert into view "ro_view1"
+ DETAIL: Views containing DISTINCT are not updatable.
+ HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger.
+ DROP VIEW ro_view1, ro_view2, ro_view3, ro_view4, ro_view5,
+ ro_view6, ro_view7, ro_view8, ro_view9, ro_view10,
+ ro_view11, ro_view12, ro_view13, ro_view14, ro_view15,
+ ro_view16, ro_view17;
+ -- simple updatable view
+ CREATE VIEW rw_view1 AS SELECT * FROM base_tbl;
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name = 'rw_view1';
+ table_name | is_updatable | is_insertable_into
+ ------------+--------------+--------------------
+ rw_view1 | YES | YES
+ (1 row)
+
+ INSERT INTO rw_view1 VALUES (3, 'Row 3');
+ INSERT INTO rw_view1 (a) VALUES (4);
+ UPDATE rw_view1 SET a=0 WHERE a=1;
+ DELETE FROM rw_view1 WHERE b='Row 2';
+ SELECT * FROM base_tbl;
+ a | b
+ ---+-------------
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ (3 rows)
+
+ EXPLAIN (costs off) UPDATE rw_view1 SET a=1 WHERE a=0;
+ QUERY PLAN
+ --------------------------------------------------
+ Update on base_tbl
+ -> Index Scan using base_tbl_pkey on base_tbl
+ Index Cond: (a = 0)
+ (3 rows)
+
+ EXPLAIN (costs off) DELETE FROM rw_view1 WHERE a=0;
+ QUERY PLAN
+ --------------------------------------------------
+ Delete on base_tbl
+ -> Index Scan using base_tbl_pkey on base_tbl
+ Index Cond: (a = 0)
+ (3 rows)
+
+ -- view on top of view hiding private data
+ INSERT INTO base_tbl VALUES (-1, 'Private data');
+ CREATE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE b !~* 'private';
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name = 'rw_view2';
+ table_name | is_updatable | is_insertable_into
+ ------------+--------------+--------------------
+ rw_view2 | YES | YES
+ (1 row)
+
+ SELECT * FROM base_tbl;
+ a | b
+ ----+--------------
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ -1 | Private data
+ (4 rows)
+
+ SELECT * FROM rw_view2;
+ a | b
+ ---+-------------
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ (3 rows)
+
+ INSERT INTO rw_view2 VALUES (5, 'Row 5');
+ INSERT INTO rw_view2 (a) VALUES (6);
+ SELECT * FROM rw_view2;
+ a | b
+ ---+-------------
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ 5 | Row 5
+ 6 | Unspecified
+ (5 rows)
+
+ UPDATE rw_view2 SET b='Row 6' WHERE a=6;
+ DELETE FROM rw_view2 WHERE a=5;
+ SELECT * FROM rw_view2;
+ a | b
+ ---+-------------
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ 6 | Row 6
+ (4 rows)
+
+ EXPLAIN (costs off) UPDATE rw_view2 SET a=1 WHERE a=0;
+ QUERY PLAN
+ --------------------------------------------------
+ Update on base_tbl
+ -> Index Scan using base_tbl_pkey on base_tbl
+ Index Cond: (a = 0)
+ Filter: (b !~* 'private'::text)
+ (4 rows)
+
+ EXPLAIN (costs off) DELETE FROM rw_view2 WHERE a=0;
+ QUERY PLAN
+ --------------------------------------------------
+ Delete on base_tbl
+ -> Index Scan using base_tbl_pkey on base_tbl
+ Index Cond: (a = 0)
+ Filter: (b !~* 'private'::text)
+ (4 rows)
+
+ -- snoop on private value
+ CREATE FUNCTION snoop(a int, b text)
+ RETURNS boolean AS
+ $$
+ BEGIN
+ RAISE NOTICE 'a=%, b=%', a, b;
+ RETURN true;
+ END;
+ $$
+ LANGUAGE plpgsql COST 0.000001;
+ SELECT * FROM rw_view2 WHERE snoop(a,b);
+ NOTICE: a=3, b=Row 3
+ NOTICE: a=4, b=Unspecified
+ NOTICE: a=0, b=Row 1
+ NOTICE: a=-1, b=Private data
+ NOTICE: a=6, b=Row 6
+ a | b
+ ---+-------------
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ 6 | Row 6
+ (4 rows)
+
+ UPDATE rw_view2 SET a=a WHERE snoop(a,b);
+ NOTICE: a=3, b=Row 3
+ NOTICE: a=4, b=Unspecified
+ NOTICE: a=0, b=Row 1
+ NOTICE: a=-1, b=Private data
+ NOTICE: a=6, b=Row 6
+ DELETE FROM rw_view2 WHERE NOT snoop(a,b);
+ NOTICE: a=-1, b=Private data
+ NOTICE: a=3, b=Row 3
+ NOTICE: a=4, b=Unspecified
+ NOTICE: a=0, b=Row 1
+ NOTICE: a=6, b=Row 6
+ EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(a,b);
+ QUERY PLAN
+ -----------------------------------------------------
+ Seq Scan on base_tbl
+ Filter: (snoop(a, b) AND (b !~* 'private'::text))
+ (2 rows)
+
+ EXPLAIN (costs off) UPDATE rw_view2 SET a=a WHERE snoop(a,b);
+ QUERY PLAN
+ -----------------------------------------------------------
+ Update on base_tbl
+ -> Seq Scan on base_tbl
+ Filter: (snoop(a, b) AND (b !~* 'private'::text))
+ (3 rows)
+
+ EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(a,b);
+ QUERY PLAN
+ -----------------------------------------------------------------
+ Delete on base_tbl
+ -> Seq Scan on base_tbl
+ Filter: ((NOT snoop(a, b)) AND (b !~* 'private'::text))
+ (3 rows)
+
+ -- security barrier view prevents snooping
+ ALTER VIEW rw_view2 SET (security_barrier = true);
+ SELECT * FROM rw_view2 WHERE snoop(a,b);
+ NOTICE: a=3, b=Row 3
+ NOTICE: a=4, b=Unspecified
+ NOTICE: a=0, b=Row 1
+ NOTICE: a=6, b=Row 6
+ a | b
+ ---+-------------
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ 6 | Row 6
+ (4 rows)
+
+ UPDATE rw_view2 SET a=a WHERE snoop(a,b);
+ NOTICE: a=3, b=Row 3
+ NOTICE: a=4, b=Unspecified
+ NOTICE: a=0, b=Row 1
+ NOTICE: a=6, b=Row 6
+ DELETE FROM rw_view2 WHERE NOT snoop(a,b);
+ NOTICE: a=3, b=Row 3
+ NOTICE: a=4, b=Unspecified
+ NOTICE: a=0, b=Row 1
+ NOTICE: a=6, b=Row 6
+ EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(a,b);
+ QUERY PLAN
+ -----------------------------------------
+ Subquery Scan on rw_view2
+ Filter: snoop(rw_view2.a, rw_view2.b)
+ -> Seq Scan on base_tbl
+ Filter: (b !~* 'private'::text)
+ (4 rows)
+
+ EXPLAIN (costs off) UPDATE rw_view2 SET a=a WHERE snoop(a,b);
+ QUERY PLAN
+ -----------------------------------------------------
+ Update on base_tbl
+ -> Subquery Scan on base_tbl
+ Filter: snoop(base_tbl.a, base_tbl.b)
+ -> LockRows
+ -> Seq Scan on base_tbl
+ Filter: (b !~* 'private'::text)
+ (6 rows)
+
+ EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(a,b);
+ QUERY PLAN
+ -----------------------------------------------------
+ Delete on base_tbl
+ -> Subquery Scan on base_tbl
+ Filter: (NOT snoop(base_tbl.a, base_tbl.b))
+ -> LockRows
+ -> Seq Scan on base_tbl
+ Filter: (b !~* 'private'::text)
+ (6 rows)
+
+ -- security barrier view on top of security barrier view
+ CREATE VIEW rw_view3 AS SELECT * FROM rw_view2 WHERE snoop(a,b);
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name = 'rw_view3';
+ table_name | is_updatable | is_insertable_into
+ ------------+--------------+--------------------
+ rw_view3 | YES | YES
+ (1 row)
+
+ SELECT * FROM rw_view3;
+ NOTICE: a=3, b=Row 3
+ NOTICE: a=4, b=Unspecified
+ NOTICE: a=0, b=Row 1
+ NOTICE: a=6, b=Row 6
+ a | b
+ ---+-------------
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ 6 | Row 6
+ (4 rows)
+
+ UPDATE rw_view3 SET a=a WHERE a=5;
+ DELETE FROM rw_view3 WHERE a=5;
+ EXPLAIN (costs off) SELECT * FROM rw_view3;
+ QUERY PLAN
+ -----------------------------------------
+ Subquery Scan on rw_view2
+ Filter: snoop(rw_view2.a, rw_view2.b)
+ -> Seq Scan on base_tbl
+ Filter: (b !~* 'private'::text)
+ (4 rows)
+
+ EXPLAIN (costs off) UPDATE rw_view3 SET a=a WHERE a=5;
+ QUERY PLAN
+ --------------------------------------------------------------
+ Update on base_tbl
+ -> Subquery Scan on base_tbl
+ Filter: snoop(base_tbl.a, base_tbl.b)
+ -> LockRows
+ -> Index Scan using base_tbl_pkey on base_tbl
+ Index Cond: (a = 5)
+ Filter: (b !~* 'private'::text)
+ (7 rows)
+
+ EXPLAIN (costs off) DELETE FROM rw_view3 WHERE a=5;
+ QUERY PLAN
+ --------------------------------------------------------------
+ Delete on base_tbl
+ -> Subquery Scan on base_tbl
+ Filter: snoop(base_tbl.a, base_tbl.b)
+ -> LockRows
+ -> Index Scan using base_tbl_pkey on base_tbl
+ Index Cond: (a = 5)
+ Filter: (b !~* 'private'::text)
+ (7 rows)
+
+ DROP VIEW rw_view1, rw_view2, rw_view3;
+ -- view on top of view with rules
+ CREATE VIEW rw_view1 AS SELECT * FROM base_tbl OFFSET 0; -- not updatable without rules/triggers
+ CREATE VIEW rw_view2 AS SELECT * FROM rw_view1;
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+ table_name | is_updatable | is_insertable_into
+ ------------+--------------+--------------------
+ rw_view1 | NO | NO
+ rw_view2 | NO | NO
+ (2 rows)
+
+ CREATE RULE rw_view1_ins_rule AS ON INSERT TO rw_view1
+ DO INSTEAD INSERT INTO base_tbl VALUES (NEW.a, NEW.b) RETURNING *;
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+ table_name | is_updatable | is_insertable_into
+ ------------+--------------+--------------------
+ rw_view1 | NO | YES
+ rw_view2 | NO | YES
+ (2 rows)
+
+ CREATE RULE rw_view1_upd_rule AS ON UPDATE TO rw_view1
+ DO INSTEAD UPDATE base_tbl SET b=NEW.b WHERE a=OLD.a RETURNING NEW.*;
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+ table_name | is_updatable | is_insertable_into
+ ------------+--------------+--------------------
+ rw_view1 | NO | YES
+ rw_view2 | NO | YES
+ (2 rows)
+
+ CREATE RULE rw_view1_del_rule AS ON DELETE TO rw_view1
+ DO INSTEAD DELETE FROM base_tbl WHERE a=OLD.a RETURNING OLD.*;
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+ table_name | is_updatable | is_insertable_into
+ ------------+--------------+--------------------
+ rw_view1 | YES | YES
+ rw_view2 | YES | YES
+ (2 rows)
+
+ SELECT * FROM rw_view2;
+ a | b
+ ----+--------------
+ -1 | Private data
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ 6 | Row 6
+ (5 rows)
+
+ INSERT INTO rw_view2 VALUES (7, 'Row 7') RETURNING *;
+ a | b
+ ---+-------
+ 7 | Row 7
+ (1 row)
+
+ UPDATE rw_view2 SET b='Row seven' WHERE a=7 RETURNING *;
+ a | b
+ ---+-----------
+ 7 | Row seven
+ (1 row)
+
+ SELECT * FROM rw_view2;
+ a | b
+ ----+--------------
+ -1 | Private data
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ 6 | Row 6
+ 7 | Row seven
+ (6 rows)
+
+ DELETE FROM rw_view2 WHERE a=7 RETURNING *;
+ a | b
+ ---+-----------
+ 7 | Row seven
+ (1 row)
+
+ -- view on top of view with triggers
+ DROP RULE rw_view1_ins_rule ON rw_view1;
+ DROP RULE rw_view1_upd_rule ON rw_view1;
+ DROP RULE rw_view1_del_rule ON rw_view1;
+ SELECT table_name, is_updatable, is_insertable_into,
+ is_trigger_updatable, is_trigger_deletable,
+ is_trigger_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+ table_name | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into
+ ------------+--------------+--------------------+----------------------+----------------------+----------------------------
+ rw_view1 | NO | NO | NO | NO | NO
+ rw_view2 | NO | NO | NO | NO | NO
+ (2 rows)
+
+ CREATE FUNCTION rw_view1_trig_fn()
+ RETURNS trigger AS
+ $$
+ BEGIN
+ IF TG_OP = 'INSERT' THEN
+ INSERT INTO base_tbl VALUES (NEW.a, NEW.b);
+ RETURN NEW;
+ ELSIF TG_OP = 'UPDATE' THEN
+ UPDATE base_tbl SET b=NEW.b WHERE a=OLD.a;
+ RETURN NEW;
+ ELSIF TG_OP = 'DELETE' THEN
+ DELETE FROM base_tbl WHERE a=OLD.a;
+ RETURN OLD;
+ END IF;
+ END;
+ $$
+ LANGUAGE plpgsql;
+ CREATE TRIGGER rw_view1_ins_trig INSTEAD OF INSERT ON rw_view1
+ FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn();
+ SELECT table_name, is_updatable, is_insertable_into,
+ is_trigger_updatable, is_trigger_deletable,
+ is_trigger_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+ table_name | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into
+ ------------+--------------+--------------------+----------------------+----------------------+----------------------------
+ rw_view1 | NO | NO | NO | NO | YES
+ rw_view2 | NO | YES | NO | NO | NO
+ (2 rows)
+
+ CREATE TRIGGER rw_view1_upd_trig INSTEAD OF UPDATE ON rw_view1
+ FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn();
+ SELECT table_name, is_updatable, is_insertable_into,
+ is_trigger_updatable, is_trigger_deletable,
+ is_trigger_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+ table_name | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into
+ ------------+--------------+--------------------+----------------------+----------------------+----------------------------
+ rw_view1 | NO | NO | YES | NO | YES
+ rw_view2 | NO | YES | NO | NO | NO
+ (2 rows)
+
+ CREATE TRIGGER rw_view1_del_trig INSTEAD OF DELETE ON rw_view1
+ FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn();
+ SELECT table_name, is_updatable, is_insertable_into,
+ is_trigger_updatable, is_trigger_deletable,
+ is_trigger_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+ table_name | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into
+ ------------+--------------+--------------------+----------------------+----------------------+----------------------------
+ rw_view1 | NO | NO | YES | YES | YES
+ rw_view2 | YES | YES | NO | NO | NO
+ (2 rows)
+
+ SELECT * FROM rw_view2;
+ a | b
+ ----+--------------
+ -1 | Private data
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ 6 | Row 6
+ (5 rows)
+
+ INSERT INTO rw_view2 VALUES (7, 'Row 7') RETURNING *;
+ a | b
+ ---+-------
+ 7 | Row 7
+ (1 row)
+
+ UPDATE rw_view2 SET b='Row seven' WHERE a=7 RETURNING *;
+ a | b
+ ---+-----------
+ 7 | Row seven
+ (1 row)
+
+ SELECT * FROM rw_view2;
+ a | b
+ ----+--------------
+ -1 | Private data
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ 6 | Row 6
+ 7 | Row seven
+ (6 rows)
+
+ DELETE FROM rw_view2 WHERE a=7 RETURNING *;
+ a | b
+ ---+-----------
+ 7 | Row seven
+ (1 row)
+
+ DROP VIEW rw_view1, rw_view2;
+ -- test whole row from view
+ CREATE VIEW rw_view1 AS SELECT b AS bb, a AS aa FROM base_tbl;
+ CREATE FUNCTION rw_view1_aa(x rw_view1)
+ RETURNS int AS $$ SELECT x.aa $$ LANGUAGE sql;
+ UPDATE rw_view1 v SET bb='Updated row 6' WHERE rw_view1_aa(v)=6
+ RETURNING rw_view1_aa(v), v.bb;
+ rw_view1_aa | bb
+ -------------+---------------
+ 6 | Updated row 6
+ (1 row)
+
+ SELECT * FROM base_tbl;
+ a | b
+ ----+---------------
+ -1 | Private data
+ 3 | Row 3
+ 4 | Unspecified
+ 0 | Row 1
+ 6 | Updated row 6
+ (5 rows)
+
+ EXPLAIN (costs off)
+ UPDATE rw_view1 v SET bb='Updated row 6' WHERE rw_view1_aa(v)=6
+ RETURNING rw_view1_aa(v), v.bb;
+ QUERY PLAN
+ --------------------------------------------------
+ Update on base_tbl
+ -> Index Scan using base_tbl_pkey on base_tbl
+ Index Cond: (a = 6)
+ (3 rows)
+
+ DROP FUNCTION rw_view1_aa(rw_view1);
+ DROP VIEW rw_view1;
+ DROP TABLE base_tbl;
+ -- permissions checks
+ CREATE USER view_user1;
+ CREATE USER view_user2;
+ SET SESSION AUTHORIZATION view_user1;
+ CREATE TABLE base_tbl(a int, b text, c float);
+ INSERT INTO base_tbl VALUES (1, 'Row 1', 1.0);
+ CREATE VIEW rw_view1 AS SELECT b AS bb, c AS cc, a AS aa FROM base_tbl;
+ INSERT INTO rw_view1 VALUES ('Row 2', 2.0, 2);
+ GRANT SELECT ON base_tbl TO view_user2;
+ GRANT SELECT ON rw_view1 TO view_user2;
+ GRANT UPDATE (a,c) ON base_tbl TO view_user2;
+ GRANT UPDATE (bb,cc) ON rw_view1 TO view_user2;
+ RESET SESSION AUTHORIZATION;
+ SET SESSION AUTHORIZATION view_user2;
+ CREATE VIEW rw_view2 AS SELECT b AS bb, c AS cc, a AS aa FROM base_tbl;
+ SELECT * FROM base_tbl; -- ok
+ a | b | c
+ ---+-------+---
+ 1 | Row 1 | 1
+ 2 | Row 2 | 2
+ (2 rows)
+
+ SELECT * FROM rw_view1; -- ok
+ bb | cc | aa
+ -------+----+----
+ Row 1 | 1 | 1
+ Row 2 | 2 | 2
+ (2 rows)
+
+ SELECT * FROM rw_view2; -- ok
+ bb | cc | aa
+ -------+----+----
+ Row 1 | 1 | 1
+ Row 2 | 2 | 2
+ (2 rows)
+
+ INSERT INTO base_tbl VALUES (3, 'Row 3', 3.0); -- not allowed
+ ERROR: permission denied for relation base_tbl
+ INSERT INTO rw_view1 VALUES ('Row 3', 3.0, 3); -- not allowed
+ ERROR: permission denied for relation rw_view1
+ INSERT INTO rw_view2 VALUES ('Row 3', 3.0, 3); -- not allowed
+ ERROR: permission denied for relation base_tbl
+ UPDATE base_tbl SET a=a, c=c; -- ok
+ UPDATE base_tbl SET b=b; -- not allowed
+ ERROR: permission denied for relation base_tbl
+ UPDATE rw_view1 SET bb=bb, cc=cc; -- ok
+ UPDATE rw_view1 SET aa=aa; -- not allowed
+ ERROR: permission denied for relation rw_view1
+ UPDATE rw_view2 SET aa=aa, cc=cc; -- ok
+ UPDATE rw_view2 SET bb=bb; -- not allowed
+ ERROR: permission denied for relation base_tbl
+ DELETE FROM base_tbl; -- not allowed
+ ERROR: permission denied for relation base_tbl
+ DELETE FROM rw_view1; -- not allowed
+ ERROR: permission denied for relation rw_view1
+ DELETE FROM rw_view2; -- not allowed
+ ERROR: permission denied for relation base_tbl
+ RESET SESSION AUTHORIZATION;
+ SET SESSION AUTHORIZATION view_user1;
+ GRANT INSERT, DELETE ON base_tbl TO view_user2;
+ RESET SESSION AUTHORIZATION;
+ SET SESSION AUTHORIZATION view_user2;
+ INSERT INTO base_tbl VALUES (3, 'Row 3', 3.0); -- ok
+ INSERT INTO rw_view1 VALUES ('Row 4', 4.0, 4); -- not allowed
+ ERROR: permission denied for relation rw_view1
+ INSERT INTO rw_view2 VALUES ('Row 4', 4.0, 4); -- ok
+ DELETE FROM base_tbl WHERE a=1; -- ok
+ DELETE FROM rw_view1 WHERE aa=2; -- not allowed
+ ERROR: permission denied for relation rw_view1
+ DELETE FROM rw_view2 WHERE aa=2; -- ok
+ SELECT * FROM base_tbl;
+ a | b | c
+ ---+-------+---
+ 3 | Row 3 | 3
+ 4 | Row 4 | 4
+ (2 rows)
+
+ RESET SESSION AUTHORIZATION;
+ SET SESSION AUTHORIZATION view_user1;
+ REVOKE INSERT, DELETE ON base_tbl FROM view_user2;
+ GRANT INSERT, DELETE ON rw_view1 TO view_user2;
+ RESET SESSION AUTHORIZATION;
+ SET SESSION AUTHORIZATION view_user2;
+ INSERT INTO base_tbl VALUES (5, 'Row 5', 5.0); -- not allowed
+ ERROR: permission denied for relation base_tbl
+ INSERT INTO rw_view1 VALUES ('Row 5', 5.0, 5); -- ok
+ INSERT INTO rw_view2 VALUES ('Row 6', 6.0, 6); -- not allowed
+ ERROR: permission denied for relation base_tbl
+ DELETE FROM base_tbl WHERE a=3; -- not allowed
+ ERROR: permission denied for relation base_tbl
+ DELETE FROM rw_view1 WHERE aa=3; -- ok
+ DELETE FROM rw_view2 WHERE aa=4; -- not allowed
+ ERROR: permission denied for relation base_tbl
+ SELECT * FROM base_tbl;
+ a | b | c
+ ---+-------+---
+ 4 | Row 4 | 4
+ 5 | Row 5 | 5
+ (2 rows)
+
+ RESET SESSION AUTHORIZATION;
+ DROP VIEW rw_view1, rw_view2;
+ DROP TABLE base_tbl;
+ DROP USER view_user1;
+ DROP USER view_user2;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
new file mode 100644
index ac29194..e25031e
*** a/src/test/regress/parallel_schedule
--- b/src/test/regress/parallel_schedule
*************** test: create_index create_view
*** 59,65 ****
# ----------
# Another group of parallel tests
# ----------
! test: create_aggregate create_function_3 create_cast constraints triggers inherit create_table_like typed_table vacuum drop_if_exists
# ----------
# sanity_check does a vacuum, affecting the sort order of SELECT *
--- 59,65 ----
# ----------
# Another group of parallel tests
# ----------
! test: create_aggregate create_function_3 create_cast constraints triggers inherit create_table_like typed_table vacuum drop_if_exists updatable_views
# ----------
# sanity_check does a vacuum, affecting the sort order of SELECT *
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
new file mode 100644
index 8576a7f..f84ba64
*** a/src/test/regress/serial_schedule
--- b/src/test/regress/serial_schedule
*************** test: create_table_like
*** 67,72 ****
--- 67,73 ----
test: typed_table
test: vacuum
test: drop_if_exists
+ test: updatable_views
test: sanity_check
test: errors
test: select
diff --git a/src/test/regress/sql/triggers.sql b/src/test/regress/sql/triggers.sql
new file mode 100644
index f88cb81..ec579ec
*** a/src/test/regress/sql/triggers.sql
--- b/src/test/regress/sql/triggers.sql
*************** DROP TABLE min_updates_test_oids;
*** 611,623 ****
CREATE VIEW main_view AS SELECT a, b FROM main_table;
- -- Updates should fail without rules or triggers
- INSERT INTO main_view VALUES (1,2);
- UPDATE main_view SET b = 20 WHERE a = 50;
- DELETE FROM main_view WHERE a = 50;
- -- Should fail even when there are no matching rows
- DELETE FROM main_view WHERE a = 51;
-
-- VIEW trigger function
CREATE OR REPLACE FUNCTION view_trigger() RETURNS trigger
LANGUAGE plpgsql AS $$
--- 611,616 ----
diff --git a/src/test/regress/sql/updatable_views.sql b/src/test/regress/sql/updatable_views.sql
new file mode 100644
index ...8d2b8d7
*** a/src/test/regress/sql/updatable_views.sql
--- b/src/test/regress/sql/updatable_views.sql
***************
*** 0 ****
--- 1,335 ----
+ --
+ -- UPDATABLE VIEWS
+ --
+
+ CREATE TABLE base_tbl (a int PRIMARY KEY, b text DEFAULT 'Unspecified');
+ INSERT INTO base_tbl VALUES (1, 'Row 1');
+ INSERT INTO base_tbl VALUES (2, 'Row 2');
+
+ -- non-updatable views
+ CREATE VIEW ro_view1 AS SELECT DISTINCT a, b FROM base_tbl; -- DISTINCT not supported
+ CREATE VIEW ro_view2 AS SELECT a, b FROM base_tbl GROUP BY a, b; -- GROUP BY not supported
+ CREATE VIEW ro_view3 AS SELECT 1 FROM base_tbl HAVING max(a) > 0; -- HAVING not supported
+ CREATE VIEW ro_view4 AS SELECT count(*) FROM base_tbl; -- Aggregate functions not supported
+ CREATE VIEW ro_view5 AS SELECT a, rank() OVER() FROM base_tbl; -- Window functions not supported
+ CREATE VIEW ro_view6 AS SELECT a, b FROM base_tbl UNION SELECT -a, b FROM base_tbl; -- Set ops not supported
+ CREATE VIEW ro_view7 AS WITH t AS (SELECT a, b FROM base_tbl) SELECT * FROM t; -- WITH [RECURSIVE] not supported
+ CREATE VIEW ro_view8 AS SELECT a, b FROM base_tbl ORDER BY a OFFSET 1; -- OFFSET not supported
+ CREATE VIEW ro_view9 AS SELECT a, b FROM base_tbl ORDER BY a LIMIT 1; -- LIMIT not supported
+ CREATE VIEW ro_view10 AS SELECT 1 AS a; -- No base relations
+ CREATE VIEW ro_view11 AS SELECT b1.a, b2.b FROM base_tbl b1, base_tbl b2; -- Multiple base relations
+ CREATE VIEW ro_view12 AS SELECT * FROM generate_series(1, 10) AS g(a); -- SRF in rangetable
+ CREATE VIEW ro_view13 AS SELECT a, b FROM (SELECT * FROM base_tbl) AS t; -- Subselect in rangetable
+ CREATE VIEW ro_view14 AS SELECT ctid FROM base_tbl; -- System columns not supported
+ CREATE VIEW ro_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function in targetlist
+ CREATE VIEW ro_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column
+ CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable
+
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'ro_view%'
+ ORDER BY table_name;
+
+ DELETE FROM ro_view1;
+ DELETE FROM ro_view2;
+ DELETE FROM ro_view3;
+ DELETE FROM ro_view4;
+ DELETE FROM ro_view5;
+ DELETE FROM ro_view6;
+ UPDATE ro_view7 SET a=a+1;
+ UPDATE ro_view8 SET a=a+1;
+ UPDATE ro_view9 SET a=a+1;
+ UPDATE ro_view10 SET a=a+1;
+ UPDATE ro_view11 SET a=a+1;
+ UPDATE ro_view12 SET a=a+1;
+ INSERT INTO ro_view13 VALUES (3, 'Row 3');
+ INSERT INTO ro_view14 VALUES (null);
+ INSERT INTO ro_view15 VALUES (3, 'ROW 3');
+ INSERT INTO ro_view16 VALUES (3, 'Row 3', 3);
+ INSERT INTO ro_view17 VALUES (3, 'ROW 3');
+
+ DROP VIEW ro_view1, ro_view2, ro_view3, ro_view4, ro_view5,
+ ro_view6, ro_view7, ro_view8, ro_view9, ro_view10,
+ ro_view11, ro_view12, ro_view13, ro_view14, ro_view15,
+ ro_view16, ro_view17;
+
+ -- simple updatable view
+ CREATE VIEW rw_view1 AS SELECT * FROM base_tbl;
+
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name = 'rw_view1';
+
+ INSERT INTO rw_view1 VALUES (3, 'Row 3');
+ INSERT INTO rw_view1 (a) VALUES (4);
+ UPDATE rw_view1 SET a=0 WHERE a=1;
+ DELETE FROM rw_view1 WHERE b='Row 2';
+ SELECT * FROM base_tbl;
+
+ EXPLAIN (costs off) UPDATE rw_view1 SET a=1 WHERE a=0;
+ EXPLAIN (costs off) DELETE FROM rw_view1 WHERE a=0;
+
+ -- view on top of view hiding private data
+ INSERT INTO base_tbl VALUES (-1, 'Private data');
+ CREATE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE b !~* 'private';
+
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name = 'rw_view2';
+
+ SELECT * FROM base_tbl;
+ SELECT * FROM rw_view2;
+
+ INSERT INTO rw_view2 VALUES (5, 'Row 5');
+ INSERT INTO rw_view2 (a) VALUES (6);
+ SELECT * FROM rw_view2;
+ UPDATE rw_view2 SET b='Row 6' WHERE a=6;
+ DELETE FROM rw_view2 WHERE a=5;
+ SELECT * FROM rw_view2;
+
+ EXPLAIN (costs off) UPDATE rw_view2 SET a=1 WHERE a=0;
+ EXPLAIN (costs off) DELETE FROM rw_view2 WHERE a=0;
+
+ -- snoop on private value
+ CREATE FUNCTION snoop(a int, b text)
+ RETURNS boolean AS
+ $$
+ BEGIN
+ RAISE NOTICE 'a=%, b=%', a, b;
+ RETURN true;
+ END;
+ $$
+ LANGUAGE plpgsql COST 0.000001;
+
+ SELECT * FROM rw_view2 WHERE snoop(a,b);
+ UPDATE rw_view2 SET a=a WHERE snoop(a,b);
+ DELETE FROM rw_view2 WHERE NOT snoop(a,b);
+
+ EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(a,b);
+ EXPLAIN (costs off) UPDATE rw_view2 SET a=a WHERE snoop(a,b);
+ EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(a,b);
+
+ -- security barrier view prevents snooping
+ ALTER VIEW rw_view2 SET (security_barrier = true);
+
+ SELECT * FROM rw_view2 WHERE snoop(a,b);
+ UPDATE rw_view2 SET a=a WHERE snoop(a,b);
+ DELETE FROM rw_view2 WHERE NOT snoop(a,b);
+
+ EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(a,b);
+ EXPLAIN (costs off) UPDATE rw_view2 SET a=a WHERE snoop(a,b);
+ EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(a,b);
+
+ -- security barrier view on top of security barrier view
+ CREATE VIEW rw_view3 AS SELECT * FROM rw_view2 WHERE snoop(a,b);
+
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name = 'rw_view3';
+
+ SELECT * FROM rw_view3;
+ UPDATE rw_view3 SET a=a WHERE a=5;
+ DELETE FROM rw_view3 WHERE a=5;
+
+ EXPLAIN (costs off) SELECT * FROM rw_view3;
+ EXPLAIN (costs off) UPDATE rw_view3 SET a=a WHERE a=5;
+ EXPLAIN (costs off) DELETE FROM rw_view3 WHERE a=5;
+
+ DROP VIEW rw_view1, rw_view2, rw_view3;
+
+ -- view on top of view with rules
+ CREATE VIEW rw_view1 AS SELECT * FROM base_tbl OFFSET 0; -- not updatable without rules/triggers
+ CREATE VIEW rw_view2 AS SELECT * FROM rw_view1;
+
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+
+ CREATE RULE rw_view1_ins_rule AS ON INSERT TO rw_view1
+ DO INSTEAD INSERT INTO base_tbl VALUES (NEW.a, NEW.b) RETURNING *;
+
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+
+ CREATE RULE rw_view1_upd_rule AS ON UPDATE TO rw_view1
+ DO INSTEAD UPDATE base_tbl SET b=NEW.b WHERE a=OLD.a RETURNING NEW.*;
+
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+
+ CREATE RULE rw_view1_del_rule AS ON DELETE TO rw_view1
+ DO INSTEAD DELETE FROM base_tbl WHERE a=OLD.a RETURNING OLD.*;
+
+ SELECT table_name, is_updatable, is_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+
+ SELECT * FROM rw_view2;
+ INSERT INTO rw_view2 VALUES (7, 'Row 7') RETURNING *;
+ UPDATE rw_view2 SET b='Row seven' WHERE a=7 RETURNING *;
+ SELECT * FROM rw_view2;
+ DELETE FROM rw_view2 WHERE a=7 RETURNING *;
+
+ -- view on top of view with triggers
+ DROP RULE rw_view1_ins_rule ON rw_view1;
+ DROP RULE rw_view1_upd_rule ON rw_view1;
+ DROP RULE rw_view1_del_rule ON rw_view1;
+
+ SELECT table_name, is_updatable, is_insertable_into,
+ is_trigger_updatable, is_trigger_deletable,
+ is_trigger_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+
+ CREATE FUNCTION rw_view1_trig_fn()
+ RETURNS trigger AS
+ $$
+ BEGIN
+ IF TG_OP = 'INSERT' THEN
+ INSERT INTO base_tbl VALUES (NEW.a, NEW.b);
+ RETURN NEW;
+ ELSIF TG_OP = 'UPDATE' THEN
+ UPDATE base_tbl SET b=NEW.b WHERE a=OLD.a;
+ RETURN NEW;
+ ELSIF TG_OP = 'DELETE' THEN
+ DELETE FROM base_tbl WHERE a=OLD.a;
+ RETURN OLD;
+ END IF;
+ END;
+ $$
+ LANGUAGE plpgsql;
+
+ CREATE TRIGGER rw_view1_ins_trig INSTEAD OF INSERT ON rw_view1
+ FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn();
+
+ SELECT table_name, is_updatable, is_insertable_into,
+ is_trigger_updatable, is_trigger_deletable,
+ is_trigger_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+
+ CREATE TRIGGER rw_view1_upd_trig INSTEAD OF UPDATE ON rw_view1
+ FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn();
+
+ SELECT table_name, is_updatable, is_insertable_into,
+ is_trigger_updatable, is_trigger_deletable,
+ is_trigger_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+
+ CREATE TRIGGER rw_view1_del_trig INSTEAD OF DELETE ON rw_view1
+ FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn();
+
+ SELECT table_name, is_updatable, is_insertable_into,
+ is_trigger_updatable, is_trigger_deletable,
+ is_trigger_insertable_into
+ FROM information_schema.views
+ WHERE table_name LIKE 'rw_view%'
+ ORDER BY table_name;
+
+ SELECT * FROM rw_view2;
+ INSERT INTO rw_view2 VALUES (7, 'Row 7') RETURNING *;
+ UPDATE rw_view2 SET b='Row seven' WHERE a=7 RETURNING *;
+ SELECT * FROM rw_view2;
+ DELETE FROM rw_view2 WHERE a=7 RETURNING *;
+
+ DROP VIEW rw_view1, rw_view2;
+
+ -- test whole row from view
+ CREATE VIEW rw_view1 AS SELECT b AS bb, a AS aa FROM base_tbl;
+
+ CREATE FUNCTION rw_view1_aa(x rw_view1)
+ RETURNS int AS $$ SELECT x.aa $$ LANGUAGE sql;
+
+ UPDATE rw_view1 v SET bb='Updated row 6' WHERE rw_view1_aa(v)=6
+ RETURNING rw_view1_aa(v), v.bb;
+ SELECT * FROM base_tbl;
+
+ EXPLAIN (costs off)
+ UPDATE rw_view1 v SET bb='Updated row 6' WHERE rw_view1_aa(v)=6
+ RETURNING rw_view1_aa(v), v.bb;
+
+ DROP FUNCTION rw_view1_aa(rw_view1);
+ DROP VIEW rw_view1;
+ DROP TABLE base_tbl;
+
+ -- permissions checks
+ CREATE USER view_user1;
+ CREATE USER view_user2;
+
+ SET SESSION AUTHORIZATION view_user1;
+ CREATE TABLE base_tbl(a int, b text, c float);
+ INSERT INTO base_tbl VALUES (1, 'Row 1', 1.0);
+ CREATE VIEW rw_view1 AS SELECT b AS bb, c AS cc, a AS aa FROM base_tbl;
+ INSERT INTO rw_view1 VALUES ('Row 2', 2.0, 2);
+
+ GRANT SELECT ON base_tbl TO view_user2;
+ GRANT SELECT ON rw_view1 TO view_user2;
+ GRANT UPDATE (a,c) ON base_tbl TO view_user2;
+ GRANT UPDATE (bb,cc) ON rw_view1 TO view_user2;
+ RESET SESSION AUTHORIZATION;
+
+ SET SESSION AUTHORIZATION view_user2;
+ CREATE VIEW rw_view2 AS SELECT b AS bb, c AS cc, a AS aa FROM base_tbl;
+ SELECT * FROM base_tbl; -- ok
+ SELECT * FROM rw_view1; -- ok
+ SELECT * FROM rw_view2; -- ok
+
+ INSERT INTO base_tbl VALUES (3, 'Row 3', 3.0); -- not allowed
+ INSERT INTO rw_view1 VALUES ('Row 3', 3.0, 3); -- not allowed
+ INSERT INTO rw_view2 VALUES ('Row 3', 3.0, 3); -- not allowed
+
+ UPDATE base_tbl SET a=a, c=c; -- ok
+ UPDATE base_tbl SET b=b; -- not allowed
+ UPDATE rw_view1 SET bb=bb, cc=cc; -- ok
+ UPDATE rw_view1 SET aa=aa; -- not allowed
+ UPDATE rw_view2 SET aa=aa, cc=cc; -- ok
+ UPDATE rw_view2 SET bb=bb; -- not allowed
+
+ DELETE FROM base_tbl; -- not allowed
+ DELETE FROM rw_view1; -- not allowed
+ DELETE FROM rw_view2; -- not allowed
+ RESET SESSION AUTHORIZATION;
+
+ SET SESSION AUTHORIZATION view_user1;
+ GRANT INSERT, DELETE ON base_tbl TO view_user2;
+ RESET SESSION AUTHORIZATION;
+
+ SET SESSION AUTHORIZATION view_user2;
+ INSERT INTO base_tbl VALUES (3, 'Row 3', 3.0); -- ok
+ INSERT INTO rw_view1 VALUES ('Row 4', 4.0, 4); -- not allowed
+ INSERT INTO rw_view2 VALUES ('Row 4', 4.0, 4); -- ok
+ DELETE FROM base_tbl WHERE a=1; -- ok
+ DELETE FROM rw_view1 WHERE aa=2; -- not allowed
+ DELETE FROM rw_view2 WHERE aa=2; -- ok
+ SELECT * FROM base_tbl;
+ RESET SESSION AUTHORIZATION;
+
+ SET SESSION AUTHORIZATION view_user1;
+ REVOKE INSERT, DELETE ON base_tbl FROM view_user2;
+ GRANT INSERT, DELETE ON rw_view1 TO view_user2;
+ RESET SESSION AUTHORIZATION;
+
+ SET SESSION AUTHORIZATION view_user2;
+ INSERT INTO base_tbl VALUES (5, 'Row 5', 5.0); -- not allowed
+ INSERT INTO rw_view1 VALUES ('Row 5', 5.0, 5); -- ok
+ INSERT INTO rw_view2 VALUES ('Row 6', 6.0, 6); -- not allowed
+ DELETE FROM base_tbl WHERE a=3; -- not allowed
+ DELETE FROM rw_view1 WHERE aa=3; -- ok
+ DELETE FROM rw_view2 WHERE aa=4; -- not allowed
+ SELECT * FROM base_tbl;
+ RESET SESSION AUTHORIZATION;
+
+ DROP VIEW rw_view1, rw_view2;
+ DROP TABLE base_tbl;
+ DROP USER view_user1;
+ DROP USER view_user2;
On 27 August 2012 20:26, Dean Rasheed <dean.a.rasheed@gmail.com> wrote:
Here's an updated WIP patch which I'll add to the next commitfest.
Re-sending gzipped (apparently the mail system corrupted it last time).
Regards,
Dean
Attachments:
On Sun, Aug 12, 2012 at 5:14 PM, Dean Rasheed <dean.a.rasheed@gmail.com> wrote:
None of this new code kicks in for non-security barrier views, so the
kinds of plans I posted upthread remain unchanged in that case. But
now a significant fraction of the patch is code added to handle
security barrier views. Of course we could simply say that such views
aren't updatable, but that seems like an annoying limitation if there
is a feasible way round it.
Maybe it'd be a good idea to split this into two patches: the first
could implement the feature but exclude security_barrier views, and
the second could lift that restriction.
Just a thought.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On 30 August 2012 20:05, Robert Haas <robertmhaas@gmail.com> wrote:
On Sun, Aug 12, 2012 at 5:14 PM, Dean Rasheed <dean.a.rasheed@gmail.com> wrote:
None of this new code kicks in for non-security barrier views, so the
kinds of plans I posted upthread remain unchanged in that case. But
now a significant fraction of the patch is code added to handle
security barrier views. Of course we could simply say that such views
aren't updatable, but that seems like an annoying limitation if there
is a feasible way round it.Maybe it'd be a good idea to split this into two patches: the first
could implement the feature but exclude security_barrier views, and
the second could lift that restriction.
Yes, I think that makes sense.
I should hopefully get some time to look at it over the weekend.
Regards,
Dean
On 31 August 2012 07:59, Dean Rasheed <dean.a.rasheed@gmail.com> wrote:
On 30 August 2012 20:05, Robert Haas <robertmhaas@gmail.com> wrote:
On Sun, Aug 12, 2012 at 5:14 PM, Dean Rasheed <dean.a.rasheed@gmail.com> wrote:
None of this new code kicks in for non-security barrier views, so the
kinds of plans I posted upthread remain unchanged in that case. But
now a significant fraction of the patch is code added to handle
security barrier views. Of course we could simply say that such views
aren't updatable, but that seems like an annoying limitation if there
is a feasible way round it.Maybe it'd be a good idea to split this into two patches: the first
could implement the feature but exclude security_barrier views, and
the second could lift that restriction.Yes, I think that makes sense.
I should hopefully get some time to look at it over the weekend.
Here's an updated patch for the base feature (without support for
security barrier views) with updated docs and regression tests.
Regards,
Dean