From 27949647f968fc7914a48ce9c4dae9462c2b7707 Mon Sep 17 00:00:00 2001
From: Andy Fan <zhihuifan1213@163.com>
Date: Tue, 2 Jul 2024 07:40:00 +0800
Subject: [PATCH v20240702 3/3] optimize some sorts on tuplesort.c if the input
 is sorted.

add input_presorted member in Tuplesortstate to indicate the tuples is
sorted already, it can be 'partially' sorted or 'overall' sorted.

Within input_presorted is set, we can remove the sorts during the
tuplesort_puttuple_common when the memory is full and continue to reuse
the previous 'runs' in the current tape on behalf of mergeruns do less
work, unless caller tells tuplesort.c to puttuple into the next run,
this is the user case where the inputs are just presorted in some
different batches, the side impacts is the number of 'runs' is not
decided by work_mem but decided by users calls.

I also use this optimization to Gin index parallel build, at the stage
of 'bs_worker_sort' -> 'bs_sort_state' where all the inputs are
sorted. so the 'runs' can be reduced to 1 for sure. However the
improvements are not measurable (the sort is too cheap in this
case, or the reduce of 'runs' isn't helpful due to my test case?)
---
 src/backend/access/gin/gininsert.c         |  41 +++++++--
 src/backend/utils/sort/tuplesort.c         | 102 ++++++++++++++++++---
 src/backend/utils/sort/tuplesortvariants.c |   6 +-
 src/include/utils/tuplesort.h              |   9 +-
 4 files changed, 132 insertions(+), 26 deletions(-)

diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 29bca0c54c..df469e3d9e 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -718,7 +718,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		 */
 		state->bs_sortstate =
 			tuplesort_begin_index_gin(maintenance_work_mem, coordinate,
-									  TUPLESORT_NONE);
+									  TUPLESORT_NONE, false);
 
 		/* scan the relation and merge per-worker results */
 		reltuples = _gin_parallel_merge(state);
@@ -1743,7 +1743,9 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 	buffer = GinBufferInit();
 
 	/* sort the raw per-worker data */
+	elog(LOG, "tuplesort_performsort(state->bs_worker_sort); started");
 	tuplesort_performsort(state->bs_worker_sort);
+	elog(LOG, "tuplesort_performsort(state->bs_worker_sort); done");
 
 	/* print some basic info */
 	elog(LOG, "_gin_parallel_scan_and_build raw %zu compressed %zu ratio %.2f%%",
@@ -1754,6 +1756,8 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 	state->buildStats.sizeCompressed = 0;
 	state->buildStats.sizeRaw = 0;
 
+	elog(LOG, "start to fill to bs_statesort");
+
 	/*
 	 * Read the GIN tuples from the shared tuplesort, sorted by the key, and
 	 * merge them into larger chunks for the leader to combine.
@@ -1854,6 +1858,8 @@ _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort)
 		GinBufferReset(buffer);
 	}
 
+	elog(LOG, "finish to fill to bs_sortstate");
+
 	/* relase all the memory */
 	GinBufferFree(buffer);
 
@@ -1894,13 +1900,21 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 	coordinate->nParticipants = -1;
 	coordinate->sharedsort = sharedsort;
 
-	/* Begin "partial" tuplesort */
+	/*
+	 * Begin "partial" tuplesort, the input tuples come from RBTree, so they
+	 * are pre-sorted in batch, however since the batch is too small, we will
+	 * think it is not pre-sorted and let tuplesort_state sort the multi
+	 * batches and ..;
+	 */
 	state->bs_sortstate = tuplesort_begin_index_gin(sortmem, coordinate,
-													TUPLESORT_NONE);
+													TUPLESORT_NONE, true);
 
-	/* Local per-worker sort of raw-data */
+	/*
+	 * Local per-worker sort of raw-data, the input tuples come from
+	 * bs_sortstate, so all the tuples are presorted.
+	 */
 	state->bs_worker_sort = tuplesort_begin_index_gin(sortmem, NULL,
-													  TUPLESORT_NONE);
+													  TUPLESORT_NONE, false);
 
 	/* Join parallel scan */
 	indexInfo = BuildIndexInfo(index);
@@ -1909,6 +1923,7 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 	scan = table_beginscan_parallel(heap,
 									ParallelTableScanFromGinShared(ginshared));
 
+	elog(LOG, "start to fill into bs_worker_start");
 	reltuples = table_index_build_scan(heap, index, indexInfo, true, progress,
 									   ginBuildCallbackParallel, state, scan);
 
