Index: doc/src/sgml/catalogs.sgml
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/doc/src/sgml/catalogs.sgml,v
retrieving revision 2.137
diff -c -r2.137 catalogs.sgml
*** doc/src/sgml/catalogs.sgml	12 Nov 2006 06:25:37 -0000	2.137
--- doc/src/sgml/catalogs.sgml	15 Jan 2007 17:24:52 -0000
***************
*** 499,504 ****
--- 499,518 ----
        <entry>Function to parse and validate <structfield>reloptions</> for an index</entry>
       </row>
  
+      <row>
+       <entry><structfield>amprepareinsert</structfield></entry>
+       <entry><type>regproc</type></entry>
+       <entry><literal><link linkend="catalog-pg-proc"><structname>pg_proc</structname></link>.oid</literal></entry>
+       <entry>Performs the first phase of a two-phase index insert, returning a suggestion of where in the heap to put the new tuple</entry>
+      </row>
+ 
+      <row>
+       <entry><structfield>amfinishinsert</structfield></entry>
+       <entry><type>regproc</type></entry>
+       <entry><literal><link linkend="catalog-pg-proc"><structname>pg_proc</structname></link>.oid</literal></entry>
+       <entry>Finishes an index insert started with <structfield>amprepareinsert</structfield></entry>
+      </row>
+ 
      </tbody>
     </tgroup>
    </table>
Index: src/backend/access/heap/heapam.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/heap/heapam.c,v
retrieving revision 1.222.2.1
diff -c -r1.222.2.1 heapam.c
*** src/backend/access/heap/heapam.c	4 Feb 2007 20:00:49 -0000	1.222.2.1
--- src/backend/access/heap/heapam.c	18 May 2007 10:41:13 -0000
***************
*** 1363,1368 ****
--- 1363,1372 ----
   * use_fsm is passed directly to RelationGetBufferForTuple, which see for
   * more info.
   *
+  * If suggested_blk is not InvalidBlockNumber, the tuple is inserted into
+  * that block if it has enough free space (fillfactor is not applied).
+  * Otherwise a block is chosen as if suggested_blk was not given.
+  *
   * The return value is the OID assigned to the tuple (either here or by the
   * caller), or InvalidOid if no OID.  The header fields of *tup are updated
   * to match the stored tuple; in particular tup->t_self receives the actual
***************
*** 1371,1377 ****
   */
  Oid
  heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm)
  {
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
--- 1375,1381 ----
   */
  Oid
  heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm, BlockNumber suggested_blk)
  {
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
***************
*** 1421,1429 ****
  	else
  		heaptup = tup;
  
! 	/* Find buffer to insert this tuple into */
! 	buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 									   InvalidBuffer, use_fsm);
  
  	/* NO EREPORT(ERROR) from here till changes are logged */
  	START_CRIT_SECTION();
--- 1425,1467 ----
  	else
  		heaptup = tup;
  
! 	/* Find buffer to insert this tuple into. Try the suggested block first
! 	 * if caller gave one.
! 	 */
! 	if (suggested_blk != InvalidBlockNumber)
! 	{
! 		Buffer suggested_buf;
! 		Page pageHeader;
! 		Size pageFreeSpace;
! 
! 		suggested_buf = ReadBuffer(relation, suggested_blk);
! 		pageHeader = (Page) BufferGetPage(suggested_buf);
! 
! 		LockBuffer(suggested_buf, BUFFER_LOCK_EXCLUSIVE);
! 
! 		/* Don't subtract fillfactor from the free space. That space is
! 		 * reserved exactly for situations like this; keeping updated and
! 		 * inserted tuples close to other tuples with similar values.
! 		 */
! 		pageFreeSpace = PageGetFreeSpace(pageHeader);
! 
! 		if (heaptup->t_len <= pageFreeSpace)
! 			buffer = suggested_buf;
! 		else
! 		{
! 			/* Page was full. Release lock and pin and get another block
! 			 * as if suggested_blk was not given. 
! 			 */
! 			LockBuffer(suggested_buf, BUFFER_LOCK_UNLOCK);
! 			ReleaseBuffer(suggested_buf);
! 
! 			buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 											   InvalidBuffer, use_fsm);
! 		}
! 	} 
! 	else
! 		buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 										   InvalidBuffer, use_fsm);
  
  	/* NO EREPORT(ERROR) from here till changes are logged */
  	START_CRIT_SECTION();
