>From 040ebdd9dd7025c4e0fdccef05975848da3d2ee0 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 20 Nov 2013 23:11:23 +0200
Subject: [PATCH 1/2] More GIN refactoring.

Separate the insertion payload from the more static portions of GinBtree in
the gin b-tree functions.

Add root block number to GinBtree, instead of passing it around all the
functions as argument.

Split off ginFinishSplit() from ginInsertValue(), which is responsible for
finding the parent and inserting the downlink to it.
---
 src/backend/access/gin/ginbtree.c     | 157 +++++++++++++++++++++------------
 src/backend/access/gin/gindatapage.c  | 159 ++++++++++++++++++----------------
 src/backend/access/gin/ginentrypage.c |  81 +++++++++--------
 src/backend/access/gin/ginget.c       |   2 +-
 src/backend/access/gin/gininsert.c    |  11 ++-
 src/backend/access/gin/ginxlog.c      |  22 ++---
 src/include/access/gin_private.h      |  39 ++++++---
 7 files changed, 272 insertions(+), 199 deletions(-)

diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index 7248b06..3c352e9 100644
--- a/src/backend/access/gin/ginbtree.c
+++ b/src/backend/access/gin/ginbtree.c
@@ -63,13 +63,13 @@ ginTraverseLock(Buffer buffer, bool searchMode)
  * is share-locked, and stack->parent is NULL.
  */
 GinBtreeStack *
