From e08b66cc678ee013b7c5bb1acf26c5287a71f909 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Thu, 2 May 2024 15:21:49 +0200
Subject: [PATCH v20240619 10/11] Enforce memory limit when combining tuples

When combinnig intermediate results during parallel GIN index build, we
want to restrict the memory usage. In ginBuildCallbackParallel() this is
done simply by dumping working state into tuplesort after hitting the
memory limit.

This commit introduces memory limit to the following steps, merging the
intermediate results in both worker and leader. The merge only deals
with one key at a time, and the primary risk is the key might have too
many different TIDs. This is not very likely, because the TID array only
needs 6B per item, it's a potential issue.

We can't simply dump the whole current TID list - the index requires the
TID values to be inserted in the correct order, but if the lists overlap
(as they do between workers), the tail of the list may change during the
mergesort. But thanks to sorting GIN tuples by first TID, we can derive
a safe TID horizon - we know no future tuples will have TIDs from before
this value, so it's safe to output this part of the list.

This commit tracks "frozen" part of the the TID list, which is the part
we know won't change after merging additional TID lists. Then if the TID
list grows too large (more than 64kB), we try to trim it - write out the
frozen part of the list, and discard it from the buffer. We only do the
trimming if there's at least 1024 frozen items - we don't want to write
the data into the index in tiny chunks.

The freezing also allows us to skip the frozen part during mergesort.
The frozen part of the list is known to be fully sorted, so we can just
skip it and mergesort only the rest of the data.

Note: These limites (1024 and 64kB) are mostly arbitrary - but seem high
enough to get good efficiency for compression/batching, but low enough
to release memory early and work in small increments.
---
 src/backend/access/gin/gininsert.c | 245 +++++++++++++++++++++++++++--
 src/include/access/gin.h           |   1 +
 2 files changed, 237 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 47007aa63b4..8d16e484093 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -1130,8 +1130,12 @@ typedef struct GinBuffer
 	int16		typlen;
 	bool		typbyval;
 
+	/* Number of TIDs to collect before attempt to write some out. */
+	int			maxitems;
+
 	/* array of TID values */
 	int			nitems;
+	int			nfrozen;
 	ItemPointerData *items;
 } GinBuffer;
 
@@ -1166,7 +1170,21 @@ AssertCheckItemPointers(GinBuffer *buffer, bool sorted)
 static GinBuffer *
 GinBufferInit(void)
 {
-	return palloc0(sizeof(GinBuffer));
+	GinBuffer  *buffer = (GinBuffer *) palloc0(sizeof(GinBuffer));
+
+	/*
+	 * How many items can we fit into the memory limit? 64kB seems more than
+	 * enough and we don't want a limit that's too high. OTOH maybe this
+	 * should be tied to maintenance_work_mem or something like that?
+	 *
+	 * XXX This is not enough to prevent repeated merges after a wraparound,
+	 * but it should be enough to make the merges cheap because it quickly
+	 * finds reaches the end of the second list and can just memcpy the rest
+	 * without walking it item by item.
+	 */
+	buffer->maxitems = (64 * 1024L) / sizeof(ItemPointerData);
+
+	return buffer;
 }
 
 static bool
@@ -1221,6 +1239,54 @@ GinBufferKeyEquals(GinBuffer *buffer, GinTuple *tup)
 	return (memcmp(tup->data, DatumGetPointer(buffer->key), buffer->keylen) == 0);
 }
 
