>From ac038643d402474522ac38118a379d7523231fbd Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 13 Nov 2013 18:15:22 +0200
Subject: [PATCH 3/4] More GIN refactoring.

Split off the portion of ginInsertValue that inserts the tuple to
current level into a separate function, ginPlaceToPage. ginInsertValue's
charter is now to recurse up the tree to insert the downlink, when a page
split is required.

This is in preparation for a patch to change the way incomplete splits
are handled, which will need to do these operations separately. And IMHO
makes the code more readable anyway.
---
 src/backend/access/gin/ginbtree.c | 276 ++++++++++++++++++++------------------
 1 file changed, 148 insertions(+), 128 deletions(-)

diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index c4801d5..7248b06 100644
--- a/src/backend/access/gin/ginbtree.c
+++ b/src/backend/access/gin/ginbtree.c
@@ -280,80 +280,115 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
 }
 
 /*
- * Insert value (stored in GinBtree) to tree described by stack
- *
- * During an index build, buildStats is non-null and the counters
- * it contains are incremented as needed.
+ * Returns true if the insertion is done, false if the page was split and
+ * downlink insertion is pending.
  *
- * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ * stack->buffer is locked on entry, and is kept locked.
  */
-void
-ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+static bool
+ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
+			   GinStatsData *buildStats)
 {
-	GinBtreeStack *parent;
-	BlockNumber rootBlkno;
-	Page		page,
-				rpage,
-				lpage;
+	Page		page = BufferGetPage(stack->buffer);
+	XLogRecData *rdata;
+	bool		fit;
 
-	/* extract root BlockNumber from stack */
-	Assert(stack != NULL);
-	parent = stack;
-	while (parent->parent)
-		parent = parent->parent;
-	rootBlkno = parent->blkno;
-	Assert(BlockNumberIsValid(rootBlkno));
+	START_CRIT_SECTION();
+	fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
+	if (fit)
+	{
+		MarkBufferDirty(stack->buffer);
 
-	/* this loop crawls up the stack until the insertion is complete */
-	for (;;)
+		if (RelationNeedsWAL(btree->index))
+		{
+			XLogRecPtr	recptr;
+
+			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+			PageSetLSN(page, recptr);
+		}
+
+		END_CRIT_SECTION();
+
+		return true;
+	}
+	else
 	{
-		XLogRecData *rdata;
+		/* Didn't fit, have to split */
+		Buffer		rbuffer;
+		Page		newlpage;
 		BlockNumber savedRightLink;
-		bool		fit;
+		GinBtreeStack *parent;
+		Page		lpage,
+					rpage;
+
+		END_CRIT_SECTION();
+
+		rbuffer = GinNewBuffer(btree->index);
 
-		page = BufferGetPage(stack->buffer);
 		savedRightLink = GinPageGetOpaque(page)->rightlink;
 
-		START_CRIT_SECTION();
-		fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
-		if (fit)
+		/*
+		 * newlpage is a pointer to memory page, it is not associated with
+		 * a buffer. stack->buffer is not touched yet.
+		 */
+		newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
+
+		((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
+
+		/* During index build, count the newly-split page */
+		if (buildStats)
 		{
+			if (btree->isData)
+				buildStats->nDataPages++;
+			else
+				buildStats->nEntryPages++;
+		}
+
+		parent = stack->parent;
+
+		if (parent == NULL)
+		{
+			/*
+			 * split root, so we need to allocate new left page and place
+			 * pointer on root to left and right page
+			 */
+			Buffer		lbuffer = GinNewBuffer(btree->index);
+
+			((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
+			((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
+
+			lpage = BufferGetPage(lbuffer);
+			rpage = BufferGetPage(rbuffer);
+
+			GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
+			GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+			((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
+
+			START_CRIT_SECTION();
+
+			GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
+			PageRestoreTempPage(newlpage, lpage);
+			btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
+
+			MarkBufferDirty(rbuffer);
+			MarkBufferDirty(lbuffer);
 			MarkBufferDirty(stack->buffer);
 
 			if (RelationNeedsWAL(btree->index))
 			{
 				XLogRecPtr	recptr;
 
-				recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+				recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
 				PageSetLSN(page, recptr);
+				PageSetLSN(lpage, recptr);
+				PageSetLSN(rpage, recptr);
 			}
 
-			LockBuffer(stack->buffer, GIN_UNLOCK);
-			END_CRIT_SECTION();
-
-			freeGinBtreeStack(stack);
-
-			return;
-		}
-		else
-		{
-			/* Didn't fit, have to split */
-			Buffer		rbuffer;
-			Page		newlpage;
-
+			UnlockReleaseBuffer(rbuffer);
+			UnlockReleaseBuffer(lbuffer);
 			END_CRIT_SECTION();
 
-			rbuffer = GinNewBuffer(btree->index);
-
-			/*
-			 * newlpage is a pointer to memory page, it is not associated with
-			 * a buffer. stack->buffer is not touched yet.
-			 */
-			newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
-
-			((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
-
-			/* During index build, count the newly-split page */
+			/* During index build, count the newly-added root page */
 			if (buildStats)
 			{
 				if (btree->isData)
@@ -362,98 +397,83 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
 					buildStats->nEntryPages++;
 			}
 
-			parent = stack->parent;
-
-			if (parent == NULL)
-			{
-				/*
-				 * split root, so we need to allocate new left page and place
-				 * pointer on root to left and right page
-				 */
-				Buffer		lbuffer = GinNewBuffer(btree->index);
-
-				((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
-				((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
-
-				page = BufferGetPage(stack->buffer);
-				lpage = BufferGetPage(lbuffer);
-				rpage = BufferGetPage(rbuffer);
-
-				GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
-				GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
-				((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
-
-				START_CRIT_SECTION();
-
-				GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
-				PageRestoreTempPage(newlpage, lpage);
-				btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
-
-				MarkBufferDirty(rbuffer);
-				MarkBufferDirty(lbuffer);
-				MarkBufferDirty(stack->buffer);
+			return true;
+		}
+		else
+		{
+			/* split non-root page */
+			((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
+			((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
 
-				if (RelationNeedsWAL(btree->index))
-				{
-					XLogRecPtr	recptr;
+			lpage = BufferGetPage(stack->buffer);
+			rpage = BufferGetPage(rbuffer);
 
-					recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
-					PageSetLSN(page, recptr);
-					PageSetLSN(lpage, recptr);
-					PageSetLSN(rpage, recptr);
-				}
+			GinPageGetOpaque(rpage)->rightlink = savedRightLink;
+			GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
 
-				UnlockReleaseBuffer(rbuffer);
-				UnlockReleaseBuffer(lbuffer);
-				LockBuffer(stack->buffer, GIN_UNLOCK);
-				END_CRIT_SECTION();
+			START_CRIT_SECTION();
+			PageRestoreTempPage(newlpage, lpage);
 
-				freeGinBtreeStack(stack);
+			MarkBufferDirty(rbuffer);
+			MarkBufferDirty(stack->buffer);
 
-				/* During index build, count the newly-added root page */
-				if (buildStats)
-				{
-					if (btree->isData)
-						buildStats->nDataPages++;
-					else
-						buildStats->nEntryPages++;
-				}
+			if (RelationNeedsWAL(btree->index))
+			{
+				XLogRecPtr	recptr;
 
-				return;
+				recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+				PageSetLSN(lpage, recptr);
+				PageSetLSN(rpage, recptr);
 			}
-			else
-			{
-				/* split non-root page */
-				((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
-				((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
+			UnlockReleaseBuffer(rbuffer);
+			END_CRIT_SECTION();
 
-				lpage = BufferGetPage(stack->buffer);
-				rpage = BufferGetPage(rbuffer);
+			return false;
+		}
+	}
+}
 
-				GinPageGetOpaque(rpage)->rightlink = savedRightLink;
-				GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+/*
+ * Insert value (stored in GinBtree) to tree described by stack
+ *
+ * During an index build, buildStats is non-null and the counters
+ * it contains are incremented as needed.
+ *
+ * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ */
+void
+ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+{
+	GinBtreeStack *parent;
+	BlockNumber rootBlkno;
+	Page		page;
+
+	/* extract root BlockNumber from stack */
+	Assert(stack != NULL);
+	parent = stack;
+	while (parent->parent)
+		parent = parent->parent;
+	rootBlkno = parent->blkno;
+	Assert(BlockNumberIsValid(rootBlkno));
 
-				START_CRIT_SECTION();
-				PageRestoreTempPage(newlpage, lpage);
+	/* this loop crawls up the stack until the insertion is complete */
+	for (;;)
+	{
+		bool done;
 
-				MarkBufferDirty(rbuffer);
-				MarkBufferDirty(stack->buffer);
+		done = ginPlaceToPage(btree, rootBlkno, stack, buildStats);
 
-				if (RelationNeedsWAL(btree->index))
-				{
-					XLogRecPtr	recptr;
+		/* just to be extra sure we don't delete anything by accident... */
+		btree->isDelete = FALSE;
 
-					recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
-					PageSetLSN(lpage, recptr);
-					PageSetLSN(rpage, recptr);
-				}
-				UnlockReleaseBuffer(rbuffer);
-				END_CRIT_SECTION();
-			}
+		if (done)
+		{
+			LockBuffer(stack->buffer, GIN_UNLOCK);
+			freeGinBtreeStack(stack);
+			break;
 		}
 
 		btree->prepareDownlink(btree, stack->buffer);
-		btree->isDelete = FALSE;
 
 		/* search parent to lock */
 		LockBuffer(parent->buffer, GIN_EXCLUSIVE);
-- 
1.8.4.rc3

