From 2633c013c4921a38c0ff16dc9119f4212ceb2c80 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@2ndquadrant.com>
Date: Mon, 24 Jun 2024 01:46:48 +0200
Subject: [PATCH v20240624 6/7] Enforce memory limit when combining tuples

When combining intermediate results during parallel GIN index build, we
want to restrict the memory usage. In ginBuildCallbackParallel() this is
done simply by dumping working state into tuplesort after hitting the
memory limit.

This commit introduces memory limit to the following steps, merging the
intermediate results in both worker and leader. The merge only deals
with one key at a time, and the primary risk is the key might have too
many different TIDs. This is not very likely, because the TID array only
needs 6B per item, but it's still a potential issue.

We can't simply dump the whole current TID list - the index requires the
TID values to be inserted in the correct order, but if the lists overlap
(as they do between workers), the tail of the list may change during the
mergesort. But thanks to sorting GIN tuples by first TID, we can derive
a safe TID horizon - we know no future tuples will have TIDs from before
this value, so it's safe to output this part of the list.

This commit tracks the "frozen" part of the TID list, which is the part
we know won't change after merging additional TID lists. Then if the TID
list grows too large (more than 64kB), we try to trim it - write out the
frozen part of the list, and discard it from the buffer. We only do the
trimming if there's at least 1024 frozen items - we don't want to write
the data into the index in tiny chunks.

The freezing also allows us to skip the frozen part during mergesort.
The frozen part of the list is known to be fully sorted, so we can just
skip it and mergesort only the rest of the data.

Note: These limits (1024 and 64kB) are mostly arbitrary - but seem high
enough to get good efficiency for compression/batching, but low enough
to release memory early and work in small increments.
---
 src/backend/access/gin/gininsert.c | 232 ++++++++++++++++++++++++++++-
 src/include/access/gin.h           |   1 +
 2 files changed, 225 insertions(+), 8 deletions(-)

diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index bb993dfdf80..cc380f03593 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -1154,8 +1154,12 @@ typedef struct GinBuffer
 	int16		typlen;
 	bool		typbyval;
 
+	/* Number of TIDs to collect before attempt to write some out. */
+	int			maxitems;
+
 	/* array of TID values */
 	int			nitems;
+	int			nfrozen;
 	SortSupport ssup;			/* for sorting/comparing keys */
 	ItemPointerData *items;
 } GinBuffer;
@@ -1222,6 +1226,18 @@ GinBufferInit(Relation index)
 				nKeys;
 	TupleDesc	desc = RelationGetDescr(index);
 
+	/*
+	 * How many items can we fit into the memory limit? We don't want to end
+	 * with too many TIDs, and 64kB seems more than enough. But maybe this
+	 * should be tied to maintenance_work_mem or something like that?
+	 *
+	 * XXX This is not enough to prevent repeated merges after a wraparound
+	 * of the parallel scan, but it should be enough to make the merges cheap
+	 * because it quickly reaches the end of the second list and can just
+	 * memcpy the rest without walking it item by item.
+	 */
+	buffer->maxitems = (64 * 1024L) / sizeof(ItemPointerData);
+
 	nKeys = IndexRelationGetNumberOfKeyAttributes(index);
 
 	buffer->ssup = palloc0(sizeof(SortSupportData) * nKeys);
@@ -1303,6 +1319,54 @@ GinBufferKeyEquals(GinBuffer *buffer, GinTuple *tup)
 	return (r == 0);
 }
 
