Index: doc/src/sgml/mvcc.sgml
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/doc/src/sgml/mvcc.sgml,v
retrieving revision 2.69
diff -c -r2.69 mvcc.sgml
*** doc/src/sgml/mvcc.sgml	18 Feb 2007 01:21:49 -0000	2.69
--- doc/src/sgml/mvcc.sgml	7 Oct 2008 11:27:21 -0000
***************
*** 504,512 ****
      most <productname>PostgreSQL</productname> commands automatically
      acquire locks of appropriate modes to ensure that referenced
      tables are not dropped or modified in incompatible ways while the
!     command executes.  (For example, <command>ALTER TABLE</> cannot safely be
!     executed concurrently with other operations on the same table, so it
!     obtains an exclusive lock on the table to enforce that.)
     </para>
  
     <para>
--- 504,512 ----
      most <productname>PostgreSQL</productname> commands automatically
      acquire locks of appropriate modes to ensure that referenced
      tables are not dropped or modified in incompatible ways while the
!     command executes.  (For example, <command>TRUNCATE</> 
!     cannot safely be executed concurrently with other operations on the 
!     same table, so it obtains an exclusive lock on the table to enforce that.)
     </para>
  
     <para>
***************
*** 647,654 ****
          </para>
  
          <para>
!          Acquired by <command>CREATE INDEX</command>
!          (without <option>CONCURRENTLY</option>).
          </para>
         </listitem>
        </varlistentry>
--- 647,659 ----
          </para>
  
          <para>
!          Acquired by <command>CREATE INDEX</command> (without 
!          <option>CONCURRENTLY</option>), <command>CREATE TRIGGER</command>, 
!          <command>CREATE RULE</command> (all except ON SELECT rules),
!          <command>ALTER TABLE</command> (if all of its subcommands are one 
!          of the following subcommands: <command>ADD UNIQUE</command>, 
!          <command>ADD PRIMARY KEY</command> or 
!          <command>ADD FOREIGN KEY</command>).
          </para>
         </listitem>
        </varlistentry>
***************
*** 714,724 ****
          </para>
  
          <para>
!          Acquired by the <command>ALTER TABLE</command>, <command>DROP
!          TABLE</command>, <command>TRUNCATE</command>, <command>REINDEX</command>,
           <command>CLUSTER</command>, and <command>VACUUM FULL</command>
           commands.  This is also the default lock mode for <command>LOCK
           TABLE</command> statements that do not specify a mode explicitly.
          </para>
         </listitem>
        </varlistentry>
--- 719,731 ----
          </para>
  
          <para>
!          Acquired by the <command>DROP TABLE</command>, 
!          <command>TRUNCATE</command>, <command>REINDEX</command>,
           <command>CLUSTER</command>, and <command>VACUUM FULL</command>
           commands.  This is also the default lock mode for <command>LOCK
           TABLE</command> statements that do not specify a mode explicitly.
+          <command>ALTER TABLE</command> will take a lock at this level, 
+          with some exceptions, noted above under SHARE LOCK.
          </para>
         </listitem>
        </varlistentry>
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.266
diff -c -r1.266 tablecmds.c
*** src/backend/commands/tablecmds.c	8 Sep 2008 00:47:40 -0000	1.266
--- src/backend/commands/tablecmds.c	7 Oct 2008 10:14:53 -0000
***************
*** 249,269 ****
  							 Relation rel, Relation pkrel, Oid constraintOid);
  static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
  						 Oid constraintOid);
! static void ATController(Relation rel, List *cmds, bool recurse);
  static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing);
! static void ATRewriteCatalogs(List **wqueue);
  static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					  AlterTableCmd *cmd);
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
  static void ATSimplePermissions(Relation rel, bool allowView);
  static void ATSimplePermissionsRelationOrIndex(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse);
  static void ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd);
  static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
  				AlterTableCmd *cmd);
  static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
--- 249,269 ----
  							 Relation rel, Relation pkrel, Oid constraintOid);
  static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
  						 Oid constraintOid);
! static void ATController(Relation rel, List *cmds, bool recurse, bool needFullLock);
  static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing, bool needFullLock);
! static void ATRewriteCatalogs(List **wqueue, bool needFullLock);
  static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					  AlterTableCmd *cmd, bool needFullLock);
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
  static void ATSimplePermissions(Relation rel, bool allowView);
  static void ATSimplePermissionsRelationOrIndex(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse, bool needFullLock);
  static void ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd, bool needFullLock);
  static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
  				AlterTableCmd *cmd);
  static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