***************
*** 1531,1537 ****
  Oid
  simple_heap_insert(Relation relation, HeapTuple tup)
  {
! 	return heap_insert(relation, tup, GetCurrentCommandId(), true, true);
  }
  
  /*
--- 1569,1576 ----
  Oid
  simple_heap_insert(Relation relation, HeapTuple tup)
  {
! 	return heap_insert(relation, tup, GetCurrentCommandId(), true, 
! 					   true, InvalidBlockNumber);
  }
  
  /*
Index: src/backend/access/index/indexam.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/index/indexam.c,v
retrieving revision 1.95
diff -c -r1.95 indexam.c
*** src/backend/access/index/indexam.c	4 Oct 2006 00:29:48 -0000	1.95
--- src/backend/access/index/indexam.c	18 May 2007 10:42:13 -0000
***************
*** 18,23 ****
--- 18,25 ----
   *		index_rescan	- restart a scan of an index
   *		index_endscan	- end a scan
   *		index_insert	- insert an index tuple into a relation
+  *		index_prepareinsert	- get desired insert location for a heap tuple
+  *		index_finishinsert	- insert a previously prepared index tuple
   *		index_markpos	- mark a scan position
   *		index_restrpos	- restore a scan position
   *		index_getnext	- get the next tuple from a scan
***************
*** 202,207 ****
--- 204,269 ----
  									  BoolGetDatum(check_uniqueness)));
  }
  
+ /* ----------------
+  *		index_prepareinsert - get desired insert location for a heap tuple
+  *
+  * Returns the *heap* block that the index AM's amprepareinsert procedure
+  * suggests for the tuple described by values/isnull, with the aim of
+  * maintaining cluster order. InvalidBlockNumber means "no suggestion".
+  *
+  * *opaque is set by the AM; if non-NULL it is to be passed to a later
+  * index_finishinsert call to finish the insert.
+  * ----------------
+  */
+ BlockNumber
+ index_prepareinsert(Relation indexRelation,
+ 					Datum *values,
+ 					bool *isnull,
+ 					Relation heapRelation,
+ 					bool check_uniqueness,
+ 					void **opaque)
+ {
+ 	FmgrInfo   *procedure;
+ 
+ 	RELATION_CHECKS;
+ 	GET_REL_PROCEDURE(amprepareinsert);
+ 
+ 	/*
+ 	 * have the am's prepareinsert proc do all the work.
+ 	 */
+ 	return DatumGetUInt32(FunctionCall6(procedure,
+ 										PointerGetDatum(indexRelation),
+ 										PointerGetDatum(values),
+ 										PointerGetDatum(isnull),
+ 										PointerGetDatum(heapRelation),
+ 										BoolGetDatum(check_uniqueness),
+ 										PointerGetDatum(opaque)));
+ }
+ 
+ /* ----------------
+  *		index_finishinsert - insert a previously prepared index tuple
+  *
+  * Finishes an insert begun by index_prepareinsert: heap_t_ctid is the TID
+  * of the heap tuple, opaque the value index_prepareinsert handed back.
+  * ----------------
+  */
+ bool
+ index_finishinsert(Relation indexRelation,
+ 				   ItemPointer heap_t_ctid, void *opaque)
+ {
+ 	FmgrInfo   *procedure;
+ 
+ 	RELATION_CHECKS;
+ 	GET_REL_PROCEDURE(amfinishinsert);
+ 
+ 	/*
+ 	 * am's finishinsert proc does the work; it gets only the TID and opaque.
+ 	 */
+ 	return DatumGetBool(FunctionCall2(procedure,
+ 									  PointerGetDatum(heap_t_ctid),
+ 									  PointerGetDatum(opaque)));
+ }
+ 
  /*
   * index_beginscan - start a scan of an index with amgettuple
   *
Index: src/backend/access/nbtree/nbtinsert.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtinsert.c,v
retrieving revision 1.146.2.1
diff -c -r1.146.2.1 nbtinsert.c
*** src/backend/access/nbtree/nbtinsert.c	27 Jan 2007 20:53:36 -0000	1.146.2.1
--- src/backend/access/nbtree/nbtinsert.c	18 May 2007 10:19:05 -0000
***************
*** 86,99 ****
  	/* we need an insertion scan key to do our search, so build one */
  	itup_scankey = _bt_mkscankey(rel, itup);
  
- top:
  	/* find the first page containing this key */
  	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
  
  	/* trade in our read lock for a write lock */
  	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
  	LockBuffer(buf, BT_WRITE);
  
  	/*
  	 * If the page was split between the time that we surrendered our read
  	 * lock and acquired our write lock, then this page may no longer be the
--- 86,198 ----
  	/* we need an insertion scan key to do our search, so build one */
  	itup_scankey = _bt_mkscankey(rel, itup);
  
  	/* find the first page containing this key */
  	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
  
  	/* trade in our read lock for a write lock */
  	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ 
