From b5094ec11384e785dd506430f1f90e56478e8e3d Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 27 Jul 2018 11:16:11 +0900
Subject: [PATCH 2/3] Refactor VACUUM execution to avoid early lock lookups

A caller of VACUUM can take relation locks early, at lookup time, which
can cause other sessions to block on the lock request.  This potentially
allows DOS attacks, as even a non-privileged user can attempt to vacuum
a critical catalog table and block even all incoming connection attempts.

Contrary to TRUNCATE, a client could attempt a system-wide VACUUM after
building the list of relations to VACUUM, which can cause vacuum_rel()
to try to lock the relation, but the operation would just block.  When
the client specifies a list of relations and a relation needs to be
skipped, fail hard so that there is no conflict with any relation the
user has no rights to work on.

vacuum_rel() already had the sanity checks needed, except that those
were applied too late.  This commit refactors the code so that relation
skips are checked beforehand, making it safe against lock lookup attacks.

Author: Michael Paquier
Discussion: https://postgr.es/m/152512087100.19803.12733865831237526317@wrigleys.postgresql.org
---
 src/backend/commands/vacuum.c                 | 183 +++++++++++++-----
 .../isolation/expected/vacuum-conflict.out    |  89 +++++++++
 src/test/isolation/isolation_schedule         |   1 +
 src/test/isolation/specs/vacuum-conflict.spec |  33 ++++
 4 files changed, 254 insertions(+), 52 deletions(-)
 create mode 100644 src/test/isolation/expected/vacuum-conflict.out
 create mode 100644 src/test/isolation/specs/vacuum-conflict.spec

diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 5736f12b8f..79dd01874e 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -68,7 +68,11 @@ static BufferAccessStrategy vac_strategy;
 
 
 /* non-export function prototypes */
