From 1a8851891f8c3e7e760aa3a6f21ff2e5467f5f59 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Thu, 20 Jun 2024 20:50:51 +0200
Subject: [PATCH v20240624 7/7] Detect wrap around in parallel callback

When a sync scan wraps around during an index build, some keys may end
up with very long TID lists, requiring "full" merge sort runs when
combining data in workers. It also causes problems with enforcing the
memory limit, because we can't just dump the data - the index build requires
append-only posting lists, and violating that may result in errors like

  ERROR: could not split GIN page; all old items didn't fit

because after the scan wrap around some of the TIDs may belong to the
beginning of the list, affecting the compression.

But we can deal with this in the callback - if we see the TID jump
back, that must mean a wraparound happened. In that case we simply dump
all the data accumulated in memory, and start from scratch.

This means there won't be any tuples with very wide TID ranges; instead
there'll be one tuple with a range at the end of the table, and another
tuple at the beginning. And all the lists in the worker will be
non-overlapping, and sort nicely based on first TID.

For the leader, we still need to do the full merge - the lists may
overlap and interleave in various ways. But there should be only very
few of those lists, about one per worker, making it not an issue.
---
 src/backend/access/gin/gininsert.c | 132 ++++++++++++++---------------
 1 file changed, 63 insertions(+), 69 deletions(-)

diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index cc380f03593..4483eedcbe2 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -143,6 +143,7 @@ typedef struct
 	MemoryContext tmpCtx;
 	MemoryContext funcCtx;
 	BuildAccumulator accum;
+	ItemPointerData tid;
 
 	/* FIXME likely duplicate with indtuples */
 	double		bs_numtuples;
@@ -474,6 +475,47 @@ ginBuildCallback(Relation index, ItemPointer tid, Datum *values,
 	MemoryContextSwitchTo(oldCtx);
 }
 
+/*
+ * ginFlushBuildState
+ *		Write all accumulated entries to the worker tuplesort and reset state.
+ */
+static void
+ginFlushBuildState(GinBuildState *buildstate, Relation index)
+{
+	ItemPointerData *list;
+	Datum		key;
+	GinNullCategory category;
+	uint32		nlist;
+	OffsetNumber attnum;
+	TupleDesc	tdesc = RelationGetDescr(index);
+
+	ginBeginBAScan(&buildstate->accum);
+	while ((list = ginGetBAEntry(&buildstate->accum,
+								 &attnum, &key, &category, &nlist)) != NULL)
+	{
+		/* attribute of the key's index column (supplies attlen/attbyval) */
+		Form_pg_attribute attr = TupleDescAttr(tdesc, (attnum - 1));
+
+		/* GIN tuple built for this entry, and its length */
+		GinTuple   *tup;
+		Size		tuplen;
+
+		/* there could be many entries, so be willing to abort here */
+		CHECK_FOR_INTERRUPTS();
+
+		tup = _gin_build_tuple(buildstate, attnum, category,
+							   key, attr->attlen, attr->attbyval,
+							   list, nlist, &tuplen);
+
+		tuplesort_putgintuple(buildstate->bs_worker_sort, tup, tuplen);
+
+		pfree(tup);
+	}
+
+	MemoryContextReset(buildstate->tmpCtx);
+	ginInitBA(&buildstate->accum);
+}
+
 /*
  * ginBuildCallbackParallel
  *		Callback for the parallel index build.
@@ -498,6 +540,11 @@ ginBuildCallback(Relation index, ItemPointer tid, Datum *values,
  * The disadvantage is increased disk space usage, possibly up to 2x, if
  * no entries get combined at the worker level.
  *
+ * To detect a wraparound (which can happen with sync scans), we remember the
+ * last TID seen by each worker - if the next TID seen by the worker is lower,
+ * the scan must have wrapped around. We handle that by flushing the current
+ * buildstate to the tuplesort, so that we don't end up with wide TID lists.
+ *
  * XXX It would be possible to partition the data into multiple tuplesorts
  * per worker (by hashing) - we don't need the data produced by workers
  * to be perfectly sorted, and we could even live with multiple entries
@@ -514,6 +561,16 @@ ginBuildCallbackParallel(Relation index, ItemPointer tid, Datum *values,
 
 	oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
 
+	/* scan wrapped around - flush accumulated entries and start anew */
+	if (ItemPointerCompare(tid, &buildstate->tid) < 0)
+	{
+		elog(LOG, "calling ginFlushBuildState");
+		ginFlushBuildState(buildstate, index);
+	}
+
+	/* remember the TID we're about to process */
+	memcpy(&buildstate->tid, tid, sizeof(ItemPointerData));
+
 	for (i = 0; i < buildstate->ginstate.origTupdesc->natts; i++)
 		ginHeapTupleBulkInsert(buildstate, (OffsetNumber) (i + 1),
 							   values[i], isnull[i], tid);
