*** a/doc/src/sgml/gist.sgml
--- b/doc/src/sgml/gist.sgml
***************
*** 709,741 **** my_distance(PG_FUNCTION_ARGS)
  
  </sect1>
  
- <sect1 id="gist-recovery">
-  <title>Crash Recovery</title>
- 
-  <para>
-   Usually, replay of the WAL log is sufficient to restore the integrity
-   of a GiST index following a database crash.  However, there are some
-   corner cases in which the index state is not fully rebuilt.  The index
-   will still be functionally correct, but there might be some performance
-   degradation.  When this occurs, the index can be repaired by
-   <command>VACUUM</>ing its table, or by rebuilding the index using
-   <command>REINDEX</>.  In some cases a plain <command>VACUUM</> is
-   not sufficient, and either <command>VACUUM FULL</> or <command>REINDEX</>
-   is needed.  The need for one of these procedures is indicated by occurrence
-   of this log message during crash recovery:
- <programlisting>
- LOG:  index NNN/NNN/NNN needs VACUUM or REINDEX to finish crash recovery
- </programlisting>
-   or this log message during routine index insertions:
- <programlisting>
- LOG:  index "FOO" needs VACUUM or REINDEX to finish crash recovery
- </programlisting>
-   If a plain <command>VACUUM</> finds itself unable to complete recovery
-   fully, it will return a notice:
- <programlisting>
- NOTICE:  index "FOO" needs VACUUM FULL or REINDEX to finish crash recovery
- </programlisting>
-  </para>
- </sect1>
- 
  </chapter>
--- 709,712 ----
*** a/src/backend/access/gist/README
--- b/src/backend/access/gist/README
***************
*** 108,150 **** Penalty is used for choosing a subtree to insert; method PickSplit is used for
  the node splitting algorithm; method Union is used for propagating changes
  upward to maintain the tree properties.
  
! NOTICE: We modified original INSERT algorithm for performance reason. In
! particularly, it is now a single-pass algorithm.
! 
! Function findLeaf is used to identify subtree for insertion. Page, in which
! insertion is proceeded, is locked as well as its parent page. Functions
! findParent and findPath are used to find parent pages, which could be changed
! because of concurrent access. Function pageSplit is recurrent and could split
! page by more than 2 pages, which could be necessary if keys have different
! lengths or more than one key are inserted (in such situation, user defined
! function pickSplit cannot guarantee free space on page).
! 
! findLeaf(new-key)
! 	push(stack, [root, 0]) //page, LSN
! 	while(true)
! 		ptr = top of stack
! 		latch( ptr->page, S-mode )
! 		ptr->lsn = ptr->page->lsn
! 		if ( exists ptr->parent AND ptr->parent->lsn < ptr->page->nsn )
! 			unlatch( ptr->page )
! 			pop stack
! 		else if ( ptr->page is not leaf )
! 			push( stack, [get_best_child(ptr->page, new-key), 0] )
! 			unlatch( ptr->page )
! 		else
! 			unlatch( ptr->page )
! 			latch( ptr->page, X-mode )
! 			if ( ptr->page is not leaf )
! 				//the only root page can become a non-leaf
! 				unlatch( ptr->page )
! 			else if ( ptr->parent->lsn < ptr->page->nsn )
! 				unlatch( ptr->page )
! 				pop stack
! 			else
! 				return stack
! 			end
! 		end
! 	end
  
  findPath( stack item )
  	push stack, [root, 0, 0] // page, LSN, parent
--- 108,178 ----
  the node splitting algorithm; method Union is used for propagating changes
  upward to maintain the tree properties.
  
! To insert a tuple, we first have to find a suitable leaf page to insert to.
! The algorithm walks down the tree, starting from the root, along the path
! of smallest Penalty. At each step:
! 
! 1. Has this page been split since we looked at the parent? If so, it's
! possible that we should be inserting to the other half instead, so retreat
! back to the parent.
! 2. If this is a leaf node, we've found our target node.
! 3. Otherwise use Penalty to pick a new target subtree.
! 4. Check the key representing the target subtree. If it doesn't already cover
! the key we're inserting, replace it with the Union of the old downlink key
! and the key being inserted. (Actually, we always call Union, and just skip
! the replacement if the Unioned key is the same as the existing key)
! 5. Replacing the key in step 4 might cause the page to be split. In that case,
! propagate the change upwards and restart the algorithm from the first parent
! that didn't need to be split.
! 6. Walk down to the target subtree, and goto 1.
! 
! This differs from the insertion algorithm in the original paper. In the
! original paper, you first walk down the tree until you reach a leaf page, and
! then you adjust the downlink in the parent, and propagating the adjustment up,
! all the way up to the root in the worst case. But we adjust the downlinks to
! cover the new key already when we walk down, so that when we reach the leaf
! page, we don't need to update the parents anymore, except to insert the
! downlinks if we have to split the page. This makes crash recovery simpler:
! after inserting a key to the page, the tree is immediately self-consistent
! without having to update the parents. Even if we split a page and crash before
! inserting the downlink to the parent, the tree is self-consistent because the
! right half of the split is accessible via the rightlink of the left page
! (which replaced the original page).
! 
! Note that the algorithm can walk up and down the tree before reaching a leaf
! page, if internal pages need to split while adjusting the downlinks for the
! new key. Eventually, you should reach the bottom, and proceed with the
! insertion of the new tuple.
! 
! Once we've found the target page to insert to, we check if there's room
! for the new tuple. If there is, the tuple is inserted, and we're done.
! If it doesn't fit, however, the page needs to be split. Note that it is
! possible that a page needs to be split into more than two pages, if keys have
! different lengths or more than one key is being inserted at a time (which can
! happen when inserting downlinks for a page split that resulted in more than
! two pages at the lower level). After splitting a page, the parent page needs
! to be updated. The downlink for the new page needs to be inserted, and the
! downlink for the old page, which became the left half of the split, needs to
! be updated to only cover those tuples that stayed on the left page. Inserting
! the downlink in the parent can again lead to a page split, recursing up to the
! root page in the worst case.
! 
! gistplacetopage is the workhorse function that performs one step of the
! insertion. If the tuple fits, it inserts it to the given page, otherwise
! it splits the page, and constructs the new downlink tuples for the split
! pages. The caller must then call gistplacetopage() on the parent page to
! insert the downlink tuples. The parent page that holds the downlink to
! the child might have migrated as a result of concurrent splits of the
! parent, gistfindCorrectParent() is used to find the parent page.
! 
! Splitting the root page works slightly differently. At root split,
! gistplacetopage() allocates the new child pages and replaces the old root
! page with the new root containing downlinks to the new children, all in one
! operation.
! 
! 
! findPath is a subroutine of findParent, used when the correct parent page
! can't be found by following the rightlinks at the parent level:
  
  findPath( stack item )
  	push stack, [root, 0, 0] // page, LSN, parent
***************
*** 165,170 **** findPath( stack item )
--- 193,203 ----
  		pop stack
  	end
  
+ 
+ gistFindCorrectParent is used to re-find the parent of a page during
+ insertion. It might have migrated to the right since we traversed down the
+ tree because of page splits.
+ 
  findParent( stack item )
  	parent = item->parent
  	latch( parent->page, X-mode )
***************
*** 184,189 **** findParent( stack item )
--- 217,225 ----
  		return findParent( item )
  	end
  
+ pageSplit function decides how to distribute keys to the new pages after
+ page split:
+ 
  pageSplit(page, allkeys)
  	(lkeys, rkeys) = pickSplit( allkeys )
  	if ( page is root )
***************
*** 204,242 **** pageSplit(page, allkeys)
  	return newkeys
  
  
- placetopage(page, keysarray)
- 	if ( no space left on page )
- 		keysarray = pageSplit(page, [ extract_keys(page), keysarray])
- 		last page in chain gets old NSN,
- 		original and others - new NSN equals to LSN
- 		if ( page is root )
- 			make new root with keysarray
- 		end
- 	else
- 		put keysarray on page
- 		if ( length of keysarray > 1 )
- 			keysarray = [ union(keysarray) ]
- 		end
- 	end
  
! insert(new-key)
! 	stack = findLeaf(new-key)
! 	keysarray = [new-key]
! 	ptr = top of stack
! 	while(true)
! 		findParent( ptr ) //findParent latches parent page
! 		keysarray = placetopage(ptr->page, keysarray)
! 		unlatch( ptr->page )
! 		pop stack;
! 		ptr = top of stack
! 		if (length of keysarray == 1)
! 			newboundingkey = union(oldboundingkey, keysarray)
! 			if (newboundingkey == oldboundingkey)
! 				unlatch ptr->page
! 				break loop
! 			end
! 		end
! 	end
  
  Authors:
  	Teodor Sigaev	<teodor@sigaev.ru>
--- 240,283 ----
  	return newkeys
  
  
  
! Concurrency control
! -------------------
! As a rule of thumb, if you need to hold a lock on multiple pages at the
! same time, the locks should be acquired in the following order: child page
! before parent, and left-to-right at the same level. Always acquiring the
! locks in the same order avoids deadlocks.
! 
! The search algorithm only looks at and locks one page at a time. Consequently
! there's a race condition between a search and a page split. A page split
! happens in two phases: 1. The page is split 2. The downlink is inserted to the
! parent. If a search looks at the parent page between those steps, before the
! downlink is inserted, it will still find the new right half by following the
! rightlink on the left half. But it must not follow the rightlink if it saw the
! downlink in the parent, or the page will be visited twice!
! 
! A split initially marks the left page with the F_FOLLOW_RIGHT flag. If a scan
! sees that flag set, it knows that the right page is missing the downlink, and
! should be visited too. When split inserts the downlink to the parent, it
! clears the F_FOLLOW_RIGHT flag in the child, and sets the NSN field in the
! child page header to match the LSN of the insertion on the parent. If the
! F_FOLLOW_RIGHT flag is not set, a scan compares the NSN on the child and the
! LSN it saw in the parent. If NSN < LSN, the scan looked at the parent page
! before the downlink was inserted, so it should follow the rightlink. Otherwise
! the scan saw the downlink in the parent page, and will/did follow that as
! usual.
! 
! A scan can't normally see a page with the F_FOLLOW_RIGHT flag set, because
! a page split keeps the child pages locked until the downlink has been inserted
! to the parent and the flag cleared again. But if a crash happens in the middle
! of a page split, before the downlinks are inserted into the parent, that will
! leave a page with F_FOLLOW_RIGHT in the tree. Scans handle that just fine,
! but we'll eventually want to fix that for performance reasons. And more
! importantly, dealing with pages with missing downlink pointers in the parent
! would complicate the insertion algorithm. So when an insertion sees a page
! with F_FOLLOW_RIGHT set, it immediately tries to bring the split that
! crashed in the middle to completion by adding the downlink in the parent.
! 
  
  Authors:
  	Teodor Sigaev	<teodor@sigaev.ru>
*** a/src/backend/access/gist/gist.c
--- b/src/backend/access/gist/gist.c
***************
*** 31,36 **** typedef struct
--- 31,42 ----
  	MemoryContext tmpCtx;
  } GISTBuildState;
  
+ /* A List of these is used represent a split-in-progress. */
+ typedef struct
+ {
+ 	Buffer		buf;		/* the split page "half" */
+ 	IndexTuple	downlink;	/* downlink for this half. */
+ } GISTPageSplitInfo;
  
  /* non-export function prototypes */
  static void gistbuildCallback(Relation index,
***************
*** 43,50 **** static void gistdoinsert(Relation r,
  			 IndexTuple itup,
  			 Size freespace,
  			 GISTSTATE *GISTstate);
! static void gistfindleaf(GISTInsertState *state,
! 			 GISTSTATE *giststate);
  
  
  #define ROTATEDIST(d) do { \
--- 49,61 ----
  			 IndexTuple itup,
  			 Size freespace,
  			 GISTSTATE *GISTstate);
! static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
! static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
! 				 GISTSTATE *giststate,
! 				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
! 				 Buffer leftchild);
! static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
! 				GISTSTATE *giststate, List *splitinfo);
  
  
  #define ROTATEDIST(d) do { \
***************
*** 251,291 **** gistinsert(PG_FUNCTION_ARGS)
  
  
  /*
!  * Workhouse routine for doing insertion into a GiST index. Note that
!  * this routine assumes it is invoked in a short-lived memory context,
!  * so it does not bother releasing palloc'd allocations.
   */
! static void
! gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
  {
! 	GISTInsertState state;
! 
! 	memset(&state, 0, sizeof(GISTInsertState));
! 
! 	state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
! 	state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
! 	memcpy(state.itup[0], itup, IndexTupleSize(itup));
! 	state.ituplen = 1;
! 	state.freespace = freespace;
! 	state.r = r;
! 	state.key = itup->t_tid;
! 	state.needInsertComplete = true;
! 
! 	state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
! 	state.stack->blkno = GIST_ROOT_BLKNO;
  
! 	gistfindleaf(&state, giststate);
! 	gistmakedeal(&state, giststate);
! }
  
! static bool
! gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
! {
! 	bool		is_splitted = false;
! 	bool		is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
  
  	/*
! 	 * if (!is_leaf) remove old key: This node's key has been modified, either
  	 * because a child split occurred or because we needed to adjust our key
  	 * for an insert in a child node. Therefore, remove the old version of
  	 * this node's key.
--- 262,313 ----
  
  
  /*
!  * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
!  * at that offset is atomically removed along with inserting the new tuples.
!  * This is used to replace a tuple with a new one.
!  *
!  * If 'leftchildbuf' is valid, we're inserting the downlink for the page
!  * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
!  * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
!  *
!  * If there is not enough room on the page, it is split. All the split
!  * pages are kept pinned and locked and returned in *splitinfo, the caller
!  * is responsible for inserting the downlinks for them. However, if
!  * 'buffer' is the root page and it needs to be split, gistplacetopage()
!  * performs the split as one atomic operation, and *splitinfo is set to NIL.
!  * In that case, we continue to hold the root page locked, and the child
!  * pages are released; note that new tuple(s) are *not* on the root page
!  * but in one of the new child pages.
   */
! static bool
! gistplacetopage(GISTInsertState *state, GISTSTATE *giststate,
! 				Buffer buffer,
! 				IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
! 				Buffer leftchildbuf,
! 				List **splitinfo)
  {
! 	Page		page = BufferGetPage(buffer);
! 	bool		is_leaf = (GistPageIsLeaf(page)) ? true : false;
! 	XLogRecPtr	recptr;
! 	int			i;
! 	bool		is_split;
  
! 	/*
! 	 * Refuse to modify a page that's incompletely split. This should
! 	 * not happen because we finish any incomplete splits while we walk
! 	 * down the tree. However, it's remotely possible that another
! 	 * concurrent inserter splits a parent page, and errors out before
! 	 * completing the split. We will just throw an error in that case,
! 	 * and leave any split we had in progress unfinished too. The next
! 	 * insert that comes along will clean up the mess.
! 	 */
! 	if (GistFollowRight(page))
! 		elog(ERROR, "concurrent GiST page split was incomplete");
  
! 	*splitinfo = NIL;
  
  	/*
! 	 * if isupdate, remove old key: This node's key has been modified, either
  	 * because a child split occurred or because we needed to adjust our key
  	 * for an insert in a child node. Therefore, remove the old version of
  	 * this node's key.
***************
*** 293,369 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
  	 * for WAL replay, in the non-split case we handle this by setting up a
  	 * one-element todelete array; in the split case, it's handled implicitly
  	 * because the tuple vector passed to gistSplit won't include this tuple.
- 	 *
- 	 * XXX: If we want to change fillfactors between node and leaf, fillfactor
- 	 * = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor)
  	 */
! 	if (gistnospace(state->stack->page, state->itup, state->ituplen,
! 					is_leaf ? InvalidOffsetNumber : state->stack->childoffnum,
! 					state->freespace))
  	{
  		/* no space for insertion */
  		IndexTuple *itvec;
  		int			tlen;
  		SplitedPageLayout *dist = NULL,
  				   *ptr;
! 		BlockNumber rrlink = InvalidBlockNumber;
! 		GistNSN		oldnsn;
  
! 		is_splitted = true;
  
  		/*
! 		 * Form index tuples vector to split: remove old tuple if t's needed
! 		 * and add new tuples to vector
  		 */
! 		itvec = gistextractpage(state->stack->page, &tlen);
! 		if (!is_leaf)
  		{
  			/* on inner page we should remove old tuple */
! 			int			pos = state->stack->childoffnum - FirstOffsetNumber;
  
  			tlen--;
  			if (pos != tlen)
  				memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
  		}
! 		itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
! 		dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
! 
! 		state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen);
! 		state->ituplen = 0;
  
! 		if (state->stack->blkno != GIST_ROOT_BLKNO)
  		{
! 			/*
! 			 * if non-root split then we should not allocate new buffer, but
! 			 * we must create temporary page to operate
! 			 */
! 			dist->buffer = state->stack->buffer;
! 			dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer));
  
  			/* clean all flags except F_LEAF */
  			GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
  		}
  