+/*
+ * GinBufferShouldTrim
+ *		Should we trim the list of item pointers?
+ *
+ * By trimming we understand writing out and removing the tuple IDs that
+ * we know can't change by future merges. We can deduce the TID up to which
+ * this is guaranteed from the "first" TID in each GIN tuple, which provides
+ * a "horizon" (for a given key) thanks to the sort.
+ *
+ * We don't want to do this too often - compressing longer TID lists is more
+ * efficient. But we also don't want to accumulate too many TIDs, for two
+ * reasons. First, it consumes memory and we might exceed maintenance_work_mem
+ * (or whatever limit applies), even if that's unlikely because TIDs are very
+ * small so we can fit a lot of them. Second, and more importantly, long TID
+ * lists are an issue if the scan wraps around, because a key may get a very
+ * wide list (with min/max TID for that key), forcing "full" mergesorts for
+ * every list merged into it (instead of the efficient append).
+ *
+ * So we look at two things when deciding if to trim - if the resulting list
+ * (after adding TIDs from the new tuple) would be too long, and if there is
+ * enough TIDs to trim (with values less than "first" TID from the new tuple),
+ * we do the trim. By enough we mean at least 128 TIDs (mostly an arbitrary
+ * number).
+ *
+ * XXX This does help for the wrap around case, because the "wide" TID list
+ * is essentially two ranges - one at the beginning of the table, one at the
+ * end. And all the other ranges (from GIN tuples) come in between, and also
+ * do not overlap. So by trimming up to the range we're about to add, this
+ * guarantees we'll be able to "concatenate" the two lists cheaply.
+ */
+static bool
+GinBufferShouldTrim(GinBuffer *buffer, GinTuple *tup)
+{
+	/* not enough TIDs to trim (1024 is somewhat arbitrary number) */
+	if (buffer->nfrozen < 1024)
+		return false;
+
+	/* We're not to hit the memory limit after adding this tuple. */
+	if ((buffer->nitems + tup->nitems) < buffer->maxitems)
+		return false;
+
+	/*
+	 * OK, we have enough frozen TIDs to flush, and we have hit the memory
+	 * limit, so it's time to write it out.
+	 */
+	return true;
+}
+
 /*
  * GinBufferStoreTuple
  *		Add data from a GinTuple into the GinBuffer.
@@ -1259,6 +1325,11 @@ GinBufferKeyEquals(GinBuffer *buffer, GinTuple *tup)
  * once, so there's going to be only such wide list, and it'll be sorted
  * first (because it has the lowest TID for the key). So we'd do this at
  * most once per key.
+ *
+ * XXX Maybe we could/should allocate the buffer once and then keep it
+ * without palloc/pfree. That won't help when just calling the mergesort,
+ * as that does palloc internally, but if we detected the append case,
+ * we could do without it. Not sure how much overhead it is, though.
  */
 static void
 GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
@@ -1287,26 +1358,82 @@ GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
 			buffer->key = (Datum) 0;
 	}
 
