restructuring "alter table" privilege checks (was: remove redundant ownership checks)

Started by Robert Haas almost 16 years ago, 10 messages
#1Robert Haas
robertmhaas@gmail.com

On Thu, Dec 17, 2009 at 10:09 PM, Stephen Frost <sfrost@snowman.net> wrote:

* Tom Lane (tgl@sss.pgh.pa.us) wrote:

KaiGai Kohei <kaigai@ak.jp.nec.com> writes:

[ patch to remove EnableDisableRule's permissions check ]

I don't particularly like this patch, mainly because I disagree with
randomly removing permissions checks without any sort of plan about
where they ought to go.

The goal of this was to increase consistency with the rest of the code,
in particular, ATPrepCmd checks ownership rights on the table, and
anything which wants to check permissions beyond that has to do it
independently.  Do I like that?  No, not really.

After you posted this, I agreed with you a time or two that this was
kooky, but I spent some time yesterday and today reading through
tablecmds.c, and I've somewhat revised my opinion. It seems to me
that the ALTER TABLE permissions checks can be basically divided into
two parts: (A) checks on the relation that is being altered, and (B)
checks on other objects that are involved in whatever operation is
being performed.

Right now, the part-A logic is partially centralized in ATPrepCmd()
and partially spread throughout the rest of the code, and the part-B
logic is all over the place. If we want ALL the permissions checking
for any given ALTER TABLE command to happen in ONE place, we're going
to have to do one of two things, neither of which is currently making
sense to me. One, we could move all of the checks that are now done
in ATPrepCmd() down into the constituent ATPrep* functions and do all
the checks at once at the point when we have enough information. But
that almost seems like it's going backwards - we're spreading out
checks that are now (kind of) centralized. And, as Tom has pointed
out, it means doing more work before we decide whether we even had
permission to take the relation lock. Two, we could try to pull all
the permission checks that are NOT in ATPrepCmd() back in. But that
hardly seems feasible - we don't have enough information at that
point.

So, what if we treat these two cases separately? The part-B checks -
on the other operations involved in ALTER TABLE - are by definition
idiosyncratic. What type of object we're checking and what permission
we're checking for is inextricably bound up with what kind of ALTER
TABLE operation we're trying to perform. So, that's going to be hard
to centralize. But the part-A checks - on the relation itself - seem
like they could be consolidated. The reason why they are spread out
right now is mostly because of relatively minor deviations from what
ATSimplePermissions does.

A1. ATSimplePermissionsRelationOrIndex(), which a few of the ALTER
TABLE subcommands use, is exactly the same as ATSimplePermissions(),
except that it allows indices as well.
A2. ATSetStatistics() and ATSetDistinct() are also similar, but they
both allow indices and also skip the defenses against system table
modification.
A3. ATExecChangeOwner() emits a warning for indices and then performs no
action (for backwards compatibility), rather than emitting an error as
ATSimplePermissions() would do. It also doesn't check permission if
the old and new owner are the same.
A4. ATExecEnableDisableTrigger() and ATExecEnableDisableRule() call,
respectively, EnableDisableTrigger and EnableDisableRule, each of
which does a redundant permissions check.

I believe that everything else just calls ATSimplePermissions(),
though it's possible I've missed something. It strikes me that if we
changed the signature for ATSimplePermissions, we could eliminate
A1-A3 (and A4 is trivial):

static void ATSimplePermissions(Relation rel, AlterTableCmd *cmd);

The plan would be to move the knowledge of which operations require
special treatment (allowing indices, system tables, etc.) into
ATSimplePermissions() and then just call it unconditionally for ALL
object types. ATSimplePermissionsRelationOrIndex() would go away.
ATExecChangeOwner() would require some refactoring, but I think it
would end up being simpler than it is now. I also think it would be
more clear which checks are being applied to which object types.
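
To make the proposed signature concrete, here is a minimal sketch of a
consolidated part-A check. It is not PostgreSQL code: MockRel, MockCmd, the
MOCK_* subtypes, PermResult and the current_user parameter are all stand-ins
invented for illustration, and the mapping of subtypes to special cases is
only an assumption. A real version would raise errors with ereport() rather
than return a code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in types for illustration only; NOT the real PostgreSQL definitions. */
typedef enum { MOCK_KIND_TABLE, MOCK_KIND_INDEX } MockRelKind;
typedef enum { MOCK_ADD_COLUMN, MOCK_SET_TABLESPACE, MOCK_SET_STATISTICS } MockSubtype;
typedef enum { PERM_OK, PERM_WRONG_KIND, PERM_NOT_OWNER, PERM_SYSTEM_CATALOG } PermResult;

typedef struct { MockRelKind kind; bool is_system; int owner; } MockRel;
typedef struct { MockSubtype subtype; } MockCmd;

/*
 * Hypothetical consolidated check: all knowledge about which subcommands
 * deviate from the default rules lives in this one switch.
 */
PermResult
sketch_simple_permissions(const MockRel *rel, const MockCmd *cmd,
                          int current_user, bool allow_system_mods)
{
    bool index_ok = false;      /* default: indices are rejected */
    bool check_catalog = true;  /* default: protect system catalogs */

    assert(rel != NULL && cmd != NULL);

    switch (cmd->subtype)
    {
        case MOCK_SET_TABLESPACE:   /* A1-style case: indices allowed */
            index_ok = true;
            break;
        case MOCK_SET_STATISTICS:   /* A2-style case: indices allowed, */
            index_ok = true;        /* and no system-catalog defense */
            check_catalog = false;
            break;
        case MOCK_ADD_COLUMN:       /* default rules apply */
            break;
    }

    if (rel->kind == MOCK_KIND_INDEX && !index_ok)
        return PERM_WRONG_KIND;
    if (rel->owner != current_user)
        return PERM_NOT_OWNER;
    if (check_catalog && rel->is_system && !allow_system_mods)
        return PERM_SYSTEM_CATALOG;
    return PERM_OK;
}
```

The caller would invoke this once per subcommand before dispatching on the
subtype, so the dispatch code itself carries no permission logic.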

Just to enumerate the part-B permissions checks, that is, permissions
checks on objects other than the table to which ALTER TABLE is being
directly applied, the ones I found were:

B1. ATAddForeignKeyConstraint() checks for REFERENCES permission on
the two tables involved.
B2. ATExecDropColumn() and ATExecDropConstraint() check for permission
to perform the drop on the child tables to which they decide to
recurse.
B3. ATExecAddInherit() checks permissions on the new parent.
B4. ATPrepSetTablespace() checks for permission to use the new tablespace.
B5. ATExecAddIndex() calls DefineIndex(), which also checks for rights
on the new namespace.

B2 and B3 are actually implemented at present using
ATSimplePermissions, and I think we could keep it that way under the
proposed signature change, with only minor refactoring. The others
are basically all special cases, but there aren't actually that many
of them.

Also possibly worth refactoring is ATAddCheckConstraint(), which
currently does its own recursion only so that the constraint name at
the top of the inheritance hierarchy propagates all the way down
unchanged. I wonder if it would be cleaner/possible to work out the
constraint name earlier and then just use the standard recursion
mechanism.

We also have a few bits and pieces of ALTER TABLE that do not go
through ATPrepCmd() at all and therefore need to be considered
separately from the above. It looks like ALTER TABLE ... RENAME and
ALTER TABLE ... RENAME COLUMN go through ExecRenameStmt().
ExecRenameStmt() generally leaves permissions-checking to the
RenameWhatever() functions, but for RenameRelation(), renameatt(), and
renametrig() it makes an exception and does some of the work here. I
think we should push that logic back down into the constituent
functions so that this goes back to being just a dispatch function;
while we're at it, I think we should change renameatt() and
renametrig() to use the same naming convention as the rest of these
functions. It's not really buying us anything to do the check here;
it's just making it harder to find all the checks - for example, it
looks to me like the check done here is redundant with one being
performed in renameatt() anyway. Interestingly, renameatt() also has
a comment that says "this would normally be done in utility.c, but
this particular routine is recursive". That comment doesn't reflect
the way things are actually laid out, though.

ALTER TABLE ... SET SCHEMA has the same issue:
ExecAlterObjectSchemaStmt normally just calls
AlterWhateverNamespace(), but in the case where Whatever is Table, it
first calls CheckRelationOwnership(). So I think we should go ahead
and push this check down too, to match all the others.
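
As a rough illustration of "just a dispatch function", the sketch below
keeps the ownership check inside each rename routine and leaves the
dispatcher with nothing but a switch. Everything here is hypothetical:
MockRenameStmt, the sketch_* functions and mock_owns_relation() (standing in
for CheckRelationOwnership()) are invented for this example and do not
reflect the real signatures.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in types for illustration only; NOT the real PostgreSQL definitions. */
typedef enum { MOCK_RENAME_TABLE, MOCK_RENAME_COLUMN, MOCK_RENAME_TRIGGER } MockRenameType;
typedef enum { RENAME_DONE, RENAME_DENIED } RenameResult;
typedef struct { MockRenameType type; int rel_owner; } MockRenameStmt;

/* Hypothetical stand-in for CheckRelationOwnership(). */
bool
mock_owns_relation(const MockRenameStmt *stmt, int current_user)
{
    return stmt->rel_owner == current_user;
}

/* Each constituent routine performs its own check before doing any work. */
RenameResult
sketch_rename_relation(const MockRenameStmt *stmt, int current_user)
{
    if (!mock_owns_relation(stmt, current_user))
        return RENAME_DENIED;
    /* ... the actual rename would happen here ... */
    return RENAME_DONE;
}

RenameResult
sketch_rename_column(const MockRenameStmt *stmt, int current_user)
{
    if (!mock_owns_relation(stmt, current_user))
        return RENAME_DENIED;
    return RENAME_DONE;
}

RenameResult
sketch_rename_trigger(const MockRenameStmt *stmt, int current_user)
{
    if (!mock_owns_relation(stmt, current_user))
        return RENAME_DENIED;
    return RENAME_DONE;
}

/* The dispatcher contains no permission logic at all. */
RenameResult
sketch_exec_rename_stmt(const MockRenameStmt *stmt, int current_user)
{
    assert(stmt != NULL);

    switch (stmt->type)
    {
        case MOCK_RENAME_TABLE:
            return sketch_rename_relation(stmt, current_user);
        case MOCK_RENAME_COLUMN:
            return sketch_rename_column(stmt, current_user);
        case MOCK_RENAME_TRIGGER:
            return sketch_rename_trigger(stmt, current_user);
    }
    return RENAME_DENIED;   /* unknown type: refuse */
}
```

With this layout, finding every check for a rename means reading one
routine per object type, not the dispatcher plus the routine.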

Why do this? Well, I think the end game is that it would be nice to
provide a finite number of well-defined control points that need to be
modified to add or change permission checks. Of course, a lot more
work will be needed to do that in its entirety - this email only
addresses the ALTER TABLE stuff, and I'm not even 100% positive that
I've found everything. But I think that's part of the problem too -
as we clean this up, it will become feasible to actually list what the
control points for different permissions checks are.

Thoughts? Comments? Blistering flames of agonizing death?

...Robert

#2Tom Lane
tgl@sss.pgh.pa.us
In reply to: Robert Haas (#1)
Re: restructuring "alter table" privilege checks (was: remove redundant ownership checks)

Robert Haas <robertmhaas@gmail.com> writes:

* Tom Lane (tgl@sss.pgh.pa.us) wrote:

I don't particularly like this patch, mainly because I disagree with
randomly removing permissions checks without any sort of plan about
where they ought to go.

[ a plan for rearranging ALTER TABLE's checks ]

Works for me. All I asked for was plan first, code second, and you've
satisfied the precondition.

(I would venture that changing this stuff is probably 9.1 material at
this point, though.)

regards, tom lane

#3KaiGai Kohei
kaigai@kaigai.gr.jp
In reply to: Robert Haas (#1)
Re: restructuring "alter table" privilege checks (was: remove redundant ownership checks)

(2010/01/23 1:27), Robert Haas wrote:

So, what if we treat these two cases separately? The part-B checks -
on the other operations involved in ALTER TABLE - are by definition
idiosyncratic. What type of object we're checking and what permission
we're checking for is inextricably bound up with what kind of ALTER
TABLE operation we're trying to perform. So, that's going to be hard
to centralize. But the part-A checks - on the relation itself - seem
like they could be consolidated. The reason why they are spread out
right now is mostly because of relatively minor deviations from what
ATSimplePermissions does.

I agree with the first half of this proposition. ALTER TABLE has the widest
variety of options in PostgreSQL, so for some of them, such as AT_AddIndex,
it is not feasible to complete all the needed checks at the ATPrepCmd()
stage.

However, it is unclear to me whether the revised ATSimplePermissions()
would provide cleaner code than we currently have, because it would also
need a big switch ... case statement inside.

Am I misunderstanding something?

A1. ATSimplePermissionsRelationOrIndex(), which a few of the ALTER
TABLE subcommands use, is exactly the same as ATSimplePermissions(),
except that it allows indices as well.
A2. ATSetStatistics() and ATSetDistinct() are also similar, but they
both allow indices and also skip the defenses against system table
modification.
A3. ATExecChangeOwner() emits a warning for indices and then performs no
action (for backwards compatibility), rather than emitting an error as
ATSimplePermissions() would do. It also doesn't check permission if
the old and new owner are the same.
A4. ATExecEnableDisableTrigger() and ATExecEnableDisableRule() call,
respectively, EnableDisableTrigger and EnableDisableRule, each of
which does a redundant permissions check.

I believe that everything else just calls ATSimplePermissions(),
though it's possible I've missed something. It strikes me that if we
changed the signature for ATSimplePermissions, we could eliminate
A1-A3 (and A4 is trivial):

static void ATSimplePermissions(Relation rel, AlterTableCmd *cmd);

The plan would be to move the knowledge of which operations require
special treatment (allowing indices, system tables, etc.) into
ATSimplePermissions() and then just call it unconditionally for ALL
object types. ATSimplePermissionsRelationOrIndex() would go away.
ATExecChangeOwner() would require some refactoring, but I think it
would end up being simpler than it is now. I also think it would be
more clear which checks are being applied to which object types.

I have a different plan to clean up these differences, especially A1 and A2.

The existing ATSimplePermissions() can also be divided into three parts,
as its source code comments mention.
Aa) It ensures the relation has an expected relkind
Ab) It ensures ownership of the relation to be altered
Ac) It ensures the relation is not a system catalog, unless allowSystemTableMods is set.

If we provide these three checks as separate helper functions, like:
Aa) ATCheckRelkind(Relation rel, bool viewOK, bool indexOK)
Ab) ATCheckOwnership(Relation rel)
Ac) ATCheckNoCatalog(Relation rel)

A1 and A2 above can then be rewritten as combinations of them, so we can
eliminate the functions that exist only to apply special treatment.
For example, A2 can be replaced by ATCheckRelkind(rel, false, true) and
ATCheckOwnership(rel).

I think this lets us put the logic that decides whether to apply permission
checks at the ATPrepCmd() stage into the existing switch ... case branches.

For the exceptions, we can apply the checks at a later stage, once the
code has gathered enough information to make the access control decision.
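
A minimal sketch of how the three helpers could compose, using stand-in
types. This is not PostgreSQL code: MockRel, the sketch_* names, the boolean
return values and the explicit current_user and allow_system_mods parameters
are assumptions made for illustration; the real helpers would presumably
look up the current user themselves and raise errors instead of returning
false.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in types for illustration only; NOT the real PostgreSQL definitions. */
typedef enum { MOCK_KIND_TABLE, MOCK_KIND_VIEW, MOCK_KIND_INDEX } MockRelKind;
typedef struct { MockRelKind kind; bool is_system; int owner; } MockRel;

/* Aa) the relation has an expected relkind */
bool
sketch_check_relkind(const MockRel *rel, bool view_ok, bool index_ok)
{
    assert(rel != NULL);

    switch (rel->kind)
    {
        case MOCK_KIND_TABLE:
            return true;
        case MOCK_KIND_VIEW:
            return view_ok;
        case MOCK_KIND_INDEX:
            return index_ok;
    }
    return false;
}

/* Ab) the current user owns the relation */
bool
sketch_check_ownership(const MockRel *rel, int current_user)
{
    return rel->owner == current_user;
}

/* Ac) the relation is not a system catalog, unless modifications are allowed */
bool
sketch_check_no_catalog(const MockRel *rel, bool allow_system_mods)
{
    return allow_system_mods || !rel->is_system;
}

/* Default case: plain tables only, all three checks. */
bool
sketch_plain_table_check(const MockRel *rel, int current_user, bool allow_system_mods)
{
    return sketch_check_relkind(rel, false, false)
        && sketch_check_ownership(rel, current_user)
        && sketch_check_no_catalog(rel, allow_system_mods);
}

/* A2 case: indices allowed, catalog defense skipped. */
bool
sketch_set_statistics_check(const MockRel *rel, int current_user)
{
    return sketch_check_relkind(rel, false, true)
        && sketch_check_ownership(rel, current_user);
}
```

Each caller states its own combination, so no single function needs to know
about every subcommand; the cost is that the combinations are spread across
the call sites.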

Just to enumerate the part-B permissions checks, that is, permissions
checks on objects other than the table to which ALTER TABLE is being
directly applied, the ones I found were:

B1. ATAddForeignKeyConstraint() checks for REFERENCES permission on
the two tables involved.
B2. ATExecDropColumn() and ATExecDropConstraint() check for permission
to perform the drop on the child tables to which they decide to
recurse.
B3. ATExecAddInherit() checks permissions on the new parent.
B4. ATPrepSetTablespace() checks for permission to use the new tablespace.
B5. ATExecAddIndex() calls DefineIndex(), which also checks for rights
on the new namespace.

B2 and B3 are actually implemented at present using
ATSimplePermissions, and I think we could keep it that way under the
proposed signature change, with only minor refactoring. The others
are basically all special cases, but there aren't actually that many
of them.

Also possibly worth refactoring is ATAddCheckConstraint(), which
currently does its own recursion only so that the constraint name at
the top of the inheritance hierarchy propagates all the way down
unchanged. I wonder if it would be cleaner/possible to work out the
constraint name earlier and then just use the standard recursion
mechanism.

Isn't it possible to check whether the given constraint is CHECK()
or FOREIGN KEY in the ATPrepCmd() stage, and assign a distinct
cmd->subtype to each? If it is CONSTR_FOREIGN, it will never recurse.

In that case, we can apply the checks at the ATPrepCmd() stage for CONSTR_CHECK.

We also have a few bits and pieces of ALTER TABLE that do not go
through ATPrepCmd() at all and therefore need to be considered
separately from the above. It looks like ALTER TABLE ... RENAME and
ALTER TABLE ... RENAME COLUMN go through ExecRenameStmt().
ExecRenameStmt() generally leaves permissions-checking to the
RenameWhatever() functions, but for RenameRelation(), renameatt(), and
renametrig() it makes an exception and does some of the work here. I
think we should push that logic back down into the constituent
functions so that this goes back to being just a dispatch function;
while we're at it, I think we should change renameatt() and
renametrig() to use the same naming convention as the rest of these
functions. It's not really buying us anything to do the check here;
it's just making it harder to find all the checks - for example, it
looks to me like the check done here is redundant with one being
performed in renameatt() anyway. Interestingly, renameatt() also has
a comment that says "this would normally be done in utility.c, but
this particular routine is recursive". That comment doesn't reflect
the way things are actually laid out, though.

That code calls CheckRelationOwnership() just before RenameRelation(),
renameatt() and renametrig(). It applies the same checks except for the
sanity check on relkind.
But there is no reason why we cannot rewrite them using ATCheckOwnership()
and ATCheckNoCatalog() in tablecmds.c, if ATSimplePermissions() is split up.

ALTER TABLE ... SET SCHEMA has the same issue:
ExecAlterObjectSchemaStmt normally just calls
AlterWhateverNamespace(), but in the case where Whatever is Table, it
first calls CheckRelationOwnership(). So I think we should go ahead
and push this check down too, to match all the others.

I agree.

Why do this? Well, I think the end game is that it would be nice to
provide a finite number of well-defined control points that need to be
modified to add or change permission checks. Of course, a lot more
work will be needed to do that in its entirety - this email only
addresses the ALTER TABLE stuff, and I'm not even 100% positive that
I've found everything. But I think that's part of the problem too -
as we clean this up, it will become feasible to actually list what the
control points for different permissions checks are.

It is a good idea to consolidate the permission checks into tablecmds.c from
various kinds of core routines.
It also makes the principle clear: the permission check should be applied
as early as possible, once the code path has gathered enough information to
make its access control decision.

Thanks,
--
KaiGai Kohei <kaigai@kaigai.gr.jp>

#4Robert Haas
robertmhaas@gmail.com
In reply to: KaiGai Kohei (#3)
Re: restructuring "alter table" privilege checks (was: remove redundant ownership checks)

On Sat, Jan 23, 2010 at 2:17 AM, KaiGai Kohei <kaigai@kaigai.gr.jp> wrote:

However, it is unclear to me whether the revised ATSimplePermissions()
would provide cleaner code than we currently have, because it would also
need a big switch ... case statement inside.

Am I misunderstanding something?

Well, not everyone is going to agree on what "cleaner code" means in
every case, but the reason that I like my design better is because it
moves all of the decision making out of ATPrepCmd() into
ATSimplePermissions(). What you're proposing would mean that
ATPrepCmd() would basically continue to know everything about which
operations need which permissions checks, which I don't think is going
to scale very well to alternative security providers, if we eventually
decide to support such a thing.

Also possibly worth refactoring is ATAddCheckConstraint(), which
currently does its own recursion only so that the constraint name at
the top of the inheritance hierarchy propagates all the way down
unchanged.  I wonder if it would be cleaner/possible to work out the
constraint name earlier and then just use the standard recursion
mechanism.

Isn't it possible to check whether the given constraint is CHECK()
or FOREIGN KEY in the ATPrepCmd() stage, and assign a distinct
cmd->subtype to each? If it is CONSTR_FOREIGN, it will never recurse.

In that case, we can apply the checks at the ATPrepCmd() stage for CONSTR_CHECK.

I don't understand what you're saying here.

It is a good idea to consolidate the permission checks into tablecmds.c from
various kinds of core routines.
It also makes the principle clear: the permission check should be applied
as early as possible, once the code path has gathered enough information to
make its access control decision.

And just as importantly, in a consistent place.

...Robert

#5KaiGai Kohei
kaigai@kaigai.gr.jp
In reply to: Robert Haas (#4)
Re: restructuring "alter table" privilege checks (was: remove redundant ownership checks)

(2010/01/24 9:08), Robert Haas wrote:

On Sat, Jan 23, 2010 at 2:17 AM, KaiGai Kohei<kaigai@kaigai.gr.jp> wrote:

However, it is unclear to me whether the revised ATSimplePermissions()
would provide cleaner code than we currently have, because it would also
need a big switch ... case statement inside.

Am I misunderstanding something?

Well, not everyone is going to agree on what "cleaner code" means in
every case, but the reason that I like my design better is because it
moves all of the decision making out of ATPrepCmd() into
ATSimplePermissions(). What you're proposing would mean that
ATPrepCmd() would basically continue to know everything about which
operations need which permissions checks, which I don't think is going
to scale very well to alternative security providers, if we eventually
decide to support such a thing.

Hmm. Indeed, the existing ATPrepCmd() closely couples the permission
checks with control of the code path (inheritance recursion and AT_PASS_*),
and it is worthwhile to separate these two independent pieces of logic.

In your plan, where should the new ATSimplePermissions() be called?

If we still call it from the ATPrepCmd() stage, it needs to apply
special treatment to some of the operations with part-B logic.

Another candidate for the entrypoint is the head of each ATExecXXXX()
function. In that case, we have already gathered enough information
in B2 (ATExecDropColumn, ATExecDropConstraint), B3 (ATExecAddInherit)
and B4 (ATPrepSetTablespace). In B1 (ATAddForeignKeyConstraint), we
will need a few more steps to collect information, but we can call
it just after all the needed information has been gathered.
We would also have enough information at the head of B5 (ATExecAddIndex).
However, my preference is to apply the checks in DefineIndex() when
"check_rights" is true, because that also lets us consolidate them with
the permission checks in ProcessUtility() for T_IndexStmt.

Also possibly worth refactoring is ATAddCheckConstraint(), which
currently does its own recursion only so that the constraint name at
the top of the inheritance hierarchy propagates all the way down
unchanged. I wonder if it would be cleaner/possible to work out the
constraint name earlier and then just use the standard recursion
mechanism.

Isn't it possible to check whether the given constraint is CHECK()
or FOREIGN KEY in the ATPrepCmd() stage, and assign a distinct
cmd->subtype to each? If it is CONSTR_FOREIGN, it will never recurse.

In that case, we can apply the checks at the ATPrepCmd() stage for CONSTR_CHECK.

I don't understand what you're saying here.

Let me explain it using pseudocode.

Currently, in ATPrepCmd(),

| case AT_AddConstraint: /* ADD CONSTRAINT */
| ATSimplePermissions(rel, false);
| /* Recursion occurs during execution phase */
| /* No command-specific prep needed except saving recurse flag */
| if (recurse)
| cmd->subtype = AT_AddConstraintRecurse;
| pass = AT_PASS_ADD_CONSTR;
| break;

What I am suggesting is:

| case AT_AddConstraint: /* ADD CONSTRAINT */
| {
| Constraint *newCons = (Constraint *)cmd->def;
| if (newCons->contype == CONSTR_CHECK)
| {
| ATSimplePermissions(rel, false);
| if (recurse)
| ATSimpleRecursion(wqueue, rel, cmd, recurse);
| cmd->subtype = AT_AddCheckConstraint;
| }
| else if (newCons->contype == CONSTR_FOREIGN)
| {
| /* Permission checks are applied during execution stage */
| /* And it never recurses */
| cmd->subtype = AT_AddFKConstraint;
| }
| else
| elog(ERROR, "unrecognized constraint type");
|
| pass = AT_PASS_ADD_CONSTR;
| break;
| }

This would let us eliminate the self-recursion in ATAddCheckConstraint() and
apply permission checks for a new CHECK constraint in the ATPrepCmd() phase.

Perhaps it could be consolidated into ATPrepAddConstraint().

It is a good idea to consolidate the permission checks into tablecmds.c from
various kinds of core routines.
It also makes the principle clear: the permission check should be applied
as early as possible, once the code path has gathered enough information to
make its access control decision.

And just as importantly, in a consistent place.

I agree.

Thanks,
--
KaiGai Kohei <kaigai@kaigai.gr.jp>

#6Robert Haas
robertmhaas@gmail.com
In reply to: KaiGai Kohei (#5)
Re: restructuring "alter table" privilege checks (was: remove redundant ownership checks)

On Sat, Jan 23, 2010 at 8:33 PM, KaiGai Kohei <kaigai@kaigai.gr.jp> wrote:

(2010/01/24 9:08), Robert Haas wrote:

On Sat, Jan 23, 2010 at 2:17 AM, KaiGai Kohei<kaigai@kaigai.gr.jp>  wrote:

However, it is unclear to me whether the revised ATSimplePermissions()
would provide cleaner code than we currently have, because it would also
need a big switch ... case statement inside.

Am I misunderstanding something?

Well, not everyone is going to agree on what "cleaner code" means in
every case, but the reason that I like my design better is because it
moves all of the decision making out of ATPrepCmd() into
ATSimplePermissions().  What you're proposing would mean that
ATPrepCmd() would basically continue to know everything about which
operations need which permissions checks, which I don't think is going
to scale very well to alternative security providers, if we eventually
decide to support such a thing.

Hmm. Indeed, the existing ATPrepCmd() closely couples the permission
checks with control of the code path (inheritance recursion and AT_PASS_*),
and it is worthwhile to separate these two independent pieces of logic.

Yeah, that's what I thought, too.

In your plan, where should the new ATSimplePermissions() be called?

From ATPrepCmd(), just before the big switch statement.

If we still call it from the ATPrepCmd() stage, it needs to apply
special treatment to some of the operations with part-B logic.

Not sure if I understand what you mean. ATSimplePermissions() won't
be responsible for applying permissions checks related to other
objects upon which the command is operating (e.g. the other table, if
adding a foreign key). It will however be responsible for knowing
everything about which permission checks to apply to the main table
involved, which will require some special-case logic for certain
command types.

Another candidate for the entrypoint is the head of each ATExecXXXX()
function. [...snip...]

I don't think this is a good idea. Calling it in just one place seems
less error-prone and easier to audit.

Also possibly worth refactoring is ATAddCheckConstraint(), which
currently does its own recursion only so that the constraint name at
the top of the inheritance hierarchy propagates all the way down
unchanged.  I wonder if it would be cleaner/possible to work out the
constraint name earlier and then just use the standard recursion
mechanism.

Isn't it possible to check whether the given constraint is CHECK()
or FOREIGN KEY in the ATPrepCmd() stage, and assign a distinct
cmd->subtype to each? If it is CONSTR_FOREIGN, it will never recurse.

In that case, we can apply the checks at the ATPrepCmd() stage for CONSTR_CHECK.

I don't understand what you're saying here.

Let me explain it using pseudocode.

Currently, in ATPrepCmd(),

|    case AT_AddConstraint:  /* ADD CONSTRAINT */
|        ATSimplePermissions(rel, false);
|        /* Recursion occurs during execution phase */
|        /* No command-specific prep needed except saving recurse flag */
|        if (recurse)
|            cmd->subtype = AT_AddConstraintRecurse;
|        pass = AT_PASS_ADD_CONSTR;
|        break;

What I am suggesting is:

|    case AT_AddConstraint:  /* ADD CONSTRAINT */
|    {
|        Constraint   *newCons = (Constraint *)cmd->def;
|        if (newCons->contype == CONSTR_CHECK)
|        {
|            ATSimplePermissions(rel, false);
|            if (recurse)
|                ATSimpleRecursion(wqueue, rel, cmd, recurse);
|            cmd->subtype = AT_AddCheckConstraint;
|        }
|        else if (newCons->contype == CONSTR_FOREIGN)
|        {
|            /* Permission checks are applied during execution stage */
|            /* And it never recurses */
|            cmd->subtype = AT_AddFKConstraint;
|        }
|        else
|            elog(ERROR, "unrecognized constraint type");
|
|        pass = AT_PASS_ADD_CONSTR;
|        break;
|    }

This would let us eliminate the self-recursion in ATAddCheckConstraint() and
apply permission checks for a new CHECK constraint in the ATPrepCmd() phase.

Perhaps it could be consolidated into ATPrepAddConstraint().

I don't really see that this gains us anything.

...Robert

#7KaiGai Kohei
kaigai@kaigai.gr.jp
In reply to: Robert Haas (#6)
Re: restructuring "alter table" privilege checks

(2010/01/24 11:27), Robert Haas wrote:

On Sat, Jan 23, 2010 at 8:33 PM, KaiGai Kohei<kaigai@kaigai.gr.jp> wrote:

(2010/01/24 9:08), Robert Haas wrote:

On Sat, Jan 23, 2010 at 2:17 AM, KaiGai Kohei<kaigai@kaigai.gr.jp> wrote:

However, it is unclear to me whether the revised ATSimplePermissions()
would provide cleaner code than we currently have, because it would also
need a big switch ... case statement inside.

Am I misunderstanding something?

Well, not everyone is going to agree on what "cleaner code" means in
every case, but the reason that I like my design better is because it
moves all of the decision making out of ATPrepCmd() into
ATSimplePermissions(). What you're proposing would mean that
ATPrepCmd() would basically continue to know everything about which
operations need which permissions checks, which I don't think is going
to scale very well to alternative security providers, if we eventually
decide to support such a thing.

Hmm. Indeed, the existing ATPrepCmd() closely couples the permission
checks with control of the code path (inheritance recursion and AT_PASS_*),
and it is worthwhile to separate these two independent pieces of logic.

Yeah, that's what I thought, too.

In your plan, where should the new ATSimplePermissions() be called?

From ATPrepCmd(), just before the big switch statement.

If we still call it from the ATPrepCmd() stage, it needs to apply
special treatment to some of the operations with part-B logic.

Not sure if I understand what you mean. ATSimplePermissions() won't
be responsible for applying permissions checks related to other
objects upon which the command is operating (e.g. the other table, if
adding a foreign key). It will however be responsible for knowing
everything about which permission checks to apply to the main table
involved, which will require some special-case logic for certain
command types.

Another candidate for the entrypoint is the head of each ATExecXXXX()
function. [...snip...]

I don't think this is a good idea. Calling it in just one place seems
less error-prone and easier to audit.

Yes, most of the ALTER TABLE options run through ATPrepCmd(), except for
RENAME TO and SET SCHEMA, so it is a good place to apply less error-prone
permission checks.

I introduced this alternative idea because it gives a simple principle
for where we should apply permission checks, although it needs a larger
number of entrypoints than a single call at the head of ATPrepCmd().

If we call the new ATSimplePermissions() at the execution stage, just after
gathering all the needed information, we don't need exceptions for the
commands that require additional checks beyond ownership of the relation
to be altered.

Of course, code simplicity is important. Likewise, I think simplicity of
concept (fewer special cases) is also important.

Let me explain it using pseudocode.

:

This would let us eliminate the self-recursion in ATAddCheckConstraint() and
apply permission checks for a new CHECK constraint in the ATPrepCmd() phase.

Perhaps it could be consolidated into ATPrepAddConstraint().

I don't really see that this gains us anything.

Hmm, indeed, there is no further benefit beyond eliminating one self-recursion.

Thanks,
--
KaiGai Kohei <kaigai@kaigai.gr.jp>

#8Robert Haas
robertmhaas@gmail.com
In reply to: KaiGai Kohei (#7)
Re: restructuring "alter table" privilege checks

On Sat, Jan 23, 2010 at 10:11 PM, KaiGai Kohei <kaigai@kaigai.gr.jp> wrote:

If we call the new ATSimplePermissions() at the execution stage, just after
gathering all the needed information, we don't need exceptions for the
commands that require additional checks beyond ownership of the relation
to be altered.

Maybe I'm still not understanding, but I don't see how you're going to
do this without a massive pile of spaghetti code and a function with
about 12 parameters. Feel free to show some code, but I think this is
a non-starter.

...Robert

#9KaiGai Kohei
kaigai@kaigai.gr.jp
In reply to: Robert Haas (#8)
Re: restructuring "alter table" privilege checks

(2010/01/24 12:16), Robert Haas wrote:

On Sat, Jan 23, 2010 at 10:11 PM, KaiGai Kohei<kaigai@kaigai.gr.jp> wrote:

If we call the new ATSimplePermissions() at the execution stage, just after
gathering all the needed information, we don't need exceptions for the
commands that require additional checks beyond ownership of the relation
to be altered.

Maybe I'm still not understanding, but I don't see how you're going to
do this without a massive pile of spaghetti code and a function with
about 12 parameters. Feel free to show some code, but I think this is
a non-starter.

Hhmmm,...

Indeed, if a single function tries to handle all the ALTER TABLE
options, it needs many function arguments, because ALTER TABLE is one of
the most feature-rich statements in PostgreSQL...

If the checks are placed at the head of the execution phase,
ATSimplePermissions does not need a big switch ... case, because the
branching is already done in ATExecCmd(). However, the tradeoff is that we
end up with multiple slightly different functions to check ALTER TABLE
permissions.

Perhaps it would be a good idea to write two conceptual patches, one placing
the checks at the head of ATPrepCmd() and one at the head of ATExec*().
They would make clear the good and bad points of the two approaches.

Would that be a waste of effort?

Thanks,
--
KaiGai Kohei <kaigai@kaigai.gr.jp>

#10KaiGai Kohei
kaigai@kaigai.gr.jp
In reply to: KaiGai Kohei (#9)
2 attachment(s)
Re: restructuring "alter table" privilege checks

(2010/01/24 12:40), KaiGai Kohei wrote:

Perhaps it would be a good idea to make two conceptual patches, one placing the
checks at the head of ATPrepCmd() and one at the head of ATExec*(). They would
make the good and bad points of the two approaches clear.

I tried to make two conceptual patches.

* pgsql-at-rework-prep.1.patch

It adds ATPermCmd(Relation rel, AlterTableCmd *cmd), which is called from the
head of ATPrepCmd(). This function separates the permission-check logic, which
depends on cmd->subtype, from the rest of ATPrepCmd().
For most subcommands (those that check nothing beyond ownership of the relation
to be altered), it calls ATSimplePermissions() or similar.
For the remaining exceptions, it does nothing at this stage.

Good: There is only one entry point, the call to ATPermCmd().
Bad: Although most of the logic is consolidated into ATPermCmd(), we still need
to put individual checks on some of the exceptional cases.

Does it match what you suggested? Or am I missing something?
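
To make the single-entry-point idea concrete, here is a minimal sketch in plain C. It is not the body of ATPermCmd() from the patch; every name in it (PrepRel, PrepCmd, prep_perm_cmd() and so on) is a hypothetical stand-in, and errors are returned as values instead of being raised with ereport(). It only assumes what is described above: most subcommands share the ownership and system-catalog checks, while a few exceptions (SET STATISTICS, SET TABLESPACE) need different handling.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; not PostgreSQL's definitions. */
typedef enum
{
    PREP_ADD_COLUMN,
    PREP_DROP_COLUMN,
    PREP_SET_STATISTICS,
    PREP_SET_TABLESPACE
} PrepSubtype;

typedef enum
{
    PREP_OK,
    PREP_NOT_OWNER,
    PREP_SYSTEM_CATALOG
} PrepResult;

typedef struct
{
    int  owner;              /* id of the owning role */
    bool is_system_catalog;
} PrepRel;

typedef struct
{
    PrepSubtype subtype;
} PrepCmd;

static PrepResult
prep_check_ownership(const PrepRel *rel, int user)
{
    return (rel->owner == user) ? PREP_OK : PREP_NOT_OWNER;
}

static PrepResult
prep_check_no_catalog(const PrepRel *rel, bool allow_system_mods)
{
    if (!allow_system_mods && rel->is_system_catalog)
        return PREP_SYSTEM_CATALOG;
    return PREP_OK;
}

/* Single prep-phase entry point, dispatching on the subcommand. */
static PrepResult
prep_perm_cmd(const PrepRel *rel, const PrepCmd *cmd,
              int user, bool allow_system_mods)
{
    PrepResult result;

    assert(rel != NULL && cmd != NULL);

    switch (cmd->subtype)
    {
        case PREP_SET_STATISTICS:
            /* Exception: owner check only; system catalogs are allowed. */
            return prep_check_ownership(rel, user);

        case PREP_SET_TABLESPACE:
            /*
             * Exception of the other kind: the check on the target
             * tablespace is not modelled here, so only the common
             * checks run at this stage.
             */
        case PREP_ADD_COLUMN:
        case PREP_DROP_COLUMN:
        default:
            result = prep_check_ownership(rel, user);
            if (result != PREP_OK)
                return result;
            return prep_check_no_catalog(rel, allow_system_mods);
    }
}
```

The switch is where the "Bad" point shows up: each exceptional subcommand either gets its own branch here or keeps a separate check somewhere else.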

* pgsql-at-rework-exec.2.patch

It moves the permission checks to the head of the ATExec*() functions (or to just
after all the needed information has been gathered). ATPrepCmd() checks only that
the relkind is correct and that the relation is not a system catalog.
This concept can also be applied to the ALTER TABLE RENAME TO/SET SCHEMA cases.
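
The relkind test that stays in ATPrepCmd() under this patch can be approximated outside the backend as follows. This is a standalone sketch, not the patch code: kind_allowed() and kind_format_error() are hypothetical names, the three relkind letters are defined locally, and the message is written to a buffer instead of going through ereport(). As a deliberate variation, the sketch picks a whole phrase per flag combination rather than assembling it from "%s%s" fragments the way ATCheckRelkind() does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Local stand-ins for the relkind codes used in this sketch. */
#define KIND_RELATION 'r'
#define KIND_VIEW     'v'
#define KIND_INDEX    'i'

/* A plain table is always accepted; views and indexes only on request. */
static bool
kind_allowed(char relkind, bool viewOK, bool indexOK)
{
    if (relkind == KIND_RELATION)
        return true;
    if (viewOK && relkind == KIND_VIEW)
        return true;
    if (indexOK && relkind == KIND_INDEX)
        return true;
    return false;
}

/*
 * Format the "wrong object type" message into buf.
 * Returns what snprintf() returns.
 */
static int
kind_format_error(char *buf, size_t len, const char *relname,
                  bool viewOK, bool indexOK)
{
    const char *what;

    assert(buf != NULL && len > 0 && relname != NULL);

    if (viewOK && indexOK)
        what = "a table, view, or index";
    else if (viewOK)
        what = "a table or view";
    else if (indexOK)
        what = "a table or index";
    else
        what = "a table";

    return snprintf(buf, len, "\"%s\" is not %s", relname, what);
}
```

Choosing whole phrases keeps the wording sensible even if a caller ever sets both flags, which no call site in the patch does.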

Good: The concept is clear, and there are fewer exceptions.
Good: We can apply this concept (check just before execution) to other database
objects that do not have an explicit preparation stage.
Bad: Every ATExec*() function has to call the permission checks.

My preference is the latter approach. It does have a larger number of entry points
than the ATPermCmd() approach, but its concept is clear and we can also
apply the same basis to code paths that do not go through ATPrepCmd().
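
For comparison, the execution-phase placement might look like the sketch below. Again this is an illustration under assumptions, not code from the patch: ExecRel, ExecTablespace and the exec_*() functions are hypothetical, a tablespace is reduced to a single role that holds CREATE on it, and failures are returned rather than raised. Unlike the real SET TABLESPACE path, which defers the actual move to Phase 3, the sketch applies the change immediately so the effect is visible.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; not PostgreSQL's definitions. */
typedef enum
{
    EXEC_OK,
    EXEC_NOT_OWNER,
    EXEC_NO_TABLESPACE_PRIVILEGE
} ExecResult;

typedef struct
{
    int  owner;          /* id of the owning role */
    int  tablespace;     /* id of the current tablespace */
    bool clustered;
} ExecRel;

typedef struct
{
    int id;
    int create_grantee;  /* the only role holding CREATE, in this sketch */
} ExecTablespace;

static ExecResult
exec_check_ownership(const ExecRel *rel, int user)
{
    return (rel->owner == user) ? EXEC_OK : EXEC_NOT_OWNER;
}

/* Each exec function checks permissions once it has what it needs. */
static ExecResult
exec_set_tablespace(ExecRel *rel, const ExecTablespace *target, int user)
{
    ExecResult result;

    assert(rel != NULL && target != NULL);

    result = exec_check_ownership(rel, user);
    if (result != EXEC_OK)
        return result;

    /* Check on the second object happens next to the ownership check. */
    if (target->create_grantee != user)
        return EXEC_NO_TABLESPACE_PRIVILEGE;

    rel->tablespace = target->id;
    return EXEC_OK;
}

static ExecResult
exec_drop_cluster(ExecRel *rel, int user)
{
    ExecResult result;

    assert(rel != NULL);

    /* The same call has to be repeated in every exec function. */
    result = exec_check_ownership(rel, user);
    if (result != EXEC_OK)
        return result;

    rel->clustered = false;
    return EXEC_OK;
}
```

Both the check on the relation and the check on the other object sit in one place per subcommand, at the cost of repeating the ownership call in every function.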

P.S. Strictly speaking, this patch should also touch the CheckRelationOwnership()
and DefineIndex() logic, but I omitted them for simplicity.

Thanks,
--
KaiGai Kohei <kaigai@kaigai.gr.jp>

Attachments:

pgsql-at-rework-exec.2.patch (text/x-patch)
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 262,269 **** static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
! static void ATSimplePermissions(Relation rel, bool allowView);
! static void ATSimplePermissionsRelationOrIndex(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
  				  AlterTableCmd *cmd, bool recurse);
  static void ATOneLevelRecursion(List **wqueue, Relation rel,
--- 262,270 ----
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
! static void ATCheckRelkind(Relation rel, bool viewOK, bool indexOK);
! static void ATCheckOwnership(Relation rel);
! static void ATCheckNoCatalog(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
  				  AlterTableCmd *cmd, bool recurse);
  static void ATOneLevelRecursion(List **wqueue, Relation rel,
***************
*** 280,291 **** static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
  				 const char *colName);
  static void ATExecColumnDefault(Relation rel, const char *colName,
  					Node *newDefault);
- static void ATPrepSetStatistics(Relation rel, const char *colName,
- 					Node *newValue);
  static void ATExecSetStatistics(Relation rel, const char *colName,
  					Node *newValue);
- static void ATPrepSetDistinct(Relation rel, const char *colName,
- 					Node *newValue);
  static void ATExecSetDistinct(Relation rel, const char *colName,
  				 Node *newValue);
  static void ATExecSetStorage(Relation rel, const char *colName,
--- 281,288 ----
***************
*** 1929,1942 **** renameatt(Oid myrelid,
  	 *
  	 * normally, only the owner of a class can change its schema.
  	 */
! 	if (!pg_class_ownercheck(myrelid, GetUserId()))
! 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
! 					   RelationGetRelationName(targetrelation));
! 	if (!allowSystemTableMods && IsSystemRelation(targetrelation))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(targetrelation))));
  
  	/*
  	 * if the 'recurse' flag is set then we are supposed to rename this
--- 1926,1934 ----
  	 *
  	 * normally, only the owner of a class can change its schema.
  	 */
! 	ATCheckOwnership(targetrelation);
! 
! 	ATCheckNoCatalog(targetrelation);
  
  	/*
  	 * if the 'recurse' flag is set then we are supposed to rename this
***************
*** 2048,2053 **** RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
--- 2040,2046 ----
  	Relation	targetrelation;
  	Oid			namespaceId;
  	char		relkind;
+ 	AclResult	aclresult;
  
  	/*
  	 * Grab an exclusive lock on the target table, index, sequence or view,
***************
*** 2075,2080 **** RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
--- 2068,2086 ----
  						RelationGetRelationName(targetrelation))));
  
  	/*
+ 	 * Permission checks
+ 	 * XXX - CheckRelationOwnership() should be removed from alter.c
+ 	 */
+ 	ATCheckOwnership(targetrelation);
+ 
+ 	ATCheckNoCatalog(targetrelation);
+ 
+ 	aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), ACL_CREATE);
+ 	if (aclresult != ACLCHECK_OK)
+ 		aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+ 					   get_namespace_name(namespaceId));
+ 
+ 	/*
  	 * Don't allow ALTER TABLE on composite types. We want people to use ALTER
  	 * TYPE for that.
  	 */
***************
*** 2382,2395 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  	switch (cmd->subtype)
  	{
  		case AT_AddColumn:		/* ADD COLUMN */
! 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_AddColumnToView:		/* add column via CREATE OR REPLACE
  										 * VIEW */
! 			ATSimplePermissions(rel, true);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
--- 2388,2403 ----
  	switch (cmd->subtype)
  	{
  		case AT_AddColumn:		/* ADD COLUMN */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_AddColumnToView:		/* add column via CREATE OR REPLACE
  										 * VIEW */
! 			ATCheckRelkind(rel, true, false);
! 			ATCheckNoCatalog(rel);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
***************
*** 2402,2444 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			 * substitutes default values into INSERTs before it expands
  			 * rules.
  			 */
! 			ATSimplePermissions(rel, true);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
! 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
! 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN SET STATISTICS */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
! 			/* Performs own permission checks */
! 			ATPrepSetStatistics(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetDistinct:	/* ALTER COLUMN SET STATISTICS DISTINCT */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
! 			/* Performs own permission checks */
! 			ATPrepSetDistinct(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN SET STORAGE */
! 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_DropColumn:		/* DROP COLUMN */
! 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2410,2455 ----
  			 * substitutes default values into INSERTs before it expands
  			 * rules.
  			 */
! 			ATCheckRelkind(rel, true, false);
! 			ATCheckNoCatalog(rel);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN SET STATISTICS */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
! 			ATCheckRelkind(rel, false, true);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetDistinct:	/* ALTER COLUMN SET STATISTICS DISTINCT */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
! 			ATCheckRelkind(rel, false, true);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN SET STORAGE */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_DropColumn:		/* DROP COLUMN */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
***************
*** 2446,2458 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
! 			ATSimplePermissions(rel, false);
  			/* This command never recurses */
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_INDEX;
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
! 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2457,2471 ----
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* This command never recurses */
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_INDEX;
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
***************
*** 2460,2466 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
! 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2473,2480 ----
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
***************
*** 2468,2474 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AlterColumnType:		/* ALTER COLUMN TYPE */
! 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
  			pass = AT_PASS_ALTER_TYPE;
--- 2482,2489 ----
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AlterColumnType:		/* ALTER COLUMN TYPE */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Performs own recursion */
  			ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
  			pass = AT_PASS_ALTER_TYPE;
***************
*** 2480,2499 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			break;
  		case AT_ClusterOn:		/* CLUSTER ON */
  		case AT_DropCluster:	/* SET WITHOUT CLUSTER */
! 			ATSimplePermissions(rel, false);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
  			break;
  		case AT_AddOids:		/* SET WITH OIDS */
! 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			if (!rel->rd_rel->relhasoids || recursing)
  				ATPrepAddOids(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_DropOids:		/* SET WITHOUT OIDS */
! 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			if (rel->rd_rel->relhasoids)
  			{
--- 2495,2517 ----
  			break;
  		case AT_ClusterOn:		/* CLUSTER ON */
  		case AT_DropCluster:	/* SET WITHOUT CLUSTER */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
  			break;
  		case AT_AddOids:		/* SET WITH OIDS */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Performs own recursion */
  			if (!rel->rd_rel->relhasoids || recursing)
  				ATPrepAddOids(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_DropOids:		/* SET WITHOUT OIDS */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Performs own recursion */
  			if (rel->rd_rel->relhasoids)
  			{
***************
*** 2507,2520 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetTableSpace:	/* SET TABLESPACE */
! 			ATSimplePermissionsRelationOrIndex(rel);
  			/* This command never recurses */
  			ATPrepSetTableSpace(tab, rel, cmd->name);
  			pass = AT_PASS_MISC;	/* doesn't actually matter */
  			break;
  		case AT_SetRelOptions:	/* SET (...) */
  		case AT_ResetRelOptions:		/* RESET (...) */
! 			ATSimplePermissionsRelationOrIndex(rel);
  			/* This command never recurses */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
--- 2525,2540 ----
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetTableSpace:	/* SET TABLESPACE */
! 			ATCheckRelkind(rel, false, true);
! 			ATCheckNoCatalog(rel);
  			/* This command never recurses */
  			ATPrepSetTableSpace(tab, rel, cmd->name);
  			pass = AT_PASS_MISC;	/* doesn't actually matter */
  			break;
  		case AT_SetRelOptions:	/* SET (...) */
  		case AT_ResetRelOptions:		/* RESET (...) */
! 			ATCheckRelkind(rel, false, true);
! 			ATCheckNoCatalog(rel);
  			/* This command never recurses */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
***************
*** 2533,2539 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  		case AT_DisableRule:
  		case AT_AddInherit:		/* INHERIT / NO INHERIT */
  		case AT_DropInherit:
! 			ATSimplePermissions(rel, false);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
--- 2553,2560 ----
  		case AT_DisableRule:
  		case AT_AddInherit:		/* INHERIT / NO INHERIT */
  		case AT_DropInherit:
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
***************
*** 2709,2715 **** ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
--- 2730,2746 ----
  			 */
  			break;
  		case AT_SetTableSpace:	/* SET TABLESPACE */
+ 			{
+ 				Oid			tablespaceId = tab->newTableSpace;
+ 				AclResult	aclresult;
  
+ 				ATCheckOwnership(rel);
+ 				aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
+ 												   ACL_CREATE);
+ 				if (aclresult != ACLCHECK_OK)
+ 					aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
+ 								   get_tablespace_name(tablespaceId));
+ 			}
  			/*
  			 * Nothing to do here; Phase 3 does the work
  			 */
***************
*** 3278,3343 **** ATGetQueueEntry(List **wqueue, Relation rel)
  }
  
  /*
!  * ATSimplePermissions
   *
!  * - Ensure that it is a relation (or possibly a view)
!  * - Ensure this user is the owner
!  * - Ensure that it is not a system table
   */
  static void
! ATSimplePermissions(Relation rel, bool allowView)
  {
! 	if (rel->rd_rel->relkind != RELKIND_RELATION)
! 	{
! 		if (allowView)
! 		{
! 			if (rel->rd_rel->relkind != RELKIND_VIEW)
! 				ereport(ERROR,
! 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 						 errmsg("\"%s\" is not a table or view",
! 								RelationGetRelationName(rel))));
! 		}
! 		else
! 			ereport(ERROR,
! 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 					 errmsg("\"%s\" is not a table",
! 							RelationGetRelationName(rel))));
! 	}
  
! 	/* Permissions checks */
! 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
! 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
! 					   RelationGetRelationName(rel));
! 
! 	if (!allowSystemTableMods && IsSystemRelation(rel))
  		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(rel))));
  }
  
- /*
-  * ATSimplePermissionsRelationOrIndex
-  *
-  * - Ensure that it is a relation or an index
-  * - Ensure this user is the owner
-  * - Ensure that it is not a system table
-  */
  static void
! ATSimplePermissionsRelationOrIndex(Relation rel)
  {
- 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
- 		rel->rd_rel->relkind != RELKIND_INDEX)
- 		ereport(ERROR,
- 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- 				 errmsg("\"%s\" is not a table or index",
- 						RelationGetRelationName(rel))));
- 
- 	/* Permissions checks */
  	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
  		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
  					   RelationGetRelationName(rel));
  
  	if (!allowSystemTableMods && IsSystemRelation(rel))
  		ereport(ERROR,
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
--- 3309,3347 ----
  }
  
  /*
!  * ATCheckRelkind
!  * ATCheckOwnership
!  * ATCheckNoCatalog
   *
!  * These are originally checked in ATSimplePermissions
   */
  static void
! ATCheckRelkind(Relation rel, bool viewOK, bool indexOK)
  {
! 	char	relkind = rel->rd_rel->relkind;
  
! 	if (relkind != RELKIND_RELATION &&
! 		(!viewOK || relkind != RELKIND_VIEW) &&
! 		(!indexOK || relkind != RELKIND_INDEX))
  		ereport(ERROR,
! 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 				 errmsg("\"%s\" is not a table%s%s",
! 						RelationGetRelationName(rel),
! 						!viewOK ? "" : " or view",
! 						!indexOK ? "" : " or index")));
  }
  
  static void
! ATCheckOwnership(Relation rel)
  {
  	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
  		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
  					   RelationGetRelationName(rel));
+ }
  
+ static void
+ ATCheckNoCatalog(Relation rel)
+ {
  	if (!allowSystemTableMods && IsSystemRelation(rel))
  		ereport(ERROR,
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
***************
*** 3584,3589 **** ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
--- 3588,3596 ----
  	Form_pg_type tform;
  	Expr	   *defval;
  
+ 	/* permission check to add a new column */
+ 	ATCheckOwnership(rel);
+ 
  	attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
  
  	/*
***************
*** 3884,3889 **** ATExecDropNotNull(Relation rel, const char *colName)
--- 3891,3899 ----
  	List	   *indexoidlist;
  	ListCell   *indexoidscan;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	/*
  	 * lookup the attribute
  	 */
***************
*** 3976,3981 **** ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
--- 3986,3994 ----
  	AttrNumber	attnum;
  	Relation	attr_rel;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	/*
  	 * lookup the attribute
  	 */
***************
*** 4026,4031 **** ATExecColumnDefault(Relation rel, const char *colName,
--- 4039,4047 ----
  {
  	AttrNumber	attnum;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	/*
  	 * get the number of the attribute
  	 */
***************
*** 4071,4098 **** ATExecColumnDefault(Relation rel, const char *colName,
   * ALTER TABLE ALTER COLUMN SET STATISTICS
   */
  static void
- ATPrepSetStatistics(Relation rel, const char *colName, Node *newValue)
- {
- 	/*
- 	 * We do our own permission checking because (a) we want to allow SET
- 	 * STATISTICS on indexes (for expressional index columns), and (b) we want
- 	 * to allow SET STATISTICS on system catalogs without requiring
- 	 * allowSystemTableMods to be turned on.
- 	 */
- 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
- 		rel->rd_rel->relkind != RELKIND_INDEX)
- 		ereport(ERROR,
- 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- 				 errmsg("\"%s\" is not a table or index",
- 						RelationGetRelationName(rel))));
- 
- 	/* Permissions checks */
- 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
- 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- 					   RelationGetRelationName(rel));
- }
- 
- static void
  ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
  {
  	int			newtarget;
--- 4087,4092 ----
***************
*** 4100,4105 **** ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
--- 4094,4102 ----
  	HeapTuple	tuple;
  	Form_pg_attribute attrtuple;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	Assert(IsA(newValue, Integer));
  	newtarget = intVal(newValue);
  
***************
*** 4155,4182 **** ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
   * ALTER TABLE ALTER COLUMN SET STATISTICS DISTINCT
   */
  static void
- ATPrepSetDistinct(Relation rel, const char *colName, Node *newValue)
- {
- 	/*
- 	 * We do our own permission checking because (a) we want to allow SET
- 	 * DISTINCT on indexes (for expressional index columns), and (b) we want
- 	 * to allow SET DISTINCT on system catalogs without requiring
- 	 * allowSystemTableMods to be turned on.
- 	 */
- 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
- 		rel->rd_rel->relkind != RELKIND_INDEX)
- 		ereport(ERROR,
- 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- 				 errmsg("\"%s\" is not a table or index",
- 						RelationGetRelationName(rel))));
- 
- 	/* Permissions checks */
- 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
- 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- 					   RelationGetRelationName(rel));
- }
- 
- static void
  ATExecSetDistinct(Relation rel, const char *colName, Node *newValue)
  {
  	float4		newdistinct;
--- 4152,4157 ----
***************
*** 4184,4189 **** ATExecSetDistinct(Relation rel, const char *colName, Node *newValue)
--- 4159,4167 ----
  	HeapTuple	tuple;
  	Form_pg_attribute attrtuple;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	switch (nodeTag(newValue))
  	{
  		case T_Integer:
***************
*** 4251,4256 **** ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
--- 4229,4237 ----
  	HeapTuple	tuple;
  	Form_pg_attribute attrtuple;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	Assert(IsA(newValue, String));
  	storagemode = strVal(newValue);
  
***************
*** 4332,4340 **** ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
  	List	   *children;
  	ObjectAddress object;
  
! 	/* At top level, permission check was done in ATPrepCmd, else do it */
  	if (recursing)
! 		ATSimplePermissions(rel, false);
  
  	/*
  	 * get the number of the attribute
--- 4313,4327 ----
  	List	   *children;
  	ObjectAddress object;
  
! 	/* At top level, sanity check was done in ATPrepCmd, else do it */
  	if (recursing)
! 	{
! 		ATCheckRelkind(rel, false, false);
! 		ATCheckNoCatalog(rel);
! 	}
! 
! 	/* permission check */
! 	ATCheckOwnership(rel);
  
  	/*
  	 * get the number of the attribute
***************
*** 4525,4530 **** ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
--- 4512,4528 ----
  
  	Assert(IsA(stmt, IndexStmt));
  
+ 	/*
+      * XXX - IMO, we should apply all the permission checks in DefineIndex()
+ 	 * when it is invoked with check_rights == true.
+ 	 * Here are only two code paths. The one is ATExecAddIndex() by AT_AddIndex,
+ 	 * the other is standard_ProcessUtility() by T_IndexStmt which also calls
+ 	 * CheckRelationOwnership() on the relation to be indexed just before
+ 	 * DefineIndex() invocation.
+ 	 */
+ 	ATCheckOwnership(rel);
+ 	/* XXX - ACL_CREATE on pg_namespace should be checked here? */
+ 
  	/* suppress schema rights check when rebuilding existing index */
  	check_rights = !is_rebuild;
  	/* skip index build if phase 3 will have to rewrite table anyway */
***************
*** 4634,4642 **** ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  	List	   *children;
  	ListCell   *child;
  
! 	/* At top level, permission check was done in ATPrepCmd, else do it */
  	if (recursing)
! 		ATSimplePermissions(rel, false);
  
  	/*
  	 * Call AddRelationNewConstraints to do the work, making sure it works on
--- 4632,4645 ----
  	List	   *children;
  	ListCell   *child;
  
! 	/* At top level, sanity check was done in ATPrepCmd, else do it */
  	if (recursing)
! 	{
! 		ATCheckRelkind(rel, false, false);
! 		ATCheckNoCatalog(rel);
! 	}
! 	/* Permission check */
! 	ATCheckOwnership(rel);
  
  	/*
  	 * Call AddRelationNewConstraints to do the work, making sure it works on
***************
*** 4754,4770 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
  	 * Validity checks (permission checks wait till we have the column
  	 * numbers)
  	 */
! 	if (pkrel->rd_rel->relkind != RELKIND_RELATION)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 				 errmsg("referenced relation \"%s\" is not a table",
! 						RelationGetRelationName(pkrel))));
! 
! 	if (!allowSystemTableMods && IsSystemRelation(pkrel))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(pkrel))));
  
  	/*
  	 * Disallow reference from permanent table to temp table or vice versa.
--- 4757,4764 ----
  	 * Validity checks (permission checks wait till we have the column
  	 * numbers)
  	 */
! 	ATCheckRelkind(pkrel, false, false);
! 	ATCheckNoCatalog(pkrel);
  
  	/*
  	 * Disallow reference from permanent table to temp table or vice versa.
***************
*** 4832,4837 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
--- 4826,4832 ----
  	/*
  	 * Now we can check permissions.
  	 */
+ 	ATCheckOwnership(rel);
  	checkFkeyPermissions(pkrel, pkattnum, numpks);
  	checkFkeyPermissions(rel, fkattnum, numfks);
  
***************
*** 5582,5590 **** ATExecDropConstraint(Relation rel, const char *constrName,
  	bool		found = false;
  	bool		is_check_constraint = false;
  
! 	/* At top level, permission check was done in ATPrepCmd, else do it */
  	if (recursing)
! 		ATSimplePermissions(rel, false);
  
  	conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
  
--- 5577,5590 ----
  	bool		found = false;
  	bool		is_check_constraint = false;
  
! 	/* At top level, sanity check was done in ATPrepCmd, else do it */
  	if (recursing)
! 	{
! 		ATCheckRelkind(rel, false, false);
! 		ATCheckNoCatalog(rel);
! 	}
! 	/* Permission checks */
! 	ATCheckOwnership(rel);
  
  	conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
  
***************
*** 5915,5920 **** ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
--- 5915,5923 ----
  	SysScanDesc scan;
  	HeapTuple	depTup;
  
+ 	/* Permission checks */
+ 	ATCheckOwnership(rel);
+ 
  	attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
  
  	/* Look up the target column */
***************
*** 6695,6700 **** ATExecClusterOn(Relation rel, const char *indexName)
--- 6698,6706 ----
  {
  	Oid			indexOid;
  
+ 	/* Permission checks */
+ 	ATCheckOwnership(rel);
+ 
  	indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
  
  	if (!OidIsValid(indexOid))
***************
*** 6719,6724 **** ATExecClusterOn(Relation rel, const char *indexName)
--- 6725,6733 ----
  static void
  ATExecDropCluster(Relation rel)
  {
+ 	/* Permission checks */
+ 	ATCheckOwnership(rel);
+ 
  	mark_index_clustered(rel, InvalidOid);
  }
  
***************
*** 6729,6735 **** static void
  ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
  {
  	Oid			tablespaceId;
- 	AclResult	aclresult;
  
  	/* Check that the tablespace exists */
  	tablespaceId = get_tablespace_oid(tablespacename);
--- 6738,6743 ----
***************
*** 6738,6748 **** ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
  				(errcode(ERRCODE_UNDEFINED_OBJECT),
  				 errmsg("tablespace \"%s\" does not exist", tablespacename)));
  
- 	/* Check its permissions */
- 	aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
- 	if (aclresult != ACLCHECK_OK)
- 		aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
- 
  	/* Save info for Phase 3 to do the real work */
  	if (OidIsValid(tab->newTableSpace))
  		ereport(ERROR,
--- 6746,6751 ----
***************
*** 6769,6774 **** ATExecSetRelOptions(Relation rel, List *defList, bool isReset)
--- 6772,6780 ----
  	bool		repl_repl[Natts_pg_class];
  	static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
  
+ 	/* Permission checks */
+ 	ATCheckOwnership(rel);
+ 
  	if (defList == NIL)
  		return;					/* nothing to do */
  
***************
*** 7091,7096 **** static void
--- 7097,7105 ----
  ATExecEnableDisableTrigger(Relation rel, char *trigname,
  						   char fires_when, bool skip_system)
  {
+ 	/* Permission checks */
+ 	ATCheckOwnership(rel);
+ 
  	EnableDisableTrigger(rel, trigname, fires_when, skip_system);
  }
  
***************
*** 7103,7108 **** static void
--- 7112,7123 ----
  ATExecEnableDisableRule(Relation rel, char *trigname,
  						char fires_when)
  {
+ 	/*
+ 	 * Permission checks
+ 	 * XXX - pg_class_ownercheck() in EnableDisableRule() should be removed
+ 	 */
+ 	ATCheckOwnership(rel);
+ 
  	EnableDisableRule(rel, trigname, fires_when);
  }
  
***************
*** 7131,7140 **** ATExecAddInherit(Relation child_rel, RangeVar *parent)
  	parent_rel = heap_openrv(parent, AccessShareLock);
  
  	/*
! 	 * Must be owner of both parent and child -- child was checked by
! 	 * ATSimplePermissions call in ATPrepCmd
  	 */
! 	ATSimplePermissions(parent_rel, false);
  
  	/* Permanent rels cannot inherit from temporary ones */
  	if (parent_rel->rd_istemp && !child_rel->rd_istemp)
--- 7146,7161 ----
  	parent_rel = heap_openrv(parent, AccessShareLock);
  
  	/*
! 	 * Sanity check
  	 */
! 	ATCheckRelkind(parent_rel, false, false);
! 	ATCheckNoCatalog(parent_rel);
! 
! 	/*
! 	 * Must be owner of both parent and child
! 	 */
! 	ATCheckOwnership(child_rel);
! 	ATCheckOwnership(parent_rel);
  
  	/* Permanent rels cannot inherit from temporary ones */
  	if (parent_rel->rd_istemp && !child_rel->rd_istemp)
***************
*** 7782,7787 **** AlterTableNamespace(RangeVar *relation, const char *newschema,
--- 7803,7816 ----
  							RelationGetRelationName(rel))));
  	}
  
+ 	/*
+ 	 * Permission checks
+ 	 * XXX - CheckRelationOwnership() should be removed from alter.c
+ 	 */
+ 	ATCheckOwnership(rel);
+ 
+ 	ATCheckNoCatalog(rel);
+ 
  	/* get schema OID and check its permissions */
  	nspOid = LookupCreationNamespace(newschema);
  
Attachment: pgsql-at-rework-prep.1.patch (text/x-patch)
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 254,259 **** static void validateForeignKeyConstraint(Constraint *fkconstraint,
--- 254,260 ----
  static void createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
  						 Oid constraintOid, Oid indexOid);
  static void ATController(Relation rel, List *cmds, bool recurse);
+ static void ATPermCmd(Relation rel, AlterTableCmd *cmd);
  static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  		  bool recurse, bool recursing);
  static void ATRewriteCatalogs(List **wqueue);
***************
*** 262,267 **** static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
--- 263,271 ----
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
+ static void ATCheckRelkind(Relation rel, bool viewOK, bool indexOK);
+ static void ATCheckOwnership(Relation rel);
+ static void ATCheckNoCatalog(Relation rel);
  static void ATSimplePermissions(Relation rel, bool allowView);
  static void ATSimplePermissionsRelationOrIndex(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
***************
*** 284,291 **** static void ATPrepSetStatistics(Relation rel, const char *colName,
  					Node *newValue);
  static void ATExecSetStatistics(Relation rel, const char *colName,
  					Node *newValue);
- static void ATPrepSetDistinct(Relation rel, const char *colName,
- 					Node *newValue);
  static void ATExecSetDistinct(Relation rel, const char *colName,
  				 Node *newValue);
  static void ATExecSetStorage(Relation rel, const char *colName,
--- 288,293 ----
***************
*** 1929,1942 **** renameatt(Oid myrelid,
  	 *
  	 * normally, only the owner of a class can change its schema.
  	 */
! 	if (!pg_class_ownercheck(myrelid, GetUserId()))
! 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
! 					   RelationGetRelationName(targetrelation));
! 	if (!allowSystemTableMods && IsSystemRelation(targetrelation))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(targetrelation))));
  
  	/*
  	 * if the 'recurse' flag is set then we are supposed to rename this
--- 1931,1938 ----
  	 *
  	 * normally, only the owner of a class can change its schema.
  	 */
! 	ATCheckOwnership(targetrelation);
! 	ATCheckNoCatalog(targetrelation);
  
  	/*
  	 * if the 'recurse' flag is set then we are supposed to rename this
***************
*** 2047,2052 **** RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
--- 2043,2049 ----
  {
  	Relation	targetrelation;
  	Oid			namespaceId;
+ 	AclResult	aclresult;
  	char		relkind;
  
  	/*
***************
*** 2075,2080 **** RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
--- 2072,2089 ----
  						RelationGetRelationName(targetrelation))));
  
  	/*
+ 	 * XXX - we assume CheckRelationOwnership() and pg_namespace_aclcheck()
+ 	 * should be moved here from ExecRenameStmt().
+ 	 */
+ 	ATCheckOwnership(targetrelation);
+ 	aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), ACL_CREATE);
+ 	if (aclresult != ACLCHECK_OK)
+ 		aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+ 					   get_namespace_name(namespaceId));
+ 
+ 	ATCheckNoCatalog(targetrelation);
+ 
+ 	/*
  	 * Don't allow ALTER TABLE on composite types. We want people to use ALTER
  	 * TYPE for that.
  	 */
***************
*** 2349,2354 **** ATController(Relation rel, List *cmds, bool recurse)
--- 2358,2462 ----
  }
  
  /*
+  * ATPermCmd
+  *
+  * Checks permissions on the relation to be altered, where that is possible.
+  * Some ALTER TABLE subcommands must be checked during the execution phase.
+  */
+ static void
+ ATPermCmd(Relation rel, AlterTableCmd *cmd)
+ {
+ 	switch (cmd->subtype)
+ 	{
+ 		case AT_AddColumn:		/* ADD COLUMN */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_AddColumnToView:	/* add column via CREATE OR REPLACE VIEW */
+ 			ATSimplePermissions(rel, true);
+ 			break;
+ 		case AT_ColumnDefault:	/* ALTER COLUMN DEFAULT */
+ 			ATSimplePermissions(rel, true);
+ 			break;
+ 		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_SetStatistics:	/* ALTER COLUMN SET STATISTICS */
+ 		case AT_SetDistinct:	/* ALTER COLUMN SET STATISTICS DISTINCT */
+ 			ATCheckRelkind(rel, false, true);
+ 			ATCheckOwnership(rel);
+ 			break;
+ 		case AT_SetStorage:		/* ALTER COLUMN SET STORAGE */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_DropColumn:		/* DROP COLUMN */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_AddIndex:		/* ADD INDEX */
+ 			/* Permission checks are during execution phase */
+ 			break;
+ 		case AT_AddConstraint:	/* ADD CONSTRAINT */
+ 			/* Permission checks are during execution phase */
+ 			break;
+ 		case AT_DropConstraint:	/* DROP CONSTRAINT */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_AlterColumnType:		/* ALTER COLUMN TYPE */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_ChangeOwner:	/* ALTER OWNER */
+ 			/* Permission checks are during execution phase */
+ 			break;
+ 		case AT_ClusterOn:		/* CLUSTER ON */
+ 		case AT_DropCluster:	/* SET WITHOUT CLUSTER */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_AddOids:		/* SET WITH OIDS */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_DropOids:		/* SET WITHOUT OIDS */
+ 			ATSimplePermissions(rel, false);
+ 			/*
+ 			 * Note that ATPrepCmd() with AT_DropOids also calls ATPrepCmd() with
+ 			 * pseudo AT_DropColumn recursively, so it applies permission checks
+ 			 * twice.
+ 			 */
+ 			break;
+ 		case AT_SetTableSpace:	/* SET TABLESPACE */
+ 			/* Permission checks are during ATPrepSetTablespace() */
+ 			break;
+ 		case AT_SetRelOptions:	/* SET (...) */
+ 		case AT_ResetRelOptions:		/* RESET (...) */
+ 			ATSimplePermissionsRelationOrIndex(rel);
+ 			break;
+ 		case AT_EnableTrig:		/* ENABLE TRIGGER variants */
+ 		case AT_EnableAlwaysTrig:
+ 		case AT_EnableReplicaTrig:
+ 		case AT_EnableTrigAll:
+ 		case AT_EnableTrigUser:
+ 		case AT_DisableTrig:	/* DISABLE TRIGGER variants */
+ 		case AT_DisableTrigAll:
+ 		case AT_DisableTrigUser:
+ 		case AT_EnableRule:		/* ENABLE/DISABLE RULE variants */
+ 		case AT_EnableAlwaysRule:
+ 		case AT_EnableReplicaRule:
+ 		case AT_DisableRule:
+ 		case AT_DropInherit:	/* NO INHERIT */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_AddInherit:		/* INHERIT */
+ 			/* Permission checks are during execution phase */
+ 			break;
+ 		default:				/* oops */
+ 			elog(ERROR, "unrecognized alter table type: %d",
+ 				 (int) cmd->subtype);
+ 			break;
+ 	}
+ }
+ 
+ /*
   * ATPrepCmd
   *
   * Traffic cop for ALTER TABLE Phase 1 operations, including simple
***************
*** 2382,2395 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  	switch (cmd->subtype)
  	{
  		case AT_AddColumn:		/* ADD COLUMN */
- 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_AddColumnToView:		/* add column via CREATE OR REPLACE
  										 * VIEW */
- 			ATSimplePermissions(rel, true);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
--- 2490,2501 ----
***************
*** 2402,2444 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			 * substitutes default values into INSERTs before it expands
  			 * rules.
  			 */
- 			ATSimplePermissions(rel, true);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
- 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
- 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN SET STATISTICS */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
- 			/* Performs own permission checks */
- 			ATPrepSetStatistics(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetDistinct:	/* ALTER COLUMN SET STATISTICS DISTINCT */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
- 			/* Performs own permission checks */
- 			ATPrepSetDistinct(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN SET STORAGE */
- 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_DropColumn:		/* DROP COLUMN */
- 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2508,2541 ----
***************
*** 2446,2458 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
- 			ATSimplePermissions(rel, false);
  			/* This command never recurses */
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_INDEX;
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
- 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2543,2553 ----
***************
*** 2460,2466 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
- 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2555,2560 ----
***************
*** 2468,2474 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AlterColumnType:		/* ALTER COLUMN TYPE */
- 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
  			pass = AT_PASS_ALTER_TYPE;
--- 2562,2567 ----
***************
*** 2480,2499 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			break;
  		case AT_ClusterOn:		/* CLUSTER ON */
  		case AT_DropCluster:	/* SET WITHOUT CLUSTER */
- 			ATSimplePermissions(rel, false);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
  			break;
  		case AT_AddOids:		/* SET WITH OIDS */
- 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			if (!rel->rd_rel->relhasoids || recursing)
  				ATPrepAddOids(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_DropOids:		/* SET WITHOUT OIDS */
- 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			if (rel->rd_rel->relhasoids)
  			{
--- 2573,2589 ----
***************
*** 2507,2524 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetTableSpace:	/* SET TABLESPACE */
- 			ATSimplePermissionsRelationOrIndex(rel);
  			/* This command never recurses */
  			ATPrepSetTableSpace(tab, rel, cmd->name);
  			pass = AT_PASS_MISC;	/* doesn't actually matter */
  			break;
  		case AT_SetRelOptions:	/* SET (...) */
  		case AT_ResetRelOptions:		/* RESET (...) */
- 			ATSimplePermissionsRelationOrIndex(rel);
- 			/* This command never recurses */
- 			/* No command-specific prep needed */
- 			pass = AT_PASS_MISC;
- 			break;
  		case AT_EnableTrig:		/* ENABLE TRIGGER variants */
  		case AT_EnableAlwaysTrig:
  		case AT_EnableReplicaTrig:
--- 2597,2608 ----
***************
*** 2533,2539 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  		case AT_DisableRule:
  		case AT_AddInherit:		/* INHERIT / NO INHERIT */
  		case AT_DropInherit:
- 			ATSimplePermissions(rel, false);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
--- 2617,2622 ----
***************
*** 3278,3314 **** ATGetQueueEntry(List **wqueue, Relation rel)
  }
  
  /*
!  * ATSimplePermissions
   *
!  * - Ensure that it is a relation (or possibly a view)
!  * - Ensure this user is the owner
!  * - Ensure that it is not a system table
   */
  static void
! ATSimplePermissions(Relation rel, bool allowView)
  {
! 	if (rel->rd_rel->relkind != RELKIND_RELATION)
! 	{
! 		if (allowView)
! 		{
! 			if (rel->rd_rel->relkind != RELKIND_VIEW)
! 				ereport(ERROR,
! 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 						 errmsg("\"%s\" is not a table or view",
! 								RelationGetRelationName(rel))));
! 		}
! 		else
! 			ereport(ERROR,
! 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 					 errmsg("\"%s\" is not a table",
! 							RelationGetRelationName(rel))));
! 	}
  
! 	/* Permissions checks */
  	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
  		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
  					   RelationGetRelationName(rel));
  
  	if (!allowSystemTableMods && IsSystemRelation(rel))
  		ereport(ERROR,
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
--- 3361,3399 ----
  }
  
  /*
!  * ATCheckRelkind
!  * ATCheckOwnership
!  * ATCheckNoCatalog
   *
!  * These checks were originally performed in ATSimplePermissions
   */
  static void
! ATCheckRelkind(Relation rel, bool viewOK, bool indexOK)
  {
! 	char	relkind = rel->rd_rel->relkind;
  
! 	if (relkind != RELKIND_RELATION &&
! 		(!viewOK || relkind != RELKIND_VIEW) &&
! 		(!indexOK || relkind != RELKIND_INDEX))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 				 errmsg("\"%s\" is not a table%s%s",
! 						RelationGetRelationName(rel),
! 						!viewOK ? "" : " or view",
! 						!indexOK ? "" : " or index")));
! }
! 
! static void
! ATCheckOwnership(Relation rel)
! {
  	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
  		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
  					   RelationGetRelationName(rel));
+ }
  
+ static void
+ ATCheckNoCatalog(Relation rel)
+ {
  	if (!allowSystemTableMods && IsSystemRelation(rel))
  		ereport(ERROR,
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
***************
*** 3317,3322 **** ATSimplePermissions(Relation rel, bool allowView)
--- 3402,3422 ----
  }
  
  /*
+  * ATSimplePermissions
+  *
+  * - Ensure that it is a relation (or possibly a view)
+  * - Ensure this user is the owner
+  * - Ensure that it is not a system table
+  */
+ static void
+ ATSimplePermissions(Relation rel, bool allowView)
+ {
+ 	ATCheckRelkind(rel, allowView, false);
+ 	ATCheckOwnership(rel);
+ 	ATCheckNoCatalog(rel);
+ }
+ 
+ /*
   * ATSimplePermissionsRelationOrIndex
   *
   * - Ensure that it is a relation or an index
***************
*** 3326,3348 **** ATSimplePermissions(Relation rel, bool allowView)
  static void
  ATSimplePermissionsRelationOrIndex(Relation rel)
  {
! 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
! 		rel->rd_rel->relkind != RELKIND_INDEX)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 				 errmsg("\"%s\" is not a table or index",
! 						RelationGetRelationName(rel))));
! 
! 	/* Permissions checks */
! 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
! 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
! 					   RelationGetRelationName(rel));
! 
! 	if (!allowSystemTableMods && IsSystemRelation(rel))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(rel))));
  }
  
  /*
--- 3426,3434 ----
  static void
  ATSimplePermissionsRelationOrIndex(Relation rel)
  {
! 	ATCheckRelkind(rel, false, true);
! 	ATCheckOwnership(rel);
! 	ATCheckNoCatalog(rel);
  }
  
  /*
***************
*** 4071,4098 **** ATExecColumnDefault(Relation rel, const char *colName,
   * ALTER TABLE ALTER COLUMN SET STATISTICS
   */
  static void
- ATPrepSetStatistics(Relation rel, const char *colName, Node *newValue)
- {
- 	/*
- 	 * We do our own permission checking because (a) we want to allow SET
- 	 * STATISTICS on indexes (for expressional index columns), and (b) we want
- 	 * to allow SET STATISTICS on system catalogs without requiring
- 	 * allowSystemTableMods to be turned on.
- 	 */
- 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
- 		rel->rd_rel->relkind != RELKIND_INDEX)
- 		ereport(ERROR,
- 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- 				 errmsg("\"%s\" is not a table or index",
- 						RelationGetRelationName(rel))));
- 
- 	/* Permissions checks */
- 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
- 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- 					   RelationGetRelationName(rel));
- }
- 
- static void
  ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
  {
  	int			newtarget;
--- 4157,4162 ----
***************
*** 4155,4182 **** ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
   * ALTER TABLE ALTER COLUMN SET STATISTICS DISTINCT
   */
  static void
- ATPrepSetDistinct(Relation rel, const char *colName, Node *newValue)
- {
- 	/*
- 	 * We do our own permission checking because (a) we want to allow SET
- 	 * DISTINCT on indexes (for expressional index columns), and (b) we want
- 	 * to allow SET DISTINCT on system catalogs without requiring
- 	 * allowSystemTableMods to be turned on.
- 	 */
- 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
- 		rel->rd_rel->relkind != RELKIND_INDEX)
- 		ereport(ERROR,
- 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- 				 errmsg("\"%s\" is not a table or index",
- 						RelationGetRelationName(rel))));
- 
- 	/* Permissions checks */
- 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
- 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- 					   RelationGetRelationName(rel));
- }
- 
- static void
  ATExecSetDistinct(Relation rel, const char *colName, Node *newValue)
  {
  	float4		newdistinct;
--- 4219,4224 ----
***************
*** 4525,4530 **** ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
--- 4567,4582 ----
  
  	Assert(IsA(stmt, IndexStmt));
  
+ 	/*
+ 	 * XXX - IMO, we should apply all the permission checks in DefineIndex()
+ 	 * when it is invoked with check_rights == true.
+ 	 * There are only two code paths here. One is ATExecAddIndex() via
+ 	 * AT_AddIndex; the other is standard_ProcessUtility() via T_IndexStmt,
+ 	 * which also calls CheckRelationOwnership() on the relation to be indexed
+ 	 * just before invoking DefineIndex().
+ 	 */
+ 	ATSimplePermissions(rel, false);
+ 
  	/* suppress schema rights check when rebuilding existing index */
  	check_rights = !is_rebuild;
  	/* skip index build if phase 3 will have to rewrite table anyway */
***************
*** 4634,4642 **** ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  	List	   *children;
  	ListCell   *child;
  
! 	/* At top level, permission check was done in ATPrepCmd, else do it */
! 	if (recursing)
! 		ATSimplePermissions(rel, false);
  
  	/*
  	 * Call AddRelationNewConstraints to do the work, making sure it works on
--- 4686,4693 ----
  	List	   *children;
  	ListCell   *child;
  
! 	/* ATPermCmd does not check permissions for this command, so do it here */
! 	ATSimplePermissions(rel, false);
  
  	/*
  	 * Call AddRelationNewConstraints to do the work, making sure it works on
***************
*** 4742,4747 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
--- 4793,4804 ----
  	Oid			constrOid;
  
  	/*
+ 	 * Sanity checks can be done early
+ 	 */
+ 	ATCheckRelkind(rel, false, false);
+ 	ATCheckNoCatalog(rel);
+ 
+ 	/*
  	 * Grab an exclusive lock on the pk table, so that someone doesn't delete
  	 * rows out from under us. (Although a lesser lock would do for that
  	 * purpose, we'll need exclusive lock anyway to add triggers to the pk
***************
*** 4754,4770 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
  	 * Validity checks (permission checks wait till we have the column
  	 * numbers)
  	 */
! 	if (pkrel->rd_rel->relkind != RELKIND_RELATION)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 				 errmsg("referenced relation \"%s\" is not a table",
! 						RelationGetRelationName(pkrel))));
! 
! 	if (!allowSystemTableMods && IsSystemRelation(pkrel))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(pkrel))));
  
  	/*
  	 * Disallow reference from permanent table to temp table or vice versa.
--- 4811,4818 ----
  	 * Validity checks (permission checks wait till we have the column
  	 * numbers)
  	 */
! 	ATCheckRelkind(pkrel, false, false);
! 	ATCheckNoCatalog(pkrel);
  
  	/*
  	 * Disallow reference from permanent table to temp table or vice versa.
***************
*** 4832,4837 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
--- 4880,4886 ----
  	/*
  	 * Now we can check permissions.
  	 */
+ 	ATCheckOwnership(rel);
  	checkFkeyPermissions(pkrel, pkattnum, numpks);
  	checkFkeyPermissions(rel, fkattnum, numfks);
  
***************
*** 6739,6748 **** ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
--- 6788,6802 ----
  				 errmsg("tablespace \"%s\" does not exist", tablespacename)));
  
  	/* Check its permissions */
+ 	ATCheckRelkind(rel, false, true);
+ 	ATCheckOwnership(rel);
+ 
  	aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
  	if (aclresult != ACLCHECK_OK)
  		aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
  
+ 	ATCheckNoCatalog(rel);
+ 
  	/* Save info for Phase 3 to do the real work */
  	if (OidIsValid(tab->newTableSpace))
  		ereport(ERROR,
***************
*** 7131,7139 **** ATExecAddInherit(Relation child_rel, RangeVar *parent)
  	parent_rel = heap_openrv(parent, AccessShareLock);
  
  	/*
! 	 * Must be owner of both parent and child -- child was checked by
! 	 * ATSimplePermissions call in ATPrepCmd
  	 */
  	ATSimplePermissions(parent_rel, false);
  
  	/* Permanent rels cannot inherit from temporary ones */
--- 7185,7193 ----
  	parent_rel = heap_openrv(parent, AccessShareLock);
  
  	/*
! 	 * Must be owner of both parent and child
  	 */
+ 	ATSimplePermissions(child_rel, false);
  	ATSimplePermissions(parent_rel, false);
  
  	/* Permanent rels cannot inherit from temporary ones */
***************
*** 7782,7787 **** AlterTableNamespace(RangeVar *relation, const char *newschema,
--- 7836,7847 ----
  							RelationGetRelationName(rel))));
  	}
  
+ 	/*
+ 	 * XXX - No need to check relkind here
+ 	 */
+ 	ATCheckOwnership(rel);
+ 	ATCheckNoCatalog(rel);
+ 
  	/* get schema OID and check its permissions */
  	nspOid = LookupCreationNamespace(newschema);