! 		/* make new pages and fills them */
  		for (ptr = dist; ptr; ptr = ptr->next)
  		{
! 			int			i;
! 			char	   *data;
  
! 			/* get new page */
! 			if (ptr->buffer == InvalidBuffer)
  			{
! 				ptr->buffer = gistNewBuffer(state->r);
! 				GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
! 				ptr->page = BufferGetPage(ptr->buffer);
  			}
! 			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
  
! 			/*
! 			 * fill page, we can do it because all these pages are new (ie not
! 			 * linked in tree or masked by temp page
! 			 */
! 			data = (char *) (ptr->list);
  			for (i = 0; i < ptr->block.num; i++)
  			{
  				if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
--- 315,448 ----
  	 * for WAL replay, in the non-split case we handle this by setting up a
  	 * one-element todelete array; in the split case, it's handled implicitly
  	 * because the tuple vector passed to gistSplit won't include this tuple.
  	 */
! 	is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace);
! 	if (is_split)
  	{
  		/* no space for insertion */
  		IndexTuple *itvec;
  		int			tlen;
  		SplitedPageLayout *dist = NULL,
  				   *ptr;
! 		BlockNumber oldrlink = InvalidBlockNumber;
! 		GistNSN		oldnsn = { 0, 0 };
! 		SplitedPageLayout rootpg;
! 		BlockNumber blkno = BufferGetBlockNumber(buffer);
! 		bool		is_rootsplit;
  
! 		is_rootsplit = (blkno == GIST_ROOT_BLKNO);
  
  		/*
! 		 * Form index tuples vector to split. If we're replacing an old tuple,
! 		 * remove the old version from the vector.
  		 */
! 		itvec = gistextractpage(page, &tlen);
! 		if (OffsetNumberIsValid(oldoffnum))
  		{
  			/* on inner page we should remove old tuple */
! 			int			pos = oldoffnum - FirstOffsetNumber;
  
  			tlen--;
  			if (pos != tlen)
  				memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
  		}
! 		itvec = gistjoinvector(itvec, &tlen, itup, ntup);
! 		dist = gistSplit(state->r, page, itvec, tlen, giststate);
  
! 		/*
! 		 * Set up pages to work with. Allocate new buffers for all but the
! 		 * leftmost page. The original page becomes the new leftmost page,
! 		 * and is just replaced with the new contents.
! 		 *
! 		 * For a root-split, allocate new buffers for all child pages, the
! 		 * original page is overwritten with new root page containing
! 		 * downlinks to the new child pages.
! 		 */
! 		ptr = dist;
! 		if (!is_rootsplit)
  		{
! 			/* save old rightlink and NSN */
! 			oldrlink = GistPageGetOpaque(page)->rightlink;
! 			oldnsn = GistPageGetOpaque(page)->nsn;
! 
! 			dist->buffer = buffer;
! 			dist->block.blkno = BufferGetBlockNumber(buffer);
! 			dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
  
  			/* clean all flags except F_LEAF */
  			GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
+ 
+ 			ptr = ptr->next;
+ 		}
+ 		for (; ptr; ptr = ptr->next)
+ 		{
+ 			/* Allocate new page */
+ 			ptr->buffer = gistNewBuffer(state->r);
+ 			GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
+ 			ptr->page = BufferGetPage(ptr->buffer);
+ 			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
  		}
  
! 		/*
! 		 * Now that we know whick blocks the new pages go to, set up downlink
! 		 * tuples to point to them.
! 		 */
  		for (ptr = dist; ptr; ptr = ptr->next)
  		{
! 			ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
! 			GistTupleSetValid(ptr->itup);
! 		}
  
! 		/*
! 		 * If this is a root split, we construct the new root page with the
! 		 * downlinks here directly, instead of requiring the caller to insert
! 		 * them. Add the new root page to the list along with the child pages.
! 		 */
! 		if (is_rootsplit)
! 		{
! 			IndexTuple *downlinks;
! 			int ndownlinks = 0;
! 			int i;
! 
! 			rootpg.buffer = buffer;
! 			rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
! 			GistPageGetOpaque(rootpg.page)->flags = 0;
! 
! 			/* Prepare a vector of all the downlinks */
! 			for (ptr = dist; ptr; ptr = ptr->next)
! 				ndownlinks++;
! 			downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
! 			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
! 				downlinks[i++] = ptr->itup;
! 
! 			rootpg.block.blkno = GIST_ROOT_BLKNO;
! 			rootpg.block.num = ndownlinks;
! 			rootpg.list = gistfillitupvec(downlinks, ndownlinks,
! 										  &(rootpg.lenlist));
! 			rootpg.itup = NULL;
! 
! 			rootpg.next = dist;
! 			dist = &rootpg;
! 		}
! 		else
! 		{
! 			/* Prepare split-info to be returned to caller */
! 			for (ptr = dist; ptr; ptr = ptr->next)
  			{
! 				GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
! 				si->buf = ptr->buffer;
! 				si->downlink = ptr->itup;
! 				*splitinfo = lappend(*splitinfo, si);
  			}
! 		}
  
! 		/*
! 		 * Fill all pages. All the pages are new, ie. freshly allocated empty
! 		 * pages, or a temporary copy of the old page.
! 		 */
! 		for (ptr = dist; ptr; ptr = ptr->next)
! 		{
! 			char *data = (char *) (ptr->list);
  			for (i = 0; i < ptr->block.num; i++)
  			{
  				if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
***************
*** 371,646 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
  				data += IndexTupleSize((IndexTuple) data);
  			}
  
! 			/* set up ItemPointer and remember it for parent */
! 			ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
! 			state->itup[state->ituplen] = ptr->itup;
! 			state->ituplen++;
! 		}
  
! 		/* saves old rightlink */
! 		if (state->stack->blkno != GIST_ROOT_BLKNO)
! 			rrlink = GistPageGetOpaque(dist->page)->rightlink;
  
  		START_CRIT_SECTION();
  
  		/*
! 		 * must mark buffers dirty before XLogInsert, even though we'll still
! 		 * be changing their opaque fields below. set up right links.
  		 */
  		for (ptr = dist; ptr; ptr = ptr->next)
- 		{
  			MarkBufferDirty(ptr->buffer);
! 			GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
! 				ptr->next->block.blkno : rrlink;
! 		}
  
! 		/* restore splitted non-root page */
! 		if (state->stack->blkno != GIST_ROOT_BLKNO)
! 		{
! 			PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
! 			dist->page = BufferGetPage(dist->buffer);
! 		}
  
  		if (RelationNeedsWAL(state->r))
! 		{
! 			XLogRecPtr	recptr;
! 			XLogRecData *rdata;
! 
! 			rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
! 								   is_leaf, &(state->key), dist);
! 
! 			recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
! 
! 			for (ptr = dist; ptr; ptr = ptr->next)
! 			{
! 				PageSetLSN(ptr->page, recptr);
! 				PageSetTLI(ptr->page, ThisTimeLineID);
! 			}
! 		}
  		else
! 		{
! 			for (ptr = dist; ptr; ptr = ptr->next)
! 			{
! 				PageSetLSN(ptr->page, GetXLogRecPtrForTemp());
! 			}
! 		}
! 
! 		/* set up NSN */
! 		oldnsn = GistPageGetOpaque(dist->page)->nsn;
! 		if (state->stack->blkno == GIST_ROOT_BLKNO)
! 			/* if root split we should put initial value */
! 			oldnsn = PageGetLSN(dist->page);
  
  		for (ptr = dist; ptr; ptr = ptr->next)
  		{
! 			/* only for last set oldnsn */
! 			GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
! 				PageGetLSN(ptr->page) : oldnsn;
  		}
  
  		/*
! 		 * release buffers, if it was a root split then release all buffers
! 		 * because we create all buffers
  		 */
! 		ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next;
! 		for (; ptr; ptr = ptr->next)
! 			UnlockReleaseBuffer(ptr->buffer);
! 
! 		if (state->stack->blkno == GIST_ROOT_BLKNO)
  		{
! 			gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
! 			state->needInsertComplete = false;
  		}
- 
- 		END_CRIT_SECTION();
  	}
  	else
  	{
! 		/* enough space */
  		START_CRIT_SECTION();
  
! 		if (!is_leaf)
! 			PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
! 		gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
  
! 		MarkBufferDirty(state->stack->buffer);
  
  		if (RelationNeedsWAL(state->r))
  		{
! 			OffsetNumber noffs = 0,
! 						offs[1];
! 			XLogRecPtr	recptr;
! 			XLogRecData *rdata;
  
! 			if (!is_leaf)
  			{
! 				/* only on inner page we should delete previous version */
! 				offs[0] = state->stack->childoffnum;
! 				noffs = 1;
  			}
  
! 			rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
! 									offs, noffs,
! 									state->itup, state->ituplen,
! 									&(state->key));
  
! 			recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
! 			PageSetLSN(state->stack->page, recptr);
! 			PageSetTLI(state->stack->page, ThisTimeLineID);
  		}
  		else
! 			PageSetLSN(state->stack->page, GetXLogRecPtrForTemp());
! 
! 		if (state->stack->blkno == GIST_ROOT_BLKNO)
! 			state->needInsertComplete = false;
  
! 		END_CRIT_SECTION();
  
! 		if (state->ituplen > 1)
! 		{						/* previous is_splitted==true */
  
! 			/*
! 			 * child was splited, so we must form union for insertion in
! 			 * parent
! 			 */
! 			IndexTuple	newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
  
! 			ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
! 			state->itup[0] = newtup;
! 			state->ituplen = 1;
! 		}
! 		else if (is_leaf)
! 		{
! 			/*
! 			 * itup[0] store key to adjust parent, we set it to valid to
! 			 * correct check by GistTupleIsInvalid macro in gistgetadjusted()
! 			 */
! 			ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
! 			GistTupleSetValid(state->itup[0]);
! 		}
  	}
! 	return is_splitted;
  }
  
  /*
!  * returns stack of pages, all pages in stack are pinned, and
!  * leaf is X-locked
   */
- 
  static void
! gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
  {
  	ItemId		iid;
  	IndexTuple	idxtuple;
! 	GISTPageOpaque opaque;
  
  	/*
! 	 * walk down, We don't lock page for a long time, but so we should be
! 	 * ready to recheck path in a bad case... We remember, that page->lsn
! 	 * should never be invalid.
  	 */
  	for (;;)
  	{
! 		if (XLogRecPtrIsInvalid(state->stack->lsn))
! 			state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
! 		LockBuffer(state->stack->buffer, GIST_SHARE);
! 		gistcheckpage(state->r, state->stack->buffer);
  
! 		state->stack->page = (Page) BufferGetPage(state->stack->buffer);
! 		opaque = GistPageGetOpaque(state->stack->page);
  
! 		state->stack->lsn = PageGetLSN(state->stack->page);
! 		Assert(!RelationNeedsWAL(state->r) || !XLogRecPtrIsInvalid(state->stack->lsn));
  
! 		if (state->stack->blkno != GIST_ROOT_BLKNO &&
! 			XLByteLT(state->stack->parent->lsn, opaque->nsn))
  		{
  			/*
! 			 * caused split non-root page is detected, go up to parent to
! 			 * choose best child
  			 */
! 			UnlockReleaseBuffer(state->stack->buffer);
! 			state->stack = state->stack->parent;
  			continue;
  		}
  
! 		if (!GistPageIsLeaf(state->stack->page))
  		{
  			/*
! 			 * This is an internal page, so continue to walk down the tree. We
! 			 * find the child node that has the minimum insertion penalty and
! 			 * recursively invoke ourselves to modify that node. Once the
! 			 * recursive call returns, we may need to adjust the parent node
! 			 * for two reasons: the child node split, or the key in this node
! 			 * needs to be adjusted for the newly inserted key below us.
  			 */
! 			GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
! 
! 			state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
  
! 			iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
! 			idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid);
! 			item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
! 			LockBuffer(state->stack->buffer, GIST_UNLOCK);
  
! 			item->parent = state->stack;
! 			item->child = NULL;
! 			if (state->stack)
! 				state->stack->child = item;
! 			state->stack = item;
! 		}
! 		else
! 		{
! 			/* be carefull, during unlock/lock page may be changed... */
! 			LockBuffer(state->stack->buffer, GIST_UNLOCK);
! 			LockBuffer(state->stack->buffer, GIST_EXCLUSIVE);
! 			state->stack->page = (Page) BufferGetPage(state->stack->buffer);
! 			opaque = GistPageGetOpaque(state->stack->page);
  
! 			if (state->stack->blkno == GIST_ROOT_BLKNO)
  			{
  				/*
! 				 * the only page can become inner instead of leaf is a root
! 				 * page, so for root we should recheck it
  				 */
! 				if (!GistPageIsLeaf(state->stack->page))
  				{
! 					/*
! 					 * very rarely situation: during unlock/lock index with
! 					 * number of pages = 1 was increased
! 					 */
! 					LockBuffer(state->stack->buffer, GIST_UNLOCK);
! 					continue;
! 				}
  
  				/*
! 				 * we don't need to check root split, because checking
! 				 * leaf/inner is enough to recognize split for root
  				 */
! 
  			}
! 			else if (XLByteLT(state->stack->parent->lsn, opaque->nsn))
  			{
! 				/*
! 				 * detecting split during unlock/lock, so we should find
! 				 * better child on parent
! 				 */
  
! 				/* forget buffer */
! 				UnlockReleaseBuffer(state->stack->buffer);
  
! 				state->stack = state->stack->parent;
! 				continue;
  			}
  
! 			state->stack->lsn = PageGetLSN(state->stack->page);
  
! 			/* ok we found a leaf page and it X-locked */
  			break;
  		}
  	}