+/*
+ * GinBufferShouldTrim
+ *		Should we trim the list of item pointers?
+ *
+ * By trimming we mean writing out and removing the tuple IDs that
+ * we know can't change by future merges. We can deduce the TID up to which
+ * this is guaranteed from the "first" TID in each GIN tuple, which provides
+ * a "horizon" (for a given key) thanks to the sort.
+ *
+ * We don't want to do this too often - compressing longer TID lists is more
+ * efficient. But we also don't want to accumulate too many TIDs, for two
+ * reasons. First, it consumes memory and we might exceed maintenance_work_mem
+ * (or whatever limit applies), even if that's unlikely because TIDs are very
+ * small so we can fit a lot of them. Second, and more importantly, long TID
+ * lists are an issue if the scan wraps around, because a key may get a very
+ * wide list (with min/max TID for that key), forcing "full" mergesorts for
+ * every list merged into it (instead of the efficient append).
+ *
+ * So we return true only if both conditions hold - the list (after adding
+ * TIDs from the new tuple) would reach maxitems, and there are enough frozen
+ * TIDs to trim (with values less than "first" TID from the new tuple). By
+ * enough we mean at least 1024 TIDs (mostly an arbitrary number). Otherwise
+ * we return false.
+ *
+ * XXX This does help for the wraparound case too, because the "wide" TID list
+ * is essentially two ranges - one at the beginning of the table, one at the
+ * end. And all the other ranges (from GIN tuples) come in between, and also
+ * do not overlap. So by trimming up to the range we're about to add, this
+ * guarantees we'll be able to "concatenate" the two lists cheaply.
+ */
+static bool
+GinBufferShouldTrim(GinBuffer *buffer, GinTuple *tup)
+{
+	/* not enough TIDs to trim (1024 is a somewhat arbitrary number) */
+	if (buffer->nfrozen < 1024)
+		return false;
+
+	/* We're not going to hit the memory limit after adding this tuple. */
+	if ((buffer->nitems + tup->nitems) < buffer->maxitems)
+		return false;
+
+	/*
+	 * OK, we have enough frozen TIDs to flush, and we have hit the memory
+	 * limit, so it's time to write it out.
+	 */
+	return true;
+}
+
 /*
  * GinBufferStoreTuple
  *		Add data (especially TID list) from a GIN tuple to the buffer.
@@ -1331,6 +1395,11 @@ GinBufferKeyEquals(GinBuffer *buffer, GinTuple *tup)
  *
  * XXX We expect the tuples to contain sorted TID lists, so maybe we should
  * check that's true with an assert.
+ *
+ * XXX Maybe we could/should allocate the buffer once and then keep it
+ * without palloc/pfree. That won't help when just calling the mergesort,
+ * as that does palloc internally, but if we detected the append case,
+ * we could do without it. Not sure how much overhead it is, though.
  */
 static void
 GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
@@ -1359,21 +1428,72 @@ GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
 			buffer->key = (Datum) 0;
 	}
 