@@ -532,40 +589,7 @@ ginBuildCallbackParallel(Relation index, ItemPointer tid, Datum *values,
 	 * maintenance command.
 	 */
 	if (buildstate->accum.allocatedMemory >= (Size) work_mem * 1024L)
-	{
-		ItemPointerData *list;
-		Datum		key;
-		GinNullCategory category;
-		uint32		nlist;
-		OffsetNumber attnum;
-		TupleDesc	tdesc = RelationGetDescr(index);
-
-		ginBeginBAScan(&buildstate->accum);
-		while ((list = ginGetBAEntry(&buildstate->accum,
-									 &attnum, &key, &category, &nlist)) != NULL)
-		{
-			/* information about the index key */
-			Form_pg_attribute attr = TupleDescAttr(tdesc, (attnum - 1));
-
-			/* GIN tuple and tuple length that we'll use for tuplesort */
-			GinTuple   *tup;
-			Size		tuplen;
-
-			/* there could be many entries, so be willing to abort here */
-			CHECK_FOR_INTERRUPTS();
-
-			tup = _gin_build_tuple(buildstate, attnum, category,
-								   key, attr->attlen, attr->attbyval,
-								   list, nlist, &tuplen);
-
-			tuplesort_putgintuple(buildstate->bs_worker_sort, tup, tuplen);
-
-			pfree(tup);
-		}
-
-		MemoryContextReset(buildstate->tmpCtx);
-		ginInitBA(&buildstate->accum);
-	}
+		ginFlushBuildState(buildstate, index);
 
 	MemoryContextSwitchTo(oldCtx);
 }
@@ -602,6 +626,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	buildstate.bs_numtuples = 0;
 	buildstate.bs_reltuples = 0;
 	buildstate.bs_leader = NULL;
+	memset(&buildstate.tid, 0, sizeof(ItemPointerData));
 
 	/* initialize the meta page */
 	MetaBuffer = GinNewBuffer(index);
@@ -1231,8 +1256,8 @@ GinBufferInit(Relation index)
 	 * with too many TIDs. and 64kB seems more than enough. But maybe this
 	 * should be tied to maintenance_work_mem or something like that?
 	 *
-	 * XXX This is not enough to prevent repeated merges after a wraparound
-	 * of the parallel scan, but it should be enough to make the merges cheap
+	 * XXX This is not enough to prevent repeated merges after a wraparound of
+	 * the parallel scan, but it should be enough to make the merges cheap
 	 * because it quickly reaches the end of the second list and can just
 	 * memcpy the rest without walking it item by item.
 	 */
@@ -1964,39 +1989,7 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 									   ginBuildCallbackParallel, state, scan);
 
 	/* write remaining accumulated entries */
-	{
-		ItemPointerData *list;
-		Datum		key;
-		GinNullCategory category;
-		uint32		nlist;
-		OffsetNumber attnum;
-		TupleDesc	tdesc = RelationGetDescr(index);
-
-		ginBeginBAScan(&state->accum);
-		while ((list = ginGetBAEntry(&state->accum,
-									 &attnum, &key, &category, &nlist)) != NULL)
-		{
-			/* information about the key */
-			Form_pg_attribute attr = TupleDescAttr(tdesc, (attnum - 1));
-
-			GinTuple   *tup;
-			Size		len;
-
-			/* there could be many entries, so be willing to abort here */
-			CHECK_FOR_INTERRUPTS();
-
-			tup = _gin_build_tuple(state, attnum, category,
-								   key, attr->attlen, attr->attbyval,
-								   list, nlist, &len);
-
-			tuplesort_putgintuple(state->bs_worker_sort, tup, len);
-
-			pfree(tup);
-		}
-
-		MemoryContextReset(state->tmpCtx);
-		ginInitBA(&state->accum);
-	}
+	ginFlushBuildState(state, index);
 
 	/*
 	 * Do the first phase of in-worker processing - sort the data produced by
@@ -2081,6 +2074,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	buildstate.indtuples = 0;
 	/* XXX Shouldn't this initialize the other fields too, like ginbuild()? */
 	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+	memset(&buildstate.tid, 0, sizeof(ItemPointerData));
 
 	/*
 	 * create a temporary memory context that is used to hold data not yet
-- 
2.45.2