- 
- 	/* now state->stack->(page, buffer and blkno) points to leaf page */
  }
  
  /*
--- 450,830 ----
  				data += IndexTupleSize((IndexTuple) data);
  			}
  
! 			/* Set up rightlinks */
! 			if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
! 				GistPageGetOpaque(ptr->page)->rightlink =
! 					ptr->next->block.blkno;
! 			else
! 				GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
! 
! 			if (ptr->next && !is_rootsplit)
! 				GistMarkFollowRight(ptr->page);
! 			else
! 				GistClearFollowRight(ptr->page);
  
! 			/*
! 			 * Copy the NSN of the original page to all pages. The
! 			 * F_FOLLOW_RIGHT flags ensure that scans will follow the
! 			 * rightlinks until the downlinks are inserted.
! 			 */
! 			GistPageGetOpaque(ptr->page)->nsn = oldnsn;
! 		}
  
  		START_CRIT_SECTION();
  
  		/*
! 		 * Must mark buffers dirty before XLogInsert, even though we'll still
! 		 * be changing their opaque fields below.
  		 */
  		for (ptr = dist; ptr; ptr = ptr->next)
  			MarkBufferDirty(ptr->buffer);
! 		if (BufferIsValid(leftchildbuf))
! 			MarkBufferDirty(leftchildbuf);
  
! 		/*
! 		 * The first page in the chain was a temporary working copy meant
! 		 * to replace the old page. Copy it over the old page.
! 		 */
! 		PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
! 		dist->page = BufferGetPage(dist->buffer);
  
+ 		/* Write the WAL record */
  		if (RelationNeedsWAL(state->r))
! 			recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf,
! 								   dist, oldrlink, oldnsn, leftchildbuf);
  		else
! 			recptr = GetXLogRecPtrForTemp();
  
  		for (ptr = dist; ptr; ptr = ptr->next)
  		{
! 			PageSetLSN(ptr->page, recptr);
! 			PageSetTLI(ptr->page, ThisTimeLineID);
  		}
  
  		/*
! 		 * Return the new child buffers to the caller.
! 		 *
! 		 * If this was a root split, we've already inserted the downlink
! 		 * pointers, in the form of a new root page. Therefore we can
! 		 * release all the new buffers, and keep just the root page locked.
  		 */
! 		if (is_rootsplit)
  		{
! 			for (ptr = dist->next; ptr; ptr = ptr->next)
! 				UnlockReleaseBuffer(ptr->buffer);
  		}
  	}
  	else
  	{
! 		/*
! 		 * Enough space. We also get here if ntuples==0.
! 		 */
  		START_CRIT_SECTION();
  
! 		if (OffsetNumberIsValid(oldoffnum))
! 			PageIndexTupleDelete(page, oldoffnum);
! 		gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
  
! 		MarkBufferDirty(buffer);
! 
! 		if (BufferIsValid(leftchildbuf))
! 			MarkBufferDirty(leftchildbuf);
  
  		if (RelationNeedsWAL(state->r))
  		{
! 			OffsetNumber ndeloffs = 0,
! 						deloffs[1];
  
! 			if (OffsetNumberIsValid(oldoffnum))
  			{
! 				deloffs[0] = oldoffnum;
! 				ndeloffs = 1;
  			}
  
! 			recptr = gistXLogUpdate(state->r->rd_node, buffer,
! 									deloffs, ndeloffs, itup, ntup,
! 									leftchildbuf);
  
! 			PageSetLSN(page, recptr);
! 			PageSetTLI(page, ThisTimeLineID);
  		}
  		else
! 		{
! 			recptr = GetXLogRecPtrForTemp();
! 			PageSetLSN(page, recptr);
! 		}
  
! 		*splitinfo = NIL;
! 	}
  
! 	/*
! 	 * If we inserted the downlink for a child page, set NSN and clear
! 	 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know
! 	 * to follow the rightlink if and only if they looked at the parent page
! 	 * before we inserted the downlink.
! 	 *
! 	 * Note that we do this *after* writing the WAL record. That means that
! 	 * the possible full page image in the WAL record does not include
! 	 * these changes, and they must be replayed even if the page is restored
! 	 * from the full page image. There's a chicken-and-egg problem: if we
! 	 * updated the child pages first, we wouldn't know the recptr of the WAL
! 	 * record we're about to write.
! 	 */
! 	if (BufferIsValid(leftchildbuf))
! 	{
! 		Page leftpg = BufferGetPage(leftchildbuf);
  
! 		GistPageGetOpaque(leftpg)->nsn = recptr;
! 		GistClearFollowRight(leftpg);
  
! 		PageSetLSN(leftpg, recptr);
! 		PageSetTLI(leftpg, ThisTimeLineID);
  	}
! 
! 	END_CRIT_SECTION();
! 
! 	return is_split;
  }
  
  /*
!  * Workhouse routine for doing insertion into a GiST index. Note that
!  * this routine assumes it is invoked in a short-lived memory context,
!  * so it does not bother releasing palloc'd allocations.
   */
  static void
! gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
  {
  	ItemId		iid;
  	IndexTuple	idxtuple;
! 	GISTInsertStack firststack;
! 	GISTInsertStack *stack;
! 	GISTInsertState state;
! 	bool		xlocked = false;
! 
! 	memset(&state, 0, sizeof(GISTInsertState));
! 	state.freespace = freespace;
! 	state.r = r;
! 
! 	/* Start from the root */
! 	firststack.blkno = GIST_ROOT_BLKNO;
! 	firststack.lsn.xrecoff = 0;
! 	firststack.parent = NULL;
! 	state.stack = stack = &firststack;
  
  	/*
! 	 * Walk down along the path of smallest penalty, updating the parent
! 	 * pointers with the key we're inserting as we go. If we crash in the
! 	 * middle, the tree is consistent, although the possible parent updates
! 	 * were a waste.
  	 */
  	for (;;)
  	{
! 		if (XLogRecPtrIsInvalid(stack->lsn))
! 			stack->buffer = ReadBuffer(state.r, stack->blkno);
! 
! 		/*
! 		 * Be optimistic and grab shared lock first. Swap it for an
! 		 * exclusive lock later if we need to update the page.
! 		 */
! 		if (!xlocked)
! 		{
! 			LockBuffer(stack->buffer, GIST_SHARE);
! 			gistcheckpage(state.r, stack->buffer);
! 		}
! 
! 		stack->page = (Page) BufferGetPage(stack->buffer);
! 		stack->lsn = PageGetLSN(stack->page);
! 		Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
  
! 		/*
! 		 * If this page was split but the downlink was never inserted to
! 		 * the parent because the inserting backend crashed before doing
! 		 * that, fix that now.
! 		 */
! 		if (GistFollowRight(stack->page))
! 		{
! 			if (!xlocked)
! 			{
! 				LockBuffer(stack->buffer, GIST_UNLOCK);
! 				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
! 				xlocked = true;
! 				/* someone might've completed the split when we unlocked */
! 				if (!GistFollowRight(stack->page))
! 					continue;
! 			}
! 			gistfixsplit(&state, giststate);
  
! 			UnlockReleaseBuffer(stack->buffer);
! 			xlocked = false;
! 			state.stack = stack = stack->parent;
! 			continue;
! 		}
  
! 		if (stack->blkno != GIST_ROOT_BLKNO &&
! 			XLByteLT(stack->parent->lsn,
! 					 GistPageGetOpaque(stack->page)->nsn))
  		{
  			/*
! 			 * Concurrent split detected. There's no guarantee that the
! 			 * downlink for this page is consistent with the tuple we're
! 			 * inserting anymore, so go back to parent and rechoose the
! 			 * best child.
  			 */
! 			UnlockReleaseBuffer(stack->buffer);
! 			xlocked = false;
! 			state.stack = stack = stack->parent;
  			continue;
  		}
  
! 		if (!GistPageIsLeaf(stack->page))
  		{
  			/*
! 			 * This is an internal page so continue to walk down the tree.
! 			 * Find the child node that has the minimum insertion penalty.
  			 */
! 			BlockNumber childblkno;
! 			IndexTuple newtup;
! 			GISTInsertStack *item;
  
! 			stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate);
! 			iid = PageGetItemId(stack->page, stack->childoffnum);
! 			idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
! 			childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
  
! 			/*
! 			 * Check that it's not a leftover invalid tuple from pre-9.1
! 			 */
! 			if (GistTupleIsInvalid(idxtuple))
! 				ereport(ERROR,
! 						(errmsg("index \"%s\" contains an inner tuple marked as invalid",
! 								RelationGetRelationName(r)),
! 						 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
! 						 errhint("Please REINDEX it.")));
  
! 			/*
! 			 * Check that the key representing the target child node is
! 			 * consistent with the key we're inserting. Update it if it's not.
! 			 */
! 			newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
! 			if (newtup)
  			{
  				/*
! 				 * Swap shared lock for an exclusive one. Beware, the page
! 				 * may change while we unlock/lock the page...
  				 */
! 				if (!xlocked)
  				{
! 					LockBuffer(stack->buffer, GIST_UNLOCK);
! 					LockBuffer(stack->buffer, GIST_EXCLUSIVE);
! 					xlocked = true;
! 					stack->page = (Page) BufferGetPage(stack->buffer);
  
+ 					if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn))
+ 					{
+ 						/* the page was changed while we unlocked it, retry */
+ 						continue;
+ 					}
+ 				}
  				/*
! 				 * Update the tuple.
! 				 *
! 				 * gistinserthere() might have to split the page to make the
! 				 * updated tuple fit. It will adjust the stack so that after
! 				 * the call, we'll be holding a lock on the page containing
! 				 * the tuple, which might have moved right.
! 				 *
! 				 * Except if this causes a root split, gistinserthere()
! 				 * returns 'true'. In that case, stack only holds the new
! 				 * root, and the child page was released. Have to start
! 				 * all over.
  				 */
! 				if (gistinserttuples(&state, stack, giststate, &newtup, 1,
! 									 stack->childoffnum, InvalidBuffer))
! 				{
! 					UnlockReleaseBuffer(stack->buffer);
! 					xlocked = false;
! 					state.stack = stack = stack->parent;
! 					continue;
! 				}
  			}
! 			LockBuffer(stack->buffer, GIST_UNLOCK);
! 			xlocked = false;
! 
! 			/* descend to the chosen child */
! 			item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
! 			item->blkno = childblkno;
! 			item->parent = stack;
! 			state.stack = stack = item;
! 		}
! 		else
! 		{
! 			/*
! 			 * Leaf page. Insert the new key. We've already updated all the
! 			 * parents on the way down, but we might have to split the page
! 			 * if it doesn't fit. gistinserthere() will take care of that.
! 			 */
! 
! 			/*
! 			 * Swap shared lock for an exclusive one. Be careful, the page
! 			 * may change while we unlock/lock the page...
! 			 */
! 			if (!xlocked)
  			{
! 				LockBuffer(stack->buffer, GIST_UNLOCK);
! 				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
! 				xlocked = true;
! 				stack->page = (Page) BufferGetPage(stack->buffer);
! 				stack->lsn = PageGetLSN(stack->page);
  
! 				if (stack->blkno == GIST_ROOT_BLKNO)
! 				{
! 					/*
! 					 * the only page that can become inner instead of leaf
! 					 * is the root page, so for root we should recheck it
! 					 */
! 					if (!GistPageIsLeaf(stack->page))
! 					{
! 						/*
! 						 * very rare situation: during unlock/lock index with
! 						 * number of pages = 1 was increased
! 						 */
! 						LockBuffer(stack->buffer, GIST_UNLOCK);
! 						xlocked = false;
! 						continue;
! 					}
  
! 					/*
! 					 * we don't need to check root split, because checking
! 					 * leaf/inner is enough to recognize split for root
! 					 */
! 				}
! 				else if (GistFollowRight(stack->page) ||
! 						 XLByteLT(stack->parent->lsn,
! 								  GistPageGetOpaque(stack->page)->nsn))
! 				{
! 					/*
! 					 * The page was split while we momentarily unlocked the
! 					 * page. Go back to parent.
! 					 */
! 					UnlockReleaseBuffer(stack->buffer);
! 					xlocked = false;
! 					state.stack = stack = stack->parent;
! 					continue;
! 				}
  			}
  
! 			/* now state.stack->(page, buffer and blkno) points to leaf page */
  
! 			gistinserttuples(&state, stack, giststate, &itup, 1,
! 							 InvalidOffsetNumber, InvalidBuffer);
! 			LockBuffer(stack->buffer, GIST_UNLOCK);
! 
! 			/* Release any pins we might still hold before exiting */
! 			for (; stack; stack = stack->parent)
! 				ReleaseBuffer(stack->buffer);
  			break;
  		}
  	}
  }
  
  /*
***************
*** 648,654 **** gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
   *
   * returns from the beginning of closest parent;
   *
!  * To prevent deadlocks, this should lock only one page simultaneously.
   */
  GISTInsertStack *
  gistFindPath(Relation r, BlockNumber child)
--- 832,838 ----
   *
   * returns from the beginning of closest parent;
   *
!  * To prevent deadlocks, this should lock only one page at a time.
   */
  GISTInsertStack *
  gistFindPath(Relation r, BlockNumber child)
***************
*** 683,688 **** gistFindPath(Relation r, BlockNumber child)
--- 867,879 ----
  
  		top->lsn = PageGetLSN(page);
  