+ 	_bt_finishinsert(rel, heapRel, index_is_unique, itup, 
+ 					 itup_scankey, stack, buf);
+ }
+ 
+ /*
+  *	_bt_prepareinsert() -- Find the insert location for a new tuple
+  *
+  * Descends the tree to where itup would go and returns, as a hint, the
+  * heap block of the index tuple found there (InvalidBlockNumber if none),
+  * so that the heap tuple can be stored near tuples with similar keys.
+  * State for _bt_finishinsert is palloc'd into *opaquePtr, with the buffer
+  * left pinned but unlocked; *opaquePtr is set to NULL if the index is empty.
+  */
+ BlockNumber
+ _bt_prepareinsert(Relation rel, IndexTuple itup, bool index_is_unique, 
+ 				  Relation heapRel, BTInsertInfo *opaquePtr)
+ {
+ 	int			natts = rel->rd_rel->relnatts;
+ 	OffsetNumber offset;
+ 	Page		page;
+ 	BTPageOpaque opaque;
+ 
+ 	ScanKey		itup_scankey;
+ 	BTStack		stack;
+ 	Buffer		buf;
+ 	BlockNumber suggestion = InvalidBlockNumber;
+ 	BTInsertInfo insert_opaque;
+ 
+ 	/* we need an insertion scan key to do our search, so build one */
+ 	itup_scankey = _bt_mkscankey(rel, itup);
+ 
+ 	/* find the first page containing this key */
+ 	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_READ);
+ 	if(!BufferIsValid(buf))
+ 	{
+ 		/* Empty index: no suggestion. NOTE(review): itup, itup_scankey not freed. */
+ 		*opaquePtr = NULL;
+ 		return InvalidBlockNumber;
+ 	}
+ 
+ 	page = BufferGetPage(buf);
+ 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ 
+ 	/* Find the location in the page where the new index tuple would go to. */
+ 
+ 	offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+ 	if (offset > PageGetMaxOffsetNumber(page))
+ 	{
+ 		/* _bt_binsrch returned pointer to end-of-page. It means that
+ 		 * there were no equal items on the page, and the new item should
+ 		 * be inserted as the last tuple of the page. There could be equal
+ 		 * items on the next page, however.
+ 		 *
+ 		 * At the moment, we just ignore the potential equal items on the
+ 		 * right, and pretend there aren't any. We could instead walk right
+ 		 * to the next page to check that, but let's keep it simple for now.
+ 		 */
+ 		offset = OffsetNumberPrev(offset);
+ 	}
+ 	if(offset < P_FIRSTDATAKEY(opaque))
+ 	{
+ 		/* We landed on an empty page. We could step left or right until
+ 		 * we find some items, but let's keep it simple for now (no suggestion).
+ 		 */
+ 	} else {
+ 		/* We're now positioned at the index tuple that we're interested in. */
+ 		ItemId iid = PageGetItemId(page, offset);
+ 		IndexTuple curitup = (IndexTuple) PageGetItem(page, iid);
+ 
+ 		suggestion = ItemPointerGetBlockNumber(&curitup->t_tid);
+ 	}
+ 
+ 	LockBuffer(buf, BUFFER_LOCK_UNLOCK); /* drop the lock but keep the pin */
+ 
+ 	insert_opaque = *opaquePtr = palloc(sizeof(struct BTInsertInfoData));
+ 	insert_opaque->rel = rel;
+ 	insert_opaque->heapRel = heapRel;
+ 	insert_opaque->index_is_unique = index_is_unique;
+ 	insert_opaque->itup = itup;
+ 	insert_opaque->itup_scankey = itup_scankey;
+ 	insert_opaque->stack = stack;
+ 	insert_opaque->buf = buf;
+ 
+ 	return suggestion;
+ }
+ 
+ /*
+  *	_bt_finishinsert() -- Handle insertion of a single index tuple in the tree.
+  *
+  */
+ void
+ _bt_finishinsert(Relation rel, Relation heapRel, bool index_is_unique,
+ 				 IndexTuple itup, ScanKey itup_scankey,
+ 				 BTStack stack, Buffer buf)
+ {
+ 	int			natts = rel->rd_rel->relnatts;
+ 
  	LockBuffer(buf, BT_WRITE);
  
+ top:
+ 
  	/*
  	 * If the page was split between the time that we surrendered our read
  	 * lock and acquired our write lock, then this page may no longer be the
***************
*** 133,138 ****
--- 232,245 ----
  			XactLockTableWait(xwait);
  			/* start over... */
  			_bt_freestack(stack);
+ 
+ 			/* find the first page containing this key */
+ 			stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+ 
+ 			/* trade in our read lock for a write lock */
+ 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ 			LockBuffer(buf, BT_WRITE);
+ 
  			goto top;
  		}
  	}
***************
*** 143,148 ****
--- 250,256 ----
  	/* be tidy */
  	_bt_freestack(stack);
  	_bt_freeskey(itup_scankey);