-static List *expand_vacuum_rel(VacuumRelation *vrel);
+static void RangeVarCallbackForVacuum(const RangeVar *relation,
+				  Oid relId, Oid oldRelId, void *arg);
+static bool vacuum_check_rel(Oid relid, Form_pg_class reltuple, int elevel,
+							 bool include_toast);
+static List *expand_vacuum_rel(VacuumRelation *vrel, int options);
 static List *get_all_vacuum_rels(void);
 static void vac_truncate_clog(TransactionId frozenXID,
 				  MultiXactId minMulti,
@@ -257,7 +261,7 @@ vacuum(int options, List *relations, VacuumParams *params,
 			List	   *sublist;
 			MemoryContext old_context;
 
-			sublist = expand_vacuum_rel(vrel);
+			sublist = expand_vacuum_rel(vrel, options);
 			old_context = MemoryContextSwitchTo(vac_context);
 			newrels = list_concat(newrels, sublist);
 			MemoryContextSwitchTo(old_context);
@@ -408,6 +412,117 @@ vacuum(int options, List *relations, VacuumParams *params,
 	vac_context = NULL;
 }
 
+/*
+ * Callback to RangeVarGetRelidExtended() for VACUUM processing when
+ * the list of relations to work on is provided by caller of a manual
+ * VACUUM.
+ *
+ * "relId" is the OID resolved for "relation", or an invalid OID if the
+ * relation was not found.  "arg" points to the int bitmask of VACOPT_*
+ * options of the command being run.
+ */
+static void
+RangeVarCallbackForVacuum(const RangeVar *relation,
+						  Oid relId, Oid oldRelId, void *arg)
+{
+	HeapTuple	tuple;
+	int		   *options = (int *) arg;
+	Form_pg_class reltuple;
+
+	Assert(!IsAutoVacuumWorkerProcess());
+
+	/* Nothing to do if the relation was not found. */
+	if (!OidIsValid(relId))
+		return;
+
+	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId));
+	if (!HeapTupleIsValid(tuple))	/* should not happen */
+		elog(ERROR, "cache lookup failed for relation %u", relId);
+
+	reltuple = (Form_pg_class) GETSTRUCT(tuple);
+
+	/*
+	 * Check permissions.  VACUUM FULL takes an exclusive lock on the
+	 * relation vacuumed, hence restrict its access similarly to what
+	 * vacuum_rel() would do.  Issuing an ERROR makes sure that there is no
+	 * lock escalation when a non-privileged user lists this relation.
+	 */
+	if (((*options) & VACOPT_FULL) != 0)
+		(void) vacuum_check_rel(relId, reltuple, ERROR, false);
+
+	ReleaseSysCache(tuple);
+}
+
+/*
+ * Check if a given relation can be safely vacuumed or not.  If the
+ * relation needs to be skipped, issue a log message at level "elevel"
+ * and return true to let the caller know what to do with this relation;
+ * return false if the relation passes all the checks.
+ *
+ * "include_toast" tells whether toast relations count as vacuumable.
+ */
+static bool
+vacuum_check_rel(Oid relid, Form_pg_class reltuple, int elevel,
+				 bool include_toast)
+{
+	const char *errstr;
+	const char *logdetail;
+
+	/* gettext_noop only marks strings; errmsg() and _() translate later. */
+	if (elevel >= ERROR)
+		errstr = gettext_noop("cannot vacuum relation \"%s\"");
+	else
+		errstr = gettext_noop("skipping vacuum of relation \"%s\"");
+
+	/*
+	 * Check permissions.
+	 *
+	 * We allow the user to vacuum a table if he is superuser, the table
+	 * owner, or the database owner (but in the latter case, only if it's not
+	 * a shared relation).  pg_class_ownercheck includes the superuser case.
+	 *
+	 * Note we may choose to treat permissions failure as a WARNING (depending
+	 * on elevel set by caller) and keep trying to vacuum the rest of the
+	 * database --- is this appropriate?
+	 */
+	if (!(pg_class_ownercheck(relid, GetUserId()) ||
+		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !reltuple->relisshared)))
+	{
+		if (reltuple->relisshared)
+			logdetail = gettext_noop("Only superuser can vacuum it.");
+		else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE)
+			logdetail = gettext_noop("Only superuser or database owner can vacuum it.");
+		else
+			logdetail = gettext_noop("Only table or database owner can vacuum it.");
+
+		ereport(elevel,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg(errstr, NameStr(reltuple->relname)),
+				 errdetail_log("%s", _(logdetail))));
+		return true;
+	}
+
+	/*
+	 * Check that it's of a vacuumable relkind.  Toast relations are
+	 * discarded if this check is done at early stages when building the
+	 * list of relations to vacuum.
+	 */
+	if (reltuple->relkind != RELKIND_RELATION &&
+		reltuple->relkind != RELKIND_MATVIEW &&
+		reltuple->relkind != RELKIND_PARTITIONED_TABLE &&
+		!(include_toast && reltuple->relkind == RELKIND_TOASTVALUE))
+	{
+		ereport(elevel,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg(errstr, NameStr(reltuple->relname)),
+				 errdetail("Non-tables or special system tables are not allowed.")));
+		return true;
+	}
+
+	return false;
+}
+
+
 /*
  * Given a VacuumRelation, fill in the table OID if it wasn't specified,
  * and optionally add VacuumRelations for partitions of the table.
@@ -423,7 +538,7 @@ vacuum(int options, List *relations, VacuumParams *params,
  * are made in vac_context.
  */
 static List *