***************
*** 284,297 ****
  				 DropBehavior behavior,
  				 bool recurse, bool recursing);
  static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild);
  static void ATExecAddConstraint(List **wqueue,
  								AlteredTableInfo *tab, Relation rel,
! 								Node *newConstraint, bool recurse);
  static void ATAddCheckConstraint(List **wqueue,
  								 AlteredTableInfo *tab, Relation rel,
  								 Constraint *constr,
! 								 bool recurse, bool recursing);
  static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
  						  FkConstraint *fkconstraint);
  static void ATExecDropConstraint(Relation rel, const char *constrName,
--- 284,299 ----
  				 DropBehavior behavior,
  				 bool recurse, bool recursing);
  static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild, bool needFullLock);
  static void ATExecAddConstraint(List **wqueue,
  								AlteredTableInfo *tab, Relation rel,
! 								Node *newConstraint, bool recurse, 
! 								bool needFullLock);
  static void ATAddCheckConstraint(List **wqueue,
  								 AlteredTableInfo *tab, Relation rel,
  								 Constraint *constr,
! 								 bool recurse, bool recursing, 
! 								 bool needFullLock);
  static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
  						  FkConstraint *fkconstraint);
  static void ATExecDropConstraint(Relation rel, const char *constrName,
***************
*** 2124,2131 ****
   * Disallow ALTER TABLE (and similar commands) when the current backend has
   * any open reference to the target table besides the one just acquired by
   * the calling command; this implies there's an open cursor or active plan.
!  * We need this check because our AccessExclusiveLock doesn't protect us
!  * against stomping on our own foot, only other people's feet!
   *
   * For ALTER TABLE, the only case known to cause serious trouble is ALTER
   * COLUMN TYPE, and some changes are obviously pretty benign, so this could
--- 2126,2133 ----
   * Disallow ALTER TABLE (and similar commands) when the current backend has
   * any open reference to the target table besides the one just acquired by
   * the calling command; this implies there's an open cursor or active plan.
!  * We need this check because our lock doesn't protect us against stomping
!  * on our own foot, only other people's feet!
   *
   * For ALTER TABLE, the only case known to cause serious trouble is ALTER
   * COLUMN TYPE, and some changes are obviously pretty benign, so this could
***************
*** 2202,2212 ****
   *
   * Thanks to the magic of MVCC, an error anywhere along the way rolls back
   * the whole operation; we don't have to do anything special to clean up.
   */
  void
  AlterTable(AlterTableStmt *stmt)
  {
! 	Relation	rel = relation_openrv(stmt->relation, AccessExclusiveLock);
  
  	CheckTableNotInUse(rel, "ALTER TABLE");
  
--- 2204,2231 ----
   *
   * Thanks to the magic of MVCC, an error anywhere along the way rolls back
   * the whole operation; we don't have to do anything special to clean up.
+  *
+  * We lock the table as the first action, with an appropriate lock level
+  * for the subcommands requested. Any subcommand that needs to rewrite
+  * tuples in the table forces the whole command to be executed with
+  * AccessExclusiveLock. If all subcommands do not require rewrite table
+  * then we are able to use just ShareLock. We pass the lock level down
+  * so that we can apply it recursively to inherited tables. Note that the
+  * lock level we want as we recurse may well be higher than required for 
+  * that specific subcommand. So we pass down the overall lock requirement, 
+  * rather than reassess it at lower levels.
   */
  void
  AlterTable(AlterTableStmt *stmt)
  {
! 	Relation	rel;
! 	bool		needFullLock = AlterTableLockLevel(stmt->cmds);
! 
! 	/*
! 	 * Acquire same level of lock as already acquired during parsing.
! 	 */
! 	rel = relation_openrv(stmt->relation, 
! 							needFullLock ? AccessExclusiveLock : ShareLock);
  
  	CheckTableNotInUse(rel, "ALTER TABLE");
  
***************
*** 2248,2254 ****
  			elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
  	}
  
! 	ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt));
  }
  
  /*
--- 2267,2274 ----
  			elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
  	}
  
! 	ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt), 
! 						needFullLock);
  }
  
  /*
***************
*** 2265,2277 ****
  void
  AlterTableInternal(Oid relid, List *cmds, bool recurse)
  {
! 	Relation	rel = relation_open(relid, AccessExclusiveLock);
  
! 	ATController(rel, cmds, recurse);
  }
  
  static void
! ATController(Relation rel, List *cmds, bool recurse)
  {
  	List	   *wqueue = NIL;
  	ListCell   *lcmd;
--- 2285,2406 ----
  void
  AlterTableInternal(Oid relid, List *cmds, bool recurse)
  {
! 	Relation	rel;
! 
! 	rel = relation_open(relid, ShareLock);
! 
! 	ATController(rel, cmds, recurse, false);
! }
! 
! /*
!  * AlterTableLockLevel
!  *
!  * Sets the overall lock level required for the supplied list of subcommands.
!  * Policy for doing this set according to needs of AlterTable(), see
!  * comments there for overall explanation. 
!  *
!  * Returns true if we need AccessExclusiveLock, false means ShareLock.
!  *
!  * Function is called before and after parsing, so it must give same
!  * answer each time it is called. Some subcommands are transformed
!  * into other subcommand types, so the transform must never be made to a
!  * lower lock level than previously assigned. All transforms are noted below.
!  */
! bool
! AlterTableLockLevel(List *cmds)
! {
! 	ListCell   *lcmd;
! 	bool		needFullLock = false;
! 
! 	foreach(lcmd, cmds)
! 	{
! 		AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
! 
! 		switch (cmd->subtype)
! 		{
! 			/*
! 			 * Need AccessExclusiveLock for these subcommands.
! 			 */
! 			case AT_AddColumn:			/* must rewrite heap */
! 			case AT_DropColumn:			/* change visible to SELECT */
! 			case AT_AlterColumnType:	/* must rewrite heap */
! 			case AT_DropConstraint:		/* as DROP INDEX */
! 			case AT_DropOids:			/* calls AT_DropColumn */
! 			case AT_EnableAlwaysRule:	/* as DROP INDEX */
! 			case AT_EnableReplicaRule:	/* as DROP INDEX */
! 			case AT_EnableRule:			/* as DROP INDEX */
! 			case AT_DisableRule:		/* as DROP INDEX */
! 			case AT_AddInherit:			/* change visible to SELECT */
! 			case AT_DropInherit:		/* change visible to SELECT */
! 			case AT_ChangeOwner:		/* change visible to SELECT */
! 			case AT_SetTableSpace:		/* must rewrite heap */
! 			case AT_DropNotNull:		/* may change some SQL plans */
! 				needFullLock = true;
! 				break;
! 
! 			/*
! 			 * XXX Potentially ShareLocks, with the correct catalog locking
! 			 */
! 			case AT_ColumnDefault:
! 			case AT_SetNotNull:
! 			case AT_SetStatistics:
! 			case AT_SetStorage:				
! 			case AT_ProcessedConstraint:	/* becomes AT_AddConstraint */
! 			case AT_AddConstraintRecurse:	/* becomes AT_AddConstraint */
! 			case AT_ClusterOn:
! 			case AT_DropCluster:
! 			case AT_EnableTrig:
! 			case AT_EnableAlwaysTrig:
! 			case AT_EnableReplicaTrig:
! 			case AT_EnableTrigAll:
! 			case AT_EnableTrigUser:
! 			case AT_DisableTrig:
! 			case AT_DisableTrigAll:
! 			case AT_DisableTrigUser:
! 			case AT_SetRelOptions:
! 			case AT_ResetRelOptions:
! 				needFullLock = true;
! 				break;
  
! 			/*
! 			 * Only ADD UNIQUE, ADD PRIMARY KEY and ADD FOREIGN KEY can use
! 			 * ShareLocks currently.
! 			 */
! 			case AT_AddConstraint:			/* as CREATE INDEX */
! 				if (IsA(cmd->def, Constraint))
! 				{
! 					Constraint *con = (Constraint *) cmd->def;
! 
! 					if (con->contype != CONSTR_PRIMARY &&
! 						con->contype != CONSTR_UNIQUE)
! 						needFullLock = true;
! 				}
! 				else if (IsA(cmd->def, FkConstraint))
! 					needFullLock = false;
! 				else
! 					needFullLock = true;
! 				break;
! 
! 			/*
! 			 * ShareLock is all we need for these subcommands.
! 			 * Be very careful about moving commands onto this list. If
! 			 * you are unsure, place subcommand above, not here.
! 			 */
! 			case AT_AddIndex:				/* from ADD CONSTRAINT */
! 				break;
! 
! 			default:				/* oops */
! 				elog(ERROR, "unrecognized alter table type: %d",
! 					 (int) cmd->subtype);
! 				break;
! 		}
! 	}
! 
! 	return needFullLock;
  }
  
  static void