+ 		/*
+ 		 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
+ 		 * downlink. This should not normally happen..
+ 		 */
+ 		if (GistFollowRight(page))
+ 			elog(ERROR, "concurrent GiST page split was incomplete");
+ 
  		if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
  			GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
  		{
***************
*** 711,718 **** gistFindPath(Relation r, BlockNumber child)
  				ptr = top;
  				while (ptr->parent)
  				{
- 					/* set child link */
- 					ptr->parent->child = ptr;
  					/* move childoffnum.. */
  					if (ptr == top)
  					{
--- 902,907 ----
***************
*** 754,770 **** gistFindPath(Relation r, BlockNumber child)
  	return NULL;
  }
  
- 
  /*
!  * Returns X-locked parent of stack page
   */
- 
  static void
  gistFindCorrectParent(Relation r, GISTInsertStack *child)
  {
  	GISTInsertStack *parent = child->parent;
  
- 	LockBuffer(parent->buffer, GIST_EXCLUSIVE);
  	gistcheckpage(r, parent->buffer);
  	parent->page = (Page) BufferGetPage(parent->buffer);
  
--- 943,958 ----
  	return NULL;
  }
  
  /*
!  * Updates the stack so that child->parent is the correct parent of the
!  * child. child->parent must be exclusively locked on entry, and will
!  * remain so at exit, but it might not be the same page anymore.
   */
  static void
  gistFindCorrectParent(Relation r, GISTInsertStack *child)
  {
  	GISTInsertStack *parent = child->parent;
  
  	gistcheckpage(r, parent->buffer);
  	parent->page = (Page) BufferGetPage(parent->buffer);
  
***************
*** 836,918 **** gistFindCorrectParent(Relation r, GISTInsertStack *child)
  
  		/* install new chain of parents to stack */
  		child->parent = parent;
- 		parent->child = child;
  
  		/* make recursive call to normal processing */
  		gistFindCorrectParent(r, child);
  	}
  
  	return;
  }
  
! void
! gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
  {
! 	int			is_splitted;
! 	ItemId		iid;
! 	IndexTuple	oldtup,
! 				newtup;
  
! 	/* walk up */
! 	while (true)
  	{
! 		/*
! 		 * After this call: 1. if child page was splited, then itup contains
! 		 * keys for each page 2. if  child page wasn't splited, then itup
! 		 * contains additional for adjustment of current key
! 		 */
! 
! 		if (state->stack->parent)
  		{
! 			/*
! 			 * X-lock parent page before proceed child, gistFindCorrectParent
! 			 * should find and lock it
! 			 */
! 			gistFindCorrectParent(state->r, state->stack);
  		}
! 		is_splitted = gistplacetopage(state, giststate);
  
! 		/* parent locked above, so release child buffer */
! 		UnlockReleaseBuffer(state->stack->buffer);
  
! 		/* pop parent page from stack */
! 		state->stack = state->stack->parent;
  
- 		/* stack is void */
- 		if (!state->stack)
- 			break;
  
! 		/*
! 		 * child did not split, so we can check is it needed to update parent
! 		 * tuple
! 		 */
! 		if (!is_splitted)
! 		{
! 			/* parent's tuple */
! 			iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
! 			oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
! 			newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
! 
! 			if (!newtup)
! 			{					/* not need to update key */
! 				LockBuffer(state->stack->buffer, GIST_UNLOCK);
! 				break;
! 			}
  
! 			state->itup[0] = newtup;
  		}
! 	}							/* while */
  
! 	/* release all parent buffers */
! 	while (state->stack)
  	{
! 		ReleaseBuffer(state->stack->buffer);
! 		state->stack = state->stack->parent;
  	}
  
! 	/* say to xlog that insert is completed */
! 	if (state->needInsertComplete && RelationNeedsWAL(state->r))
! 		gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
  }
  
  /*
--- 1024,1254 ----
  
  		/* install new chain of parents to stack */
  		child->parent = parent;
  
  		/* make recursive call to normal processing */
+ 		LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
  		gistFindCorrectParent(r, child);
  	}
  
  	return;
  }
  
! /*
!  * Form a downlink pointer for the page in 'buf'.
!  */
! static IndexTuple
! gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
! 				 GISTInsertStack *stack)
  {
! 	Page page = BufferGetPage(buf);
! 	OffsetNumber maxoff;
! 	OffsetNumber offset;
! 	IndexTuple downlink = NULL;
  
! 	maxoff = PageGetMaxOffsetNumber(page);
! 	for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
  	{
! 		IndexTuple	ituple = (IndexTuple)
! 			PageGetItem(page, PageGetItemId(page, offset));
! 		if (downlink == NULL)
! 			downlink = CopyIndexTuple(ituple);
! 		else
  		{
! 			IndexTuple newdownlink;
! 			newdownlink = gistgetadjusted(rel, downlink, ituple,
! 										  giststate);
! 			if (newdownlink)
! 				downlink = newdownlink;
  		}
! 	}
! 
! 	/*
! 	 * If the page is completely empty, we can't form a meaningful
! 	 * downlink for it. But we have to insert a downlink for the page.
! 	 * Any key will do, as long as its consistent with the downlink of
! 	 * parent page, so that we can legally insert it to the parent.
! 	 * A minimal one that matches as few scans as possible would be best,
! 	 * to keep scans from doing useless work, but we don't know how to
! 	 * construct that. So we just use the downlink of the original page
! 	 * that was split - that's as far from optimal as it can get but will
! 	 * do..
! 	 */
! 	if (!downlink)
! 	{
! 		ItemId iid;
! 
! 		LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
! 		gistFindCorrectParent(rel, stack);
! 		iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum);
! 		downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
! 		downlink = CopyIndexTuple(downlink);
! 		LockBuffer(stack->parent->buffer, GIST_UNLOCK);
! 	}
  
! 	ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
! 	GistTupleSetValid(downlink);
  
! 	return downlink;
! }
  
  
! /*
!  * Complete the incomplete split of state->stack->page.
!  */
! static void
! gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
! {
! 	GISTInsertStack *stack = state->stack;
! 	Buffer		buf;
! 	Page		page;
! 	List	   *splitinfo = NIL;
! 
! 	elog(LOG, "fixing incomplete split in index \"%s\", block %u",
! 		 RelationGetRelationName(state->r), stack->blkno);
  
! 	Assert(GistFollowRight(stack->page));
! 	Assert(OffsetNumberIsValid(stack->parent->childoffnum));
! 
! 	buf = stack->buffer;
! 
! 	/*
! 	 * Read the chain of split pages, following the rightlinks. Construct
! 	 * a downlink tuple for each page.
! 	 */
! 	for (;;)
! 	{
! 		GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
! 		IndexTuple downlink;
! 
! 		page = BufferGetPage(buf);
! 
! 		/* Form the new downlink tuples to insert to parent */
! 		downlink = gistformdownlink(state->r, buf, giststate, stack);
! 
! 		si->buf = buf;
! 		si->downlink = downlink;
! 
! 		splitinfo = lappend(splitinfo, si);
! 
! 		if (GistFollowRight(page))
! 		{
! 			/* lock next page */
! 			buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
! 			LockBuffer(buf, GIST_EXCLUSIVE);
  		}
! 		else
! 			break;
! 	}
! 
! 	/* Insert the downlinks */
! 	gistfinishsplit(state, stack, giststate, splitinfo);
! }
! 
! /*
!  * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples
!  * replace an old tuple at oldoffnum. The caller must hold an exclusive lock
!  * on the page.
!  *
!  * If leftchild is valid, we're inserting/updating the downlink for the
!  * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and
!  * update NSN on leftchild, atomically with the insertion of the downlink.
!  *
!  * Returns 'true' if the page had to be split. On return, we will continue
!  * to hold an exclusive lock on state->stack->buffer, but if we had to split
!  * the page, it might not contain the tuple we just inserted/updated.
!  */
! static bool
! gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
! 				 GISTSTATE *giststate,
! 				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
! 				 Buffer leftchild)
! {
! 	List *splitinfo;
! 	bool is_split;
! 
! 	is_split = gistplacetopage(state, giststate, stack->buffer,
! 							   tuples, ntup, oldoffnum,
! 							   leftchild,
! 							   &splitinfo);
! 	if (splitinfo)
! 		gistfinishsplit(state, stack, giststate, splitinfo);
! 
! 	return is_split;
! }
! 
! /*
!  * Finish an incomplete split by inserting/updating the downlinks in
!  * parent page. 'splitinfo' contains all the child pages, exclusively-locked,
!  * involved in the split, from left-to-right.
!  */
! static void
! gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
! 				GISTSTATE *giststate, List *splitinfo)
! {
! 	ListCell *lc;
! 	List *reversed;
! 	GISTPageSplitInfo *right;
! 	GISTPageSplitInfo *left;
! 	IndexTuple tuples[2];
! 
! 	/* A split always contains at least two halves */
! 	Assert(list_length(splitinfo) >= 2);
! 
! 	/*
! 	 * We need to insert downlinks for each new page, and update the
! 	 * downlink for the original (leftmost) page in the split. Begin at
! 	 * the rightmost page, inserting one downlink at a time until there's
! 	 * only two pages left. Finally insert the downlink for the last new
! 	 * page and update the downlink for the original page as one operation.
! 	 */
  
! 	/* for convenience, create a copy of the list in reverse order */
! 	reversed = NIL;
! 	foreach(lc, splitinfo)
  	{
! 		reversed = lcons(lfirst(lc), reversed);
  	}
  
! 	LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
! 	gistFindCorrectParent(state->r, stack);
! 
! 	while(list_length(reversed) > 2)
! 	{
! 		right = (GISTPageSplitInfo *) linitial(reversed);
! 		left = (GISTPageSplitInfo *) lsecond(reversed);
! 
! 		if (gistinserttuples(state, stack->parent, giststate,
! 							 &right->downlink, 1,
! 							 InvalidOffsetNumber,
! 							 left->buf))
! 		{
! 			/*
! 			 * If the parent page was split, need to relocate the original
! 			 * parent pointer.
! 			 */
! 			gistFindCorrectParent(state->r, stack);
! 		}
! 		UnlockReleaseBuffer(right->buf);
! 		reversed = list_delete_first(reversed);
! 	}
! 
! 	right = (GISTPageSplitInfo *) linitial(reversed);
! 	left = (GISTPageSplitInfo *) lsecond(reversed);
! 
! 	/*
! 	 * Finally insert downlink for the remaining right page and update the
! 	 * downlink for the original page to not contain the tuples that were
! 	 * moved to the new pages.
! 	 */
! 	tuples[0] = left->downlink;
! 	tuples[1] = right->downlink;
! 	gistinserttuples(state, stack->parent, giststate,
! 					 tuples, 2,
! 					 stack->parent->childoffnum,
! 					 left->buf);
! 	LockBuffer(stack->parent->buffer, GIST_UNLOCK);
! 	UnlockReleaseBuffer(right->buf);
! 	Assert(left->buf == stack->buffer);
  }
  
  /*
***************
*** 963,970 **** gistSplit(Relation r,
  		ROTATEDIST(res);
  		res->block.num = v.splitVector.spl_nright;
  		res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
! 		res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false)
! 			: gist_form_invalid_tuple(GIST_ROOT_BLKNO);
  	}
  
  	if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
--- 1299,1305 ----
  		ROTATEDIST(res);
  		res->block.num = v.splitVector.spl_nright;
  		res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
! 		res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
  	}
  
  	if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
***************
*** 986,1036 **** gistSplit(Relation r,
  		ROTATEDIST(res);
  		res->block.num = v.splitVector.spl_nleft;
  		res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
! 		res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false)
! 			: gist_form_invalid_tuple(GIST_ROOT_BLKNO);
  	}
  
  	return res;
  }
  
  /*
-  * buffer must be pinned and locked by caller
-  */
- void
- gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
- {
- 	Page		page;
- 
- 	Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
- 	page = BufferGetPage(buffer);
- 
- 	START_CRIT_SECTION();
- 
- 	GISTInitBuffer(buffer, 0);
- 	gistfillbuffer(page, itup, len, FirstOffsetNumber);
- 
- 	MarkBufferDirty(buffer);
- 
- 	if (RelationNeedsWAL(r))
- 	{
- 		XLogRecPtr	recptr;
- 		XLogRecData *rdata;
- 
- 		rdata = formUpdateRdata(r->rd_node, buffer,
- 								NULL, 0,
- 								itup, len, key);
- 
- 		recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
- 		PageSetLSN(page, recptr);
- 		PageSetTLI(page, ThisTimeLineID);
- 	}
- 	else
- 		PageSetLSN(page, GetXLogRecPtrForTemp());
- 
- 	END_CRIT_SECTION();
- }
- 
- /*
   * Fill a GISTSTATE with information about the index
   */
  void
--- 1321,1333 ----
  		ROTATEDIST(res);
  		res->block.num = v.splitVector.spl_nleft;
  		res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
! 		res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
  	}
  
  	return res;
  }
  
  /*
   * Fill a GISTSTATE with information about the index
   */
  void
*** a/src/backend/access/gist/gistget.c
--- b/src/backend/access/gist/gistget.c
***************
*** 254,262 **** gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
  	page = BufferGetPage(buffer);
  	opaque = GistPageGetOpaque(page);
  
! 	/* check if page split occurred since visit to parent */
  	if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) &&
! 		XLByteLT(pageItem->data.parentlsn, opaque->nsn) &&
  		opaque->rightlink != InvalidBlockNumber /* sanity check */ )
  	{
  		/* There was a page split, follow right link to add pages */
--- 254,268 ----
  	page = BufferGetPage(buffer);
  	opaque = GistPageGetOpaque(page);
  
! 	/*
! 	 * Check if we need to follow the rightlink. We need to follow it if the
! 	 * page was concurrently split since we visited the parent (in which case
! 	 * parentlsn < nsn), or if the the system crashed after a page split but
! 	 * before the downlink was inserted into the parent.
! 	 */
  	if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) &&
! 		(GistFollowRight(page) ||
! 		 XLByteLT(pageItem->data.parentlsn, opaque->nsn)) &&
  		opaque->rightlink != InvalidBlockNumber /* sanity check */ )
  	{
  		/* There was a page split, follow right link to add pages */
*** a/src/backend/access/gist/gistsplit.c
--- b/src/backend/access/gist/gistsplit.c
***************
*** 500,557 **** gistSplitHalf(GIST_SPLITVEC *v, int len)
  }
  
  /*
-  * if it was invalid tuple then we need special processing.
-  * We move all invalid tuples on right page.
-  *
-  * if there is no place on left page, gistSplit will be called one more
-  * time for left page.
-  *
-  * Normally, we never exec this code, but after crash replay it's possible
-  * to get 'invalid' tuples (probability is low enough)
-  */
- static void
- gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len)
- {
- 	int			i;
- 	static OffsetNumber offInvTuples[MaxOffsetNumber];
- 	int			nOffInvTuples = 0;
- 
- 	for (i = 1; i <= len; i++)
- 		if (GistTupleIsInvalid(itup[i - 1]))
- 			offInvTuples[nOffInvTuples++] = i;
- 
- 	if (nOffInvTuples == len)
- 	{
- 		/* corner case, all tuples are invalid */
- 		v->spl_rightvalid = v->spl_leftvalid = false;
- 		gistSplitHalf(&v->splitVector, len);
- 	}
- 	else
- 	{
- 		GistSplitUnion gsvp;
- 
- 		v->splitVector.spl_right = offInvTuples;
- 		v->splitVector.spl_nright = nOffInvTuples;
- 		v->spl_rightvalid = false;
- 
- 		v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
- 		v->splitVector.spl_nleft = 0;
- 		for (i = 1; i <= len; i++)
- 			if (!GistTupleIsInvalid(itup[i - 1]))
- 				v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
- 		v->spl_leftvalid = true;
- 
- 		gsvp.equiv = NULL;
- 		gsvp.attr = v->spl_lattr;
- 		gsvp.len = v->splitVector.spl_nleft;
- 		gsvp.entries = v->splitVector.spl_left;
- 		gsvp.isnull = v->spl_lisnull;
- 
- 		gistunionsubkeyvec(giststate, itup, &gsvp, 0);
- 	}
- }
- 
- /*
   * trys to split page by attno key, in a case of null
   * values move its to separate page.
   */
--- 500,505 ----
***************
*** 568,579 **** gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist
  		Datum		datum;
  		bool		IsNull;
  