@@ -1948,6 +1963,8 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 		ginInitBA(&state->accum);
 	}
 
+	elog(LOG, "end to fill into bs_worker_start");
+
 	/*
 	 * Do the first phase of in-worker processing - sort the data produced by
 	 * the callback, and combine them into much larger chunks and place that
@@ -1955,8 +1972,18 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 	 */
 	_gin_process_worker_data(state, state->bs_worker_sort);
 
-	/* sort the GIN tuples built by this worker */
-	tuplesort_performsort(state->bs_sortstate);
+	elog(LOG, "start to sort bs_sortstate");
+
+	/*
+	 * the tuple is sorted already in bs_worker_sort, so let's dump the left
+	 * tuples into tapes, no sort is needed.
+	 */
+	tuplesort_dump_sortedtuples(state->bs_sortstate);
+
+	/* mark the worker has finished its work. */
+	worker_freeze_result_tape(state->bs_sortstate);
+
+	elog(LOG, "end to sort bs_sortstate");
 
 	state->bs_reltuples += reltuples;
 
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 7c4d6dc106..36e0a77b8d 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -187,6 +187,9 @@ struct Tuplesortstate
 {
 	TuplesortPublic base;
 	TupSortStatus status;		/* enumerated value as shown above */
+	bool		input_presorted;	/* if the input presorted . */
+	bool		new_tapes;		/* force to selectnewtapes, used with
+								 * input_presorted, see dumptumples */
 	bool		bounded;		/* did caller specify a maximum number of
 								 * tuples to return? */
 	bool		boundUsed;		/* true if we made use of a bounded heap */
@@ -475,7 +478,6 @@ static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(LogicalTape *tape, bool eofOK);
 static void markrunend(LogicalTape *tape);
 static int	worker_get_identifier(Tuplesortstate *state);
-static void worker_freeze_result_tape(Tuplesortstate *state);
 static void worker_nomergeruns(Tuplesortstate *state);
 static void leader_takeover_tapes(Tuplesortstate *state);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
@@ -643,6 +645,12 @@ qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
 
 Tuplesortstate *
 tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
+{
+	return tuplesort_begin_common_ext(workMem, coordinate, sortopt, false);
+}
+
+Tuplesortstate *
+tuplesort_begin_common_ext(int workMem, SortCoordinate coordinate, int sortopt, bool presorted)
 {
 	Tuplesortstate *state;
 	MemoryContext maincontext;
@@ -742,6 +750,8 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
 	}
 
 	MemoryContextSwitchTo(oldcontext);
+	state->input_presorted = presorted;
+	state->new_tapes = true;
 
 	return state;
 }
@@ -1846,6 +1856,36 @@ tuplesort_merge_order(int64 allowedMem)
 	return mOrder;
 }
 
+/*
+ * Dump the presorted in-memory tuples into tapes and let next batch
+ * of sorted tuples dump to a new tape.
+ */
+void
+tuplesort_dump_sortedtuples(Tuplesortstate *state)
+{
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
+
+	if (state->tapeset == NULL)
+	{
+		inittapes(state, true);
+	}
+
+	dumptuples(state, true);
+
+	/* add a end-mark for this run. */
+	markrunend(state->destTape);
+
+	/* When dumptuples for the next batch, we need a new_tapes. */
+	state->new_tapes = true;
+
+	/*
+	 * record the result_tape for the sake of worker_freeze_result_tape. where
+	 * 'LogicalTapeFreeze(state->result_tape, &output);' is called.
+	 */
+	state->result_tape = state->outputTapes[0];
+	MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * Helper function to calculate how much memory to allocate for the read buffer
  * of each input tape in a merge pass.
@@ -2105,6 +2145,7 @@ mergeruns(Tuplesortstate *state)
 	 * don't bother.  (The initial input tapes are still in outputTapes.  The
 	 * number of input tapes will not increase between passes.)
 	 */
+	elog(INFO, "number of runs %d ", state->currentRun);
 	state->memtupsize = state->nOutputTapes;
 	state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
 														state->nOutputTapes * sizeof(SortTuple));
@@ -2334,6 +2375,10 @@ mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
  *
  * When alltuples = true, dump everything currently in memory.  (This case is
  * only used at end of input data.)
+ *
+ * When input_presorted = true and new_tapes=false, dump everything to the
+ * existing tape (rather than select a new tap) to order to reduce the number
+ * of tapes & runs;
  */
 static void
 dumptuples(Tuplesortstate *state, bool alltuples)
@@ -2372,23 +2417,41 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 				 errmsg("cannot have more than %d runs for an external sort",
 						INT_MAX)));
 
