From 8408fa11a4ac56709815e5dfbde6d25d8ac4257d Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Thu, 2 May 2024 15:21:32 +0200
Subject: [PATCH v20240620 2/7] Use mergesort in the leader process

The leader process (executing the serial part of the index build) spent
a significant part of the time in pg_qsort, after combining the partial
results from the workers. But we can improve this and move some of the
costs to the parallel part in workers - if workers produce sorted TID
lists, the leader can combine them by mergesort.

But to make this really efficient, the mergesort must not be executed
too many times. The workers may easily produce very short TID lists, if
there are many different keys, hitting the memory limit often. So this
adds an intermediate tuplesort pass into each worker, to combine TIDs
for each key and only then write the result into the shared tuplestore.

This means the number of mergesort invocations for each key should be
about the same as the number of workers. We can't really do better, and
it's low enough to keep the mergesort approach efficient.

Note: If we introduce a memory limit on GinBuffer (to not accumulate too
many TIDs in memory), we could end up with more chunks, but it should
not be very common.
---
 src/backend/access/gin/gininsert.c | 191 ++++++++++++++++++++++++-----
 1 file changed, 159 insertions(+), 32 deletions(-)

diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index cdadd389185..8d46b53c43b 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -160,6 +160,14 @@ typedef struct
 	 * build callback etc.
 	 */
 	Tuplesortstate *bs_sortstate;
+
+	/*
+	 * The sortstate is used only within a worker for the first merge pass
+	 * that happens in the worker. In principle it doesn't need to be part of
+	 * the build state and we could pass it around directly, but it's more
+	 * convenient this way.
+	 */
+	Tuplesortstate *bs_worker_sort;
 } GinBuildState;
 
 
@@ -463,23 +471,23 @@ ginBuildCallback(Relation index, ItemPointer tid, Datum *values,
 }
 
 /*
- * XXX Instead of writing the entries directly into the shared tuplesort,
- * we might write them into a local one, do a sort in the worker, combine
+ * Instead of writing the entries directly into the shared tuplesort, write
+ * them into a local one (in each worker), do a sort in the worker, combine
  * the results, and only then write the results into the shared tuplesort.
  * For large tables with many different keys that's going to work better
  * than the current approach where we don't get many matches in work_mem
  * (maybe this should use 32MB, which is what we use when planning, but
- * even that may not be sufficient). Which means we are likely to have
- * many entries with a small number of TIDs, forcing the leader to merge
- * the data, often amounting to ~50% of the serial part. By doing the
- * first sort workers, the leader then could do fewer merges with longer
- * TID lists, which is much cheaprr. Also, the amount of data sent from
- * workers to the leader woiuld be lower.
+ * even that may not be sufficient). Which means we would end up with many
+ * entries with a small number of TIDs, forcing the leader to merge the data,
+ * often amounting to ~50% of the serial part. By doing the first sort in
+ * workers, this work is parallelized and the leader does fewer merges with
+ * longer TID lists, which is much cheaper and more efficient. Also, the
+ * amount of data sent from workers to the leader gets be lower.
  *
  * The disadvantage is increased disk space usage, possibly up to 2x, if
  * no entries get combined at the worker level.
  *
- * It would be possible to partition the data into multiple tuplesorts
+ * XXX It would be possible to partition the data into multiple tuplesorts
  * per worker (by hashing) - we don't need the data produced by workers
  * to be perfectly sorted, and we could even live with multiple entries
  * for the same key (in case it has multiple binary representations with
@@ -535,7 +543,7 @@ ginBuildCallbackParallel(Relation index, ItemPointer tid, Datum *values,
 								   key, attr->attlen, attr->attbyval,
 								   list, nlist, &tuplen);
 
-			tuplesort_putgintuple(buildstate->bs_sortstate, tup, tuplen);
+			tuplesort_putgintuple(buildstate->bs_worker_sort, tup, tuplen);
 
 			pfree(tup);
 		}
@@ -1132,7 +1140,6 @@ typedef struct GinBuffer
 
 	/* array of TID values */
 	int			nitems;
-	int			maxitems;
 	ItemPointerData *items;
 } GinBuffer;
 