-ginFindLeafPage(GinBtree btree, BlockNumber rootBlkno, bool searchMode)
+ginFindLeafPage(GinBtree btree, bool searchMode)
 {
 	GinBtreeStack *stack;
 
 	stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
-	stack->blkno = rootBlkno;
-	stack->buffer = ReadBuffer(btree->index, rootBlkno);
+	stack->blkno = btree->rootBlkno;
+	stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
 	stack->parent = NULL;
 	stack->predictNumber = 1;
 
@@ -89,7 +89,7 @@ ginFindLeafPage(GinBtree btree, BlockNumber rootBlkno, bool searchMode)
 		 * ok, page is correctly locked, we should check to move right ..,
 		 * root never has a right link, so small optimization
 		 */
-		while (btree->fullScan == FALSE && stack->blkno != rootBlkno &&
+		while (btree->fullScan == FALSE && stack->blkno != btree->rootBlkno &&
 			   btree->isMoveRight(btree, page))
 		{
 			BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
@@ -189,8 +189,7 @@ freeGinBtreeStack(GinBtreeStack *stack)
  * with vacuum process
  */
 void
-ginFindParents(GinBtree btree, GinBtreeStack *stack,
-			   BlockNumber rootBlkno)
+ginFindParents(GinBtree btree, GinBtreeStack *stack)
 {
 	Page		page;
 	Buffer		buffer;
@@ -204,8 +203,8 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
 	{
 		/* XLog mode... */
 		root = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
-		root->blkno = rootBlkno;
-		root->buffer = ReadBuffer(btree->index, rootBlkno);
+		root->blkno = btree->rootBlkno;
+		root->buffer = ReadBuffer(btree->index, btree->rootBlkno);
 		LockBuffer(root->buffer, GIN_EXCLUSIVE);
 		root->parent = NULL;
 	}
@@ -221,8 +220,8 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
 			root = root->parent;
 		}
 
-		Assert(root->blkno == rootBlkno);
-		Assert(BufferGetBlockNumber(root->buffer) == rootBlkno);
+		Assert(root->blkno == btree->rootBlkno);
+		Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
 		LockBuffer(root->buffer, GIN_EXCLUSIVE);
 	}
 	root->off = InvalidOffsetNumber;
@@ -268,7 +267,7 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
 			ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
 			ptr->blkno = blkno;
 			ptr->buffer = buffer;
-			ptr->parent = root; /* it's may be wrong, but in next call we will
+			ptr->parent = root; /* it may be wrong, but in next call we will
 								 * correct */
 			ptr->off = offset;
 			stack->parent = ptr;
@@ -280,21 +279,35 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
 }
 
 /*
- * Returns true if the insertion is done, false if the page was split and
- * downlink insertion is pending.
+ * Insert a new item to a page.
+ *
+ * Returns true if the insertion was finished. On false, the page was split and
+ * the parent needs to be updated. (a root update returns true as it doesn't
+ * need any further action by the caller to complete)
+ *
+ * When inserting a downlink to a internal page, the existing item at the
+ * given location is updated to point to 'updateblkno'.
  *
  * stack->buffer is locked on entry, and is kept locked.
  */
 static bool
-ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
+ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
+			   void *insertdata, BlockNumber updateblkno,
 			   GinStatsData *buildStats)
 {
 	Page		page = BufferGetPage(stack->buffer);
 	XLogRecData *rdata;
 	bool		fit;
 
+	/*
+	 * Try to put the incoming tuple on the page. If it doesn't fit,
+	 * placeToPage method will return false and leave the page unmodified,
+	 * and we'll have to split the page.
+	 */
 	START_CRIT_SECTION();
-	fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
+	fit = btree->placeToPage(btree, stack->buffer, stack->off,
+							 insertdata, updateblkno,
+							 &rdata);
 	if (fit)
 	{
 		MarkBufferDirty(stack->buffer);
@@ -324,6 +337,14 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
 		END_CRIT_SECTION();
 
 		rbuffer = GinNewBuffer(btree->index);
+		/* During index build, count the new page */
+		if (buildStats)
+		{
+			if (btree->isData)
+				buildStats->nDataPages++;
+			else
+				buildStats->nEntryPages++;
+		}
 
 		savedRightLink = GinPageGetOpaque(page)->rightlink;
 
@@ -331,18 +352,11 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
 		 * newlpage is a pointer to memory page, it is not associated with
 		 * a buffer. stack->buffer is not touched yet.
 		 */
-		newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
+		newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off,
+									insertdata, updateblkno,
+									&rdata);
 
-		((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
-
-		/* During index build, count the newly-split page */
-		if (buildStats)
-		{
-			if (btree->isData)
-				buildStats->nDataPages++;
-			else
-				buildStats->nEntryPages++;
-		}
+		((ginxlogSplit *) (rdata->data))->rootBlkno = btree->rootBlkno;
 
 		parent = stack->parent;
 
@@ -354,6 +368,15 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
 			 */
 			Buffer		lbuffer = GinNewBuffer(btree->index);
 
+			/* During index build, count the new page */
+			if (buildStats)
+			{
+				if (btree->isData)
+					buildStats->nDataPages++;
+				else
+					buildStats->nEntryPages++;
+			}
+
 			((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
 			((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
 
@@ -434,46 +457,27 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
 }
 
 /*
- * Insert value (stored in GinBtree) to tree described by stack
+ * Finish a split by inserting the downlink for the new page to parent.
  *
- * During an index build, buildStats is non-null and the counters
- * it contains are incremented as needed.
+ * On entry, stack->buffer is exclusively locked.
  *
  * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
  */
 void
-ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
 {
-	GinBtreeStack *parent;
-	BlockNumber rootBlkno;
 	Page		page;
-
-	/* extract root BlockNumber from stack */
-	Assert(stack != NULL);
-	parent = stack;
-	while (parent->parent)
-		parent = parent->parent;
-	rootBlkno = parent->blkno;
-	Assert(BlockNumberIsValid(rootBlkno));
+	bool		done;
 
 	/* this loop crawls up the stack until the insertion is complete */
-	for (;;)
+	do
 	{
-		bool done;
-
-		done = ginPlaceToPage(btree, rootBlkno, stack, buildStats);
-
-		/* just to be extra sure we don't delete anything by accident... */
-		btree->isDelete = FALSE;
-
-		if (done)
-		{
-			LockBuffer(stack->buffer, GIN_UNLOCK);
-			freeGinBtreeStack(stack);
-			break;
-		}
+		GinBtreeStack *parent = stack->parent;
+		void	   *insertdata;
+		BlockNumber updateblkno;
 
-		btree->prepareDownlink(btree, stack->buffer);
+		insertdata = btree->prepareDownlink(btree, stack->buffer);
+		updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
 
 		/* search parent to lock */
 		LockBuffer(parent->buffer, GIN_EXCLUSIVE);
@@ -491,7 +495,7 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
 				 * plain search...
 				 */
 				LockBuffer(parent->buffer, GIN_UNLOCK);
-				ginFindParents(btree, stack, rootBlkno);
+				ginFindParents(btree, stack);
 				parent = stack->parent;
 				Assert(parent != NULL);
 				break;
@@ -502,8 +506,49 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
 			page = BufferGetPage(parent->buffer);
 		}
 
+		/* release the child */
 		UnlockReleaseBuffer(stack->buffer);
 		pfree(stack);
 		stack = parent;
+
+		/* insert the downlink to parent */
+		done = ginPlaceToPage(btree, stack,
+							  insertdata, updateblkno,
+							  buildStats);
+		pfree(insertdata);
+	} while (!done);
+	LockBuffer(stack->buffer, GIN_UNLOCK);
+
+	/* free the rest of the stack */
+	freeGinBtreeStack(stack);
+}
+
+/*
+ * Insert a value to tree described by stack.
+ *
+ * The value to be inserted is given in 'insertdata'. Its format depends
+ * on whether this is an entry or data tree, ginInsertValue just passes it
+ * through to the tree-specific callback function.
+ *
+ * During an index build, buildStats is non-null and the counters
+ * it contains are incremented as needed.
+ *
+ * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ */
+void
+ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
+			   GinStatsData *buildStats)
+{
+	bool		done;
+
+	done = ginPlaceToPage(btree, stack,
+						  insertdata, InvalidBlockNumber,
+						  buildStats);
+	if (done)
+	{
+		LockBuffer(stack->buffer, GIN_UNLOCK);
+		freeGinBtreeStack(stack);
 	}
+	else
+		ginFinishSplit(btree, stack, buildStats);
 }
diff --git a/src/backend/access/gin/gindatapage.c b/src/backend/access/gin/gindatapage.c
index fbdde1d..647d383 100644
--- a/src/backend/access/gin/gindatapage.c
+++ b/src/backend/access/gin/gindatapage.c
@@ -30,7 +30,7 @@ dataIsMoveRight(GinBtree btree, Page page)
 	if (GinPageRightMost(page))
 		return FALSE;
 
-	return (ginCompareItemPointers(btree->items + btree->curitem, iptr) > 0) ? TRUE : FALSE;
+	return (ginCompareItemPointers(&btree->itemptr, iptr) > 0) ? TRUE : FALSE;
 }
 
 /*
@@ -80,7 +80,7 @@ dataLocateItem(GinBtree btree, GinBtreeStack *stack)
 		else
 		{
 			pitem = GinDataPageGetPostingItem(page, mid);
-			result = ginCompareItemPointers(btree->items + btree->curitem, &(pitem->key));
+			result = ginCompareItemPointers(&btree->itemptr, &(pitem->key));
 		}
 
 		if (result == 0)
@@ -138,7 +138,7 @@ dataLocateLeafItem(GinBtree btree, GinBtreeStack *stack)
 	{
 		OffsetNumber mid = low + ((high - low) / 2);
 
-		result = ginCompareItemPointers(btree->items + btree->curitem,
+		result = ginCompareItemPointers(&btree->itemptr,
 										GinDataPageGetItemPointer(page, mid));
 
 		if (result == 0)
@@ -298,18 +298,18 @@ GinPageDeletePostingItem(Page page, OffsetNumber offset)
  * item pointer never deletes!
  */
 static bool
-dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
+dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off, void *insertdata)
 {
 	Page		page = BufferGetPage(buf);
 
 	Assert(GinPageIsData(page));
-	Assert(!btree->isDelete);
 
 	if (GinPageIsLeaf(page))
 	{
+		GinBtreeDataLeafInsertData *items = insertdata;
 		if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
 		{
-			if ((btree->nitem - btree->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
+			if ((items->nitem - items->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
 				return true;
 		}
 		else if (sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
@@ -322,42 +322,21 @@ dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
 }
 
 /*
- * In case of previous split update old child blkno to
- * new right page
- * item pointer never deletes!
- */
-static BlockNumber
-dataPrepareData(GinBtree btree, Page page, OffsetNumber off)
-{
-	BlockNumber ret = InvalidBlockNumber;
-
-	Assert(GinPageIsData(page));
-
-	if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
-	{
-		PostingItem *pitem = GinDataPageGetPostingItem(page, off);
-
-		PostingItemSetBlockNumber(pitem, btree->rightblkno);
-		ret = btree->rightblkno;
-	}
-
-	btree->rightblkno = InvalidBlockNumber;
-
-	return ret;
-}
-
-/*
  * Places keys to page and fills WAL record. In case leaf page and
  * build mode puts all ItemPointers to page.
  *
  * If none of the keys fit, returns false without modifying the page.
+ *
+ * On an insertion to an internal node, in addition to inserting the given
+ * item, the downlink of the existing item at 'off' is updated to point to
+ * 'updateblkno'.
  */
 static bool
 dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
+				void *insertdata, BlockNumber updateblkno,
 				XLogRecData **prdata)
 {
 	Page		page = BufferGetPage(buf);
-	int			sizeofitem = GinSizeOfDataPageItem(page);
 	int			cnt = 0;
 
 	/* these must be static so they can be returned to caller */
@@ -365,14 +344,20 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
 	static ginxlogInsert data;
 
 	/* quick exit if it doesn't fit */
-	if (!dataIsEnoughSpace(btree, buf, off))
+	if (!dataIsEnoughSpace(btree, buf, off, insertdata))
 		return false;
 
 	*prdata = rdata;
 	Assert(GinPageIsData(page));
 
-	data.updateBlkno = dataPrepareData(btree, page, off);
+	/* Update existing downlink to point to next page (on internal page) */
+	if (!GinPageIsLeaf(page))
+	{
+		PostingItem *pitem = GinDataPageGetPostingItem(page, off);
+		PostingItemSetBlockNumber(pitem, updateblkno);
+	}
 
+	data.updateBlkno = updateblkno;
 	data.node = btree->index->rd_node;
 	data.blkno = BufferGetBlockNumber(buf);
 	data.offset = off;
@@ -405,34 +390,42 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
 	cnt++;
 
 	rdata[cnt].buffer = InvalidBuffer;
-	rdata[cnt].data = (GinPageIsLeaf(page)) ? ((char *) (btree->items + btree->curitem)) : ((char *) &(btree->pitem));
-	rdata[cnt].len = sizeofitem;
+	/* data and len filled in below */
 	rdata[cnt].next = NULL;
 
 	if (GinPageIsLeaf(page))
 	{
+		GinBtreeDataLeafInsertData *items = insertdata;
+		uint32		savedPos = items->curitem;
+
 		if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
 		{
 			/* usually, create index... */
-			uint32		savedPos = btree->curitem;
-
-			while (btree->curitem < btree->nitem)
+			while (items->curitem < items->nitem)
 			{
-				GinDataPageAddItemPointer(page, btree->items + btree->curitem, off);
+				GinDataPageAddItemPointer(page, items->items + items->curitem, off);
 				off++;
-				btree->curitem++;
+				items->curitem++;
 			}
-			data.nitem = btree->curitem - savedPos;
-			rdata[cnt].len = sizeofitem * data.nitem;
+			data.nitem = items->curitem - savedPos;
 		}
 		else
 		{
-			GinDataPageAddItemPointer(page, btree->items + btree->curitem, off);
-			btree->curitem++;
+			GinDataPageAddItemPointer(page, items->items + items->curitem, off);
+			items->curitem++;
 		}
+
+		rdata[cnt].data = (char *) &items->items[savedPos];
+		rdata[cnt].len = sizeof(ItemPointerData) * data.nitem;
 	}
 	else
-		GinDataPageAddPostingItem(page, &(btree->pitem), off);
+	{
+		PostingItem *pitem = insertdata;
+		GinDataPageAddPostingItem(page, pitem, off);
+
+		rdata[cnt].data = (char *) pitem;
+		rdata[cnt].len = sizeof(PostingItem);
+	}
 
 	return true;
 }
@@ -444,7 +437,8 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
  * left page
  */
 static Page
-dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
+dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
+			  void *insertdata, BlockNumber updateblkno, XLogRecData **prdata)
 {
 	char	   *ptr;
 	OffsetNumber separator;
@@ -457,7 +451,6 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
 	Page		rpage = BufferGetPage(rbuf);
 	Size		pageSize = PageGetPageSize(lpage);
 	Size		freeSpace;
-	uint32		nCopied = 1;
 
 	/* these must be static so they can be returned to caller */
 	static ginxlogSplit data;
@@ -468,9 +461,13 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
 	freeSpace = GinDataPageGetFreeSpace(rpage);
 
 	*prdata = rdata;
-	data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
-		InvalidOffsetNumber : PostingItemGetBlockNumber(&(btree->pitem));
-	data.updateBlkno = dataPrepareData(btree, lpage, off);
+
+	/* Update existing downlink to point to next page (on internal page) */
+	if (!isleaf)
+	{
+		PostingItem *pitem = GinDataPageGetPostingItem(lpage, off);
+		PostingItemSetBlockNumber(pitem, updateblkno);
+	}
 
 	if (isleaf)
 	{
@@ -487,16 +484,15 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
 
 	if (isleaf && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
 	{
-		nCopied = 0;
-		while (btree->curitem < btree->nitem &&
+		GinBtreeDataLeafInsertData *items = insertdata;
+		while (items->curitem < items->nitem &&
 			   maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData)))
 		{
 			memcpy(vector + maxoff * sizeof(ItemPointerData),
-				   btree->items + btree->curitem,
+				   items->items + items->curitem,
 				   sizeof(ItemPointerData));
 			maxoff++;
-			nCopied++;
-			btree->curitem++;
+			items->curitem++;
 		}
 	}
 	else
@@ -506,11 +502,15 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
 			memmove(ptr + sizeofitem, ptr, (maxoff - off + 1) * sizeofitem);
 		if (isleaf)
 		{
-			memcpy(ptr, btree->items + btree->curitem, sizeofitem);
-			btree->curitem++;
+			GinBtreeDataLeafInsertData *items = insertdata;
+			memcpy(ptr, items->items + items->curitem, sizeofitem);
+			items->curitem++;
 		}
 		else
-			memcpy(ptr, &(btree->pitem), sizeofitem);
+		{
+			PostingItem *pitem = insertdata;
+			memcpy(ptr, pitem, sizeofitem);
+		}
 
 		maxoff++;
 	}
@@ -584,16 +584,18 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
 }
 
 /*
- * Prepare the state in 'btree' for inserting a downlink for given buffer.
+ * Construct insertion payload for inserting the downlink for given buffer.
  */
-static void
+static void *
 dataPrepareDownlink(GinBtree btree, Buffer lbuf)
 {
+	PostingItem	*pitem = palloc(sizeof(PostingItem));
 	Page		lpage = BufferGetPage(lbuf);
 
-	PostingItemSetBlockNumber(&(btree->pitem), BufferGetBlockNumber(lbuf));
-	btree->pitem.key = *GinDataPageGetRightBound(lpage);
-	btree->rightblkno = GinPageGetOpaque(lpage)->rightlink;
+	PostingItemSetBlockNumber(pitem, BufferGetBlockNumber(lbuf));
+	pitem->key = *GinDataPageGetRightBound(lpage);
+
+	return pitem;
 }
 
 /*
@@ -698,11 +700,12 @@ createPostingTree(Relation index, ItemPointerData *items, uint32 nitems,
 }
 
 void
-ginPrepareDataScan(GinBtree btree, Relation index)
+ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno)
 {
 	memset(btree, 0, sizeof(GinBtreeData));
 
 	btree->index = index;
+	btree->rootBlkno = rootBlkno;
 
 	btree->findChildPage = dataLocateItem;
 	btree->getLeftMostChild = dataGetLeftMostPage;
@@ -715,7 +718,6 @@ ginPrepareDataScan(GinBtree btree, Relation index)
 	btree->prepareDownlink = dataPrepareDownlink;
 
 	btree->isData = TRUE;
-	btree->isDelete = FALSE;
 	btree->fullScan = FALSE;
 	btree->isBuild = FALSE;
 }
@@ -729,29 +731,32 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
 					  GinStatsData *buildStats)
 {
 	GinBtreeData btree;
+	GinBtreeDataLeafInsertData insertdata;
 	GinBtreeStack *stack;
 
-	ginPrepareDataScan(&btree, index);
+	ginPrepareDataScan(&btree, index, rootBlkno);
 	btree.isBuild = (buildStats != NULL);
-	btree.items = items;
-	btree.nitem = nitem;
-	btree.curitem = 0;
+	insertdata.items = items;
+	insertdata.nitem = nitem;
+	insertdata.curitem = 0;
 
-	while (btree.curitem < btree.nitem)
+	while (insertdata.curitem < insertdata.nitem)
 	{
-		stack = ginFindLeafPage(&btree, rootBlkno, false);
+		/* search for the leaf page where the first item should go to */
+		btree.itemptr = insertdata.items[insertdata.curitem];
+		stack = ginFindLeafPage(&btree, false);
 
 		if (btree.findItem(&btree, stack))
 		{
 			/*
-			 * btree.items[btree.curitem] already exists in index
+			 * Current item already exists in index.
 			 */
-			btree.curitem++;
+			insertdata.curitem++;
 			LockBuffer(stack->buffer, GIN_UNLOCK);
 			freeGinBtreeStack(stack);
 		}
 		else
-			ginInsertValue(&btree, stack, buildStats);
+			ginInsertValue(&btree, stack, &insertdata, buildStats);
 	}
 }
 
@@ -764,11 +769,11 @@ ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno)
 	GinBtreeData btree;
 	GinBtreeStack *stack;
 
-	ginPrepareDataScan(&btree, index);
+	ginPrepareDataScan(&btree, index, rootBlkno);
 
 	btree.fullScan = TRUE;
 
-	stack = ginFindLeafPage(&btree, rootBlkno, TRUE);
+	stack = ginFindLeafPage(&btree, TRUE);
 
 	return stack;
 }
diff --git a/src/backend/access/gin/ginentrypage.c b/src/backend/access/gin/ginentrypage.c
index ec0114e..56cbf8d 100644
--- a/src/backend/access/gin/ginentrypage.c
+++ b/src/backend/access/gin/ginentrypage.c
@@ -431,22 +431,23 @@ entryGetLeftMostPage(GinBtree btree, Page page)
 }
 
 static bool
-entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
+entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off,
+				   GinBtreeEntryInsertData *entry)
 {
 	Size		itupsz = 0;
 	Page		page = BufferGetPage(buf);
 
-	Assert(btree->entry);
+	Assert(entry->entry);
 	Assert(!GinPageIsData(page));
 
-	if (btree->isDelete)
+	if (entry->isDelete)
 	{
 		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
 
 		itupsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
 	}
 
-	if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData))
+	if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(entry->entry)) + sizeof(ItemIdData))
 		return true;
 
 	return false;
