diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c
index 6a8a96f60339118f7edb11668c5db23fbf85c211..c9594f68ad463aaf7c86fcb96ed51961cbfa3af1 100644
*** a/contrib/tcn/tcn.c
--- b/contrib/tcn/tcn.c
*************** triggered_change_notification(PG_FUNCTIO
*** 141,148 ****
  		if (!HeapTupleIsValid(indexTuple))		/* should not happen */
  			elog(ERROR, "cache lookup failed for index %u", indexoid);
  		index = (Form_pg_index) GETSTRUCT(indexTuple);
! 		/* we're only interested if it is the primary key */
! 		if (index->indisprimary)
  		{
  			int			numatts = index->indnatts;
  
--- 141,148 ----
  		if (!HeapTupleIsValid(indexTuple))		/* should not happen */
  			elog(ERROR, "cache lookup failed for index %u", indexoid);
  		index = (Form_pg_index) GETSTRUCT(indexTuple);
! 		/* we're only interested if it is the primary key and valid */
! 		if (index->indisprimary && IndexIsValid(index))
  		{
  			int			numatts = index->indnatts;
  
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index f99919093ca0da8c59ee4f4df0643837dfbdb38b..5f270404bf1279ad2ba745ca417ab00b0b5dbb08 100644
*** a/doc/src/sgml/catalogs.sgml
--- b/doc/src/sgml/catalogs.sgml
***************
*** 3480,3486 ****
         index is possibly incomplete: it must still be modified by
         <command>INSERT</>/<command>UPDATE</> operations, but it cannot safely
         be used for queries. If it is unique, the uniqueness property is not
!        true either.
        </entry>
       </row>
  
--- 3480,3486 ----
         index is possibly incomplete: it must still be modified by
         <command>INSERT</>/<command>UPDATE</> operations, but it cannot safely
         be used for queries. If it is unique, the uniqueness property is not
!        guaranteed true either.
        </entry>
       </row>
  
***************
*** 3508,3513 ****
--- 3508,3523 ----
       </row>
  
       <row>
+       <entry><structfield>indislive</structfield></entry>
+       <entry><type>bool</type></entry>
+       <entry></entry>
+       <entry>
+        If false, the index is in the process of being dropped, and
+        should be ignored for all purposes (including HOT-safety decisions)
+       </entry>
+      </row>
+ 
+      <row>
        <entry><structfield>indkey</structfield></entry>
        <entry><type>int2vector</type></entry>
        <entry><literal><link linkend="catalog-pg-attribute"><structname>pg_attribute</structname></link>.attnum</literal></entry>
diff --git a/doc/src/sgml/ref/drop_index.sgml b/doc/src/sgml/ref/drop_index.sgml
index 343f7aca003dd1687e7370da663cc26554955dbd..98fd9c966c3b2cbd0fdc3bf5609e2ab6d64f6b16 100644
*** a/doc/src/sgml/ref/drop_index.sgml
--- b/doc/src/sgml/ref/drop_index.sgml
*************** DROP INDEX [ CONCURRENTLY ] [ IF EXISTS 
*** 40,73 ****
  
    <variablelist>
     <varlistentry>
!     <term><literal>IF EXISTS</literal></term>
      <listitem>
       <para>
!       Do not throw an error if the index does not exist. A notice is issued
!       in this case.
       </para>
      </listitem>
     </varlistentry>
  
     <varlistentry>
!     <term><literal>CONCURRENTLY</literal></term>
      <listitem>
       <para>
!       When this option is used, <productname>PostgreSQL</> will drop the
!       index without taking any locks that prevent concurrent selects, inserts,
!       updates, or deletes on the table; whereas a standard index drop
!       waits for a lock that locks out everything on the table until it's done.
!       Concurrent drop index is a two stage process. First, we mark the index
!       both invalid and not ready then commit the change. Next we wait until
!       there are no users locking the table who can see the index.
!      </para>
!      <para>
!       There are several caveats to be aware of when using this option.
!       Only one index name can be specified if the <literal>CONCURRENTLY</literal>
!       parameter is specified.  Regular <command>DROP INDEX</> command can be
!       performed within a transaction block, but
!       <command>DROP INDEX CONCURRENTLY</> cannot.
!       The CASCADE option is not supported when dropping an index concurrently.
       </para>
      </listitem>
     </varlistentry>
--- 40,72 ----
  
    <variablelist>
     <varlistentry>
!     <term><literal>CONCURRENTLY</literal></term>
      <listitem>
       <para>
!       Drop the index without locking out concurrent selects, inserts, updates,
!       and deletes on the index's table.  A normal <command>DROP INDEX</>
!       acquires exclusive lock on the table, blocking other accesses until the
!       index drop can be completed.  With this option, the command instead
!       waits until conflicting transactions have completed.
!      </para>
!      <para>
!       There are several caveats to be aware of when using this option.
!       Only one index name can be specified, and the <literal>CASCADE</> option
!       is not supported.  (Thus, an index that supports a <literal>UNIQUE</> or
!       <literal>PRIMARY KEY</> constraint cannot be dropped this way.)
!       Also, regular <command>DROP INDEX</> commands can be
!       performed within a transaction block, but
!       <command>DROP INDEX CONCURRENTLY</> cannot.
       </para>
      </listitem>
     </varlistentry>
  
     <varlistentry>
!     <term><literal>IF EXISTS</literal></term>
      <listitem>
       <para>
!       Do not throw an error if the index does not exist. A notice is issued
!       in this case.
       </para>
      </listitem>
     </varlistentry>
diff --git a/src/backend/access/heap/README.HOT b/src/backend/access/heap/README.HOT
index f12cad44e56c363e62fa617497bbedfe1ba8c1fe..4cf3c3a0d4c2db96a57e73e46fdd7463db439f79 100644
*** a/src/backend/access/heap/README.HOT
--- b/src/backend/access/heap/README.HOT
*************** from the index, as well as ensuring that
*** 386,391 ****
--- 386,419 ----
  rows in a broken HOT chain (the first condition is stronger than the
  second).  Finally, we can mark the index valid for searches.
  
+ Note that we do not need to set pg_index.indcheckxmin in this code path,
+ because we have outwaited any transactions that would need to avoid using
+ the index.  (indcheckxmin is only needed because non-concurrent CREATE
+ INDEX doesn't want to wait; its stronger lock would create too much risk of
+ deadlock if it did.)
+ 
+ 
+ DROP INDEX CONCURRENTLY
+ -----------------------
+ 
+ DROP INDEX CONCURRENTLY is sort of the reverse sequence of CREATE INDEX
+ CONCURRENTLY.  We first mark the index as not indisvalid, and then wait for
+ any transactions that could be using it in queries to end.  (During this
+ time, index updates must still be performed as normal, since such
+ transactions might expect freshly inserted tuples to be findable.)
+ Then, we clear indisready and indislive, and again wait for transactions
+ that could be updating the index to end.  Finally we can drop the index
+ normally (though taking only ShareUpdateExclusiveLock on its parent table).
+ 
+ The reason we need the pg_index.indislive flag is that after the second
+ wait step begins, we don't want transactions to be touching the index at
+ all; otherwise they might suffer errors if the DROP finally commits while
+ they are reading catalog entries for the index.  If we had only indisvalid
+ and indisready, this state would be indistinguishable from the first stage
+ of CREATE INDEX CONCURRENTLY --- but in that state, we *do* want
+ transactions to examine the index, since they must consider it in
+ HOT-safety checks.
+ 
  
  Limitations and Restrictions
  ----------------------------
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index b9cfee2e81246e638b43c844fba0b985b054db25..0b61c97eb4da391e66b7094115983094b3da2409 100644
*** a/src/backend/catalog/dependency.c
--- b/src/backend/catalog/dependency.c
*************** deleteOneObject(const ObjectAddress *obj
*** 995,1001 ****
  	int			nkeys;
  	SysScanDesc scan;
  	HeapTuple	tup;
- 	Oid			depRelOid = depRel->rd_id;
  
  	/* DROP hook of the objects being removed */
  	if (object_access_hook)
--- 995,1000 ----
*************** deleteOneObject(const ObjectAddress *obj
*** 1008,1016 ****
  	}
  
  	/*
! 	 * Close depRel if we are doing a drop concurrently. The individual
! 	 * deletion has to commit the transaction and we don't want dangling
! 	 * references.
  	 */
  	if (flags & PERFORM_DELETION_CONCURRENTLY)
  		heap_close(depRel, RowExclusiveLock);
--- 1007,1015 ----
  	}
  
  	/*
! 	 * Close depRel if we are doing a drop concurrently.  The object deletion
! 	 * subroutine will commit the current transaction, so we can't keep the
! 	 * relation open across doDeletion().
  	 */
  	if (flags & PERFORM_DELETION_CONCURRENTLY)
  		heap_close(depRel, RowExclusiveLock);
*************** deleteOneObject(const ObjectAddress *obj
*** 1018,1041 ****
  	/*
  	 * Delete the object itself, in an object-type-dependent way.
  	 *
! 	 * Do this before removing outgoing dependencies as deletions can be
! 	 * happening in concurrent mode. That will only happen for a single object
! 	 * at once and if so the object will be invalidated inside a separate
! 	 * transaction and only dropped inside a transaction thats in-progress when
! 	 * doDeletion returns. This way no observer can see dangling dependency
! 	 * entries.
  	 */
  	doDeletion(object, flags);
  
  	/*
! 	 * Reopen depRel if we closed it before
  	 */
  	if (flags & PERFORM_DELETION_CONCURRENTLY)
! 		depRel = heap_open(depRelOid, RowExclusiveLock);
  
  	/*
! 	 * Then remove any pg_depend records that link from this object to
! 	 * others.	(Any records linking to this object should be gone already.)
  	 *
  	 * When dropping a whole object (subId = 0), remove all pg_depend records
  	 * for its sub-objects too.
--- 1017,1039 ----
  	/*
  	 * Delete the object itself, in an object-type-dependent way.
  	 *
! 	 * We used to do this after removing the outgoing dependency links, but it
! 	 * seems just as reasonable to do it beforehand.  In the concurrent case
! 	 * we *must* do it in this order, because we can't make any transactional
! 	 * updates before calling doDeletion() --- they'd get committed right
! 	 * away, which is not cool if the deletion then fails.
  	 */
  	doDeletion(object, flags);
  
  	/*
! 	 * Reopen depRel if we closed it above
  	 */
  	if (flags & PERFORM_DELETION_CONCURRENTLY)
! 		depRel = heap_open(DependRelationId, RowExclusiveLock);
  
  	/*
! 	 * Now remove any pg_depend records that link from this object to others.
! 	 * (Any records linking to this object should be gone already.)
  	 *
  	 * When dropping a whole object (subId = 0), remove all pg_depend records
  	 * for its sub-objects too.
*************** AcquireDeletionLock(const ObjectAddress 
*** 1258,1272 ****
--- 1256,1278 ----
  {
  	if (object->classId == RelationRelationId)
  	{
+ 		/*
+ 		 * In DROP INDEX CONCURRENTLY, take only ShareUpdateExclusiveLock on
+ 		 * the index for the moment.  index_drop() will promote the lock once
+ 		 * it's safe to do so.  In all other cases we need full exclusive
+ 		 * lock.
+ 		 */
  		if (flags & PERFORM_DELETION_CONCURRENTLY)
  			LockRelationOid(object->objectId, ShareUpdateExclusiveLock);
  		else
  			LockRelationOid(object->objectId, AccessExclusiveLock);
  	}
  	else
+ 	{
  		/* assume we should lock the whole object not a sub-object */
  		LockDatabaseObject(object->classId, object->objectId, 0,
  						   AccessExclusiveLock);
+ 	}
  }
  
  /*
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index d2d91c1b786e3247ba3b289830e19896e4ea6a64..78dbc5dccf4fd41efb70190b19a74041d67f89c7 100644
*** a/src/backend/catalog/index.c
--- b/src/backend/catalog/index.c
*************** static void ResetReindexPending(void);
*** 125,130 ****
--- 125,134 ----
   *		See whether an existing relation has a primary key.
   *
   * Caller must have suitable lock on the relation.
+  *
+  * Note: we intentionally do not check IndexIsValid here; that's because this
+  * is used to enforce the rule that there can be only one indisprimary index,
+  * and we want that to be true even if said index is invalid.
   */
  static bool
  relationHasPrimaryKey(Relation rel)
*************** UpdateIndexRelation(Oid indexoid,
*** 608,613 ****
--- 612,618 ----
  	values[Anum_pg_index_indcheckxmin - 1] = BoolGetDatum(false);
  	/* we set isvalid and isready the same way */
  	values[Anum_pg_index_indisready - 1] = BoolGetDatum(isvalid);
+ 	values[Anum_pg_index_indislive - 1] = BoolGetDatum(true);
  	values[Anum_pg_index_indkey - 1] = PointerGetDatum(indkey);
  	values[Anum_pg_index_indcollation - 1] = PointerGetDatum(indcollation);
  	values[Anum_pg_index_indclass - 1] = PointerGetDatum(indclass);
*************** index_constraint_create(Relation heapRel
*** 1258,1265 ****
  	 * Note: since this is a transactional update, it's unsafe against
  	 * concurrent SnapshotNow scans of pg_index.  When making an existing
  	 * index into a constraint, caller must have a table lock that prevents
! 	 * concurrent table updates, and there is a risk that concurrent readers
! 	 * of the table will miss seeing this index at all.
  	 */
  	if (update_pgindex && (mark_as_primary || deferrable))
  	{
--- 1263,1271 ----
  	 * Note: since this is a transactional update, it's unsafe against
  	 * concurrent SnapshotNow scans of pg_index.  When making an existing
  	 * index into a constraint, caller must have a table lock that prevents
! 	 * concurrent table updates; if it's less than a full exclusive lock,
! 	 * there is a risk that concurrent readers of the table will miss seeing
! 	 * this index at all.
  	 */
  	if (update_pgindex && (mark_as_primary || deferrable))
  	{
*************** index_drop(Oid indexId, bool concurrent)
*** 1317,1324 ****
  	LockRelId	heaprelid,
  				indexrelid;
  	LOCKTAG		heaplocktag;
  	VirtualTransactionId *old_lockholders;
- 	Form_pg_index indexForm;
  
  	/*
  	 * To drop an index safely, we must grab exclusive lock on its parent
--- 1323,1330 ----
  	LockRelId	heaprelid,
  				indexrelid;
  	LOCKTAG		heaplocktag;
+ 	LOCKMODE	lockmode;
  	VirtualTransactionId *old_lockholders;
  
  	/*
  	 * To drop an index safely, we must grab exclusive lock on its parent
*************** index_drop(Oid indexId, bool concurrent)
*** 1330,1434 ****
  	 * proceeding until we commit and send out a shared-cache-inval notice
  	 * that will make them update their index lists.
  	 *
! 	 * In the concurrent case we make sure that nobody can be looking at the
! 	 * indexes by dropping the index in multiple steps, so we don't need a full
! 	 * AccessExclusiveLock yet.
! 	 *
! 	 * All predicate locks on the index are about to be made invalid. Promote
! 	 * them to relation locks on the heap. For correctness the index must not
! 	 * be seen with indisvalid = true during query planning after the move
! 	 * starts, so that the index will not be used for a scan after the
! 	 * predicate lock move, as this could create new predicate locks on the
! 	 * index which would not ensure a heap relation lock. Also, the index must
! 	 * not be seen during execution of a heap tuple insert with indisready =
! 	 * false before the move is complete, since the conflict with the
! 	 * predicate lock on the index gap could be missed before the lock on the
! 	 * heap relation is in place to detect a conflict based on the heap tuple
! 	 * insert.
  	 */
  	heapId = IndexGetRelation(indexId, false);
! 	if (concurrent)
! 	{
! 		userHeapRelation = heap_open(heapId, ShareUpdateExclusiveLock);
! 		userIndexRelation = index_open(indexId, ShareUpdateExclusiveLock);
! 	}
! 	else
! 	{
! 		userHeapRelation = heap_open(heapId, AccessExclusiveLock);
! 		userIndexRelation = index_open(indexId, AccessExclusiveLock);
! 	}
  
  	/*
! 	 * We might still have open queries using it in our own session.
  	 */
  	CheckTableNotInUse(userIndexRelation, "DROP INDEX");
  
  	/*
! 	 * Drop Index concurrently is similar in many ways to creating an index
! 	 * concurrently, so some actions are similar to DefineIndex() just in the
! 	 * reverse order.
  	 *
  	 * First we unset indisvalid so queries starting afterwards don't use the
! 	 * index to answer queries anymore. We have to keep indisready = true
! 	 * so transactions that are still scanning the index can continue to
! 	 * see valid index contents. E.g. when they are using READ COMMITTED mode,
! 	 * and another transactions that started later commits makes changes and
! 	 * commits, they need to see those new tuples in the index.
  	 *
! 	 * After all transactions that could possibly have used it for queries
! 	 * ended we can unset indisready and wait till nobody could be updating it
! 	 * anymore.
  	 */
  	if (concurrent)
  	{
  		/*
! 		 * Mark index invalid by updating its pg_index entry
! 		 *
! 		 * Don't Assert(indexForm->indisvalid) because we may be trying to
! 		 * clear up after an error when trying to create an index which left
! 		 * the index invalid
  		 */
! 		indexRelation = heap_open(IndexRelationId, RowExclusiveLock);
! 
! 		tuple = SearchSysCacheCopy1(INDEXRELID,
! 									ObjectIdGetDatum(indexId));
! 		if (!HeapTupleIsValid(tuple))
! 			elog(ERROR, "cache lookup failed for index %u", indexId);
! 		indexForm = (Form_pg_index) GETSTRUCT(tuple);
  
  		/*
! 		 * If indisready == true we leave it set so the index still gets
! 		 * maintained by pre-existing transactions. We only need to ensure
! 		 * that indisvalid is false.
  		 */
! 		if (indexForm->indisvalid)
! 		{
! 			indexForm->indisvalid = false;	/* make unusable for new queries */
! 
! 			simple_heap_update(indexRelation, &tuple->t_self, tuple);
! 			CatalogUpdateIndexes(indexRelation, tuple);
! 		}
  
! 		heap_close(indexRelation, RowExclusiveLock);
  
  		/* save lockrelid and locktag for below, then close but keep locks */
  		heaprelid = userHeapRelation->rd_lockInfo.lockRelId;
  		SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
- 		heap_close(userHeapRelation, NoLock);
- 
  		indexrelid = userIndexRelation->rd_lockInfo.lockRelId;
  		index_close(userIndexRelation, NoLock);
  
  		/*
! 		 * For a concurrent drop, it's important to make the catalog entries
! 		 * visible to other transactions before we drop the index. The index
! 		 * will be marked not indisvalid, so that no one else tries to use it
! 		 * for queries.
! 		 *
! 		 * We must commit our current transaction so that the index update
! 		 * becomes visible; then start another.  Note that all the data
! 		 * structures we just built are lost in the commit.  The only data we
! 		 * keep past here are the relation IDs.
  		 *
  		 * Before committing, get a session-level lock on the table, to ensure
  		 * that neither it nor the index can be dropped before we finish. This
--- 1336,1436 ----
  	 * proceeding until we commit and send out a shared-cache-inval notice
  	 * that will make them update their index lists.
  	 *
! 	 * In the concurrent case we avoid this requirement by disabling index use
! 	 * in multiple steps and waiting out any transactions that might be using
! 	 * the index, so we don't need exclusive lock on the parent table. Instead
! 	 * we take ShareUpdateExclusiveLock, to ensure that two sessions aren't
! 	 * doing CREATE/DROP INDEX CONCURRENTLY on the same index.	(We will get
! 	 * AccessExclusiveLock on the index below, once we're sure nobody else is
! 	 * using it.)
  	 */
  	heapId = IndexGetRelation(indexId, false);
! 	lockmode = concurrent ? ShareUpdateExclusiveLock : AccessExclusiveLock;
! 	userHeapRelation = heap_open(heapId, lockmode);
! 	userIndexRelation = index_open(indexId, lockmode);
  
  	/*
! 	 * We might still have open queries using it in our own session, which the
! 	 * above locking won't prevent, so test explicitly.
  	 */
  	CheckTableNotInUse(userIndexRelation, "DROP INDEX");
  
  	/*
! 	 * Drop Index Concurrently is more or less the reverse process of Create
! 	 * Index Concurrently.
  	 *
  	 * First we unset indisvalid so queries starting afterwards don't use the
! 	 * index to answer queries anymore.  We have to keep indisready = true so
! 	 * transactions that are still scanning the index can continue to see
! 	 * valid index contents.  For instance, if they are using READ COMMITTED
! 	 * mode, and another transaction makes changes and commits, they need to
! 	 * see those new tuples in the index.
  	 *
! 	 * After all transactions that could possibly have used the index for
! 	 * queries end, we can unset indisready and indislive, then wait till
! 	 * nobody could be touching it anymore.  (Note: we need indislive because
! 	 * this state must be distinct from the initial state during CREATE INDEX
! 	 * CONCURRENTLY, which has indislive true while indisready and indisvalid
! 	 * are false.  That's because in that state, transactions must examine the
! 	 * index for HOT-safety decisions, while in this state we don't want them
! 	 * to open it at all.)
! 	 *
! 	 * Since all predicate locks on the index are about to be made invalid, we
! 	 * must promote them to predicate locks on the heap.  In the
! 	 * non-concurrent case we can just do that now.  In the concurrent case
! 	 * it's a bit trickier.  The predicate locks must be moved when there are
! 	 * no index scans in progress on the index and no more can subsequently
! 	 * start, so that no new predicate locks can be made on the index.	Also,
! 	 * they must be moved before heap inserts stop maintaining the index, else
! 	 * the conflict with the predicate lock on the index gap could be missed
! 	 * before the lock on the heap relation is in place to detect a conflict
! 	 * based on the heap tuple insert.
  	 */
  	if (concurrent)
  	{
  		/*
! 		 * We must commit our transaction in order to make the first pg_index
! 		 * state update visible to other sessions.	If the DROP machinery has
! 		 * already performed any other actions (removal of other objects,
! 		 * pg_depend entries, etc), the commit would make those actions
! 		 * permanent, which would leave us with inconsistent catalog state if
! 		 * we fail partway through the following sequence.	Since DROP INDEX
! 		 * CONCURRENTLY is restricted to dropping just one index that has no
! 		 * dependencies, we should get here before anything's been done ---
! 		 * but let's check that to be sure.  We can verify that the current
! 		 * transaction has not executed any transactional updates by checking
! 		 * that no XID has been assigned.
  		 */
! 		if (GetTopTransactionIdIfAny() != InvalidTransactionId)
! 			ereport(ERROR,
! 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 					 errmsg("DROP INDEX CONCURRENTLY must be first action in transaction")));
  
  		/*
! 		 * Mark index invalid by updating its pg_index entry
  		 */
! 		index_set_state_flags(indexId, INDEX_DROP_CLEAR_VALID);
  
! 		/*
! 		 * Invalidate the relcache for the table, so that after this commit
! 		 * all sessions will refresh any cached plans that might reference the
! 		 * index.
! 		 */
! 		CacheInvalidateRelcache(userHeapRelation);
  
  		/* save lockrelid and locktag for below, then close but keep locks */
  		heaprelid = userHeapRelation->rd_lockInfo.lockRelId;
  		SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
  		indexrelid = userIndexRelation->rd_lockInfo.lockRelId;
+ 
+ 		heap_close(userHeapRelation, NoLock);
  		index_close(userIndexRelation, NoLock);
  
  		/*
! 		 * We must commit our current transaction so that the indisvalid
! 		 * update becomes visible to other transactions; then start another.
! 		 * Note that any previously-built data structures are lost in the
! 		 * commit.	The only data we keep past here are the relation IDs.
  		 *
  		 * Before committing, get a session-level lock on the table, to ensure
  		 * that neither it nor the index can be dropped before we finish. This
*************** index_drop(Oid indexId, bool concurrent)
*** 1443,1455 ****
  		StartTransactionCommand();
  
  		/*
! 		 * Now we must wait until no running transaction could have the table
! 		 * open with the old list of indexes.  To do this, inquire which xacts
! 		 * currently would conflict with AccessExclusiveLock on the table --
! 		 * ie, which ones have a lock of any kind on the table. Then wait for
! 		 * each of these xacts to commit or abort.	Note we do not need to
! 		 * worry about xacts that open the table for writing after this point;
! 		 * they will see the index as invalid when they open the relation.
  		 *
  		 * Note: the reason we use actual lock acquisition here, rather than
  		 * just checking the ProcArray and sleeping, is that deadlock is
--- 1445,1457 ----
  		StartTransactionCommand();
  
  		/*
! 		 * Now we must wait until no running transaction could be using the
! 		 * index for a query.  To do this, inquire which xacts currently would
! 		 * conflict with AccessExclusiveLock on the table -- ie, which ones
! 		 * have a lock of any kind on the table. Then wait for each of these
! 		 * xacts to commit or abort. Note we do not need to worry about xacts
! 		 * that open the table for reading after this point; they will see the
! 		 * index as invalid when they open the relation.
  		 *
  		 * Note: the reason we use actual lock acquisition here, rather than
  		 * just checking the ProcArray and sleeping, is that deadlock is
*************** index_drop(Oid indexId, bool concurrent)
*** 1480,1507 ****
  		TransferPredicateLocksToHeapRelation(userIndexRelation);
  
  		/*
! 		 * Now we are sure that nobody uses the index for queries, they just
! 		 * might have it opened for updating it. So now we can unset
! 		 * indisready and wait till nobody could update the index anymore.
  		 */
! 		indexRelation = heap_open(IndexRelationId, RowExclusiveLock);
! 
! 		tuple = SearchSysCacheCopy1(INDEXRELID,
! 									ObjectIdGetDatum(indexId));
! 		if (!HeapTupleIsValid(tuple))
! 			elog(ERROR, "cache lookup failed for index %u", indexId);
! 		indexForm = (Form_pg_index) GETSTRUCT(tuple);
! 
! 		Assert(indexForm->indisvalid == false);
! 		if (indexForm->indisready)
! 		{
! 			indexForm->indisready = false;	/* don't update index anymore */
! 
! 			simple_heap_update(indexRelation, &tuple->t_self, tuple);
! 			CatalogUpdateIndexes(indexRelation, tuple);
! 		}
  
! 		heap_close(indexRelation, RowExclusiveLock);
  
  		/*
  		 * Close the relations again, though still holding session lock.
--- 1482,1500 ----
  		TransferPredicateLocksToHeapRelation(userIndexRelation);
  
  		/*
! 		 * Now we are sure that nobody uses the index for queries; they just
! 		 * might have it open for updating it.	So now we can unset indisready
! 		 * and indislive, then wait till nobody could be using it at all
! 		 * anymore.
  		 */
! 		index_set_state_flags(indexId, INDEX_DROP_SET_DEAD);
  
! 		/*
! 		 * Invalidate the relcache for the table, so that after this commit
! 		 * all sessions will refresh the table's index list.  Forgetting just
! 		 * the index's relcache entry is not enough.
! 		 */
! 		CacheInvalidateRelcache(userHeapRelation);
  
  		/*
  		 * Close the relations again, though still holding session lock.
*************** index_drop(Oid indexId, bool concurrent)
*** 1510,1532 ****
  		index_close(userIndexRelation, NoLock);
  
  		/*
! 		 * Invalidate the relcache for the table, so that after this
! 		 * transaction we will refresh the index list. Forgetting just the
! 		 * index is not enough.
! 		 */
! 		CacheInvalidateRelcache(userHeapRelation);
! 
! 		/*
! 		 * Just as with indisvalid = false we need to make sure indisready
! 		 * is false is visible for everyone.
  		 */
  		CommitTransactionCommand();
  		StartTransactionCommand();
  
  		/*
! 		 * Wait till everyone that saw indisready = true finished so we can
! 		 * finally really remove the index. The logic here is the same as
! 		 * above.
  		 */
  		old_lockholders = GetLockConflicts(&heaplocktag, AccessExclusiveLock);
  
--- 1503,1517 ----
  		index_close(userIndexRelation, NoLock);
  
  		/*
! 		 * Again, commit the transaction to make the pg_index update visible
! 		 * to other sessions.
  		 */
  		CommitTransactionCommand();
  		StartTransactionCommand();
  
  		/*
! 		 * Wait till every transaction that saw the old index state has
! 		 * finished.  The logic here is the same as above.
  		 */
  		old_lockholders = GetLockConflicts(&heaplocktag, AccessExclusiveLock);
  
*************** index_drop(Oid indexId, bool concurrent)
*** 1547,1553 ****
--- 1532,1541 ----
  		userIndexRelation = index_open(indexId, AccessExclusiveLock);
  	}
  	else
+ 	{
+ 		/* Not concurrent, so just transfer predicate locks and we're good */
  		TransferPredicateLocksToHeapRelation(userIndexRelation);
+ 	}
  
  	/*
  	 * Schedule physical removal of the files
*************** index_drop(Oid indexId, bool concurrent)
*** 1601,1607 ****
  	 * of relhasindex (the next VACUUM will fix it if necessary). So there is
  	 * no need to update the pg_class tuple for the owning relation. But we
  	 * must send out a shared-cache-inval notice on the owning relation to
! 	 * ensure other backends update their relcache lists of indexes.
  	 */
  	CacheInvalidateRelcache(userHeapRelation);
  
--- 1589,1596 ----
  	 * of relhasindex (the next VACUUM will fix it if necessary). So there is
  	 * no need to update the pg_class tuple for the owning relation. But we
  	 * must send out a shared-cache-inval notice on the owning relation to
! 	 * ensure other backends update their relcache lists of indexes.  (In the
! 	 * concurrent case, this is redundant but harmless.)
  	 */
  	CacheInvalidateRelcache(userHeapRelation);
  
*************** BuildIndexInfo(Relation index)
*** 1677,1683 ****
  
  	/* other info */
  	ii->ii_Unique = indexStruct->indisunique;
! 	ii->ii_ReadyForInserts = indexStruct->indisready;
  
  	/* initialize index-build state to default */
  	ii->ii_Concurrent = false;
--- 1666,1672 ----
  
  	/* other info */
  	ii->ii_Unique = indexStruct->indisunique;
! 	ii->ii_ReadyForInserts = IndexIsReady(indexStruct);
  
  	/* initialize index-build state to default */
  	ii->ii_Concurrent = false;
*************** index_build(Relation heapRelation,
*** 2035,2042 ****
  	 * index's usability horizon.  Moreover, we *must not* try to change the
  	 * index's pg_index entry while reindexing pg_index itself, and this
  	 * optimization nicely prevents that.
  	 */
! 	if (indexInfo->ii_BrokenHotChain && !isreindex)
  	{
  		Oid			indexId = RelationGetRelid(indexRelation);
  		Relation	pg_index;
--- 2024,2043 ----
  	 * index's usability horizon.  Moreover, we *must not* try to change the
  	 * index's pg_index entry while reindexing pg_index itself, and this
  	 * optimization nicely prevents that.
+ 	 *
+ 	 * We also need not set indcheckxmin during a concurrent index build,
+ 	 * because we won't set indisvalid true until all transactions that care
+ 	 * about the broken HOT chains are gone.
+ 	 *
+ 	 * Therefore, this code path can only be taken during non-concurrent
+ 	 * CREATE INDEX.  Thus the fact that heap_update will set the pg_index
+ 	 * tuple's xmin doesn't matter, because that tuple was created in the
+ 	 * current transaction anyway.	That also means we don't need to worry
+ 	 * about any concurrent readers of the tuple; no other transaction can see
+ 	 * it yet.
  	 */
! 	if (indexInfo->ii_BrokenHotChain && !isreindex &&
! 		!indexInfo->ii_Concurrent)
  	{
  		Oid			indexId = RelationGetRelid(indexRelation);
  		Relation	pg_index;
*************** validate_index_heapscan(Relation heapRel
*** 3000,3005 ****
--- 3001,3091 ----
  
  
  /*
+  * index_set_state_flags - adjust pg_index state flags
+  *
+  * This is used during CREATE/DROP INDEX CONCURRENTLY to adjust the pg_index
+  * flags that denote the index's state.  We must use an in-place update of
+  * the pg_index tuple, because we do not have exclusive lock on the parent
+  * table and so other sessions might concurrently be doing SnapshotNow scans
+  * of pg_index to identify the table's indexes.  A transactional update would
+  * risk somebody not seeing the index at all.  Because the update is not
+  * transactional and will not roll back on error, this must only be used as
+  * the last step in a transaction that has not made any transactional catalog
+  * updates!
+  *
+  * Note that heap_inplace_update does send a cache inval message for the
+  * tuple, so other sessions will hear about the update as soon as we commit.
+  */
+ void
+ index_set_state_flags(Oid indexId, IndexStateFlagsAction action)
+ {
+ 	Relation	pg_index;
+ 	HeapTuple	indexTuple;
+ 	Form_pg_index indexForm;
+ 
+ 	pg_index = heap_open(IndexRelationId, RowExclusiveLock);
+ 
+ 	indexTuple = SearchSysCacheCopy1(INDEXRELID,
+ 									 ObjectIdGetDatum(indexId));
+ 	if (!HeapTupleIsValid(indexTuple))
+ 		elog(ERROR, "cache lookup failed for index %u", indexId);
+ 	indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
+ 
+ 	switch (action)
+ 	{
+ 		case INDEX_CREATE_SET_READY:
+ 			/* Set indisready during a CREATE INDEX CONCURRENTLY sequence */
+ 			Assert(indexForm->indislive);
+ 			Assert(!indexForm->indisready);
+ 			Assert(!indexForm->indisvalid);
+ 			indexForm->indisready = true;
+ 			break;
+ 		case INDEX_CREATE_SET_VALID:
+ 			/* Set indisvalid during a CREATE INDEX CONCURRENTLY sequence */
+ 			Assert(indexForm->indislive);
+ 			Assert(indexForm->indisready);
+ 			Assert(!indexForm->indisvalid);
+ 			indexForm->indisvalid = true;
+ 			break;
+ 		case INDEX_DROP_CLEAR_VALID:
+ 
+ 			/*
+ 			 * Clear indisvalid during a DROP INDEX CONCURRENTLY sequence
+ 			 *
+ 			 * If indisready == true we leave it set so the index still gets
+ 			 * maintained by active transactions.  We only need to ensure that
+ 			 * indisvalid is false.  (We don't assert that either is initially
+ 			 * true, though, since we want to be able to retry a DROP INDEX
+ 			 * CONCURRENTLY that failed partway through.)
+ 			 *
+ 			 * Note: the CLUSTER logic assumes that indisclustered cannot be
+ 			 * set on any invalid index, so clear that flag too.
+ 			 */
+ 			indexForm->indisvalid = false;
+ 			indexForm->indisclustered = false;
+ 			break;
+ 		case INDEX_DROP_SET_DEAD:
+ 
+ 			/*
+ 			 * Clear indisready/indislive during DROP INDEX CONCURRENTLY
+ 			 *
+ 			 * We clear both indisready and indislive, because we not only
+ 			 * want to stop updates, we want to prevent sessions from touching
+ 			 * the index at all.
+ 			 */
+ 			Assert(!indexForm->indisvalid);
+ 			indexForm->indisready = false;
+ 			indexForm->indislive = false;
+ 			break;
+ 	}
+ 
+ 	heap_inplace_update(pg_index, indexTuple);
+ 
+ 	heap_close(pg_index, RowExclusiveLock);
+ }
+ 
+ 
+ /*
   * IndexGetRelation: given an index's relation OID, get the OID of the
   * relation it is an index on.	Uses the system cache.
   */
*************** void
*** 3032,3043 ****
  reindex_index(Oid indexId, bool skip_constraint_checks)
  {
  	Relation	iRel,
! 				heapRelation,
! 				pg_index;
  	Oid			heapId;
  	IndexInfo  *indexInfo;
- 	HeapTuple	indexTuple;
- 	Form_pg_index indexForm;
  	volatile bool skipped_constraint = false;
  
  	/*
--- 3118,3126 ----
  reindex_index(Oid indexId, bool skip_constraint_checks)
  {
  	Relation	iRel,
! 				heapRelation;
  	Oid			heapId;
  	IndexInfo  *indexInfo;
  	volatile bool skipped_constraint = false;
  
  	/*
*************** reindex_index(Oid indexId, bool skip_con
*** 3110,3141 ****
  	ResetReindexProcessing();
  
  	/*
! 	 * If the index is marked invalid or not ready (ie, it's from a failed
! 	 * CREATE INDEX CONCURRENTLY), and we didn't skip a uniqueness check, we
! 	 * can now mark it valid.  This allows REINDEX to be used to clean up in
! 	 * such cases.
  	 *
  	 * We can also reset indcheckxmin, because we have now done a
  	 * non-concurrent index build, *except* in the case where index_build
! 	 * found some still-broken HOT chains.	If it did, we normally leave
! 	 * indcheckxmin alone (note that index_build won't have changed it,
! 	 * because this is a reindex).	But if the index was invalid or not ready
! 	 * and there were broken HOT chains, it seems best to force indcheckxmin
! 	 * true, because the normal argument that the HOT chains couldn't conflict
! 	 * with the index is suspect for an invalid index.
  	 *
! 	 * Note that it is important to not update the pg_index entry if we don't
! 	 * have to, because updating it will move the index's usability horizon
! 	 * (recorded as the tuple's xmin value) if indcheckxmin is true.  We don't
! 	 * really want REINDEX to move the usability horizon forward ever, but we
! 	 * have no choice if we are to fix indisvalid or indisready.  Of course,
! 	 * clearing indcheckxmin eliminates the issue, so we're happy to do that
! 	 * if we can.  Another reason for caution here is that while reindexing
! 	 * pg_index itself, we must not try to update it.  We assume that
! 	 * pg_index's indexes will always have these flags in their clean state.
  	 */
  	if (!skipped_constraint)
  	{
  		pg_index = heap_open(IndexRelationId, RowExclusiveLock);
  
  		indexTuple = SearchSysCacheCopy1(INDEXRELID,
--- 3193,3240 ----
  	ResetReindexProcessing();
  
  	/*
! 	 * If the index is marked invalid/not-ready/dead (ie, it's from a failed
! 	 * CREATE INDEX CONCURRENTLY, or a DROP INDEX CONCURRENTLY failed midway),
! 	 * and we didn't skip a uniqueness check, we can now mark it valid.  This
! 	 * allows REINDEX to be used to clean up in such cases.
  	 *
  	 * We can also reset indcheckxmin, because we have now done a
  	 * non-concurrent index build, *except* in the case where index_build
! 	 * found some still-broken HOT chains. If it did, and we don't have to
! 	 * change any of the other flags, we just leave indcheckxmin alone (note
! 	 * that index_build won't have changed it, because this is a reindex).
! 	 * This is okay and desirable because not updating the tuple leaves the
! 	 * index's usability horizon (recorded as the tuple's xmin value) the same
! 	 * as it was.
  	 *
! 	 * But, if the index was invalid/not-ready/dead and there were broken HOT
! 	 * chains, we had better force indcheckxmin true, because the normal
! 	 * argument that the HOT chains couldn't conflict with the index is
! 	 * suspect for an invalid index.  (A conflict is definitely possible if
! 	 * the index was dead.	It probably shouldn't happen otherwise, but let's
! 	 * be conservative.)  In this case advancing the usability horizon is
! 	 * appropriate.
! 	 *
! 	 * Note that if we have to update the tuple, there is a risk of concurrent
! 	 * transactions not seeing it during their SnapshotNow scans of pg_index.
! 	 * While not especially desirable, this is safe because no such
! 	 * transaction could be trying to update the table (since we have
! 	 * ShareLock on it).  The worst case is that someone might transiently
! 	 * fail to use the index for a query --- but it was probably unusable
! 	 * before anyway, if we are updating the tuple.
! 	 *
! 	 * Another reason for avoiding unnecessary updates here is that while
! 	 * reindexing pg_index itself, we must not try to update tuples in it.
! 	 * pg_index's indexes should always have these flags in their clean state,
! 	 * so that won't happen.
  	 */
  	if (!skipped_constraint)
  	{
+ 		Relation	pg_index;
+ 		HeapTuple	indexTuple;
+ 		Form_pg_index indexForm;
+ 		bool		index_bad;
+ 
  		pg_index = heap_open(IndexRelationId, RowExclusiveLock);
  
  		indexTuple = SearchSysCacheCopy1(INDEXRELID,
*************** reindex_index(Oid indexId, bool skip_con
*** 3144,3160 ****
  			elog(ERROR, "cache lookup failed for index %u", indexId);
  		indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
  
! 		if (!indexForm->indisvalid || !indexForm->indisready ||
  			(indexForm->indcheckxmin && !indexInfo->ii_BrokenHotChain))
  		{
  			if (!indexInfo->ii_BrokenHotChain)
  				indexForm->indcheckxmin = false;
! 			else if (!indexForm->indisvalid || !indexForm->indisready)
  				indexForm->indcheckxmin = true;
  			indexForm->indisvalid = true;
  			indexForm->indisready = true;
  			simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
  			CatalogUpdateIndexes(pg_index, indexTuple);
  		}
  
  		heap_close(pg_index, RowExclusiveLock);
--- 3243,3272 ----
  			elog(ERROR, "cache lookup failed for index %u", indexId);
  		indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
  
! 		index_bad = (!indexForm->indisvalid ||
! 					 !indexForm->indisready ||
! 					 !indexForm->indislive);
! 		if (index_bad ||
  			(indexForm->indcheckxmin && !indexInfo->ii_BrokenHotChain))
  		{
  			if (!indexInfo->ii_BrokenHotChain)
  				indexForm->indcheckxmin = false;
! 			else if (index_bad)
  				indexForm->indcheckxmin = true;
  			indexForm->indisvalid = true;
  			indexForm->indisready = true;
+ 			indexForm->indislive = true;
  			simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
  			CatalogUpdateIndexes(pg_index, indexTuple);
+ 
+ 			/*
+ 			 * Invalidate the relcache for the table, so that after we commit
+ 			 * all sessions will refresh the table's index list.  This ensures
+ 			 * that if anyone misses seeing the pg_index row during this
+ 			 * update, they'll refresh their list before attempting any update
+ 			 * on the table.
+ 			 */
+ 			CacheInvalidateRelcache(heapRelation);
  		}
  
  		heap_close(pg_index, RowExclusiveLock);
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index de71a3594d48302b4b6be801ad383a2f509bdd6d..c3deb567626ef22cf4e2dbf072267d1aed562b95 100644
*** a/src/backend/commands/cluster.c
--- b/src/backend/commands/cluster.c
*************** check_index_is_clusterable(Relation OldH
*** 444,450 ****
  	 * might put recently-dead tuples out-of-order in the new table, and there
  	 * is little harm in that.)
  	 */
! 	if (!OldIndex->rd_index->indisvalid)
  		ereport(ERROR,
  				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
  				 errmsg("cannot cluster on invalid index \"%s\"",
--- 444,450 ----
  	 * might put recently-dead tuples out-of-order in the new table, and there
  	 * is little harm in that.)
  	 */
! 	if (!IndexIsValid(OldIndex->rd_index))
  		ereport(ERROR,
  				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
  				 errmsg("cannot cluster on invalid index \"%s\"",
*************** check_index_is_clusterable(Relation OldH
*** 458,463 ****
--- 458,468 ----
   * mark_index_clustered: mark the specified index as the one clustered on
   *
   * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
+  *
+  * Note: we do transactional updates of the pg_index rows, which are unsafe
+  * against concurrent SnapshotNow scans of pg_index.  Therefore this is unsafe
+  * to execute with less than full exclusive lock on the parent table;
+  * otherwise concurrent executions of RelationGetIndexList could miss indexes.
   */
  void
  mark_index_clustered(Relation rel, Oid indexOid)
*************** mark_index_clustered(Relation rel, Oid i
*** 513,518 ****
--- 518,526 ----
  		}
  		else if (thisIndexOid == indexOid)
  		{
+ 			/* this was checked earlier, but let's be real sure */
+ 			if (!IndexIsValid(indexForm))
+ 				elog(ERROR, "cannot cluster on invalid index %u", indexOid);
  			indexForm->indisclustered = true;
  			simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
  			CatalogUpdateIndexes(pg_index, indexTuple);
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index dd46cf93dad8b32bfea63cf699a026c2f7f8681a..75f9ff19cc70bcbd417eb2bedd8dc30a827f23ee 100644
*** a/src/backend/commands/indexcmds.c
--- b/src/backend/commands/indexcmds.c
*************** CheckIndexCompatible(Oid oldId,
*** 124,129 ****
--- 124,130 ----
  	Oid			accessMethodId;
  	Oid			relationId;
  	HeapTuple	tuple;
+ 	Form_pg_index indexForm;
  	Form_pg_am	accessMethodForm;
  	bool		amcanorder;
  	int16	   *coloptions;
*************** CheckIndexCompatible(Oid oldId,
*** 193,209 ****
  	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
  	if (!HeapTupleIsValid(tuple))
  		elog(ERROR, "cache lookup failed for index %u", oldId);
  
! 	/* We don't assess expressions or predicates; assume incompatibility. */
  	if (!(heap_attisnull(tuple, Anum_pg_index_indpred) &&
! 		  heap_attisnull(tuple, Anum_pg_index_indexprs)))
  	{
  		ReleaseSysCache(tuple);
  		return false;
  	}
  
  	/* Any change in operator class or collation breaks compatibility. */
! 	old_natts = ((Form_pg_index) GETSTRUCT(tuple))->indnatts;
  	Assert(old_natts == numberOfAttributes);
  
  	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
--- 194,215 ----
  	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
  	if (!HeapTupleIsValid(tuple))
  		elog(ERROR, "cache lookup failed for index %u", oldId);
+ 	indexForm = (Form_pg_index) GETSTRUCT(tuple);
  
! 	/*
! 	 * We don't assess expressions or predicates; assume incompatibility.
! 	 * Also, if the index is invalid for any reason, treat it as incompatible.
! 	 */
  	if (!(heap_attisnull(tuple, Anum_pg_index_indpred) &&
! 		  heap_attisnull(tuple, Anum_pg_index_indexprs) &&
! 		  IndexIsValid(indexForm)))
  	{
  		ReleaseSysCache(tuple);
  		return false;
  	}
  
  	/* Any change in operator class or collation breaks compatibility. */
! 	old_natts = indexForm->indnatts;
  	Assert(old_natts == numberOfAttributes);
  
  	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
*************** DefineIndex(IndexStmt *stmt,
*** 320,328 ****
  	LockRelId	heaprelid;
  	LOCKTAG		heaplocktag;
  	Snapshot	snapshot;
- 	Relation	pg_index;
- 	HeapTuple	indexTuple;
- 	Form_pg_index indexForm;
  	int			i;
  
  	/*
--- 326,331 ----
*************** DefineIndex(IndexStmt *stmt,
*** 717,739 ****
  	 * commit this transaction, any new transactions that open the table must
  	 * insert new entries into the index for insertions and non-HOT updates.
  	 */
! 	pg_index = heap_open(IndexRelationId, RowExclusiveLock);
! 
! 	indexTuple = SearchSysCacheCopy1(INDEXRELID,
! 									 ObjectIdGetDatum(indexRelationId));
! 	if (!HeapTupleIsValid(indexTuple))
! 		elog(ERROR, "cache lookup failed for index %u", indexRelationId);
! 	indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
! 
! 	Assert(!indexForm->indisready);
! 	Assert(!indexForm->indisvalid);
! 
! 	indexForm->indisready = true;
! 
! 	simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
! 	CatalogUpdateIndexes(pg_index, indexTuple);
! 
! 	heap_close(pg_index, RowExclusiveLock);
  
  	/* we can do away with our snapshot */
  	PopActiveSnapshot();
--- 720,726 ----
  	 * commit this transaction, any new transactions that open the table must
  	 * insert new entries into the index for insertions and non-HOT updates.
  	 */
! 	index_set_state_flags(indexRelationId, INDEX_CREATE_SET_READY);
  
  	/* we can do away with our snapshot */
  	PopActiveSnapshot();
*************** DefineIndex(IndexStmt *stmt,
*** 857,879 ****
  	/*
  	 * Index can now be marked valid -- update its pg_index entry
  	 */
! 	pg_index = heap_open(IndexRelationId, RowExclusiveLock);
! 
! 	indexTuple = SearchSysCacheCopy1(INDEXRELID,
! 									 ObjectIdGetDatum(indexRelationId));
! 	if (!HeapTupleIsValid(indexTuple))
! 		elog(ERROR, "cache lookup failed for index %u", indexRelationId);
! 	indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
! 
! 	Assert(indexForm->indisready);
! 	Assert(!indexForm->indisvalid);
! 
! 	indexForm->indisvalid = true;
! 
! 	simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
! 	CatalogUpdateIndexes(pg_index, indexTuple);
! 
! 	heap_close(pg_index, RowExclusiveLock);
  
  	/*
  	 * The pg_index update will cause backends (including this one) to update
--- 844,850 ----
  	/*
  	 * Index can now be marked valid -- update its pg_index entry
  	 */
! 	index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
  
  	/*
  	 * The pg_index update will cause backends (including this one) to update
*************** DefineIndex(IndexStmt *stmt,
*** 881,887 ****
  	 * relcache inval on the parent table to force replanning of cached plans.
  	 * Otherwise existing sessions might fail to use the new index where it
  	 * would be useful.  (Note that our earlier commits did not create reasons
! 	 * to replan; relcache flush on the index itself was sufficient.)
  	 */
  	CacheInvalidateRelcacheByRelid(heaprelid.relId);
  
--- 852,858 ----
  	 * relcache inval on the parent table to force replanning of cached plans.
  	 * Otherwise existing sessions might fail to use the new index where it
  	 * would be useful.  (Note that our earlier commits did not create reasons
! 	 * to replan; so relcache flush on the index itself was sufficient.)
  	 */
  	CacheInvalidateRelcacheByRelid(heaprelid.relId);
  
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index f88bf793e57e60f98a73fcf1de5a750d34cdb5b0..4065740b336079305bc663b57ef8ae96b9692100 100644
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
*************** RemoveRelations(DropStmt *drop)
*** 744,753 ****
  	int			flags = 0;
  	LOCKMODE	lockmode = AccessExclusiveLock;
  
  	if (drop->concurrent)
  	{
  		lockmode = ShareUpdateExclusiveLock;
! 		if (list_length(drop->objects) > 1)
  			ereport(ERROR,
  					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
  					 errmsg("DROP INDEX CONCURRENTLY does not support dropping multiple objects")));
--- 744,756 ----
  	int			flags = 0;
  	LOCKMODE	lockmode = AccessExclusiveLock;
  
+ 	/* DROP CONCURRENTLY uses a weaker lock, and has some restrictions */
  	if (drop->concurrent)
  	{
+ 		flags |= PERFORM_DELETION_CONCURRENTLY;
  		lockmode = ShareUpdateExclusiveLock;
! 		Assert(drop->removeType == OBJECT_INDEX);
! 		if (list_length(drop->objects) != 1)
  			ereport(ERROR,
  					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
  					 errmsg("DROP INDEX CONCURRENTLY does not support dropping multiple objects")));
*************** RemoveRelations(DropStmt *drop)
*** 839,857 ****
  		add_exact_object_address(&obj, objects);
  	}
  
- 	/*
- 	 * Set options and check further requirements for concurrent drop
- 	 */
- 	if (drop->concurrent)
- 	{
- 		/*
- 		 * Confirm that concurrent behaviour is restricted in grammar.
- 		 */
- 		Assert(drop->removeType == OBJECT_INDEX);
- 
- 		flags |= PERFORM_DELETION_CONCURRENTLY;
- 	}
- 
  	performMultipleDeletions(objects, drop->behavior, flags);
  
  	free_object_addresses(objects);
--- 842,847 ----
*************** RangeVarCallbackForDropRelation(const Ra
*** 918,924 ****
  	 * locking the index.  index_drop() will need this anyway, and since
  	 * regular queries lock tables before their indexes, we risk deadlock if
  	 * we do it the other way around.  No error if we don't find a pg_index
! 	 * entry, though --- the relation may have been droppd.
  	 */
  	if (relkind == RELKIND_INDEX && relOid != oldRelOid)
  	{
--- 908,914 ----
  	 * locking the index.  index_drop() will need this anyway, and since
  	 * regular queries lock tables before their indexes, we risk deadlock if
  	 * we do it the other way around.  No error if we don't find a pg_index
! 	 * entry, though --- the relation may have been dropped.
  	 */
  	if (relkind == RELKIND_INDEX && relOid != oldRelOid)
  	{
*************** ATExecDropNotNull(Relation rel, const ch
*** 4784,4789 ****
--- 4774,4781 ----
  
  	/*
  	 * Check that the attribute is not in a primary key
+ 	 *
+ 	 * Note: we'll throw error even if the pkey index is not valid.
  	 */
  
  	/* Loop over all indexes on the relation */
*************** transformFkeyGetPrimaryKey(Relation pkre
*** 6318,6324 ****
  	/*
  	 * Get the list of index OIDs for the table from the relcache, and look up
  	 * each one in the pg_index syscache until we find one marked primary key
! 	 * (hopefully there isn't more than one such).
  	 */
  	*indexOid = InvalidOid;
  
--- 6310,6316 ----
  	/*
  	 * Get the list of index OIDs for the table from the relcache, and look up
  	 * each one in the pg_index syscache until we find one marked primary key
! 	 * (hopefully there isn't more than one such).  Insist it's valid, too.
  	 */
  	*indexOid = InvalidOid;
  
*************** transformFkeyGetPrimaryKey(Relation pkre
*** 6332,6338 ****
  		if (!HeapTupleIsValid(indexTuple))
  			elog(ERROR, "cache lookup failed for index %u", indexoid);
  		indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
! 		if (indexStruct->indisprimary)
  		{
  			/*
  			 * Refuse to use a deferrable primary key.	This is per SQL spec,
--- 6324,6330 ----
  		if (!HeapTupleIsValid(indexTuple))
  			elog(ERROR, "cache lookup failed for index %u", indexoid);
  		indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
! 		if (indexStruct->indisprimary && IndexIsValid(indexStruct))
  		{
  			/*
  			 * Refuse to use a deferrable primary key.	This is per SQL spec,
*************** transformFkeyCheckAttrs(Relation pkrel,
*** 6430,6439 ****
  
  		/*
  		 * Must have the right number of columns; must be unique and not a
! 		 * partial index; forget it if there are any expressions, too
  		 */
  		if (indexStruct->indnatts == numattrs &&
  			indexStruct->indisunique &&
  			heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
  			heap_attisnull(indexTuple, Anum_pg_index_indexprs))
  		{
--- 6422,6433 ----
  
  		/*
  		 * Must have the right number of columns; must be unique and not a
! 		 * partial index; forget it if there are any expressions, too. Invalid
! 		 * indexes are out as well.
  		 */
  		if (indexStruct->indnatts == numattrs &&
  			indexStruct->indisunique &&
+ 			IndexIsValid(indexStruct) &&
  			heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
  			heap_attisnull(indexTuple, Anum_pg_index_indexprs))
  		{
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 14d1c08c9738184b70b9bff27798bcfef9c5783a..c85289544808867257b1f3cf867fa67a87b6736a 100644
*** a/src/backend/commands/vacuum.c
--- b/src/backend/commands/vacuum.c
*************** vacuum_rel(Oid relid, VacuumStmt *vacstm
*** 1097,1105 ****
  
  
  /*
!  * Open all the indexes of the given relation, obtaining the specified kind
!  * of lock on each.  Return an array of Relation pointers for the indexes
!  * into *Irel, and the number of indexes into *nindexes.
   */
  void
  vac_open_indexes(Relation relation, LOCKMODE lockmode,
--- 1097,1112 ----
  
  
  /*
!  * Open all the vacuumable indexes of the given relation, obtaining the
!  * specified kind of lock on each.	Return an array of Relation pointers for
!  * the indexes into *Irel, and the number of indexes into *nindexes.
!  *
!  * We consider an index vacuumable if it is marked insertable (IndexIsReady).
!  * If it isn't, probably a CREATE INDEX CONCURRENTLY command failed early in
!  * execution, and what we have is too corrupt to be processable.  We will
!  * vacuum even if the index isn't indisvalid; this is important because in a
!  * unique index, uniqueness checks will be performed anyway and had better not
!  * hit dangling index pointers.
   */
  void
  vac_open_indexes(Relation relation, LOCKMODE lockmode,
*************** vac_open_indexes(Relation relation, LOCK
*** 1113,1133 ****
  
  	indexoidlist = RelationGetIndexList(relation);
  
! 	*nindexes = list_length(indexoidlist);
  
! 	if (*nindexes > 0)
! 		*Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
  	else
  		*Irel = NULL;
  
  	i = 0;
  	foreach(indexoidscan, indexoidlist)
  	{
  		Oid			indexoid = lfirst_oid(indexoidscan);
  
! 		(*Irel)[i++] = index_open(indexoid, lockmode);
  	}
  
  	list_free(indexoidlist);
  }
  
--- 1120,1149 ----
  
  	indexoidlist = RelationGetIndexList(relation);
  
! 	/* allocate enough memory for all indexes */
! 	i = list_length(indexoidlist);
  
! 	if (i > 0)
! 		*Irel = (Relation *) palloc(i * sizeof(Relation));
  	else
  		*Irel = NULL;
  
+ 	/* collect just the ready indexes */
  	i = 0;
  	foreach(indexoidscan, indexoidlist)
  	{
  		Oid			indexoid = lfirst_oid(indexoidscan);
+ 		Relation	indrel;
  
! 		indrel = index_open(indexoid, lockmode);
! 		if (IndexIsReady(indrel->rd_index))
! 			(*Irel)[i++] = indrel;
! 		else
! 			index_close(indrel, lockmode);
  	}
  
+ 	*nindexes = i;
+ 
  	list_free(indexoidlist);
  }
  
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 0bbd0d464025c71dac05d72119c5a4d3052ecbf7..d6cf06c6fdfa90b51030aa26bbdac779576b82b9 100644
*** a/src/backend/executor/execUtils.c
--- b/src/backend/executor/execUtils.c
*************** ExecOpenIndices(ResultRelInfo *resultRel
*** 906,911 ****
--- 906,914 ----
  	/*
  	 * For each index, open the index relation and save pg_index info. We
  	 * acquire RowExclusiveLock, signifying we will update the index.
+ 	 *
+ 	 * Note: we do this even if the index is not IndexIsReady; it's not worth
+ 	 * the trouble to optimize for the case where it isn't.
  	 */
  	i = 0;
  	foreach(l, indexoidlist)
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index abcd0ee574cf4f7560e71d8fdc11ebea5a9b303c..04d502866b516380a7695c773ca830869eb91fe4 100644
*** a/src/backend/optimizer/util/plancat.c
--- b/src/backend/optimizer/util/plancat.c
*************** get_relation_info(PlannerInfo *root, Oid
*** 170,178 ****
  			 * Ignore invalid indexes, since they can't safely be used for
  			 * queries.  Note that this is OK because the data structure we
  			 * are constructing is only used by the planner --- the executor
! 			 * still needs to insert into "invalid" indexes!
  			 */
! 			if (!index->indisvalid)
  			{
  				index_close(indexRelation, NoLock);
  				continue;
--- 170,179 ----
  			 * Ignore invalid indexes, since they can't safely be used for
  			 * queries.  Note that this is OK because the data structure we
  			 * are constructing is only used by the planner --- the executor
! 			 * still needs to insert into "invalid" indexes, if they're marked
! 			 * IndexIsReady.
  			 */
! 			if (!IndexIsValid(index))
  			{
  				index_close(indexRelation, NoLock);
  				continue;
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index 95c57e81b584e9e5c82a2efb3a626632c6753cfd..086cc75e2269ec1cd621b1024970860112ca470a 100644
*** a/src/backend/parser/parse_utilcmd.c
--- b/src/backend/parser/parse_utilcmd.c
*************** transformIndexConstraint(Constraint *con
*** 1533,1550 ****
  							index_name, RelationGetRelationName(heap_rel)),
  					 parser_errposition(cxt->pstate, constraint->location)));
  
! 		if (!index_form->indisvalid)
  			ereport(ERROR,
  					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
  					 errmsg("index \"%s\" is not valid", index_name),
  					 parser_errposition(cxt->pstate, constraint->location)));
  
- 		if (!index_form->indisready)
- 			ereport(ERROR,
- 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- 					 errmsg("index \"%s\" is not ready", index_name),
- 					 parser_errposition(cxt->pstate, constraint->location)));
- 
  		if (!index_form->indisunique)
  			ereport(ERROR,
  					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
--- 1533,1544 ----
  							index_name, RelationGetRelationName(heap_rel)),
  					 parser_errposition(cxt->pstate, constraint->location)));
  
! 		if (!IndexIsValid(index_form))
  			ereport(ERROR,
  					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
  					 errmsg("index \"%s\" is not valid", index_name),
  					 parser_errposition(cxt->pstate, constraint->location)));
  
  		if (!index_form->indisunique)
  			ereport(ERROR,
  					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 8c9ebe0f6f89cda440fae64299e6ba5d59aff150..9a504f80251b6c1c548672282612ec130ba70401 100644
*** a/src/backend/utils/cache/relcache.c
--- b/src/backend/utils/cache/relcache.c
*************** RelationReloadIndexInfo(Relation relatio
*** 1731,1739 ****
--- 1731,1753 ----
  				 RelationGetRelid(relation));
  		index = (Form_pg_index) GETSTRUCT(tuple);
  
+ 		/*
+ 		 * Basically, let's just copy all the bool fields.  There are one or
+ 		 * two of these that can't actually change in the current code, but
+ 		 * it's not worth it to track exactly which ones they are.  None of
+ 		 * the array fields are allowed to change, though.
+ 		 */
+ 		relation->rd_index->indisunique = index->indisunique;
+ 		relation->rd_index->indisprimary = index->indisprimary;
+ 		relation->rd_index->indisexclusion = index->indisexclusion;
+ 		relation->rd_index->indimmediate = index->indimmediate;
+ 		relation->rd_index->indisclustered = index->indisclustered;
  		relation->rd_index->indisvalid = index->indisvalid;
  		relation->rd_index->indcheckxmin = index->indcheckxmin;
  		relation->rd_index->indisready = index->indisready;
+ 		relation->rd_index->indislive = index->indislive;
+ 
+ 		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
  		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
  							   HeapTupleHeaderGetXmin(tuple->t_data));
  
*************** CheckConstraintFetch(Relation relation)
*** 3299,3304 ****
--- 3313,3322 ----
   * so that we must recompute the index list on next request.  This handles
   * creation or deletion of an index.
   *
+  * Indexes that are marked not IndexIsLive are omitted from the returned list.
+  * Such indexes are expected to be dropped momentarily, and should not be
+  * touched at all by any caller of this function.
+  *
   * The returned list is guaranteed to be sorted in order by OID.  This is
   * needed by the executor, since for index types that we obtain exclusive
   * locks on when updating the index, all backends must lock the indexes in
*************** RelationGetIndexList(Relation relation)
*** 3358,3366 ****
  		bool		isnull;
  
  		/*
! 		 * Ignore any indexes that are currently being dropped
  		 */
! 		if (!index->indisvalid && !index->indisready)
  			continue;
  
  		/* Add index's OID to result list in the proper order */
--- 3376,3387 ----
  		bool		isnull;
  
  		/*
! 		 * Ignore any indexes that are currently being dropped.  This will
! 		 * prevent them from being searched, inserted into, or considered in
! 		 * HOT-safety decisions.  It's unsafe to touch such an index at all
! 		 * since its catalog entries could disappear at any instant.
  		 */
! 		if (!IndexIsLive(index))
  			continue;
  
  		/* Add index's OID to result list in the proper order */
*************** RelationGetIndexList(Relation relation)
*** 3379,3385 ****
  		indclass = (oidvector *) DatumGetPointer(indclassDatum);
  
  		/* Check to see if it is a unique, non-partial btree index on OID */
! 		if (index->indnatts == 1 &&
  			index->indisunique && index->indimmediate &&
  			index->indkey.values[0] == ObjectIdAttributeNumber &&
  			indclass->values[0] == OID_BTREE_OPS_OID &&
--- 3400,3407 ----
  		indclass = (oidvector *) DatumGetPointer(indclassDatum);
  
  		/* Check to see if it is a unique, non-partial btree index on OID */
! 		if (IndexIsValid(index) &&
! 			index->indnatts == 1 &&
  			index->indisunique && index->indimmediate &&
  			index->indkey.values[0] == ObjectIdAttributeNumber &&
  			indclass->values[0] == OID_BTREE_OPS_OID &&
*************** RelationGetIndexAttrBitmap(Relation rela
*** 3674,3679 ****
--- 3696,3708 ----
  
  	/*
  	 * For each index, add referenced attributes to indexattrs.
+ 	 *
+ 	 * Note: we consider all indexes returned by RelationGetIndexList, even if
+ 	 * they are not indisready or indisvalid.  This is important because an
+ 	 * index for which CREATE INDEX CONCURRENTLY has just started must be
+ 	 * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
+ 	 * CONCURRENTLY is far enough along that we should ignore the index, it
+ 	 * won't be returned at all by RelationGetIndexList.
  	 */
  	indexattrs = NULL;
  	foreach(l, indexoidlist)
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 5254a57d8a6816b8d1a91b6effab7fc7c2398230..9622356a6376e85fc2ccbf16afc0d1e9e68ab28f 100644
*** a/src/include/catalog/catversion.h
--- b/src/include/catalog/catversion.h
***************
*** 53,58 ****
   */
  
  /*							yyyymmddN */
! #define CATALOG_VERSION_NO	201210071
  
  #endif
--- 53,58 ----
   */
  
  /*							yyyymmddN */
! #define CATALOG_VERSION_NO	201211281
  
  #endif
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 298641baf7c1f3a8034df0161596a16efdaa0268..b96099f94ce39c9dec94d069847e2ef85788b6eb 100644
*** a/src/include/catalog/index.h
--- b/src/include/catalog/index.h
*************** typedef void (*IndexBuildCallback) (Rela
*** 27,32 ****
--- 27,41 ----
  												bool tupleIsAlive,
  												void *state);
  
+ /* Action code for index_set_state_flags */
+ typedef enum
+ {
+ 	INDEX_CREATE_SET_READY,
+ 	INDEX_CREATE_SET_VALID,
+ 	INDEX_DROP_CLEAR_VALID,
+ 	INDEX_DROP_SET_DEAD
+ } IndexStateFlagsAction;
+ 
  
  extern void index_check_primary_key(Relation heapRel,
  						IndexInfo *indexInfo,
*************** extern double IndexBuildHeapScan(Relatio
*** 90,95 ****
--- 99,106 ----
  
  extern void validate_index(Oid heapId, Oid indexId, Snapshot snapshot);
  
+ extern void index_set_state_flags(Oid indexId, IndexStateFlagsAction action);
+ 
  extern void reindex_index(Oid indexId, bool skip_constraint_checks);
  
  /* Flag bits for reindex_relation(): */
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index 934fe97e5f2e966b1ae038522ad74a97ad018af9..0ebe2cc9e2c6780fa7bd59fd299563c7f41a8ff6 100644
*** a/src/include/catalog/pg_index.h
--- b/src/include/catalog/pg_index.h
*************** CATALOG(pg_index,2610) BKI_WITHOUT_OIDS 
*** 41,46 ****
--- 41,47 ----
  	bool		indisvalid;		/* is this index valid for use by queries? */
  	bool		indcheckxmin;	/* must we wait for xmin to be old? */
  	bool		indisready;		/* is this index ready for inserts? */
+ 	bool		indislive;		/* is this index alive at all? */
  
  	/* variable-length fields start here, but we allow direct access to indkey */
  	int2vector	indkey;			/* column numbers of indexed cols, or 0 */
*************** typedef FormData_pg_index *Form_pg_index
*** 68,74 ****
   *		compiler constants for pg_index
   * ----------------
   */
! #define Natts_pg_index					17
  #define Anum_pg_index_indexrelid		1
  #define Anum_pg_index_indrelid			2
  #define Anum_pg_index_indnatts			3
--- 69,75 ----
   *		compiler constants for pg_index
   * ----------------
   */
! #define Natts_pg_index					18
  #define Anum_pg_index_indexrelid		1
  #define Anum_pg_index_indrelid			2
  #define Anum_pg_index_indnatts			3
*************** typedef FormData_pg_index *Form_pg_index
*** 80,91 ****
  #define Anum_pg_index_indisvalid		9
  #define Anum_pg_index_indcheckxmin		10
  #define Anum_pg_index_indisready		11
! #define Anum_pg_index_indkey			12
! #define Anum_pg_index_indcollation		13
! #define Anum_pg_index_indclass			14
! #define Anum_pg_index_indoption			15
! #define Anum_pg_index_indexprs			16
! #define Anum_pg_index_indpred			17
  
  /*
   * Index AMs that support ordered scans must support these two indoption
--- 81,93 ----
  #define Anum_pg_index_indisvalid		9
  #define Anum_pg_index_indcheckxmin		10
  #define Anum_pg_index_indisready		11
! #define Anum_pg_index_indislive			12
! #define Anum_pg_index_indkey			13
! #define Anum_pg_index_indcollation		14
! #define Anum_pg_index_indclass			15
! #define Anum_pg_index_indoption			16
! #define Anum_pg_index_indexprs			17
! #define Anum_pg_index_indpred			18
  
  /*
   * Index AMs that support ordered scans must support these two indoption
*************** typedef FormData_pg_index *Form_pg_index
*** 95,98 ****
--- 97,109 ----
  #define INDOPTION_DESC			0x0001	/* values are in reverse order */
  #define INDOPTION_NULLS_FIRST	0x0002	/* NULLs are first instead of last */
  
+ /*
+  * Use of these macros is recommended over direct examination of the state
+  * flag columns where possible; this allows source code compatibility with
+  * the hacky representation used in 9.2.
+  */
+ #define IndexIsValid(indexForm) ((indexForm)->indisvalid)
+ #define IndexIsReady(indexForm) ((indexForm)->indisready)
+ #define IndexIsLive(indexForm)	((indexForm)->indislive)
+ 
  #endif   /* PG_INDEX_H */