+ 	pfree(itup);
  }
  
  /*
Index: src/backend/access/nbtree/nbtree.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtree.c,v
retrieving revision 1.153
diff -c -r1.153 nbtree.c
*** src/backend/access/nbtree/nbtree.c	1 Nov 2006 19:43:17 -0000	1.153
--- src/backend/access/nbtree/nbtree.c	18 May 2007 10:20:00 -0000
***************
*** 223,229 ****
  
  	_bt_doinsert(rel, itup, checkUnique, heapRel);
  
! 	pfree(itup);
  
  	PG_RETURN_BOOL(true);
  }
--- 223,278 ----
  
  	_bt_doinsert(rel, itup, checkUnique, heapRel);
  
! 	PG_RETURN_BOOL(true);
! }
! 
! /*
!  *	btprepareinsert() -- find the best place in the heap to put a new tuple.
!  *
!  *		Forms the index tuple and hands it to _bt_prepareinsert, which returns
!  *		the suggested heap block and stores the insert state in *opaquePtr.
!  */
! Datum
! btprepareinsert(PG_FUNCTION_ARGS)
! {
! 	Relation	rel = (Relation) PG_GETARG_POINTER(0);
! 	Datum	   *values = (Datum *) PG_GETARG_POINTER(1);
! 	bool	   *isnull = (bool *) PG_GETARG_POINTER(2);
! 	Relation	heapRel = (Relation) PG_GETARG_POINTER(3);
! 	bool		checkUnique = PG_GETARG_BOOL(4);
! 	void	  **opaquePtr = (void **) PG_GETARG_POINTER(5);
! 	IndexTuple	itup;
! 	BlockNumber suggestion;
! 
! 	/* generate an index tuple; its t_tid is filled in by btfinishinsert */
! 	itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
! 
! 	suggestion =_bt_prepareinsert(rel, itup, checkUnique, heapRel,  
! 								  (BTInsertInfo *) opaquePtr);
! 
! 	PG_RETURN_UINT32(suggestion);
! }
! 
! /*
!  *	btfinishinsert() -- finish an insert prepared by btprepareinsert
!  */
! Datum
! btfinishinsert(PG_FUNCTION_ARGS)
! {
! 	ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(0);
! 	BTInsertInfo opaque = (void *) PG_GETARG_POINTER(1);
! 
! 	opaque->itup->t_tid = *ht_ctid; /* point the index tuple at the heap tuple */
! 
! 	_bt_finishinsert(opaque->rel,
! 					 opaque->heapRel,
! 					 opaque->index_is_unique,
! 					 opaque->itup,
! 					 opaque->itup_scankey,
! 					 opaque->stack,
! 					 opaque->buf);
! 
! 	pfree(opaque); /* only the state struct itself is freed here */
  
  	PG_RETURN_BOOL(true);
  }
Index: src/backend/executor/execMain.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/executor/execMain.c,v
retrieving revision 1.280.2.2
diff -c -r1.280.2.2 execMain.c
*** src/backend/executor/execMain.c	2 Feb 2007 00:07:27 -0000	1.280.2.2
--- src/backend/executor/execMain.c	18 May 2007 09:48:19 -0000
***************
*** 53,58 ****
--- 53,59 ----
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  
+ bool cluster_inserts = true; /* GUC */
  
  typedef struct evalPlanQual
  {
***************
*** 847,854 ****
--- 848,857 ----
  	resultRelInfo->ri_RangeTableIndex = resultRelationIndex;
  	resultRelInfo->ri_RelationDesc = resultRelationDesc;
  	resultRelInfo->ri_NumIndices = 0;
+ 	resultRelInfo->ri_ClusterIndex = -1;
  	resultRelInfo->ri_IndexRelationDescs = NULL;
  	resultRelInfo->ri_IndexRelationInfo = NULL;
+ 	resultRelInfo->ri_PreparedInsertOpaque = NULL;
  	/* make a copy so as not to depend on relcache info not changing... */
  	resultRelInfo->ri_TrigDesc = CopyTriggerDesc(resultRelationDesc->trigdesc);
  	if (resultRelInfo->ri_TrigDesc)
***************
*** 1331,1336 ****
--- 1334,1340 ----
  	ResultRelInfo *resultRelInfo;
  	Relation	resultRelationDesc;
  	Oid			newId;
+ 	BlockNumber suggestedBlock;
  
  	/*
  	 * get the heap tuple out of the tuple table slot, making sure we have a
***************
*** 1379,1384 ****
--- 1383,1395 ----
  	if (resultRelationDesc->rd_att->constr)
  		ExecConstraints(resultRelInfo, slot, estate);
  
+ 	/* Ask the index am of the clustered index for the 
+ 	 * best place to put it */
+ 	if(cluster_inserts)
+ 		suggestedBlock = ExecPrepareIndexInsert(slot, estate);
+ 	else
+ 		suggestedBlock = InvalidBlockNumber;
+ 
  	/*
  	 * insert the tuple
  	 *
***************
*** 1387,1393 ****
  	 */
  	newId = heap_insert(resultRelationDesc, tuple,
  						estate->es_snapshot->curcid,
! 						true, true);
  
  	IncrAppended();
  	(estate->es_processed)++;
--- 1398,1404 ----
  	 */
  	newId = heap_insert(resultRelationDesc, tuple,
  						estate->es_snapshot->curcid,
! 						true, true, suggestedBlock);
  
  	IncrAppended();
  	(estate->es_processed)++;
***************
*** 2554,2560 ****
  				tuple,
  				estate->es_snapshot->curcid,
  				estate->es_into_relation_use_wal,
! 				false);			/* never any point in using FSM */
  
  	/* We know this is a newly created relation, so there are no indexes */
  
--- 2565,2572 ----
  				tuple,
  				estate->es_snapshot->curcid,
  				estate->es_into_relation_use_wal,
! 				false, 			/* never any point in using FSM */
! 				InvalidBlockNumber);
  
  	/* We know this is a newly created relation, so there are no indexes */
  
