From 01437a45bfd069080ffe0eb45288bfddd3de6009 Mon Sep 17 00:00:00 2001
From: Christoph Heiss <christoph.heiss@cybertec.at>
Date: Wed, 2 Feb 2022 16:44:38 +0100
Subject: [PATCH v4 1/3] Add new boolean reloption security_invoker to views

When this reloption is set to true, all references to the underlying tables will
be checked against the invoking user rather than the view owner, as is currently
implemented.

Signed-off-by: Christoph Heiss <christoph.heiss@cybertec.at>
---
 src/backend/access/common/reloptions.c    | 11 ++++
 src/backend/nodes/copyfuncs.c             |  1 +
 src/backend/nodes/equalfuncs.c            |  1 +
 src/backend/nodes/outfuncs.c              |  1 +
 src/backend/nodes/readfuncs.c             |  1 +
 src/backend/optimizer/plan/subselect.c    |  1 +
 src/backend/optimizer/prep/prepjointree.c |  1 +
 src/backend/rewrite/rewriteHandler.c      | 17 ++++--
 src/backend/utils/cache/relcache.c        | 63 +++++++++++++----------
 src/include/nodes/parsenodes.h            |  1 +
 src/include/utils/rel.h                   | 11 ++++
 11 files changed, 77 insertions(+), 32 deletions(-)

diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index d592655258..c7c62a0076 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -140,6 +140,15 @@ static relopt_bool boolRelOpts[] =
 		},
 		false
 	},
