Reducing some DDL Locks to ShareLock

Started by Simon Riggs over 17 years ago, 1 message
#1 Simon Riggs
simon@2ndQuadrant.com
1 attachment(s)

It seems possible to change some DDL commands/subcommands to use a
ShareLock rather than an AccessExclusiveLock. Enclosed patch implements
this reduction for CREATE TRIGGER, CREATE RULE and ALTER TABLE.

We can relax locking as long as we can verify that the changes affect
DML statements *only*. I've marked the commands/subcommands this can be
applied to with an "s" (for ShareLock) and noted others that must remain
"x" (for AccessExclusiveLock).

s CREATE RULE (only non-ON SELECT rules)

s CREATE TRIGGER

ALTER TABLE
x RENAME [ COLUMN ] column TO new_column
x RENAME TO new_name
x SET SCHEMA new_schema
x ADD [ COLUMN ] column type [ column_constraint [ ... ] ]
x DROP [ COLUMN ] column [ RESTRICT | CASCADE ]
x ALTER [ COLUMN ] column TYPE type [ USING expression ]
x ALTER [ COLUMN ] column SET DEFAULT expression
x ALTER [ COLUMN ] column DROP DEFAULT
s ALTER [ COLUMN ] column SET NOT NULL
x ALTER [ COLUMN ] column DROP NOT NULL
s ALTER [ COLUMN ] column SET STATISTICS integer
s ALTER [ COLUMN ] column SET STORAGE { PLAIN | EXTERNAL | EXTENDED | MAIN }
s ADD table_constraint
x DROP CONSTRAINT constraint_name [ RESTRICT | CASCADE ]
s DISABLE TRIGGER [ trigger_name | ALL | USER ]
s ENABLE TRIGGER [ trigger_name | ALL | USER ]
s ENABLE REPLICA TRIGGER trigger_name
s ENABLE ALWAYS TRIGGER trigger_name
x DISABLE RULE rewrite_rule_name
x ENABLE RULE rewrite_rule_name
x ENABLE REPLICA RULE rewrite_rule_name
x ENABLE ALWAYS RULE rewrite_rule_name
s CLUSTER ON index_name
s SET WITHOUT CLUSTER
x SET WITHOUT OIDS
s SET ( storage_parameter = value [, ... ] )
s RESET ( storage_parameter [, ... ] )
x INHERIT parent_table
x NO INHERIT parent_table
x OWNER TO new_owner
x SET TABLESPACE new_tablespace

If an ALTER TABLE has more than one sub-command we take the highest lock
to cover the execution of all sub-commands, since we want to avoid lock
upgrades and deadlocks.

e.g.
ALTER TABLE foo ADD PRIMARY KEY (col1); will take ShareLock
whereas
ALTER TABLE foo ADD PRIMARY KEY (col1), INHERIT bar; will require an
AccessExclusiveLock
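
As a rough standalone illustration of the "highest lock wins" rule above,
the sketch below folds a list of subcommands into a single decision. The
SketchSubcommand tags and function names are hypothetical and exist only
for this example; the patch itself makes the decision by switching on the
AlterTableCmd subtype in AlterTableLockLevel().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical subcommand tags for this sketch only. */
typedef enum
{
	SUB_ADD_CONSTRAINT,		/* marked "s" in the list above */
	SUB_SET_STATISTICS,		/* marked "s" */
	SUB_ENABLE_TRIGGER,		/* marked "s" */
	SUB_INHERIT,			/* marked "x" */
	SUB_DROP_COLUMN			/* marked "x" */
} SketchSubcommand;

/* True if this one subcommand needs AccessExclusiveLock. */
bool
sketch_subcommand_needs_full_lock(SketchSubcommand sub)
{
	switch (sub)
	{
		case SUB_INHERIT:
		case SUB_DROP_COLUMN:
			return true;
		case SUB_ADD_CONSTRAINT:
		case SUB_SET_STATISTICS:
		case SUB_ENABLE_TRIGGER:
			return false;
	}
	/* Unknown tags are treated conservatively. */
	return true;
}

/*
 * True if the whole command needs AccessExclusiveLock, i.e. if any single
 * subcommand does. The lock is chosen once, up front, so there is never a
 * need to upgrade it part way through the command.
 */
bool
sketch_needs_full_lock(const SketchSubcommand *subs, size_t nsubs)
{
	bool		needFullLock = false;
	size_t		i;

	assert(subs != NULL || nsubs == 0);

	for (i = 0; i < nsubs; i++)
	{
		if (sketch_subcommand_needs_full_lock(subs[i]))
			needFullLock = true;
	}
	return needFullLock;
}
```

With these tags, a list holding only SUB_ADD_CONSTRAINT yields false
(ShareLock), while adding SUB_INHERIT to the same list yields true
(AccessExclusiveLock), matching the two ALTER TABLE examples above.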

I've checked definitions of these subcommands here
http://developer.postgresql.org/pgdocs/postgres/sql-altertable.html
As noted, Rules can be ON SELECT, so we can't enable/disable them
without an AccessExclusiveLock. Other than that, many of the subcommands
don't change the actual table data, they just change settings for future
DML changes to the table.
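
To make the "DML only" criterion a little more concrete, here is a minimal
standalone sketch of the relevant corner of the table-level lock conflict
rules, as described in the PostgreSQL documentation. The type and function
names are made up for this example and are not backend code. It shows that
ShareLock lets SELECT proceed but blocks INSERT/UPDATE/DELETE, whereas
AccessExclusiveLock blocks both.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative subset of table-level lock modes; not the backend's definitions. */
typedef enum
{
	SK_ACCESS_SHARE,		/* taken by SELECT */
	SK_ROW_EXCLUSIVE,		/* taken by INSERT, UPDATE, DELETE */
	SK_SHARE,				/* proposed for the "s" commands */
	SK_ACCESS_EXCLUSIVE,	/* kept for the "x" commands */
	SK_NUM_MODES
} SkLockMode;

/* sk_conflict_table[a][b] is true when modes a and b cannot be held together. */
static const bool sk_conflict_table[SK_NUM_MODES][SK_NUM_MODES] = {
	/*                      AS     RX     S      AX   */
	/* AccessShare     */ {false, false, false, true},
	/* RowExclusive    */ {false, false, true, true},
	/* Share           */ {false, true, false, true},
	/* AccessExclusive */ {true, true, true, true}
};

/* True if a request for "wanted" must wait while another session holds "held". */
bool
sk_lock_conflicts(SkLockMode held, SkLockMode wanted)
{
	assert((int) held >= 0 && held < SK_NUM_MODES);
	assert((int) wanted >= 0 && wanted < SK_NUM_MODES);

	return sk_conflict_table[held][wanted];
}
```

Note that ShareLock does not conflict with itself, so under this model two
"s" commands on the same table would not block each other on the table
lock alone.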

This will
* allow ALTER TABLE ADD FOREIGN KEY to run in parallel with pg_restore
* allow trigger-based replication systems to add triggers more easily
* reduce the number of AccessExclusiveLocks that have to be dealt with
in Hot Standby mode.

Couple of points of note:

* Patch implements this by changing pg_class.reltriggers so it only has
two values, 0 or 1. Do we want to change this to a boolean, or leave as
an int2 for backwards compatibility? Code works either way. Enclosed
patch keeps it as an int2 for now, easily changed on request.

* transformIndexStatement() and DefineIndex() both take the wrong level
of locks in existing code, when called from ALTER TABLE. Not sure
whether this should be fixed in conjunction with these changes or not.
Doesn't seem to make any difference, since the "wrong level" is taken
after the initial lock is taken, so we never actually notice.

* ATRewriteTables() uses the wrong lock level in existing code when we
call validateForeignKeyConstraint(). It uses RowShareLock, which is
not sufficient to stop DML when checking using the SELECT in
RI_Initial_Check(). Again, at present we take an AccessExclusiveLock
first, so not a problem now. Changed to ShareLock in patch.
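
The last point can be checked against the documented conflict rules:
RowShareLock conflicts only with ExclusiveLock and AccessExclusiveLock, so
it does not hold off the RowExclusiveLock taken by INSERT, UPDATE and
DELETE. The small sketch below encodes just that fact for the three modes
discussed here; the names are hypothetical and it is not backend code.

```c
#include <assert.h>
#include <stdbool.h>

/* The three modes a validation scan might hold in this discussion. */
typedef enum
{
	SK_HELD_ROW_SHARE,			/* what the existing code requests */
	SK_HELD_SHARE,				/* what the patch requests instead */
	SK_HELD_ACCESS_EXCLUSIVE	/* what ALTER TABLE holds today anyway */
} SkHeldMode;

/*
 * True if holding "held" on a table prevents other sessions from acquiring
 * RowExclusiveLock, i.e. from running INSERT, UPDATE or DELETE on it.
 */
bool
sk_held_mode_blocks_dml(SkHeldMode held)
{
	switch (held)
	{
		case SK_HELD_ROW_SHARE:
			return false;		/* does not conflict with RowExclusiveLock */
		case SK_HELD_SHARE:
		case SK_HELD_ACCESS_EXCLUSIVE:
			return true;		/* both conflict with RowExclusiveLock */
	}
	assert(false && "unknown lock mode");
	return true;
}
```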

Patch passes make check on an assert build, plus manual observation of
lock order and locks held. I've also introduced two new lock functions
LockHeldByMe() and RelationLockedByMe() to allow Assert() checking of
lock levels held. These are added in key points in ALTER TABLE code to
ensure no mistakes are made about required lock levels.
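
For readers unfamiliar with this style of checking, the following
standalone sketch shows the general idea of asserting a lock level before
doing the work. It is only a model: SkRelation, the sk_* functions and the
"required mode or stronger" rule are assumptions made for this example,
and the real LockHeldByMe() and RelationLockedByMe() in the patch consult
the backend's lock tables and may behave differently. Only the two modes
discussed here are modelled, so a simple numeric comparison is enough.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Only the two modes discussed here; larger value means stronger lock. */
typedef enum
{
	SK_NO_LOCK = 0,
	SK_SHARE_LOCK = 1,
	SK_ACCESS_EXCLUSIVE_LOCK = 2
} SkLockLevel;

/* Hypothetical stand-in for a relation plus the lock this backend holds on it. */
typedef struct
{
	const char *name;
	SkLockLevel held;
} SkRelation;

/*
 * Assumption for this sketch: the check passes if we hold the required
 * level or a stronger one.
 */
bool
sk_relation_locked_by_me(const SkRelation *rel, SkLockLevel required)
{
	assert(rel != NULL);

	return rel->held != SK_NO_LOCK && rel->held >= required;
}

/* A subcommand that only changes settings for future DML. */
void
sk_exec_set_statistics(const SkRelation *rel)
{
	assert(sk_relation_locked_by_me(rel, SK_SHARE_LOCK));
	/* ... catalog update would go here ... */
}

/* A subcommand whose effects must be visible to SELECT immediately. */
void
sk_exec_drop_column(const SkRelation *rel)
{
	assert(sk_relation_locked_by_me(rel, SK_ACCESS_EXCLUSIVE_LOCK));
	/* ... catalog update would go here ... */
}
```

In an assert-enabled build, calling sk_exec_drop_column() on a relation
held only at SK_SHARE_LOCK would abort, which is the kind of mistake the
Assert() calls in the patch are meant to catch during testing.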

doc/src/sgml/mvcc.sgml | 17 !!
src/backend/commands/tablecmds.c | 278 ++++++!!!!!!!!!!!!!!!!!!!!!
src/backend/commands/trigger.c | 88 -!!!!!!!!!!
src/backend/parser/parse_utilcmd.c | 21 !!
src/backend/rewrite/rewriteDefine.c | 10 !
src/backend/storage/lmgr/lmgr.c | 12 +
src/backend/storage/lmgr/lock.c | 30 +++
src/backend/utils/adt/ri_triggers.c | 2
src/include/commands/tablecmds.h | 2
src/include/storage/lmgr.h | 2
src/include/storage/lock.h | 1
11 files changed, 107 insertions(+), 11 deletions(-), 345 mods(!)

Patch is overall fairly straightforward:
* Assess required lock level
* Pass down decision through APIs to allow recursing to inherited tables
* Change comments at various places (or not)
* Trigger code needs to work around not knowing how many triggers are
present, so relcache now handled similarly to Rules

Patch correctly handles lock levels in cases such as a reindex of indexes
following ALTER TYPE of an indexed column. It might appear from the code
that this had been missed, but tracing code shows it is correctly set.

There is still scope for an "ALTER TABLE CONCURRENTLY" command as has
been suggested. This proposal neither blocks that nor is blocked by it.
Not every application can be changed to accept new syntax, nor is it
desirable for every application to issue these commands only in
top-level transactions.

There are probably even more tweaks that we could do than the above
list, but the above seems fairly straightforward for now. We can always
do more later.

--
Simon Riggs www.2ndQuadrant.com
PostgreSQL Training, Services and Support

Attachments:

ddl_lock_reduce.v4.patch (text/x-patch; charset=utf-8)
Index: doc/src/sgml/mvcc.sgml
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/doc/src/sgml/mvcc.sgml,v
retrieving revision 2.69
diff -c -r2.69 mvcc.sgml
*** doc/src/sgml/mvcc.sgml	18 Feb 2007 01:21:49 -0000	2.69
--- doc/src/sgml/mvcc.sgml	6 Oct 2008 10:53:45 -0000
***************
*** 504,512 ****
      most <productname>PostgreSQL</productname> commands automatically
      acquire locks of appropriate modes to ensure that referenced
      tables are not dropped or modified in incompatible ways while the
!     command executes.  (For example, <command>ALTER TABLE</> cannot safely be
!     executed concurrently with other operations on the same table, so it
!     obtains an exclusive lock on the table to enforce that.)
     </para>
  
     <para>
--- 504,512 ----
      most <productname>PostgreSQL</productname> commands automatically
      acquire locks of appropriate modes to ensure that referenced
      tables are not dropped or modified in incompatible ways while the
!     command executes.  (For example, <command>TRUNCATE</> 
!     cannot safely be executed concurrently with other operations on the 
!     same table, so it obtains an exclusive lock on the table to enforce that.)
     </para>
  
     <para>
***************
*** 649,654 ****
--- 649,657 ----
          <para>
           Acquired by <command>CREATE INDEX</command>
           (without <option>CONCURRENTLY</option>).
+          <command>ALTER TABLE</command> will take a lock at this level if
+          none of its subcommands make changes that need to be
+ 		 immediately visible to SELECT statements.
          </para>
         </listitem>
        </varlistentry>
***************
*** 714,724 ****
          </para>
  
          <para>
!          Acquired by the <command>ALTER TABLE</command>, <command>DROP
!          TABLE</command>, <command>TRUNCATE</command>, <command>REINDEX</command>,
           <command>CLUSTER</command>, and <command>VACUUM FULL</command>
           commands.  This is also the default lock mode for <command>LOCK
           TABLE</command> statements that do not specify a mode explicitly.
          </para>
         </listitem>
        </varlistentry>
--- 717,730 ----
          </para>
  
          <para>
!          Acquired by the <command>DROP TABLE</command>, 
!          <command>TRUNCATE</command>, <command>REINDEX</command>,
           <command>CLUSTER</command>, and <command>VACUUM FULL</command>
           commands.  This is also the default lock mode for <command>LOCK
           TABLE</command> statements that do not specify a mode explicitly.
+          <command>ALTER TABLE</command> will take a lock at this level if
+          any of its subcommands make changes that need to be
+ 		 immediately visible to SELECT statements.
          </para>
         </listitem>
        </varlistentry>
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.266
diff -c -r1.266 tablecmds.c
*** src/backend/commands/tablecmds.c	8 Sep 2008 00:47:40 -0000	1.266
--- src/backend/commands/tablecmds.c	6 Oct 2008 17:11:11 -0000
***************
*** 249,269 ****
  							 Relation rel, Relation pkrel, Oid constraintOid);
  static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
  						 Oid constraintOid);
! static void ATController(Relation rel, List *cmds, bool recurse);
  static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing);
! static void ATRewriteCatalogs(List **wqueue);
  static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					  AlterTableCmd *cmd);
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
  static void ATSimplePermissions(Relation rel, bool allowView);
  static void ATSimplePermissionsRelationOrIndex(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse);
  static void ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd);
  static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
  				AlterTableCmd *cmd);
  static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
--- 249,269 ----
  							 Relation rel, Relation pkrel, Oid constraintOid);
  static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
  						 Oid constraintOid);
! static void ATController(Relation rel, List *cmds, bool recurse, bool needFullLock);
  static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing, bool needFullLock);
! static void ATRewriteCatalogs(List **wqueue, bool needFullLock);
  static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					  AlterTableCmd *cmd, bool needFullLock);
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
  static void ATSimplePermissions(Relation rel, bool allowView);
  static void ATSimplePermissionsRelationOrIndex(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse, bool needFullLock);
  static void ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd, bool needFullLock);
  static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
  				AlterTableCmd *cmd);
  static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
***************
*** 284,297 ****
  				 DropBehavior behavior,
  				 bool recurse, bool recursing);
  static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild);
  static void ATExecAddConstraint(List **wqueue,
  								AlteredTableInfo *tab, Relation rel,
! 								Node *newConstraint, bool recurse);
  static void ATAddCheckConstraint(List **wqueue,
  								 AlteredTableInfo *tab, Relation rel,
  								 Constraint *constr,
! 								 bool recurse, bool recursing);
  static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
  						  FkConstraint *fkconstraint);
  static void ATExecDropConstraint(Relation rel, const char *constrName,
--- 284,299 ----
  				 DropBehavior behavior,
  				 bool recurse, bool recursing);
  static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild, bool needFullLock);
  static void ATExecAddConstraint(List **wqueue,
  								AlteredTableInfo *tab, Relation rel,
! 								Node *newConstraint, bool recurse, 
! 								bool needFullLock);
  static void ATAddCheckConstraint(List **wqueue,
  								 AlteredTableInfo *tab, Relation rel,
  								 Constraint *constr,
! 								 bool recurse, bool recursing, 
! 								 bool needFullLock);
  static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
  						  FkConstraint *fkconstraint);
  static void ATExecDropConstraint(Relation rel, const char *constrName,
***************
*** 2124,2131 ****
   * Disallow ALTER TABLE (and similar commands) when the current backend has
   * any open reference to the target table besides the one just acquired by
   * the calling command; this implies there's an open cursor or active plan.
!  * We need this check because our AccessExclusiveLock doesn't protect us
!  * against stomping on our own foot, only other people's feet!
   *
   * For ALTER TABLE, the only case known to cause serious trouble is ALTER
   * COLUMN TYPE, and some changes are obviously pretty benign, so this could
--- 2126,2133 ----
   * Disallow ALTER TABLE (and similar commands) when the current backend has
   * any open reference to the target table besides the one just acquired by
   * the calling command; this implies there's an open cursor or active plan.
!  * We need this check because our lock doesn't protect us against stomping
!  * on our own foot, only other people's feet!
   *
   * For ALTER TABLE, the only case known to cause serious trouble is ALTER
   * COLUMN TYPE, and some changes are obviously pretty benign, so this could
***************
*** 2202,2212 ****
   *
   * Thanks to the magic of MVCC, an error anywhere along the way rolls back
   * the whole operation; we don't have to do anything special to clean up.
   */
  void
  AlterTable(AlterTableStmt *stmt)
  {
! 	Relation	rel = relation_openrv(stmt->relation, AccessExclusiveLock);
  
  	CheckTableNotInUse(rel, "ALTER TABLE");
  
--- 2204,2231 ----
   *
   * Thanks to the magic of MVCC, an error anywhere along the way rolls back
   * the whole operation; we don't have to do anything special to clean up.
+  *
+  * We lock the table as the first action, with an appropriate lock level
+  * for the subcommands requested. Any subcommand that needs to rewrite
+  * tuples in the table forces the whole command to be executed with
+  * AccessExclusiveLock. If no subcommand requires a table rewrite
+  * then we are able to use just ShareLock. We pass the lock level down
+  * so that we can apply it recursively to inherited tables. Note that the
+  * lock level we want as we recurse may well be higher than required for 
+  * that specific subcommand. So we pass down the overall lock requirement, 
+  * rather than reassess it at lower levels.
   */
  void
  AlterTable(AlterTableStmt *stmt)
  {
! 	Relation	rel;
! 	bool		needFullLock = AlterTableLockLevel(stmt->cmds);
! 
! 	/*
! 	 * Acquire same level of lock as already acquired during parsing.
! 	 */
! 	rel = relation_openrv(stmt->relation, 
! 							needFullLock ? AccessExclusiveLock : ShareLock);
  
  	CheckTableNotInUse(rel, "ALTER TABLE");
  
***************
*** 2248,2254 ****
  			elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
  	}
  
! 	ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt));
  }
  
  /*
--- 2267,2274 ----
  			elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
  	}
  
! 	ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt), 
! 						needFullLock);
  }
  
  /*
***************
*** 2265,2277 ****
  void
  AlterTableInternal(Oid relid, List *cmds, bool recurse)
  {
! 	Relation	rel = relation_open(relid, AccessExclusiveLock);
  
! 	ATController(rel, cmds, recurse);
  }
  
  static void
! ATController(Relation rel, List *cmds, bool recurse)
  {
  	List	   *wqueue = NIL;
  	ListCell   *lcmd;
--- 2285,2382 ----
  void
  AlterTableInternal(Oid relid, List *cmds, bool recurse)
  {
! 	Relation	rel;
! 
! 	rel = relation_open(relid, ShareLock);
! 
! 	ATController(rel, cmds, recurse, false);
! }
! 
! /*
!  * AlterTableLockLevel
!  *
!  * Sets the overall lock level required for the supplied list of subcommands.
!  * Policy for doing this set according to needs of AlterTable(), see
!  * comments there for overall explanation. 
!  *
!  * Returns true if we need AccessExclusiveLock, false means ShareLock.
!  *
!  * Function is called before and after parsing, so it must give same
!  * answer each time it is called. Some subcommands are transformed
!  * into other subcommand types, so the transform must never be made to a
!  * lower lock level than previously assigned. All transforms are noted below.
!  */
! bool
! AlterTableLockLevel(List *cmds)
! {
! 	ListCell   *lcmd;
! 	bool		needFullLock = false;
! 
! 	foreach(lcmd, cmds)
! 	{
! 		AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
! 
! 		switch (cmd->subtype)
! 		{
! 			/*
! 			 * Need AccessExclusiveLock for these subcommands.
! 			 */
! 			case AT_AddColumn:			/* must rewrite heap */
! 			case AT_DropColumn:			/* change visible to SELECT */
! 			case AT_AlterColumnType:	/* must rewrite heap */
! 			case AT_DropConstraint:		/* as DROP INDEX */
! 			case AT_DropOids:			/* calls AT_DropColumn */
! 			case AT_EnableAlwaysRule:	/* as DROP INDEX */
! 			case AT_EnableReplicaRule:	/* as DROP INDEX */
! 			case AT_EnableRule:			/* as DROP INDEX */
! 			case AT_DisableRule:		/* as DROP INDEX */
! 			case AT_AddInherit:			/* change visible to SELECT */
! 			case AT_DropInherit:		/* change visible to SELECT */
! 			case AT_ChangeOwner:		/* change visible to SELECT */
! 			case AT_SetTableSpace:		/* must rewrite heap */
! 			case AT_DropNotNull:		/* may change some SQL plans */
! 				needFullLock = true;
! 				break;
! 
! 			/*
! 			 * ShareLock is all we need for these subcommands.
! 			 * Be very careful about moving commands onto this list. If
! 			 * you are unsure, place subcommand on the above list, not here.
! 			 */
! 			case AT_ColumnDefault:
! 			case AT_SetNotNull:
! 			case AT_SetStatistics:
! 			case AT_SetStorage:				/* only affects DML */
! 			case AT_AddIndex:				/* from ADD CONSTRAINT */
! 			case AT_AddConstraint:			/* as CREATE INDEX */
! 			case AT_ProcessedConstraint:	/* becomes AT_AddConstraint */
! 			case AT_AddConstraintRecurse:	/* becomes AT_AddConstraint */
! 			case AT_ClusterOn:
! 			case AT_DropCluster:
! 			case AT_EnableTrig:
! 			case AT_EnableAlwaysTrig:
! 			case AT_EnableReplicaTrig:
! 			case AT_EnableTrigAll:
! 			case AT_EnableTrigUser:
! 			case AT_DisableTrig:
! 			case AT_DisableTrigAll:
! 			case AT_DisableTrigUser:
! 			case AT_SetRelOptions:
! 			case AT_ResetRelOptions:
! 				break;
  
! 			default:				/* oops */
! 				elog(ERROR, "unrecognized alter table type: %d",
! 					 (int) cmd->subtype);
! 				break;
! 		}
! 	}
! 
! 	return needFullLock;
  }
  
  static void
! ATController(Relation rel, List *cmds, bool recurse, bool needFullLock)
  {
  	List	   *wqueue = NIL;
  	ListCell   *lcmd;
***************
*** 2281,2294 ****
  	{
  		AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
  
! 		ATPrepCmd(&wqueue, rel, cmd, recurse, false);
  	}
  
  	/* Close the relation, but keep lock until commit */
  	relation_close(rel, NoLock);
  
  	/* Phase 2: update system catalogs */
! 	ATRewriteCatalogs(&wqueue);
  
  	/* Phase 3: scan/rewrite tables as needed */
  	ATRewriteTables(&wqueue);
--- 2386,2399 ----
  	{
  		AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
  
! 		ATPrepCmd(&wqueue, rel, cmd, recurse, false, needFullLock);
  	}
  
  	/* Close the relation, but keep lock until commit */
  	relation_close(rel, NoLock);
  
  	/* Phase 2: update system catalogs */
! 	ATRewriteCatalogs(&wqueue, needFullLock);
  
  	/* Phase 3: scan/rewrite tables as needed */
  	ATRewriteTables(&wqueue);
***************
*** 2300,2311 ****
   * Traffic cop for ALTER TABLE Phase 1 operations, including simple
   * recursion and permission checks.
   *
!  * Caller must have acquired AccessExclusiveLock on relation already.
   * This lock should be held until commit.
   */
  static void
  ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing)
  {
  	AlteredTableInfo *tab;
  	int			pass;
--- 2405,2416 ----
   * Traffic cop for ALTER TABLE Phase 1 operations, including simple
   * recursion and permission checks.
   *
!  * Caller must have acquired appropriate lock type on relation already.
   * This lock should be held until commit.
   */
  static void
  ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing, bool needFullLock)
  {
  	AlteredTableInfo *tab;
  	int			pass;
***************
*** 2342,2372 ****
  			 * rules.
  			 */
  			ATSimplePermissions(rel, true);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN STATISTICS */
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* Performs own permission checks */
  			ATPrepSetStatistics(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN STORAGE */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
--- 2447,2477 ----
  			 * rules.
  			 */
  			ATSimplePermissions(rel, true);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN STATISTICS */
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* Performs own permission checks */
  			ATPrepSetStatistics(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN STORAGE */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
***************
*** 2428,2434 ****
  				dropCmd->subtype = AT_DropColumn;
  				dropCmd->name = pstrdup("oid");
  				dropCmd->behavior = cmd->behavior;
! 				ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
  			}
  			pass = AT_PASS_DROP;
  			break;
--- 2533,2539 ----
  				dropCmd->subtype = AT_DropColumn;
  				dropCmd->name = pstrdup("oid");
  				dropCmd->behavior = cmd->behavior;
! 				ATPrepCmd(wqueue, rel, dropCmd, recurse, false, needFullLock);
  			}
  			pass = AT_PASS_DROP;
  			break;
***************
*** 2483,2489 ****
   * conflicts).
   */
  static void
! ATRewriteCatalogs(List **wqueue)
  {
  	int			pass;
  	ListCell   *ltab;
--- 2588,2594 ----
   * conflicts).
   */
  static void
! ATRewriteCatalogs(List **wqueue, bool needFullLock)
  {
  	int			pass;
  	ListCell   *ltab;
***************
*** 2508,2520 ****
  			if (subcmds == NIL)
  				continue;
  
! 			/*
! 			 * Exclusive lock was obtained by phase 1, needn't get it again
  			 */
  			rel = relation_open(tab->relid, NoLock);
  
  			foreach(lcmd, subcmds)
! 				ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd));
  
  			/*
  			 * After the ALTER TYPE pass, do cleanup work (this is not done in
--- 2613,2625 ----
  			if (subcmds == NIL)
  				continue;
  
! 			/* 
! 			 * Appropriate lock was obtained by phase 1, needn't get it again
  			 */
  			rel = relation_open(tab->relid, NoLock);
  
  			foreach(lcmd, subcmds)
! 				ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd), needFullLock);
  
  			/*
  			 * After the ALTER TYPE pass, do cleanup work (this is not done in
***************
*** 2549,2612 ****
   */
  static void
  ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 		  AlterTableCmd *cmd)
  {
  	switch (cmd->subtype)
  	{
  		case AT_AddColumn:		/* ADD COLUMN */
  			ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
  			break;
  		case AT_ColumnDefault:	/* ALTER COLUMN DEFAULT */
  			ATExecColumnDefault(rel, cmd->name, cmd->def);
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
  			ATExecDropNotNull(rel, cmd->name);
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
  			ATExecSetNotNull(tab, rel, cmd->name);
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN STATISTICS */
  			ATExecSetStatistics(rel, cmd->name, cmd->def);
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN STORAGE */
  			ATExecSetStorage(rel, cmd->name, cmd->def);
  			break;
  		case AT_DropColumn:		/* DROP COLUMN */
  			ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
  			break;
  		case AT_DropColumnRecurse:		/* DROP COLUMN with recursion */
  			ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
  			break;
  		case AT_ReAddIndex:		/* ADD INDEX */
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, false);
  			break;
  		case AT_AddConstraintRecurse:	/* ADD CONSTRAINT with recursion */
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, true);
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
  			ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false);
  			break;
  		case AT_DropConstraintRecurse:	/* DROP CONSTRAINT with recursion */
  			ATExecDropConstraint(rel, cmd->name, cmd->behavior, true, false);
  			break;
  		case AT_AlterColumnType:		/* ALTER COLUMN TYPE */
  			ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
  			break;
  		case AT_ChangeOwner:	/* ALTER OWNER */
  			ATExecChangeOwner(RelationGetRelid(rel),
  							  get_roleid_checked(cmd->name),
  							  false);
  			break;
  		case AT_ClusterOn:		/* CLUSTER ON */
  			ATExecClusterOn(rel, cmd->name);
  			break;
  		case AT_DropCluster:	/* SET WITHOUT CLUSTER */
  			ATExecDropCluster(rel);
  			break;
  		case AT_DropOids:		/* SET WITHOUT OIDS */
--- 2654,2735 ----
   */
  static void
  ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 		  AlterTableCmd *cmd, bool needFullLock)
  {
  	switch (cmd->subtype)
  	{
  		case AT_AddColumn:		/* ADD COLUMN */
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
  			break;
  		case AT_ColumnDefault:	/* ALTER COLUMN DEFAULT */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecColumnDefault(rel, cmd->name, cmd->def);
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecDropNotNull(rel, cmd->name);
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecSetNotNull(tab, rel, cmd->name);
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN STATISTICS */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecSetStatistics(rel, cmd->name, cmd->def);
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN STORAGE */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecSetStorage(rel, cmd->name, cmd->def);
  			break;
  		case AT_DropColumn:		/* DROP COLUMN */
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
  			break;
  		case AT_DropColumnRecurse:		/* DROP COLUMN with recursion */
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
! 			Assert(RelationLockedByMe(rel, ShareLock));
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false, needFullLock);
  			break;
  		case AT_ReAddIndex:		/* ADD INDEX */
! 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true, needFullLock);
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
! 			Assert(RelationLockedByMe(rel, ShareLock));
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, false, needFullLock);
  			break;
  		case AT_AddConstraintRecurse:	/* ADD CONSTRAINT with recursion */
! 			Assert(RelationLockedByMe(rel, ShareLock));
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, true, needFullLock);
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false);
  			break;
  		case AT_DropConstraintRecurse:	/* DROP CONSTRAINT with recursion */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecDropConstraint(rel, cmd->name, cmd->behavior, true, false);
  			break;
  		case AT_AlterColumnType:		/* ALTER COLUMN TYPE */
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
  			break;
  		case AT_ChangeOwner:	/* ALTER OWNER */
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecChangeOwner(RelationGetRelid(rel),
  							  get_roleid_checked(cmd->name),
  							  false);
  			break;
  		case AT_ClusterOn:		/* CLUSTER ON */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecClusterOn(rel, cmd->name);
  			break;
  		case AT_DropCluster:	/* SET WITHOUT CLUSTER */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecDropCluster(rel);
  			break;
  		case AT_DropOids:		/* SET WITHOUT OIDS */
***************
*** 2623,2688 ****
--- 2746,2827 ----
  			 */
  			break;
  		case AT_SetRelOptions:	/* SET (...) */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecSetRelOptions(rel, (List *) cmd->def, false);
  			break;
  		case AT_ResetRelOptions:		/* RESET (...) */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecSetRelOptions(rel, (List *) cmd->def, true);
  			break;
  
  		case AT_EnableTrig:		/* ENABLE TRIGGER name */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecEnableDisableTrigger(rel, cmd->name,
  									   TRIGGER_FIRES_ON_ORIGIN, false);
  			break;
  		case AT_EnableAlwaysTrig:		/* ENABLE ALWAYS TRIGGER name */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecEnableDisableTrigger(rel, cmd->name,
  									   TRIGGER_FIRES_ALWAYS, false);
  			break;
  		case AT_EnableReplicaTrig:		/* ENABLE REPLICA TRIGGER name */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecEnableDisableTrigger(rel, cmd->name,
  									   TRIGGER_FIRES_ON_REPLICA, false);
  			break;
  		case AT_DisableTrig:	/* DISABLE TRIGGER name */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecEnableDisableTrigger(rel, cmd->name,
  									   TRIGGER_DISABLED, false);
  			break;
  		case AT_EnableTrigAll:	/* ENABLE TRIGGER ALL */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecEnableDisableTrigger(rel, NULL,
  									   TRIGGER_FIRES_ON_ORIGIN, false);
  			break;
  		case AT_DisableTrigAll:	/* DISABLE TRIGGER ALL */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecEnableDisableTrigger(rel, NULL,
  									   TRIGGER_DISABLED, false);
  			break;
  		case AT_EnableTrigUser:	/* ENABLE TRIGGER USER */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecEnableDisableTrigger(rel, NULL,
  									   TRIGGER_FIRES_ON_ORIGIN, true);
  			break;
  		case AT_DisableTrigUser:		/* DISABLE TRIGGER USER */
+ 			Assert(RelationLockedByMe(rel, ShareLock));
  			ATExecEnableDisableTrigger(rel, NULL,
  									   TRIGGER_DISABLED, true);
  			break;
  
  		case AT_EnableRule:		/* ENABLE RULE name */
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecEnableDisableRule(rel, cmd->name,
  									RULE_FIRES_ON_ORIGIN);
  			break;
  		case AT_EnableAlwaysRule:		/* ENABLE ALWAYS RULE name */
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecEnableDisableRule(rel, cmd->name,
  									RULE_FIRES_ALWAYS);
  			break;
  		case AT_EnableReplicaRule:		/* ENABLE REPLICA RULE name */
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecEnableDisableRule(rel, cmd->name,
  									RULE_FIRES_ON_REPLICA);
  			break;
  		case AT_DisableRule:	/* DISABLE RULE name */
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecEnableDisableRule(rel, cmd->name,
  									RULE_DISABLED);
  			break;
  
  		case AT_AddInherit:
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecAddInherit(rel, (RangeVar *) cmd->def);
  			break;
  		case AT_DropInherit:
+ 			Assert(RelationLockedByMe(rel, AccessExclusiveLock));
  			ATExecDropInherit(rel, (RangeVar *) cmd->def);
  			break;
  		default:				/* oops */
***************
*** 2725,2730 ****
--- 2864,2870 ----
  			ObjectAddress object;
  
  			OldHeap = heap_open(tab->relid, NoLock);
+ 			Assert(RelationLockedByMe(OldHeap, AccessExclusiveLock));
  
  			/*
  			 * We can never allow rewriting of shared or nailed-in-cache
***************
*** 2850,2858 ****
  				{
  					/* Long since locked, no need for another */
  					rel = heap_open(tab->relid, NoLock);
  				}
  
! 				refrel = heap_open(con->refrelid, RowShareLock);
  
  				validateForeignKeyConstraint(fkconstraint, rel, refrel,
  											 con->conid);
--- 2990,3005 ----
  				{
  					/* Long since locked, no need for another */
  					rel = heap_open(tab->relid, NoLock);
+ 					Assert(RelationLockedByMe(rel, ShareLock));
  				}
  
! 				/*
! 				 * We need to prevent write access to the table while
! 				 * we make the check. RowShareLock is needed if we
! 				 * use triggers, but we want to avoid using
! 				 * row-level locks if we can.
! 				 */
! 				refrel = heap_open(con->refrelid, ShareLock);
  
  				validateForeignKeyConstraint(fkconstraint, rel, refrel,
  											 con->conid);
***************
*** 2893,2901 ****
--- 3040,3054 ----
  	newTupDesc = RelationGetDescr(oldrel);		/* includes all mods */
  
  	if (OidIsValid(OIDNewHeap))
+ 	{
  		newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
+ 		Assert(RelationLockedByMe(oldrel, AccessExclusiveLock));
+ 	}
  	else
+ 	{
  		newrel = NULL;
+ 		Assert(RelationLockedByMe(oldrel, ShareLock));
+ 	}
  
  	/*
  	 * If we need to rewrite the table, the operation has to be propagated to
***************
*** 3230,3236 ****
   */
  static void
  ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse)
  {
  	/*
  	 * Propagate to children if desired.  Non-table relations never have
--- 3383,3389 ----
   */
  static void
  ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse, bool needFullLock)
  {
  	/*
  	 * Propagate to children if desired.  Non-table relations never have
***************
*** 3257,3265 ****
  
  			if (childrelid == relid)
  				continue;
! 			childrel = relation_open(childrelid, AccessExclusiveLock);
  			CheckTableNotInUse(childrel, "ALTER TABLE");
! 			ATPrepCmd(wqueue, childrel, cmd, false, true);
  			relation_close(childrel, NoLock);
  		}
  	}
--- 3410,3421 ----
  
  			if (childrelid == relid)
  				continue;
! 
! 			childrel = relation_open(childrelid, 
! 						needFullLock ? AccessExclusiveLock : ShareLock);
! 
  			CheckTableNotInUse(childrel, "ALTER TABLE");
! 			ATPrepCmd(wqueue, childrel, cmd, false, true, needFullLock);
  			relation_close(childrel, NoLock);
  		}
  	}
***************
*** 3275,3281 ****
   */
  static void
  ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd)
  {
  	Oid			relid = RelationGetRelid(rel);
  	ListCell   *child;
--- 3431,3437 ----
   */
  static void
  ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd, bool needFullLock)
  {
  	Oid			relid = RelationGetRelid(rel);
  	ListCell   *child;
***************
*** 3289,3297 ****
  		Oid			childrelid = lfirst_oid(child);
  		Relation	childrel;
  
! 		childrel = relation_open(childrelid, AccessExclusiveLock);
  		CheckTableNotInUse(childrel, "ALTER TABLE");
! 		ATPrepCmd(wqueue, childrel, cmd, true, true);
  		relation_close(childrel, NoLock);
  	}
  }
--- 3445,3455 ----
  		Oid			childrelid = lfirst_oid(child);
  		Relation	childrel;
  
! 		childrel = relation_open(childrelid, 
! 						needFullLock ? AccessExclusiveLock : ShareLock);
! 
  		CheckTableNotInUse(childrel, "ALTER TABLE");
! 		ATPrepCmd(wqueue, childrel, cmd, true, true, needFullLock);
  		relation_close(childrel, NoLock);
  	}
  }
***************
*** 3427,3433 ****
  		colDefChild->inhcount = 1;
  		colDefChild->is_local = false;
  
! 		ATOneLevelRecursion(wqueue, rel, childCmd);
  	}
  	else
  	{
--- 3585,3591 ----
  		colDefChild->inhcount = 1;
  		colDefChild->is_local = false;
  
! 		ATOneLevelRecursion(wqueue, rel, childCmd, true);
  	}
  	else
  	{
***************
*** 4235,4241 ****
   */
  static void
  ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild)
  {
  	bool		check_rights;
  	bool		skip_build;
--- 4393,4399 ----
   */
  static void
  ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild, bool needFullLock)
  {
  	bool		check_rights;
  	bool		skip_build;
***************
*** 4275,4281 ****
   */
  static void
  ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					Node *newConstraint, bool recurse)
  {
  	switch (nodeTag(newConstraint))
  	{
--- 4433,4439 ----
   */
  static void
  ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					Node *newConstraint, bool recurse, bool needFullLock)
  {
  	switch (nodeTag(newConstraint))
  	{
***************
*** 4293,4299 ****
  				{
  					case CONSTR_CHECK:
  						ATAddCheckConstraint(wqueue, tab, rel,
! 											 constr, recurse, false);
  						break;
  					default:
  						elog(ERROR, "unrecognized constraint type: %d",
--- 4451,4457 ----
  				{
  					case CONSTR_CHECK:
  						ATAddCheckConstraint(wqueue, tab, rel,
! 											 constr, recurse, false, needFullLock);
  						break;
  					default:
  						elog(ERROR, "unrecognized constraint type: %d",
***************
*** 4356,4362 ****
   */
  static void
  ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					 Constraint *constr, bool recurse, bool recursing)
  {
  	List	   *newcons;
  	ListCell   *lcon;
--- 4514,4521 ----
   */
  static void
  ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					 Constraint *constr, bool recurse, bool recursing,
! 					 bool needFullLock)
  {
  	List	   *newcons;
  	ListCell   *lcon;
***************
*** 4428,4434 ****
  		Relation	childrel;
  		AlteredTableInfo *childtab;
  
! 		childrel = heap_open(childrelid, AccessExclusiveLock);
  		CheckTableNotInUse(childrel, "ALTER TABLE");
  
  		/* Find or create work queue entry for this table */
--- 4587,4595 ----
  		Relation	childrel;
  		AlteredTableInfo *childtab;
  
! 		childrel = heap_open(childrelid, 
! 						needFullLock ? AccessExclusiveLock : ShareLock);
! 
  		CheckTableNotInUse(childrel, "ALTER TABLE");
  
  		/* Find or create work queue entry for this table */
***************
*** 4436,4442 ****
  
  		/* Recurse to child */
  		ATAddCheckConstraint(wqueue, childtab, childrel,
! 							 constr, recurse, true);
  
  		heap_close(childrel, NoLock);
  	}
--- 4597,4603 ----
  
  		/* Recurse to child */
  		ATAddCheckConstraint(wqueue, childtab, childrel,
! 							 constr, recurse, true, needFullLock);
  
  		heap_close(childrel, NoLock);
  	}
***************
*** 4470,4482 ****
  	Oid			constrOid;
  
  	/*
! 	 * Grab an exclusive lock on the pk table, so that someone doesn't delete
! 	 * rows out from under us. (Although a lesser lock would do for that
! 	 * purpose, we'll need exclusive lock anyway to add triggers to the pk
! 	 * table; trying to start with a lesser lock will just create a risk of
! 	 * deadlock.)
  	 */
! 	pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
  
  	/*
  	 * Validity and permissions checks
--- 4631,4642 ----
  	Oid			constrOid;
  
  	/*
! 	 * Grab a ShareLock on the pk table, so that someone doesn't delete
! 	 * rows out from under us. We can't take a lower level of lock, since
! 	 * we need ShareLocks to add triggers for the constraint. (Remember: we
! 	 * add triggers to *both* tables when we add a Foreign Key).
  	 */
! 	pkrel = heap_openrv(fkconstraint->pktable, ShareLock);
  
  	/*
  	 * Validity and permissions checks
***************
*** 4993,4999 ****
   * Scan the existing rows in a table to verify they meet a proposed FK
   * constraint.
   *
!  * Caller must have opened and locked both relations.
   */
  static void
  validateForeignKeyConstraint(FkConstraint *fkconstraint,
--- 5153,5159 ----
   * Scan the existing rows in a table to verify they meet a proposed FK
   * constraint.
   *
!  * Caller must have opened and locked both relations at ShareLock or higher.
   */
  static void
  validateForeignKeyConstraint(FkConstraint *fkconstraint,
***************
*** 5541,5547 ****
  	 * alter would put them out of step.
  	 */
  	if (recurse)
! 		ATSimpleRecursion(wqueue, rel, cmd, recurse);
  	else if (!recursing &&
  			 find_inheritance_children(RelationGetRelid(rel)) != NIL)
  		ereport(ERROR,
--- 5701,5707 ----
  	 * alter would put them out of step.
  	 */
  	if (recurse)
! 		ATSimpleRecursion(wqueue, rel, cmd, recurse, true);
  	else if (!recursing &&
  			 find_inheritance_children(RelationGetRelid(rel)) != NIL)
  		ereport(ERROR,
***************
*** 5938,5943 ****
--- 6098,6107 ----
  	 */
  }
  
+ /*
+  * Called to add additional work items if ALTER TYPE touched indexed
+  * columns. Hence we always take AccessExclusiveLocks here.
+  */
  static void
  ATPostAlterTypeParse(char *cmd, List **wqueue)
  {
Index: src/backend/commands/trigger.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/commands/trigger.c,v
retrieving revision 1.237
diff -c -r1.237 trigger.c
*** src/backend/commands/trigger.c	1 Sep 2008 20:42:44 -0000	1.237
--- src/backend/commands/trigger.c	3 Oct 2008 10:48:16 -0000
***************
*** 95,101 ****
  	Oid			funcoid;
  	Oid			funcrettype;
  	Oid			trigoid;
- 	int			found = 0;
  	int			i;
  	char		constrtrigname[NAMEDATALEN];
  	char	   *trigname;
--- 95,100 ----
***************
*** 104,110 ****
  	ObjectAddress myself,
  				referenced;
  
! 	rel = heap_openrv(stmt->relation, AccessExclusiveLock);
  
  	if (rel->rd_rel->relkind != RELKIND_RELATION)
  		ereport(ERROR,
--- 103,114 ----
  	ObjectAddress myself,
  				referenced;
  
! 	/*
! 	 * ShareLock is sufficient to prevent concurrent write activity
! 	 * to the relation. If we had SELECT triggers we would need to take
! 	 * an AccessExclusiveLock, just as we do with ON SELECT Rules.
! 	 */
! 	rel = heap_openrv(stmt->relation, ShareLock);
  
  	if (rel->rd_rel->relkind != RELKIND_RELATION)
  		ereport(ERROR,
***************
*** 280,292 ****
  	}
  
  	/*
! 	 * Scan pg_trigger for existing triggers on relation.  We do this mainly
! 	 * because we must count them; a secondary benefit is to give a nice error
! 	 * message if there's already a trigger of the same name. (The unique
! 	 * index on tgrelid/tgname would complain anyway.)
  	 *
! 	 * NOTE that this is cool only because we have AccessExclusiveLock on the
! 	 * relation, so the trigger set won't be changing underneath us.
  	 */
  	ScanKeyInit(&key,
  				Anum_pg_trigger_tgrelid,
--- 284,296 ----
  	}
  
  	/*
! 	 * Scan pg_trigger for existing triggers on relation.  We do this
! 	 * because we want to give a nice error message if there's already a
! 	 * trigger of the same name. (The unique index on tgrelid/tgname would
! 	 * complain anyway.)
  	 *
! 	 * We don't have AccessExclusiveLock, so it's possible for us to succeed
! 	 * here and then fail later when we insert the trigger. We tried.
  	 */
  	ScanKeyInit(&key,
  				Anum_pg_trigger_tgrelid,
***************
*** 303,309 ****
  					(errcode(ERRCODE_DUPLICATE_OBJECT),
  				  errmsg("trigger \"%s\" for relation \"%s\" already exists",
  						 trigname, stmt->relation->relname)));
- 		found++;
  	}
  	systable_endscan(tgscan);
  
--- 307,312 ----
***************
*** 405,411 ****
  		elog(ERROR, "cache lookup failed for relation %u",
  			 RelationGetRelid(rel));
  
! 	((Form_pg_class) GETSTRUCT(tuple))->reltriggers = found + 1;
  
  	simple_heap_update(pgrel, &tuple->t_self, tuple);
  
--- 408,414 ----
  		elog(ERROR, "cache lookup failed for relation %u",
  			 RelationGetRelid(rel));
  
! 	((Form_pg_class) GETSTRUCT(tuple))->reltriggers = 1;
  
  	simple_heap_update(pgrel, &tuple->t_self, tuple);
  
***************
*** 870,878 ****
  	 * Update relation's pg_class entry.  Crucial side-effect: other backends
  	 * (and this one too!) are sent SI message to make them rebuild relcache
  	 * entries.
- 	 *
- 	 * Note this is OK only because we have AccessExclusiveLock on the rel, so
- 	 * no one else is creating/deleting triggers on this rel at the same time.
  	 */
  	pgrel = heap_open(RelationRelationId, RowExclusiveLock);
  	tuple = SearchSysCacheCopy(RELOID,
--- 873,878 ----
***************
*** 882,892 ****
  		elog(ERROR, "cache lookup failed for relation %u", relid);
  	classForm = (Form_pg_class) GETSTRUCT(tuple);
  
- 	if (classForm->reltriggers == 0)	/* should not happen */
- 		elog(ERROR, "relation \"%s\" has reltriggers = 0",
- 			 RelationGetRelationName(rel));
- 	classForm->reltriggers--;
- 
  	simple_heap_update(pgrel, &tuple->t_self, tuple);
  
  	CatalogUpdateIndexes(pgrel, tuple);
--- 882,887 ----
***************
*** 1134,1151 ****
  RelationBuildTriggers(Relation relation)
  {
  	TriggerDesc *trigdesc;
! 	int			ntrigs = relation->rd_rel->reltriggers;
! 	Trigger    *triggers;
! 	int			found = 0;
  	Relation	tgrel;
  	ScanKeyData skey;
  	SysScanDesc tgscan;
  	HeapTuple	htup;
  	MemoryContext oldContext;
  
! 	Assert(ntrigs > 0);			/* else I should not have been called */
! 
! 	triggers = (Trigger *) palloc(ntrigs * sizeof(Trigger));
  
  	/*
  	 * Note: since we scan the triggers using TriggerRelidNameIndexId, we will
--- 1129,1152 ----
  RelationBuildTriggers(Relation relation)
  {
  	TriggerDesc *trigdesc;
! 	int			numtrigs;
! 	int			maxtrigs;
! 	Trigger    **triggers;
! 	Trigger		*trigarray;
! 	int			i;
  	Relation	tgrel;
  	ScanKeyData skey;
  	SysScanDesc tgscan;
  	HeapTuple	htup;
  	MemoryContext oldContext;
  
! 	/*
! 	 * allocate an array to hold the triggers (the array is extended if
! 	 * necessary)
! 	 */
! 	maxtrigs = 4;
! 	triggers = (Trigger **) palloc(sizeof(Trigger *) * maxtrigs);
! 	numtrigs = 0;
  
  	/*
  	 * Note: since we scan the triggers using TriggerRelidNameIndexId, we will
***************
*** 1167,1176 ****
  		Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(htup);
  		Trigger    *build;
  
! 		if (found >= ntrigs)
! 			elog(ERROR, "too many trigger records found for relation \"%s\"",
! 				 RelationGetRelationName(relation));
! 		build = &(triggers[found]);
  
  		build->tgoid = HeapTupleGetOid(htup);
  		build->tgname = DatumGetCString(DirectFunctionCall1(nameout,
--- 1168,1174 ----
  		Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(htup);
  		Trigger    *build;
  
! 		build = (Trigger *) palloc(sizeof(Trigger));
  
  		build->tgoid = HeapTupleGetOid(htup);
  		build->tgname = DatumGetCString(DirectFunctionCall1(nameout,
***************
*** 1199,1205 ****
  			bytea	   *val;
  			bool		isnull;
  			char	   *p;
- 			int			i;
  
  			val = DatumGetByteaP(fastgetattr(htup,
  											 Anum_pg_trigger_tgargs,
--- 1197,1202 ----
***************
*** 1218,1240 ****
  		else
  			build->tgargs = NULL;
  
! 		found++;
  	}
  
  	systable_endscan(tgscan);
  	heap_close(tgrel, AccessShareLock);
  
! 	if (found != ntrigs)
! 		elog(ERROR, "%d trigger record(s) not found for relation \"%s\"",
! 			 ntrigs - found,
! 			 RelationGetRelationName(relation));
  
  	/* Build trigdesc */
  	trigdesc = (TriggerDesc *) palloc0(sizeof(TriggerDesc));
! 	trigdesc->triggers = triggers;
! 	trigdesc->numtriggers = ntrigs;
! 	for (found = 0; found < ntrigs; found++)
! 		InsertTrigger(trigdesc, &(triggers[found]), found);
  
  	/* Copy completed trigdesc into cache storage */
  	oldContext = MemoryContextSwitchTo(CacheMemoryContext);
--- 1215,1252 ----
  		else
  			build->tgargs = NULL;
  
! 		if (numtrigs >= maxtrigs)
! 		{
! 			maxtrigs *= 2;
! 			triggers = (Trigger **) repalloc(triggers, sizeof(Trigger *) * maxtrigs);
! 		}
! 		triggers[numtrigs++] = build;
  	}
  
  	systable_endscan(tgscan);
  	heap_close(tgrel, AccessShareLock);
  
! 	if (numtrigs == 0)
! 	{
! 		pfree(triggers);
! 		return;
! 	}
  
  	/* Build trigdesc */
  	trigdesc = (TriggerDesc *) palloc0(sizeof(TriggerDesc));
! 	trigarray = (Trigger *) palloc(numtrigs * sizeof(Trigger));
! 
! 	for (i = 0; i < numtrigs; i++)
! 	{
! 		memcpy(&(trigarray[i]), triggers[i], sizeof(Trigger));
! 		pfree(triggers[i]);
! 	}
! 	pfree(triggers);
! 
! 	trigdesc->triggers = trigarray;
! 	trigdesc->numtriggers = numtrigs;
! 	for (i = 0; i < numtrigs; i++)
! 		InsertTrigger(trigdesc, &(trigarray[i]), i);
  
  	/* Copy completed trigdesc into cache storage */
  	oldContext = MemoryContextSwitchTo(CacheMemoryContext);
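
Note on the RelationBuildTriggers change above: since reltriggers can no
longer be trusted as an exact count, the scan collects entries into a
pointer array that doubles when full, then copies them into one contiguous
array. A minimal standalone sketch of that pattern follows. It uses
malloc/realloc in place of palloc/repalloc, and Item, free_ptrs and
collect_items are hypothetical names for illustration only, not part of
the patch.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for Trigger; not a PostgreSQL type. */
typedef struct Item
{
	int			id;
} Item;

/* Free the first num entries of the pointer array, then the array itself. */
static void
free_ptrs(Item **ptrs, int num)
{
	int			i;

	for (i = 0; i < num; i++)
		free(ptrs[i]);
	free(ptrs);
}

/*
 * Build n items (ids 0..n-1) as if n were not known up front; the loop
 * stands in for the catalog scan.  Returns a contiguous array and sets
 * *count, or returns NULL with *count = 0 if there are no items or an
 * allocation fails.
 */
Item *
collect_items(int n, int *count)
{
	int			max = 4;
	int			num = 0;
	int			i;
	Item	  **ptrs;
	Item	   *result;

	*count = 0;
	ptrs = malloc(sizeof(Item *) * max);
	if (ptrs == NULL)
		return NULL;

	for (i = 0; i < n; i++)
	{
		Item	   *build = malloc(sizeof(Item));

		if (build == NULL)
		{
			free_ptrs(ptrs, num);
			return NULL;
		}
		build->id = i;

		/* Double the pointer array when it is full. */
		if (num >= max)
		{
			Item	  **grown = realloc(ptrs, sizeof(Item *) * max * 2);

			if (grown == NULL)
			{
				free(build);
				free_ptrs(ptrs, num);
				return NULL;
			}
			ptrs = grown;
			max *= 2;
		}
		ptrs[num++] = build;
	}

	if (num == 0)
	{
		free(ptrs);
		return NULL;
	}

	/* Compact into one contiguous array, as the patch does for trigarray. */
	result = malloc(sizeof(Item) * num);
	if (result == NULL)
	{
		free_ptrs(ptrs, num);
		return NULL;
	}
	for (i = 0; i < num; i++)
		memcpy(&result[i], ptrs[i], sizeof(Item));
	free_ptrs(ptrs, num);

	*count = num;
	return result;
}
```

The two-stage shape (pointers first, contiguous copy second) keeps the
final array the same layout that InsertTrigger indexes into, at the cost
of one extra copy per entry.
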
Index: src/backend/parser/parse_utilcmd.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/parser/parse_utilcmd.c,v
retrieving revision 2.17
diff -c -r2.17 parse_utilcmd.c
*** src/backend/parser/parse_utilcmd.c	1 Sep 2008 20:42:45 -0000	2.17
--- src/backend/parser/parse_utilcmd.c	6 Oct 2008 12:18:06 -0000
***************
*** 1292,1298 ****
  }
  
  /*
!  * transformIndexStmt - parse analysis for CREATE INDEX
   *
   * Note: this is a no-op for an index not using either index expressions or
   * a predicate expression.	There are several code paths that create indexes
--- 1292,1298 ----
  }
  
  /*
!  * transformIndexStmt - parse analysis for CREATE INDEX and ALTER TABLE
   *
   * Note: this is a no-op for an index not using either index expressions or
   * a predicate expression.	There are several code paths that create indexes
***************
*** 1318,1324 ****
  	 * because addRangeTableEntry() would acquire only AccessShareLock,
  	 * leaving DefineIndex() needing to do a lock upgrade with consequent risk
  	 * of deadlock.  Make sure this stays in sync with the type of lock
! 	 * DefineIndex() wants.
  	 */
  	rel = heap_openrv(stmt->relation,
  				  (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
--- 1318,1325 ----
  	 * because addRangeTableEntry() would acquire only AccessShareLock,
  	 * leaving DefineIndex() needing to do a lock upgrade with consequent risk
  	 * of deadlock.  Make sure this stays in sync with the type of lock
! 	 * DefineIndex() wants. If we are being called by ALTER TABLE, we may
! 	 * already hold a higher lock...
  	 */
  	rel = heap_openrv(stmt->relation,
  				  (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
***************
*** 1674,1679 ****
--- 1675,1681 ----
  	List	   *newcmds = NIL;
  	bool		skipValidation = true;
  	AlterTableCmd *newcmd;
+ 	bool		needFullLock;
  
  	/*
  	 * We must not scribble on the passed-in AlterTableStmt, so copy it. (This
***************
*** 1682,1694 ****
  	stmt = (AlterTableStmt *) copyObject(stmt);
  
  	/*
! 	 * Acquire exclusive lock on the target relation, which will be held until
  	 * end of transaction.	This ensures any decisions we make here based on
  	 * the state of the relation will still be good at execution. We must get
! 	 * exclusive lock now because execution will; taking a lower grade lock
! 	 * now and trying to upgrade later risks deadlock.
  	 */
! 	rel = relation_openrv(stmt->relation, AccessExclusiveLock);
  
  	/* Set up pstate */
  	pstate = make_parsestate(NULL);
--- 1684,1703 ----
  	stmt = (AlterTableStmt *) copyObject(stmt);
  
  	/*
! 	 * Assign the appropriate lock level for this list of subcommands.
! 	 */
! 	needFullLock = AlterTableLockLevel(stmt->cmds);
! 
! 	/*
! 	 * Acquire appropriate lock on the target relation, which will be held until
  	 * end of transaction.	This ensures any decisions we make here based on
  	 * the state of the relation will still be good at execution. We must get
! 	 * the lock now because execution will; taking a lower grade lock now and
! 	 * trying to upgrade later risks deadlock.  Any new commands we add
! 	 * after this must not upgrade the lock level requested here.
  	 */
! 	rel = relation_openrv(stmt->relation, 
! 							needFullLock ? AccessExclusiveLock : ShareLock);
  
  	/* Set up pstate */
  	pstate = make_parsestate(NULL);
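
Note on the lock-level choice above: the rule from the cover note is that
a multi-subcommand ALTER TABLE takes the highest lock any subcommand
needs. The body of AlterTableLockLevel is not shown in this part of the
patch, so the following is only a sketch of that rule under assumptions:
the SubCmd enum and both function names are hypothetical, the s/x
classification is copied from the list in the cover note for a handful of
subcommands, and treating unknown subcommands as needing the full lock is
my own conservative choice.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical subset of ALTER TABLE subcommands, for illustration only. */
typedef enum SubCmd
{
	SC_ADD_COLUMN,				/* x in the cover note */
	SC_DROP_COLUMN,				/* x */
	SC_ENABLE_RULE,				/* x */
	SC_SET_NOT_NULL,			/* s */
	SC_SET_STATISTICS,			/* s */
	SC_ENABLE_TRIGGER			/* s */
} SubCmd;

/* true if this subcommand must keep AccessExclusiveLock. */
static bool
subcmd_needs_full_lock(SubCmd cmd)
{
	switch (cmd)
	{
		case SC_SET_NOT_NULL:
		case SC_SET_STATISTICS:
		case SC_ENABLE_TRIGGER:
			return false;
		case SC_ADD_COLUMN:
		case SC_DROP_COLUMN:
		case SC_ENABLE_RULE:
			return true;
	}
	/* Anything unrecognized: assume the strongest lock (an assumption). */
	return true;
}

/*
 * true if any subcommand in the list needs the full lock, so the whole
 * statement is covered by one lock and never has to upgrade.
 */
bool
needs_full_lock(const SubCmd *cmds, size_t ncmds)
{
	size_t		i;

	for (i = 0; i < ncmds; i++)
	{
		if (subcmd_needs_full_lock(cmds[i]))
			return true;
	}
	return false;
}
```

For example, SET NOT NULL plus ENABLE TRIGGER would stay at ShareLock,
while SET STATISTICS plus DROP COLUMN would take AccessExclusiveLock for
the whole statement.
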
Index: src/backend/rewrite/rewriteDefine.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/rewrite/rewriteDefine.c,v
retrieving revision 1.129
diff -c -r1.129 rewriteDefine.c
*** src/backend/rewrite/rewriteDefine.c	25 Aug 2008 22:42:34 -0000	1.129
--- src/backend/rewrite/rewriteDefine.c	2 Oct 2008 18:44:32 -0000
***************
*** 236,246 ****
  	/*
  	 * If we are installing an ON SELECT rule, we had better grab
  	 * AccessExclusiveLock to ensure no SELECTs are currently running on the
! 	 * event relation.	For other types of rules, it might be sufficient to
! 	 * grab ShareLock to lock out insert/update/delete actions.  But for now,
! 	 * let's just grab AccessExclusiveLock all the time.
  	 */
! 	event_relation = heap_open(event_relid, AccessExclusiveLock);
  
  	/*
  	 * Check user has permission to apply rules to this relation.
--- 236,248 ----
  	/*
  	 * If we are installing an ON SELECT rule, we had better grab
  	 * AccessExclusiveLock to ensure no SELECTs are currently running on the
! 	 * event relation.	For other types of rules, it is sufficient to
! 	 * grab ShareLock to lock out insert/update/delete actions.
  	 */
! 	if (event_type == CMD_SELECT)
! 		event_relation = heap_open(event_relid, AccessExclusiveLock);
! 	else
! 		event_relation = heap_open(event_relid, ShareLock);
  
  	/*
  	 * Check user has permission to apply rules to this relation.
Index: src/backend/storage/lmgr/lmgr.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/lmgr/lmgr.c,v
retrieving revision 1.97
diff -c -r1.97 lmgr.c
*** src/backend/storage/lmgr/lmgr.c	4 Mar 2008 19:54:06 -0000	1.97
--- src/backend/storage/lmgr/lmgr.c	3 Oct 2008 12:44:54 -0000
***************
*** 181,186 ****
--- 181,198 ----
  		AcceptInvalidationMessages();
  }
  
+ bool
+ RelationLockedByMe(Relation relation, LOCKMODE lockmode)
+ {
+ 	LOCKTAG		tag;
+ 
+ 	SET_LOCKTAG_RELATION(tag,
+ 						 relation->rd_lockInfo.lockRelId.dbId,
+ 						 relation->rd_lockInfo.lockRelId.relId);
+ 
+ 	return LockHeldByMe(&tag, lockmode);
+ }
+ 
  /*
   *		ConditionalLockRelation
   *
Index: src/backend/storage/lmgr/lock.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/lmgr/lock.c,v
retrieving revision 1.184
diff -c -r1.184 lock.c
*** src/backend/storage/lmgr/lock.c	1 Aug 2008 13:16:09 -0000	1.184
--- src/backend/storage/lmgr/lock.c	3 Oct 2008 12:34:09 -0000
***************
*** 438,443 ****
--- 438,473 ----
  	return lockhash;
  }
  
+ bool
+ LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
+ {
+ 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
+ 	LockMethod	lockMethodTable;
+ 	LOCALLOCKTAG localtag;
+ 	LOCALLOCK  *locallock;
+ 	bool		found = false;
+ 	LOCKMODE	lockmode_search = lockmode;
+ 
+ 	Assert(!(lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)));
+ 	lockMethodTable = LockMethods[lockmethodid];
+ 	Assert(!(lockmode <= 0 || lockmode > lockMethodTable->numLockModes));
+ 
+ 	/*
+ 	 * Look up (never create) a LOCALLOCK entry at this lockmode or higher
+ 	 */
+ 	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
+ 	localtag.lock = *locktag;
+ 
+ 	while (!found && lockmode_search <= lockMethodTable->numLockModes)
+ 	{
+ 		localtag.mode = lockmode_search++;
+ 		locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
+ 											  (void *) &localtag,
+ 											  HASH_FIND, &found);
+ 	}
+ 
+ 	return found;
+ }
  
  /*
   * LockAcquire -- Check for lock conflicts, sleep if conflict found,
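
Note on LockHeldByMe above: it probes the local lock table at the
requested mode and then at each numerically higher mode, stopping at the
first hit. The sketch below shows just that search over a plain boolean
array. held_at_or_above and the constants are hypothetical names; the
numbering 1..8 mirrors PostgreSQL's table lock modes (ShareLock = 5,
AccessExclusiveLock = 8). One caveat worth keeping in mind: the modes are
not totally ordered by strength (ShareLock does not conflict with itself,
whereas the lower-numbered ShareUpdateExclusiveLock does), so "numerically
higher" is an approximation that suits Assert checks rather than a
general proof of coverage.

```c
#include <stdbool.h>

/* Hypothetical constants mirroring the lock mode numbering. */
#define MAX_LOCKMODE			8
#define SHARE_LOCK				5
#define ACCESS_EXCLUSIVE_LOCK	8

/*
 * held[m] is true if this backend holds mode m (index 0 is unused).
 * Returns true if the requested mode or any higher-numbered mode is held.
 */
bool
held_at_or_above(const bool held[MAX_LOCKMODE + 1], int mode)
{
	int			m;

	if (mode < 1 || mode > MAX_LOCKMODE)
		return false;

	for (m = mode; m <= MAX_LOCKMODE; m++)
	{
		if (held[m])
			return true;
	}
	return false;
}
```

With only AccessExclusiveLock held, a check for ShareLock succeeds, which
is what lets the ShareLock assertions in tablecmds.c pass when the
statement as a whole needed the full lock.
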
Index: src/backend/utils/adt/ri_triggers.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/utils/adt/ri_triggers.c,v
retrieving revision 1.110
diff -c -r1.110 ri_triggers.c
*** src/backend/utils/adt/ri_triggers.c	15 Sep 2008 23:37:39 -0000	1.110
--- src/backend/utils/adt/ri_triggers.c	2 Oct 2008 18:44:32 -0000
***************
*** 2601,2607 ****
   *	This is not a trigger procedure, but is called during ALTER TABLE
   *	ADD FOREIGN KEY to validate the initial table contents.
   *
!  *	We expect that an exclusive lock has been taken on rel and pkrel;
   *	hence, we do not need to lock individual rows for the check.
   *
   *	If the check fails because the current user doesn't have permissions
--- 2601,2607 ----
   *	This is not a trigger procedure, but is called during ALTER TABLE
   *	ADD FOREIGN KEY to validate the initial table contents.
   *
!  *	We expect that a ShareLock or higher has been taken on rel and pkrel;
   *	hence, we do not need to lock individual rows for the check.
   *
   *	If the check fails because the current user doesn't have permissions
Index: src/include/commands/tablecmds.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/commands/tablecmds.h,v
retrieving revision 1.41
diff -c -r1.41 tablecmds.h
*** src/include/commands/tablecmds.h	19 Jun 2008 00:46:06 -0000	1.41
--- src/include/commands/tablecmds.h	6 Oct 2008 16:38:36 -0000
***************
*** 24,29 ****
--- 24,31 ----
  
  extern void AlterTable(AlterTableStmt *stmt);
  
+ extern bool AlterTableLockLevel(List *cmds);
+ 
  extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing);
  
  extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
Index: src/include/storage/lmgr.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/lmgr.h,v
retrieving revision 1.62
diff -c -r1.62 lmgr.h
*** src/include/storage/lmgr.h	12 May 2008 00:00:53 -0000	1.62
--- src/include/storage/lmgr.h	3 Oct 2008 12:36:17 -0000
***************
*** 32,37 ****
--- 32,39 ----
  extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
  extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
  
+ extern bool RelationLockedByMe(Relation relation, LOCKMODE lockmode);
+ 
  extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
  extern void UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
  
Index: src/include/storage/lock.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/lock.h,v
retrieving revision 1.114
diff -c -r1.114 lock.h
*** src/include/storage/lock.h	16 Sep 2008 01:56:26 -0000	1.114
--- src/include/storage/lock.h	3 Oct 2008 12:35:43 -0000
***************
*** 467,472 ****
--- 467,473 ----
  extern void InitLocks(void);
  extern LockMethod GetLocksMethodTable(const LOCK *lock);
  extern uint32 LockTagHashCode(const LOCKTAG *locktag);
+ extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
  extern LockAcquireResult LockAcquire(const LOCKTAG *locktag,
  			LOCKMODE lockmode,
  			bool sessionLock,