- 		if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
- 		{
- 			gistSplitByInvalid(giststate, v, itup, len);
- 			return;
- 		}
- 
  		datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull);
  		gistdentryinit(giststate, attno, &(entryvec->vector[i]),
  					   datum, r, page, i,
--- 516,521 ----
***************
*** 582,589 **** gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist
  			offNullTuples[nOffNullTuples++] = i;
  	}
  
- 	v->spl_leftvalid = v->spl_rightvalid = true;
- 
  	if (nOffNullTuples == len)
  	{
  		/*
--- 524,529 ----
*** a/src/backend/access/gist/gistutil.c
--- b/src/backend/access/gist/gistutil.c
***************
*** 152,158 **** gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
   * invalid tuple. Resulting Datums aren't compressed.
   */
  
! bool
  gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
  				   Datum *attr, bool *isnull)
  {
--- 152,158 ----
   * invalid tuple. Resulting Datums aren't compressed.
   */
  
! void
  gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
  				   Datum *attr, bool *isnull)
  {
***************
*** 180,189 **** gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
  			Datum		datum;
  			bool		IsNull;
  
- 			if (GistTupleIsInvalid(itvec[j]))
- 				return FALSE;	/* signals that union with invalid tuple =>
- 								 * result is invalid */
- 
  			datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
  			if (IsNull)
  				continue;
--- 180,185 ----
***************
*** 218,225 **** gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
  			isnull[i] = FALSE;
  		}
  	}
- 
- 	return TRUE;
  }
  
  /*
--- 214,219 ----
***************
*** 231,238 **** gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
  {
  	memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts);
  
! 	if (!gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS))
! 		return gist_form_invalid_tuple(InvalidBlockNumber);
  
  	return gistFormTuple(giststate, r, attrS, isnullS, false);
  }
--- 225,231 ----
  {
  	memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts);
  
! 	gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS);
  
  	return gistFormTuple(giststate, r, attrS, isnullS, false);
  }
***************
*** 328,336 **** gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
  	IndexTuple	newtup = NULL;
  	int			i;
  
- 	if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup))
- 		return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid)));
- 
  	gistDeCompressAtt(giststate, r, oldtup, NULL,
  					  (OffsetNumber) 0, oldentries, oldisnull);
  
--- 321,326 ----
***************
*** 401,414 **** gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
  		int			j;
  		IndexTuple	itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
  
- 		if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup))
- 		{
- 			ereport(LOG,
- 					(errmsg("index \"%s\" needs VACUUM or REINDEX to finish crash recovery",
- 							RelationGetRelationName(r))));
- 			continue;
- 		}
- 
  		sum_grow = 0;
  		for (j = 0; j < r->rd_att->natts; j++)
  		{
--- 391,396 ----
***************
*** 521,527 **** gistFormTuple(GISTSTATE *giststate, Relation r,
  	}
  
  	res = index_form_tuple(giststate->tupdesc, compatt, isnull);
! 	GistTupleSetValid(res);
  	return res;
  }
  
--- 503,513 ----
  	}
  
  	res = index_form_tuple(giststate->tupdesc, compatt, isnull);
! 	/*
! 	 * The offset number on tuples on internal pages is unused. For historical
! 	 * reasons, it is set 0xffff.
! 	 */
! 	ItemPointerSetOffsetNumber( &(res->t_tid), 0xffff);
  	return res;
  }
  
*** a/src/backend/access/gist/gistvacuum.c
--- b/src/backend/access/gist/gistvacuum.c
***************
*** 26,38 ****
  #include "utils/memutils.h"
  
  
- typedef struct GistBulkDeleteResult
- {
- 	IndexBulkDeleteResult std;	/* common state */
- 	bool		needReindex;
- } GistBulkDeleteResult;
- 
- 
  /*
   * VACUUM cleanup: update FSM
   */
--- 26,31 ----
***************
*** 40,46 **** Datum
  gistvacuumcleanup(PG_FUNCTION_ARGS)
  {
  	IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
! 	GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1);
  	Relation	rel = info->index;
  	BlockNumber npages,
  				blkno;
--- 33,39 ----
  gistvacuumcleanup(PG_FUNCTION_ARGS)
  {
  	IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
! 	IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
  	Relation	rel = info->index;
  	BlockNumber npages,
  				blkno;
***************
*** 56,65 **** gistvacuumcleanup(PG_FUNCTION_ARGS)
  	/* Set up all-zero stats if gistbulkdelete wasn't called */
  	if (stats == NULL)
  	{
! 		stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
  		/* use heap's tuple count */
! 		stats->std.num_index_tuples = info->num_heap_tuples;
! 		stats->std.estimated_count = info->estimated_count;
  
  		/*
  		 * XXX the above is wrong if index is partial.	Would it be OK to just
--- 49,58 ----
  	/* Set up all-zero stats if gistbulkdelete wasn't called */
  	if (stats == NULL)
  	{
! 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
  		/* use heap's tuple count */
! 		stats->num_index_tuples = info->num_heap_tuples;
! 		stats->estimated_count = info->estimated_count;
  
  		/*
  		 * XXX the above is wrong if index is partial.	Would it be OK to just
***************
*** 67,77 **** gistvacuumcleanup(PG_FUNCTION_ARGS)
  		 */
  	}
  
- 	if (stats->needReindex)
- 		ereport(NOTICE,
- 				(errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery",
- 						RelationGetRelationName(rel))));
- 
  	/*
  	 * Need lock unless it's local to this backend.
  	 */
--- 60,65 ----
***************
*** 112,121 **** gistvacuumcleanup(PG_FUNCTION_ARGS)
  	IndexFreeSpaceMapVacuum(info->index);
  
  	/* return statistics */
! 	stats->std.pages_free = totFreePages;
  	if (needLock)
  		LockRelationForExtension(rel, ExclusiveLock);
! 	stats->std.num_pages = RelationGetNumberOfBlocks(rel);
  	if (needLock)
  		UnlockRelationForExtension(rel, ExclusiveLock);
  
--- 100,109 ----
  	IndexFreeSpaceMapVacuum(info->index);
  
  	/* return statistics */
! 	stats->pages_free = totFreePages;
  	if (needLock)
  		LockRelationForExtension(rel, ExclusiveLock);
! 	stats->num_pages = RelationGetNumberOfBlocks(rel);
  	if (needLock)
  		UnlockRelationForExtension(rel, ExclusiveLock);
  
***************
*** 135,141 **** pushStackIfSplited(Page page, GistBDItem *stack)
  	GISTPageOpaque opaque = GistPageGetOpaque(page);
  
  	if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
! 		XLByteLT(stack->parentlsn, opaque->nsn) &&
  		opaque->rightlink != InvalidBlockNumber /* sanity check */ )
  	{
  		/* split page detected, install right link to the stack */
--- 123,129 ----
  	GISTPageOpaque opaque = GistPageGetOpaque(page);
  
  	if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
! 		(GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) &&
  		opaque->rightlink != InvalidBlockNumber /* sanity check */ )
  	{
  		/* split page detected, install right link to the stack */
***************
*** 162,168 **** Datum
  gistbulkdelete(PG_FUNCTION_ARGS)
  {
  	IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
! 	GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1);
  	IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
  	void	   *callback_state = (void *) PG_GETARG_POINTER(3);
  	Relation	rel = info->index;
--- 150,156 ----
  gistbulkdelete(PG_FUNCTION_ARGS)
  {
  	IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
! 	IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
  	IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
  	void	   *callback_state = (void *) PG_GETARG_POINTER(3);
  	Relation	rel = info->index;
***************
*** 171,180 **** gistbulkdelete(PG_FUNCTION_ARGS)
  
  	/* first time through? */
  	if (stats == NULL)
! 		stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
  	/* we'll re-count the tuples each time */
! 	stats->std.estimated_count = false;
! 	stats->std.num_index_tuples = 0;
  
  	stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
  	stack->blkno = GIST_ROOT_BLKNO;
--- 159,168 ----
  
  	/* first time through? */
  	if (stats == NULL)
! 		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
  	/* we'll re-count the tuples each time */
! 	stats->estimated_count = false;
! 	stats->num_index_tuples = 0;
  
  	stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
  	stack->blkno = GIST_ROOT_BLKNO;
***************
*** 232,241 **** gistbulkdelete(PG_FUNCTION_ARGS)
  				{
  					todelete[ntodelete] = i - ntodelete;
  					ntodelete++;
! 					stats->std.tuples_removed += 1;
  				}
  				else
! 					stats->std.num_index_tuples += 1;
  			}
  
  			if (ntodelete)
--- 220,229 ----
  				{
  					todelete[ntodelete] = i - ntodelete;
  					ntodelete++;
! 					stats->tuples_removed += 1;
  				}
  				else
! 					stats->num_index_tuples += 1;
  			}
  
  			if (ntodelete)
***************
*** 250,271 **** gistbulkdelete(PG_FUNCTION_ARGS)
  
  				if (RelationNeedsWAL(rel))
  				{
- 					XLogRecData *rdata;
  					XLogRecPtr	recptr;
- 					gistxlogPageUpdate *xlinfo;
  
! 					rdata = formUpdateRdata(rel->rd_node, buffer,
  											todelete, ntodelete,
! 											NULL, 0,
! 											NULL);
! 					xlinfo = (gistxlogPageUpdate *) rdata->next->data;
! 
! 					recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
  					PageSetLSN(page, recptr);
  					PageSetTLI(page, ThisTimeLineID);
- 
- 					pfree(xlinfo);
- 					pfree(rdata);
  				}
  				else
  					PageSetLSN(page, GetXLogRecPtrForTemp());
--- 238,250 ----
  
  				if (RelationNeedsWAL(rel))
  				{
  					XLogRecPtr	recptr;
  
! 					recptr = gistXLogUpdate(rel->rd_node, buffer,
  											todelete, ntodelete,
! 											NULL, 0, InvalidBuffer);
  					PageSetLSN(page, recptr);
  					PageSetTLI(page, ThisTimeLineID);
  				}
  				else
  					PageSetLSN(page, GetXLogRecPtrForTemp());
***************
*** 293,299 **** gistbulkdelete(PG_FUNCTION_ARGS)
  				stack->next = ptr;
  
  				if (GistTupleIsInvalid(idxtuple))
! 					stats->needReindex = true;
  			}
  		}
  
--- 272,282 ----
  				stack->next = ptr;
  
  				if (GistTupleIsInvalid(idxtuple))
! 					ereport(LOG,
! 							(errmsg("index \"%s\" contains an inner tuple marked as invalid",
! 									RelationGetRelationName(rel)),
! 							 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
! 							 errhint("Please REINDEX it.")));
  			}
  		}
  
*** a/src/backend/access/gist/gistxlog.c
--- b/src/backend/access/gist/gistxlog.c
***************
*** 20,34 ****
  #include "utils/memutils.h"
  #include "utils/rel.h"
  
- 
- typedef struct
- {
- 	gistxlogPageUpdate *data;
- 	int			len;
- 	IndexTuple *itup;
- 	OffsetNumber *todelete;
- } PageUpdateRecord;
- 
  typedef struct
  {
  	gistxlogPage *header;
--- 20,25 ----
***************
*** 41,184 **** typedef struct
  	NewPage    *page;
  } PageSplitRecord;
  
- /* track for incomplete inserts, idea was taken from nbtxlog.c */
- 
- typedef struct gistIncompleteInsert
- {
- 	RelFileNode node;
- 	BlockNumber origblkno;		/* for splits */
- 	ItemPointerData key;
- 	int			lenblk;
- 	BlockNumber *blkno;
- 	XLogRecPtr	lsn;
- 	BlockNumber *path;
- 	int			pathlen;
- } gistIncompleteInsert;
- 
- 
  static MemoryContext opCtx;		/* working memory for operations */
- static MemoryContext insertCtx; /* holds incomplete_inserts list */
- static List *incomplete_inserts;
- 
- 
- #define ItemPointerEQ(a, b) \
- 	( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
- 	  ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )
- 
  
  static void
! pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
! 					 BlockNumber *blkno, int lenblk,
! 					 PageSplitRecord *xlinfo /* to extract blkno info */ )
  {
! 	MemoryContext oldCxt;
! 	gistIncompleteInsert *ninsert;
  
! 	if (!ItemPointerIsValid(&key))
  
  		/*
! 		 * if key is null then we should not store insertion as incomplete,
! 		 * because it's a vacuum operation..
  		 */
! 		return;
! 
! 	oldCxt = MemoryContextSwitchTo(insertCtx);
! 	ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
! 
! 	ninsert->node = node;
! 	ninsert->key = key;
! 	ninsert->lsn = lsn;
! 
! 	if (lenblk && blkno)
! 	{
! 		ninsert->lenblk = lenblk;
! 		ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
! 		memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
! 		ninsert->origblkno = *blkno;
! 	}
! 	else
! 	{
! 		int			i;
! 
! 		Assert(xlinfo);
! 		ninsert->lenblk = xlinfo->data->npage;
! 		ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
! 		for (i = 0; i < ninsert->lenblk; i++)
! 			ninsert->blkno[i] = xlinfo->page[i].header->blkno;
! 		ninsert->origblkno = xlinfo->data->origblkno;
! 	}
! 	Assert(ninsert->lenblk > 0);
! 
! 	/*
! 	 * Stick the new incomplete insert onto the front of the list, not the
! 	 * back.  This is so that gist_xlog_cleanup will process incompletions in
! 	 * last-in-first-out order.
! 	 */
! 	incomplete_inserts = lcons(ninsert, incomplete_inserts);
! 
! 	MemoryContextSwitchTo(oldCxt);
! }
! 
! static void
! forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
! {
! 	ListCell   *l;
! 
! 	if (!ItemPointerIsValid(&key))
! 		return;
! 
! 	if (incomplete_inserts == NIL)
! 		return;
! 
! 	foreach(l, incomplete_inserts)
! 	{
! 		gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
! 
! 		if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
  		{
! 			/* found */
! 			incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
! 			pfree(insert->blkno);
! 			pfree(insert);
! 			break;
! 		}
! 	}
! }
  
! static void
! decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
! {
! 	char	   *begin = XLogRecGetData(record),
! 			   *ptr;
! 	int			i = 0,
! 				addpath = 0;
! 
! 	decoded->data = (gistxlogPageUpdate *) begin;
! 
! 	if (decoded->data->ntodelete)
! 	{
! 		decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
! 		addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
! 	}
! 	else
! 		decoded->todelete = NULL;
! 
! 	decoded->len = 0;
! 	ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
! 	while (ptr - begin < record->xl_len)
! 	{
! 		decoded->len++;
! 		ptr += IndexTupleSize((IndexTuple) ptr);
! 	}
! 
! 	decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
! 
! 	ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
! 	while (ptr - begin < record->xl_len)
! 	{
! 		decoded->itup[i] = (IndexTuple) ptr;
! 		ptr += IndexTupleSize(decoded->itup[i]);
! 		i++;
  	}
  }
  
--- 32,68 ----
  	NewPage    *page;
  } PageSplitRecord;
  
  static MemoryContext opCtx;		/* working memory for operations */
  
