From 0812127447a77029f669c5baa0eaca48f7397315 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Thu, 26 Jul 2018 13:38:05 +0900
Subject: [PATCH 2/3] Refactor VACUUM execution to avoid early lock lookups

A caller of VACUUM can take a lock on a relation early, before any
permission check is done, which can cause other sessions to block behind
that lock request.  This potentially allows DoS attacks, as even a
non-privileged user can attempt a VACUUM FULL of a critical catalog
table and block all incoming connection attempts.

Contrary to TRUNCATE, a client can also attempt a system-wide VACUUM.
In that case the list of relations to VACUUM is built first, and
vacuum_rel() then tries to lock each relation, where it would simply
block waiting for the lock.  When the client specifies a list of
relations and one of them needs to be skipped, fail hard so that there
is no conflict with any relation the user has no rights to work on.

vacuum_rel() already had the needed sanity checks, except that they
were applied too late.  This commit refactors the code so that relation
skips are checked beforehand, making it safe against lock lookup attacks.

Author: Michael Paquier
Discussion: https://postgr.es/m/152512087100.19803.12733865831237526317@wrigleys.postgresql.org
---
 src/backend/commands/vacuum.c | 168 +++++++++++++++++++++++-----------
 1 file changed, 116 insertions(+), 52 deletions(-)

diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 5736f12b8f..502564db68 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -30,11 +30,13 @@
 #include "access/multixact.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "commands/cluster.h"
+#include "commands/tablecmds.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
@@ -47,6 +49,7 @@
 #include "utils/acl.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
@@ -68,7 +71,10 @@ static BufferAccessStrategy vac_strategy;
 
 
 /* non-export function prototypes */
-static List *expand_vacuum_rel(VacuumRelation *vrel);
+static void RangeVarCallbackForVacuum(const RangeVar *relation,
+				  Oid relId, Oid oldRelId, void *arg);
+static bool vacuum_skip_rel(Oid relid, Form_pg_class reltuple, int elevel);
+static List *expand_vacuum_rel(VacuumRelation *vrel, int options);
 static List *get_all_vacuum_rels(void);
 static void vac_truncate_clog(TransactionId frozenXID,
 				  MultiXactId minMulti,
@@ -257,7 +263,7 @@ vacuum(int options, List *relations, VacuumParams *params,
 			List	   *sublist;
 			MemoryContext old_context;
 
-			sublist = expand_vacuum_rel(vrel);
+			sublist = expand_vacuum_rel(vrel, options);
 			old_context = MemoryContextSwitchTo(vac_context);
 			newrels = list_concat(newrels, sublist);
 			MemoryContextSwitchTo(old_context);
@@ -408,6 +414,101 @@ vacuum(int options, List *relations, VacuumParams *params,
 	vac_context = NULL;
 }
 
+/*
+ * Callback to RangeVarGetRelidExtended() for VACUUM processing when
+ * the list of relations to work on is provided by caller of a manual
+ * VACUUM.  "arg" points to the int bitmask of VACOPT_* options.  The
+ * vacuum_skip_rel() checks are run here, at ERROR level, for VACUUM
+ * FULL only; vacuum_rel() repeats them at WARNING level in all cases.
+ */
+static void
+RangeVarCallbackForVacuum(const RangeVar *relation,
+						  Oid relId, Oid oldRelId, void *arg)
+{
+	HeapTuple	tuple;
+	int		   *options = (int *) arg;
+	Form_pg_class reltuple;
+
+	Assert(!IsAutoVacuumWorkerProcess());
+
+	/* Nothing to do if the relation was not found. */
+	if (!OidIsValid(relId))
+		return;
+
+	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId));
+	if (!HeapTupleIsValid(tuple))	/* should not happen */
+		elog(ERROR, "cache lookup failed for relation %u", relId);
+
+	reltuple = (Form_pg_class) GETSTRUCT(tuple);
+
+	/*
+	 * VACUUM FULL takes an exclusive lock on the relation vacuumed, hence
+	 * restrict its access similarly to what vacuum_rel() would do.  Issuing
+	 * an ERROR makes the command fail hard, rather than skip the relation,
+	 * when a non-privileged user lists this relation.
+	 */
+	if (((*options) & VACOPT_FULL) != 0)
+	{
+		(void) vacuum_skip_rel(relId, reltuple, ERROR);
+	}
+
+	ReleaseSysCache(tuple);
+}
+
+/*
+ * Check if a given relation can be safely vacuumed or not.  If the
+ * relation needs to be skipped, report why at "elevel" and return true;
+ * otherwise return false.  Callers passing ERROR expect the report to
+ * abort the command rather than return.
+ */
+static bool
+vacuum_skip_rel(Oid relid, Form_pg_class reltuple, int elevel)
+{
+	/*
+	 * Check permissions.
+	 *
+	 * We allow the user to vacuum a table if he is superuser, the table
+	 * owner, or the database owner (but in the latter case, only if it's not
+	 * a shared relation).  pg_class_ownercheck includes the superuser case.
+	 *
+	 * The severity of a permissions failure is chosen by the caller (elevel).
+	 */
+	if (!(pg_class_ownercheck(relid, GetUserId()) ||
+		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !reltuple->relisshared)))
+	{
+		if (reltuple->relisshared)
+			ereport(elevel,
+					(errmsg("skipping \"%s\" --- only superuser can vacuum it",
+							NameStr(reltuple->relname))));
+		else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE)
+			ereport(elevel,
+					(errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
+							NameStr(reltuple->relname))));
+		else
+			ereport(elevel,
+					(errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
+							NameStr(reltuple->relname))));
+		return true;
+	}
+
+	/*
+	 * Check that it's of a vacuumable relkind.
+	 */
+	if (reltuple->relkind != RELKIND_RELATION &&
+		reltuple->relkind != RELKIND_MATVIEW &&
+		reltuple->relkind != RELKIND_TOASTVALUE &&
+		reltuple->relkind != RELKIND_PARTITIONED_TABLE)
+	{
+		ereport(elevel,
+				(errmsg("skipping \"%s\" --- cannot vacuum non-tables or special system tables",
+						NameStr(reltuple->relname))));
+		return true;
+	}
+
+	return false;
+}
+
+
 /*
  * Given a VacuumRelation, fill in the table OID if it wasn't specified,
  * and optionally add VacuumRelations for partitions of the table.
@@ -423,7 +524,7 @@ vacuum(int options, List *relations, VacuumParams *params,
  * are made in vac_context.
  */
 static List *