@@ -1141,7 +1148,6 @@ static void
 AssertCheckGinBuffer(GinBuffer *buffer)
 {
 #ifdef USE_ASSERT_CHECKING
-	Assert(buffer->nitems <= buffer->maxitems);
 #endif
 }
 
@@ -1245,28 +1251,22 @@ GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
 			buffer->key = (Datum) 0;
 	}
 
-	/* enlarge the TID buffer, if needed */
-	if (buffer->nitems + tup->nitems > buffer->maxitems)
+	/* copy the new TIDs into the buffer, combine using merge-sort */
 	{
-		/* 64 seems like a good init value */
-		buffer->maxitems = Max(buffer->maxitems, 64);
+		int			nnew;
+		ItemPointer new;
 
-		while (buffer->nitems + tup->nitems > buffer->maxitems)
-			buffer->maxitems *= 2;
+		new = ginMergeItemPointers(buffer->items, buffer->nitems,
+								   items, tup->nitems, &nnew);
 
-		if (buffer->items == NULL)
-			buffer->items = palloc(buffer->maxitems * sizeof(ItemPointerData));
-		else
-			buffer->items = repalloc(buffer->items,
-									 buffer->maxitems * sizeof(ItemPointerData));
-	}
+		Assert(nnew == buffer->nitems + tup->nitems);
 
-	/* now we should be guaranteed to have enough space for all the TIDs */
-	Assert(buffer->nitems + tup->nitems <= buffer->maxitems);
+		if (buffer->items)
+			pfree(buffer->items);
 
-	/* copy the new TIDs into the buffer */
-	memcpy(&buffer->items[buffer->nitems], items, sizeof(ItemPointerData) * tup->nitems);
-	buffer->nitems += tup->nitems;
+		buffer->items = new;
+		buffer->nitems = nnew;
+	}
 
 	AssertCheckItemPointers(buffer->items, buffer->nitems, false);
 }
@@ -1316,6 +1316,23 @@ GinBufferReset(GinBuffer *buffer)
 	 */
 }
 