+	/*
+	 * Try to freeze TIDs at the beginning of the list, i.e. exclude them
+	 * from the mergesort. We can do that with TIDs before the first TID in
+	 * the new tuple we're about to add into the buffer.
+	 *
+	 * We do this incrementally when adding data into the in-memory buffer,
+	 * and not later (e.g. when hitting a memory limit), because it allows us
+	 * to skip the frozen data during the mergesort, making it cheaper.
+	 */
+
+	/*
+	 * Check if the last TID in the current list is frozen. This is the case
+	 * when merging non-overlapping lists, e.g. in each parallel worker.
+	 */
+	if ((buffer->nitems > 0) &&
+		(ItemPointerCompare(&buffer->items[buffer->nitems - 1], &tup->first) == 0))
+		buffer->nfrozen = buffer->nitems;
+
+	/*
+	 * Now search the list linearly, to find the last frozen TID. If we found
+	 * the whole list is frozen, this just does nothing.
+	 *
+	 * Start with the first not-yet-frozen tuple, and walk until we find the
+	 * first TID that's higher.
+	 *
+	 * XXX Maybe this should do a binary search if the number of "non-frozen"
+	 * items is sufficiently high (enough to make linear search slower than
+	 * binsearch).
+	 */
+	for (int i = buffer->nfrozen; i < buffer->nitems; i++)
+	{
+		/* Is the TID after the first TID of the new tuple? Can't freeze. */
+		if (ItemPointerCompare(&buffer->items[i], &tup->first) > 0)
+			break;
+
+		buffer->nfrozen++;
+	}
+
 	/* add the new TIDs into the buffer, combine using merge-sort */
 	{
 		int			nnew;
 		ItemPointer new;
 
-		new = ginMergeItemPointers(buffer->items, buffer->nitems,
+		/*
+		 * Resize the array - we do this first, because we'll dereference the
+		 * first unfrozen TID, which would fail if the array is NULL. We'll
+		 * still pass 0 as number of elements in that array though.
+		 */
+		if (buffer->items == NULL)
+			buffer->items = palloc((buffer->nitems + tup->nitems) * sizeof(ItemPointerData));
+		else
+			buffer->items = repalloc(buffer->items,
+									 (buffer->nitems + tup->nitems) * sizeof(ItemPointerData));
+
+		new = ginMergeItemPointers(&buffer->items[buffer->nfrozen], /* first unfronzen */
+								   (buffer->nitems - buffer->nfrozen),	/* num of unfrozen */
 								   items, tup->nitems, &nnew);
 
-		Assert(nnew == buffer->nitems + tup->nitems);
+		Assert(nnew == (tup->nitems + (buffer->nitems - buffer->nfrozen)));
+
+		memcpy(&buffer->items[buffer->nfrozen], new,
+			   nnew * sizeof(ItemPointerData));
 
-		if (buffer->items)
-			pfree(buffer->items);
+		pfree(new);
 
-		buffer->items = new;
-		buffer->nitems = nnew;
+		buffer->nitems += tup->nitems;
 
 		AssertCheckItemPointers(buffer, true);
 	}
@@ -1412,11 +1532,29 @@ GinBufferReset(GinBuffer *buffer)
 	buffer->category = 0;
 	buffer->keylen = 0;
 	buffer->nitems = 0;
+	buffer->nfrozen = 0;
 
 	buffer->typlen = 0;
 	buffer->typbyval = 0;
 }
 
+/*
+ * GinBufferTrim
+ *		Remove the "frozen" TIDs from the list (the caller should have
+ *		written them out already), shifting the rest to the front.
+ */
+static void
+GinBufferTrim(GinBuffer *buffer)
+{
+	Assert((buffer->nfrozen > 0) && (buffer->nfrozen <= buffer->nitems));
+
+	memmove(&buffer->items[0], &buffer->items[buffer->nfrozen],
+			sizeof(ItemPointerData) * (buffer->nitems - buffer->nfrozen));
+
+	buffer->nitems -= buffer->nfrozen;
+	buffer->nfrozen = 0;
+}
+
 /*
  * GinBufferFree
  *		Release memory associated with the GinBuffer (including TID array).
@@ -1484,7 +1622,12 @@ _gin_parallel_merge(GinBuildState *state)
 	/* do the actual sort in the leader */
 	tuplesort_performsort(state->bs_sortstate);
 