+	/*
+	 * Try freeze TIDs at the beginning of the list, i.e. exclude them from
+	 * the mergesort. We can do that with TIDs before the first TID in the new
+	 * tuple we're about to add into the buffer.
+	 *
+	 * We do this incrementally when adding data into the in-memory buffer,
+	 * and not later (e.g. when hitting a memory limit), because it allows us
+	 * to skip the frozen data during the mergesort, making it cheaper.
+	 */
+
+	/*
+	 * Check if the last TID in the current list is frozen. This is the case
+	 * when merging non-overlapping lists, e.g. in each parallel worker.
+	 */
+	if ((buffer->nitems > 0) &&
+		(ItemPointerCompare(&buffer->items[buffer->nitems - 1], &tup->first) == 0))
+		buffer->nfrozen = buffer->nitems;
+
+	/*
+	 * Now search the list linearly, to find the last frozen TID. If we found
+	 * the whole list is frozen, this just does nothing.
+	 *
+	 * Start with the first not-yet-frozen tuple, and walk until we find the
+	 * first TID that's higher.
+	 *
+	 * XXX Maybe this should do a binary search if the number of "non-frozen"
+	 * items is sufficiently high (enough to make linear search slower than
+	 * binsearch).
+	 */
+	for (int i = buffer->nfrozen; i < buffer->nitems; i++)
+	{
+		/* Is the TID after the first TID of the new tuple? Can't freeze. */
+		if (ItemPointerCompare(&buffer->items[i], &tup->first) > 0)
+			break;
+
+		buffer->nfrozen++;
+	}
+
 	/*
 	 * Copy the new TIDs into the buffer, combine with existing data (if any)
 	 * using merge-sort. The mergesort is already smart about cases where it
 	 * can simply concatenate the two lists, and when it actually needs to
 	 * merge the data in an expensive way.
+	 *
+	 * XXX We could check if (buffer->nitems > buffer->nfrozen) and only do
+	 * the mergesort in that case. ginMergeItemPointers does some palloc
+	 * internally, and this way we could eliminate that. But let's keep the
+	 * code simple for now.
 	 */
 	{
 		int			nnew;
 		ItemPointer new;
 
-		new = ginMergeItemPointers(buffer->items, buffer->nitems,
+		/*
+		 * Resize the array - we do this first, because we'll dereference the
+		 * first unfrozen TID, which would fail if the array is NULL. We'll
+		 * still pass 0 as number of elements in that array though.
+		 */
+		if (buffer->items == NULL)
+			buffer->items = palloc((buffer->nitems + tup->nitems) * sizeof(ItemPointerData));
+		else
+			buffer->items = repalloc(buffer->items,
+									 (buffer->nitems + tup->nitems) * sizeof(ItemPointerData));
+
+		new = ginMergeItemPointers(&buffer->items[buffer->nfrozen], /* first unfronzen */
+								   (buffer->nitems - buffer->nfrozen),	/* num of unfrozen */
 								   items, tup->nitems, &nnew);
 
-		Assert(nnew == buffer->nitems + tup->nitems);
+		Assert(nnew == (tup->nitems + (buffer->nitems - buffer->nfrozen)));
+
+		memcpy(&buffer->items[buffer->nfrozen], new,
+			   nnew * sizeof(ItemPointerData));
 
-		if (buffer->items)
-			pfree(buffer->items);
+		pfree(new);
 
-		buffer->items = new;
-		buffer->nitems = nnew;
+		buffer->nitems += tup->nitems;
 
 		AssertCheckItemPointers(buffer, true);
 	}
@@ -1332,6 +1459,7 @@ GinBufferReset(GinBuffer *buffer)
 	buffer->category = 0;
 	buffer->keylen = 0;
 	buffer->nitems = 0;
+	buffer->nfrozen = 0;
 
 	buffer->typlen = 0;
 	buffer->typbyval = 0;
@@ -1344,6 +1472,23 @@ GinBufferReset(GinBuffer *buffer)
 	/* XXX should do something with extremely large array of items? */
 }
 
+/*
+ * GinBufferTrim
+ *		Discard the "frozen" part of the TID list (which should have been
+ *		written to disk/index before this call).
+ */
+static void
+GinBufferTrim(GinBuffer *buffer)
+{
+	Assert((buffer->nfrozen > 0) && (buffer->nfrozen <= buffer->nitems));
+
+	memmove(&buffer->items[0], &buffer->items[buffer->nfrozen],
+			sizeof(ItemPointerData) * (buffer->nitems - buffer->nfrozen));
+
+	buffer->nitems -= buffer->nfrozen;
+	buffer->nfrozen = 0;
+}
+
 /* XXX probably would be better to have a memory context for the buffer */
 static void
 GinBufferFree(GinBuffer *buffer)
@@ -1402,7 +1547,12 @@ _gin_parallel_merge(GinBuildState *state)
 	/* do the actual sort in the leader */
 	tuplesort_performsort(state->bs_sortstate);
 
