From b0d4d6a5bfa8b139612db946db786cd3a63de9bc Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Date: Mon, 21 Oct 2024 16:47:59 +0530
Subject: [PATCH 2/2] Improve table permission error

In order to select or modify a column, a user requires appropriate
permissions on the table, on the column, or on both. When neither
permission exists, the user gets the error message "permission denied for
table ...", which may lead the user into thinking that they need table-level
permissions. That is not correct. Improve the error to mention both the table
and the column for which the permissions are missing.

Author: Ashutosh Bapat
Discussion: https://postgr.es/m/CAExHW5swFANiB9JmqRoGg_Rkr%2BM%3Dqh%2Bci_zfOtQXFT%2BA%3D%2BjB-A%40mail.gmail.com
---
 src/backend/catalog/aclchk.c                  | 65 +++++++++++++++----
 src/backend/executor/execMain.c               | 65 +++++++++++++++----
 src/backend/statistics/extended_stats.c       |  2 +-
 src/backend/utils/adt/acl.c                   | 12 ++--
 src/include/utils/acl.h                       |  6 +-
 src/test/regress/expected/updatable_views.out |  5 +-
 6 files changed, 119 insertions(+), 36 deletions(-)

diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index 95eb0b12277..6ed255ddaf5 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -2998,10 +2998,40 @@ aclcheck_error_col(AclResult aclerr, ObjectType objtype,
 			/* no error, so return to caller */
 			break;
 		case ACLCHECK_NO_PRIV:
-			ereport(ERROR,
-					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-					 errmsg("permission denied for column \"%s\" of relation \"%s\"",
-							colname, objectname)));
+			if (!colname)
+			{
+				/*
+				 * No column has the required privileges, even though such
+				 * privileges on any single column would have sufficed.
+				 */
+				ereport(ERROR,
+						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("permission denied for relation \"%s\" and all of its columns",
+								objectname),
+						 errhint("required permission on the relation or any of its columns")));
+			}
+			else if (objtype == OBJECT_COLUMN)
+			{
+				/*
+				 * The column (as opposed to its relation) lacks the required
+				 * privileges.
+				 */
+				ereport(ERROR,
+						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("permission denied for column \"%s\" of relation \"%s\"",
+								colname, objectname)));
+			}
+			else
+			{
+				/*
+				 * Neither the relation nor the column has the required
+				 * privileges.
+				 */
+				ereport(ERROR,
+						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("permission denied for relation \"%s\" as well as its column \"%s\"",
+								objectname, colname)));
+			}
 			break;
 		case ACLCHECK_NOT_OWNER:
 			/* relation msg is OK since columns don't have separate owners */
@@ -3946,14 +3976,19 @@ pg_attribute_aclcheck_ext(Oid table_oid, AttrNumber attnum,
  * Exported routine for checking a user's access privileges to any/all columns
  *
  * If 'how' is ACLMASK_ANY, then returns ACLCHECK_OK if user has any of the
- * privileges identified by 'mode' on any non-dropped column in the relation;
- * otherwise returns a suitable error code (in practice, always
- * ACLCHECK_NO_PRIV).
+ * privileges identified by 'mode' on any non-dropped column in the relation.
+ * Otherwise returns a suitable error code (in practice, always
+ * ACLCHECK_NO_PRIV) with attnum, if provided, set to
+ * FirstLowInvalidHeapAttributeNumber.
  *
  * If 'how' is ACLMASK_ALL, then returns ACLCHECK_OK if user has any of the
  * privileges identified by 'mode' on each non-dropped column in the relation
- * (and there must be at least one such column); otherwise returns a suitable
- * error code (in practice, always ACLCHECK_NO_PRIV).
+ * (and there must be at least one such column). Otherwise returns a suitable
+ * error code (in practice, always ACLCHECK_NO_PRIV) with attnum, if provided,
+ * set to the first column without required privileges. If there are no columns,
+ * attnum is set to FirstLowInvalidHeapAttributeNumber.
+ *
+ * When the function returns ACLCHECK_OK, *attnum (if provided) is undefined.
  *
  * As with pg_attribute_aclmask, only privileges granted directly on the
  * column(s) are considered here.
@@ -3963,9 +3998,9 @@ pg_attribute_aclcheck_ext(Oid table_oid, AttrNumber attnum,
  */
 AclResult
 pg_attribute_aclcheck_all(Oid table_oid, Oid roleid, AclMode mode,
-						  AclMaskHow how)
+						  AclMaskHow how, AttrNumber *attnum)
 {
-	return pg_attribute_aclcheck_all_ext(table_oid, roleid, mode, how, NULL);
+	return pg_attribute_aclcheck_all_ext(table_oid, roleid, mode, how, NULL, attnum);
 }
 
 /*
@@ -3975,7 +4010,7 @@ pg_attribute_aclcheck_all(Oid table_oid, Oid roleid, AclMode mode,
 AclResult
 pg_attribute_aclcheck_all_ext(Oid table_oid, Oid roleid,
 							  AclMode mode, AclMaskHow how,
-							  bool *is_missing)
+							  bool *is_missing, AttrNumber *attnum)
 {
 	AclResult	result;
 	HeapTuple	classTuple;
@@ -4014,6 +4049,8 @@ pg_attribute_aclcheck_all_ext(Oid table_oid, Oid roleid,
 	 * report failure in such cases for either value of 'how'.
 	 */
 	result = ACLCHECK_NO_PRIV;
+	if (attnum)
+		*attnum = FirstLowInvalidHeapAttributeNumber;
 
 	for (curr_att = 1; curr_att <= nattrs; curr_att++)
 	{
@@ -4076,7 +4113,11 @@ pg_attribute_aclcheck_all_ext(Oid table_oid, Oid roleid,
 		{
 			result = ACLCHECK_NO_PRIV;
 			if (how == ACLMASK_ALL)
+			{
+				if (attnum)
+					*attnum = curr_att;
 				break;			/* fail on any failure */
+			}
 		}
 	}
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index cc9a594cba5..8ee042065f3 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -84,10 +84,12 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
 						ScanDirection direction,
 						DestReceiver *dest,
 						bool execute_once);
-static bool ExecCheckOneRelPerms(RTEPermissionInfo *perminfo);
+static bool ExecCheckOneRelPerms(RTEPermissionInfo *perminfo,
+								 AttrNumber *attnum);
 static bool ExecCheckPermissionsModified(Oid relOid, Oid userid,
 										 Bitmapset *modifiedCols,
-										 AclMode requiredPerms);
+										 AclMode requiredPerms,
+										 AttrNumber *attnum);
 static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt);
 static void EvalPlanQualStart(EPQState *epqstate, Plan *planTree);
 