+ /*
+  * Replay the clearing of F_FOLLOW_RIGHT flag.
+  */
  static void
! gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
! 						 BlockNumber leftblkno)
  {
! 	Buffer buffer;
  
! 	buffer = XLogReadBuffer(node, leftblkno, false);
! 	if (BufferIsValid(buffer))
! 	{
! 		Page page = (Page) BufferGetPage(buffer);
  
  		/*
! 		 * Note that we still update the page even if page LSN is equal to the
! 		 * LSN of this record, because the updated NSN is not included in the
! 		 * full page image.
  		 */
! 		if (!XLByteLT(lsn, PageGetLSN(page)))
  		{
! 			GistPageGetOpaque(page)->nsn = lsn;
! 			GistClearFollowRight(page);
  
! 			PageSetLSN(page, lsn);
! 			PageSetTLI(page, ThisTimeLineID);
! 			MarkBufferDirty(buffer);
! 		}
! 		UnlockReleaseBuffer(buffer);
  	}
  }
  
***************
*** 186,214 **** decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
   * redo any page update (except page split)
   */
  static void
! gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
  {
! 	gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
! 	PageUpdateRecord xlrec;
  	Buffer		buffer;
  	Page		page;
  
! 	/* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
! 	forgetIncompleteInsert(xldata->node, xldata->key);
  
! 	if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
! 		/* operation with root always finalizes insertion */
! 		pushIncompleteInsert(xldata->node, lsn, xldata->key,
! 							 &(xldata->blkno), 1,
! 							 NULL);
! 
! 	/* nothing else to do if page was backed up (and no info to do it with) */
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	decodePageUpdateRecord(&xlrec, record);
! 
! 	buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
--- 70,91 ----
   * redo any page update (except page split)
   */
  static void
! gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
  {
! 	char	   *begin = XLogRecGetData(record);
! 	gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
  	Buffer		buffer;
  	Page		page;
+ 	char	   *data;
  
! 	if (BlockNumberIsValid(xldata->leftchild))
! 		gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
  
! 	/* nothing more to do if page was backed up (and no info to do it with) */
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
***************
*** 219,246 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
  		return;
  	}
  
! 	if (isnewroot)
! 		GISTInitBuffer(buffer, 0);
! 	else if (xlrec.data->ntodelete)
  	{
  		int			i;
  
! 		for (i = 0; i < xlrec.data->ntodelete; i++)
! 			PageIndexTupleDelete(page, xlrec.todelete[i]);
  		if (GistPageIsLeaf(page))
  			GistMarkTuplesDeleted(page);
  	}
  
  	/* add tuples */
! 	if (xlrec.len > 0)
! 		gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
! 
! 	/*
! 	 * special case: leafpage, nothing to insert, nothing to delete, then
! 	 * vacuum marks page
! 	 */
! 	if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
! 		GistClearTuplesDeleted(page);
  
  	if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
  
--- 96,144 ----
  		return;
  	}
  
! 	data = begin + sizeof(gistxlogPageUpdate);
! 
! 	/* Delete old tuples */
! 	if (xldata->ntodelete > 0)
  	{
  		int			i;
+ 		OffsetNumber *todelete = (OffsetNumber *) data;
+ 		data += sizeof(OffsetNumber) * xldata->ntodelete;
  
! 		for (i = 0; i < xldata->ntodelete; i++)
! 			PageIndexTupleDelete(page, todelete[i]);
  		if (GistPageIsLeaf(page))
  			GistMarkTuplesDeleted(page);
  	}
  
  	/* add tuples */
! 	if (data - begin < record->xl_len)
! 	{
! 		OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
! 			OffsetNumberNext(PageGetMaxOffsetNumber(page));
! 		while (data - begin < record->xl_len)
! 		{
! 			IndexTuple itup = (IndexTuple) data;
! 			Size		sz = IndexTupleSize(itup);
! 			OffsetNumber l;
! 			data += sz;
! 
! 			l = PageAddItem(page, (Item) itup, sz, off, false, false);
! 			if (l == InvalidOffsetNumber)
! 				elog(ERROR, "failed to add item to GiST index page, size %d bytes",
! 					 (int) sz);
! 			off++;
! 		}
! 	}
! 	else
! 	{
! 		/*
! 		 * special case: leafpage, nothing to insert, nothing to delete, then
! 		 * vacuum marks page
! 		 */
! 		if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
! 			GistClearTuplesDeleted(page);
! 	}
  
  	if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
  
***************
*** 315,355 **** decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
  static void
  gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
  {
  	PageSplitRecord xlrec;
  	Buffer		buffer;
  	Page		page;
  	int			i;
! 	int			flags;
  
  	decodePageSplitRecord(&xlrec, record);
- 	flags = xlrec.data->origleaf ? F_LEAF : 0;
  
  	/* loop around all pages */
  	for (i = 0; i < xlrec.data->npage; i++)
  	{
  		NewPage    *newpage = xlrec.page + i;
  
  		buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
  		Assert(BufferIsValid(buffer));
  		page = (Page) BufferGetPage(buffer);
  
  		/* ok, clear buffer */
  		GISTInitBuffer(buffer, flags);
  
  		/* and fill it */
  		gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
  
  		PageSetLSN(page, lsn);
  		PageSetTLI(page, ThisTimeLineID);
  		MarkBufferDirty(buffer);
  		UnlockReleaseBuffer(buffer);
  	}
- 
- 	forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
- 
- 	pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
- 						 NULL, 0,
- 						 &xlrec);
  }
  
  static void
--- 213,279 ----
  static void
  gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
  {
+ 	gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
  	PageSplitRecord xlrec;
  	Buffer		buffer;
  	Page		page;
  	int			i;
! 	bool		isrootsplit = false;
  
+ 	if (BlockNumberIsValid(xldata->leftchild))
+ 		gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
  	decodePageSplitRecord(&xlrec, record);
  
  	/* loop around all pages */
  	for (i = 0; i < xlrec.data->npage; i++)
  	{
  		NewPage    *newpage = xlrec.page + i;
+ 		int			flags;
+ 
+ 		if (newpage->header->blkno == GIST_ROOT_BLKNO)
+ 		{
+ 			Assert(i == 0);
+ 			isrootsplit = true;
+ 		}
  
  		buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
  		Assert(BufferIsValid(buffer));
  		page = (Page) BufferGetPage(buffer);
  
  		/* ok, clear buffer */
+ 		if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
+ 			flags = F_LEAF;
+ 		else
+ 			flags = 0;
  		GISTInitBuffer(buffer, flags);
  
  		/* and fill it */
  		gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
  
+ 		if (newpage->header->blkno == GIST_ROOT_BLKNO)
+ 		{
+ 			GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
+ 			GistPageGetOpaque(page)->nsn = xldata->orignsn;
+ 			GistClearFollowRight(page);
+ 		}
+ 		else
+ 		{
+ 			if (i < xlrec.data->npage - 1)
+ 				GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
+ 			else
+ 				GistPageGetOpaque(page)->rightlink = xldata->origrlink;
+ 			GistPageGetOpaque(page)->nsn = xldata->orignsn;
+ 			if (i < xlrec.data->npage - 1 && !isrootsplit)
+ 				GistMarkFollowRight(page);
+ 			else
+ 				GistClearFollowRight(page);
+ 		}
+ 
  		PageSetLSN(page, lsn);
  		PageSetTLI(page, ThisTimeLineID);
  		MarkBufferDirty(buffer);
  		UnlockReleaseBuffer(buffer);
  	}
  }
  
  static void
***************
*** 372,395 **** gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  	UnlockReleaseBuffer(buffer);
  }
  
- static void
- gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
- {
- 	char	   *begin = XLogRecGetData(record),
- 			   *ptr;
- 	gistxlogInsertComplete *xlrec;
- 
- 	xlrec = (gistxlogInsertComplete *) begin;
- 
- 	ptr = begin + sizeof(gistxlogInsertComplete);
- 	while (ptr - begin < record->xl_len)
- 	{
- 		Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
- 		forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
- 		ptr += sizeof(ItemPointerData);
- 	}
- }
- 
  void
  gist_redo(XLogRecPtr lsn, XLogRecord *record)
  {
--- 296,301 ----
***************
*** 401,430 **** gist_redo(XLogRecPtr lsn, XLogRecord *record)
  	 * implement a similar optimization we have in b-tree, and remove killed
  	 * tuples outside VACUUM, we'll need to handle that here.
  	 */
- 
  	RestoreBkpBlocks(lsn, record, false);
  
  	oldCxt = MemoryContextSwitchTo(opCtx);
  	switch (info)
  	{
  		case XLOG_GIST_PAGE_UPDATE:
! 			gistRedoPageUpdateRecord(lsn, record, false);
  			break;
  		case XLOG_GIST_PAGE_DELETE:
  			gistRedoPageDeleteRecord(lsn, record);
  			break;
- 		case XLOG_GIST_NEW_ROOT:
- 			gistRedoPageUpdateRecord(lsn, record, true);
- 			break;
  		case XLOG_GIST_PAGE_SPLIT:
  			gistRedoPageSplitRecord(lsn, record);
  			break;
  		case XLOG_GIST_CREATE_INDEX:
  			gistRedoCreateIndex(lsn, record);
  			break;
- 		case XLOG_GIST_INSERT_COMPLETE:
- 			gistRedoCompleteInsert(lsn, record);
- 			break;
  		default:
  			elog(PANIC, "gist_redo: unknown op code %u", info);
  	}
--- 307,329 ----
  	 * implement a similar optimization we have in b-tree, and remove killed
  	 * tuples outside VACUUM, we'll need to handle that here.
  	 */
  	RestoreBkpBlocks(lsn, record, false);
  
  	oldCxt = MemoryContextSwitchTo(opCtx);
  	switch (info)
  	{
  		case XLOG_GIST_PAGE_UPDATE:
! 			gistRedoPageUpdateRecord(lsn, record);
  			break;
  		case XLOG_GIST_PAGE_DELETE:
  			gistRedoPageDeleteRecord(lsn, record);
  			break;
  		case XLOG_GIST_PAGE_SPLIT:
  			gistRedoPageSplitRecord(lsn, record);
  			break;
  		case XLOG_GIST_CREATE_INDEX:
  			gistRedoCreateIndex(lsn, record);
  			break;
  		default:
  			elog(PANIC, "gist_redo: unknown op code %u", info);
  	}
***************
*** 434,453 **** gist_redo(XLogRecPtr lsn, XLogRecord *record)
  }
  
  static void
! out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
  {
  	appendStringInfo(buf, "rel %u/%u/%u",
  					 node.spcNode, node.dbNode, node.relNode);
- 	if (ItemPointerIsValid(&key))
- 		appendStringInfo(buf, "; tid %u/%u",
- 						 ItemPointerGetBlockNumber(&key),
- 						 ItemPointerGetOffsetNumber(&key));
  }
  
  static void
  out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
  {
! 	out_target(buf, xlrec->node, xlrec->key);
  	appendStringInfo(buf, "; block number %u", xlrec->blkno);
  }
  
--- 333,348 ----
  }
  
  static void
! out_target(StringInfo buf, RelFileNode node)
  {
  	appendStringInfo(buf, "rel %u/%u/%u",
  					 node.spcNode, node.dbNode, node.relNode);
  }
  
  static void
  out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
  {
! 	out_target(buf, xlrec->node);
  	appendStringInfo(buf, "; block number %u", xlrec->blkno);
  }
  
***************
*** 463,469 **** static void
  out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
  {
  	appendStringInfo(buf, "page_split: ");
! 	out_target(buf, xlrec->node, xlrec->key);
  	appendStringInfo(buf, "; block number %u splits to %d pages",
  					 xlrec->origblkno, xlrec->npage);
  }
--- 358,364 ----
  out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
  {
  	appendStringInfo(buf, "page_split: ");
! 	out_target(buf, xlrec->node);
  	appendStringInfo(buf, "; block number %u splits to %d pages",
  					 xlrec->origblkno, xlrec->npage);
  }
***************
*** 482,491 **** gist_desc(StringInfo buf, uint8 xl_info, char *rec)
  		case XLOG_GIST_PAGE_DELETE:
  			out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
  			break;
- 		case XLOG_GIST_NEW_ROOT:
- 			appendStringInfo(buf, "new_root: ");
- 			out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
- 			break;
  		case XLOG_GIST_PAGE_SPLIT:
  			out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
  			break;
--- 377,382 ----
***************
*** 495,909 **** gist_desc(StringInfo buf, uint8 xl_info, char *rec)
  							 ((RelFileNode *) rec)->dbNode,
  							 ((RelFileNode *) rec)->relNode);
  			break;
- 		case XLOG_GIST_INSERT_COMPLETE:
- 			appendStringInfo(buf, "complete_insert: rel %u/%u/%u",
- 							 ((gistxlogInsertComplete *) rec)->node.spcNode,
- 							 ((gistxlogInsertComplete *) rec)->node.dbNode,
- 							 ((gistxlogInsertComplete *) rec)->node.relNode);
- 			break;
  		default:
  			appendStringInfo(buf, "unknown gist op code %u", info);
  			break;
  	}
  }
  