@@ -457,42 +458,42 @@ entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
  * should update it, update old child blkno to new right page
  * if child split occurred
  */
-static BlockNumber
-entryPreparePage(GinBtree btree, Page page, OffsetNumber off)
+static void
+entryPreparePage(GinBtree btree, Page page, OffsetNumber off,
+				 GinBtreeEntryInsertData *entry, BlockNumber updateblkno)
 {
-	BlockNumber ret = InvalidBlockNumber;
-
-	Assert(btree->entry);
+	Assert(entry->entry);
 	Assert(!GinPageIsData(page));
 
-	if (btree->isDelete)
+	if (entry->isDelete)
 	{
 		Assert(GinPageIsLeaf(page));
 		PageIndexTupleDelete(page, off);
 	}
 
-	if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
+	if (!GinPageIsLeaf(page) && updateblkno != InvalidBlockNumber)
 	{
 		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
 
-		GinSetDownlink(itup, btree->rightblkno);
-		ret = btree->rightblkno;
+		GinSetDownlink(itup, updateblkno);
 	}
-
-	btree->rightblkno = InvalidBlockNumber;
-
-	return ret;
 }
 
 /*
  * Place tuple on page and fills WAL record
  *
  * If the tuple doesn't fit, returns false without modifying the page.
+ *
+ * On an insertion to an internal node, in addition to inserting the given
+ * item, the downlink of the existing item at 'off' is updated to point to
+ * 'updateblkno'.
  */
 static bool
 entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