! ATController(Relation rel, List *cmds, bool recurse, bool needFullLock)
  {
  	List	   *wqueue = NIL;
  	ListCell   *lcmd;
***************
*** 2281,2294 ****
  	{
  		AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
  
! 		ATPrepCmd(&wqueue, rel, cmd, recurse, false);
  	}
  
  	/* Close the relation, but keep lock until commit */
  	relation_close(rel, NoLock);
  
  	/* Phase 2: update system catalogs */
! 	ATRewriteCatalogs(&wqueue);
  
  	/* Phase 3: scan/rewrite tables as needed */
  	ATRewriteTables(&wqueue);
--- 2410,2423 ----
  	{
  		AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
  
! 		ATPrepCmd(&wqueue, rel, cmd, recurse, false, needFullLock);
  	}
  
  	/* Close the relation, but keep lock until commit */
  	relation_close(rel, NoLock);
  
  	/* Phase 2: update system catalogs */
! 	ATRewriteCatalogs(&wqueue, needFullLock);
  
  	/* Phase 3: scan/rewrite tables as needed */
  	ATRewriteTables(&wqueue);
***************
*** 2300,2311 ****
   * Traffic cop for ALTER TABLE Phase 1 operations, including simple
   * recursion and permission checks.
   *
!  * Caller must have acquired AccessExclusiveLock on relation already.
   * This lock should be held until commit.
   */
  static void
  ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing)
  {
  	AlteredTableInfo *tab;
  	int			pass;
--- 2429,2440 ----
   * Traffic cop for ALTER TABLE Phase 1 operations, including simple
   * recursion and permission checks.
   *
!  * Caller must have acquired appropriate lock type on relation already.
   * This lock should be held until commit.
   */
  static void
  ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing, bool needFullLock)
  {
  	AlteredTableInfo *tab;
  	int			pass;
***************
*** 2342,2372 ****
  			 * rules.
  			 */
  			ATSimplePermissions(rel, true);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN STATISTICS */
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* Performs own permission checks */
  			ATPrepSetStatistics(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN STORAGE */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
--- 2471,2501 ----
  			 * rules.
  			 */
  			ATSimplePermissions(rel, true);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN STATISTICS */
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* Performs own permission checks */
  			ATPrepSetStatistics(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN STORAGE */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
***************
*** 2428,2434 ****
  				dropCmd->subtype = AT_DropColumn;
  				dropCmd->name = pstrdup("oid");
  				dropCmd->behavior = cmd->behavior;
! 				ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
  			}
  			pass = AT_PASS_DROP;
  			break;
--- 2557,2563 ----
  				dropCmd->subtype = AT_DropColumn;
  				dropCmd->name = pstrdup("oid");
  				dropCmd->behavior = cmd->behavior;
! 				ATPrepCmd(wqueue, rel, dropCmd, recurse, false, needFullLock);
  			}
  			pass = AT_PASS_DROP;
  			break;
***************
*** 2483,2489 ****
   * conflicts).
   */
  static void
! ATRewriteCatalogs(List **wqueue)
  {
  	int			pass;
  	ListCell   *ltab;
--- 2612,2618 ----
   * conflicts).
   */
  static void
! ATRewriteCatalogs(List **wqueue, bool needFullLock)
  {
  	int			pass;
  	ListCell   *ltab;
***************
*** 2508,2520 ****
  			if (subcmds == NIL)
  				continue;
  
! 			/*
! 			 * Exclusive lock was obtained by phase 1, needn't get it again
  			 */
  			rel = relation_open(tab->relid, NoLock);
  
  			foreach(lcmd, subcmds)
! 				ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd));
  
  			/*
  			 * After the ALTER TYPE pass, do cleanup work (this is not done in
--- 2637,2649 ----
  			if (subcmds == NIL)
  				continue;
  
! 			/* 
! 			 * Appropriate lock was obtained by phase 1, needn't get it again
  			 */
  			rel = relation_open(tab->relid, NoLock);
  
  			foreach(lcmd, subcmds)
! 				ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd), needFullLock);
  
  			/*
  			 * After the ALTER TYPE pass, do cleanup work (this is not done in
***************
*** 2549,2555 ****
   */
  static void
  ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 		  AlterTableCmd *cmd)
  {
  	switch (cmd->subtype)
  	{
--- 2678,2684 ----
   */
  static void
  ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 		  AlterTableCmd *cmd, bool needFullLock)
  {
  	switch (cmd->subtype)
  	{
***************
*** 2578,2593 ****
  			ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
  			break;
  		case AT_ReAddIndex:		/* ADD INDEX */
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, false);
  			break;
  		case AT_AddConstraintRecurse:	/* ADD CONSTRAINT with recursion */
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, true);
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
  			ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false);
--- 2707,2722 ----
  			ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false, needFullLock);
  			break;
  		case AT_ReAddIndex:		/* ADD INDEX */
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true, needFullLock);
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, false, needFullLock);
  			break;
  		case AT_AddConstraintRecurse:	/* ADD CONSTRAINT with recursion */
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, true, needFullLock);
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
  			ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false);
***************
*** 2725,2730 ****
--- 2854,2860 ----
  			ObjectAddress object;
  
  			OldHeap = heap_open(tab->relid, NoLock);
+ 			Assert(RelationLockedByMe(OldHeap, AccessExclusiveLock));
  
  			/*
  			 * We can never allow rewriting of shared or nailed-in-cache
***************
*** 2847,2858 ****
  				Relation	refrel;
  
  				if (rel == NULL)
- 				{
  					/* Long since locked, no need for another */
  					rel = heap_open(tab->relid, NoLock);