-	if (state->currentRun > 0)
-		selectnewtape(state);
+	if (!state->input_presorted)
+	{
+		if (state->currentRun > 0)
+			selectnewtape(state);
 
-	state->currentRun++;
+		state->currentRun++;
 
 #ifdef TRACE_SORT
-	if (trace_sort)
-		elog(LOG, "worker %d starting quicksort of run %d: %s",
-			 state->worker, state->currentRun,
-			 pg_rusage_show(&state->ru_start));
+		if (trace_sort)
+			elog(LOG, "worker %d starting quicksort of run %d: %s",
+				 state->worker, state->currentRun,
+				 pg_rusage_show(&state->ru_start));
 #endif
 
-	/*
-	 * Sort all tuples accumulated within the allowed amount of memory for
-	 * this run using quicksort
-	 */
-	tuplesort_sort_memtuples(state);
+		/*
+		 * Sort all tuples accumulated within the allowed amount of memory for
+		 * this run using quicksort.
+		 */
+		tuplesort_sort_memtuples(state);
+	}
+	else if (state->new_tapes)
+	{
+		if (state->currentRun > 0)
+			selectnewtape(state);
+
+		state->currentRun++;
+		/* let reuse the existing tape next time. */
+		state->new_tapes = false;
+	}
+	else
+	{
+		/*
+		 * We always using the preexisting tape to reduce the number of tapes.
+		 */
+	}
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -2423,7 +2486,14 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 	FREEMEM(state, state->tupleMem);
 	state->tupleMem = 0;
 
-	markrunend(state->destTape);
+	if (!state->input_presorted)
+	{
+		markrunend(state->destTape);
+	}
+	else
+	{
+		/* handle it in tuplesort_dump_sortedtuples */
+	}
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -3043,12 +3113,14 @@ worker_get_identifier(Tuplesortstate *state)
  * There should only be one final output run for each worker, which consists
  * of all tuples that were originally input into worker.
  */
-static void
+void
 worker_freeze_result_tape(Tuplesortstate *state)
 {
 	Sharedsort *shared = state->shared;
 	TapeShare	output;
 
+	elog(LOG, "No. of runs  %d ", state->currentRun);
+	elog(INFO, "No. of runs  %d ", state->currentRun);
 	Assert(WORKER(state));
 	Assert(state->result_tape != NULL);
 	Assert(state->memtupcount == 0);
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 3d5b5ce015..94e098974b 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -592,10 +592,10 @@ tuplesort_begin_index_brin(int workMem,
 
 Tuplesortstate *
 tuplesort_begin_index_gin(int workMem, SortCoordinate coordinate,
-						  int sortopt)
+						  int sortopt, bool presorted)
 {
-	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
-												   sortopt);
+	Tuplesortstate *state = tuplesort_begin_common_ext(workMem, coordinate,
+													   sortopt, presorted);
 	TuplesortPublic *base = TuplesortstateGetPublic(state);
 
 #ifdef TRACE_SORT
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 659d551247..26fbb6b757 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -375,6 +375,12 @@ typedef struct
 extern Tuplesortstate *tuplesort_begin_common(int workMem,
 											  SortCoordinate coordinate,
 											  int sortopt);
+extern Tuplesortstate *tuplesort_begin_common_ext(int workMem,
+												  SortCoordinate coordinate,
+												  int sortopt,
+												  bool input_presorted);
+extern void worker_freeze_result_tape(Tuplesortstate *state);
+
 extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound);
 extern bool tuplesort_used_bound(Tuplesortstate *state);
 extern void tuplesort_puttuple_common(Tuplesortstate *state,
@@ -387,6 +393,7 @@ extern bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples,
 								 bool forward);
 extern void tuplesort_end(Tuplesortstate *state);
 extern void tuplesort_reset(Tuplesortstate *state);
+extern void tuplesort_dump_sortedtuples(Tuplesortstate *state);
 
 extern void tuplesort_get_stats(Tuplesortstate *state,
 								TuplesortInstrumentation *stats);
@@ -445,7 +452,7 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel,
 extern Tuplesortstate *tuplesort_begin_index_brin(int workMem, SortCoordinate coordinate,
 												  int sortopt);
 extern Tuplesortstate *tuplesort_begin_index_gin(int workMem, SortCoordinate coordinate,
-												 int sortopt);
+												 int sortopt, bool presorted);
 extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
 											 Oid sortOperator, Oid sortCollation,
 											 bool nullsFirstFlag,
-- 
2.45.1