Index: src/backend/executor/execUtils.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/executor/execUtils.c,v
retrieving revision 1.140.2.3
diff -c -r1.140.2.3 execUtils.c
*** src/backend/executor/execUtils.c	6 Feb 2007 17:35:27 -0000	1.140.2.3
--- src/backend/executor/execUtils.c	18 May 2007 09:48:19 -0000
***************
*** 32,37 ****
--- 32,38 ----
   *		ExecOpenIndices			\
   *		ExecCloseIndices		 | referenced by InitPlan, EndPlan,
   *		ExecInsertIndexTuples	/  ExecInsert, ExecUpdate
+  *		ExecPrepareIndexInsert	Referenced by ExecInsert
   *
   *		RegisterExprContextCallback    Register function shutdown callback
   *		UnregisterExprContextCallback  Deregister function shutdown callback
***************
*** 939,944 ****
--- 940,946 ----
  	IndexInfo **indexInfoArray;
  
  	resultRelInfo->ri_NumIndices = 0;
+ 	resultRelInfo->ri_ClusterIndex = -1;
  
  	/* fast path if no indexes */
  	if (!RelationGetForm(resultRelation)->relhasindex)
***************
*** 978,983 ****
--- 980,990 ----
  		/* extract index key information from the index's pg_index info */
  		ii = BuildIndexInfo(indexDesc);
  
+ 		/* Remember which index is the clustered one.
+ 		 * ExecPrepareIndexInsert calls its amprepareinsert method on inserts */
+ 		if(indexDesc->rd_index->indisclustered)
+ 			resultRelInfo->ri_ClusterIndex = i;
+ 
  		relationDescs[i] = indexDesc;
  		indexInfoArray[i] = ii;
  		i++;
***************
*** 1044,1049 ****
--- 1051,1058 ----
  	ExprContext *econtext;
  	Datum		values[INDEX_MAX_KEYS];
  	bool		isnull[INDEX_MAX_KEYS];
+ 	int			clusterIndex;
+ 	bool		preparedInsert;
  
  	/*
  	 * Get information from the result relation info structure.
***************
*** 1053,1058 ****
--- 1062,1069 ----
  	relationDescs = resultRelInfo->ri_IndexRelationDescs;
  	indexInfoArray = resultRelInfo->ri_IndexRelationInfo;
  	heapRelation = resultRelInfo->ri_RelationDesc;
+ 	clusterIndex = resultRelInfo->ri_ClusterIndex;
+ 	preparedInsert = resultRelInfo->ri_PreparedInsertOpaque != NULL;
  
  	/*
  	 * We will use the EState's per-tuple context for evaluating predicates
***************
*** 1063,1068 ****
--- 1074,1092 ----
  	/* Arrange for econtext's scan tuple to be the tuple under test */
  	econtext->ecxt_scantuple = slot;
  
+ 	if (preparedInsert)
+ 	{
+ 		index_finishinsert(relationDescs[clusterIndex],
+ 						   tupleid,
+ 						   resultRelInfo->ri_PreparedInsertOpaque);
+ 		resultRelInfo->ri_PreparedInsertOpaque = NULL;
+ 
+ 		/*
+ 		 * keep track of index inserts for debugging
+ 		 */
+ 		IncrIndexInserted();
+ 	}
+ 
  	/*
  	 * for each index, form and insert the index tuple
  	 */
***************
*** 1073,1078 ****
--- 1097,1105 ----
  		if (relationDescs[i] == NULL)
  			continue;
  
+ 		if (preparedInsert && i == clusterIndex)
+ 			continue; /* insert to clustered index was already handled above */
+ 
  		indexInfo = indexInfoArray[i];
  
  		/* Check for partial index */
***************
*** 1127,1132 ****
--- 1154,1228 ----
  	}
  }
  