- 				}
  
! 				refrel = heap_open(con->refrelid, RowShareLock);
  
  				validateForeignKeyConstraint(fkconstraint, rel, refrel,
  											 con->conid);
--- 2977,2992 ----
  				Relation	refrel;
  
  				if (rel == NULL)
  					/* Long since locked, no need for another */
  					rel = heap_open(tab->relid, NoLock);
  
! 				/*
! 				 * We need to prevent write access to table while
! 				 * we make the check. RowShareLock is needed if we
! 				 * use triggers, but if we want to avoid using
! 				 * row-level locks if we can.
! 				 */
! 				refrel = heap_open(con->refrelid, ShareLock);
  
  				validateForeignKeyConstraint(fkconstraint, rel, refrel,
  											 con->conid);
***************
*** 2893,2901 ****
--- 3027,3041 ----
  	newTupDesc = RelationGetDescr(oldrel);		/* includes all mods */
  
  	if (OidIsValid(OIDNewHeap))
+ 	{
  		newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
+ 		Assert(RelationLockedByMe(oldrel, AccessExclusiveLock));
+ 	}
  	else
+ 	{
  		newrel = NULL;
+ 		Assert(RelationLockedByMe(oldrel, ShareLock));
+ 	}
  
  	/*
  	 * If we need to rewrite the table, the operation has to be propagated to
***************
*** 3230,3236 ****
   */
  static void
  ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse)
  {
  	/*
  	 * Propagate to children if desired.  Non-table relations never have
--- 3370,3376 ----
   */
  static void
  ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse, bool needFullLock)
  {
  	/*
  	 * Propagate to children if desired.  Non-table relations never have
***************
*** 3257,3265 ****
  
  			if (childrelid == relid)
  				continue;
! 			childrel = relation_open(childrelid, AccessExclusiveLock);
  			CheckTableNotInUse(childrel, "ALTER TABLE");
! 			ATPrepCmd(wqueue, childrel, cmd, false, true);
  			relation_close(childrel, NoLock);
  		}
  	}
--- 3397,3408 ----
  
  			if (childrelid == relid)
  				continue;
! 
! 			childrel = relation_open(childrelid, 
! 						needFullLock ? AccessExclusiveLock : ShareLock);
! 
  			CheckTableNotInUse(childrel, "ALTER TABLE");
! 			ATPrepCmd(wqueue, childrel, cmd, false, true, needFullLock);
  			relation_close(childrel, NoLock);
  		}
  	}
***************
*** 3275,3281 ****
   */
  static void
  ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd)
  {
  	Oid			relid = RelationGetRelid(rel);
  	ListCell   *child;
--- 3418,3424 ----
   */
  static void
  ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd, bool needFullLock)
  {
  	Oid			relid = RelationGetRelid(rel);
  	ListCell   *child;
***************
*** 3289,3297 ****
  		Oid			childrelid = lfirst_oid(child);
  		Relation	childrel;
  
! 		childrel = relation_open(childrelid, AccessExclusiveLock);
  		CheckTableNotInUse(childrel, "ALTER TABLE");
! 		ATPrepCmd(wqueue, childrel, cmd, true, true);
  		relation_close(childrel, NoLock);
  	}
  }
--- 3432,3442 ----
  		Oid			childrelid = lfirst_oid(child);
  		Relation	childrel;
  
! 		childrel = relation_open(childrelid, 
! 						needFullLock ? AccessExclusiveLock : ShareLock);
! 
  		CheckTableNotInUse(childrel, "ALTER TABLE");
! 		ATPrepCmd(wqueue, childrel, cmd, true, true, needFullLock);
  		relation_close(childrel, NoLock);
  	}
  }
***************
*** 3427,3433 ****
  		colDefChild->inhcount = 1;
  		colDefChild->is_local = false;
  