- IndexTuple
- gist_form_invalid_tuple(BlockNumber blkno)
- {
- 	/*
- 	 * we don't alloc space for null's bitmap, this is invalid tuple, be
- 	 * carefull in read and write code
- 	 */
- 	Size		size = IndexInfoFindDataOffset(0);
- 	IndexTuple	tuple = (IndexTuple) palloc0(size);
- 
- 	tuple->t_info |= size;
- 
- 	ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
- 	GistTupleSetInvalid(tuple);
- 
- 	return tuple;
- }
- 
- 
- static void
- gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
- {
- 	GISTInsertStack *top;
- 
- 	insert->pathlen = 0;
- 	insert->path = NULL;
- 
- 	if ((top = gistFindPath(index, insert->origblkno)) != NULL)
- 	{
- 		int			i;
- 		GISTInsertStack *ptr;
- 
- 		for (ptr = top; ptr; ptr = ptr->parent)
- 			insert->pathlen++;
- 
- 		insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
- 
- 		i = 0;
- 		for (ptr = top; ptr; ptr = ptr->parent)
- 			insert->path[i++] = ptr->blkno;
- 	}
- 	else
- 		elog(ERROR, "lost parent for block %u", insert->origblkno);
- }
- 
- static SplitedPageLayout *
- gistMakePageLayout(Buffer *buffers, int nbuffers)
- {
- 	SplitedPageLayout *res = NULL,
- 			   *resptr;
- 
- 	while (nbuffers-- > 0)
- 	{
- 		Page		page = BufferGetPage(buffers[nbuffers]);
- 		IndexTuple *vec;
- 		int			veclen;
- 
- 		resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout));
- 
- 		resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]);
- 		resptr->block.num = PageGetMaxOffsetNumber(page);
- 
- 		vec = gistextractpage(page, &veclen);
- 		resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist));
- 
- 		resptr->next = res;
- 		res = resptr;
- 	}
- 
- 	return res;
- }
- 
- /*
-  * Continue insert after crash.  In normal situations, there aren't any
-  * incomplete inserts, but if a crash occurs partway through an insertion
-  * sequence, we'll need to finish making the index valid at the end of WAL
-  * replay.
-  *
-  * Note that we assume the index is now in a valid state, except for the
-  * unfinished insertion.  In particular it's safe to invoke gistFindPath();
-  * there shouldn't be any garbage pages for it to run into.
-  *
-  * To complete insert we can't use basic insertion algorithm because
-  * during insertion we can't call user-defined support functions of opclass.
-  * So, we insert 'invalid' tuples without real key and do it by separate algorithm.
-  * 'invalid' tuple should be updated by vacuum full.
-  */
- static void
- gistContinueInsert(gistIncompleteInsert *insert)
- {
- 	IndexTuple *itup;
- 	int			i,
- 				lenitup;
- 	Relation	index;
- 
- 	index = CreateFakeRelcacheEntry(insert->node);
- 
- 	/*
- 	 * needed vector itup never will be more than initial lenblkno+2, because
- 	 * during this processing Indextuple can be only smaller
- 	 */
- 	lenitup = insert->lenblk;
- 	itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));
- 
- 	for (i = 0; i < insert->lenblk; i++)
- 		itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
- 
- 	/*
- 	 * any insertion of itup[] should make LOG message about
- 	 */
- 
- 	if (insert->origblkno == GIST_ROOT_BLKNO)
- 	{
- 		/*
- 		 * it was split root, so we should only make new root. it can't be
- 		 * simple insert into root, we should replace all content of root.
- 		 */
- 		Buffer		buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);
- 
- 		gistnewroot(index, buffer, itup, lenitup, NULL);
- 		UnlockReleaseBuffer(buffer);
- 	}
- 	else
- 	{
- 		Buffer	   *buffers;
- 		Page	   *pages;
- 		int			numbuffer;
- 		OffsetNumber *todelete;
- 
- 		/* construct path */
- 		gistxlogFindPath(index, insert);
- 
- 		Assert(insert->pathlen > 0);
- 
- 		buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
- 		pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
- 		todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
- 
- 		for (i = 0; i < insert->pathlen; i++)
- 		{
- 			int			j,
- 						k,
- 						pituplen = 0;
- 			uint8		xlinfo;
- 			XLogRecData *rdata;
- 			XLogRecPtr	recptr;
- 			Buffer		tempbuffer = InvalidBuffer;
- 			int			ntodelete = 0;
- 
- 			numbuffer = 1;
- 			buffers[0] = ReadBuffer(index, insert->path[i]);
- 			LockBuffer(buffers[0], GIST_EXCLUSIVE);
- 
- 			/*
- 			 * we check buffer, because we restored page earlier
- 			 */
- 			gistcheckpage(index, buffers[0]);
- 
- 			pages[0] = BufferGetPage(buffers[0]);
- 			Assert(!GistPageIsLeaf(pages[0]));
- 
- 			pituplen = PageGetMaxOffsetNumber(pages[0]);
- 
- 			/* find remove old IndexTuples to remove */
- 			for (j = 0; j < pituplen && ntodelete < lenitup; j++)
- 			{
- 				BlockNumber blkno;
- 				ItemId		iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
- 				IndexTuple	idxtup = (IndexTuple) PageGetItem(pages[0], iid);
- 
- 				blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
- 
- 				for (k = 0; k < lenitup; k++)
- 					if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
- 					{
- 						todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
- 						ntodelete++;
- 						break;
- 					}
- 			}
- 
- 			if (ntodelete == 0)
- 				elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");
- 
- 			/*
- 			 * we check space with subtraction only first tuple to delete,
- 			 * hope, that wiil be enough space....
- 			 */
- 
- 			if (gistnospace(pages[0], itup, lenitup, *todelete, 0))
- 			{
- 
- 				/* no space left on page, so we must split */
- 				buffers[numbuffer] = ReadBuffer(index, P_NEW);
- 				LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
- 				GISTInitBuffer(buffers[numbuffer], 0);
- 				pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
- 				gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber);
- 				numbuffer++;
- 
- 				if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
- 				{
- 					Buffer		tmp;
- 
- 					/*
- 					 * we split root, just copy content from root to new page
- 					 */
- 
- 					/* sanity check */
- 					if (i + 1 != insert->pathlen)
- 						elog(PANIC, "unexpected pathlen in index \"%s\"",
- 							 RelationGetRelationName(index));
- 
- 					/* fill new page, root will be changed later */
- 					tempbuffer = ReadBuffer(index, P_NEW);
- 					LockBuffer(tempbuffer, GIST_EXCLUSIVE);
- 					memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer));
- 
- 					/* swap buffers[0] (was root) and temp buffer */
- 					tmp = buffers[0];
- 					buffers[0] = tempbuffer;
- 					tempbuffer = tmp;	/* now in tempbuffer GIST_ROOT_BLKNO,
- 										 * it is still unchanged */
- 
- 					pages[0] = BufferGetPage(buffers[0]);
- 				}
- 
- 				START_CRIT_SECTION();
- 
- 				for (j = 0; j < ntodelete; j++)
- 					PageIndexTupleDelete(pages[0], todelete[j]);
- 
- 				xlinfo = XLOG_GIST_PAGE_SPLIT;
- 				rdata = formSplitRdata(index->rd_node, insert->path[i],
- 									   false, &(insert->key),
- 									 gistMakePageLayout(buffers, numbuffer));
- 
- 			}
- 			else
- 			{
- 				START_CRIT_SECTION();
- 
- 				for (j = 0; j < ntodelete; j++)
- 					PageIndexTupleDelete(pages[0], todelete[j]);
- 				gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber);
- 
- 				xlinfo = XLOG_GIST_PAGE_UPDATE;
- 				rdata = formUpdateRdata(index->rd_node, buffers[0],
- 										todelete, ntodelete,
- 										itup, lenitup, &(insert->key));
- 			}
- 
- 			/*
- 			 * use insert->key as mark for completion of insert (form*Rdata()
- 			 * above) for following possible replays
- 			 */
- 
- 			/* write pages, we should mark it dirty befor XLogInsert() */
- 			for (j = 0; j < numbuffer; j++)
- 			{
- 				GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
- 				MarkBufferDirty(buffers[j]);
- 			}
- 			recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata);
- 			for (j = 0; j < numbuffer; j++)
- 			{
- 				PageSetLSN(pages[j], recptr);
- 				PageSetTLI(pages[j], ThisTimeLineID);
- 			}
- 
- 			END_CRIT_SECTION();
- 
- 			lenitup = numbuffer;
- 			for (j = 0; j < numbuffer; j++)
- 			{
- 				itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
- 				UnlockReleaseBuffer(buffers[j]);
- 			}
- 
- 			if (tempbuffer != InvalidBuffer)
- 			{
- 				/*
- 				 * it was a root split, so fill it by new values
- 				 */
- 				gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
- 				UnlockReleaseBuffer(tempbuffer);
- 			}
- 		}
- 	}
- 
- 	FreeFakeRelcacheEntry(index);
- 
- 	ereport(LOG,
- 			(errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
- 			insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
- 		   errdetail("Incomplete insertion detected during crash replay.")));
- }
- 
  void
  gist_xlog_startup(void)
  {
- 	incomplete_inserts = NIL;
- 	insertCtx = AllocSetContextCreate(CurrentMemoryContext,
- 									  "GiST recovery temporary context",
- 									  ALLOCSET_DEFAULT_MINSIZE,
- 									  ALLOCSET_DEFAULT_INITSIZE,
- 									  ALLOCSET_DEFAULT_MAXSIZE);
  	opCtx = createTempGistContext();
  }
  
  void
  gist_xlog_cleanup(void)
  {
- 	ListCell   *l;
- 	MemoryContext oldCxt;
- 
- 	oldCxt = MemoryContextSwitchTo(opCtx);
- 
- 	foreach(l, incomplete_inserts)
- 	{
- 		gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
- 
- 		gistContinueInsert(insert);
- 		MemoryContextReset(opCtx);
- 	}
- 	MemoryContextSwitchTo(oldCxt);
- 
  	MemoryContextDelete(opCtx);
- 	MemoryContextDelete(insertCtx);
- }
- 
- bool
- gist_safe_restartpoint(void)
- {
- 	if (incomplete_inserts)
- 		return false;
- 	return true;
  }
  
! 
! XLogRecData *
! formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
! 			   ItemPointer key, SplitedPageLayout *dist)
  {
  	XLogRecData *rdata;
! 	gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
  	SplitedPageLayout *ptr;
  	int			npage = 0,
! 				cur = 1;
  
! 	ptr = dist;
! 	while (ptr)
! 	{
  		npage++;
- 		ptr = ptr->next;
- 	}
  
  	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
  
! 	xlrec->node = node;
! 	xlrec->origblkno = blkno;
! 	xlrec->origleaf = page_is_leaf;
! 	xlrec->npage = (uint16) npage;
! 	if (key)
! 		xlrec->key = *key;
! 	else
! 		ItemPointerSetInvalid(&(xlrec->key));
  
! 	rdata[0].buffer = InvalidBuffer;
! 	rdata[0].data = (char *) xlrec;
  	rdata[0].len = sizeof(gistxlogPageSplit);
! 	rdata[0].next = NULL;
  
! 	ptr = dist;
! 	while (ptr)
  	{
  		rdata[cur].buffer = InvalidBuffer;
  		rdata[cur].data = (char *) &(ptr->block);
  		rdata[cur].len = sizeof(gistxlogPage);
- 		rdata[cur - 1].next = &(rdata[cur]);
  		cur++;
  
  		rdata[cur].buffer = InvalidBuffer;
  		rdata[cur].data = (char *) (ptr->list);
  		rdata[cur].len = ptr->lenlist;
- 		rdata[cur - 1].next = &(rdata[cur]);
- 		rdata[cur].next = NULL;
  		cur++;
- 		ptr = ptr->next;
  	}
  
! 	return rdata;
  }
  
  /*
!  * Construct the rdata array for an XLOG record describing a page update
!  * (deletion and/or insertion of tuples on a single index page).
   *
   * Note that both the todelete array and the tuples are marked as belonging
   * to the target buffer; they need not be stored in XLOG if XLogInsert decides
--- 386,487 ----
  							 ((RelFileNode *) rec)->dbNode,
  							 ((RelFileNode *) rec)->relNode);
  			break;
  		default:
  			appendStringInfo(buf, "unknown gist op code %u", info);
  			break;
  	}
  }
  
  void
  gist_xlog_startup(void)
  {
  	opCtx = createTempGistContext();
  }
  
  void
  gist_xlog_cleanup(void)
  {
  	MemoryContextDelete(opCtx);
  }
  
! /*
!  * Write WAL record of a page split.
!  */
! XLogRecPtr
! gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
! 			  SplitedPageLayout *dist,
! 			  BlockNumber origrlink, GistNSN orignsn,
! 			  Buffer leftchildbuf)
  {
  	XLogRecData *rdata;
! 	gistxlogPageSplit xlrec;
  	SplitedPageLayout *ptr;
  	int			npage = 0,
! 				cur;
! 	XLogRecPtr recptr;
  
! 	for (ptr = dist; ptr; ptr = ptr->next)
  		npage++;
  
  	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
  
! 	xlrec.node = node;
! 	xlrec.origblkno = blkno;
! 	xlrec.origrlink = origrlink;
! 	xlrec.orignsn = orignsn;
! 	xlrec.origleaf = page_is_leaf;
! 	xlrec.npage = (uint16) npage;
! 	xlrec.leftchild =
! 		BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
  
! 	rdata[0].data = (char *) &xlrec;
  	rdata[0].len = sizeof(gistxlogPageSplit);
! 	rdata[0].buffer = InvalidBuffer;
  
! 	cur = 1;
! 
! 	/*
! 	 * Include a full page image of the child buf. (only necessary if a
! 	 * checkpoint happened since the child page was split)
! 	 */
! 	if (BufferIsValid(leftchildbuf))
  	{
+ 		rdata[cur - 1].next = &(rdata[cur]);
+ 		rdata[cur].data = NULL;
+ 		rdata[cur].len = 0;
+ 		rdata[cur].buffer = leftchildbuf;
+ 		rdata[cur].buffer_std = true;
+ 		cur++;
+ 	}
+ 
+ 	for (ptr = dist; ptr; ptr = ptr->next)
+ 	{
+ 		rdata[cur - 1].next = &(rdata[cur]);
  		rdata[cur].buffer = InvalidBuffer;
  		rdata[cur].data = (char *) &(ptr->block);
  		rdata[cur].len = sizeof(gistxlogPage);
  		cur++;
  
+ 		rdata[cur - 1].next = &(rdata[cur]);
  		rdata[cur].buffer = InvalidBuffer;
  		rdata[cur].data = (char *) (ptr->list);
  		rdata[cur].len = ptr->lenlist;
  		cur++;
  	}
+ 	rdata[cur - 1].next = NULL;
+ 
+ 	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
  
! 	pfree(rdata);
! 	return recptr;
  }
  
  /*
!  * Write XLOG record describing a page update. The update can include any
!  * number of deletions and/or insertions of tuples on a single index page.
!  *
!  * If this update inserts a downlink for a split page, also record that
!  * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
   *
   * Note that both the todelete array and the tuples are marked as belonging
   * to the target buffer; they need not be stored in XLOG if XLogInsert decides
***************
*** 911,937 **** formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
   * at least one rdata item referencing the buffer, even when ntodelete and
   * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
   */
! XLogRecData *
! formUpdateRdata(RelFileNode node, Buffer buffer,
! 				OffsetNumber *todelete, int ntodelete,
! 				IndexTuple *itup, int ituplen, ItemPointer key)
  {
  	XLogRecData *rdata;
  	gistxlogPageUpdate *xlrec;
  	int			cur,
  				i;
  
! 	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
  	xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
  
  	xlrec->node = node;
  	xlrec->blkno = BufferGetBlockNumber(buffer);
  	xlrec->ntodelete = ntodelete;
! 
! 	if (key)
! 		xlrec->key = *key;
! 	else
! 		ItemPointerSetInvalid(&(xlrec->key));
  
  	rdata[0].buffer = buffer;
  	rdata[0].buffer_std = true;
--- 489,514 ----
   * at least one rdata item referencing the buffer, even when ntodelete and
   * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
   */
! XLogRecPtr
! gistXLogUpdate(RelFileNode node, Buffer buffer,
! 			   OffsetNumber *todelete, int ntodelete,
! 			   IndexTuple *itup, int ituplen,
! 			   Buffer leftchildbuf)
  {
  	XLogRecData *rdata;
  	gistxlogPageUpdate *xlrec;
  	int			cur,
  				i;
+ 	XLogRecPtr	recptr;
  
! 	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen));
  	xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
  
  	xlrec->node = node;
  	xlrec->blkno = BufferGetBlockNumber(buffer);
  	xlrec->ntodelete = ntodelete;
! 	xlrec->leftchild =
! 		BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
  
  	rdata[0].buffer = buffer;
  	rdata[0].buffer_std = true;