+ /* ----------------------------------------------------------------
+  *		ExecPrepareIndexInsert
+  *
+  *		Asks the AM of the clustered index which heap block the tuple in
+  *		'slot' should go to. Returns InvalidBlockNumber if it can't tell.
+  * ----------------------------------------------------------------
+  */
+ BlockNumber
+ ExecPrepareIndexInsert(TupleTableSlot *slot,
+ 					   EState *estate)
+ {
+ 	ResultRelInfo *resultRelInfo;
+ 	int			i;
+ 	Relation	relationDesc;
+ 	Relation	heapRelation;
+ 	ExprContext *econtext;
+ 	Datum		values[INDEX_MAX_KEYS];
+ 	bool		isnull[INDEX_MAX_KEYS];
+ 	IndexInfo  *indexInfo;
+ 
+ 	/*
+ 	 * Get information from the result relation info structure.
+ 	 */
+ 	resultRelInfo = estate->es_result_relation_info;
+ 	i = resultRelInfo->ri_ClusterIndex;
+ 	if (i == -1)
+ 		return InvalidBlockNumber; /* there was no clustered index */
+ 
+ 	heapRelation = resultRelInfo->ri_RelationDesc;
+ 	relationDesc = resultRelInfo->ri_IndexRelationDescs[i];
+ 	indexInfo = resultRelInfo->ri_IndexRelationInfo[i];
+ 
+ 	if (!OidIsValid(relationDesc->rd_am->amprepareinsert))
+ 		return InvalidBlockNumber; /* the indexam doesn't support the
+ 									* two-phase insert API */
+ 
+ 	/* You can't cluster on a partial index */
+ 	Assert(indexInfo->ii_Predicate == NIL);
+ 
+ 	/*
+ 	 * We will use the EState's per-tuple context for evaluating 
+ 	 * index expressions (creating it if it's not already there).
+ 	 */
+ 	econtext = GetPerTupleExprContext(estate);
+ 
+ 	/* Arrange for econtext's scan tuple to be the tuple under test */
+ 	econtext->ecxt_scantuple = slot;
+ 
+ 	/*
+ 	 * FormIndexDatum fills in its values and isnull parameters with the
+ 	 * appropriate values for the column(s) of the index.
+ 	 */
+ 	FormIndexDatum(indexInfo,
+ 				   slot,
+ 				   estate,
+ 				   values,
+ 				   isnull);
+ 
+ 	/*
+ 	 * The index AM does the rest; its state goes to ri_PreparedInsertOpaque.
+ 	 */
+ 	return index_prepareinsert(relationDesc,	/* index relation */
+ 				 values,	/* array of index Datums */
+ 				 isnull,	/* null flags */
+ 				 heapRelation,	/* heap relation */
+ 			     relationDesc->rd_index->indisunique,	/* check uniqueness? */
+ 				 &resultRelInfo->ri_PreparedInsertOpaque);
+ }
+ 
  /*
   * UpdateChangedParamSet
   *		Add changed parameters to a plan node's chgParam set
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.360.2.1
diff -c -r1.360.2.1 guc.c
*** src/backend/utils/misc/guc.c	23 Apr 2007 15:13:30 -0000	1.360.2.1
--- src/backend/utils/misc/guc.c	18 May 2007 09:48:22 -0000
***************
*** 93,98 ****
--- 93,99 ----
  #define MS_PER_D (1000 * 60 * 60 * 24)
  
  /* XXX these should appear in other modules' header files */
+ extern bool cluster_inserts;
  extern bool Log_disconnections;
  extern int	CommitDelay;
  extern int	CommitSiblings;
***************
*** 403,408 ****
--- 404,417 ----
  static struct config_bool ConfigureNamesBool[] =
  {
  	{
+ 		{"cluster_inserts", PGC_USERSET, DEVELOPER_OPTIONS,
+ 			gettext_noop("Tries to maintain cluster order on inserts."),
+ 			NULL
+ 		},
+ 		&cluster_inserts,
+ 		true, NULL, NULL
+ 	},
+ 	{
  		{"enable_seqscan", PGC_USERSET, QUERY_TUNING_METHOD,
  			gettext_noop("Enables the planner's use of sequential-scan plans."),
  			NULL
Index: src/include/access/genam.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/genam.h,v
retrieving revision 1.65
diff -c -r1.65 genam.h
*** src/include/access/genam.h	31 Jul 2006 20:09:05 -0000	1.65
--- src/include/access/genam.h	18 May 2007 10:21:49 -0000
***************
*** 93,98 ****
--- 93,106 ----
  			 ItemPointer heap_t_ctid,
  			 Relation heapRelation,
  			 bool check_uniqueness);
+ extern BlockNumber index_prepareinsert(Relation indexRelation,
+ 			 Datum *values, bool *isnull,
+ 			 Relation heapRelation,
+ 			 bool check_uniqueness,
+ 			 void **opaque);	/* out: AM's insert state */
+ extern bool index_finishinsert(Relation indexRelation,
+ 			 ItemPointer heap_t_ctid,
+ 			 void *opaque);
  
  extern IndexScanDesc index_beginscan(Relation heapRelation,
  				Relation indexRelation,
Index: src/include/access/heapam.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/heapam.h,v
retrieving revision 1.117
diff -c -r1.117 heapam.h
*** src/include/access/heapam.h	5 Nov 2006 22:42:10 -0000	1.117
--- src/include/access/heapam.h	15 Jan 2007 17:24:52 -0000
***************
*** 157,163 ****
  extern void setLastTid(const ItemPointer tid);
  
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
  			CommandId cid, Snapshot crosscheck, bool wait);
--- 157,163 ----
  extern void setLastTid(const ItemPointer tid);
  
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm, BlockNumber suggestedblk);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
  			CommandId cid, Snapshot crosscheck, bool wait);
Index: src/include/access/nbtree.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/nbtree.h,v
retrieving revision 1.106
diff -c -r1.106 nbtree.h
*** src/include/access/nbtree.h	1 Nov 2006 19:43:17 -0000	1.106
--- src/include/access/nbtree.h	18 May 2007 10:22:09 -0000
***************
*** 479,488 ****
--- 479,511 ----
  extern Datum btbulkdelete(PG_FUNCTION_ARGS);
  extern Datum btvacuumcleanup(PG_FUNCTION_ARGS);
  extern Datum btoptions(PG_FUNCTION_ARGS);