@@ -614,15 +616,39 @@ ExecCheckPermissions(List *rangeTable, List *rteperminfos,
 	foreach(l, rteperminfos)
 	{
 		RTEPermissionInfo *perminfo = lfirst_node(RTEPermissionInfo, l);
+		AttrNumber	attno;
 
 		Assert(OidIsValid(perminfo->relid));
-		result = ExecCheckOneRelPerms(perminfo);
+		result = ExecCheckOneRelPerms(perminfo, &attno);
 		if (!result)
 		{
 			if (ereport_on_violation)
-				aclcheck_error(ACLCHECK_NO_PRIV,
-							   get_relkind_objtype(get_rel_relkind(perminfo->relid)),
-							   get_rel_name(perminfo->relid));
+			{
+				if (AttributeNumberIsValid(attno))
+				{
+					const char *colname;
+
+					if (attno > FirstLowInvalidHeapAttributeNumber)
+						colname = get_attname(perminfo->relid, attno, true);
+					else
+					{
+						/*
+						 * None of the columns have required privileges. We
+						 * cannot name any one column. aclcheck_error_col()
+						 * knows how to handle this case.
+						 */
+						colname = NULL;
+					}
+
+					aclcheck_error_col(ACLCHECK_NO_PRIV,
+									   get_relkind_objtype(get_rel_relkind(perminfo->relid)),
+									   get_rel_name(perminfo->relid), colname);
+				}
+				else
+					aclcheck_error(ACLCHECK_NO_PRIV,
+								   get_relkind_objtype(get_rel_relkind(perminfo->relid)),
+								   get_rel_name(perminfo->relid));
+			}
 			return false;
 		}
 	}
@@ -636,9 +662,14 @@ ExecCheckPermissions(List *rangeTable, List *rteperminfos,
 /*
  * ExecCheckOneRelPerms
  *		Check access permissions for a single relation.
+ *
+ * If column-level privileges are examined, the first column to fail the checks
+ * is returned in *attnum. Please refer to pg_attribute_aclcheck_all() for
+ * details. When column privileges are not examined, *attnum is set to
+ * InvalidAttrNumber.
  */
 static bool
-ExecCheckOneRelPerms(RTEPermissionInfo *perminfo)
+ExecCheckOneRelPerms(RTEPermissionInfo *perminfo, AttrNumber *attnum)
 {
 	AclMode		requiredPerms;
 	AclMode		relPerms;
@@ -648,6 +679,7 @@ ExecCheckOneRelPerms(RTEPermissionInfo *perminfo)
 
 	requiredPerms = perminfo->requiredPerms;
 	Assert(requiredPerms != 0);
+	*attnum = InvalidAttrNumber;
 
 	/*
 	 * userid to check as: current user unless we have a setuid indication.
@@ -695,7 +727,7 @@ ExecCheckOneRelPerms(RTEPermissionInfo *perminfo)
 			if (bms_is_empty(perminfo->selectedCols))
 			{
 				if (pg_attribute_aclcheck_all(relOid, userid, ACL_SELECT,
-											  ACLMASK_ANY) != ACLCHECK_OK)
+											  ACLMASK_ANY, attnum) != ACLCHECK_OK)
 					return false;
 			}
 
@@ -708,14 +740,17 @@ ExecCheckOneRelPerms(RTEPermissionInfo *perminfo)
 				{
 					/* Whole-row reference, must have priv on all cols */
 					if (pg_attribute_aclcheck_all(relOid, userid, ACL_SELECT,
-												  ACLMASK_ALL) != ACLCHECK_OK)
+												  ACLMASK_ALL, attnum) != ACLCHECK_OK)
 						return false;
 				}
 				else
 				{
 					if (pg_attribute_aclcheck(relOid, attno, userid,
 											  ACL_SELECT) != ACLCHECK_OK)
+					{
+						*attnum = attno;
 						return false;
+					}
 				}
 			}
 		}