+	{
+		{
+			"security_invoker",
+			"Check permissions against underlying tables as the calling user, not as view owner",
+			RELOPT_KIND_VIEW,
+			AccessExclusiveLock
+		},
+		false
+	},
 	{
 		{
 			"vacuum_truncate",
@@ -1996,6 +2005,8 @@ view_reloptions(Datum reloptions, bool validate)
 	static const relopt_parse_elt tab[] = {
 		{"security_barrier", RELOPT_TYPE_BOOL,
 		offsetof(ViewOptions, security_barrier)},
+		{"security_invoker", RELOPT_TYPE_BOOL,
+		offsetof(ViewOptions, security_invoker)},
 		{"check_option", RELOPT_TYPE_ENUM,
 		offsetof(ViewOptions, check_option)}
 	};
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 90b5da51c9..b171992ba8 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2465,6 +2465,7 @@ _copyRangeTblEntry(const RangeTblEntry *from)
 	COPY_NODE_FIELD(tablesample);
 	COPY_NODE_FIELD(subquery);
 	COPY_SCALAR_FIELD(security_barrier);
+	COPY_SCALAR_FIELD(security_invoker);
 	COPY_SCALAR_FIELD(jointype);
 	COPY_SCALAR_FIELD(joinmergedcols);
 	COPY_NODE_FIELD(joinaliasvars);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 06345da3ba..a832c5fefe 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2766,6 +2766,7 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b)
 	COMPARE_NODE_FIELD(tablesample);
 	COMPARE_NODE_FIELD(subquery);
 	COMPARE_SCALAR_FIELD(security_barrier);
+	COMPARE_SCALAR_FIELD(security_invoker);
 	COMPARE_SCALAR_FIELD(jointype);
 	COMPARE_SCALAR_FIELD(joinmergedcols);
 	COMPARE_NODE_FIELD(joinaliasvars);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 2b0236937a..883284ad0d 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -3261,6 +3261,7 @@ _outRangeTblEntry(StringInfo str, const RangeTblEntry *node)
 		case RTE_SUBQUERY:
 			WRITE_NODE_FIELD(subquery);
 			WRITE_BOOL_FIELD(security_barrier);
+			WRITE_BOOL_FIELD(security_invoker);
 			break;
 		case RTE_JOIN:
 			WRITE_ENUM_FIELD(jointype, JoinType);
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 3f68f7c18d..ad825bb27f 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1444,6 +1444,7 @@ _readRangeTblEntry(void)
 		case RTE_SUBQUERY:
 			READ_NODE_FIELD(subquery);
 			READ_BOOL_FIELD(security_barrier);
+			READ_BOOL_FIELD(security_invoker);
 			break;
 		case RTE_JOIN:
 			READ_ENUM_FIELD(jointype, JoinType);
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index 41bd1ae7d4..30c66b9c6d 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -1216,6 +1216,7 @@ inline_cte_walker(Node *node, inline_cte_walker_context *context)
 			rte->rtekind = RTE_SUBQUERY;
 			rte->subquery = newquery;
 			rte->security_barrier = false;
+			rte->security_invoker = false;
 
 			/* Zero out CTE-specific fields */
 			rte->ctename = NULL;
diff --git a/src/backend/optimizer/prep/prepjointree.c b/src/backend/optimizer/prep/prepjointree.c
index 282589dec8..bd7dc1c348 100644
--- a/src/backend/optimizer/prep/prepjointree.c
+++ b/src/backend/optimizer/prep/prepjointree.c
@@ -660,6 +660,7 @@ preprocess_function_rtes(PlannerInfo *root)
 				rte->rtekind = RTE_SUBQUERY;
 				rte->subquery = funcquery;
 				rte->security_barrier = false;
+				rte->security_invoker = false;
 				/* Clear fields that should not be set in a subquery RTE */
 				rte->functions = NIL;
 				rte->funcordinality = false;
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
index 3d82138cb3..22a7dbb0a5 100644
--- a/src/backend/rewrite/rewriteHandler.c
+++ b/src/backend/rewrite/rewriteHandler.c
@@ -1838,6 +1838,7 @@ ApplyRetrieveRule(Query *parsetree,
 	rte->rtekind = RTE_SUBQUERY;
 	rte->subquery = rule_action;
 	rte->security_barrier = RelationIsSecurityView(relation);
+	rte->security_invoker = RelationHasSecurityInvoker(relation);
 	/* Clear fields that should not be set in a subquery RTE */
 	rte->relid = InvalidOid;
 	rte->relkind = 0;
@@ -3242,9 +3243,13 @@ rewriteTargetView(Query *parsetree, Relation view)
 				   0);
 
 	/*
-	 * Mark the new target RTE for the permissions checks that we want to
-	 * enforce against the view owner, as distinct from the query caller.  At
-	 * the relation level, require the same INSERT/UPDATE/DELETE permissions
+	 * If the view has security_invoker set, mark the new target RTE for the
+	 * permissions checks that we want to enforce against the query caller, as
+	 * distince from the view owner.
+	 * In all other cases, we want to enforce them against the view owner,
+	 * not the query caller.
+	 *
+	 * At the relation level, require the same INSERT/UPDATE/DELETE permissions
 	 * that the query caller needs against the view.  We drop the ACL_SELECT
 	 * bit that is presumably in new_rte->requiredPerms initially.
 	 *
@@ -3253,7 +3258,11 @@ rewriteTargetView(Query *parsetree, Relation view)
 	 * the executor still performs appropriate permissions checks for the
 	 * query caller's use of the view.
 	 */
-	new_rte->checkAsUser = view->rd_rel->relowner;
+	if (RelationHasSecurityInvoker(view))
+		new_rte->checkAsUser = view_rte->checkAsUser;
+	else
+		new_rte->checkAsUser = view->rd_rel->relowner;
+
 	new_rte->requiredPerms = view_rte->requiredPerms;
 
 	/*
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 2e760e8a3b..9ae03e3e8d 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -825,11 +825,14 @@ RelationBuildRuleLock(Relation relation)
 		pfree(rule_str);
 
 		/*
-		 * We want the rule's table references to be checked as though by the
-		 * table owner, not the user referencing the rule.  Therefore, scan
-		 * through the rule's actions and set the checkAsUser field on all
-		 * rtable entries.  We have to look at the qual as well, in case it
-		 * contains sublinks.
+		 * If we're dealing with a view that has the security_invoker relopt
+		 * set to true, we want the rule's table references to be checked as
+		 * the user referencing the rule.
+		 *
+		 * In all other cases, we want the rule's table references to be checked
+		 * as though by the table owner.  Therefore, scan through the rule's
+		 * actions and set the checkAsUser field on all rtable entries.  We
+		 * have to look at the qual as well, in case it contains sublinks.
 		 *
 		 * The reason for doing this when the rule is loaded, rather than when
 		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
@@ -837,8 +840,12 @@ RelationBuildRuleLock(Relation relation)
 		 * the rule tree during load is relatively cheap (compared to
 		 * constructing it in the first place), so we do it here.
 		 */
-		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
-		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
+		if (!(relation->rd_rel->relkind == RELKIND_VIEW
+			  && RelationHasSecurityInvoker(relation)))
+		{
+			setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
+			setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
+		}
 
 		if (numlocks >= maxlocks)
 		{
@@ -1163,27 +1170,6 @@ retry:
 	 */
 	RelationBuildTupleDesc(relation);
 
-	/*
-	 * Fetch rules and triggers that affect this relation
-	 */
-	if (relation->rd_rel->relhasrules)
-		RelationBuildRuleLock(relation);
-	else
-	{
-		relation->rd_rules = NULL;
-		relation->rd_rulescxt = NULL;
-	}
-
-	if (relation->rd_rel->relhastriggers)
-		RelationBuildTriggers(relation);
-	else
-		relation->trigdesc = NULL;
-
-	if (relation->rd_rel->relrowsecurity)
-		RelationBuildRowSecurity(relation);
-	else
-		relation->rd_rsdesc = NULL;
-
 	/* foreign key data is not loaded till asked for */
 	relation->rd_fkeylist = NIL;
 	relation->rd_fkeyvalid = false;
@@ -1215,6 +1201,27 @@ retry:
 	/* extract reloptions if any */
 	RelationParseRelOptions(relation, pg_class_tuple);
 
+	/*
+	 * Fetch rules and triggers that affect this relation
+	 */
+	if (relation->rd_rel->relhasrules)
+		RelationBuildRuleLock(relation);
+	else
+	{
+		relation->rd_rules = NULL;
+		relation->rd_rulescxt = NULL;
+	}
+
+	if (relation->rd_rel->relhastriggers)
+		RelationBuildTriggers(relation);
+	else
+		relation->trigdesc = NULL;
+
+	if (relation->rd_rel->relrowsecurity)
+		RelationBuildRowSecurity(relation);
+	else
+		relation->rd_rsdesc = NULL;
+
 	/*
 	 * initialize the relation lock manager information
 	 */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 3e9bdc781f..1362f5d111 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1042,6 +1042,7 @@ typedef struct RangeTblEntry
 	 */
 	Query	   *subquery;		/* the sub-query */
 	bool		security_barrier;	/* is from security_barrier view? */
+	bool		security_invoker;	/* from a view with security_invoker set? */
 
 	/*
 	 * Fields valid for a join RTE (else NULL/zero):
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 6da1b220cd..9a8866c91d 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -398,6 +398,7 @@ typedef struct ViewOptions
 {
 	int32		vl_len_;		/* varlena header (do not touch directly!) */
 	bool		security_barrier;
+	bool		security_invoker;
 	ViewOptCheckOption check_option;
 } ViewOptions;
 
@@ -411,6 +412,16 @@ typedef struct ViewOptions
 	 (relation)->rd_options ?												\
 	  ((ViewOptions *) (relation)->rd_options)->security_barrier : false)
 
+/*
+ * RelationHasSecurityRelationPermissions
+ *		Returns true if the relation has the security invoker property set, or
+ *		not.  Note multiple eval of argument!
+ */
+#define RelationHasSecurityInvoker(relation)								\
+	(AssertMacro(relation->rd_rel->relkind == RELKIND_VIEW),				\
+	 (relation)->rd_options ?												\
+	  ((ViewOptions *) (relation)->rd_options)->security_invoker : false)
+
 /*
  * RelationHasCheckOption
  *		Returns true if the relation is a view defined with either the local
-- 
2.35.1

