RangeVarGetRelid()
In commit 4240e429d0c2d889d0cda23c618f94e12c13ade7, we modified
RangeVarGetRelid() so that it acquires a lock on the target relation
"atomically" with respect to the name lookup. Since we lock OIDs, not
names, that's not possible, strictly speaking, but the idea is that we
detect whether any invalidation messages were processed between the
time we did the lookup and the time we acquired the relation lock. If
so, we double-check that the name still refers to the same object, and
if not, we release the lock on the object we had locked previously and
instead lock the new one. This is a good thing, because it means that
if you, for example, start a transaction, drop a table, create a new
one in its place, and commit the transaction, people who were blocked
waiting for the table lock will correctly latch onto the new table
instead of erroring out all over the place. This is obviously
advantageous in production situations where switching out tables in
this way has historically been quite difficult to do without
user-visible disruption.
The trouble with this mechanism, however, is that sometimes atomically
looking up the relation and locking it is not what you want to do.
For example, you might want to have a permission check in the middle,
so that you don't even briefly lock a table on which the user has no
permissions. Or, in the case of DROP INDEX, you might find it
necessary to lock the heap before the index, as a result of our
general rule that the heap locks should be acquired before index locks
to reduce deadlocks. Right now, there's no good way to do this. Some
code just opens the target relation with whatever lock is needed from
the get-go, and we just hope the transaction will abort before it
causes anyone else too much trouble. Other code looks up the relation
OID without getting a lock, does its checks, and then gets the lock -
but all of the places that take this approach uniformly lack the sort
of retry loop that is present in RangeVarGetRelid() - and are
therefore prone to latching onto a dropped relation, or maybe even
checking permissions on the relation with OID 123 and then dropping
the (eponymous) relation with OID 456. Although none of this seems
like a huge problem in practice, it's awfully ugly, and it seems like
it would be nice to clean it up.
The trouble is, I'm not quite sure how to do that. It seems like
permissions checks and lock-the-heap-for-this-index should be done in
RangeVarGetRelid() just after the block that says "if (retry)" and
just before the block that calls LockRelationOid(). That way, if we
end up deciding we need to retry the name lookup, we'll retry all that
other stuff as well, which is exactly right. The difficulty is that
different callers have different needs for what should go in that
space, to the degree that I'm a bit nervous about continuing to add
arguments to that function to satisfy what everybody needs. Maybe we
could make most of them Booleans and pass an "int flags" argument.
Another option would be to add a "callback" argument to that function
that would be called at that point with the relation, relId, and
oldRelId as arguments. Alternatively, we could just resign ourselves
to duplicating the loop in this function into each place in the code
that has a special-purpose requirement, but the function is complex
enough to make that a pretty unappealing prospect.
Or we could just punt the whole thing and do nothing, but I don't like
that either. Right now, for example, if user A is doing something
with a table, and user B (who has no permissions) tries to use it, the
pending request for an AccessExclusiveLock will block user C (who does
have permissions) from doing anything until A's transaction commits or
rolls back; only at that point do we realize that B has been naughty.
I'd like to figure out some way to fix that.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Excerpts from Robert Haas's message of jue nov 17 17:51:06 -0300 2011:
The trouble is, I'm not quite sure how to do that. It seems like
permissions checks and lock-the-heap-for-this-index should be done in
RangeVarGetRelid() just after the block that says "if (retry)" and
just before the block that calls LockRelationOid(). That way, if we
end up deciding we need to retry the name lookup, we'll retry all that
other stuff as well, which is exactly right. The difficulty is that
different callers have different needs for what should go in that
space, to the degree that I'm a bit nervous about continuing to add
arguments to that function to satisfy what everybody needs. Maybe we
could make most of them Booleans and pass an "int flags" argument.
Another option would be to add a "callback" argument to that function
that would be called at that point with the relation, relId, and
oldRelId as arguments. Alternatively, we could just resign ourselves
to duplicating the loop in this function into each place in the code
that has a special-purpose requirement, but the function is complex
enough to make that a pretty unappealing prospect.
I'm for the callback.
--
Álvaro Herrera <alvherre@commandprompt.com>
The PostgreSQL Company - Command Prompt, Inc.
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
On Thu, Nov 17, 2011 at 4:12 PM, Alvaro Herrera
<alvherre@commandprompt.com> wrote:
Excerpts from Robert Haas's message of jue nov 17 17:51:06 -0300 2011:
The trouble is, I'm not quite sure how to do that. It seems like
permissions checks and lock-the-heap-for-this-index should be done in
RangeVarGetRelid() just after the block that says "if (retry)" and
just before the block that calls LockRelationOid(). That way, if we
end up deciding we need to retry the name lookup, we'll retry all that
other stuff as well, which is exactly right. The difficulty is that
different callers have different needs for what should go in that
space, to the degree that I'm a bit nervous about continuing to add
arguments to that function to satisfy what everybody needs. Maybe we
could make most of them Booleans and pass an "int flags" argument.
Another option would be to add a "callback" argument to that function
that would be called at that point with the relation, relId, and
oldRelId as arguments. Alternatively, we could just resign ourselves
to duplicating the loop in this function into each place in the code
that has a special-purpose requirement, but the function is complex
enough to make that a pretty unappealing prospect.I'm for the callback.
I worked up the attached patch showing how this might work. For
demonstration purposes, the attached patch modifies REINDEX INDEX and
REINDEX TABLE to use this facility. It turns out that, in the current
code, they have opposite problems, so this is a good example of how
this gives you the best of both worlds. Suppose we do:
rhaas=# create table foo (a int primary key);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index
"foo_pkey" for table "foo"
CREATE TABLE
rhaas=# begin;
BEGIN
rhaas=# insert into foo values (1);
INSERT 0 1
At this point, if we start up another session as non-privileged user,
REINDEX INDEX foo_pkey will immediately fail with a permissions error
(which is good), but REINDEX TABLE foo will queue up for a table lock
(which is bad). Now suppose we set up like this:
rhaas=# create table foo (a int primary key);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index
"foo_pkey" for table "foo"
CREATE TABLE
rhaas=# begin;
BEGIN
rhaas=# drop table foo;
DROP TABLE
rhaas=# create table foo (a int primary key);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index
"foo_pkey" for table "foo"
CREATE TABLE
If we open another session as superuser and say "REINDEX INDEX
foo_pkey", and then commit the open transaction in the first session,
it will fail:
rhaas=# reindex index foo_pkey;
ERROR: could not open relation with OID 16481
However, with the same setup, "REINDEX TABLE foo" will work.
With the attached patch, both situations are handled correctly by both commands.
Having now written the patch, I'm convinced that the callback is in
fact the right choice. It requires only very minor reorganization of
the existing code, which is always a plus.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Attachments:
rangevargetrelid-callback.patchapplication/octet-stream; name=rangevargetrelid-callback.patchDownload
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 99e130c..2eab82b 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -111,7 +111,6 @@ static void validate_index_heapscan(Relation heapRelation,
IndexInfo *indexInfo,
Snapshot snapshot,
v_i_state *state);
-static Oid IndexGetRelation(Oid indexId);
static bool ReindexIsCurrentlyProcessingIndex(Oid indexOid);
static void SetReindexProcessing(Oid heapOid, Oid indexOid);
static void ResetReindexProcessing(void);
@@ -2763,7 +2762,7 @@ validate_index_heapscan(Relation heapRelation,
* IndexGetRelation: given an index's relation OID, get the OID of the
* relation it is an index on. Uses the system cache.
*/
-static Oid
+Oid
IndexGetRelation(Oid indexId)
{
HeapTuple tuple;
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 6d4d4b1..c74168c 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -219,10 +219,14 @@ Datum pg_is_other_temp_schema(PG_FUNCTION_ARGS);
* otherwise raise an error.
*
* If nowait = true, throw an error if we'd have to wait for a lock.
+ *
+ * Callback allows caller to check permissions or acquire additional locks
+ * prior to grabbing the relation lock.
*/
Oid
-RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok,
- bool nowait)
+RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode,
+ bool missing_ok, bool nowait,
+ RangeVarGetRelidCallback callback)
{
uint64 inval_count;
Oid relId;
@@ -336,6 +340,19 @@ RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok,
}
/*
+ * Invoke caller-supplied callback, if any.
+ *
+ * This callback is a good place to check permissions: we haven't taken
+ * the table lock yet (and it's really best to check permissions before
+ * locking anything!), but we've gotten far enough to know what OID we
+ * think we should lock. Of course, concurrent DDL might things while
+ * we're waiting for the lock, but in that case the callback will be
+ * invoked again for the new OID.
+ */
+ if (callback)
+ callback(relation, relId, oldRelId);
+
+ /*
* Lock relation. This will also accept any pending invalidation
* messages. If we got back InvalidOid, indicating not found, then
* there's nothing to lock, but we accept invalidation messages
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 69aa5bf..174a194 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -63,7 +63,10 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo,
static Oid GetIndexOpClass(List *opclass, Oid attrType,
char *accessMethodName, Oid accessMethodId);
static char *ChooseIndexNameAddition(List *colnames);
-
+static void RangeVarCallbackForReindexTable(const RangeVar *relation,
+ Oid relId, Oid oldRelId);
+static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
+ Oid relId, Oid oldRelId);
/*
* CheckIndexCompatible
@@ -1725,33 +1728,47 @@ void
ReindexIndex(RangeVar *indexRelation)
{
Oid indOid;
+
+ /* lock level used here should match index lock reindex_index() */
+ indOid = RangeVarGetRelidExtended(indexRelation, AccessExclusiveLock,
+ false, false,
+ RangeVarCallbackForReindexIndex);
+
+ reindex_index(indOid, false);
+}
+
+/*
+ * Check permissions on table before acquiring relation lock; also lock
+ * the heap before the RangeVarGetRelid() takes the index lock, to avoid
+ * deadlocks.
+ */
+static void
+RangeVarCallbackForReindexIndex(const RangeVar *relation,
+ Oid relId, Oid oldRelId)
+{
HeapTuple tuple;
- /*
- * XXX: This is not safe in the presence of concurrent DDL. We should
- * take AccessExclusiveLock here, but that would violate the rule that
- * indexes should only be locked after their parent tables. For now,
- * we live with it.
- */
- indOid = RangeVarGetRelid(indexRelation, NoLock, false, false);
- tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid));
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId));
if (!HeapTupleIsValid(tuple))
- elog(ERROR, "cache lookup failed for relation %u", indOid);
+ elog(ERROR, "cache lookup failed for relation %u", relId);
if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not an index",
- indexRelation->relname)));
+ relation->relname)));
/* Check permissions */
- if (!pg_class_ownercheck(indOid, GetUserId()))
+ if (!pg_class_ownercheck(relId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- indexRelation->relname);
+ relation->relname);
- ReleaseSysCache(tuple);
+ /* lock levels here should match reindex_index() heap lock */
+ if (OidIsValid(oldRelId))
+ UnlockRelationOid(IndexGetRelation(oldRelId), ShareLock);
+ LockRelationOid(IndexGetRelation(relId), ShareLock);
- reindex_index(indOid, false);
+ ReleaseSysCache(tuple);
}
/*
@@ -1762,13 +1779,29 @@ void
ReindexTable(RangeVar *relation)
{
Oid heapOid;
- HeapTuple tuple;
/* The lock level used here should match reindex_relation(). */
- heapOid = RangeVarGetRelid(relation, ShareLock, false, false);
- tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid));
+ heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false,
+ RangeVarCallbackForReindexTable);
+
+ if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST))
+ ereport(NOTICE,
+ (errmsg("table \"%s\" has no indexes",
+ relation->relname)));
+}
+
+/*
+ * Check permissions on table before acquiring relation lock.
+ */
+static void
+RangeVarCallbackForReindexTable(const RangeVar *relation,
+ Oid relId, Oid oldRelId)
+{
+ HeapTuple tuple;
+
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId));
if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
- elog(ERROR, "cache lookup failed for relation %u", heapOid);
+ elog(ERROR, "cache lookup failed for relation %u", relId);
if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION &&
((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE)
@@ -1778,16 +1811,11 @@ ReindexTable(RangeVar *relation)
relation->relname)));
/* Check permissions */
- if (!pg_class_ownercheck(heapOid, GetUserId()))
+ if (!pg_class_ownercheck(relId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
relation->relname);
ReleaseSysCache(tuple);
-
- if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST))
- ereport(NOTICE,
- (errmsg("table \"%s\" has no indexes",
- relation->relname)));
}
/*
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 8b78b05..5b09b9f 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -99,5 +99,6 @@ extern bool reindex_relation(Oid relid, int flags);
extern bool ReindexIsProcessingHeap(Oid heapOid);
extern bool ReindexIsProcessingIndex(Oid indexOid);
+extern Oid IndexGetRelation(Oid indexId);
#endif /* INDEX_H */
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index 904c6fd..97f8e72 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -47,9 +47,15 @@ typedef struct OverrideSearchPath
bool addTemp; /* implicitly prepend temp schema? */
} OverrideSearchPath;
+typedef void (*RangeVarGetRelidCallback)(const RangeVar *relation, Oid relId,
+ Oid oldRelId);
-extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode,
- bool missing_ok, bool nowait);
+#define RangeVarGetRelid(relation, lockmode, missing_ok, nowait) \
+ RangeVarGetRelidExtended(relation, lockmode, missing_ok, nowait, NULL)
+
+extern Oid RangeVarGetRelidExtended(const RangeVar *relation,
+ LOCKMODE lockmode, bool missing_ok, bool nowait,
+ RangeVarGetRelidCallback callback);
extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
On Thu, Nov 17, 2011 at 08:59:58PM -0500, Robert Haas wrote:
On Thu, Nov 17, 2011 at 4:12 PM, Alvaro Herrera <alvherre@commandprompt.com> wrote:
Excerpts from Robert Haas's message of jue nov 17 17:51:06 -0300 2011:
The trouble is, I'm not quite sure how to do that. ?It seems like
permissions checks and lock-the-heap-for-this-index should be done in
RangeVarGetRelid() just after the block that says "if (retry)" and
just before the block that calls LockRelationOid(). ?That way, if we
end up deciding we need to retry the name lookup, we'll retry all that
other stuff as well, which is exactly right. ?The difficulty is that
different callers have different needs for what should go in that
space, to the degree that I'm a bit nervous about continuing to add
arguments to that function to satisfy what everybody needs. ?Maybe we
could make most of them Booleans and pass an "int flags" argument.
Another option would be to add a "callback" argument to that function
that would be called at that point with the relation, relId, and
oldRelId as arguments. ?Alternatively, we could just resign ourselves
to duplicating the loop in this function into each place in the code
that has a special-purpose requirement, but the function is complex
enough to make that a pretty unappealing prospect.I'm for the callback.
Having now written the patch, I'm convinced that the callback is in
fact the right choice. It requires only very minor reorganization of
the existing code, which is always a plus.
+1
How about invoking the callback earlier, before "if (lockmode == NoLock)"?
That way, a REINDEX TABLE that blocks against an ALTER TABLE OWNER TO will
respect the new owner. Also, the callback will still get used in the NoLock
case. Callbacks that would have preferred the old positioning can check relId
== oldRelId to distinguish.
--- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -47,9 +47,15 @@ typedef struct OverrideSearchPath bool addTemp; /* implicitly prepend temp schema? */ } OverrideSearchPath;+typedef void (*RangeVarGetRelidCallback)(const RangeVar *relation, Oid relId, + Oid oldRelId);-extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, - bool missing_ok, bool nowait); +#define RangeVarGetRelid(relation, lockmode, missing_ok, nowait) \ + RangeVarGetRelidExtended(relation, lockmode, missing_ok, nowait, NULL) + +extern Oid RangeVarGetRelidExtended(const RangeVar *relation, + LOCKMODE lockmode, bool missing_ok, bool nowait, + RangeVarGetRelidCallback callback);
I like the split of RangeVarGetRelidExtended from RangeVarGetRelid. Shall we
also default missing_ok=false and nowait=false for RangeVarGetRelid?
Thanks,
nm
On Thu, Nov 17, 2011 at 10:48 PM, Noah Misch <noah@leadboat.com> wrote:
On Thu, Nov 17, 2011 at 08:59:58PM -0500, Robert Haas wrote:
On Thu, Nov 17, 2011 at 4:12 PM, Alvaro Herrera <alvherre@commandprompt.com> wrote:
Excerpts from Robert Haas's message of jue nov 17 17:51:06 -0300 2011:
The trouble is, I'm not quite sure how to do that. ?It seems like
permissions checks and lock-the-heap-for-this-index should be done in
RangeVarGetRelid() just after the block that says "if (retry)" and
just before the block that calls LockRelationOid(). ?That way, if we
end up deciding we need to retry the name lookup, we'll retry all that
other stuff as well, which is exactly right. ?The difficulty is that
different callers have different needs for what should go in that
space, to the degree that I'm a bit nervous about continuing to add
arguments to that function to satisfy what everybody needs. ?Maybe we
could make most of them Booleans and pass an "int flags" argument.
Another option would be to add a "callback" argument to that function
that would be called at that point with the relation, relId, and
oldRelId as arguments. ?Alternatively, we could just resign ourselves
to duplicating the loop in this function into each place in the code
that has a special-purpose requirement, but the function is complex
enough to make that a pretty unappealing prospect.I'm for the callback.
Having now written the patch, I'm convinced that the callback is in
fact the right choice. It requires only very minor reorganization of
the existing code, which is always a plus.+1
How about invoking the callback earlier, before "if (lockmode == NoLock)"?
That way, a REINDEX TABLE that blocks against an ALTER TABLE OWNER TO will
respect the new owner. Also, the callback will still get used in the NoLock
case. Callbacks that would have preferred the old positioning can check relId
== oldRelId to distinguish.
Hmm, I guess that would work.
--- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -47,9 +47,15 @@ typedef struct OverrideSearchPath bool addTemp; /* implicitly prepend temp schema? */ } OverrideSearchPath;+typedef void (*RangeVarGetRelidCallback)(const RangeVar *relation, Oid relId, + Oid oldRelId);-extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, - bool missing_ok, bool nowait); +#define RangeVarGetRelid(relation, lockmode, missing_ok, nowait) \ + RangeVarGetRelidExtended(relation, lockmode, missing_ok, nowait, NULL) + +extern Oid RangeVarGetRelidExtended(const RangeVar *relation, + LOCKMODE lockmode, bool missing_ok, bool nowait, + RangeVarGetRelidCallback callback);I like the split of RangeVarGetRelidExtended from RangeVarGetRelid. Shall we
also default missing_ok=false and nowait=false for RangeVarGetRelid?
I thought about that, but just did it this way for now to keep the
patch small for review purposes. nowait = true is only used in one
place, so it probably makes sense to default it to false in the
non-extended version. But there are a number of callers who want
missing_ok = true behavior, so I'm inclined not to think that one
should be available in the non-extended version.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Thu, Nov 17, 2011 at 11:49:06PM -0500, Robert Haas wrote:
On Thu, Nov 17, 2011 at 10:48 PM, Noah Misch <noah@leadboat.com> wrote:
On Thu, Nov 17, 2011 at 08:59:58PM -0500, Robert Haas wrote:
--- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -47,9 +47,15 @@ typedef struct OverrideSearchPath ? ? ? bool ? ? ? ? ? ?addTemp; ? ? ? ? ? ? ? ?/* implicitly prepend temp schema? */ ?} OverrideSearchPath;+typedef void (*RangeVarGetRelidCallback)(const RangeVar *relation, Oid relId, + ? ? Oid oldRelId);-extern Oid ? RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, - ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?bool missing_ok, bool nowait); +#define RangeVarGetRelid(relation, lockmode, missing_ok, nowait) \ + ? ? ? ? ? ? RangeVarGetRelidExtended(relation, lockmode, missing_ok, nowait, NULL) + +extern Oid ? RangeVarGetRelidExtended(const RangeVar *relation, + ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?LOCKMODE lockmode, bool missing_ok, bool nowait, + ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?RangeVarGetRelidCallback callback);I like the split of RangeVarGetRelidExtended from RangeVarGetRelid. ?Shall we
also default missing_ok=false and nowait=false for RangeVarGetRelid?I thought about that, but just did it this way for now to keep the
patch small for review purposes.
Fair enough.
nowait = true is only used in one
place, so it probably makes sense to default it to false in the
non-extended version. But there are a number of callers who want
missing_ok = true behavior, so I'm inclined not to think that one
should be available in the non-extended version.
[assuming the "not" in your last sentence was unintended]
I count 1/25 callers overriding nowait and 3/25 overriding missing_ok. So, it's
looking like a less-common override than the callback function will come to be.
On Fri, Nov 18, 2011 at 8:37 AM, Noah Misch <noah@leadboat.com> wrote:
I count 1/25 callers overriding nowait and 3/25 overriding missing_ok. So, it's
looking like a less-common override than the callback function will come to be.
Yeah, you're probably right. However, I think there's another good
reason not to use that signature: in 9.1, the function had a signature
of (RangeVar *, bool). If in 9.2 it ends up with a signature of
(RangeVar *, LOCKMODE), you won't get a compiler warning (at least not
on my system) if you invoke it as RangeVarGetRelid(rel, true). You'll
just get silently (and subtly) different behavior: an error if the
relataion doesn't exist (instead of no error), and an AccessShareLock
if it does (instead of no lock). I think we're best off making sure
that any old-style usage of RangeVarGetRelid() that may be out there
in third-party code fails to compile.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Fri, Nov 18, 2011 at 08:58:30AM -0500, Robert Haas wrote:
On Fri, Nov 18, 2011 at 8:37 AM, Noah Misch <noah@leadboat.com> wrote:
I count 1/25 callers overriding nowait and 3/25 overriding missing_ok. ?So, it's
looking like a less-common override than the callback function will come to be.Yeah, you're probably right. However, I think there's another good
reason not to use that signature: in 9.1, the function had a signature
of (RangeVar *, bool). If in 9.2 it ends up with a signature of
(RangeVar *, LOCKMODE), you won't get a compiler warning (at least not
on my system) if you invoke it as RangeVarGetRelid(rel, true). You'll
just get silently (and subtly) different behavior: an error if the
relataion doesn't exist (instead of no error), and an AccessShareLock
if it does (instead of no lock). I think we're best off making sure
that any old-style usage of RangeVarGetRelid() that may be out there
in third-party code fails to compile.
Good call.
On Fri, Nov 18, 2011 at 9:12 AM, Noah Misch <noah@leadboat.com> wrote:
Good call.
All right, here's an updated patch, and a couple of follow-on patches.
I updated the main patch (rangevargetrelid-callback-v2.patch) per
previous discussion. I also added a "callback_arg" argument to the
RangeVarGetRelidExtended(), because it's a law of nature that all
callback functions need such a thing, and this case proved to be no
exception: the old version of the REINDEX INDEX callback was flaky,
since it assumed that the index it locked during one iteration would
still exist during the next iteration, which won't be true when the
retry is caused by the index having been concurrently dropped. There
were some other bugs as well, which I believe I've now fixed.
fix-lock-table.patch applies over rangevargetrelid-callback-v2.patch
and adjusts LOCK TABLE so that we never obtain a relation lock before
validating that the object is a table and we have permission to lock
it. fix-drop-relation.patch applies over that, and makes similar
corrections to the logic for DROP TABLE/INDEX/SEQUENCE/VIEW/FOREIGN
TABLE. This means that it's no longer possible to use these commands
to launch a denial-of-service attack against a table you have no
rights to. Sadly, there are a bunch of other commands that can be
used the same way. I'd like to fix them all, but it's a decent amount
of work, so I'm working through them one at a time.
In the case of DROP, this also improves handling of concurrent DROP,
DROP-and-CREATE, RENAME-and-CREATE, and similar situations. For
example, in unpatched master:
rhaas=# create table t (a int);
CREATE TABLE
rhaas=# begin;
BEGIN
rhaas=# drop table t;
DROP TABLE
rhaas=# create table t (b int);
CREATE TABLE
And then, from another session:
rhaas=# drop table t;
When the first session commits, you get something like this:
ERROR: cache lookup failed for relation 16401
With these patches, that goes away: the concurrent DROP correctly sees
the new relation and drops that one. Or, if the concurrent
transaction just does a DROP rather than a DROP-and-CREATE, then you
get an error - but instead of a somewhat incomprehensible complaint
about a cache lookup having failed, you get the same error you would
have gotten had the relation not existed in the first place:
ERROR: table "t" does not exist
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Attachments:
rangevargetrelid-callback-v2.patchapplication/octet-stream; name=rangevargetrelid-callback-v2.patchDownload
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 630c3ab..2d86266 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -979,7 +979,7 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
Oid relOid;
/* Look up and lock the appropriate relation using namespace search */
- relOid = RangeVarGetRelid(relation, lockmode, false, false);
+ relOid = RangeVarGetRelid(relation, lockmode, false);
/* Let relation_open do the rest */
return relation_open(relOid, NoLock);
@@ -1001,7 +1001,7 @@ relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode,
Oid relOid;
/* Look up and lock the appropriate relation using namespace search */
- relOid = RangeVarGetRelid(relation, lockmode, missing_ok, false);
+ relOid = RangeVarGetRelid(relation, lockmode, missing_ok);
/* Return NULL on not-found */
if (!OidIsValid(relOid))
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index 9a125bd..60c0b8a 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -606,7 +606,7 @@ objectNamesToOids(GrantObjectType objtype, List *objnames)
RangeVar *relvar = (RangeVar *) lfirst(cell);
Oid relOid;
- relOid = RangeVarGetRelid(relvar, NoLock, false, false);
+ relOid = RangeVarGetRelid(relvar, NoLock, false);
objects = lappend_oid(objects, relOid);
}
break;
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 99e130c..758872f 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -111,7 +111,6 @@ static void validate_index_heapscan(Relation heapRelation,
IndexInfo *indexInfo,
Snapshot snapshot,
v_i_state *state);
-static Oid IndexGetRelation(Oid indexId);
static bool ReindexIsCurrentlyProcessingIndex(Oid indexOid);
static void SetReindexProcessing(Oid heapOid, Oid indexOid);
static void ResetReindexProcessing(void);
@@ -1301,7 +1300,7 @@ index_drop(Oid indexId)
* proceeding until we commit and send out a shared-cache-inval notice
* that will make them update their index lists.
*/
- heapId = IndexGetRelation(indexId);
+ heapId = IndexGetRelation(indexId, false);
userHeapRelation = heap_open(heapId, AccessExclusiveLock);
userIndexRelation = index_open(indexId, AccessExclusiveLock);
@@ -2763,8 +2762,8 @@ validate_index_heapscan(Relation heapRelation,
* IndexGetRelation: given an index's relation OID, get the OID of the
* relation it is an index on. Uses the system cache.
*/
-static Oid
-IndexGetRelation(Oid indexId)
+Oid
+IndexGetRelation(Oid indexId, bool missing_ok)
{
HeapTuple tuple;
Form_pg_index index;
@@ -2772,7 +2771,11 @@ IndexGetRelation(Oid indexId)
tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexId));
if (!HeapTupleIsValid(tuple))
+ {
+ if (missing_ok)
+ return InvalidOid;
elog(ERROR, "cache lookup failed for index %u", indexId);
+ }
index = (Form_pg_index) GETSTRUCT(tuple);
Assert(index->indexrelid == indexId);
@@ -2800,7 +2803,7 @@ reindex_index(Oid indexId, bool skip_constraint_checks)
* Open and lock the parent heap relation. ShareLock is sufficient since
* we only need to be sure no schema or data changes are going on.
*/
- heapId = IndexGetRelation(indexId);
+ heapId = IndexGetRelation(indexId, false);
heapRelation = heap_open(heapId, ShareLock);
/*
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 6d4d4b1..25cad3d 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -219,10 +219,14 @@ Datum pg_is_other_temp_schema(PG_FUNCTION_ARGS);
* otherwise raise an error.
*
* If nowait = true, throw an error if we'd have to wait for a lock.
+ *
+ * Callback allows caller to check permissions or acquire additional locks
+ * prior to grabbing the relation lock.
*/
Oid
-RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok,
- bool nowait)
+RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode,
+ bool missing_ok, bool nowait,
+ RangeVarGetRelidCallback callback, void *callback_arg)
{
uint64 inval_count;
Oid relId;
@@ -309,6 +313,19 @@ RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok,
}
/*
+ * Invoke caller-supplied callback, if any.
+ *
+ * This callback is a good place to check permissions: we haven't taken
+ * the table lock yet (and it's really best to check permissions before
+ * locking anything!), but we've gotten far enough to know what OID we
+ * think we should lock. Of course, concurrent DDL might things while
+ * we're waiting for the lock, but in that case the callback will be
+ * invoked again for the new OID.
+ */
+ if (callback)
+ callback(relation, relId, oldRelId, callback_arg);
+
+ /*
* If no lock requested, we assume the caller knows what they're
* doing. They should have already acquired a heavyweight lock on
* this relation earlier in the processing of this same statement,
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index c321224..eb0584e 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -111,7 +111,7 @@ ExecRenameStmt(RenameStmt *stmt)
* in RenameRelation, renameatt, or renametrig.
*/
relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock,
- false, false);
+ false);
switch (stmt->renameType)
{
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 69aa5bf..386b95b 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -63,7 +63,10 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo,
static Oid GetIndexOpClass(List *opclass, Oid attrType,
char *accessMethodName, Oid accessMethodId);
static char *ChooseIndexNameAddition(List *colnames);
-
+static void RangeVarCallbackForReindexTable(const RangeVar *relation,
+ Oid relId, Oid oldRelId, void *arg);
+static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
+ Oid relId, Oid oldRelId, void *arg);
/*
* CheckIndexCompatible
@@ -129,7 +132,7 @@ CheckIndexCompatible(Oid oldId,
Datum d;
/* Caller should already have the relation locked in some way. */
- relationId = RangeVarGetRelid(heapRelation, NoLock, false, false);
+ relationId = RangeVarGetRelid(heapRelation, NoLock, false);
/*
* We can pretend isconstraint = false unconditionally. It only serves to
* decide the text of an error message that should never happen for us.
@@ -1725,33 +1728,74 @@ void
ReindexIndex(RangeVar *indexRelation)
{
Oid indOid;
- HeapTuple tuple;
+ Oid heapOid = InvalidOid;
+
+ /* lock level used here should match index lock reindex_index() */
+ indOid = RangeVarGetRelidExtended(indexRelation, AccessExclusiveLock,
+ false, false,
+ RangeVarCallbackForReindexIndex,
+ (void *) &heapOid);
+
+ reindex_index(indOid, false);
+}
+
+/*
+ * Check permissions on table before acquiring relation lock; also lock
+ * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
+ * deadlocks.
+ */
+static void
+RangeVarCallbackForReindexIndex(const RangeVar *relation,
+ Oid relId, Oid oldRelId, void *arg)
+{
+ char relkind;
+ Oid *heapOid = (Oid *) arg;
/*
- * XXX: This is not safe in the presence of concurrent DDL. We should
- * take AccessExclusiveLock here, but that would violate the rule that
- * indexes should only be locked after their parent tables. For now,
- * we live with it.
+ * If we previously locked some other index's heap, and the name we're
+ * looking up no longer refers to that relation, release the now-useless
+ * lock.
*/
- indOid = RangeVarGetRelid(indexRelation, NoLock, false, false);
- tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid));
- if (!HeapTupleIsValid(tuple))
- elog(ERROR, "cache lookup failed for relation %u", indOid);
+ if (relId != oldRelId && OidIsValid(oldRelId))
+ {
+ /* lock level here should match reindex_index() heap lock */
+ UnlockRelationOid(*heapOid, ShareLock);
+ *heapOid = InvalidOid;
+ }
+
+ /* If the relation does not exist, there's nothing more to do. */
+ if (!OidIsValid(relId))
+ return;
- if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
+ /*
+ * If the relation does exist, check whether it's an index. But note
+ * that the relation might have been dropped between the time we did the
+ * name lookup and now. In that case, there's nothing to do.
+ */
+ relkind = get_rel_relkind(relId);
+ if (!relkind)
+ return;
+ if (relkind != RELKIND_INDEX)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not an index",
- indexRelation->relname)));
+ errmsg("\"%s\" is not an index", relation->relname)));
/* Check permissions */
- if (!pg_class_ownercheck(indOid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- indexRelation->relname);
-
- ReleaseSysCache(tuple);
+ if (!pg_class_ownercheck(relId, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
- reindex_index(indOid, false);
+ /* Lock heap before index to avoid deadlock. */
+ if (relId != oldRelId)
+ {
+ /*
+ * Lock level here should match reindex_index() heap lock.
+ * If the OID isn't valid, it means the index as concurrently dropped,
+ * which is not a problem for us; just return normally.
+ */
+ *heapOid = IndexGetRelation(relId, true);
+ if (OidIsValid(*heapOid))
+ LockRelationOid(*heapOid, ShareLock);
+ }
}
/*
@@ -1762,27 +1806,10 @@ void
ReindexTable(RangeVar *relation)
{
Oid heapOid;
- HeapTuple tuple;
/* The lock level used here should match reindex_relation(). */
- heapOid = RangeVarGetRelid(relation, ShareLock, false, false);
- tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid));
- if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
- elog(ERROR, "cache lookup failed for relation %u", heapOid);
-
- if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION &&
- ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- relation->relname)));
-
- /* Check permissions */
- if (!pg_class_ownercheck(heapOid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- relation->relname);
-
- ReleaseSysCache(tuple);
+ heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false,
+ RangeVarCallbackForReindexTable, NULL);
if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST))
ereport(NOTICE,
@@ -1791,6 +1818,37 @@ ReindexTable(RangeVar *relation)
}
/*
+ * Check permissions on table before acquiring relation lock.
+ */
+static void
+RangeVarCallbackForReindexTable(const RangeVar *relation,
+ Oid relId, Oid oldRelId, void *arg)
+{
+ char relkind;
+
+ /* Nothing to do if the relation was not found. */
+ if (!OidIsValid(relId))
+ return;
+
+ /*
+ * If the relation does exist, check whether it's an index. But note
+ * that the relation might have been dropped between the time we did the
+ * name lookup and now. In that case, there's nothing to do.
+ */
+ relkind = get_rel_relkind(relId);
+ if (!relkind)
+ return;
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table", relation->relname)));
+
+ /* Check permissions */
+ if (!pg_class_ownercheck(relId, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
+}
+
+/*
* ReindexDatabase
* Recreate indexes of a database.
*
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index 875f868..7898b3c 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -55,7 +55,8 @@ LockTableCommand(LockStmt *lockstmt)
bool recurse = interpretInhOption(rv->inhOpt);
Oid reloid;
- reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait);
+ reloid = RangeVarGetRelidExtended(rv, lockstmt->mode, false,
+ lockstmt->nowait, NULL, NULL);
rel = relation_open(reloid, NoLock);
LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse);
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 54660f4..41e5301 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -425,7 +425,7 @@ AlterSequence(AlterSeqStmt *stmt)
List *owned_by;
/* Open and lock sequence. */
- relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false);
+ relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false);
init_sequence(relid, &elm, &seqrel);
/* allow ALTER to sequence owner only */
@@ -513,7 +513,7 @@ nextval(PG_FUNCTION_ARGS)
* whether the performance penalty is material in practice, but for now,
* we do it this way.
*/
- relid = RangeVarGetRelid(sequence, NoLock, false, false);
+ relid = RangeVarGetRelid(sequence, NoLock, false);
PG_RETURN_INT64(nextval_internal(relid));
}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c4622c0..b63dc0e 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -775,7 +775,7 @@ RemoveRelations(DropStmt *drop)
* that a table must be locked before its corresponding index.
* So, for now, we ignore the hazard.
*/
- relOid = RangeVarGetRelid(rel, NoLock, true, false);
+ relOid = RangeVarGetRelid(rel, NoLock, true);
/* Not there? */
if (!OidIsValid(relOid))
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 5589528..b205dec 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -201,8 +201,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
* we might end up creating a pg_constraint entry referencing a
* nonexistent table.
*/
- constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false,
- false);
+ constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false);
}
/* permission checks */
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index f42504c..9d0a331 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -327,7 +327,7 @@ get_rel_oids(Oid relid, const RangeVar *vacrel)
* alternative, since we're going to commit this transaction and
* begin a new one between now and then.
*/
- relid = RangeVarGetRelid(vacrel, NoLock, false, false);
+ relid = RangeVarGetRelid(vacrel, NoLock, false);
/* Make a relation list entry for this guy */
oldcontext = MemoryContextSwitchTo(vac_context);
diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
index 9ac28c9..35e4baa 100644
--- a/src/backend/parser/parse_relation.c
+++ b/src/backend/parser/parse_relation.c
@@ -282,7 +282,7 @@ searchRangeTable(ParseState *pstate, RangeVar *relation)
if (!relation->schemaname)
cte = scanNameSpaceForCTE(pstate, refname, &ctelevelsup);
if (!cte)
- relId = RangeVarGetRelid(relation, NoLock, true, false);
+ relId = RangeVarGetRelid(relation, NoLock, true);
/* Now look for RTEs matching either the relation/CTE or the alias */
for (levelsup = 0;
diff --git a/src/backend/parser/parse_type.c b/src/backend/parser/parse_type.c
index 2122032..91028c4 100644
--- a/src/backend/parser/parse_type.c
+++ b/src/backend/parser/parse_type.c
@@ -115,7 +115,7 @@ LookupTypeName(ParseState *pstate, const TypeName *typeName,
* of concurrent DDL. But taking a lock would carry a performance
* penalty and would also require a permissions check.
*/
- relid = RangeVarGetRelid(rel, NoLock, false, false);
+ relid = RangeVarGetRelid(rel, NoLock, false);
attnum = get_attnum(relid, field);
if (attnum == InvalidAttrNumber)
ereport(ERROR,
diff --git a/src/backend/rewrite/rewriteDefine.c b/src/backend/rewrite/rewriteDefine.c
index 17db70e..a9bb8d5 100644
--- a/src/backend/rewrite/rewriteDefine.c
+++ b/src/backend/rewrite/rewriteDefine.c
@@ -203,8 +203,7 @@ DefineRule(RuleStmt *stmt, const char *queryString)
* Find and lock the relation. Lock level should match
* DefineQueryRewrite.
*/
- relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false,
- false);
+ relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false);
/* ... and execute */
DefineQueryRewrite(stmt->rulename,
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 6f88c47..36d9edc 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -84,7 +84,7 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs)
* AccessExclusiveLock) before verifying that the user has permissions
* is not appealing either.
*/
- relOid = RangeVarGetRelid(rel, NoLock, false, false);
+ relOid = RangeVarGetRelid(rel, NoLock, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
if (!HeapTupleIsValid(tuple)) /* should not happen */
diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c
index e79ba50..48fb673 100644
--- a/src/backend/utils/adt/acl.c
+++ b/src/backend/utils/adt/acl.c
@@ -1940,7 +1940,7 @@ convert_table_name(text *tablename)
relrv = makeRangeVarFromNameList(textToQualifiedNameList(tablename));
/* We might not even have permissions on this relation; don't lock it. */
- return RangeVarGetRelid(relrv, NoLock, false, false);
+ return RangeVarGetRelid(relrv, NoLock, false);
}
/*
diff --git a/src/backend/utils/adt/regproc.c b/src/backend/utils/adt/regproc.c
index 0d42a39..c050f5e 100644
--- a/src/backend/utils/adt/regproc.c
+++ b/src/backend/utils/adt/regproc.c
@@ -824,8 +824,7 @@ regclassin(PG_FUNCTION_ARGS)
names = stringToQualifiedNameList(class_name_or_oid);
/* We might not even have permissions on this relation; don't lock it. */
- result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false,
- false);
+ result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false);
PG_RETURN_OID(result);
}
@@ -1298,7 +1297,7 @@ text_regclass(PG_FUNCTION_ARGS)
rv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
/* We might not even have permissions on this relation; don't lock it. */
- result = RangeVarGetRelid(rv, NoLock, false, false);
+ result = RangeVarGetRelid(rv, NoLock, false);
PG_RETURN_OID(result);
}
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 75923a6..29df748 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -390,7 +390,7 @@ pg_get_viewdef_name(PG_FUNCTION_ARGS)
/* Look up view name. Can't lock it - we might not have privileges. */
viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname));
- viewoid = RangeVarGetRelid(viewrel, NoLock, false, false);
+ viewoid = RangeVarGetRelid(viewrel, NoLock, false);
PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, 0)));
}
@@ -410,7 +410,7 @@ pg_get_viewdef_name_ext(PG_FUNCTION_ARGS)
/* Look up view name. Can't lock it - we might not have privileges. */
viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname));
- viewoid = RangeVarGetRelid(viewrel, NoLock, false, false);
+ viewoid = RangeVarGetRelid(viewrel, NoLock, false);
PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, prettyFlags)));
}
@@ -1576,7 +1576,7 @@ pg_get_serial_sequence(PG_FUNCTION_ARGS)
/* Look up table name. Can't lock it - we might not have privileges. */
tablerv = makeRangeVarFromNameList(textToQualifiedNameList(tablename));
- tableOid = RangeVarGetRelid(tablerv, NoLock, false, false);
+ tableOid = RangeVarGetRelid(tablerv, NoLock, false);
/* Get the number of the column */
column = text_to_cstring(columnname);
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 8b78b05..9efd9af 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -99,5 +99,6 @@ extern bool reindex_relation(Oid relid, int flags);
extern bool ReindexIsProcessingHeap(Oid heapOid);
extern bool ReindexIsProcessingIndex(Oid indexOid);
+extern Oid IndexGetRelation(Oid indexId, bool missing_ok);
#endif /* INDEX_H */
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index 904c6fd..d537a48 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -47,9 +47,16 @@ typedef struct OverrideSearchPath
bool addTemp; /* implicitly prepend temp schema? */
} OverrideSearchPath;
+typedef void (*RangeVarGetRelidCallback)(const RangeVar *relation, Oid relId,
+ Oid oldRelId, void *callback_arg);
-extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode,
- bool missing_ok, bool nowait);
+#define RangeVarGetRelid(relation, lockmode, missing_ok) \
+ RangeVarGetRelidExtended(relation, lockmode, missing_ok, false, NULL, NULL)
+
+extern Oid RangeVarGetRelidExtended(const RangeVar *relation,
+ LOCKMODE lockmode, bool missing_ok, bool nowait,
+ RangeVarGetRelidCallback callback,
+ void *callback_arg);
extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
diff --git a/src/pl/plpgsql/src/pl_comp.c b/src/pl/plpgsql/src/pl_comp.c
index 275cad7..d1be6c9 100644
--- a/src/pl/plpgsql/src/pl_comp.c
+++ b/src/pl/plpgsql/src/pl_comp.c
@@ -1707,7 +1707,7 @@ plpgsql_parse_cwordtype(List *idents)
strVal(lsecond(idents)),
-1);
/* Can't lock relation - we might not have privileges. */
- classOid = RangeVarGetRelid(relvar, NoLock, true, false);
+ classOid = RangeVarGetRelid(relvar, NoLock, true);
if (!OidIsValid(classOid))
goto done;
fldname = strVal(lthird(idents));
@@ -1808,7 +1808,7 @@ plpgsql_parse_cwordrowtype(List *idents)
relvar = makeRangeVar(strVal(linitial(idents)),
strVal(lsecond(idents)),
-1);
- classOid = RangeVarGetRelid(relvar, NoLock, false, false);
+ classOid = RangeVarGetRelid(relvar, NoLock, false);
MemoryContextSwitchTo(oldCxt);
fix-lock-table.patchapplication/octet-stream; name=fix-lock-table.patchDownload
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index 7898b3c..91642ce 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -23,10 +23,12 @@
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/lsyscache.h"
+#include "utils/syscache.h"
-static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait,
- bool recurse);
-
+static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait);
+static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode);
+static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
+ Oid oldrelid, void *arg);
/*
* LOCK TABLE
@@ -51,99 +53,124 @@ LockTableCommand(LockStmt *lockstmt)
foreach(p, lockstmt->relations)
{
RangeVar *rv = (RangeVar *) lfirst(p);
- Relation rel;
bool recurse = interpretInhOption(rv->inhOpt);
Oid reloid;
reloid = RangeVarGetRelidExtended(rv, lockstmt->mode, false,
- lockstmt->nowait, NULL, NULL);
- rel = relation_open(reloid, NoLock);
+ lockstmt->nowait,
+ RangeVarCallbackForLockTable,
+ (void *) &lockstmt->mode);
- LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse);
+ if (recurse)
+ LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait);
}
}
/*
- * Apply LOCK TABLE recursively over an inheritance tree
+ * Before acquiring a table lock on the named table, check whether we have
+ * permission to do so.
*/
static void
-LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse)
+RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
+ void *arg)
{
+ LOCKMODE lockmode = * (LOCKMODE *) arg;
+ char relkind;
AclResult aclresult;
- Oid reloid = RelationGetRelid(rel);
- /* Verify adequate privilege */
- if (lockmode == AccessShareLock)
- aclresult = pg_class_aclcheck(reloid, GetUserId(),
- ACL_SELECT);
- else
- aclresult = pg_class_aclcheck(reloid, GetUserId(),
- ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
- if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
+ if (!OidIsValid(relid))
+ return; /* doesn't exist, so no permissions check */
+ relkind = get_rel_relkind(relid);
+ if (!relkind)
+ return; /* woops, concurrently dropped; no permissions check */
/* Currently, we only allow plain tables to be locked */
- if (rel->rd_rel->relkind != RELKIND_RELATION)
+ if (relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
+ rv->relname)));
- /*
- * If requested, recurse to children. We use find_inheritance_children
- * not find_all_inheritors to avoid taking locks far in advance of
- * checking privileges. This means we'll visit multiply-inheriting
- * children more than once, but that's no problem.
- */
- if (recurse)
+ /* Check permissions. */
+ aclresult = LockTableAclCheck(relid, lockmode);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_CLASS, rv->relname);
+}
+
+/*
+ * Apply LOCK TABLE recursively over an inheritance tree
+ *
+ * We use find_inheritance_children not find_all_inheritors to avoid taking
+ * locks far in advance of checking privileges. This means we'll visit
+ * multiply-inheriting children more than once, but that's no problem.
+ */
+static void
+LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
+{
+ List *children;
+ ListCell *lc;
+
+ children = find_inheritance_children(reloid, NoLock);
+
+ foreach(lc, children)
{
- List *children = find_inheritance_children(reloid, NoLock);
- ListCell *lc;
- Relation childrel;
+ Oid childreloid = lfirst_oid(lc);
+ AclResult aclresult;
+
+ /* Check permissions before acquiring the lock. */
+ aclresult = LockTableAclCheck(childreloid, lockmode);
+ if (aclresult != ACLCHECK_OK)
+ {
+ char *relname = get_rel_name(childreloid);
+ if (!relname)
+ continue; /* child concurrently dropped, just skip it */
+ aclcheck_error(aclresult, ACL_KIND_CLASS, relname);
+ }
- foreach(lc, children)
+ /* We have enough rights to lock the relation; do so. */
+ if (!nowait)
+ LockRelationOid(childreloid, lockmode);
+ else if (!ConditionalLockRelationOid(childreloid, lockmode))
{
- Oid childreloid = lfirst_oid(lc);
-
- /*
- * Acquire the lock, to protect against concurrent drops. Note
- * that a lock against an already-dropped relation's OID won't
- * fail.
- */
- if (!nowait)
- LockRelationOid(childreloid, lockmode);
- else if (!ConditionalLockRelationOid(childreloid, lockmode))
- {
- /* try to throw error by name; relation could be deleted... */
- char *relname = get_rel_name(childreloid);
-
- if (relname)
- ereport(ERROR,
- (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("could not obtain lock on relation \"%s\"",
- relname)));
- else
- ereport(ERROR,
- (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("could not obtain lock on relation with OID %u",
- reloid)));
- }
-
- /*
- * Now that we have the lock, check to see if the relation really
- * exists or not.
- */
- childrel = try_relation_open(childreloid, NoLock);
- if (!childrel)
- {
- /* Release useless lock */
- UnlockRelationOid(childreloid, lockmode);
- }
-
- LockTableRecurse(childrel, lockmode, nowait, recurse);
+ /* try to throw error by name; relation could be deleted... */
+ char *relname = get_rel_name(childreloid);
+ if (!relname)
+ continue; /* child concurrently dropped, just skip it */
+ ereport(ERROR,
+ (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ errmsg("could not obtain lock on relation \"%s\"",
+ relname)));
}
+
+ /*
+ * Even if we got the lock, child might have been concurrently dropped.
+ * If so, we can skip it.
+ */
+ if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(childreloid)))
+ {
+ /* Release useless lock */
+ UnlockRelationOid(childreloid, lockmode);
+ continue;
+ }
+
+ LockTableRecurse(childreloid, lockmode, nowait);
}
+}
- relation_close(rel, NoLock); /* close rel, keep lock */
+/*
+ * Check whether the current user is permitted to lock this relation.
+ */
+static AclResult
+LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
+{
+ AclResult aclresult;
+
+ /* Verify adequate privilege */
+ if (lockmode == AccessShareLock)
+ aclresult = pg_class_aclcheck(reloid, GetUserId(),
+ ACL_SELECT);
+ else
+ aclresult = pg_class_aclcheck(reloid, GetUserId(),
+ ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE);
+ return aclresult;
}
fix-drop-relation.patchapplication/octet-stream; name=fix-drop-relation.patchDownload
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index b63dc0e..1c3fe6a 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -235,6 +235,12 @@ static const struct dropmsgstrings dropmsgstringarray[] = {
{'\0', 0, NULL, NULL, NULL, NULL}
};
+struct DropRelationCallbackState
+{
+ char relkind;
+ Oid heapOid;
+};
+
/* Alter table target-type flags for ATSimplePermissions */
#define ATT_TABLE 0x0001
#define ATT_VIEW 0x0002
@@ -375,6 +381,9 @@ static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
ForkNumber forkNum, char relpersistence);
static const char *storage_name(char c);
+static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid,
+ Oid oldRelOid, void *arg);
+
/* ----------------------------------------------------------------
* DefineRelation
@@ -751,9 +760,8 @@ RemoveRelations(DropStmt *drop)
{
RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell));
Oid relOid;
- HeapTuple tuple;
- Form_pg_class classform;
ObjectAddress obj;
+ struct DropRelationCallbackState state;
/*
* These next few steps are a great deal like relation_openrv, but we
@@ -767,15 +775,13 @@ RemoveRelations(DropStmt *drop)
*/
AcceptInvalidationMessages();
- /*
- * Look up the appropriate relation using namespace search.
- *
- * XXX: Doing this without a lock is unsafe in the presence of
- * concurrent DDL, but acquiring a lock here might violate the rule
- * that a table must be locked before its corresponding index.
- * So, for now, we ignore the hazard.
- */
- relOid = RangeVarGetRelid(rel, NoLock, true);
+ /* Look up the appropriate relation using namespace search. */
+ state.relkind = relkind;
+ state.heapOid = InvalidOid;
+ relOid = RangeVarGetRelidExtended(rel, AccessExclusiveLock, true,
+ false,
+ RangeVarCallbackForDropRelation,
+ (void *) &state);
/* Not there? */
if (!OidIsValid(relOid))
@@ -784,57 +790,12 @@ RemoveRelations(DropStmt *drop)
continue;
}
- /*
- * In DROP INDEX, attempt to acquire lock on the parent table before
- * locking the index. index_drop() will need this anyway, and since
- * regular queries lock tables before their indexes, we risk deadlock
- * if we do it the other way around. No error if we don't find a
- * pg_index entry, though --- that most likely means it isn't an
- * index, and we'll fail below.
- */
- if (relkind == RELKIND_INDEX)
- {
- tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relOid));
- if (HeapTupleIsValid(tuple))
- {
- Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple);
-
- LockRelationOid(index->indrelid, AccessExclusiveLock);
- ReleaseSysCache(tuple);
- }
- }
-
- /* Get the lock before trying to fetch the syscache entry */
- LockRelationOid(relOid, AccessExclusiveLock);
-
- tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
- if (!HeapTupleIsValid(tuple))
- elog(ERROR, "cache lookup failed for relation %u", relOid);
- classform = (Form_pg_class) GETSTRUCT(tuple);
-
- if (classform->relkind != relkind)
- DropErrorMsgWrongType(rel->relname, classform->relkind, relkind);
-
- /* Allow DROP to either table owner or schema owner */
- if (!pg_class_ownercheck(relOid, GetUserId()) &&
- !pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- rel->relname);
-
- if (!allowSystemTableMods && IsSystemClass(classform))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- rel->relname)));
-
/* OK, we're ready to delete this one */
obj.classId = RelationRelationId;
obj.objectId = relOid;
obj.objectSubId = 0;
add_exact_object_address(&obj, objects);
-
- ReleaseSysCache(tuple);
}
performMultipleDeletions(objects, drop->behavior);
@@ -843,6 +804,74 @@ RemoveRelations(DropStmt *drop)
}
/*
+ * Before acquiring a table lock, check whether we have sufficient rights.
+ * In the case of DROP INDEX, also try to lock the table before the index.
+ */
+static void
+RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid,
+ void *arg)
+{
+ HeapTuple tuple;
+ struct DropRelationCallbackState *state;
+ char relkind;
+ Form_pg_class classform;
+
+ state = (struct DropRelationCallbackState *) arg;
+ relkind = state->relkind;
+
+ /*
+ * If we previously locked some other index's heap, and the name we're
+ * looking up no longer refers to that relation, release the now-useless
+ * lock.
+ */
+ if (relOid != oldRelOid && OidIsValid(state->heapOid))
+ {
+ UnlockRelationOid(state->heapOid, AccessExclusiveLock);
+ state->heapOid = InvalidOid;
+ }
+
+ /* Didn't find a relation, so need for locking or permission checks. */
+ if (!OidIsValid(relOid))
+ return;
+
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+ if (!HeapTupleIsValid(tuple))
+ return; /* concurrently dropped, so nothing to do */
+ classform = (Form_pg_class) GETSTRUCT(tuple);
+
+ if (classform->relkind != relkind)
+ DropErrorMsgWrongType(rel->relname, classform->relkind, relkind);
+
+ /* Allow DROP to either table owner or schema owner */
+ if (!pg_class_ownercheck(relOid, GetUserId()) &&
+ !pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ rel->relname);
+
+ if (!allowSystemTableMods && IsSystemClass(classform))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ rel->relname)));
+
+ ReleaseSysCache(tuple);
+
+ /*
+ * In DROP INDEX, attempt to acquire lock on the parent table before
+ * locking the index. index_drop() will need this anyway, and since
+ * regular queries lock tables before their indexes, we risk deadlock
+ * if we do it the other way around. No error if we don't find a
+ * pg_index entry, though --- the relation may have been droppd.
+ */
+ if (relkind == RELKIND_INDEX && relOid != oldRelOid)
+ {
+ state->heapOid = IndexGetRelation(relOid, true);
+ if (OidIsValid(state->heapOid))
+ LockRelationOid(state->heapOid, AccessExclusiveLock);
+ }
+}
+
+/*
* ExecuteTruncate
* Executes a TRUNCATE command.
*
On Thu, Nov 24, 2011 at 10:54:50AM -0500, Robert Haas wrote:
All right, here's an updated patch, and a couple of follow-on patches.
Your committed patch looks great overall. A few cosmetic points:
--- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c
@@ -309,6 +313,19 @@ RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok,
}/* + * Invoke caller-supplied callback, if any. + * + * This callback is a good place to check permissions: we haven't taken + * the table lock yet (and it's really best to check permissions before + * locking anything!), but we've gotten far enough to know what OID we + * think we should lock. Of course, concurrent DDL might things while
That last sentence needs a word around "might things".
--- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c
@@ -767,15 +775,13 @@ RemoveRelations(DropStmt *drop)
*/
AcceptInvalidationMessages();
The above call can go away, now.
@@ -843,6 +804,74 @@ RemoveRelations(DropStmt *drop)
}/* + * Before acquiring a table lock, check whether we have sufficient rights. + * In the case of DROP INDEX, also try to lock the table before the index. + */ +static void +RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid, + void *arg) +{ + HeapTuple tuple; + struct DropRelationCallbackState *state; + char relkind; + Form_pg_class classform; + + state = (struct DropRelationCallbackState *) arg; + relkind = state->relkind; + + /* + * If we previously locked some other index's heap, and the name we're + * looking up no longer refers to that relation, release the now-useless + * lock. + */ + if (relOid != oldRelOid && OidIsValid(state->heapOid)) + { + UnlockRelationOid(state->heapOid, AccessExclusiveLock); + state->heapOid = InvalidOid; + } + + /* Didn't find a relation, so need for locking or permission checks. */
That sentence needs a word around "so need".
Thanks,
nm
On Mon, Dec 5, 2011 at 2:09 AM, Noah Misch <noah@leadboat.com> wrote:
Your committed patch looks great overall. A few cosmetic points:
Thanks for the review.
That last sentence needs a word around "might things".
Fixed.
AcceptInvalidationMessages();
The above call can go away, now.
Doesn't that still protect us against namespace-shadowing issues?
RangeVarGetRelid doesn't actually AcceptInvalidationMessages() at the
top.
That sentence needs a word around "so need".
Fixed.
Attached please find a patch with some more fixes on this same general
theme. This one tackles renaming of relations, columns, and triggers;
and changing the schema of relations. In these cases, the current
code does a permissions check before locking the table (which is good)
and uses RangeVarGetRelid() to guard against "cache lookup failure"
errors caused by concurrent DDL (also good). However, if the referent
of the name changes during the lock wait, we don't recheck
permissions; we just allow the rename or schema change on the basis
that the user had permission to do it to the relation that formerly
had that name. While this is pretty minor as security concerns go, it
seems best to clean it up, so this patch does that.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Attachments:
rangevargetrelid-callback-round2.patchapplication/octet-stream; name=rangevargetrelid-callback-round2.patchDownload
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index eb0584e..8225155 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -97,62 +97,18 @@ ExecRenameStmt(RenameStmt *stmt)
case OBJECT_SEQUENCE:
case OBJECT_VIEW:
case OBJECT_INDEX:
+ case OBJECT_FOREIGN_TABLE:
+ RenameRelation(stmt);
+ break;
+
case OBJECT_COLUMN:
case OBJECT_ATTRIBUTE:
+ renameatt(stmt);
+ break;
+
case OBJECT_TRIGGER:
- case OBJECT_FOREIGN_TABLE:
- {
- Oid relid;
-
- CheckRelationOwnership(stmt->relation, true);
-
- /*
- * Lock level used here should match what will be taken later,
- * in RenameRelation, renameatt, or renametrig.
- */
- relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock,
- false);
-
- switch (stmt->renameType)
- {
- case OBJECT_TABLE:
- case OBJECT_SEQUENCE:
- case OBJECT_VIEW:
- case OBJECT_INDEX:
- case OBJECT_FOREIGN_TABLE:
- {
- /*
- * RENAME TABLE requires that we (still) hold
- * CREATE rights on the containing namespace, as
- * well as ownership of the table.
- */
- Oid namespaceId = get_rel_namespace(relid);
- AclResult aclresult;
-
- aclresult = pg_namespace_aclcheck(namespaceId,
- GetUserId(),
- ACL_CREATE);
- if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
- get_namespace_name(namespaceId));
-
- RenameRelation(relid, stmt->newname, stmt->renameType);
- break;
- }
- case OBJECT_COLUMN:
- case OBJECT_ATTRIBUTE:
- renameatt(relid, stmt);
- break;
- case OBJECT_TRIGGER:
- renametrig(relid,
- stmt->subname, /* old att name */
- stmt->newname); /* new att name */
- break;
- default:
- /* can't happen */ ;
- }
- break;
- }
+ renametrig(stmt);
+ break;
case OBJECT_TSPARSER:
RenameTSParser(stmt->object, stmt->newname);
@@ -227,7 +183,6 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
case OBJECT_TABLE:
case OBJECT_VIEW:
case OBJECT_FOREIGN_TABLE:
- CheckRelationOwnership(stmt->relation, true);
AlterTableNamespace(stmt->relation, stmt->newschema,
stmt->objectType, AccessExclusiveLock);
break;
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index edec44d..e805e28 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -1474,28 +1474,24 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
{
Relation toastrel;
Oid toastidx;
- Oid toastnamespace;
char NewToastName[NAMEDATALEN];
toastrel = relation_open(newrel->rd_rel->reltoastrelid,
AccessShareLock);
toastidx = toastrel->rd_rel->reltoastidxid;
- toastnamespace = toastrel->rd_rel->relnamespace;
relation_close(toastrel, AccessShareLock);
/* rename the toast table ... */
snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
OIDOldHeap);
RenameRelationInternal(newrel->rd_rel->reltoastrelid,
- NewToastName,
- toastnamespace);
+ NewToastName);
/* ... and its index too */
snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
OIDOldHeap);
RenameRelationInternal(toastidx,
- NewToastName,
- toastnamespace);
+ NewToastName);
}
relation_close(newrel, NoLock);
}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 1ee201c..135736a 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -2064,33 +2064,15 @@ SetRelationHasSubclass(Oid relationId, bool relhassubclass)
heap_close(relationRelation, RowExclusiveLock);
}
-
/*
- * renameatt_internal - workhorse for renameatt
+ * renameatt_check - basic sanity checks before attribute rename
*/
static void
-renameatt_internal(Oid myrelid,
- const char *oldattname,
- const char *newattname,
- bool recurse,
- bool recursing,
- int expected_parents,
- DropBehavior behavior)
+renameatt_check(Oid myrelid, Form_pg_class classform, bool recursing)
{
- Relation targetrelation;
- Relation attrelation;
- HeapTuple atttup;
- Form_pg_attribute attform;
- int attnum;
- char relkind;
+ char relkind = classform->relkind;
- /*
- * Grab an exclusive lock on the target table, which we will NOT release
- * until end of transaction.
- */
- targetrelation = relation_open(myrelid, AccessExclusiveLock);
-
- if (targetrelation->rd_rel->reloftype && !recursing)
+ if (classform->reloftype && !recursing)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot rename column of typed table")));
@@ -2102,7 +2084,6 @@ renameatt_internal(Oid myrelid,
* change names that are hardcoded into the system, hence the following
* restriction.
*/
- relkind = RelationGetForm(targetrelation)->relkind;
if (relkind != RELKIND_RELATION &&
relkind != RELKIND_VIEW &&
relkind != RELKIND_COMPOSITE_TYPE &&
@@ -2111,19 +2092,45 @@ renameatt_internal(Oid myrelid,
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table, view, composite type, index or foreign table",
- RelationGetRelationName(targetrelation))));
+ NameStr(classform->relname))));
/*
* permissions checking. only the owner of a class can change its schema.
*/
if (!pg_class_ownercheck(myrelid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(targetrelation));
- if (!allowSystemTableMods && IsSystemRelation(targetrelation))
+ NameStr(classform->relname));
+ if (!allowSystemTableMods && IsSystemClass(classform))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(targetrelation))));
+ NameStr(classform->relname))));
+}
+
+/*
+ * renameatt_internal - workhorse for renameatt
+ */
+static void
+renameatt_internal(Oid myrelid,
+ const char *oldattname,
+ const char *newattname,
+ bool recurse,
+ bool recursing,
+ int expected_parents,
+ DropBehavior behavior)
+{
+ Relation targetrelation;
+ Relation attrelation;
+ HeapTuple atttup;
+ Form_pg_attribute attform;
+ int attnum;
+
+ /*
+ * Grab an exclusive lock on the target table, which we will NOT release
+ * until end of transaction.
+ */
+ targetrelation = relation_open(myrelid, AccessExclusiveLock);
+ renameatt_check(myrelid, RelationGetForm(targetrelation), recursing);
/*
* if the 'recurse' flag is set then we are supposed to rename this
@@ -2252,14 +2259,38 @@ renameatt_internal(Oid myrelid,
relation_close(targetrelation, NoLock); /* close rel but keep lock */
}
+/*
+ * Perform permissions and integrity checks before acquiring a relation lock.
+ */
+static void
+RangeVarCallbackForRenameAttribute(const RangeVar *rv, Oid relid, Oid oldrelid,
+ void *arg)
+{
+ HeapTuple tuple;
+ Form_pg_class form;
+
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+ if (!HeapTupleIsValid(tuple))
+ return; /* concurrently dropped */
+ form = (Form_pg_class) GETSTRUCT(tuple);
+ renameatt_check(relid, form, false);
+ ReleaseSysCache(tuple);
+}
/*
* renameatt - changes the name of a attribute in a relation
*/
void
-renameatt(Oid myrelid, RenameStmt *stmt)
+renameatt(RenameStmt *stmt)
{
- renameatt_internal(myrelid,
+ Oid relid;
+
+ /* lock level taken here should match renameatt_internal */
+ relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
+ false, false,
+ RangeVarCallbackForRenameAttribute,
+ NULL);
+ renameatt_internal(relid,
stmt->subname, /* old att name */
stmt->newname, /* new att name */
interpretInhOption(stmt->relation->inhOpt), /* recursive? */
@@ -2268,29 +2299,44 @@ renameatt(Oid myrelid, RenameStmt *stmt)
stmt->behavior);
}
-
/*
- * Execute ALTER TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE RENAME
- *
- * Caller has already done permissions checks.
+ * Perform permissions and integrity checks before acquiring a relation lock.
*/
-void
-RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
+static void
+RangeVarCallbackForRenameRelation(const RangeVar *rv, Oid relid, Oid oldrelid,
+ void *arg)
{
- Relation targetrelation;
- Oid namespaceId;
- char relkind;
+ RenameStmt *stmt = (RenameStmt *) arg;
+ ObjectType reltype;
+ HeapTuple tuple;
+ Form_pg_class classform;
+ AclResult aclresult;
+ char relkind;
- /*
- * Grab an exclusive lock on the target table, index, sequence or view,
- * which we will NOT release until end of transaction.
- *
- * Lock level used here should match ExecRenameStmt
- */
- targetrelation = relation_open(myrelid, AccessExclusiveLock);
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+ if (!HeapTupleIsValid(tuple))
+ return; /* concurrently dropped */
+ classform = (Form_pg_class) GETSTRUCT(tuple);
+ relkind = classform->relkind;
- namespaceId = RelationGetNamespace(targetrelation);
- relkind = targetrelation->rd_rel->relkind;
+ /* Must own table. */
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ NameStr(classform->relname));
+
+ /* No system table modifications unless explicitly allowed. */
+ if (!allowSystemTableMods && IsSystemClass(classform))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ NameStr(classform->relname))));
+
+ /* Must (still) have CREATE rights on containing namespace. */
+ aclresult = pg_namespace_aclcheck(classform->relnamespace, GetUserId(),
+ ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+ get_namespace_name(classform->relnamespace));
/*
* For compatibility with prior releases, we don't complain if ALTER TABLE
@@ -2298,23 +2344,21 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
* ALTER SEQUENCE/VIEW/FOREIGN TABLE are only to be used with relations of
* that type.
*/
+ reltype = stmt->renameType;
if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a sequence",
- RelationGetRelationName(targetrelation))));
+ errmsg("\"%s\" is not a sequence", rv->relname)));
if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a view",
- RelationGetRelationName(targetrelation))));
+ errmsg("\"%s\" is not a view", rv->relname)));
if (reltype == OBJECT_FOREIGN_TABLE && relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a foreign table",
- RelationGetRelationName(targetrelation))));
+ errmsg("\"%s\" is not a foreign table", rv->relname)));
/*
* Don't allow ALTER TABLE on composite types. We want people to use ALTER
@@ -2323,17 +2367,35 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
if (relkind == RELKIND_COMPOSITE_TYPE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a composite type",
- RelationGetRelationName(targetrelation)),
+ errmsg("\"%s\" is a composite type", rv->relname),
errhint("Use ALTER TYPE instead.")));
- /* Do the work */
- RenameRelationInternal(myrelid, newrelname, namespaceId);
+ ReleaseSysCache(tuple);
+}
+
+
+/*
+ * Execute ALTER TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE RENAME
+ */
+void
+RenameRelation(RenameStmt *stmt)
+{
+ Oid relid;
/*
- * Close rel, but keep exclusive lock!
+ * Grab an exclusive lock on the target table, index, sequence or view,
+ * which we will NOT release until end of transaction.
+ *
+ * Lock level used here should match RenameRelationInternal, to avoid
+ * lock escalation.
*/
- relation_close(targetrelation, NoLock);
+ relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
+ false, false,
+ RangeVarCallbackForRenameRelation,
+ (void *) stmt);
+
+ /* Do the work */
+ RenameRelationInternal(relid, stmt->newname);
}
/*
@@ -2346,18 +2408,20 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
* sequence, AFAIK there's no need for it to be there.
*/
void
-RenameRelationInternal(Oid myrelid, const char *newrelname, Oid namespaceId)
+RenameRelationInternal(Oid myrelid, const char *newrelname)
{
Relation targetrelation;
Relation relrelation; /* for RELATION relation */
HeapTuple reltup;
Form_pg_class relform;
+ Oid namespaceId;
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
+ namespaceId = RelationGetNamespace(targetrelation);
/*
* Find relation's pg_class tuple, and make sure newrelname isn't in use.
@@ -5376,7 +5440,7 @@ ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel,
ereport(NOTICE,
(errmsg("ALTER TABLE / ADD CONSTRAINT USING INDEX will rename index \"%s\" to \"%s\"",
indexName, constraintName)));
- RenameRelation(index_oid, constraintName, OBJECT_INDEX);
+ RenameRelationInternal(index_oid, constraintName);
}
/* Extra checks needed if making primary key */
@@ -9315,28 +9379,35 @@ ATExecGenericOptions(Relation rel, List *options)
heap_freetuple(tuple);
}
-
/*
- * Execute ALTER TABLE SET SCHEMA
- *
- * Note: caller must have checked ownership of the relation already
+ * Perform permissions and integrity checks before acquiring a relation lock.
*/
-void
-AlterTableNamespace(RangeVar *relation, const char *newschema,
- ObjectType stmttype, LOCKMODE lockmode)
+static void
+RangeVarCallbackForAlterTableNamespace(const RangeVar *rv, Oid relid,
+ Oid oldrelid, void *arg)
{
- Relation rel;
- Oid relid;
- Oid oldNspOid;
- Oid nspOid;
- Relation classRel;
+ HeapTuple tuple;
+ Form_pg_class form;
+ ObjectType stmttype;
+
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+ if (!HeapTupleIsValid(tuple))
+ return; /* concurrently dropped */
+ form = (Form_pg_class) GETSTRUCT(tuple);
- rel = relation_openrv(relation, lockmode);
+ /* Must own table. */
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
- relid = RelationGetRelid(rel);
- oldNspOid = RelationGetNamespace(rel);
+ /* No system table modifications unless explicitly allowed. */
+ if (!allowSystemTableMods && IsSystemClass(form))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ rv->relname)));
/* Check relation type against type specified in the ALTER command */
+ stmttype = * (ObjectType *) arg;
switch (stmttype)
{
case OBJECT_TABLE:
@@ -9348,27 +9419,24 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
break;
case OBJECT_SEQUENCE:
- if (rel->rd_rel->relkind != RELKIND_SEQUENCE)
+ if (form->relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a sequence",
- RelationGetRelationName(rel))));
+ errmsg("\"%s\" is not a sequence", rv->relname)));
break;
case OBJECT_VIEW:
- if (rel->rd_rel->relkind != RELKIND_VIEW)
+ if (form->relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a view",
- RelationGetRelationName(rel))));
+ errmsg("\"%s\" is not a view", rv->relname)));
break;
case OBJECT_FOREIGN_TABLE:
- if (rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
+ if (form->relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a foreign table",
- RelationGetRelationName(rel))));
+ errmsg("\"%s\" is not a foreign table", rv->relname)));
break;
default:
@@ -9376,33 +9444,18 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
}
/* Can we change the schema of this tuple? */
- switch (rel->rd_rel->relkind)
+ switch (form->relkind)
{
case RELKIND_RELATION:
case RELKIND_VIEW:
+ case RELKIND_SEQUENCE:
case RELKIND_FOREIGN_TABLE:
/* ok to change schema */
break;
- case RELKIND_SEQUENCE:
- {
- /* if it's an owned sequence, disallow moving it by itself */
- Oid tableId;
- int32 colId;
-
- if (sequenceIsOwned(relid, &tableId, &colId))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot move an owned sequence into another schema"),
- errdetail("Sequence \"%s\" is linked to table \"%s\".",
- RelationGetRelationName(rel),
- get_rel_name(tableId))));
- }
- break;
case RELKIND_COMPOSITE_TYPE:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a composite type",
- RelationGetRelationName(rel)),
+ errmsg("\"%s\" is a composite type", rv->relname),
errhint("Use ALTER TYPE instead.")));
break;
case RELKIND_INDEX:
@@ -9412,7 +9465,45 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table, view, sequence, or foreign table",
- RelationGetRelationName(rel))));
+ rv->relname)));
+ }
+
+ ReleaseSysCache(tuple);
+}
+
+/*
+ * Execute ALTER TABLE SET SCHEMA
+ */
+void
+AlterTableNamespace(RangeVar *relation, const char *newschema,
+ ObjectType stmttype, LOCKMODE lockmode)
+{
+ Relation rel;
+ Oid relid;
+ Oid oldNspOid;
+ Oid nspOid;
+ Relation classRel;
+
+ relid = RangeVarGetRelidExtended(relation, lockmode, false, false,
+ RangeVarCallbackForAlterTableNamespace,
+ (void *) &stmttype);
+ rel = relation_open(relid, NoLock);
+
+ oldNspOid = RelationGetNamespace(rel);
+
+ /* If it's an owned sequence, disallow moving it by itself. */
+ if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
+ {
+ Oid tableId;
+ int32 colId;
+
+ if (sequenceIsOwned(relid, &tableId, &colId))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot move an owned sequence into another schema"),
+ errdetail("Sequence \"%s\" is linked to table \"%s\".",
+ RelationGetRelationName(rel),
+ get_rel_name(tableId))));
}
/* get schema OID and check its permissions */
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index b205dec..f4c93e5 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -1152,6 +1152,39 @@ get_trigger_oid(Oid relid, const char *trigname, bool missing_ok)
}
/*
+ * Perform permissions and integrity checks before acquiring a relation lock.
+ */
+static void
+RangeVarCallbackForRenameTrigger(const RangeVar *rv, Oid relid, Oid oldrelid,
+ void *arg)
+{
+ HeapTuple tuple;
+ Form_pg_class form;
+
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+ if (!HeapTupleIsValid(tuple))
+ return; /* concurrently dropped */
+ form = (Form_pg_class) GETSTRUCT(tuple);
+
+ /* only tables and views can have triggers */
+ if (form->relkind != RELKIND_RELATION && form->relkind != RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table or view", rv->relname)));
+
+ /* you must own the table to rename one of its triggers */
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
+ if (!allowSystemTableMods && IsSystemClass(form))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ rv->relname)));
+
+ ReleaseSysCache(tuple);
+}
+
+/*
* renametrig - changes the name of a trigger on a relation
*
* trigger name is changed in trigger catalog.
@@ -1165,21 +1198,26 @@ get_trigger_oid(Oid relid, const char *trigname, bool missing_ok)
* update row in catalog
*/
void
-renametrig(Oid relid,
- const char *oldname,
- const char *newname)
+renametrig(RenameStmt *stmt)
{
Relation targetrel;
Relation tgrel;
HeapTuple tuple;
SysScanDesc tgscan;
ScanKeyData key[2];
+ Oid relid;
/*
- * Grab an exclusive lock on the target table, which we will NOT release
- * until end of transaction.
+ * Look up name, check permissions, and acquire lock (which we will NOT
+ * release until end of transaction).
*/
- targetrel = heap_open(relid, AccessExclusiveLock);
+ relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
+ false, false,
+ RangeVarCallbackForRenameTrigger,
+ NULL);
+
+ /* Have lock already, so just need to build relcache entry. */
+ targetrel = relation_open(relid, NoLock);
/*
* Scan pg_trigger twice for existing triggers on relation. We do this in
@@ -1202,14 +1240,14 @@ renametrig(Oid relid,
ScanKeyInit(&key[1],
Anum_pg_trigger_tgname,
BTEqualStrategyNumber, F_NAMEEQ,
- PointerGetDatum(newname));
+ PointerGetDatum(stmt->newname));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
SnapshotNow, 2, key);
if (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("trigger \"%s\" for relation \"%s\" already exists",
- newname, RelationGetRelationName(targetrel))));
+ stmt->newname, RelationGetRelationName(targetrel))));
systable_endscan(tgscan);
/*
@@ -1222,7 +1260,7 @@ renametrig(Oid relid,
ScanKeyInit(&key[1],
Anum_pg_trigger_tgname,
BTEqualStrategyNumber, F_NAMEEQ,
- PointerGetDatum(oldname));
+ PointerGetDatum(stmt->subname));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
SnapshotNow, 2, key);
if (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
@@ -1232,7 +1270,8 @@ renametrig(Oid relid,
*/
tuple = heap_copytuple(tuple); /* need a modifiable copy */
- namestrcpy(&((Form_pg_trigger) GETSTRUCT(tuple))->tgname, newname);
+ namestrcpy(&((Form_pg_trigger) GETSTRUCT(tuple))->tgname,
+ stmt->newname);
simple_heap_update(tgrel, &tuple->t_self, tuple);
@@ -1251,7 +1290,7 @@ renametrig(Oid relid,
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("trigger \"%s\" for table \"%s\" does not exist",
- oldname, RelationGetRelationName(targetrel))));
+ stmt->subname, RelationGetRelationName(targetrel))));
}
systable_endscan(tgscan);
@@ -1261,7 +1300,7 @@ renametrig(Oid relid,
/*
* Close rel, but keep exclusive lock!
*/
- heap_close(targetrel, NoLock);
+ relation_close(targetrel, NoLock);
}
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index 84ba1a6..2b8f9ae 100644
--- a/src/backend/commands/typecmds.c
+++ b/src/backend/commands/typecmds.c
@@ -3121,8 +3121,7 @@ RenameType(List *names, const char *newTypeName)
* RenameRelationInternal will call RenameTypeInternal automatically.
*/
if (typTup->typtype == TYPTYPE_COMPOSITE)
- RenameRelationInternal(typTup->typrelid, newTypeName,
- typTup->typnamespace);
+ RenameRelationInternal(typTup->typrelid, newTypeName);
else
RenameTypeInternal(typeOid, newTypeName,
typTup->typnamespace);
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 333e303..20632eb 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -45,15 +45,12 @@ extern void ExecuteTruncate(TruncateStmt *stmt);
extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
-extern void renameatt(Oid myrelid, RenameStmt *stmt);
+extern void renameatt(RenameStmt *stmt);
-extern void RenameRelation(Oid myrelid,
- const char *newrelname,
- ObjectType reltype);
+extern void RenameRelation(RenameStmt *stmt);
extern void RenameRelationInternal(Oid myrelid,
- const char *newrelname,
- Oid namespaceId);
+ const char *newrelname);
extern void find_composite_type_dependencies(Oid typeOid,
Relation origRelation,
diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h
index e7d28f7..4e3fb7d 100644
--- a/src/include/commands/trigger.h
+++ b/src/include/commands/trigger.h
@@ -115,7 +115,7 @@ extern Oid CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
extern void RemoveTriggerById(Oid trigOid);
extern Oid get_trigger_oid(Oid relid, const char *name, bool missing_ok);
-extern void renametrig(Oid relid, const char *oldname, const char *newname);
+extern void renametrig(RenameStmt *stmt);
extern void EnableDisableTrigger(Relation rel, const char *tgname,
char fires_when, bool skip_system);
On Tue, Dec 06, 2011 at 04:02:31PM -0500, Robert Haas wrote:
On Mon, Dec 5, 2011 at 2:09 AM, Noah Misch <noah@leadboat.com> wrote:
? ? ? ? ? ? ? AcceptInvalidationMessages();
The above call can go away, now.
Doesn't that still protect us against namespace-shadowing issues?
RangeVarGetRelid doesn't actually AcceptInvalidationMessages() at the
top.
It narrows the window for race conditions of that genesis, but isn't doing so
an anti-feature? Even if not, doing that _only_ in RemoveRelations() is odd.
FWIW, this is fairly independent of your latest patches -- I just happened to
notice it due to the incidental quotation.
Attached please find a patch with some more fixes on this same general
theme. This one tackles renaming of relations, columns, and triggers;
and changing the schema of relations. In these cases, the current
code does a permissions check before locking the table (which is good)
and uses RangeVarGetRelid() to guard against "cache lookup failure"
errors caused by concurrent DDL (also good). However, if the referent
of the name changes during the lock wait, we don't recheck
permissions; we just allow the rename or schema change on the basis
that the user had permission to do it to the relation that formerly
had that name. While this is pretty minor as security concerns go, it
seems best to clean it up, so this patch does that.
I'd suggest limiting callback functions to checks that benefit greatly from
preceding the lock acquisition. In these cases, that's only the target object
ownership check; all other checks can wait for the lock. Writing correct
callback code is relatively tricky; we have no lock, so the available catalog
entries can change arbitrarily often as the function operates. It's worth the
trouble of navigating that hazard to keep the mob from freely locking all
objects. However, if the owner of "some_table" writes "ALTER VIEW some_table
RENAME TO foo", I see no commensurate benefit from reporting the relkind
mismatch before locking the relation. This would also permit sharing a
callback among almost all the call sites you've improved so far. Offhand, I
think only ReindexIndex() would retain a bespoke callback.
This patch removes two of the three callers of CheckRelationOwnership(),
copying some of its logic to two other sites. I'd suggest fixing the third
caller (standard_ProcessUtility() for CREATE INDEX) in this same patch. That
way, we can either delete the function now or adjust it to permit continued
sharing of that code under the new regime.
I like how this patch reduces the arbitrary division of authorization checks
between alter.c and tablecmds.c.
Thanks,
nm
On Wed, Dec 7, 2011 at 8:42 AM, Noah Misch <noah@leadboat.com> wrote:
It narrows the window for race conditions of that genesis, but isn't doing so
an anti-feature? Even if not, doing that _only_ in RemoveRelations() is odd.
I dunno. I was just reluctant to change things without a clear reason
for doing so, and "it's not what we do in other places" didn't seem
like enough. Really, I'd like to *always* do
AcceptInvalidationMessages() before a name lookup, but I'm not
convinced it's cheap enough for that.
Attached please find a patch with some more fixes on this same general
theme. This one tackles renaming of relations, columns, and triggers;
and changing the schema of relations. In these cases, the current
code does a permissions check before locking the table (which is good)
and uses RangeVarGetRelid() to guard against "cache lookup failure"
errors caused by concurrent DDL (also good). However, if the referent
of the name changes during the lock wait, we don't recheck
permissions; we just allow the rename or schema change on the basis
that the user had permission to do it to the relation that formerly
had that name. While this is pretty minor as security concerns go, it
seems best to clean it up, so this patch does that.I'd suggest limiting callback functions to checks that benefit greatly from
preceding the lock acquisition. In these cases, that's only the target object
ownership check; all other checks can wait for the lock. Writing correct
callback code is relatively tricky; we have no lock, so the available catalog
entries can change arbitrarily often as the function operates. It's worth the
trouble of navigating that hazard to keep the mob from freely locking all
objects. However, if the owner of "some_table" writes "ALTER VIEW some_table
RENAME TO foo", I see no commensurate benefit from reporting the relkind
mismatch before locking the relation. This would also permit sharing a
callback among almost all the call sites you've improved so far. Offhand, I
think only ReindexIndex() would retain a bespoke callback.
No, I don't think so. REINDEX INDEX needs special heap locking and
does not reject operations on system tables, REINDEX TABLE does not
reject operations on system tables, LOCK TABLE has special permissions
requirements and does not reject operations on system tables, DROP
TABLE also has special permissions requirements, RENAME ATTRIBUTE is
"normal" (i.e. must own relation, must not apply to system tables),
and RENAME RELATION has an extra permission check. It might not be
worth having separate callbacks *just* to check the relkind, but I
don't think we have any instances of that in what's already committed
or in this patch. It's an issue that is on my mind, though; and
perhaps as I keep cranking through the call sites some things will
turn up that can be merged. It's amazing how many subtly different
permissions requirements there are here, but none of them seem
terribly ill-founded, and, in any case, changing them is a job for
another day if it's to be done at all.
This patch removes two of the three callers of CheckRelationOwnership(),
copying some of its logic to two other sites. I'd suggest fixing the third
caller (standard_ProcessUtility() for CREATE INDEX) in this same patch. That
way, we can either delete the function now or adjust it to permit continued
sharing of that code under the new regime.
I had the same thought, but wasn't quite sure how to do it. That code
seems to be making the rather optimistic assumption that you can look
up the same name as many times as you want and get the same answer.
CheckRelationOwnership() looks it up, and then transformIndexStmt()
looks it up again, and then DefineIndex() looks it up again ... twice,
in the case of a concurrent build. If someone renames a relation with
the same name into a namespace earlier in our search path during all
this, the chances of totally nutty behavior seem pretty good; what
happens if we do phase 2 of the concurrent index build on a different
relation than phase 1? So I didn't attempt to tackle the problem just
because it looked to me like the code needed a little more TLC than I
had time to give it, but maybe it deserves a second look.
I like how this patch reduces the arbitrary division of authorization checks
between alter.c and tablecmds.c.
Thanks.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Fri, Dec 09, 2011 at 03:43:19PM -0500, Robert Haas wrote:
On Wed, Dec 7, 2011 at 8:42 AM, Noah Misch <noah@leadboat.com> wrote:
It narrows the window for race conditions of that genesis, but isn't doing so
an anti-feature? ?Even if not, doing that _only_ in RemoveRelations() is odd.I dunno. I was just reluctant to change things without a clear reason
for doing so, and "it's not what we do in other places" didn't seem
like enough. Really, I'd like to *always* do
AcceptInvalidationMessages() before a name lookup, but I'm not
convinced it's cheap enough for that.
Okay.
I'd suggest limiting callback functions to checks that benefit greatly from
preceding the lock acquisition. ?In these cases, that's only the target object
ownership check; all other checks can wait for the lock. ?Writing correct
callback code is relatively tricky; we have no lock, so the available catalog
entries can change arbitrarily often as the function operates. ?It's worth the
trouble of navigating that hazard to keep the mob from freely locking all
objects. ?However, if the owner of "some_table" writes "ALTER VIEW some_table
RENAME TO foo", I see no commensurate benefit from reporting the relkind
mismatch before locking the relation. ?This would also permit sharing a
callback among almost all the call sites you've improved so far. ?Offhand, I
think only ReindexIndex() would retain a bespoke callback.No, I don't think so. REINDEX INDEX needs special heap locking and
does not reject operations on system tables, REINDEX TABLE does not
reject operations on system tables, LOCK TABLE has special permissions
requirements and does not reject operations on system tables, DROP
TABLE also has special permissions requirements, RENAME ATTRIBUTE is
"normal" (i.e. must own relation, must not apply to system tables),
and RENAME RELATION has an extra permission check. It might not be
worth having separate callbacks *just* to check the relkind, but I
don't think we have any instances of that in what's already committed
or in this patch. It's an issue that is on my mind, though; and
perhaps as I keep cranking through the call sites some things will
turn up that can be merged.
I forgot that you fixed RemoveRelations() and LockTable() in your last patch.
In addition to ReindexIndex(), which I mentioned, those two do need their own
callbacks in any case.
It also seems my last explanation didn't convey the point. Yes, nearly every
command has a different set of permissions checks. However, we don't benefit
equally from performing each of those checks before acquiring a lock.
Consider renameatt(), which checks three things: you must own the relation,
the relation must be of a supported relkind, and the relation must not be a
typed table. To limit opportunities for denial of service, let's definitely
perform the ownership check before taking a lock. The other two checks can
wait until we hold that lock. The benefit of checking them early is to avoid
making a careless relation owner wait for a lock before discovering the
invalidity of his command. That's nice as far as it goes, but let's not
proliferate callbacks for such a third-order benefit.
This patch removes two of the three callers of CheckRelationOwnership(),
copying some of its logic to two other sites. ?I'd suggest fixing the third
caller (standard_ProcessUtility() for CREATE INDEX) in this same patch. ?That
way, we can either delete the function now or adjust it to permit continued
sharing of that code under the new regime.I had the same thought, but wasn't quite sure how to do it. That code
seems to be making the rather optimistic assumption that you can look
up the same name as many times as you want and get the same answer.
CheckRelationOwnership() looks it up, and then transformIndexStmt()
looks it up again, and then DefineIndex() looks it up again ... twice,
in the case of a concurrent build. If someone renames a relation with
the same name into a namespace earlier in our search path during all
this, the chances of totally nutty behavior seem pretty good; what
happens if we do phase 2 of the concurrent index build on a different
relation than phase 1? So I didn't attempt to tackle the problem just
because it looked to me like the code needed a little more TLC than I
had time to give it, but maybe it deserves a second look.
Ah, fair enough.
On Fri, Dec 9, 2011 at 5:41 PM, Noah Misch <noah@leadboat.com> wrote:
It also seems my last explanation didn't convey the point. Yes, nearly every
command has a different set of permissions checks. However, we don't benefit
equally from performing each of those checks before acquiring a lock.
Consider renameatt(), which checks three things: you must own the relation,
the relation must be of a supported relkind, and the relation must not be a
typed table. To limit opportunities for denial of service, let's definitely
perform the ownership check before taking a lock. The other two checks can
wait until we hold that lock. The benefit of checking them early is to avoid
making a careless relation owner wait for a lock before discovering the
invalidity of his command. That's nice as far as it goes, but let's not
proliferate callbacks for such a third-order benefit.
I agree, but my point is that so far we have no callbacks that differ
only in that detail. I accept that we'd probably want to avoid that.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Thu, Dec 15, 2011 at 07:04:20PM -0500, Robert Haas wrote:
On Fri, Dec 9, 2011 at 5:41 PM, Noah Misch <noah@leadboat.com> wrote:
It also seems my last explanation didn't convey the point. ?Yes, nearly every
command has a different set of permissions checks. ?However, we don't benefit
equally from performing each of those checks before acquiring a lock.
Consider renameatt(), which checks three things: you must own the relation,
the relation must be of a supported relkind, and the relation must not be a
typed table. ?To limit opportunities for denial of service, let's definitely
perform the ownership check before taking a lock. ?The other two checks can
wait until we hold that lock. ?The benefit of checking them early is to avoid
making a careless relation owner wait for a lock before discovering the
invalidity of his command. ?That's nice as far as it goes, but let's not
proliferate callbacks for such a third-order benefit.I agree, but my point is that so far we have no callbacks that differ
only in that detail. I accept that we'd probably want to avoid that.
To illustrate what I had in mind, here's a version of your patch that has five
callers sharing a callback. The patch is against d039fd51f79e, just prior to
your recent commits.
Attachments:
rangevargetrelid-callback-round2-nm1.patchtext/plain; charset=us-asciiDownload
*** a/src/backend/catalog/namespace.c
--- b/src/backend/catalog/namespace.c
***************
*** 211,217 **** Datum pg_is_other_temp_schema(PG_FUNCTION_ARGS);
/*
! * RangeVarGetRelid
* Given a RangeVar describing an existing relation,
* select the proper namespace and look up the relation OID.
*
--- 211,217 ----
/*
! * RangeVarGetRelidExtended
* Given a RangeVar describing an existing relation,
* select the proper namespace and look up the relation OID.
*
***************
*** 409,414 **** RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode,
--- 409,427 ----
}
/*
+ * RangeVarCallbackCheckOwner
+ * RangeVarGetRelidExtended() callback to check ownership alone.
+ */
+ void
+ RangeVarCallbackCheckOwner(const RangeVar *relation,
+ Oid relId, Oid oldRelId, void *arg)
+ {
+ /* Nothing to do if the relation was not found. */
+ if (OidIsValid(relId) && !pg_class_ownercheck(relId, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
+ }
+
+ /*
* RangeVarGetCreationNamespace
* Given a RangeVar describing a to-be-created relation,
* choose which namespace to create it in.
*** a/src/backend/commands/alter.c
--- b/src/backend/commands/alter.c
***************
*** 105,166 **** ExecRenameStmt(RenameStmt *stmt)
case OBJECT_SEQUENCE:
case OBJECT_VIEW:
case OBJECT_INDEX:
case OBJECT_COLUMN:
case OBJECT_ATTRIBUTE:
case OBJECT_TRIGGER:
! case OBJECT_FOREIGN_TABLE:
! {
! Oid relid;
!
! CheckRelationOwnership(stmt->relation, true);
!
! /*
! * Lock level used here should match what will be taken later,
! * in RenameRelation, renameatt, or renametrig.
! */
! relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock,
! false);
!
! switch (stmt->renameType)
! {
! case OBJECT_TABLE:
! case OBJECT_SEQUENCE:
! case OBJECT_VIEW:
! case OBJECT_INDEX:
! case OBJECT_FOREIGN_TABLE:
! {
! /*
! * RENAME TABLE requires that we (still) hold
! * CREATE rights on the containing namespace, as
! * well as ownership of the table.
! */
! Oid namespaceId = get_rel_namespace(relid);
! AclResult aclresult;
!
! aclresult = pg_namespace_aclcheck(namespaceId,
! GetUserId(),
! ACL_CREATE);
! if (aclresult != ACLCHECK_OK)
! aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
! get_namespace_name(namespaceId));
!
! RenameRelation(relid, stmt->newname, stmt->renameType);
! break;
! }
! case OBJECT_COLUMN:
! case OBJECT_ATTRIBUTE:
! renameatt(relid, stmt);
! break;
! case OBJECT_TRIGGER:
! renametrig(relid,
! stmt->subname, /* old att name */
! stmt->newname); /* new att name */
! break;
! default:
! /* can't happen */ ;
! }
! break;
! }
case OBJECT_TSPARSER:
RenameTSParser(stmt->object, stmt->newname);
--- 105,122 ----
case OBJECT_SEQUENCE:
case OBJECT_VIEW:
case OBJECT_INDEX:
+ case OBJECT_FOREIGN_TABLE:
+ RenameRelation(stmt);
+ break;
+
case OBJECT_COLUMN:
case OBJECT_ATTRIBUTE:
+ renameatt(stmt);
+ break;
+
case OBJECT_TRIGGER:
! renametrig(stmt);
! break;
case OBJECT_TSPARSER:
RenameTSParser(stmt->object, stmt->newname);
***************
*** 235,241 **** ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
case OBJECT_TABLE:
case OBJECT_VIEW:
case OBJECT_FOREIGN_TABLE:
- CheckRelationOwnership(stmt->relation, true);
AlterTableNamespace(stmt->relation, stmt->newschema,
stmt->objectType, AccessExclusiveLock);
break;
--- 191,196 ----
*** a/src/backend/commands/cluster.c
--- b/src/backend/commands/cluster.c
***************
*** 1474,1501 **** finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
{
Relation toastrel;
Oid toastidx;
- Oid toastnamespace;
char NewToastName[NAMEDATALEN];
toastrel = relation_open(newrel->rd_rel->reltoastrelid,
AccessShareLock);
toastidx = toastrel->rd_rel->reltoastidxid;
- toastnamespace = toastrel->rd_rel->relnamespace;
relation_close(toastrel, AccessShareLock);
/* rename the toast table ... */
snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
OIDOldHeap);
RenameRelationInternal(newrel->rd_rel->reltoastrelid,
! NewToastName,
! toastnamespace);
/* ... and its index too */
snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
OIDOldHeap);
RenameRelationInternal(toastidx,
! NewToastName,
! toastnamespace);
}
relation_close(newrel, NoLock);
}
--- 1474,1497 ----
{
Relation toastrel;
Oid toastidx;
char NewToastName[NAMEDATALEN];
toastrel = relation_open(newrel->rd_rel->reltoastrelid,
AccessShareLock);
toastidx = toastrel->rd_rel->reltoastidxid;
relation_close(toastrel, AccessShareLock);
/* rename the toast table ... */
snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
OIDOldHeap);
RenameRelationInternal(newrel->rd_rel->reltoastrelid,
! NewToastName);
/* ... and its index too */
snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
OIDOldHeap);
RenameRelationInternal(toastidx,
! NewToastName);
}
relation_close(newrel, NoLock);
}
*** a/src/backend/commands/indexcmds.c
--- b/src/backend/commands/indexcmds.c
***************
*** 63,70 **** static void ComputeIndexAttrs(IndexInfo *indexInfo,
static Oid GetIndexOpClass(List *opclass, Oid attrType,
char *accessMethodName, Oid accessMethodId);
static char *ChooseIndexNameAddition(List *colnames);
- static void RangeVarCallbackForReindexTable(const RangeVar *relation,
- Oid relId, Oid oldRelId, void *arg);
static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);
--- 63,68 ----
***************
*** 1806,1815 **** void
ReindexTable(RangeVar *relation)
{
Oid heapOid;
/* The lock level used here should match reindex_relation(). */
heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false,
! RangeVarCallbackForReindexTable, NULL);
if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST))
ereport(NOTICE,
--- 1804,1826 ----
ReindexTable(RangeVar *relation)
{
Oid heapOid;
+ HeapTuple tuple;
/* The lock level used here should match reindex_relation(). */
heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false,
! RangeVarCallbackCheckOwner, NULL);
! tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid));
! if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
! elog(ERROR, "cache lookup failed for relation %u", heapOid);
!
! if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION &&
! ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE)
! ereport(ERROR,
! (errcode(ERRCODE_WRONG_OBJECT_TYPE),
! errmsg("\"%s\" is not a table",
! relation->relname)));
!
! ReleaseSysCache(tuple);
if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST))
ereport(NOTICE,
***************
*** 1818,1854 **** ReindexTable(RangeVar *relation)
}
/*
- * Check permissions on table before acquiring relation lock.
- */
- static void
- RangeVarCallbackForReindexTable(const RangeVar *relation,
- Oid relId, Oid oldRelId, void *arg)
- {
- char relkind;
-
- /* Nothing to do if the relation was not found. */
- if (!OidIsValid(relId))
- return;
-
- /*
- * If the relation does exist, check whether it's an index. But note
- * that the relation might have been dropped between the time we did the
- * name lookup and now. In that case, there's nothing to do.
- */
- relkind = get_rel_relkind(relId);
- if (!relkind)
- return;
- if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table", relation->relname)));
-
- /* Check permissions */
- if (!pg_class_ownercheck(relId, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
- }
-
- /*
* ReindexDatabase
* Recreate indexes of a database.
*
--- 1829,1834 ----
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 2257,2264 **** renameatt_internal(Oid myrelid,
* renameatt - changes the name of a attribute in a relation
*/
void
! renameatt(Oid myrelid, RenameStmt *stmt)
{
renameatt_internal(myrelid,
stmt->subname, /* old att name */
stmt->newname, /* new att name */
--- 2257,2270 ----
* renameatt - changes the name of a attribute in a relation
*/
void
! renameatt(RenameStmt *stmt)
{
+ Oid myrelid;
+
+ /* lock level taken here should match renameatt_internal */
+ myrelid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
+ false, false,
+ RangeVarCallbackCheckOwner, NULL);
renameatt_internal(myrelid,
stmt->subname, /* old att name */
stmt->newname, /* new att name */
***************
*** 2269,2295 **** renameatt(Oid myrelid, RenameStmt *stmt)
}
! /*
! * Execute ALTER TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE RENAME
! *
! * Caller has already done permissions checks.
! */
void
! RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
{
Relation targetrelation;
Oid namespaceId;
char relkind;
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*
! * Lock level used here should match ExecRenameStmt
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
namespaceId = RelationGetNamespace(targetrelation);
relkind = targetrelation->rd_rel->relkind;
/*
--- 2275,2316 ----
}
! /* Execute ALTER TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE RENAME */
void
! RenameRelation(RenameStmt *stmt)
{
+ Oid myrelid;
Relation targetrelation;
Oid namespaceId;
+ AclResult aclresult;
char relkind;
+ ObjectType reltype = stmt->renameType;
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*
! * Lock level used here should match RenameRelationInternal, to avoid
! * lock escalation.
*/
+ myrelid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
+ false, false,
+ RangeVarCallbackCheckOwner, NULL);
targetrelation = relation_open(myrelid, AccessExclusiveLock);
+ if (!allowSystemTableMods && IsSystemRelation(targetrelation))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ RelationGetRelationName(targetrelation))));
+
+ /* Must (still) have CREATE rights on containing namespace. */
namespaceId = RelationGetNamespace(targetrelation);
+ aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+ get_namespace_name(namespaceId));
+
relkind = targetrelation->rd_rel->relkind;
/*
***************
*** 2328,2334 **** RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
errhint("Use ALTER TYPE instead.")));
/* Do the work */
! RenameRelationInternal(myrelid, newrelname, namespaceId);
/*
* Close rel, but keep exclusive lock!
--- 2349,2355 ----
errhint("Use ALTER TYPE instead.")));
/* Do the work */
! RenameRelationInternal(myrelid, stmt->newname);
/*
* Close rel, but keep exclusive lock!
***************
*** 2346,2363 **** RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
* sequence, AFAIK there's no need for it to be there.
*/
void
! RenameRelationInternal(Oid myrelid, const char *newrelname, Oid namespaceId)
{
Relation targetrelation;
Relation relrelation; /* for RELATION relation */
HeapTuple reltup;
Form_pg_class relform;
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
/*
* Find relation's pg_class tuple, and make sure newrelname isn't in use.
--- 2367,2386 ----
* sequence, AFAIK there's no need for it to be there.
*/
void
! RenameRelationInternal(Oid myrelid, const char *newrelname)
{
Relation targetrelation;
Relation relrelation; /* for RELATION relation */
HeapTuple reltup;
Form_pg_class relform;
+ Oid namespaceId;
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
+ namespaceId = RelationGetNamespace(targetrelation);
/*
* Find relation's pg_class tuple, and make sure newrelname isn't in use.
***************
*** 5376,5382 **** ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel,
ereport(NOTICE,
(errmsg("ALTER TABLE / ADD CONSTRAINT USING INDEX will rename index \"%s\" to \"%s\"",
indexName, constraintName)));
! RenameRelation(index_oid, constraintName, OBJECT_INDEX);
}
/* Extra checks needed if making primary key */
--- 5399,5405 ----
ereport(NOTICE,
(errmsg("ALTER TABLE / ADD CONSTRAINT USING INDEX will rename index \"%s\" to \"%s\"",
indexName, constraintName)));
! RenameRelationInternal(index_oid, constraintName);
}
/* Extra checks needed if making primary key */
***************
*** 9316,9326 **** ATExecGenericOptions(Relation rel, List *options)
}
! /*
! * Execute ALTER TABLE SET SCHEMA
! *
! * Note: caller must have checked ownership of the relation already
! */
void
AlterTableNamespace(RangeVar *relation, const char *newschema,
ObjectType stmttype, LOCKMODE lockmode)
--- 9339,9345 ----
}
! /* Execute ALTER TABLE SET SCHEMA */
void
AlterTableNamespace(RangeVar *relation, const char *newschema,
ObjectType stmttype, LOCKMODE lockmode)
***************
*** 9331,9339 **** AlterTableNamespace(RangeVar *relation, const char *newschema,
Oid nspOid;
Relation classRel;
rel = relation_openrv(relation, lockmode);
! relid = RelationGetRelid(rel);
oldNspOid = RelationGetNamespace(rel);
/* Check relation type against type specified in the ALTER command */
--- 9350,9365 ----
Oid nspOid;
Relation classRel;
+ relid = RangeVarGetRelidExtended(relation, lockmode, false, false,
+ RangeVarCallbackCheckOwner, NULL);
rel = relation_openrv(relation, lockmode);
! if (!allowSystemTableMods && IsSystemRelation(rel))
! ereport(ERROR,
! (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! errmsg("permission denied: \"%s\" is a system catalog",
! RelationGetRelationName(rel))));
!
oldNspOid = RelationGetNamespace(rel);
/* Check relation type against type specified in the ALTER command */
*** a/src/backend/commands/trigger.c
--- b/src/backend/commands/trigger.c
***************
*** 1165,1185 **** get_trigger_oid(Oid relid, const char *trigname, bool missing_ok)
* update row in catalog
*/
void
! renametrig(Oid relid,
! const char *oldname,
! const char *newname)
{
Relation targetrel;
Relation tgrel;
HeapTuple tuple;
SysScanDesc tgscan;
ScanKeyData key[2];
/*
! * Grab an exclusive lock on the target table, which we will NOT release
! * until end of transaction.
*/
! targetrel = heap_open(relid, AccessExclusiveLock);
/*
* Scan pg_trigger twice for existing triggers on relation. We do this in
--- 1165,1195 ----
* update row in catalog
*/
void
! renametrig(RenameStmt *stmt)
{
Relation targetrel;
Relation tgrel;
HeapTuple tuple;
SysScanDesc tgscan;
ScanKeyData key[2];
+ Oid relid;
/*
! * Look up name, check permissions, and acquire lock (which we will NOT
! * release until end of transaction).
*/
! relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
! false, false,
! RangeVarCallbackCheckOwner, NULL);
!
! /* Have lock already, so just need to build relcache entry. */
! targetrel = heap_open(relid, NoLock);
!
! if (!allowSystemTableMods && IsSystemRelation(targetrel))
! ereport(ERROR,
! (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! errmsg("permission denied: \"%s\" is a system catalog",
! RelationGetRelationName(targetrel))));
/*
* Scan pg_trigger twice for existing triggers on relation. We do this in
***************
*** 1202,1215 **** renametrig(Oid relid,
ScanKeyInit(&key[1],
Anum_pg_trigger_tgname,
BTEqualStrategyNumber, F_NAMEEQ,
! PointerGetDatum(newname));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
SnapshotNow, 2, key);
if (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("trigger \"%s\" for relation \"%s\" already exists",
! newname, RelationGetRelationName(targetrel))));
systable_endscan(tgscan);
/*
--- 1212,1225 ----
ScanKeyInit(&key[1],
Anum_pg_trigger_tgname,
BTEqualStrategyNumber, F_NAMEEQ,
! PointerGetDatum(stmt->newname));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
SnapshotNow, 2, key);
if (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("trigger \"%s\" for relation \"%s\" already exists",
! stmt->newname, RelationGetRelationName(targetrel))));
systable_endscan(tgscan);
/*
***************
*** 1222,1228 **** renametrig(Oid relid,
ScanKeyInit(&key[1],
Anum_pg_trigger_tgname,
BTEqualStrategyNumber, F_NAMEEQ,
! PointerGetDatum(oldname));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
SnapshotNow, 2, key);
if (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
--- 1232,1238 ----
ScanKeyInit(&key[1],
Anum_pg_trigger_tgname,
BTEqualStrategyNumber, F_NAMEEQ,
! PointerGetDatum(stmt->subname));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
SnapshotNow, 2, key);
if (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
***************
*** 1232,1238 **** renametrig(Oid relid,
*/
tuple = heap_copytuple(tuple); /* need a modifiable copy */
! namestrcpy(&((Form_pg_trigger) GETSTRUCT(tuple))->tgname, newname);
simple_heap_update(tgrel, &tuple->t_self, tuple);
--- 1242,1249 ----
*/
tuple = heap_copytuple(tuple); /* need a modifiable copy */
! namestrcpy(&((Form_pg_trigger) GETSTRUCT(tuple))->tgname,
! stmt->newname);
simple_heap_update(tgrel, &tuple->t_self, tuple);
***************
*** 1251,1257 **** renametrig(Oid relid,
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("trigger \"%s\" for table \"%s\" does not exist",
! oldname, RelationGetRelationName(targetrel))));
}
systable_endscan(tgscan);
--- 1262,1268 ----
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("trigger \"%s\" for table \"%s\" does not exist",
! stmt->subname, RelationGetRelationName(targetrel))));
}
systable_endscan(tgscan);
*** a/src/backend/commands/typecmds.c
--- b/src/backend/commands/typecmds.c
***************
*** 3121,3128 **** RenameType(List *names, const char *newTypeName)
* RenameRelationInternal will call RenameTypeInternal automatically.
*/
if (typTup->typtype == TYPTYPE_COMPOSITE)
! RenameRelationInternal(typTup->typrelid, newTypeName,
! typTup->typnamespace);
else
RenameTypeInternal(typeOid, newTypeName,
typTup->typnamespace);
--- 3121,3127 ----
* RenameRelationInternal will call RenameTypeInternal automatically.
*/
if (typTup->typtype == TYPTYPE_COMPOSITE)
! RenameRelationInternal(typTup->typrelid, newTypeName);
else
RenameTypeInternal(typeOid, newTypeName,
typTup->typnamespace);
*** a/src/include/catalog/namespace.h
--- b/src/include/catalog/namespace.h
***************
*** 57,62 **** extern Oid RangeVarGetRelidExtended(const RangeVar *relation,
--- 57,64 ----
LOCKMODE lockmode, bool missing_ok, bool nowait,
RangeVarGetRelidCallback callback,
void *callback_arg);
+ extern void RangeVarCallbackCheckOwner(const RangeVar *relation,
+ Oid relId, Oid oldRelId, void *arg);
extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
*** a/src/include/commands/tablecmds.h
--- b/src/include/commands/tablecmds.h
***************
*** 45,59 **** extern void ExecuteTruncate(TruncateStmt *stmt);
extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
! extern void renameatt(Oid myrelid, RenameStmt *stmt);
! extern void RenameRelation(Oid myrelid,
! const char *newrelname,
! ObjectType reltype);
extern void RenameRelationInternal(Oid myrelid,
! const char *newrelname,
! Oid namespaceId);
extern void find_composite_type_dependencies(Oid typeOid,
Relation origRelation,
--- 45,56 ----
extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
! extern void renameatt(RenameStmt *stmt);
! extern void RenameRelation(RenameStmt *stmt);
extern void RenameRelationInternal(Oid myrelid,
! const char *newrelname);
extern void find_composite_type_dependencies(Oid typeOid,
Relation origRelation,
*** a/src/include/commands/trigger.h
--- b/src/include/commands/trigger.h
***************
*** 115,121 **** extern Oid CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
extern void RemoveTriggerById(Oid trigOid);
extern Oid get_trigger_oid(Oid relid, const char *name, bool missing_ok);
! extern void renametrig(Oid relid, const char *oldname, const char *newname);
extern void EnableDisableTrigger(Relation rel, const char *tgname,
char fires_when, bool skip_system);
--- 115,121 ----
extern void RemoveTriggerById(Oid trigOid);
extern Oid get_trigger_oid(Oid relid, const char *name, bool missing_ok);
! extern void renametrig(RenameStmt *stmt);
extern void EnableDisableTrigger(Relation rel, const char *tgname,
char fires_when, bool skip_system);
On Fri, Dec 16, 2011 at 10:32 AM, Noah Misch <noah@leadboat.com> wrote:
I agree, but my point is that so far we have no callbacks that differ
only in that detail. I accept that we'd probably want to avoid that.To illustrate what I had in mind, here's a version of your patch that has five
callers sharing a callback. The patch is against d039fd51f79e, just prior to
your recent commits.
I don't think RenameRelation() ought to wait until we've taken the
lock before doing this:
+ aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+ get_namespace_name(namespaceId));
I'm not sure how we can distinguished in a principled way between
permissions checks that must be conducted before locking the relation
and those that can be done afterwards. And I don't really want to
make that distinction, anyhow.
But I see your point, otherwise. What I'm tempted to do is define a
new function RangeVarGetRelidCheckOwner(RangeVar *relation, LOCKMODE
lockmode, bool system_tables_ok) that will arrange to look up the OID,
check that you own it, optionally check that it's not a system table,
and acquire the specified lock. Internally that will call
RangeVarGetRelidExtended() with a callback; the callback arg can be
used to pass through the system_tables_ok flag.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Mon, Dec 19, 2011 at 10:11:23AM -0500, Robert Haas wrote:
On Fri, Dec 16, 2011 at 10:32 AM, Noah Misch <noah@leadboat.com> wrote:
I agree, but my point is that so far we have no callbacks that differ
only in that detail. ?I accept that we'd probably want to avoid that.To illustrate what I had in mind, here's a version of your patch that has five
callers sharing a callback. ?The patch is against d039fd51f79e, just prior to
your recent commits.I don't think RenameRelation() ought to wait until we've taken the
lock before doing this:+ aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_NAMESPACE, + get_namespace_name(namespaceId));I'm not sure how we can distinguished in a principled way between
permissions checks that must be conducted before locking the relation
and those that can be done afterwards. And I don't really want to
make that distinction, anyhow.
Here's the rule I used: the pre-lock checks must prove that the user could, by
some command working as-designed, have taken a lock at least as strong as the
one we're about to acquire. How does that work out in practice? Relation
owners can always take AccessExclusiveLock by way of a DROP command, so
ownership is always sufficient as a pre-lock test.
The above namespace check is no exception; the object owner has authority to
take the AccessExclusiveLock independent of his authority to ultimately
complete the rename. (Changes arising from the recent discussions about
locking namespaces during DDL would complicate the situation in other ways.)
But I see your point, otherwise. What I'm tempted to do is define a
new function RangeVarGetRelidCheckOwner(RangeVar *relation, LOCKMODE
lockmode, bool system_tables_ok) that will arrange to look up the OID,
check that you own it, optionally check that it's not a system table,
and acquire the specified lock. Internally that will call
RangeVarGetRelidExtended() with a callback; the callback arg can be
used to pass through the system_tables_ok flag.
Is the system catalog check special?
Thanks,
nm
On Mon, Dec 19, 2011 at 11:55 AM, Noah Misch <noah@leadboat.com> wrote:
Here's the rule I used: the pre-lock checks must prove that the user could, by
some command working as-designed, have taken a lock at least as strong as the
one we're about to acquire. How does that work out in practice? Relation
owners can always take AccessExclusiveLock by way of a DROP command, so
ownership is always sufficient as a pre-lock test.The above namespace check is no exception; the object owner has authority to
take the AccessExclusiveLock independent of his authority to ultimately
complete the rename. (Changes arising from the recent discussions about
locking namespaces during DDL would complicate the situation in other ways.)
Yes: and I have it mind to work on that for the current release cycle,
time permitting. It seems to me that there's no real value in
consolidating this callback if there's a change coming down the pipe
that would require us to split it right back out again.
Also, while I see the point that it wouldn't be a security issue to
defer this particular check until after the lock is taken, I still
don't think it's a good idea. I basically don't see any point in
splitting up the security checks across multiple functions. Our
permissions-checking logic is already rather diffuse; I think that
finding more reasons for some of it to be done in one function and
some of it to be done in some other function is going in the wrong
direction. It makes it easier for future people editing the code to
rearrange things in ways that subtly break security without realizing
it, and it's also one more obstacle for things like sepgsql which
would like to get control whenever we do a security check. (Dimitri's
recent comments about command triggers suggest that MAC isn't the only
application of an option to override the default access control
policy.) It's also not terribly consistent from a user perspective:
hmm, if I don't have permission to do operation X because of reason A,
then my command fails immediately; if I don't have permission because
of reason B, then it waits for a lock and then fails. Some amount of
inconsistency there is probably inevitable, because, for example, we
won't know whether you have permission on an entire inheritance
hierarchy until we start walking it. But I don't really see the point
in going out of our way to add more of it.
I think it's important to keep in mind that most of the code that is
going into those callbacks is just being moved. We're not really
ending up with any more code overall, except to the extent that there
are a couple of lines of net increase for each callback because, well,
it has a comment, and maybe a declaration, and some curly braces at
the beginning and the end. The ownership check is two lines of code;
it doesn't matter whether we duplicate that or not. In many cases, I
think the callbacks are actually increasing readability, by pulling a
bunch of related checks into their own function instead of cluttering
up the main code path with them. I don't want to end up with a lot of
needless code duplication, but I also don't feel any powerful need to
squeeze the number of callback functions down to an absolute minimum
just because I can.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Mon, Dec 19, 2011 at 01:43:18PM -0500, Robert Haas wrote:
On Mon, Dec 19, 2011 at 11:55 AM, Noah Misch <noah@leadboat.com> wrote:
Here's the rule I used: the pre-lock checks must prove that the user could, by
some command working as-designed, have taken a lock at least as strong as the
one we're about to acquire. ?How does that work out in practice? ?Relation
owners can always take AccessExclusiveLock by way of a DROP command, so
ownership is always sufficient as a pre-lock test.The above namespace check is no exception; the object owner has authority to
take the AccessExclusiveLock independent of his authority to ultimately
complete the rename. ?(Changes arising from the recent discussions about
locking namespaces during DDL would complicate the situation in other ways.)Yes: and I have it mind to work on that for the current release cycle,
time permitting. It seems to me that there's no real value in
consolidating this callback if there's a change coming down the pipe
that would require us to split it right back out again.Also, while I see the point that it wouldn't be a security issue to
defer this particular check until after the lock is taken, I still
don't think it's a good idea. I basically don't see any point in
splitting up the security checks across multiple functions. Our
permissions-checking logic is already rather diffuse; I think that
finding more reasons for some of it to be done in one function and
some of it to be done in some other function is going in the wrong
direction. It makes it easier for future people editing the code to
rearrange things in ways that subtly break security without realizing
it, and it's also one more obstacle for things like sepgsql which
would like to get control whenever we do a security check. (Dimitri's
recent comments about command triggers suggest that MAC isn't the only
application of an option to override the default access control
policy.) It's also not terribly consistent from a user perspective:
hmm, if I don't have permission to do operation X because of reason A,
then my command fails immediately; if I don't have permission because
of reason B, then it waits for a lock and then fails. Some amount of
inconsistency there is probably inevitable, because, for example, we
won't know whether you have permission on an entire inheritance
hierarchy until we start walking it. But I don't really see the point
in going out of our way to add more of it.I think it's important to keep in mind that most of the code that is
going into those callbacks is just being moved. We're not really
ending up with any more code overall, except to the extent that there
are a couple of lines of net increase for each callback because, well,
it has a comment, and maybe a declaration, and some curly braces at
the beginning and the end. The ownership check is two lines of code;
it doesn't matter whether we duplicate that or not. In many cases, I
think the callbacks are actually increasing readability, by pulling a
bunch of related checks into their own function instead of cluttering
up the main code path with them. I don't want to end up with a lot of
needless code duplication, but I also don't feel any powerful need to
squeeze the number of callback functions down to an absolute minimum
just because I can.
I'm satisfied that you and I have a common understanding of the options and
their respective merits. There's plenty of room for judgement in choosing
which merits to seize at the expense of others. Your judgements here seem
entirely reasonable.
nm
On Mon, Dec 19, 2011 at 3:12 PM, Noah Misch <noah@leadboat.com> wrote:
I'm satisfied that you and I have a common understanding of the options and
their respective merits. There's plenty of room for judgement in choosing
which merits to seize at the expense of others. Your judgements here seem
entirely reasonable.
Thanks. I'm not trying to be difficult.
After staring at this for quite a while longer, it seemed to me that
the logic for renaming a relation was similar enough to the logic for
changing a schema that the two calbacks could reasonably be combined
using a bit of conditional logic; and that, further, the same callback
could be used, with a small amount of additional modification, for
ALTER TABLE. Here's a patch to do that.
I also notice that cluster() - which doesn't have a callback - has
exactly the same needs as ReindexRelation() - which does. So that
case can certainly share code; though I'm not quite sure what to call
the shared callback, or which file to put it in.
RangeVarCallbackForStorageRewrite?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Attachments:
rangevargetrelid-callback-round3.patchapplication/octet-stream; name=rangevargetrelid-callback-round3.patchDownload
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 8513837..1efab73 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -191,8 +191,7 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
case OBJECT_TABLE:
case OBJECT_VIEW:
case OBJECT_FOREIGN_TABLE:
- AlterTableNamespace(stmt->relation, stmt->newschema,
- stmt->objectType, AccessExclusiveLock);
+ AlterTableNamespace(stmt);
break;
case OBJECT_TSPARSER:
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 135736a..dc18fef 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -383,6 +383,8 @@ static const char *storage_name(char c);
static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid,
Oid oldRelOid, void *arg);
+static void RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid,
+ Oid oldrelid, void *arg);
/* ----------------------------------------------------------------
@@ -2300,81 +2302,6 @@ renameatt(RenameStmt *stmt)
}
/*
- * Perform permissions and integrity checks before acquiring a relation lock.
- */
-static void
-RangeVarCallbackForRenameRelation(const RangeVar *rv, Oid relid, Oid oldrelid,
- void *arg)
-{
- RenameStmt *stmt = (RenameStmt *) arg;
- ObjectType reltype;
- HeapTuple tuple;
- Form_pg_class classform;
- AclResult aclresult;
- char relkind;
-
- tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
- if (!HeapTupleIsValid(tuple))
- return; /* concurrently dropped */
- classform = (Form_pg_class) GETSTRUCT(tuple);
- relkind = classform->relkind;
-
- /* Must own table. */
- if (!pg_class_ownercheck(relid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- NameStr(classform->relname));
-
- /* No system table modifications unless explicitly allowed. */
- if (!allowSystemTableMods && IsSystemClass(classform))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- NameStr(classform->relname))));
-
- /* Must (still) have CREATE rights on containing namespace. */
- aclresult = pg_namespace_aclcheck(classform->relnamespace, GetUserId(),
- ACL_CREATE);
- if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
- get_namespace_name(classform->relnamespace));
-
- /*
- * For compatibility with prior releases, we don't complain if ALTER TABLE
- * or ALTER INDEX is used to rename some other type of relation. But
- * ALTER SEQUENCE/VIEW/FOREIGN TABLE are only to be used with relations of
- * that type.
- */
- reltype = stmt->renameType;
- if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a sequence", rv->relname)));
-
- if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a view", rv->relname)));
-
- if (reltype == OBJECT_FOREIGN_TABLE && relkind != RELKIND_FOREIGN_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a foreign table", rv->relname)));
-
- /*
- * Don't allow ALTER TABLE on composite types. We want people to use ALTER
- * TYPE for that.
- */
- if (relkind == RELKIND_COMPOSITE_TYPE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a composite type", rv->relname),
- errhint("Use ALTER TYPE instead.")));
-
- ReleaseSysCache(tuple);
-}
-
-
-/*
* Execute ALTER TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE RENAME
*/
void
@@ -2391,7 +2318,7 @@ RenameRelation(RenameStmt *stmt)
*/
relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
false, false,
- RangeVarCallbackForRenameRelation,
+ RangeVarCallbackForAlterRelation,
(void *) stmt);
/* Do the work */
@@ -2527,6 +2454,19 @@ CheckTableNotInUse(Relation rel, const char *stmt)
}
/*
+ * AlterTableLookupRelation
+ * Look up, and lock, the OID for the relation named by an alter table
+ * statement.
+ */
+Oid
+AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode)
+{
+ return RangeVarGetRelidExtended(stmt->relation, lockmode, false, false,
+ RangeVarCallbackForAlterRelation,
+ (void *) stmt);
+}
+
+/*
* AlterTable
* Execute ALTER TABLE, which can be a list of subcommands
*
@@ -2560,90 +2500,26 @@ CheckTableNotInUse(Relation rel, const char *stmt)
* Thanks to the magic of MVCC, an error anywhere along the way rolls back
* the whole operation; we don't have to do anything special to clean up.
*
- * We lock the table as the first action, with an appropriate lock level
+ * The caller must lock the relation, with an appropriate lock level
* for the subcommands requested. Any subcommand that needs to rewrite
* tuples in the table forces the whole command to be executed with
- * AccessExclusiveLock. If all subcommands do not require rewrite table
- * then we may be able to use lower lock levels. We pass the lock level down
+ * AccessExclusiveLock (actually, that is currently required always, but
+ * we hope to relax it at some point). We pass the lock level down
* so that we can apply it recursively to inherited tables. Note that the
- * lock level we want as we recurse may well be higher than required for
+ * lock level we want as we recurse might well be higher than required for
* that specific subcommand. So we pass down the overall lock requirement,
* rather than reassess it at lower levels.
*/
void
-AlterTable(AlterTableStmt *stmt)
+AlterTable(Oid relid, LOCKMODE lockmode, AlterTableStmt *stmt)
{
Relation rel;
- LOCKMODE lockmode = AlterTableGetLockLevel(stmt->cmds);
- /*
- * Acquire same level of lock as already acquired during parsing.
- */
- rel = relation_openrv(stmt->relation, lockmode);
+ /* Caller is required to provide an adequate lock. */
+ rel = relation_open(relid, NoLock);
CheckTableNotInUse(rel, "ALTER TABLE");
- /* Check relation type against type specified in the ALTER command */
- switch (stmt->relkind)
- {
- case OBJECT_TABLE:
-
- /*
- * For mostly-historical reasons, we allow ALTER TABLE to apply to
- * almost all relation types.
- */
- if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE
- || rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
- break;
-
- case OBJECT_INDEX:
- if (rel->rd_rel->relkind != RELKIND_INDEX)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not an index",
- RelationGetRelationName(rel))));
- break;
-
- case OBJECT_SEQUENCE:
- if (rel->rd_rel->relkind != RELKIND_SEQUENCE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a sequence",
- RelationGetRelationName(rel))));
- break;
-
- case OBJECT_TYPE:
- if (rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a composite type",
- RelationGetRelationName(rel))));
- break;
-
- case OBJECT_VIEW:
- if (rel->rd_rel->relkind != RELKIND_VIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a view",
- RelationGetRelationName(rel))));
- break;
-
- case OBJECT_FOREIGN_TABLE:
- if (rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a foreign table",
- RelationGetRelationName(rel))));
- break;
-
- default:
- elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
- }
-
ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt),
lockmode);
}
@@ -9380,103 +9256,10 @@ ATExecGenericOptions(Relation rel, List *options)
}
/*
- * Perform permissions and integrity checks before acquiring a relation lock.
- */
-static void
-RangeVarCallbackForAlterTableNamespace(const RangeVar *rv, Oid relid,
- Oid oldrelid, void *arg)
-{
- HeapTuple tuple;
- Form_pg_class form;
- ObjectType stmttype;
-
- tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
- if (!HeapTupleIsValid(tuple))
- return; /* concurrently dropped */
- form = (Form_pg_class) GETSTRUCT(tuple);
-
- /* Must own table. */
- if (!pg_class_ownercheck(relid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
-
- /* No system table modifications unless explicitly allowed. */
- if (!allowSystemTableMods && IsSystemClass(form))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- rv->relname)));
-
- /* Check relation type against type specified in the ALTER command */
- stmttype = * (ObjectType *) arg;
- switch (stmttype)
- {
- case OBJECT_TABLE:
-
- /*
- * For mostly-historical reasons, we allow ALTER TABLE to apply to
- * all relation types.
- */
- break;
-
- case OBJECT_SEQUENCE:
- if (form->relkind != RELKIND_SEQUENCE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a sequence", rv->relname)));
- break;
-
- case OBJECT_VIEW:
- if (form->relkind != RELKIND_VIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a view", rv->relname)));
- break;
-
- case OBJECT_FOREIGN_TABLE:
- if (form->relkind != RELKIND_FOREIGN_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a foreign table", rv->relname)));
- break;
-
- default:
- elog(ERROR, "unrecognized object type: %d", (int) stmttype);
- }
-
- /* Can we change the schema of this tuple? */
- switch (form->relkind)
- {
- case RELKIND_RELATION:
- case RELKIND_VIEW:
- case RELKIND_SEQUENCE:
- case RELKIND_FOREIGN_TABLE:
- /* ok to change schema */
- break;
- case RELKIND_COMPOSITE_TYPE:
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a composite type", rv->relname),
- errhint("Use ALTER TYPE instead.")));
- break;
- case RELKIND_INDEX:
- case RELKIND_TOASTVALUE:
- /* FALL THRU */
- default:
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table, view, sequence, or foreign table",
- rv->relname)));
- }
-
- ReleaseSysCache(tuple);
-}
-
-/*
* Execute ALTER TABLE SET SCHEMA
*/
void
-AlterTableNamespace(RangeVar *relation, const char *newschema,
- ObjectType stmttype, LOCKMODE lockmode)
+AlterTableNamespace(AlterObjectSchemaStmt *stmt)
{
Relation rel;
Oid relid;
@@ -9484,9 +9267,10 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
Oid nspOid;
Relation classRel;
- relid = RangeVarGetRelidExtended(relation, lockmode, false, false,
- RangeVarCallbackForAlterTableNamespace,
- (void *) &stmttype);
+ relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
+ false, false,
+ RangeVarCallbackForAlterRelation,
+ (void *) stmt);
rel = relation_open(relid, NoLock);
oldNspOid = RelationGetNamespace(rel);
@@ -9507,7 +9291,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
}
/* get schema OID and check its permissions */
- nspOid = LookupCreationNamespace(newschema);
+ nspOid = LookupCreationNamespace(stmt->newschema);
/* common checks on switching namespaces */
CheckSetNamespace(oldNspOid, nspOid, RelationRelationId, relid);
@@ -9524,7 +9308,8 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
if (rel->rd_rel->relkind == RELKIND_RELATION)
{
AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
- AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema, lockmode);
+ AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, stmt->newschema,
+ AccessExclusiveLock);
AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
}
@@ -9891,3 +9676,118 @@ AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
}
}
}
+
+/*
+ * Common RangeVarGetRelid callback for rename, set schema, and alter table
+ * processing.
+ */
+static void
+RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid, Oid oldrelid,
+ void *arg)
+{
+ Node *stmt = (Node *) arg;
+ ObjectType reltype;
+ HeapTuple tuple;
+ Form_pg_class classform;
+ AclResult aclresult;
+ char relkind;
+
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+ if (!HeapTupleIsValid(tuple))
+ return; /* concurrently dropped */
+ classform = (Form_pg_class) GETSTRUCT(tuple);
+ relkind = classform->relkind;
+
+ /* Must own relation. */
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
+
+ /* No system table modifications unless explicitly allowed. */
+ if (!allowSystemTableMods && IsSystemClass(classform))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ rv->relname)));
+
+ /*
+ * Extract the specified relation type from the statement parse tree.
+ *
+ * Also, for ALTER .. RENAME, check permissions: the user must (still)
+ * have CREATE rights on the containing namespace.
+ */
+ if (IsA(stmt, RenameStmt))
+ {
+ aclresult = pg_namespace_aclcheck(classform->relnamespace,
+ GetUserId(), ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+ get_namespace_name(classform->relnamespace));
+ reltype = ((RenameStmt *) stmt)->renameType;
+ }
+ else if (IsA(stmt, AlterObjectSchemaStmt))
+ reltype = ((AlterObjectSchemaStmt *) stmt)->objectType;
+ else if (IsA(stmt, AlterTableStmt))
+ reltype = ((AlterTableStmt *) stmt)->relkind;
+ else
+ {
+ reltype = OBJECT_TABLE; /* placate compiler */
+ elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
+ }
+
+ /*
+ * For compatibility with prior releases, we allow ALTER TABLE to be
+ * used with most other types of relations (but not composite types).
+ * We allow similar flexibility for ALTER INDEX in the case of RENAME,
+ * but not otherwise. Otherwise, the user must select the correct form
+ * of the command for the relation at issue.
+ */
+ if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a sequence", rv->relname)));
+
+ if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a view", rv->relname)));
+
+ if (reltype == OBJECT_FOREIGN_TABLE && relkind != RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a foreign table", rv->relname)));
+
+ if (reltype == OBJECT_TYPE && relkind != RELKIND_COMPOSITE_TYPE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a composite type", rv->relname)));
+
+ if (reltype == OBJECT_INDEX && relkind != RELKIND_INDEX
+ && !IsA(stmt, RenameStmt))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not an index", rv->relname)));
+
+ /*
+ * Don't allow ALTER TABLE on composite types. We want people to use ALTER
+ * TYPE for that.
+ */
+ if (reltype != OBJECT_TYPE && relkind == RELKIND_COMPOSITE_TYPE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a composite type", rv->relname),
+ errhint("Use ALTER TYPE instead.")));
+
+ /*
+ * Don't allow ALTER TABLE .. SET SCHEMA on relations that can't be
+ * moved to a different schema, such as indexes and TOAST tables.
+ */
+ if (IsA(stmt, AlterObjectSchemaStmt) && relkind != RELKIND_RELATION
+ && relkind != RELKIND_VIEW && relkind != RELKIND_SEQUENCE
+ && relkind != RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table, view, sequence, or foreign table",
+ rv->relname)));
+
+ ReleaseSysCache(tuple);
+}
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 36d9edc..329c329 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -699,12 +699,23 @@ standard_ProcessUtility(Node *parsetree,
case T_AlterTableStmt:
{
+ AlterTableStmt *atstmt = (AlterTableStmt *) parsetree;
+ Oid relid;
List *stmts;
ListCell *l;
+ LOCKMODE lockmode;
+
+ /*
+ * Figure out lock mode, and acquire lock. This also does
+ * basic permissions checks, so that we won't wait for a lock
+ * on (for example) a relation on which we have no
+ * permissions.
+ */
+ lockmode = AlterTableGetLockLevel(atstmt->cmds);
+ relid = AlterTableLookupRelation(atstmt, lockmode);
/* Run parse analysis ... */
- stmts = transformAlterTableStmt((AlterTableStmt *) parsetree,
- queryString);
+ stmts = transformAlterTableStmt(atstmt, queryString);
/* ... and do it */
foreach(l, stmts)
@@ -714,7 +725,7 @@ standard_ProcessUtility(Node *parsetree,
if (IsA(stmt, AlterTableStmt))
{
/* Do the table alteration proper */
- AlterTable((AlterTableStmt *) stmt);
+ AlterTable(relid, lockmode, (AlterTableStmt *) stmt);
}
else
{
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 20632eb..9a94861 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -24,7 +24,9 @@ extern Oid DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId);
extern void RemoveRelations(DropStmt *drop);
-extern void AlterTable(AlterTableStmt *stmt);
+extern Oid AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode);
+
+extern void AlterTable(Oid relid, LOCKMODE lockmode, AlterTableStmt *stmt);
extern LOCKMODE AlterTableGetLockLevel(List *cmds);
@@ -32,8 +34,7 @@ extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, L
extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
-extern void AlterTableNamespace(RangeVar *relation, const char *newschema,
- ObjectType stmttype, LOCKMODE lockmode);
+extern void AlterTableNamespace(AlterObjectSchemaStmt *stmt);
extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
Oid oldNspOid, Oid newNspOid,
On Mon, Dec 19, 2011 at 11:52:54PM -0500, Robert Haas wrote:
After staring at this for quite a while longer, it seemed to me that
the logic for renaming a relation was similar enough to the logic for
changing a schema that the two calbacks could reasonably be combined
using a bit of conditional logic; and that, further, the same callback
could be used, with a small amount of additional modification, for
ALTER TABLE. Here's a patch to do that.
Nice.
I also notice that cluster() - which doesn't have a callback - has
exactly the same needs as ReindexRelation() - which does. So that
case can certainly share code; though I'm not quite sure what to call
the shared callback, or which file to put it in.
RangeVarCallbackForStorageRewrite?
I'd put it in tablecmds.c and name it RangeVarCallbackOwnsTable.
A few things on the patch:
--- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c
@@ -2560,90 +2500,26 @@ CheckTableNotInUse(Relation rel, const char *stmt) * Thanks to the magic of MVCC, an error anywhere along the way rolls back * the whole operation; we don't have to do anything special to clean up. * - * We lock the table as the first action, with an appropriate lock level + * The caller must lock the relation, with an appropriate lock level * for the subcommands requested. Any subcommand that needs to rewrite * tuples in the table forces the whole command to be executed with - * AccessExclusiveLock. If all subcommands do not require rewrite table - * then we may be able to use lower lock levels. We pass the lock level down + * AccessExclusiveLock (actually, that is currently required always, but + * we hope to relax it at some point). We pass the lock level down * so that we can apply it recursively to inherited tables. Note that the - * lock level we want as we recurse may well be higher than required for + * lock level we want as we recurse might well be higher than required for * that specific subcommand. So we pass down the overall lock requirement, * rather than reassess it at lower levels. */ void -AlterTable(AlterTableStmt *stmt) +AlterTable(Oid relid, LOCKMODE lockmode, AlterTableStmt *stmt) { Relation rel; - LOCKMODE lockmode = AlterTableGetLockLevel(stmt->cmds);- /* - * Acquire same level of lock as already acquired during parsing. - */ - rel = relation_openrv(stmt->relation, lockmode); + /* Caller is required to provide an adequate lock. */ + rel = relation_open(relid, NoLock);CheckTableNotInUse(rel, "ALTER TABLE");
- /* Check relation type against type specified in the ALTER command */
- switch (stmt->relkind)
- {
- case OBJECT_TABLE:
-
- /*
- * For mostly-historical reasons, we allow ALTER TABLE to apply to
- * almost all relation types.
- */
- if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE
- || rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
RangeVarCallbackForAlterRelation() does not preserve ALTER TABLE's refusal to
operate on foreign tables.
- break;
-
- case OBJECT_INDEX:
- if (rel->rd_rel->relkind != RELKIND_INDEX)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not an index",
- RelationGetRelationName(rel))));
- break;
-
- case OBJECT_SEQUENCE:
- if (rel->rd_rel->relkind != RELKIND_SEQUENCE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a sequence",
- RelationGetRelationName(rel))));
- break;
-
- case OBJECT_TYPE:
- if (rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a composite type",
- RelationGetRelationName(rel))));
- break;
-
- case OBJECT_VIEW:
- if (rel->rd_rel->relkind != RELKIND_VIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a view",
- RelationGetRelationName(rel))));
- break;
-
- case OBJECT_FOREIGN_TABLE:
- if (rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a foreign table",
- RelationGetRelationName(rel))));
- break;
-
- default:
- elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
RangeVarCallbackForAlterRelation() does not preserve the check for unexpected
object types.
- }
-
ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt),
lockmode);
}
--- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -699,12 +699,23 @@ standard_ProcessUtility(Node *parsetree,case T_AlterTableStmt: { + AlterTableStmt *atstmt = (AlterTableStmt *) parsetree; + Oid relid; List *stmts; ListCell *l; + LOCKMODE lockmode; + + /* + * Figure out lock mode, and acquire lock. This also does + * basic permissions checks, so that we won't wait for a lock + * on (for example) a relation on which we have no + * permissions. + */ + lockmode = AlterTableGetLockLevel(atstmt->cmds); + relid = AlterTableLookupRelation(atstmt, lockmode);/* Run parse analysis ... */ - stmts = transformAlterTableStmt((AlterTableStmt *) parsetree, - queryString); + stmts = transformAlterTableStmt(atstmt, queryString);
utility.c doesn't take locks for any other command; parse analysis usually
does that. To preserve that modularity, you could add a "bool toplevel"
argument to transformAlterTableStmt(). Pass true here, false in
ATPostAlterTypeParse(). If true, use AlterTableLookupRelation() to get full
security checks. Otherwise, just call relation_openrv() as now. Would that
be an improvement?
/* ... and do it */
foreach(l, stmts)
nm
On Tue, Dec 20, 2011 at 8:14 PM, Noah Misch <noah@leadboat.com> wrote:
I also notice that cluster() - which doesn't have a callback - has
exactly the same needs as ReindexRelation() - which does. So that
case can certainly share code; though I'm not quite sure what to call
the shared callback, or which file to put it in.
RangeVarCallbackForStorageRewrite?I'd put it in tablecmds.c and name it RangeVarCallbackOwnsTable.
OK.
RangeVarCallbackForAlterRelation() does not preserve ALTER TABLE's refusal to
operate on foreign tables.
I should probably fix that, but I'm wondering if I ought to fix it by
disallowing the use of ALTER TABLE on FOREIGN TABLEs for the other
commands that share the callback as well. Allowing ALTER TABLE to
apply to any relation type is mostly a legacy thing, I think, and any
code that's new enough to know about foreign tables isn't old enough
to know about the time when you had to use ALTER TABLE to rename
views.
RangeVarCallbackForAlterRelation() does not preserve the check for unexpected
object types.
I don't feel a strong need to retain that.
utility.c doesn't take locks for any other command; parse analysis usually
does that. To preserve that modularity, you could add a "bool toplevel"
argument to transformAlterTableStmt(). Pass true here, false in
ATPostAlterTypeParse(). If true, use AlterTableLookupRelation() to get full
security checks. Otherwise, just call relation_openrv() as now. Would that
be an improvement?
Not sure. I feel that it's unwise to pass relation names all over the
backend and assume that nothing will change meanwhile; no locking we
do will prevent that, at least in the case of search path
interposition. Ultimately I think this ought to be restructured
somehow so that we look up each name ONCE and ever-after refer only to
the resulting OID (except for error message text). But I'm not sure
how to do that, and thought it might make sense to commit this much
independently of such a refactoring.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Wed, Dec 21, 2011 at 03:16:39PM -0500, Robert Haas wrote:
On Tue, Dec 20, 2011 at 8:14 PM, Noah Misch <noah@leadboat.com> wrote:
RangeVarCallbackForAlterRelation() does not preserve ALTER TABLE's refusal to
operate on foreign tables.I should probably fix that, but I'm wondering if I ought to fix it by
disallowing the use of ALTER TABLE on FOREIGN TABLEs for the other
commands that share the callback as well. Allowing ALTER TABLE to
apply to any relation type is mostly a legacy thing, I think, and any
code that's new enough to know about foreign tables isn't old enough
to know about the time when you had to use ALTER TABLE to rename
views.
Maybe. Now that we have a release with these semantics, I'd lean toward
preserving the wart and being more careful next time. It's certainly a
borderline case, though.
RangeVarCallbackForAlterRelation() does not preserve the check for unexpected
object types.I don't feel a strong need to retain that.
Okay.
utility.c doesn't take locks for any other command; parse analysis usually
does that. ?To preserve that modularity, you could add a "bool toplevel"
argument to transformAlterTableStmt(). ?Pass true here, false in
ATPostAlterTypeParse(). ?If true, use AlterTableLookupRelation() to get full
security checks. ?Otherwise, just call relation_openrv() as now. ?Would that
be an improvement?Not sure. I feel that it's unwise to pass relation names all over the
backend and assume that nothing will change meanwhile; no locking we
do will prevent that, at least in the case of search path
interposition. Ultimately I think this ought to be restructured
somehow so that we look up each name ONCE and ever-after refer only to
the resulting OID (except for error message text). But I'm not sure
how to do that, and thought it might make sense to commit this much
independently of such a refactoring.
I agree with all that, though my suggestion would not have increased the
number of by-name lookups.