! 		ATOneLevelRecursion(wqueue, rel, childCmd);
  	}
  	else
  	{
--- 3572,3578 ----
  		colDefChild->inhcount = 1;
  		colDefChild->is_local = false;
  
! 		ATOneLevelRecursion(wqueue, rel, childCmd, true);
  	}
  	else
  	{
***************
*** 4235,4241 ****
   */
  static void
  ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild)
  {
  	bool		check_rights;
  	bool		skip_build;
--- 4380,4386 ----
   */
  static void
  ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild, bool needFullLock)
  {
  	bool		check_rights;
  	bool		skip_build;
***************
*** 4275,4281 ****
   */
  static void
  ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					Node *newConstraint, bool recurse)
  {
  	switch (nodeTag(newConstraint))
  	{
--- 4420,4426 ----
   */
  static void
  ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					Node *newConstraint, bool recurse, bool needFullLock)
  {
  	switch (nodeTag(newConstraint))
  	{
***************
*** 4293,4299 ****
  				{
  					case CONSTR_CHECK:
  						ATAddCheckConstraint(wqueue, tab, rel,
! 											 constr, recurse, false);
  						break;
  					default:
  						elog(ERROR, "unrecognized constraint type: %d",
--- 4438,4444 ----
  				{
  					case CONSTR_CHECK:
  						ATAddCheckConstraint(wqueue, tab, rel,
! 											 constr, recurse, false, needFullLock);
  						break;
  					default:
  						elog(ERROR, "unrecognized constraint type: %d",
***************
*** 4356,4362 ****
   */
  static void
  ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					 Constraint *constr, bool recurse, bool recursing)
  {
  	List	   *newcons;
  	ListCell   *lcon;
--- 4501,4508 ----
   */
  static void
  ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					 Constraint *constr, bool recurse, bool recursing,
! 					 bool needFullLock)
  {
  	List	   *newcons;
  	ListCell   *lcon;
***************
*** 4428,4434 ****
  		Relation	childrel;
  		AlteredTableInfo *childtab;
  
! 		childrel = heap_open(childrelid, AccessExclusiveLock);
  		CheckTableNotInUse(childrel, "ALTER TABLE");
  
  		/* Find or create work queue entry for this table */
--- 4574,4582 ----
  		Relation	childrel;
  		AlteredTableInfo *childtab;
  
! 		childrel = heap_open(childrelid, 
! 						needFullLock ? AccessExclusiveLock : ShareLock);
! 
  		CheckTableNotInUse(childrel, "ALTER TABLE");
  
  		/* Find or create work queue entry for this table */
***************
*** 4436,4442 ****
  
  		/* Recurse to child */
  		ATAddCheckConstraint(wqueue, childtab, childrel,
! 							 constr, recurse, true);
  
  		heap_close(childrel, NoLock);
  	}
--- 4584,4590 ----
  
  		/* Recurse to child */
  		ATAddCheckConstraint(wqueue, childtab, childrel,
! 							 constr, recurse, true, needFullLock);
  
  		heap_close(childrel, NoLock);
  	}
***************
*** 4470,4482 ****
  	Oid			constrOid;
  
  	/*
! 	 * Grab an exclusive lock on the pk table, so that someone doesn't delete
! 	 * rows out from under us. (Although a lesser lock would do for that
! 	 * purpose, we'll need exclusive lock anyway to add triggers to the pk
! 	 * table; trying to start with a lesser lock will just create a risk of
! 	 * deadlock.)
  	 */
! 	pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
  
  	/*
  	 * Validity and permissions checks
--- 4618,4629 ----
  	Oid			constrOid;
  
  	/*
! 	 * Grab a ShareLock lock on the pk table, so that someone doesn't delete
! 	 * rows out from under us. We can't take a lower level of lock, since
! 	 * we need ShareLocks to add triggers for the constraint. (Remember: we
! 	 * add triggers to *both* tables when we add a Foreign Key).
  	 */
! 	pkrel = heap_openrv(fkconstraint->pktable, ShareLock);
  
  	/*
  	 * Validity and permissions checks
***************
*** 4993,4999 ****
   * Scan the existing rows in a table to verify they meet a proposed FK
   * constraint.
   *
!  * Caller must have opened and locked both relations.
   */
  static void
  validateForeignKeyConstraint(FkConstraint *fkconstraint,
--- 5140,5146 ----
   * Scan the existing rows in a table to verify they meet a proposed FK
   * constraint.
   *
!  * Caller must have opened and locked both relations at ShareLock or higher.
   */
  static void
  validateForeignKeyConstraint(FkConstraint *fkconstraint,
***************
*** 5541,5547 ****
  	 * alter would put them out of step.
  	 */
  	if (recurse)
! 		ATSimpleRecursion(wqueue, rel, cmd, recurse);
  	else if (!recursing &&
  			 find_inheritance_children(RelationGetRelid(rel)) != NIL)
  		ereport(ERROR,
--- 5688,5694 ----
  	 * alter would put them out of step.
  	 */
  	if (recurse)
! 		ATSimpleRecursion(wqueue, rel, cmd, recurse, true);
  	else if (!recursing &&
  			 find_inheritance_children(RelationGetRelid(rel)) != NIL)
  		ereport(ERROR,
***************
*** 5938,5943 ****
--- 6085,6094 ----
  	 */
  }
  
+ /*
+  * Called to add additional work items if ALTER TYPE touched indexed
+  * columns. Hence we always take AccessExclusiveLocks here.
+  */
  static void
  ATPostAlterTypeParse(char *cmd, List **wqueue)
  {
Index: src/backend/commands/trigger.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/commands/trigger.c,v
retrieving revision 1.237
diff -c -r1.237 trigger.c
*** src/backend/commands/trigger.c	1 Sep 2008 20:42:44 -0000	1.237
--- src/backend/commands/trigger.c	7 Oct 2008 10:15:25 -0000
***************
*** 95,101 ****
  	Oid			funcoid;
  	Oid			funcrettype;
  	Oid			trigoid;
- 	int			found = 0;
  	int			i;
  	char		constrtrigname[NAMEDATALEN];
  	char	   *trigname;
--- 95,100 ----
***************
*** 103,110 ****
  	Oid			constrrelid = InvalidOid;
  	ObjectAddress myself,
  				referenced;
  
! 	rel = heap_openrv(stmt->relation, AccessExclusiveLock);
  
  	if (rel->rd_rel->relkind != RELKIND_RELATION)
  		ereport(ERROR,
--- 102,115 ----
  	Oid			constrrelid = InvalidOid;
  	ObjectAddress myself,
  				referenced;
+ 	Form_pg_class rd_rel;
  
! 	/*
! 	 * ShareLock is sufficient to prevent concurrent write activity
! 	 * to relation. If we had SELECT Triggers we would need to take
! 	 * an AccessExclusiveLock, just as we do with ON SELECT Rules.
! 	 */
! 	rel = heap_openrv(stmt->relation, ShareLock);
  
  	if (rel->rd_rel->relkind != RELKIND_RELATION)
  		ereport(ERROR,
***************
*** 280,292 ****
  	}
  
  	/*
! 	 * Scan pg_trigger for existing triggers on relation.  We do this mainly
! 	 * because we must count them; a secondary benefit is to give a nice error
! 	 * message if there's already a trigger of the same name. (The unique
! 	 * index on tgrelid/tgname would complain anyway.)
  	 *
! 	 * NOTE that this is cool only because we have AccessExclusiveLock on the
! 	 * relation, so the trigger set won't be changing underneath us.
  	 */
  	ScanKeyInit(&key,
  				Anum_pg_trigger_tgrelid,
--- 285,297 ----
  	}
  
  	/*
! 	 * Scan pg_trigger for existing triggers on relation.  We do this 
! 	 * because we want to give a nice error message if there's already a 
! 	 * trigger of the same name. (The unique index on tgrelid/tgname would 
! 	 * complain anyway.)
  	 *
! 	 * We don't have AccessExclusiveLock, so its possible for us to succeed
! 	 * here then later fail when we insert the trigger. We tried.
  	 */
  	ScanKeyInit(&key,
  				Anum_pg_trigger_tgrelid,
***************
*** 303,309 ****
  					(errcode(ERRCODE_DUPLICATE_OBJECT),
  				  errmsg("trigger \"%s\" for relation \"%s\" already exists",
  						 trigname, stmt->relation->relname)));
- 		found++;
  	}
  	systable_endscan(tgscan);
  
--- 308,313 ----
***************
*** 405,415 ****
  		elog(ERROR, "cache lookup failed for relation %u",
  			 RelationGetRelid(rel));
  
! 	((Form_pg_class) GETSTRUCT(tuple))->reltriggers = found + 1;
  
! 	simple_heap_update(pgrel, &tuple->t_self, tuple);
! 
! 	CatalogUpdateIndexes(pgrel, tuple);
  
  	heap_freetuple(tuple);
  	heap_close(pgrel, RowExclusiveLock);
--- 409,432 ----
  		elog(ERROR, "cache lookup failed for relation %u",
  			 RelationGetRelid(rel));
  
! 	rd_rel = (Form_pg_class) GETSTRUCT(tuple);
  
! 	/*
! 	 * If anything changed, write out the tuple using non-transactional
! 	 * update, allowing us to have concurrent CREATE TRIGGER. See lengthy
! 	 * comments in index_update_stats().
! 	 */
! 	if (rd_rel->reltriggers == 0)
! 	{
! 		rd_rel->reltriggers = 1;
! 		heap_inplace_update(pgrel, tuple);
! 		/* the above sends a cache inval message */
! 	}
! 	else
! 	{
! 		/* no need to change tuple, but force relcache inval anyway */
! 		CacheInvalidateRelcacheByTuple(tuple);
! 	}
  
  	heap_freetuple(tuple);
  	heap_close(pgrel, RowExclusiveLock);
***************
*** 870,878 ****
  	 * Update relation's pg_class entry.  Crucial side-effect: other backends
  	 * (and this one too!) are sent SI message to make them rebuild relcache
  	 * entries.
- 	 *
- 	 * Note this is OK only because we have AccessExclusiveLock on the rel, so
- 	 * no one else is creating/deleting triggers on this rel at the same time.
  	 */
  	pgrel = heap_open(RelationRelationId, RowExclusiveLock);
  	tuple = SearchSysCacheCopy(RELOID,
--- 887,892 ----
***************
*** 882,892 ****
  		elog(ERROR, "cache lookup failed for relation %u", relid);
  	classForm = (Form_pg_class) GETSTRUCT(tuple);
  
- 	if (classForm->reltriggers == 0)	/* should not happen */
- 		elog(ERROR, "relation \"%s\" has reltriggers = 0",
- 			 RelationGetRelationName(rel));
- 	classForm->reltriggers--;
- 
  	simple_heap_update(pgrel, &tuple->t_self, tuple);
  
  	CatalogUpdateIndexes(pgrel, tuple);
--- 896,901 ----
***************
*** 1134,1151 ****
  RelationBuildTriggers(Relation relation)
  {
  	TriggerDesc *trigdesc;
! 	int			ntrigs = relation->rd_rel->reltriggers;
! 	Trigger    *triggers;
! 	int			found = 0;
  	Relation	tgrel;
  	ScanKeyData skey;
  	SysScanDesc tgscan;
  	HeapTuple	htup;
  	MemoryContext oldContext;
  
! 	Assert(ntrigs > 0);			/* else I should not have been called */
! 
! 	triggers = (Trigger *) palloc(ntrigs * sizeof(Trigger));
  
  	/*
  	 * Note: since we scan the triggers using TriggerRelidNameIndexId, we will
--- 1143,1166 ----
  RelationBuildTriggers(Relation relation)
  {
  	TriggerDesc *trigdesc;
! 	int			numtrigs;
! 	int			maxtrigs;
! 	Trigger    **triggers;
! 	Trigger		*trigarray;
! 	int			i;
  	Relation	tgrel;
  	ScanKeyData skey;
  	SysScanDesc tgscan;
  	HeapTuple	htup;
  	MemoryContext oldContext;
  
! 	/*
! 	 * allocate an array to hold the triggers (the array is extended if
! 	 * necessary)
! 	 */
! 	maxtrigs = 4;
! 	triggers = (Trigger **) palloc(sizeof(Trigger *) * maxtrigs);
! 	numtrigs = 0;
  
  	/*
  	 * Note: since we scan the triggers using TriggerRelidNameIndexId, we will
***************
*** 1167,1176 ****
  		Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(htup);
  		Trigger    *build;
  
! 		if (found >= ntrigs)
! 			elog(ERROR, "too many trigger records found for relation \"%s\"",
! 				 RelationGetRelationName(relation));
! 		build = &(triggers[found]);
  
  		build->tgoid = HeapTupleGetOid(htup);
  		build->tgname = DatumGetCString(DirectFunctionCall1(nameout,
--- 1182,1188 ----
  		Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(htup);
  		Trigger    *build;
  
! 		build = (Trigger *) palloc(sizeof(Trigger));
  
  		build->tgoid = HeapTupleGetOid(htup);
  		build->tgname = DatumGetCString(DirectFunctionCall1(nameout,
***************
*** 1199,1205 ****
  			bytea	   *val;
  			bool		isnull;
  			char	   *p;
- 			int			i;
  
  			val = DatumGetByteaP(fastgetattr(htup,
  											 Anum_pg_trigger_tgargs,
--- 1211,1216 ----
***************
*** 1218,1240 ****
  		else
  			build->tgargs = NULL;
  
! 		found++;
  	}
  
  	systable_endscan(tgscan);
  	heap_close(tgrel, AccessShareLock);
  
! 	if (found != ntrigs)
! 		elog(ERROR, "%d trigger record(s) not found for relation \"%s\"",
! 			 ntrigs - found,
! 			 RelationGetRelationName(relation));
  
  	/* Build trigdesc */
  	trigdesc = (TriggerDesc *) palloc0(sizeof(TriggerDesc));
! 	trigdesc->triggers = triggers;
! 	trigdesc->numtriggers = ntrigs;
! 	for (found = 0; found < ntrigs; found++)
! 		InsertTrigger(trigdesc, &(triggers[found]), found);
  
  	/* Copy completed trigdesc into cache storage */
  	oldContext = MemoryContextSwitchTo(CacheMemoryContext);
--- 1229,1266 ----
  		else
  			build->tgargs = NULL;
  
! 		if (numtrigs >= maxtrigs)
! 		{
! 			maxtrigs *= 2;
! 			triggers = (Trigger **) repalloc(triggers, sizeof(Trigger *) * maxtrigs);
! 		}
! 		triggers[numtrigs++] = build;
  	}
  
  	systable_endscan(tgscan);
  	heap_close(tgrel, AccessShareLock);
  
! 	if (numtrigs == 0)
! 	{
! 		pfree(triggers);
! 		return;
! 	}
  
  	/* Build trigdesc */
  	trigdesc = (TriggerDesc *) palloc0(sizeof(TriggerDesc));
!  	trigarray = (Trigger *) palloc(numtrigs * sizeof(Trigger));
! 
! 	for (i = 0; i < numtrigs; i++)
! 	{
! 		memcpy(&(trigarray[i]), triggers[i], sizeof(Trigger));
! 		pfree(triggers[i]);
! 	}
! 	pfree(triggers);
! 
!  	trigdesc->triggers = trigarray;
! 	trigdesc->numtriggers = numtrigs;
! 	for (i = 0; i < numtrigs; i++)
! 		InsertTrigger(trigdesc, &(trigarray[i]), i);
  
  	/* Copy completed trigdesc into cache storage */
  	oldContext = MemoryContextSwitchTo(CacheMemoryContext);
Index: src/backend/parser/parse_utilcmd.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/parser/parse_utilcmd.c,v
retrieving revision 2.17
diff -c -r2.17 parse_utilcmd.c
*** src/backend/parser/parse_utilcmd.c	1 Sep 2008 20:42:45 -0000	2.17
--- src/backend/parser/parse_utilcmd.c	7 Oct 2008 10:14:53 -0000
***************
*** 1292,1298 ****
  }
  
  /*
!  * transformIndexStmt - parse analysis for CREATE INDEX
   *
   * Note: this is a no-op for an index not using either index expressions or
   * a predicate expression.	There are several code paths that create indexes
--- 1292,1298 ----
  }
  
  /*
!  * transformIndexStmt - parse analysis for CREATE INDEX and ALTER TABLE
   *
   * Note: this is a no-op for an index not using either index expressions or
   * a predicate expression.	There are several code paths that create indexes
***************
*** 1318,1324 ****
  	 * because addRangeTableEntry() would acquire only AccessShareLock,
  	 * leaving DefineIndex() needing to do a lock upgrade with consequent risk
  	 * of deadlock.  Make sure this stays in sync with the type of lock
! 	 * DefineIndex() wants.
  	 */
  	rel = heap_openrv(stmt->relation,
  				  (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
--- 1318,1325 ----
  	 * because addRangeTableEntry() would acquire only AccessShareLock,
  	 * leaving DefineIndex() needing to do a lock upgrade with consequent risk
  	 * of deadlock.  Make sure this stays in sync with the type of lock
! 	 * DefineIndex() wants. If we are being called by ALTER TABLE, we may
! 	 * already hold a higher lock...
  	 */
  	rel = heap_openrv(stmt->relation,
  				  (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
***************
*** 1674,1679 ****
--- 1675,1681 ----
  	List	   *newcmds = NIL;
  	bool		skipValidation = true;
  	AlterTableCmd *newcmd;
+ 	bool		needFullLock;
  
  	/*
  	 * We must not scribble on the passed-in AlterTableStmt, so copy it. (This
***************
*** 1682,1694 ****
  	stmt = (AlterTableStmt *) copyObject(stmt);
  
  	/*
! 	 * Acquire exclusive lock on the target relation, which will be held until
  	 * end of transaction.	This ensures any decisions we make here based on
  	 * the state of the relation will still be good at execution. We must get
! 	 * exclusive lock now because execution will; taking a lower grade lock
! 	 * now and trying to upgrade later risks deadlock.
  	 */
! 	rel = relation_openrv(stmt->relation, AccessExclusiveLock);
  
  	/* Set up pstate */
  	pstate = make_parsestate(NULL);
--- 1684,1703 ----
  	stmt = (AlterTableStmt *) copyObject(stmt);
  
  	/*
! 	 * Assign the appropriate lock level for this list of subcommands.
! 	 */
! 	needFullLock = AlterTableLockLevel(stmt->cmds);
! 
! 	/*
! 	 * Acquire appropriate lock on the target relation, which will be held until
  	 * end of transaction.	This ensures any decisions we make here based on
  	 * the state of the relation will still be good at execution. We must get
! 	 * lock now because execution will; taking a lower grade lock now and 
! 	 * trying to upgrade later risks deadlock.  Any new commands we add
! 	 * after this must not upgrade the lock level requested here.
  	 */
! 	rel = relation_openrv(stmt->relation, 
! 							needFullLock ? AccessExclusiveLock : ShareLock);
  
  	/* Set up pstate */
  	pstate = make_parsestate(NULL);
Index: src/backend/rewrite/rewriteDefine.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/rewrite/rewriteDefine.c,v
retrieving revision 1.129
diff -c -r1.129 rewriteDefine.c
*** src/backend/rewrite/rewriteDefine.c	25 Aug 2008 22:42:34 -0000	1.129
--- src/backend/rewrite/rewriteDefine.c	7 Oct 2008 10:14:53 -0000
***************
*** 236,246 ****
  	/*
  	 * If we are installing an ON SELECT rule, we had better grab
  	 * AccessExclusiveLock to ensure no SELECTs are currently running on the
! 	 * event relation.	For other types of rules, it might be sufficient to
! 	 * grab ShareLock to lock out insert/update/delete actions.  But for now,
! 	 * let's just grab AccessExclusiveLock all the time.
  	 */
! 	event_relation = heap_open(event_relid, AccessExclusiveLock);
  
  	/*
  	 * Check user has permission to apply rules to this relation.
--- 236,248 ----
  	/*
  	 * If we are installing an ON SELECT rule, we had better grab
  	 * AccessExclusiveLock to ensure no SELECTs are currently running on the
! 	 * event relation.	For other types of rules, it is sufficient to
! 	 * grab ShareLock to lock out insert/update/delete actions.
  	 */
! 	if (event_type == CMD_SELECT)
! 		event_relation = heap_open(event_relid, AccessExclusiveLock);
! 	else
! 		event_relation = heap_open(event_relid, ShareLock);
  
  	/*
  	 * Check user has permission to apply rules to this relation.
Index: src/backend/rewrite/rewriteSupport.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/rewrite/rewriteSupport.c,v
retrieving revision 1.66
diff -c -r1.66 rewriteSupport.c
*** src/backend/rewrite/rewriteSupport.c	19 Jun 2008 00:46:05 -0000	1.66
--- src/backend/rewrite/rewriteSupport.c	7 Oct 2008 11:13:29 -0000
***************
*** 73,84 ****
  		/* Do the update */
  		classForm->relhasrules = relHasRules;
  		if (relIsBecomingView)
  			classForm->relkind = RELKIND_VIEW;
  
! 		simple_heap_update(relationRelation, &tuple->t_self, tuple);
  
! 		/* Keep the catalog indexes up to date */
! 		CatalogUpdateIndexes(relationRelation, tuple);
  	}
  	else
  	{
--- 73,88 ----
  		/* Do the update */
  		classForm->relhasrules = relHasRules;
  		if (relIsBecomingView)
+ 		{
  			classForm->relkind = RELKIND_VIEW;
  
! 			simple_heap_update(relationRelation, &tuple->t_self, tuple);
  
! 			/* Keep the catalog indexes up to date */
! 			CatalogUpdateIndexes(relationRelation, tuple);
! 		}
! 		else
! 			heap_inplace_update(relationRelation, tuple);
  	}
  	else
  	{
Index: src/backend/storage/lmgr/lmgr.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/lmgr/lmgr.c,v
retrieving revision 1.97
diff -c -r1.97 lmgr.c
*** src/backend/storage/lmgr/lmgr.c	4 Mar 2008 19:54:06 -0000	1.97
--- src/backend/storage/lmgr/lmgr.c	7 Oct 2008 10:14:53 -0000
***************
*** 181,186 ****
--- 181,198 ----
  		AcceptInvalidationMessages();
  }
  
+ bool
+ RelationLockedByMe(Relation relation, LOCKMODE lockmode)
+ {
+ 	LOCKTAG		tag;
+ 
+ 	SET_LOCKTAG_RELATION(tag,
+ 						 relation->rd_lockInfo.lockRelId.dbId,
+ 						 relation->rd_lockInfo.lockRelId.relId);
+ 
+ 	return LockHeldByMe(&tag, lockmode);
+ }
+ 
  /*
   *		ConditionalLockRelation
   *
Index: src/backend/storage/lmgr/lock.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/lmgr/lock.c,v
retrieving revision 1.184
diff -c -r1.184 lock.c
*** src/backend/storage/lmgr/lock.c	1 Aug 2008 13:16:09 -0000	1.184
--- src/backend/storage/lmgr/lock.c	7 Oct 2008 10:14:53 -0000
***************
*** 438,443 ****
--- 438,473 ----
  	return lockhash;
  }
  
+ bool
+ LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
+ {
+ 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
+ 	LockMethod	lockMethodTable;
+ 	LOCALLOCKTAG localtag;
+ 	LOCALLOCK  *locallock;
+ 	bool		found = false;
+ 	LOCKMODE	lockmode_search = lockmode;
+ 
+ 	Assert(!(lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)));
+ 	lockMethodTable = LockMethods[lockmethodid];
+ 	Assert(!(lockmode <= 0 || lockmode > lockMethodTable->numLockModes));
+ 
+ 	/*
+ 	 * Find or create a LOCALLOCK entry for this lock and lockmode
+ 	 */
+ 	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
+ 	localtag.lock = *locktag;
+ 
+ 	while (!found && lockmode_search <= lockMethodTable->numLockModes)
+ 	{
+ 		localtag.mode = lockmode_search++;
+ 		locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
+ 											  (void *) &localtag,
+ 											  HASH_FIND, &found);
+ 	}
+ 
+ 	return found;
+ }
  
  /*
   * LockAcquire -- Check for lock conflicts, sleep if conflict found,
Index: src/backend/utils/adt/ri_triggers.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/utils/adt/ri_triggers.c,v
retrieving revision 1.110
diff -c -r1.110 ri_triggers.c
*** src/backend/utils/adt/ri_triggers.c	15 Sep 2008 23:37:39 -0000	1.110
--- src/backend/utils/adt/ri_triggers.c	7 Oct 2008 10:14:53 -0000
***************
*** 2601,2607 ****
   *	This is not a trigger procedure, but is called during ALTER TABLE
   *	ADD FOREIGN KEY to validate the initial table contents.
   *
!  *	We expect that an exclusive lock has been taken on rel and pkrel;
   *	hence, we do not need to lock individual rows for the check.
   *
   *	If the check fails because the current user doesn't have permissions
--- 2601,2607 ----
   *	This is not a trigger procedure, but is called during ALTER TABLE
   *	ADD FOREIGN KEY to validate the initial table contents.
   *
!  *	We expect that a ShareLock or higher has been taken on rel and pkrel;
   *	hence, we do not need to lock individual rows for the check.
   *
   *	If the check fails because the current user doesn't have permissions
Index: src/include/commands/tablecmds.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/commands/tablecmds.h,v
retrieving revision 1.41
diff -c -r1.41 tablecmds.h
*** src/include/commands/tablecmds.h	19 Jun 2008 00:46:06 -0000	1.41
--- src/include/commands/tablecmds.h	7 Oct 2008 10:14:53 -0000
***************
*** 24,29 ****
--- 24,31 ----
  
  extern void AlterTable(AlterTableStmt *stmt);
  
+ extern bool AlterTableLockLevel(List *cmds);
+ 
  extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing);
  
  extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
Index: src/include/storage/lmgr.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/lmgr.h,v
retrieving revision 1.62
diff -c -r1.62 lmgr.h
*** src/include/storage/lmgr.h	12 May 2008 00:00:53 -0000	1.62
--- src/include/storage/lmgr.h	7 Oct 2008 10:14:53 -0000
***************
*** 32,37 ****
--- 32,39 ----
  extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
  extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
  
+ extern bool RelationLockedByMe(Relation relation, LOCKMODE lockmode);
+ 
  extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
  extern void UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
  
Index: src/include/storage/lock.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/lock.h,v
retrieving revision 1.114
diff -c -r1.114 lock.h
*** src/include/storage/lock.h	16 Sep 2008 01:56:26 -0000	1.114
--- src/include/storage/lock.h	7 Oct 2008 10:14:53 -0000
***************
*** 467,472 ****
--- 467,473 ----
  extern void InitLocks(void);
  extern LockMethod GetLocksMethodTable(const LOCK *lock);
  extern uint32 LockTagHashCode(const LOCKTAG *locktag);
+ extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
  extern LockAcquireResult LockAcquire(const LOCKTAG *locktag,
  			LOCKMODE lockmode,
  			bool sessionLock,