@@ -728,14 +763,14 @@ ExecCheckOneRelPerms(RTEPermissionInfo *perminfo)
 			!ExecCheckPermissionsModified(relOid,
 										  userid,
 										  perminfo->insertedCols,
-										  ACL_INSERT))
+										  ACL_INSERT, attnum))
 			return false;
 
 		if (remainingPerms & ACL_UPDATE &&
 			!ExecCheckPermissionsModified(relOid,
 										  userid,
 										  perminfo->updatedCols,
-										  ACL_UPDATE))
+										  ACL_UPDATE, attnum))
 			return false;
 	}
 	return true;
@@ -748,7 +783,7 @@ ExecCheckOneRelPerms(RTEPermissionInfo *perminfo)
  */
 static bool
 ExecCheckPermissionsModified(Oid relOid, Oid userid, Bitmapset *modifiedCols,
-							 AclMode requiredPerms)
+							 AclMode requiredPerms, AttrNumber *attnum)
 {
 	int			col = -1;
 
@@ -760,7 +795,7 @@ ExecCheckPermissionsModified(Oid relOid, Oid userid, Bitmapset *modifiedCols,
 	if (bms_is_empty(modifiedCols))
 	{
 		if (pg_attribute_aclcheck_all(relOid, userid, requiredPerms,
-									  ACLMASK_ANY) != ACLCHECK_OK)
+									  ACLMASK_ANY, attnum) != ACLCHECK_OK)
 			return false;
 	}
 
@@ -778,7 +813,11 @@ ExecCheckPermissionsModified(Oid relOid, Oid userid, Bitmapset *modifiedCols,
 		{
 			if (pg_attribute_aclcheck(relOid, attno, userid,
 									  requiredPerms) != ACLCHECK_OK)
+			{
+				if (attnum)
+					*attnum = attno;
 				return false;
+			}
 		}
 	}
 	return true;
diff --git a/src/backend/statistics/extended_stats.c b/src/backend/statistics/extended_stats.c
index 99fdf208dba..79d261c692f 100644
--- a/src/backend/statistics/extended_stats.c
+++ b/src/backend/statistics/extended_stats.c
@@ -1662,7 +1662,7 @@ statext_is_compatible_clause(PlannerInfo *root, Node *clause, Index relid,
 			{
 				/* Whole-row reference, so must have access to all columns */
 				if (pg_attribute_aclcheck_all(rte->relid, userid, ACL_SELECT,
-											  ACLMASK_ALL) != ACLCHECK_OK)
+											  ACLMASK_ALL, NULL) != ACLCHECK_OK)
 					return false;
 			}
 			else
diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c
index 2a716cc6b7f..973b568c4b5 100644
--- a/src/backend/utils/adt/acl.c
+++ b/src/backend/utils/adt/acl.c
@@ -2330,7 +2330,7 @@ has_any_column_privilege_name_name(PG_FUNCTION_ARGS)
 	aclresult = pg_class_aclcheck(tableoid, roleid, mode);
 	if (aclresult != ACLCHECK_OK)
 		aclresult = pg_attribute_aclcheck_all(tableoid, roleid, mode,
-											  ACLMASK_ANY);
+											  ACLMASK_ANY, NULL);
 
 	PG_RETURN_BOOL(aclresult == ACLCHECK_OK);
 }
@@ -2359,7 +2359,7 @@ has_any_column_privilege_name(PG_FUNCTION_ARGS)
 	aclresult = pg_class_aclcheck(tableoid, roleid, mode);
 	if (aclresult != ACLCHECK_OK)
 		aclresult = pg_attribute_aclcheck_all(tableoid, roleid, mode,
-											  ACLMASK_ANY);
+											  ACLMASK_ANY, NULL);
 
 	PG_RETURN_BOOL(aclresult == ACLCHECK_OK);
 }