+				 void *insertdata, BlockNumber updateblkno,
 				 XLogRecData **prdata)
 {
+	GinBtreeEntryInsertData *entry = insertdata;
 	Page		page = BufferGetPage(buf);
 	OffsetNumber placed;
 	int			cnt = 0;
@@ -502,13 +503,14 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
 	static ginxlogInsert data;
 
 	/* quick exit if it doesn't fit */
-	if (!entryIsEnoughSpace(btree, buf, off))
+	if (!entryIsEnoughSpace(btree, buf, off, entry))
 		return false;
 
 	*prdata = rdata;
-	data.updateBlkno = entryPreparePage(btree, page, off);
+	entryPreparePage(btree, page, off, entry, updateblkno);
+	data.updateBlkno = updateblkno;
 
-	placed = PageAddItem(page, (Item) btree->entry, IndexTupleSize(btree->entry), off, false, false);
+	placed = PageAddItem(page, (Item) entry->entry, IndexTupleSize(entry->entry), off, false, false);
 	if (placed != off)
 		elog(ERROR, "failed to add item to index page in \"%s\"",
 			 RelationGetRelationName(btree->index));
@@ -517,7 +519,7 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
 	data.blkno = BufferGetBlockNumber(buf);
 	data.offset = off;
 	data.nitem = 1;
-	data.isDelete = btree->isDelete;
+	data.isDelete = entry->isDelete;
 	data.isData = false;
 	data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
 
@@ -545,12 +547,10 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
 	cnt++;
 
 	rdata[cnt].buffer = InvalidBuffer;
-	rdata[cnt].data = (char *) btree->entry;
-	rdata[cnt].len = IndexTupleSize(btree->entry);
+	rdata[cnt].data = (char *) entry->entry;
+	rdata[cnt].len = IndexTupleSize(entry->entry);
 	rdata[cnt].next = NULL;
 
-	btree->entry = NULL;
-
 	return true;
 }
 