-expand_vacuum_rel(VacuumRelation *vrel)
+expand_vacuum_rel(VacuumRelation *vrel, int options)
 {
 	List	   *vacrels = NIL;
 	MemoryContext oldcontext;
@@ -454,7 +555,9 @@ expand_vacuum_rel(VacuumRelation *vrel)
 		 * below, as well as find_all_inheritors's expectation that the caller
 		 * holds some lock on the starting relation.
 		 */
-		relid = RangeVarGetRelid(vrel->relation, AccessShareLock, false);
+		relid = RangeVarGetRelidExtended(vrel->relation, AccessShareLock,
+										 false, RangeVarCallbackForVacuum,
+										 (void *) &options);
 
 		/*
 		 * Make a returnable VacuumRelation for this rel.
@@ -545,15 +648,23 @@ get_all_vacuum_rels(void)
 	{
 		Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
 		MemoryContext oldcontext;
+		Oid			relid = HeapTupleGetOid(tuple);
 
 		/*
 		 * We include partitioned tables here; depending on which operation is
 		 * to be performed, caller will decide whether to process or ignore
 		 * them.
 		 */
 		if (classForm->relkind != RELKIND_RELATION &&
 			classForm->relkind != RELKIND_MATVIEW &&
 			classForm->relkind != RELKIND_PARTITIONED_TABLE)
 			continue;
 
+		/*
+		 * Check permissions.  The relkind filter above is still needed, as
+		 * vacuum_skip_rel() alone would also let TOAST tables through.
+		 */
+		if (vacuum_skip_rel(relid, classForm, DEBUG1))
+			continue;
+
 		/*
@@ -563,7 +661,7 @@ get_all_vacuum_rels(void)
 		 */
 		oldcontext = MemoryContextSwitchTo(vac_context);
 		vacrels = lappend(vacrels, makeVacuumRelation(NULL,
-													  HeapTupleGetOid(tuple),
+													  relid,
 													  NIL));
 		MemoryContextSwitchTo(oldcontext);
 	}
@@ -1436,47 +1534,13 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	}
 
 	/*
-	 * Check permissions.
-	 *
-	 * We allow the user to vacuum a table if he is superuser, the table
-	 * owner, or the database owner (but in the latter case, only if it's not
-	 * a shared relation).  pg_class_ownercheck includes the superuser case.
-	 *
-	 * Note we choose to treat permissions failure as a WARNING and keep
-	 * trying to vacuum the rest of the DB --- is this appropriate?
+	 * Check if relation needs to be skipped.  The same check is done when
+	 * building the list of relations for a manual VACUUM, but it has to be
+	 * repeated here since relations are processed across multiple
+	 * transactions and things may have changed in the meantime.
 	 */
-	if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
-		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
+	if (vacuum_skip_rel(RelationGetRelid(onerel), onerel->rd_rel, WARNING))
 	{
-		if (onerel->rd_rel->relisshared)
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only superuser can vacuum it",
-							RelationGetRelationName(onerel))));
-		else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
-							RelationGetRelationName(onerel))));
-		else
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
-							RelationGetRelationName(onerel))));
-		relation_close(onerel, lmode);
-		PopActiveSnapshot();
-		CommitTransactionCommand();
-		return false;
-	}
-
-	/*
-	 * Check that it's of a vacuumable relkind.
-	 */
-	if (onerel->rd_rel->relkind != RELKIND_RELATION &&
-		onerel->rd_rel->relkind != RELKIND_MATVIEW &&
-		onerel->rd_rel->relkind != RELKIND_TOASTVALUE &&
-		onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-	{
-		ereport(WARNING,
-				(errmsg("skipping \"%s\" --- cannot vacuum non-tables or special system tables",
-						RelationGetRelationName(onerel))));
 		relation_close(onerel, lmode);
 		PopActiveSnapshot();
 		CommitTransactionCommand();
-- 
2.18.0