***************
*** 945,957 **** formUpdateRdata(RelFileNode node, Buffer buffer,
  	rdata[1].next = &(rdata[2]);
  
  	rdata[2].data = (char *) todelete;
! 	rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
  	rdata[2].buffer = buffer;
  	rdata[2].buffer_std = true;
- 	rdata[2].next = NULL;
  
- 	/* new tuples */
  	cur = 3;
  	for (i = 0; i < ituplen; i++)
  	{
  		rdata[cur - 1].next = &(rdata[cur]);
--- 522,534 ----
  	rdata[1].next = &(rdata[2]);
  
  	rdata[2].data = (char *) todelete;
! 	rdata[2].len = sizeof(OffsetNumber) * ntodelete;
  	rdata[2].buffer = buffer;
  	rdata[2].buffer_std = true;
  
  	cur = 3;
+ 
+ 	/* new tuples */
  	for (i = 0; i < ituplen; i++)
  	{
  		rdata[cur - 1].next = &(rdata[cur]);
***************
*** 959,996 **** formUpdateRdata(RelFileNode node, Buffer buffer,
  		rdata[cur].len = IndexTupleSize(itup[i]);
  		rdata[cur].buffer = buffer;
  		rdata[cur].buffer_std = true;
- 		rdata[cur].next = NULL;
  		cur++;
  	}
  
! 	return rdata;
! }
! 
! XLogRecPtr
! gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
! {
! 	gistxlogInsertComplete xlrec;
! 	XLogRecData rdata[2];
! 	XLogRecPtr	recptr;
! 
! 	Assert(len > 0);
! 	xlrec.node = node;
! 
! 	rdata[0].buffer = InvalidBuffer;
! 	rdata[0].data = (char *) &xlrec;
! 	rdata[0].len = sizeof(gistxlogInsertComplete);
! 	rdata[0].next = &(rdata[1]);
! 
! 	rdata[1].buffer = InvalidBuffer;
! 	rdata[1].data = (char *) keys;
! 	rdata[1].len = sizeof(ItemPointerData) * len;
! 	rdata[1].next = NULL;
! 
! 	START_CRIT_SECTION();
! 
! 	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
  
! 	END_CRIT_SECTION();
  
  	return recptr;
  }
--- 536,561 ----
  		rdata[cur].len = IndexTupleSize(itup[i]);
  		rdata[cur].buffer = buffer;
  		rdata[cur].buffer_std = true;
  		cur++;
  	}
  
! 	/*
! 	 * Include a full page image of the child buf. (only necessary if
! 	 * a checkpoint happened since the child page was split)
! 	 */
! 	if (BufferIsValid(leftchildbuf))
! 	{
! 		rdata[cur - 1].next = &(rdata[cur]);
! 		rdata[cur].data = NULL;
! 		rdata[cur].len = 0;
! 		rdata[cur].buffer = leftchildbuf;
! 		rdata[cur].buffer_std = true;
! 		cur++;
! 	}
! 	rdata[cur - 1].next = NULL;
  
! 	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
  
+ 	pfree(rdata);
  	return recptr;
  }
*** a/src/backend/access/transam/rmgr.c
--- b/src/backend/access/transam/rmgr.c
***************
*** 40,45 **** const RmgrData RmgrTable[RM_MAX_ID + 1] = {
  	{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
  	{"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
  	{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
! 	{"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint},
  	{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}
  };
--- 40,45 ----
  	{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
  	{"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
  	{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
! 	{"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL},
  	{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}
  };
*** a/src/include/access/gist.h
--- b/src/include/access/gist.h
***************
*** 58,66 ****
  /*
   * Page opaque data in a GiST index page.
   */
! #define F_LEAF				(1 << 0)
! #define F_DELETED			(1 << 1)
! #define F_TUPLES_DELETED	(1 << 2)
  
  typedef XLogRecPtr GistNSN;
  
--- 58,67 ----
  /*
   * Page opaque data in a GiST index page.
   */
! #define F_LEAF				(1 << 0)	/* leaf page */
! #define F_DELETED			(1 << 1)	/* the page has been deleted */
! #define F_TUPLES_DELETED	(1 << 2)	/* some tuples on the page are dead */
! #define F_FOLLOW_RIGHT		(1 << 3)	/* page to the right has no downlink */
  
  typedef XLogRecPtr GistNSN;
  
***************
*** 132,137 **** typedef struct GISTENTRY
--- 133,142 ----
  #define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED)
  #define GistClearTuplesDeleted(page)	( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED)
  
+ #define GistFollowRight(page) ( GistPageGetOpaque(page)->flags & F_FOLLOW_RIGHT)
+ #define GistMarkFollowRight(page) ( GistPageGetOpaque(page)->flags |= F_FOLLOW_RIGHT)
+ #define GistClearFollowRight(page)	( GistPageGetOpaque(page)->flags &= ~F_FOLLOW_RIGHT)
+ 
  /*
   * Vector of GISTENTRY structs; user-defined methods union and picksplit
   * take it as one of their arguments
*** a/src/include/access/gist_private.h
--- b/src/include/access/gist_private.h
***************
*** 132,140 **** typedef GISTScanOpaqueData *GISTScanOpaque;
  /* XLog stuff */
  
  #define XLOG_GIST_PAGE_UPDATE		0x00
! #define XLOG_GIST_NEW_ROOT			0x20
  #define XLOG_GIST_PAGE_SPLIT		0x30
! #define XLOG_GIST_INSERT_COMPLETE	0x40
  #define XLOG_GIST_CREATE_INDEX		0x50
  #define XLOG_GIST_PAGE_DELETE		0x60
  
--- 132,140 ----
  /* XLog stuff */
  
  #define XLOG_GIST_PAGE_UPDATE		0x00
! /* #define XLOG_GIST_NEW_ROOT			0x20 */ /* not used anymore */
  #define XLOG_GIST_PAGE_SPLIT		0x30
! /* #define XLOG_GIST_INSERT_COMPLETE	0x40 */ /* not used anymore */
  #define XLOG_GIST_CREATE_INDEX		0x50
  #define XLOG_GIST_PAGE_DELETE		0x60
  
***************
*** 144,152 **** typedef struct gistxlogPageUpdate
  	BlockNumber blkno;
  
  	/*
! 	 * It used to identify completeness of insert. Sets to leaf itup
  	 */
! 	ItemPointerData key;
  
  	/* number of deleted offsets */
  	uint16		ntodelete;
--- 144,153 ----
  	BlockNumber blkno;
  
  	/*
! 	 * If this operation completes a page split, by inserting a downlink for
! 	 * the split page, leftchild points to the left half of the split.
  	 */
! 	BlockNumber	leftchild;
  
  	/* number of deleted offsets */
  	uint16		ntodelete;
***************
*** 160,170 **** typedef struct gistxlogPageSplit
  {
  	RelFileNode node;
  	BlockNumber origblkno;		/* splitted page */
  	bool		origleaf;		/* was splitted page a leaf page? */
- 	uint16		npage;
  
! 	/* see comments on gistxlogPageUpdate */
! 	ItemPointerData key;
  
  	/*
  	 * follow: 1. gistxlogPage and array of IndexTupleData per page
--- 161,172 ----
  {
  	RelFileNode node;
  	BlockNumber origblkno;		/* splitted page */
+ 	BlockNumber	origrlink;		/* rightlink of the page before split */
+ 	GistNSN		orignsn;		/* NSN of the page before split */
  	bool		origleaf;		/* was splitted page a leaf page? */
  
! 	BlockNumber leftchild;		/* like in gistxlogPageUpdate */
! 	uint16		npage;			/* # of pages in the split */
  
  	/*
  	 * follow: 1. gistxlogPage and array of IndexTupleData per page
***************
*** 177,188 **** typedef struct gistxlogPage
  	int			num;			/* number of index tuples following */
  } gistxlogPage;
  
- typedef struct gistxlogInsertComplete
- {
- 	RelFileNode node;
- 	/* follows ItemPointerData key to clean */
- } gistxlogInsertComplete;
- 
  typedef struct gistxlogPageDelete
  {
  	RelFileNode node;
--- 179,184 ----
***************
*** 206,212 **** typedef struct SplitedPageLayout
   * GISTInsertStack used for locking buffers and transfer arguments during
   * insertion
   */
- 
  typedef struct GISTInsertStack
  {
  	/* current page */
--- 202,207 ----
***************
*** 215,221 **** typedef struct GISTInsertStack
  	Page		page;
  
  	/*
! 	 * log sequence number from page->lsn to recognize page update	and
  	 * compare it with page's nsn to recognize page split
  	 */
  	GistNSN		lsn;
--- 210,216 ----
  	Page		page;
  
  	/*
! 	 * log sequence number from page->lsn to recognize page update and
  	 * compare it with page's nsn to recognize page split
  	 */
  	GistNSN		lsn;
***************
*** 223,231 **** typedef struct GISTInsertStack
  	/* child's offset */
  	OffsetNumber childoffnum;
  
! 	/* pointer to parent and child */
  	struct GISTInsertStack *parent;
- 	struct GISTInsertStack *child;
  
  	/* for gistFindPath */
  	struct GISTInsertStack *next;
--- 218,225 ----
  	/* child's offset */
  	OffsetNumber childoffnum;
  
! 	/* pointer to parent */
  	struct GISTInsertStack *parent;
  
  	/* for gistFindPath */
  	struct GISTInsertStack *next;
***************
*** 238,249 **** typedef struct GistSplitVector
  	Datum		spl_lattr[INDEX_MAX_KEYS];		/* Union of subkeys in
  												 * spl_left */
  	bool		spl_lisnull[INDEX_MAX_KEYS];
- 	bool		spl_leftvalid;
  
  	Datum		spl_rattr[INDEX_MAX_KEYS];		/* Union of subkeys in
  												 * spl_right */
  	bool		spl_risnull[INDEX_MAX_KEYS];
- 	bool		spl_rightvalid;
  
  	bool	   *spl_equiv;		/* equivalent tuples which can be freely
  								 * distributed between left and right pages */
--- 232,241 ----
***************
*** 252,279 **** typedef struct GistSplitVector
  typedef struct
  {
  	Relation	r;
- 	IndexTuple *itup;			/* in/out, points to compressed entry */
- 	int			ituplen;		/* length of itup */
  	Size		freespace;		/* free space to be left */
- 	GISTInsertStack *stack;
- 	bool		needInsertComplete;
  
! 	/* pointer to heap tuple */
! 	ItemPointerData key;
  } GISTInsertState;
  
  /* root page of a gist index */
  #define GIST_ROOT_BLKNO				0
  
  /*
!  * mark tuples on inner pages during recovery
   */
  #define TUPLE_IS_VALID		0xffff
  #define TUPLE_IS_INVALID	0xfffe
  
  #define  GistTupleIsInvalid(itup)	( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID )
  #define  GistTupleSetValid(itup)	ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
- #define  GistTupleSetInvalid(itup)	ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID )
  
  /* gist.c */
  extern Datum gistbuild(PG_FUNCTION_ARGS);
--- 244,283 ----
  typedef struct
  {
  	Relation	r;
  	Size		freespace;		/* free space to be left */
  
! 	GISTInsertStack *stack;
  } GISTInsertState;
  
  /* root page of a gist index */
  #define GIST_ROOT_BLKNO				0
  
  /*
!  * Before PostgreSQL 9.1, we used rely on so-called "invalid tuples" on inner
!  * pages to finish crash recovery of incomplete page splits. If a crash
!  * happened in the middle of a page split, so that the downlink pointers were
!  * not yet inserted, crash recovery inserted a special downlink pointer. The
!  * semantics of an invalid tuple was that it if you encounter one in a scan,
!  * it must always be followed, because we don't know if the tuples on the
!  * child page match or not.
!  *
!  * We no longer create such invalid tuples, we now mark the left-half of such
!  * an incomplete split with the F_FOLLOW_RIGHT flag instead, and finish the
!  * split properly the next time we need to insert on that page. To retain
!  * on-disk compatibility for the sake of pg_upgrade, we still store 0xffff as
!  * the offset number of all inner tuples. If we encounter any invalid tuples
!  * with 0xfffe during insertion, we throw an error, though scans still handle
!  * them. You should only encounter invalid tuples if you pg_upgrade a pre-9.1
!  * gist index which already has invalid tuples in it because of a crash. That
!  * should be rare, and you are recommended to REINDEX anyway if you have any
!  * invalid tuples in an index, so throwing an error is as far as we go with
!  * supporting that.
   */
  #define TUPLE_IS_VALID		0xffff
  #define TUPLE_IS_INVALID	0xfffe
  
  #define  GistTupleIsInvalid(itup)	( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID )
  #define  GistTupleSetValid(itup)	ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
  
  /* gist.c */
  extern Datum gistbuild(PG_FUNCTION_ARGS);
***************
*** 281,288 **** extern Datum gistinsert(PG_FUNCTION_ARGS);
  extern MemoryContext createTempGistContext(void);
  extern void initGISTstate(GISTSTATE *giststate, Relation index);
  extern void freeGISTstate(GISTSTATE *giststate);
- extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
- extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
  
  extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
  		  int len, GISTSTATE *giststate);
--- 285,290 ----
***************
*** 294,311 **** extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec);
  extern void gist_xlog_startup(void);
  extern void gist_xlog_cleanup(void);
- extern bool gist_safe_restartpoint(void);
- extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
- 
- extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
- 				OffsetNumber *todelete, int ntodelete,
- 				IndexTuple *itup, int ituplen, ItemPointer key);
  
! extern XLogRecData *formSplitRdata(RelFileNode node,
! 			   BlockNumber blkno, bool page_is_leaf,
! 			   ItemPointer key, SplitedPageLayout *dist);
  
! extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);
  
  /* gistget.c */
  extern Datum gistgettuple(PG_FUNCTION_ARGS);
--- 296,312 ----
  extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec);
  extern void gist_xlog_startup(void);
  extern void gist_xlog_cleanup(void);
  
! extern XLogRecPtr gistXLogUpdate(RelFileNode node, Buffer buffer,
! 			   OffsetNumber *todelete, int ntodelete,
! 								 IndexTuple *itup, int ntup,
! 			   Buffer leftchild);
  
! extern XLogRecPtr gistXLogSplit(RelFileNode node,
! 			  BlockNumber blkno, bool page_is_leaf,
! 			  SplitedPageLayout *dist,
! 			  BlockNumber origrlink, GistNSN oldnsn,
! 			  Buffer leftchild);
  
  /* gistget.c */
  extern Datum gistgettuple(PG_FUNCTION_ARGS);
***************
*** 357,363 **** extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
  extern float gistpenalty(GISTSTATE *giststate, int attno,
  			GISTENTRY *key1, bool isNull1,
  			GISTENTRY *key2, bool isNull2);
! extern bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
  				   Datum *attr, bool *isnull);
  extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b);
  extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
--- 358,364 ----
  extern float gistpenalty(GISTSTATE *giststate, int attno,
  			GISTENTRY *key1, bool isNull1,
  			GISTENTRY *key2, bool isNull2);
! extern void gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
  				   Datum *attr, bool *isnull);
  extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b);
  extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