-expand_vacuum_rel(VacuumRelation *vrel)
+expand_vacuum_rel(VacuumRelation *vrel, int options)
 {
 	List	   *vacrels = NIL;
 	MemoryContext oldcontext;
@@ -454,7 +569,9 @@ expand_vacuum_rel(VacuumRelation *vrel)
 		 * below, as well as find_all_inheritors's expectation that the caller
 		 * holds some lock on the starting relation.
 		 */
-		relid = RangeVarGetRelid(vrel->relation, AccessShareLock, false);
+		relid = RangeVarGetRelidExtended(vrel->relation, AccessShareLock,
+										 0, RangeVarCallbackForVacuum,
+										 (void *) &options);
 
 		/*
 		 * Make a returnable VacuumRelation for this rel.
@@ -545,15 +662,10 @@ get_all_vacuum_rels(void)
 	{
 		Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
 		MemoryContext oldcontext;
+		Oid			relid = HeapTupleGetOid(tuple);
 
-		/*
-		 * We include partitioned tables here; depending on which operation is
-		 * to be performed, caller will decide whether to process or ignore
-		 * them.
-		 */
-		if (classForm->relkind != RELKIND_RELATION &&
-			classForm->relkind != RELKIND_MATVIEW &&
-			classForm->relkind != RELKIND_PARTITIONED_TABLE)
+		/* check permissions of relation */
+		if (vacuum_check_rel(relid, classForm, DEBUG1, false))
 			continue;
 
 		/*
@@ -563,7 +675,7 @@ get_all_vacuum_rels(void)
 		 */
 		oldcontext = MemoryContextSwitchTo(vac_context);
 		vacrels = lappend(vacrels, makeVacuumRelation(NULL,
-													  HeapTupleGetOid(tuple),
+													  relid,
 													  NIL));
 		MemoryContextSwitchTo(oldcontext);
 	}
@@ -1436,47 +1548,14 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	}
 
 	/*
-	 * Check permissions.
-	 *
-	 * We allow the user to vacuum a table if he is superuser, the table
-	 * owner, or the database owner (but in the latter case, only if it's not
-	 * a shared relation).  pg_class_ownercheck includes the superuser case.
-	 *
-	 * Note we choose to treat permissions failure as a WARNING and keep
-	 * trying to vacuum the rest of the DB --- is this appropriate?
+	 * Check if relation needs to be skipped.  This check happens as well
+	 * when building the relation list to VACUUM for a manual operation,
+	 * and needs to be done here as well as this could be spawned across
+	 * multiple transactions.
 	 */