-	/* initialize buffer to combine entries for the same key */
+	/*
+	 * Initialize buffer to combine entries for the same key.
+	 *
+	 * The leader is allowed to use the whole maintenance_work_mem buffer to
+	 * combine data. The parallel workers already completed.
+	 */
 	buffer = GinBufferInit();
 
 	/*
@@ -1442,6 +1592,36 @@ _gin_parallel_merge(GinBuildState *state)
 			GinBufferReset(buffer);
 		}
 
+		/*
+		 * We're about to add a GIN tuple to the buffer - check the memory
+		 * limit first, and maybe write out some of the data into the index
+		 * first, if needed (and possible). We only flush the part of the TID
+		 * list that we know won't change, and only if there's enough data for
+		 * compression to work well.
+		 *
+		 * XXX The buffer may also be empty, but in that case we skip this.
+		 */
+		if (GinBufferShouldTrim(buffer, tup))
+		{
+			Assert(buffer->nfrozen > 0);
+
+			state->buildStats.nTrims++;
+
+			/*
+			 * Buffer is not empty and it's storing a different key - flush
+			 * the data into the insert, and start a new entry for current
+			 * GinTuple.
+			 */
+			AssertCheckItemPointers(buffer, true);
+
+			ginEntryInsert(&state->ginstate,
+						   buffer->attnum, buffer->key, buffer->category,
+						   buffer->items, buffer->nfrozen, &state->buildStats);
+
+			/* truncate the data we've just discarded */
+			GinBufferTrim(buffer);
+		}
+
 		/*
 		 * Remember data for the current tuple (either remember the new key,
 		 * or append if to the existing data).
@@ -1465,6 +1645,8 @@ _gin_parallel_merge(GinBuildState *state)
 	/* relase all the memory */
 	GinBufferFree(buffer);
 
+	elog(LOG, "_gin_parallel_merge ntrims " INT64_FORMAT, state->buildStats.nTrims);
+
 	tuplesort_end(state->bs_sortstate);
 
 	return reltuples;
@@ -1525,7 +1707,13 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 
 	GinBuffer  *buffer;
 
-	/* initialize buffer to combine entries for the same key */
+	/*
+	 * Initialize buffer to combine entries for the same key.
+	 *
+	 * The workers are limited to the same amount of memory as during the sort
+	 * in ginBuildCallbackParallel. But this probably should be the 32MB used
+	 * during planning, just like there.
+	 */
 	buffer = GinBufferInit();
 
 	/* sort the raw per-worker data */
@@ -1578,6 +1766,43 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 			GinBufferReset(buffer);
 		}
 
+		/*
+		 * We're about to add a GIN tuple to the buffer - check the memory
+		 * limit first, and maybe write out some of the data into the index
+		 * first, if needed (and possible). We only flush the part of the TID
+		 * list that we know won't change, and only if there's enough data for
+		 * compression to work well.
+		 *
+		 * XXX The buffer may also be empty, but in that case we skip this.
+		 */
+		if (GinBufferShouldTrim(buffer, tup))
+		{
+			GinTuple   *ntup;
+			Size		ntuplen;
+
+			Assert(buffer->nfrozen > 0);
+
+			state->buildStats.nTrims++;
+
+			/*
+			 * Buffer is not empty and it's storing a different key - flush
+			 * the data into the insert, and start a new entry for current
+			 * GinTuple.
+			 */
+			AssertCheckItemPointers(buffer, true);
+
+			ntup = _gin_build_tuple(state, buffer->attnum, buffer->category,
+									buffer->key, buffer->typlen, buffer->typbyval,
+									buffer->items, buffer->nfrozen, &ntuplen);
+
+			tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
+
+			pfree(ntup);
+
+			/* truncate the data we've just discarded */
+			GinBufferTrim(buffer);
+		}
+
 		/*
 		 * Remember data for the current tuple (either remember the new key,
 		 * or append if to the existing data).
@@ -1613,6 +1838,8 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 		 state->buildStats.sizeRaw, state->buildStats.sizeCompressed,
 		 (100.0 * state->buildStats.sizeCompressed) / state->buildStats.sizeRaw);
 
+	elog(LOG, "_gin_process_worker_data trims " INT64_FORMAT, state->buildStats.nTrims);
+
 	tuplesort_end(worker_sort);
 }
 
diff --git a/src/include/access/gin.h b/src/include/access/gin.h
index 2b6633d068a..9381329fac5 100644
--- a/src/include/access/gin.h
+++ b/src/include/access/gin.h
@@ -51,6 +51,7 @@ typedef struct GinStatsData
 	int32		ginVersion;
 	Size		sizeRaw;
 	Size		sizeCompressed;
+	int64		nTrims;
 } GinStatsData;
 
 /*
-- 
2.45.2