@@ -561,8 +561,10 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
  * an equal number!
  */
 static Page
-entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
+entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
+			   void *insertdata, BlockNumber updateblkno, XLogRecData **prdata)
 {
+	GinBtreeEntryInsertData *entry = insertdata;
 	OffsetNumber i,
 				maxoff,
 				separator = InvalidOffsetNumber;
@@ -583,8 +585,9 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
 
 	*prdata = rdata;
 	data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
-		InvalidOffsetNumber : GinGetDownlink(btree->entry);
-	data.updateBlkno = entryPreparePage(btree, lpage, off);
+		InvalidOffsetNumber : GinGetDownlink(entry->entry);
+	data.updateBlkno = updateblkno;
+	entryPreparePage(btree, lpage, off, entry, updateblkno);
 
 	maxoff = PageGetMaxOffsetNumber(lpage);
 	ptr = tupstore;
@@ -593,8 +596,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
 	{
 		if (i == off)
 		{
-			size = MAXALIGN(IndexTupleSize(btree->entry));
-			memcpy(ptr, btree->entry, size);
+			size = MAXALIGN(IndexTupleSize(entry->entry));
+			memcpy(ptr, entry->entry, size);
 			ptr += size;
 			totalsize += size + sizeof(ItemIdData);
 		}
@@ -608,8 +611,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
 
 	if (off == maxoff + 1)
 	{
-		size = MAXALIGN(IndexTupleSize(btree->entry));
-		memcpy(ptr, btree->entry, size);
+		size = MAXALIGN(IndexTupleSize(entry->entry));
+		memcpy(ptr, entry->entry, size);
 		ptr += size;
 		totalsize += size + sizeof(ItemIdData);
 	}
@@ -667,20 +670,22 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
 }
 
 /*
- * Prepare the state in 'btree' for inserting a downlink for given buffer.
+ * Construct insertion payload for inserting the downlink for given buffer.
  */
-static void
+static void *
 entryPrepareDownlink(GinBtree btree, Buffer lbuf)
 {
+	GinBtreeEntryInsertData *entry = palloc(sizeof(GinBtreeEntryInsertData));
 	Page		lpage = BufferGetPage(lbuf);
 	IndexTuple	itup;
 
 	itup = getRightMostTuple(lpage);
-
-	btree->entry = GinFormInteriorTuple(itup,
+	entry->entry = GinFormInteriorTuple(itup,
 										lpage,
 										BufferGetBlockNumber(lbuf));
-	btree->rightblkno = GinPageGetOpaque(lpage)->rightlink;
+	entry->isDelete = false;
+
+	return entry;
 }
 
 /*
@@ -724,6 +729,7 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
 	memset(btree, 0, sizeof(GinBtreeData));
 
 	btree->index = ginstate->index;
+	btree->rootBlkno = GIN_ROOT_BLKNO;
 	btree->ginstate = ginstate;
 
 	btree->findChildPage = entryLocateEntry;
@@ -743,5 +749,4 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
 	btree->entryAttnum = attnum;
 	btree->entryKey = key;
 	btree->entryCategory = category;
-	btree->isDelete = FALSE;
 }
diff --git a/src/backend/access/gin/ginget.c b/src/backend/access/gin/ginget.c
index 2a3991f..f2ee2fb 100644
--- a/src/backend/access/gin/ginget.c
+++ b/src/backend/access/gin/ginget.c
@@ -374,7 +374,7 @@ restartScanEntry:
 	ginPrepareEntryScan(&btreeEntry, entry->attnum,
 						entry->queryKey, entry->queryCategory,
 						ginstate);
-	stackEntry = ginFindLeafPage(&btreeEntry, GIN_ROOT_BLKNO, true);
+	stackEntry = ginFindLeafPage(&btreeEntry, true);
 	page = BufferGetPage(stackEntry->buffer);
 	needUnlock = TRUE;
 
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 0a2883a..556e318 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -163,17 +163,20 @@ ginEntryInsert(GinState *ginstate,
 			   GinStatsData *buildStats)
 {
 	GinBtreeData btree;
+	GinBtreeEntryInsertData insertdata;
 	GinBtreeStack *stack;
 	IndexTuple	itup;
 	Page		page;
 
+	insertdata.isDelete = FALSE;
+
 	/* During index build, count the to-be-inserted entry */
 	if (buildStats)
 		buildStats->nEntries++;
 
 	ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
 
-	stack = ginFindLeafPage(&btree, GIN_ROOT_BLKNO, false);
+	stack = ginFindLeafPage(&btree, false);
 	page = BufferGetPage(stack->buffer);
 
 	if (btree.findItem(&btree, stack))
@@ -201,7 +204,7 @@ ginEntryInsert(GinState *ginstate,
 		itup = addItemPointersToLeafTuple(ginstate, itup,
 										  items, nitem, buildStats);
 
-		btree.isDelete = TRUE;
+		insertdata.isDelete = TRUE;
 	}
 	else
 	{
@@ -211,8 +214,8 @@ ginEntryInsert(GinState *ginstate,
 	}
 
 	/* Insert the new or modified leaf tuple */
-	btree.entry = itup;
-	ginInsertValue(&btree, stack, buildStats);
+	insertdata.entry = itup;
+	ginInsertValue(&btree, stack, &insertdata, buildStats);
 	pfree(itup);
 }
 
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index ddac343..ca7bee1 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -774,7 +774,7 @@ ginContinueSplit(ginIncompleteSplit *split)
 	GinState	ginstate;
 	Relation	reln;
 	Buffer		buffer;
-	GinBtreeStack stack;
+	GinBtreeStack *stack;
 
 	/*
 	 * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u",  split->rootBlkno,
@@ -802,22 +802,22 @@ ginContinueSplit(ginIncompleteSplit *split)
 	}
 	else
 	{
-		ginPrepareDataScan(&btree, reln);
+		ginPrepareDataScan(&btree, reln, split->rootBlkno);
 	}
 
-	stack.blkno = split->leftBlkno;
-	stack.buffer = buffer;
-	stack.off = InvalidOffsetNumber;
-	stack.parent = NULL;
+	stack = palloc(sizeof(GinBtreeStack));
+	stack->blkno = split->leftBlkno;
+	stack->buffer = buffer;
+	stack->off = InvalidOffsetNumber;
+	stack->parent = NULL;
 
-	ginFindParents(&btree, &stack, split->rootBlkno);
+	ginFindParents(&btree, stack);
+	LockBuffer(stack->parent->buffer, GIN_UNLOCK);
+	ginFinishSplit(&btree, stack, NULL);
 
-	btree.prepareDownlink(&btree, buffer);
-	ginInsertValue(&btree, stack.parent, NULL);
+	/* buffer is released by ginFinishSplit */
 
 	FreeFakeRelcacheEntry(reln);
-
-	UnlockReleaseBuffer(buffer);
 }
 
 void
diff --git a/src/include/access/gin_private.h b/src/include/access/gin_private.h
index 3952935..3aa92b3 100644
--- a/src/include/access/gin_private.h
+++ b/src/include/access/gin_private.h
@@ -485,41 +485,56 @@ typedef struct GinBtreeData
 
 	/* insert methods */
 	OffsetNumber (*findChildPtr) (GinBtree, Page, BlockNumber, OffsetNumber);
-	bool		(*placeToPage) (GinBtree, Buffer, OffsetNumber, XLogRecData **);
-	Page		(*splitPage) (GinBtree, Buffer, Buffer, OffsetNumber, XLogRecData **);
-	void		(*prepareDownlink) (GinBtree, Buffer);
+	bool		(*placeToPage) (GinBtree, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
+	Page		(*splitPage) (GinBtree, Buffer, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
+	void		*(*prepareDownlink) (GinBtree, Buffer);
 	void		(*fillRoot) (GinBtree, Buffer, Buffer, Buffer);
 
 	bool		isData;
 
 	Relation	index;
+	BlockNumber	rootBlkno;
 	GinState   *ginstate;		/* not valid in a data scan */
 	bool		fullScan;
 	bool		isBuild;
 
-	BlockNumber rightblkno;
-
-	/* Entry options */
+	/* Search key for Entry tree */
 	OffsetNumber entryAttnum;
 	Datum		entryKey;
 	GinNullCategory entryCategory;
+
+	/* Search key for data tree (posting tree) */
+	ItemPointerData itemptr;
+} GinBtreeData;
+
+/* This represents a tuple to be inserted to entry tree. */
+typedef struct
+{
 	IndexTuple	entry;
 	bool		isDelete;
+} GinBtreeEntryInsertData;
 
-	/* Data (posting tree) options */
+/*
+ * This represents an itempointer, or many itempointers, to be inserted to
+ * a data (posting tree) leaf page
+ */
+typedef struct
+{
 	ItemPointerData *items;
 	uint32		nitem;
 	uint32		curitem;
+} GinBtreeDataLeafInsertData;
 
-	PostingItem pitem;
-} GinBtreeData;
+/* For internal data (posting tree) pages, use PostingItem */
 
-extern GinBtreeStack *ginFindLeafPage(GinBtree btree, BlockNumber rootBlkno, bool searchMode);
+extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode);
 extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode);
 extern void freeGinBtreeStack(GinBtreeStack *stack);
 extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
+			   void *insertdata, GinStatsData *buildStats);
+extern void ginFindParents(GinBtree btree, GinBtreeStack *stack);
+extern void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
 			   GinStatsData *buildStats);
-extern void ginFindParents(GinBtree btree, GinBtreeStack *stack, BlockNumber rootBlkno);
 
 /* ginentrypage.c */
 extern IndexTuple GinFormTuple(GinState *ginstate,
@@ -543,7 +558,7 @@ extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
 					  GinStatsData *buildStats);
 extern GinBtreeStack *ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno);
 extern void ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf);
-extern void ginPrepareDataScan(GinBtree btree, Relation index);
+extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
 
 /* ginscan.c */
 
-- 
1.8.4.2