+ extern Datum btprepareinsert(PG_FUNCTION_ARGS);
+ extern Datum btfinishinsert(PG_FUNCTION_ARGS);
+ 
+ /* Insert state filled in by _bt_prepareinsert, consumed by btfinishinsert */
+ typedef struct BTInsertInfoData
+ {
+ 	Relation rel; /* the index being inserted into */
+ 	Relation heapRel; /* the heap relation passed to btprepareinsert */
+ 	bool index_is_unique; /* uniqueness-check flag passed to btprepareinsert */
+ 	IndexTuple itup; /* index tuple to insert; t_tid set by btfinishinsert */
+ 	ScanKey itup_scankey; /* insertion scan key built from itup */
+ 	Buffer buf; /* pinned, not locked */
+ 	BTStack stack; /* stack returned by _bt_search */
+ } BTInsertInfoData;
+ 
+ typedef BTInsertInfoData *BTInsertInfo;
  
  /*
   * prototypes for functions in nbtinsert.c
   */
+ extern BlockNumber _bt_prepareinsert(Relation rel, IndexTuple itup,
+ 									 bool index_is_unique, Relation heapRel,
+ 									 BTInsertInfo *opaquePtr);
+ extern void _bt_finishinsert(Relation rel, Relation heapRel, 
+ 							 bool check_uniqueness,
+ 							 IndexTuple itup, ScanKey itup_scankey,
+ 							 BTStack stack, Buffer buf);
  extern void _bt_doinsert(Relation rel, IndexTuple itup,
  			 bool index_is_unique, Relation heapRel);
  extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
Index: src/include/catalog/pg_am.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/catalog/pg_am.h,v
retrieving revision 1.46
diff -c -r1.46 pg_am.h
*** src/include/catalog/pg_am.h	31 Jul 2006 20:09:05 -0000	1.46
--- src/include/catalog/pg_am.h	18 May 2007 10:25:26 -0000
***************
*** 65,70 ****
--- 65,72 ----
  	regproc		amvacuumcleanup;	/* post-VACUUM cleanup function */
  	regproc		amcostestimate; /* estimate cost of an indexscan */
  	regproc		amoptions;		/* parse AM-specific parameters */
+ 	regproc		amprepareinsert;	/* get desired insert location on heap */
+ 	regproc		amfinishinsert;	/* finish a prepared insert operation */
  } FormData_pg_am;
  
  /* ----------------
***************
*** 78,84 ****
   *		compiler constants for pg_am
   * ----------------
   */
! #define Natts_pg_am						23
  #define Anum_pg_am_amname				1
  #define Anum_pg_am_amstrategies			2
  #define Anum_pg_am_amsupport			3
--- 80,86 ----
   *		compiler constants for pg_am
   * ----------------
   */
! #define Natts_pg_am						25
  #define Anum_pg_am_amname				1
  #define Anum_pg_am_amstrategies			2
  #define Anum_pg_am_amsupport			3
***************
*** 102,123 ****
  #define Anum_pg_am_amvacuumcleanup		21
  #define Anum_pg_am_amcostestimate		22
  #define Anum_pg_am_amoptions			23
  
  /* ----------------
   *		initial contents of pg_am
   * ----------------
   */
  
! DATA(insert OID = 403 (  btree	5 1 1 t t t t f t btinsert btbeginscan btgettuple btgetmulti btrescan btendscan btmarkpos btrestrpos btbuild btbulkdelete btvacuumcleanup btcostestimate btoptions ));
  DESCR("b-tree index access method");
  #define BTREE_AM_OID 403
! DATA(insert OID = 405 (  hash	1 1 0 f f f f f f hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate hashoptions ));
  DESCR("hash index access method");
  #define HASH_AM_OID 405
! DATA(insert OID = 783 (  gist	100 7 0 f t t t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate gistoptions ));
  DESCR("GiST index access method");
  #define GIST_AM_OID 783
! DATA(insert OID = 2742 (  gin	100 4 0 f f f f t f gininsert ginbeginscan gingettuple gingetmulti ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ginoptions ));
  DESCR("GIN index access method");
  #define GIN_AM_OID 2742
  
--- 104,127 ----
  #define Anum_pg_am_amvacuumcleanup		21
  #define Anum_pg_am_amcostestimate		22
  #define Anum_pg_am_amoptions			23
+ #define Anum_pg_am_amprepareinsert		24
+ #define Anum_pg_am_amfinishinsert		25
  
  /* ----------------
   *		initial contents of pg_am
   * ----------------
   */
  
! DATA(insert OID = 403 (  btree	5 1 1 t t t t f t btinsert btbeginscan btgettuple btgetmulti btrescan btendscan btmarkpos btrestrpos btbuild btbulkdelete btvacuumcleanup btcostestimate btoptions btprepareinsert btfinishinsert));
  DESCR("b-tree index access method");
  #define BTREE_AM_OID 403
