From bef514a687bcdc6f13b84ded17d62ab3cf1ff848 Mon Sep 17 00:00:00 2001 From: ChangAo Chen Date: Fri, 5 Dec 2025 14:37:21 +0800 Subject: [PATCH v2 1/2] Support tuplesort_heap_build() in tuplesort.c. Building a valid heap of n tuples by calling tuplesort_heap_insert() once per tuple is O(n log n); tuplesort_heap_build() builds the same heap in O(n), so use it instead. --- src/backend/utils/sort/tuplesort.c | 103 +++++++++++++++-------------- 1 file changed, 53 insertions(+), 50 deletions(-) diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index 5d4411dc33f..e28b8cfa868 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -465,7 +465,7 @@ static void dumptuples(Tuplesortstate *state, bool alltuples); static void make_bounded_heap(Tuplesortstate *state); static void sort_bounded_heap(Tuplesortstate *state); static void tuplesort_sort_memtuples(Tuplesortstate *state); -static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple); +static void tuplesort_heap_build(Tuplesortstate *state); static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple); static void tuplesort_heap_delete_top(Tuplesortstate *state); static void reversedirection(Tuplesortstate *state); @@ -2269,10 +2269,14 @@ beginmerge(Tuplesortstate *state) if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup)) { + Assert(state->memtupcount < state->memtupsize); + CHECK_FOR_INTERRUPTS(); tup.srctape = srcTapeIndex; - tuplesort_heap_insert(state, &tup); + state->memtuples[state->memtupcount++] = tup; } } + + tuplesort_heap_build(state); } /* @@ -2593,32 +2597,25 @@ make_bounded_heap(Tuplesortstate *state) /* Reverse sort direction so largest entry will be at root */ reversedirection(state); - state->memtupcount = 0; /* make the heap empty */ - for (i = 0; i < tupcount; i++) + /* Build a valid heap from the first "state->bound" tuples */ + state->memtupcount = state->bound; + tuplesort_heap_build(state); + + /* Process 
the remaining tuples */ + for (i = state->bound; i < tupcount; i++) { - if (state->memtupcount < state->bound) + /* + * The heap is full. Replace the largest entry with the new + * tuple, or just discard it, if it's larger than anything already + * in the heap. + */ + if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0) { - /* Insert next tuple into heap */ - /* Must copy source tuple to avoid possible overwrite */ - SortTuple stup = state->memtuples[i]; - - tuplesort_heap_insert(state, &stup); + free_sort_tuple(state, &state->memtuples[i]); + CHECK_FOR_INTERRUPTS(); } else - { - /* - * The heap is full. Replace the largest entry with the new - * tuple, or just discard it, if it's larger than anything already - * in the heap. - */ - if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0) - { - free_sort_tuple(state, &state->memtuples[i]); - CHECK_FOR_INTERRUPTS(); - } - else - tuplesort_heap_replace_top(state, &state->memtuples[i]); - } + tuplesort_heap_replace_top(state, &state->memtuples[i]); } Assert(state->memtupcount == state->bound); @@ -2721,40 +2718,46 @@ tuplesort_sort_memtuples(Tuplesortstate *state) } /* - * Insert a new tuple into an empty or existing heap, maintaining the - * heap invariant. Caller is responsible for ensuring there's room. - * - * Note: For some callers, tuple points to a memtuples[] entry above the - * end of the heap. This is safe as long as it's not immediately adjacent - * to the end of the heap (ie, in the [memtupcount] array entry) --- if it - * is, it might get overwritten before being moved into the heap! + * Build a valid heap from memtuples[0..memtupcount-1], bottom-up, in O(n). 
*/ static void -tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple) +tuplesort_heap_build(Tuplesortstate *state) { - SortTuple *memtuples; - int j; - - memtuples = state->memtuples; - Assert(state->memtupcount < state->memtupsize); - - CHECK_FOR_INTERRUPTS(); + SortTuple *memtuples = state->memtuples; + SortTuple stup; + int hole; + unsigned int i, j, n; /* - * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is - * using 1-based array indexes, not 0-based. + * state->memtupcount is "int", but we use "unsigned int" for i, j, n. + * This prevents overflow in the "2 * i + 1" calculation, since at the top + * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2. */ - j = state->memtupcount++; - while (j > 0) + n = state->memtupcount; + for (hole = (state->memtupcount - 2) / 2; hole >= 0; hole--) { - int i = (j - 1) >> 1; - - if (COMPARETUP(state, tuple, &memtuples[i]) >= 0) - break; - memtuples[j] = memtuples[i]; - j = i; + CHECK_FOR_INTERRUPTS(); + /* i is where the "hole" is */ + i = hole; + /* copy memtuples[i] because it will get overwritten below */ + stup = memtuples[i]; + /* sift down, promoting the COMPARETUP-smaller child each step */ + for (;;) + { + j = 2 * i + 1; + + if (j >= n) + break; + if (j + 1 < n && + COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0) + j++; + if (COMPARETUP(state, &stup, &memtuples[j]) <= 0) + break; + memtuples[i] = memtuples[j]; + i = j; + } + memtuples[i] = stup; } - memtuples[j] = *tuple; } /* -- 2.52.0