+/*
+ * Release all memory associated with the GinBuffer (including TID array).
+ */
+static void
+GinBufferFree(GinBuffer *buffer)
+{
+	if (buffer->items)
+		pfree(buffer->items);
+
+	/* release byref values, do nothing for by-val ones */
+	if (!GinBufferIsEmpty(buffer) &&
+		(buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval)
+		pfree(DatumGetPointer(buffer->key));
+
+	pfree(buffer);
+}
+
 /*
  * XXX This could / should also enforce a memory limit by checking the size of
  * the TID array, and returning false if it's too large (more thant work_mem,
@@ -1392,7 +1409,7 @@ _gin_parallel_merge(GinBuildState *state)
 			 * the data into the insert, and start a new entry for current
 			 * GinTuple.
 			 */
-			GinBufferSortItems(buffer);
+			AssertCheckItemPointers(buffer->items, buffer->nitems, true);
 
 			ginEntryInsert(&state->ginstate,
 						   buffer->attnum, buffer->key, buffer->category,
@@ -1409,7 +1426,7 @@ _gin_parallel_merge(GinBuildState *state)
 	/* flush data remaining in the buffer (for the last key) */
 	if (!GinBufferIsEmpty(buffer))
 	{
-		GinBufferSortItems(buffer);
+		AssertCheckItemPointers(buffer->items, buffer->nitems, true);
 
 		ginEntryInsert(&state->ginstate,
 					   buffer->attnum, buffer->key, buffer->category,
@@ -1419,6 +1436,9 @@ _gin_parallel_merge(GinBuildState *state)
 		GinBufferReset(buffer);
 	}
 
+	/* relase all the memory */
+	GinBufferFree(buffer);
+
 	tuplesort_end(state->bs_sortstate);
 
 	return reltuples;
@@ -1457,6 +1477,102 @@ _gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Rela
 								 ginleader->sharedsort, heap, index, sortmem, true);
 }
 
+/*
+ * _gin_process_worker_data
+ *		First phase of the key merging, happening in the worker.
+ *
+ * Depending on the number of distinct keys, the TID lists produced by the
+ * callback may be very short. But combining many tiny lists is expensive,
+ * so we try to do as much as possible in the workers and only then pass the
+ * results to the leader.
+ *
+ * We read the tuples sorted by the key, and merge them into larger lists.
+ * At the moment there's no memory limit, so this will just produce one
+ * huge (sorted) list per key in each worker. Which means the leader will
+ * do a very limited number of mergesorts, which is good.
+ */
+static void
+_gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
+{
+	GinTuple   *tup;
+	Size		tuplen;
+
+	GinBuffer  *buffer;
+
+	/* initialize buffer to combine entries for the same key */
+	buffer = GinBufferInit();
+
+	/* sort the raw per-worker data */
+	tuplesort_performsort(state->bs_worker_sort);
+
+	/*
+	 * Read the GIN tuples from the shared tuplesort, sorted by the key, and
+	 * merge them into larger chunks for the leader to combine.
+	 */
+	while ((tup = tuplesort_getgintuple(worker_sort, &tuplen, true)) != NULL)
+	{
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * If the buffer can accept the new GIN tuple, just store it there and
+		 * we're done. If it's a different key (or maybe too much data) flush
+		 * the current contents into the index first.
+		 */
+		if (!GinBufferCanAddKey(buffer, tup))
+		{
+			GinTuple   *ntup;
+			Size		ntuplen;
+
+			/*
+			 * Buffer is not empty and it's storing a different key - flush
+			 * the data into the insert, and start a new entry for current
+			 * GinTuple.
+			 */
+			GinBufferSortItems(buffer);
+
+			ntup = _gin_build_tuple(buffer->attnum, buffer->category,
+									buffer->key, buffer->typlen, buffer->typbyval,
+									buffer->items, buffer->nitems, &ntuplen);
+
+			tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
+
+			pfree(ntup);
+
+			/* discard the existing data */
+			GinBufferReset(buffer);
+		}
+
+		/* now remember the new key */
+		GinBufferStoreTuple(buffer, tup);
+	}
+
+	/* flush data remaining in the buffer (for the last key) */
+	if (!GinBufferIsEmpty(buffer))
+	{
+		GinTuple   *ntup;
+		Size		ntuplen;
+
+		GinBufferSortItems(buffer);
+
+		ntup = _gin_build_tuple(buffer->attnum, buffer->category,
+								buffer->key, buffer->typlen, buffer->typbyval,
+								buffer->items, buffer->nitems, &ntuplen);
+
+		tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
+
+		pfree(ntup);
+
+		/* discard the existing data */
+		GinBufferReset(buffer);
+	}
+
+	/* relase all the memory */
+	GinBufferFree(buffer);
+
+	tuplesort_end(worker_sort);
+}
+
 /*
  * Perform a worker's portion of a parallel sort.
  *
@@ -1488,6 +1604,10 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 	state->bs_sortstate = tuplesort_begin_index_gin(sortmem, coordinate,
 													TUPLESORT_NONE);
 
+	/* Local per-worker sort of raw-data */
+	state->bs_worker_sort = tuplesort_begin_index_gin(sortmem, NULL,
+													  TUPLESORT_NONE);
+
 	/* Join parallel scan */
 	indexInfo = BuildIndexInfo(index);
 	indexInfo->ii_Concurrent = ginshared->isconcurrent;
@@ -1525,7 +1645,7 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 								   key, attr->attlen, attr->attbyval,
 								   list, nlist, &len);
 
-			tuplesort_putgintuple(state->bs_sortstate, tup, len);
+			tuplesort_putgintuple(state->bs_worker_sort, tup, len);
 
 			pfree(tup);
 		}
@@ -1534,6 +1654,13 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 		ginInitBA(&state->accum);
 	}
 
+	/*
+	 * Do the first phase of in-worker processing - sort the data produced by
+	 * the callback, and combine them into much larger chunks and place that
+	 * into the shared tuplestore for leader to process.
+	 */
+	_gin_process_worker_data(state, state->bs_worker_sort);
+
 	/* sort the GIN tuples built by this worker */
 	tuplesort_performsort(state->bs_sortstate);
 
-- 
2.45.2

