From f43aeb97f766b24092c3758fa5d6a9f0e6676eaf Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Thu, 2 May 2024 15:21:32 +0200
Subject: [PATCH v20240505 2/8] Use mergesort in the leader process

The leader process (executing the serial part of the index build) spent
a significant part of the time in pg_qsort, after combining the partial
results from the workers. But we can improve this and move some of the
costs to the parallel part in workers - if workers produce sorted TID
lists, the leader can combine them by mergesort.

But to make this really efficient, the mergesort must not be executed
too many times. The workers may easily produce very short TID lists, if
there are many different keys, hitting the memory limit often. So this
adds an intermediate tuplesort pass into each worker, to combine TIDs
for each key and only then write the result into the shared tuplesort.

This means the number of mergesort invocations for each key should be
about the same as the number of workers. We can't really do better, and
it's low enough to keep the mergesort approach efficient.

Note: If we introduce a memory limit on GinBuffer (to not accumulate too
many TIDs in memory), we could end up with more chunks, but it should
not be very common.
---
 src/backend/access/gin/gininsert.c | 171 +++++++++++++++++++++++++----
 1 file changed, 148 insertions(+), 23 deletions(-)

diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index b353e155fc6..cf7a6278914 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -161,6 +161,14 @@ typedef struct
 	 * build callback etc.
 	 */
 	Tuplesortstate *bs_sortstate;
+
+	/*
+	 * The sortstate is used only within a worker for the first merge pass
+	 * that happens in the worker. In principle it doesn't need to be part of
+	 * the build state and we could pass it around directly, but it's more
+	 * convenient this way.
+	 */
+	Tuplesortstate *bs_worker_sort;
 } GinBuildState;
 
 
@@ -533,7 +541,7 @@ ginBuildCallbackParallel(Relation index, ItemPointer tid, Datum *values,
 								   key, attr->attlen, attr->attbyval,
 								   list, nlist, &tuplen);
 
-			tuplesort_putgintuple(buildstate->bs_sortstate, tup, tuplen);
+			tuplesort_putgintuple(buildstate->bs_worker_sort, tup, tuplen);
 
 			pfree(tup);
 		}
@@ -1127,7 +1135,6 @@ typedef struct GinBuffer
 
 	/* array of TID values */
 	int			nitems;
-	int			maxitems;
 	ItemPointerData *items;
 } GinBuffer;
 
@@ -1136,7 +1143,6 @@ static void
 AssertCheckGinBuffer(GinBuffer *buffer)
 {
 #ifdef USE_ASSERT_CHECKING
-	Assert(buffer->nitems <= buffer->maxitems);
 #endif
 }
 
@@ -1240,28 +1246,22 @@ GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
 			buffer->key = (Datum) 0;
 	}
 
-	/* enlarge the TID buffer, if needed */
-	if (buffer->nitems + tup->nitems > buffer->maxitems)
+	/* copy the new TIDs into the buffer, combine using merge-sort */
 	{
-		/* 64 seems like a good init value */
-		buffer->maxitems = Max(buffer->maxitems, 64);
+		int			nnew;
+		ItemPointer new;
 
-		while (buffer->nitems + tup->nitems > buffer->maxitems)
-			buffer->maxitems *= 2;
+		new = ginMergeItemPointers(buffer->items, buffer->nitems,
+								   items, tup->nitems, &nnew);
 
-		if (buffer->items == NULL)
-			buffer->items = palloc(buffer->maxitems * sizeof(ItemPointerData));
-		else
-			buffer->items = repalloc(buffer->items,
-									 buffer->maxitems * sizeof(ItemPointerData));
-	}
+		Assert(nnew == buffer->nitems + tup->nitems);
 
-	/* now we should be guaranteed to have enough space for all the TIDs */
-	Assert(buffer->nitems + tup->nitems <= buffer->maxitems);
+		if (buffer->items)
+			pfree(buffer->items);
 
-	/* copy the new TIDs into the buffer */
-	memcpy(&buffer->items[buffer->nitems], items, sizeof(ItemPointerData) * tup->nitems);
-	buffer->nitems += tup->nitems;
+		buffer->items = new;
+		buffer->nitems = nnew;
+	}
 
 	AssertCheckItemPointers(buffer->items, buffer->nitems, false);
 }
@@ -1302,6 +1302,21 @@ GinBufferReset(GinBuffer *buffer)
 	/* XXX should do something with extremely large array of items? */
 }
 
+/*
+ * Release a GinBuffer: the TID array, a by-reference key, and the struct.
+ * XXX probably would be better to have a memory context for the buffer
+ */
+static void
+GinBufferFree(GinBuffer *buffer)
+{
+	if (!GinBufferIsEmpty(buffer) && !buffer->typbyval &&
+		buffer->category == GIN_CAT_NORM_KEY)
+		pfree(DatumGetPointer(buffer->key));
+	if (buffer->items != NULL)
+		pfree(buffer->items);
+	pfree(buffer);
+}
+
 /*
  * XXX Maybe check size of the TID arrays, and return false if it's too
  * large (more thant maintenance_work_mem or something?).
@@ -1375,7 +1390,7 @@ _gin_parallel_merge(GinBuildState *state)
 			 * the data into the insert, and start a new entry for current
 			 * GinTuple.
 			 */