! DATA(insert OID = 405 (  hash	1 1 0 f f f f f f hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate hashoptions - -));
  DESCR("hash index access method");
  #define HASH_AM_OID 405
! DATA(insert OID = 783 (  gist	100 7 0 f t t t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate gistoptions - -));
  DESCR("GiST index access method");
  #define GIST_AM_OID 783
! DATA(insert OID = 2742 (  gin	100 4 0 f f f f t f gininsert ginbeginscan gingettuple gingetmulti ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ginoptions - -));
  DESCR("GIN index access method");
  #define GIN_AM_OID 2742
  
Index: src/include/catalog/pg_proc.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/catalog/pg_proc.h,v
retrieving revision 1.429
diff -c -r1.429 pg_proc.h
*** src/include/catalog/pg_proc.h	28 Nov 2006 19:18:44 -0000	1.429
--- src/include/catalog/pg_proc.h	18 May 2007 11:32:04 -0000
***************
*** 682,687 ****
--- 682,691 ----
  DESCR("btree(internal)");
  DATA(insert OID = 2785 (  btoptions		   PGNSP PGUID 12 f f t f s 2 17 "1009 16" _null_ _null_ _null_  btoptions - _null_ ));
  DESCR("btree(internal)");
+ DATA(insert OID = 5433 (  btprepareinsert   PGNSP PGUID 12 f f t f v 6 23 "2281 2281 2281 2281 2281 2281" _null_ _null_ _null_	btprepareinsert - _null_ ));
+ DESCR("btree(internal)");
+ DATA(insert OID = 5430 (  btfinishinsert   PGNSP PGUID 12 f f t f v 2 16 "2281 2281" _null_ _null_ _null_	btfinishinsert - _null_ ));
+ DESCR("btree(internal)");
  
  DATA(insert OID = 339 (  poly_same		   PGNSP PGUID 12 f f t f i 2 16 "604 604" _null_ _null_ _null_ poly_same - _null_ ));
  DESCR("same as?");
Index: src/include/executor/executor.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/executor/executor.h,v
retrieving revision 1.130.2.2
diff -c -r1.130.2.2 executor.h
*** src/include/executor/executor.h	2 Feb 2007 00:07:28 -0000	1.130.2.2
--- src/include/executor/executor.h	18 May 2007 09:48:24 -0000
***************
*** 275,280 ****
--- 275,281 ----
  extern void ExecCloseIndices(ResultRelInfo *resultRelInfo);
  extern void ExecInsertIndexTuples(TupleTableSlot *slot, ItemPointer tupleid,
  					  EState *estate, bool is_vacuum);
+ extern BlockNumber ExecPrepareIndexInsert(TupleTableSlot *slot, EState *estate);
  
  extern void RegisterExprContextCallback(ExprContext *econtext,
  							ExprContextCallbackFunction function,
Index: src/include/nodes/execnodes.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/nodes/execnodes.h,v
retrieving revision 1.161.2.2
diff -c -r1.161.2.2 execnodes.h
*** src/include/nodes/execnodes.h	26 Apr 2007 23:24:57 -0000	1.161.2.2
--- src/include/nodes/execnodes.h	18 May 2007 09:48:25 -0000
***************
*** 259,264 ****
--- 259,266 ----
   *		NumIndices				# of indices existing on result relation
   *		IndexRelationDescs		array of relation descriptors for indices
   *		IndexRelationInfo		array of key/attr info for indices
+  *		ClusterIndex			index to the IndexRelationInfo array of the
+  *								clustered index, or -1 if there's none
   *		TrigDesc				triggers to be fired, if any
   *		TrigFunctions			cached lookup info for trigger functions
   *		TrigInstrument			optional runtime measurements for triggers
***************
*** 275,286 ****
--- 277,291 ----
  	int			ri_NumIndices;
  	RelationPtr ri_IndexRelationDescs;
  	IndexInfo **ri_IndexRelationInfo;
+ 	int         ri_ClusterIndex;
  	TriggerDesc *ri_TrigDesc;
  	FmgrInfo   *ri_TrigFunctions;
  	struct Instrumentation *ri_TrigInstrument;
  	List	  **ri_ConstraintExprs;
  	JunkFilter *ri_junkFilter;
  	ProjectionInfo *ri_projectReturning;
+ 
+ 	void		*ri_PreparedInsertOpaque;
  } ResultRelInfo;
  
  /* ----------------
Index: src/include/utils/rel.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/utils/rel.h,v
retrieving revision 1.92
diff -c -r1.92 rel.h
*** src/include/utils/rel.h	4 Oct 2006 00:30:10 -0000	1.92
--- src/include/utils/rel.h	15 Jan 2007 17:24:52 -0000
***************
*** 116,121 ****
--- 116,123 ----
  	FmgrInfo	amvacuumcleanup;
  	FmgrInfo	amcostestimate;
  	FmgrInfo	amoptions;
+ 	FmgrInfo	amprepareinsert;
+ 	FmgrInfo	amfinishinsert;
  } RelationAmInfo;
  
  