-	if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
-		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
+	if (vacuum_check_rel(RelationGetRelid(onerel), onerel->rd_rel, WARNING,
+						 true))
 	{
-		if (onerel->rd_rel->relisshared)
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only superuser can vacuum it",
-							RelationGetRelationName(onerel))));
-		else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
-							RelationGetRelationName(onerel))));
-		else
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
-							RelationGetRelationName(onerel))));
-		relation_close(onerel, lmode);
-		PopActiveSnapshot();
-		CommitTransactionCommand();
-		return false;
-	}
-
-	/*
-	 * Check that it's of a vacuumable relkind.
-	 */
-	if (onerel->rd_rel->relkind != RELKIND_RELATION &&
-		onerel->rd_rel->relkind != RELKIND_MATVIEW &&
-		onerel->rd_rel->relkind != RELKIND_TOASTVALUE &&
-		onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-	{
-		ereport(WARNING,
-				(errmsg("skipping \"%s\" --- cannot vacuum non-tables or special system tables",
-						RelationGetRelationName(onerel))));
 		relation_close(onerel, lmode);
 		PopActiveSnapshot();
 		CommitTransactionCommand();
diff --git a/src/test/isolation/expected/vacuum-conflict.out b/src/test/isolation/expected/vacuum-conflict.out
new file mode 100644
index 0000000000..a8b7120dd4
--- /dev/null
+++ b/src/test/isolation/expected/vacuum-conflict.out
@@ -0,0 +1,89 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_begin s1_catalog_lookup s2_auth s2_vacuum s1_commit
+step s1_begin: BEGIN;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s2_auth: SET ROLE regress_truncate_conflict;
+WARNING:  skipping vacuum of relation "pg_authid"
+step s2_vacuum: VACUUM pg_authid;
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s2_auth s2_vacuum s1_catalog_lookup s1_commit
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+WARNING:  skipping vacuum of relation "pg_authid"
+step s2_vacuum: VACUUM pg_authid;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s2_auth s1_catalog_lookup s2_vacuum s1_commit
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+WARNING:  skipping vacuum of relation "pg_authid"
+step s2_vacuum: VACUUM pg_authid;
+step s1_commit: COMMIT;
+
+starting permutation: s2_auth s2_vacuum s1_begin s1_catalog_lookup s1_commit
+step s2_auth: SET ROLE regress_truncate_conflict;
+WARNING:  skipping vacuum of relation "pg_authid"
+step s2_vacuum: VACUUM pg_authid;
+step s1_begin: BEGIN;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s1_catalog_lookup s2_auth s2_full s1_commit
+step s1_begin: BEGIN;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_full: VACUUM FULL pg_authid;
+ERROR:  cannot vacuum relation "pg_authid"
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s2_auth s2_full s1_catalog_lookup s1_commit
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_full: VACUUM FULL pg_authid;
+ERROR:  cannot vacuum relation "pg_authid"
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s2_auth s1_catalog_lookup s2_full s1_commit
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s2_full: VACUUM FULL pg_authid;
+ERROR:  cannot vacuum relation "pg_authid"
+step s1_commit: COMMIT;
+
+starting permutation: s2_auth s2_full s1_begin s1_catalog_lookup s1_commit
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_full: VACUUM FULL pg_authid;
+ERROR:  cannot vacuum relation "pg_authid"
+step s1_begin: BEGIN;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s1_commit: COMMIT;
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 48ae740739..0ff25888af 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -77,3 +77,4 @@ test: partition-key-update-3
 test: partition-key-update-4
 test: plpgsql-toast
 test: truncate-conflict
+test: vacuum-conflict
diff --git a/src/test/isolation/specs/vacuum-conflict.spec b/src/test/isolation/specs/vacuum-conflict.spec
new file mode 100644
index 0000000000..b9bd174014
--- /dev/null
+++ b/src/test/isolation/specs/vacuum-conflict.spec
@@ -0,0 +1,33 @@
+# Test for lock lookup conflicts with VACUUM when working on unowned
+# relations, particularly catalogs.  VACUUM FULL should trigger an ERROR,
+# while a plain manual VACUUM should fall through, skipping the relation.
+
+setup
+{
+	CREATE ROLE regress_truncate_conflict;
+}
+
+teardown
+{
+	DROP ROLE regress_truncate_conflict;
+}
+
+session "s1"
+step "s1_begin"          { BEGIN; }
+step "s1_catalog_lookup" { SELECT count(*) >= 0 FROM pg_stat_activity; }
+step "s1_commit"         { COMMIT; }
+
+session "s2"
+step "s2_auth"           { SET ROLE regress_truncate_conflict; }
+step "s2_vacuum"         { VACUUM pg_authid; }
+step "s2_full"           { VACUUM FULL pg_authid; }
+
+permutation "s1_begin" "s1_catalog_lookup" "s2_auth" "s2_vacuum" "s1_commit"
+permutation "s1_begin" "s2_auth" "s2_vacuum" "s1_catalog_lookup" "s1_commit"
+permutation "s1_begin" "s2_auth" "s1_catalog_lookup" "s2_vacuum" "s1_commit"
+permutation "s2_auth" "s2_vacuum" "s1_begin" "s1_catalog_lookup" "s1_commit"
+
+permutation "s1_begin" "s1_catalog_lookup" "s2_auth" "s2_full" "s1_commit"
+permutation "s1_begin" "s2_auth" "s2_full" "s1_catalog_lookup" "s1_commit"
+permutation "s1_begin" "s2_auth" "s1_catalog_lookup" "s2_full" "s1_commit"
+permutation "s2_auth" "s2_full" "s1_begin" "s1_catalog_lookup" "s1_commit"
-- 
2.18.0