-	/* initialize buffer to combine entries for the same key */
+	/*
+	 * Initialize buffer to combine entries for the same key.
+	 *
+	 * The leader is allowed to use the whole maintenance_work_mem buffer to
+	 * combine data. The parallel workers already completed.
+	 */
 	buffer = GinBufferInit(state->ginstate.index);
 
 	/*
@@ -1526,6 +1669,34 @@ _gin_parallel_merge(GinBuildState *state)
 			GinBufferReset(buffer);
 		}
 
+		/*
+		 * We're about to add a GIN tuple to the buffer - check the memory
+		 * limit first, and maybe write out some of the data into the index
+		 * first, if needed (and possible). We only flush the part of the TID
+		 * list that we know won't change, and only if there's enough data for
+		 * compression to work well.
+		 */
+		if (GinBufferShouldTrim(buffer, tup))
+		{
+			Assert(buffer->nfrozen > 0);
+
+			state->buildStats.nTrims++;
+
+			/*
+			 * Write the frozen part of the TID list into the index. The
+			 * remaining TIDs stay in the buffer, to be merged with the
+			 * TID list of the current GinTuple.
+			 */
+			AssertCheckItemPointers(buffer, true);
+
+			ginEntryInsert(&state->ginstate,
+						   buffer->attnum, buffer->key, buffer->category,
+						   buffer->items, buffer->nfrozen, &state->buildStats);
+
+			/* truncate the data we've just discarded */
+			GinBufferTrim(buffer);
+		}
+
 		/*
 		 * Remember data for the current tuple (either remember the new key,
 		 * or append if to the existing data).
@@ -1549,6 +1720,8 @@ _gin_parallel_merge(GinBuildState *state)
 	/* relase all the memory */
 	GinBufferFree(buffer);
 
+	elog(LOG, "_gin_parallel_merge ntrims " INT64_FORMAT, state->buildStats.nTrims);
+
 	tuplesort_end(state->bs_sortstate);
 
 	return reltuples;
@@ -1609,7 +1782,13 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 
 	GinBuffer  *buffer;
 
-	/* initialize buffer to combine entries for the same key */
+	/*
+	 * Initialize buffer to combine entries for the same key.
+	 *
+	 * The workers are limited to the same amount of memory as during the sort
+	 * in ginBuildCallbackParallel. But this probably should be the 32MB used
+	 * during planning, just like there.
+	 */
 	buffer = GinBufferInit(state->ginstate.index);
 
 	/* sort the raw per-worker data */
@@ -1662,6 +1841,41 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 			GinBufferReset(buffer);
 		}
 
+		/*
+		 * We're about to add a GIN tuple to the buffer - check the memory
+		 * limit first, and maybe write out some of the data into the index
+		 * first, if needed (and possible). We only flush the part of the TID
+		 * list that we know won't change, and only if there's enough data for
+		 * compression to work well.
+		 */
+		if (GinBufferShouldTrim(buffer, tup))
+		{
+			GinTuple   *ntup;
+			Size		ntuplen;
+
+			Assert(buffer->nfrozen > 0);
+
+			state->buildStats.nTrims++;
+
+			/*
+			 * Build a GIN tuple from the frozen part of the TID list and
+			 * put it into the tuplesort. The remaining TIDs stay in the
+			 * buffer, to be merged with TIDs from the current GinTuple.
+			 */
+			AssertCheckItemPointers(buffer, true);
+
+			ntup = _gin_build_tuple(state, buffer->attnum, buffer->category,
+									buffer->key, buffer->typlen, buffer->typbyval,
+									buffer->items, buffer->nfrozen, &ntuplen);
+
+			tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
+
+			pfree(ntup);
+
+			/* truncate the data we've just discarded */
+			GinBufferTrim(buffer);
+		}
+
 		/*
 		 * Remember data for the current tuple (either remember the new key,
 		 * or append if to the existing data).
@@ -1697,6 +1911,8 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 		 state->buildStats.sizeRaw, state->buildStats.sizeCompressed,
 		 (100.0 * state->buildStats.sizeCompressed) / state->buildStats.sizeRaw);
 
+	elog(LOG, "_gin_process_worker_data trims " INT64_FORMAT, state->buildStats.nTrims);
+
 	tuplesort_end(worker_sort);
 }
 
diff --git a/src/include/access/gin.h b/src/include/access/gin.h
index 2b6633d068a..9381329fac5 100644
--- a/src/include/access/gin.h
+++ b/src/include/access/gin.h
@@ -51,6 +51,7 @@ typedef struct GinStatsData
 	int32		ginVersion;
 	Size		sizeRaw;
 	Size		sizeCompressed;
+	int64		nTrims;
 } GinStatsData;
 
 /*
-- 
2.45.2