-			GinBufferSortItems(buffer);
+			AssertCheckItemPointers(buffer->items, buffer->nitems, true);
 
 			ginEntryInsert(&state->ginstate,
 						   buffer->attnum, buffer->key, buffer->category,
@@ -1392,7 +1407,7 @@ _gin_parallel_merge(GinBuildState *state)
 	/* flush data remaining in the buffer (for the last key) */
 	if (!GinBufferIsEmpty(buffer))
 	{
-		GinBufferSortItems(buffer);
+		AssertCheckItemPointers(buffer->items, buffer->nitems, true);
 
 		ginEntryInsert(&state->ginstate,
 					   buffer->attnum, buffer->key, buffer->category,
@@ -1402,6 +1417,9 @@ _gin_parallel_merge(GinBuildState *state)
 		GinBufferReset(buffer);
 	}
 
+	/* release all the memory */
+	GinBufferFree(buffer);
+
 	tuplesort_end(state->bs_sortstate);
 
 	return reltuples;
@@ -1440,6 +1458,102 @@ _gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Rela
 								 ginleader->sharedsort, heap, index, sortmem, true);
 }
 
+/*
+ * _gin_process_worker_data
+ *		First phase of the key merging, happening in the worker.
+ *
+ * Depending on the number of distinct keys, the TID lists produced by the
+ * callback may be very short. But combining many tiny lists is expensive,
+ * so we try to do as much as possible in the workers and only then pass the
+ * results to the leader.
+ *
+ * We sort worker_sort (the worker-local sort holding the raw tuples), read
+ * the tuples back ordered by key, and merge them into larger lists that are
+ * written to state->bs_sortstate. The worker_sort is ended before returning.
+ * At the moment there's no memory limit, so this will just produce one
+ * huge (sorted) list per key in each worker. Which means the leader will
+ * do a very limited number of mergesorts, which is good.
+ */
+static void
+_gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
+{
+	GinTuple   *tup;
+	Size		tuplen;
+	GinBuffer  *buffer;
+
+	/* initialize buffer to combine entries for the same key */
+	buffer = GinBufferInit();
+
+	/* sort the raw per-worker data, i.e. the same sort we read from below */
+	tuplesort_performsort(worker_sort);
+
+	/*
+	 * Read the GIN tuples from the worker-local sort, sorted by the key, and
+	 * merge them into larger chunks for the leader to combine.
+	 */
+	while ((tup = tuplesort_getgintuple(worker_sort, &tuplen, true)) != NULL)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * If the buffer can accept the new GIN tuple, just store it there and
+		 * we're done. If it's a different key (or maybe too much data) write
+		 * the current contents into state->bs_sortstate first.
+		 */
+		if (!GinBufferCanAddKey(buffer, tup))
+		{
+			GinTuple   *ntup;
+			Size		ntuplen;
+
+			/*
+			 * Buffer is not empty and it's storing a different key - emit
+			 * one combined tuple for the buffered key, and start a new entry
+			 * for current GinTuple.
+			 */
+			GinBufferSortItems(buffer);
+
+			ntup = _gin_build_tuple(buffer->attnum, buffer->category,
+									buffer->key, buffer->typlen, buffer->typbyval,
+									buffer->items, buffer->nitems, &ntuplen);
+
+			tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
+
+			pfree(ntup);
+
+			/* discard the existing data */
+			GinBufferReset(buffer);
+		}
+
+		/* now remember the new key */
+		GinBufferStoreTuple(buffer, tup);
+	}
+
+	/* flush data remaining in the buffer (for the last key) */
+	if (!GinBufferIsEmpty(buffer))
+	{
+		GinTuple   *ntup;
+		Size		ntuplen;
+
+		GinBufferSortItems(buffer);
+
+		ntup = _gin_build_tuple(buffer->attnum, buffer->category,
+								buffer->key, buffer->typlen, buffer->typbyval,
+								buffer->items, buffer->nitems, &ntuplen);
+
+		tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
+
+		pfree(ntup);
+
+		/* discard the existing data */
+		GinBufferReset(buffer);
+	}
+
+	/* release all the memory */
+	GinBufferFree(buffer);
+
+	tuplesort_end(worker_sort);
+}
+
 /*
  * Perform a worker's portion of a parallel sort.
  *
@@ -1471,6 +1585,10 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 	state->bs_sortstate = tuplesort_begin_index_gin(sortmem, coordinate,
 													TUPLESORT_NONE);
 
+	/* Local per-worker sort of raw-data */
+	state->bs_worker_sort = tuplesort_begin_index_gin(sortmem, NULL,
+													  TUPLESORT_NONE);
+
 	/* Join parallel scan */
 	indexInfo = BuildIndexInfo(index);
 	indexInfo->ii_Concurrent = ginshared->isconcurrent;
@@ -1508,7 +1626,7 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 								   key, attr->attlen, attr->attbyval,
 								   list, nlist, &len);
 
-			tuplesort_putgintuple(state->bs_sortstate, tup, len);
+			tuplesort_putgintuple(state->bs_worker_sort, tup, len);
 
 			pfree(tup);
 		}
@@ -1517,6 +1635,13 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 		ginInitBA(&state->accum);
 	}
 
+	/*
+	 * Do the first phase of in-worker processing - sort the data produced by
+	 * the callback, combine them into much larger chunks, and place those
+	 * into the shared tuplesort for the leader to process.
+	 */
+	_gin_process_worker_data(state, state->bs_worker_sort);
+
 	/* sort the GIN tuples built by this worker */
 	tuplesort_performsort(state->bs_sortstate);
 
-- 
2.44.0