@@ -2390,7 +2390,7 @@ has_any_column_privilege_name_id(PG_FUNCTION_ARGS)
 		if (is_missing)
 			PG_RETURN_NULL();
 		aclresult = pg_attribute_aclcheck_all_ext(tableoid, roleid, mode,
-												  ACLMASK_ANY, &is_missing);
+												  ACLMASK_ANY, &is_missing, NULL);
 		if (is_missing)
 			PG_RETURN_NULL();
 	}
@@ -2424,7 +2424,7 @@ has_any_column_privilege_id(PG_FUNCTION_ARGS)
 		if (is_missing)
 			PG_RETURN_NULL();
 		aclresult = pg_attribute_aclcheck_all_ext(tableoid, roleid, mode,
-												  ACLMASK_ANY, &is_missing);
+												  ACLMASK_ANY, &is_missing, NULL);
 		if (is_missing)
 			PG_RETURN_NULL();
 	}
@@ -2454,7 +2454,7 @@ has_any_column_privilege_id_name(PG_FUNCTION_ARGS)
 	aclresult = pg_class_aclcheck(tableoid, roleid, mode);
 	if (aclresult != ACLCHECK_OK)
 		aclresult = pg_attribute_aclcheck_all(tableoid, roleid, mode,
-											  ACLMASK_ANY);
+											  ACLMASK_ANY, NULL);
 
 	PG_RETURN_BOOL(aclresult == ACLCHECK_OK);
 }
@@ -2483,7 +2483,7 @@ has_any_column_privilege_id_id(PG_FUNCTION_ARGS)
 		if (is_missing)
 			PG_RETURN_NULL();
 		aclresult = pg_attribute_aclcheck_all_ext(tableoid, roleid, mode,
-												  ACLMASK_ANY, &is_missing);
+												  ACLMASK_ANY, &is_missing, NULL);
 		if (is_missing)
 			PG_RETURN_NULL();
 	}
diff --git a/src/include/utils/acl.h b/src/include/utils/acl.h
index 731d84b2a93..6656d88dd51 100644
--- a/src/include/utils/acl.h
+++ b/src/include/utils/acl.h
@@ -254,10 +254,12 @@ extern AclResult pg_attribute_aclcheck_ext(Oid table_oid, AttrNumber attnum,
 										   Oid roleid, AclMode mode,
 										   bool *is_missing);
 extern AclResult pg_attribute_aclcheck_all(Oid table_oid, Oid roleid,
-										   AclMode mode, AclMaskHow how);
+										   AclMode mode, AclMaskHow how,
+										   AttrNumber *attnum);
 extern AclResult pg_attribute_aclcheck_all_ext(Oid table_oid, Oid roleid,
 											   AclMode mode, AclMaskHow how,
-											   bool *is_missing);
+											   bool *is_missing,
+											   AttrNumber *attnum);
 extern AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode);
 extern AclResult pg_class_aclcheck_ext(Oid table_oid, Oid roleid,
 									   AclMode mode, bool *is_missing);
diff --git a/src/test/regress/expected/updatable_views.out b/src/test/regress/expected/updatable_views.out
index 8786058ed0c..25f9d67bd7a 100644
--- a/src/test/regress/expected/updatable_views.out
+++ b/src/test/regress/expected/updatable_views.out
@@ -1375,7 +1375,7 @@ SELECT * FROM rw_view2; -- ok
 (2 rows)
 
 INSERT INTO base_tbl VALUES (3, 'Row 3', 3.0); -- not allowed
-ERROR:  permission denied for table base_tbl
+ERROR:  permission denied for relation "base_tbl" as well as its column "a"
 INSERT INTO rw_view1 VALUES ('Row 3', 3.0, 3); -- not allowed
 ERROR:  permission denied for view rw_view1
 INSERT INTO rw_view2 VALUES ('Row 3', 3.0, 3); -- not allowed
@@ -1506,7 +1506,8 @@ SELECT * FROM rw_view1;
 (1 row)
 
 SELECT * FROM rw_view1 FOR UPDATE;  -- not allowed
-ERROR:  permission denied for table base_tbl
+ERROR:  permission denied for relation "base_tbl" and all of its columns
+HINT:  required permission on the relation or any of its columns
 UPDATE rw_view1 SET b = 'foo' WHERE a = 1;  -- not allowed
 ERROR:  permission denied for table base_tbl
 MERGE INTO rw_view1 t USING (VALUES (1)) AS v(a) ON t.a = v.a
-- 
2.34.1

