Tuplesort merge pre-reading
While reviewing Peter's latest round of sorting patches, and trying to
understand the new "batch allocation" mechanism, I started to wonder how
useful the pre-reading in the merge stage is in the first place.
I'm talking about the code that reads a bunch of tuples from each tape, loading
them into the memtuples array. That code was added by Tom Lane, back in
1999:
commit cf627ab41ab9f6038a29ddd04dd0ff0ccdca714e
Author: Tom Lane <tgl@sss.pgh.pa.us>
Date: Sat Oct 30 17:27:15 1999 +0000
Further performance improvements in sorting: reduce number of
comparisons
during initial run formation by keeping both current run and next-run
tuples in the same heap (yup, Knuth is smarter than I am). And, during
merge passes, make use of available sort memory to load multiple tuples
from any one input 'tape' at a time, thereby improving locality of
access to the temp file.
So apparently there was a benefit back then, but is it still worthwhile?
The LogicalTape code buffers one block at a time anyway, so how much gain are
we getting from parsing the tuples into SortTuple format in batches?
I wrote a quick patch to test that, attached. It seems to improve
performance, at least in this small test case:
create table lotsofints(i integer);
insert into lotsofints select random() * 1000000000.0 from
generate_series(1, 10000000);
vacuum freeze;
select count(*) FROM (select * from lotsofints order by i) t;
On my laptop, with default work_mem=4MB, that select takes 7.8 s on
unpatched master, and 6.2 s with the attached patch.
So, at least in some cases, the pre-loading hurts. I think we should get
rid of it. This patch probably needs some polishing: I replaced the
batch allocations with a simpler scheme, using a buffer that holds just a
single tuple for each tape, and that might need some more work to allow
downsizing those buffers if you have a few very large tuples in an
otherwise narrow table. And perhaps we should free and reallocate a
smaller memtuples array for the merging, now that we're not making use
of the whole of it. And perhaps we should teach LogicalTape to use
larger buffers, if we can't rely on the OS to do the buffering for us.
But overall, this seems to make the code both simpler and faster.
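To illustrate, the core of the scheme is a buffer swap in
readtup_alloc(): each tape owns one buffer for its current tuple, and
there's one spare buffer for the tuple we last handed back to the
caller. A simplified sketch, condensed from the attached patch (error
handling and some bookkeeping omitted):

static void *
readtup_alloc_sketch(Tuplesortstate *state, int tapenum, Size tuplen)
{
    char   *buf;
    int     bufsize;

    /* Grow the spare buffer if the incoming tuple doesn't fit. */
    if (tuplen > state->mergelasttuplesize)
    {
        state->mergelasttuple = repalloc(state->mergelasttuple, tuplen);
        state->mergelasttuplesize = tuplen;
    }

    /* The spare buffer receives the new tuple... */
    buf = state->mergelasttuple;
    bufsize = state->mergelasttuplesize;

    /*
     * ...and the tape's previous buffer becomes the new spare. That's
     * the tuple we're about to return to the caller, so it must stay
     * valid until the next gettuple call.
     */
    state->mergelasttuple = state->mergetuples[tapenum];
    state->mergelasttuplesize = state->mergetuplesizes[tapenum];
    state->mergetuples[tapenum] = buf;
    state->mergetuplesizes[tapenum] = bufsize;

    return buf;
}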
Am I missing something?
- Heikki
Attachments:
0001-Don-t-bother-to-pre-read-tuples-into-slots-during-me.patch (application/x-patch)
From ea4ce25a33d0dec370a1b5e45cbc6f794e377a90 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 6 Sep 2016 14:38:54 +0300
Subject: [PATCH 1/1] Don't bother to pre-read tuples into slots during merge.
That only seems to add overhead. We're doing the same number of READTUP()
calls either way, but we're spreading the memory usage over a larger area
if we try to pre-read, so it doesn't seem worth it. Although, we're not
using all the available memory this way. Are we now doing too short reads
from the underlying files? Perhaps we should increase the buffer size in
LogicalTape instead, if that would help?
---
src/backend/utils/sort/tuplesort.c | 487 ++++++-------------------------------
1 file changed, 80 insertions(+), 407 deletions(-)
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index c8fbcf8..1fc1b5e 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -358,42 +358,27 @@ struct Tuplesortstate
*/
/*
- * These variables are only used during merge passes. mergeactive[i] is
+ * This variable is only used during merge passes. mergeactive[i] is
* true if we are reading an input run from (actual) tape number i and
- * have not yet exhausted that run. mergenext[i] is the memtuples index
- * of the next pre-read tuple (next to be loaded into the heap) for tape
- * i, or 0 if we are out of pre-read tuples. mergelast[i] similarly
- * points to the last pre-read tuple from each tape. mergeavailslots[i]
- * is the number of unused memtuples[] slots reserved for tape i, and
- * mergeavailmem[i] is the amount of unused space allocated for tape i.
- * mergefreelist and mergefirstfree keep track of unused locations in the
- * memtuples[] array. The memtuples[].tupindex fields link together
- * pre-read tuples for each tape as well as recycled locations in
- * mergefreelist. It is OK to use 0 as a null link in these lists, because
- * memtuples[0] is part of the merge heap and is never a pre-read tuple.
+ * have not yet exhausted that run.
*/
bool *mergeactive; /* active input run source? */
- int *mergenext; /* first preread tuple for each source */
- int *mergelast; /* last preread tuple for each source */
- int *mergeavailslots; /* slots left for prereading each tape */
- int64 *mergeavailmem; /* availMem for prereading each tape */
- int mergefreelist; /* head of freelist of recycled slots */
- int mergefirstfree; /* first slot never used in this merge */
/*
- * Per-tape batch state, when final on-the-fly merge consumes memory from
- * just a few large allocations.
+ * Per-tape batch state, when final on-the-fly merge uses pre-allocated
+ * buffers to hold just the latest tuple, instead of using palloc() for
+ * each tuple. We have one buffer to hold the next tuple from each tape,
+ * plus one buffer to hold the tuple we last returned to the caller.
*
* Aside from the general benefits of performing fewer individual retail
* palloc() calls, this also helps make merging more cache efficient,
- * since each tape's tuples must naturally be accessed sequentially (in
- * sorted order).
+ * since we reuse the same memory quickly.
*/
- int64 spacePerTape; /* Space (memory) for tuples (not slots) */
- char **mergetuples; /* Each tape's memory allocation */
- char **mergecurrent; /* Current offset into each tape's memory */
- char **mergetail; /* Last item's start point for each tape */
- char **mergeoverflow; /* Retail palloc() "overflow" for each tape */
+ char **mergetuples; /* Each tape's memory allocation */
+ int *mergetuplesizes; /* size of each allocation */
+
+ char *mergelasttuple;
+ int mergelasttuplesize; /* allocated size */
/*
* Variables for Algorithm D. Note that destTape is a "logical" tape
@@ -555,14 +540,8 @@ static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state, bool finalMergeBatch);
static void batchmemtuples(Tuplesortstate *state);
-static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
-static void mergebatchone(Tuplesortstate *state, int srcTape,
- SortTuple *stup, bool *should_free);
-static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
- SortTuple *rtup, bool *should_free);
-static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
-static void mergepreread(Tuplesortstate *state);
-static void mergeprereadone(Tuplesortstate *state, int srcTape);
+static void mergebatch(Tuplesortstate *state);
+static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void dumpbatch(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
@@ -1976,8 +1955,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
if (state->memtupcount > 0)
{
int srcTape = state->memtuples[0].tupindex;
- int tupIndex;
- SortTuple *newtup;
+ SortTuple newtup;
/*
* Returned tuple is still counted in our memory space most of
@@ -1988,42 +1966,15 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
*/
*stup = state->memtuples[0];
tuplesort_heap_siftup(state, false);
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- {
- /*
- * out of preloaded data on this tape, try to read more
- *
- * Unlike mergeonerun(), we only preload from the single
- * tape that's run dry, though not before preparing its
- * batch memory for a new round of sequential consumption.
- * See mergepreread() comments.
- */
- if (state->batchUsed)
- mergebatchone(state, srcTape, stup, should_free);
-
- mergeprereadone(state, srcTape);
- /*
- * if still no data, we've reached end of run on this tape
- */
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- {
- /* Free tape's buffer, avoiding dangling pointer */
- if (state->batchUsed)
- mergebatchfreetape(state, srcTape, stup, should_free);
- return true;
- }
+ /* pull next tuple from tape, insert in heap */
+ if (!mergereadnext(state, srcTape, &newtup))
+ {
+ /* we've reached end of run on this tape */
+ return true;
}
- /* pull next preread tuple from list, insert in heap */
- newtup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = newtup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, newtup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- newtup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+
+ tuplesort_heap_insert(state, &newtup, srcTape, false);
return true;
}
return false;
@@ -2350,14 +2301,8 @@ inittapes(Tuplesortstate *state)
state->tapeset = LogicalTapeSetCreate(maxTapes);
state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
- state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
- state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
- state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
+ state->mergetuplesizes = (int *) palloc0(maxTapes * sizeof(int));
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
@@ -2617,10 +2562,6 @@ mergeonerun(Tuplesortstate *state)
{
int destTape = state->tp_tapenum[state->tapeRange];
int srcTape;
- int tupIndex;
- SortTuple *tup;
- int64 priorAvail,
- spaceFreed;
/*
* Start the merge by loading one tuple from each active source tape into
@@ -2635,33 +2576,21 @@ mergeonerun(Tuplesortstate *state)
*/
while (state->memtupcount > 0)
{
+ SortTuple stup;
+
/* write the tuple to destTape */
- priorAvail = state->availMem;
srcTape = state->memtuples[0].tupindex;
WRITETUP(state, destTape, &state->memtuples[0]);
- /* writetup adjusted total free space, now fix per-tape space */
- spaceFreed = state->availMem - priorAvail;
- state->mergeavailmem[srcTape] += spaceFreed;
/* compact the heap */
tuplesort_heap_siftup(state, false);
- if ((tupIndex = state->mergenext[srcTape]) == 0)
+
+ /* pull next tuple from tape, insert in heap */
+ if (!mergereadnext(state, srcTape, &stup))
{
- /* out of preloaded data on this tape, try to read more */
- mergepreread(state);
- /* if still no data, we've reached end of run on this tape */
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- continue;
+ /* we've reached end of run on this tape */
+ continue;
}
- /* pull next preread tuple from list, insert in heap */
- tup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = tup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, tup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- tup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+ tuplesort_heap_insert(state, &stup, srcTape, false);
}
/*
@@ -2704,8 +2633,6 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch)
int activeTapes;
int tapenum;
int srcTape;
- int slotsPerTape;
- int64 spacePerTape;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
@@ -2729,14 +2656,6 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch)
}
state->activeTapes = activeTapes;
- /* Clear merge-pass state variables */
- memset(state->mergenext, 0,
- state->maxTapes * sizeof(*state->mergenext));
- memset(state->mergelast, 0,
- state->maxTapes * sizeof(*state->mergelast));
- state->mergefreelist = 0; /* nothing in the freelist */
- state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
-
if (finalMergeBatch)
{
/* Free outright buffers for tape never actually allocated */
@@ -2749,22 +2668,7 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch)
batchmemtuples(state);
}
- /*
- * Initialize space allocation to let each active input tape have an equal
- * share of preread space.
- */
Assert(activeTapes > 0);
- slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
- Assert(slotsPerTape > 0);
- spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- {
- if (state->mergeactive[srcTape])
- {
- state->mergeavailslots[srcTape] = slotsPerTape;
- state->mergeavailmem[srcTape] = spacePerTape;
- }
- }
/*
* Preallocate tuple batch memory for each tape. This is the memory used
@@ -2773,35 +2677,21 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch)
* once per sort, just in advance of the final on-the-fly merge step.
*/
if (finalMergeBatch)
- mergebatch(state, spacePerTape);
-
- /*
- * Preread as many tuples as possible (and at least one) from each active
- * tape
- */
- mergepreread(state);
+ mergebatch(state);
/* Load the merge heap with the first tuple from each input tape */
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
- int tupIndex = state->mergenext[srcTape];
- SortTuple *tup;
+ SortTuple tup;
- if (tupIndex)
+ if (mergereadnext(state, srcTape, &tup))
{
- tup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = tup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, tup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- tup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+ tuplesort_heap_insert(state, &tup, srcTape, false);
#ifdef TRACE_SORT
if (trace_sort && finalMergeBatch)
{
+#if 0
int64 perTapeKB = (spacePerTape + 1023) / 1024;
int64 usedSpaceKB;
int usedSlots;
@@ -2828,6 +2718,7 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch)
(double) usedSpaceKB / (double) perTapeKB,
usedSlots, slotsPerTape,
(double) usedSlots / (double) slotsPerTape);
+#endif
}
#endif
}
@@ -2923,7 +2814,7 @@ batchmemtuples(Tuplesortstate *state)
* goal.
*/
static void
-mergebatch(Tuplesortstate *state, int64 spacePerTape)
+mergebatch(Tuplesortstate *state)
{
int srcTape;
@@ -2943,283 +2834,46 @@ mergebatch(Tuplesortstate *state, int64 spacePerTape)
continue;
/* Allocate buffer for each active tape */
- mergetuples = MemoryContextAllocHuge(state->tuplecontext,
- spacePerTape);
+ mergetuples = MemoryContextAlloc(state->tuplecontext, BLCKSZ);
/* Initialize state for tape */
state->mergetuples[srcTape] = mergetuples;
- state->mergecurrent[srcTape] = mergetuples;
- state->mergetail[srcTape] = mergetuples;
- state->mergeoverflow[srcTape] = NULL;
+ state->mergetuplesizes[srcTape] = BLCKSZ;
}
- state->batchUsed = true;
- state->spacePerTape = spacePerTape;
-}
+ /* and one more buffer that's not associated with any tape initially */
+ state->mergelasttuple = MemoryContextAlloc(state->tuplecontext, BLCKSZ);
+ state->mergelasttuplesize = BLCKSZ;
-/*
- * mergebatchone - prepare batch memory for one merge input tape
- *
- * This is called following the exhaustion of preread tuples for one input
- * tape. All that actually occurs is that the state for the source tape is
- * reset to indicate that all memory may be reused.
- *
- * This routine must deal with fixing up the tuple that is about to be returned
- * to the client, due to "overflow" allocations.
- */
-static void
-mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
- bool *should_free)
-{
- Assert(state->batchUsed);
-
- /*
- * Tuple about to be returned to caller ("stup") is final preread tuple
- * from tape, just removed from the top of the heap. Special steps around
- * memory management must be performed for that tuple, to make sure it
- * isn't overwritten early.
- */
- if (!state->mergeoverflow[srcTape])
- {
- Size tupLen;
-
- /*
- * Mark tuple buffer range for reuse, but be careful to move final,
- * tail tuple to start of space for next run so that it's available to
- * caller when stup is returned, and remains available at least until
- * the next tuple is requested.
- */
- tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
- state->mergecurrent[srcTape] = state->mergetuples[srcTape];
- MOVETUP(state->mergecurrent[srcTape], state->mergetail[srcTape],
- tupLen);
-
- /* Make SortTuple at top of the merge heap point to new tuple */
- rtup->tuple = (void *) state->mergecurrent[srcTape];
-
- state->mergetail[srcTape] = state->mergecurrent[srcTape];
- state->mergecurrent[srcTape] += tupLen;
- }
- else
- {
- /*
- * Handle an "overflow" retail palloc.
- *
- * This is needed when we run out of tuple memory for the tape.
- */
- state->mergecurrent[srcTape] = state->mergetuples[srcTape];
- state->mergetail[srcTape] = state->mergetuples[srcTape];
-
- if (rtup->tuple)
- {
- Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
- /* Caller should free palloc'd tuple */
- *should_free = true;
- }
- state->mergeoverflow[srcTape] = NULL;
- }
-}
-
-/*
- * mergebatchfreetape - handle final clean-up for batch memory once tape is
- * about to become exhausted
- *
- * All tuples are returned from tape, but a single final tuple, *rtup, is to be
- * passed back to caller. Free tape's batch allocation buffer while ensuring
- * that the final tuple is managed appropriately.
- */
-static void
-mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
- bool *should_free)
-{
- Assert(state->batchUsed);
- Assert(state->status == TSS_FINALMERGE);
-
- /*
- * Tuple may or may not already be an overflow allocation from
- * mergebatchone()
- */
- if (!*should_free && rtup->tuple)
- {
- /*
- * Final tuple still in tape's batch allocation.
- *
- * Return palloc()'d copy to caller, and have it freed in a similar
- * manner to overflow allocation. Otherwise, we'd free batch memory
- * and pass back a pointer to garbage. Note that we deliberately
- * allocate this in the parent tuplesort context, to be on the safe
- * side.
- */
- Size tuplen;
- void *oldTuple = rtup->tuple;
-
- tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
- rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
- MOVETUP(rtup->tuple, oldTuple, tuplen);
- *should_free = true;
- }
-
- /* Free spacePerTape-sized buffer */
- pfree(state->mergetuples[srcTape]);
-}
-
-/*
- * mergebatchalloc - allocate memory for one tuple using a batch memory
- * "logical allocation".
- *
- * This is used for the final on-the-fly merge phase only. READTUP() routines
- * receive memory from here in place of palloc() and USEMEM() calls.
- *
- * Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted,
- * contiguous order (while allowing safe reuse of memory made available to
- * each tape). This maximizes locality of access as tuples are returned by
- * final merge.
- *
- * Caller must not subsequently attempt to free memory returned here. In
- * general, only mergebatch* functions know about how memory returned from
- * here should be freed, and this function's caller must ensure that batch
- * memory management code will definitely have the opportunity to do the right
- * thing during the final on-the-fly merge.
- */
-static void *
-mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
-{
- Size reserve_tuplen = MAXALIGN(tuplen);
- char *ret;
-
- /* Should overflow at most once before mergebatchone() call: */
- Assert(state->mergeoverflow[tapenum] == NULL);
- Assert(state->batchUsed);
-
- /* It should be possible to use precisely spacePerTape memory at once */
- if (state->mergecurrent[tapenum] + reserve_tuplen <=
- state->mergetuples[tapenum] + state->spacePerTape)
- {
- /*
- * Usual case -- caller is returned pointer into its tape's buffer,
- * and an offset from that point is recorded as where tape has
- * consumed up to for current round of preloading.
- */
- ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
- state->mergecurrent[tapenum] += reserve_tuplen;
- }
- else
- {
- /*
- * Allocate memory, and record as tape's overflow allocation. This
- * will be detected quickly, in a similar fashion to a LACKMEM()
- * condition, and should not happen again before a new round of
- * preloading for caller's tape. Note that we deliberately allocate
- * this in the parent tuplesort context, to be on the safe side.
- *
- * Sometimes, this does not happen because merging runs out of slots
- * before running out of memory.
- */
- ret = state->mergeoverflow[tapenum] =
- MemoryContextAlloc(state->sortcontext, tuplen);
- }
-
- return ret;
+ state->batchUsed = true;
}
/*
- * mergepreread - load tuples from merge input tapes
+ * mergereadnext - load tuple from one merge input tape
*
- * This routine exists to improve sequentiality of reads during a merge pass,
- * as explained in the header comments of this file. Load tuples from each
- * active source tape until the tape's run is exhausted or it has used up
- * its fair share of available memory. In any case, we guarantee that there
- * is at least one preread tuple available from each unexhausted input tape.
- *
- * We invoke this routine at the start of a merge pass for initial load,
- * and then whenever any tape's preread data runs out. Note that we load
- * as much data as possible from all tapes, not just the one that ran out.
- * This is because logtape.c works best with a usage pattern that alternates
- * between reading a lot of data and writing a lot of data, so whenever we
- * are forced to read, we should fill working memory completely.
- *
- * In FINALMERGE state, we *don't* use this routine, but instead just preread
- * from the single tape that ran dry. There's no read/write alternation in
- * that state and so no point in scanning through all the tapes to fix one.
- * (Moreover, there may be quite a lot of inactive tapes in that state, since
- * we might have had many fewer runs than tapes. In a regular tape-to-tape
- * merge we can expect most of the tapes to be active. Plus, only
- * FINALMERGE state has to consider memory management for a batch
- * allocation.)
- */
-static void
-mergepreread(Tuplesortstate *state)
-{
- int srcTape;
-
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- mergeprereadone(state, srcTape);
-}
-
-/*
- * mergeprereadone - load tuples from one merge input tape
+ * Returns false on EOF.
*
* Read tuples from the specified tape until it has used up its free memory
* or array slots; but ensure that we have at least one tuple, if any are
* to be had.
*/
-static void
-mergeprereadone(Tuplesortstate *state, int srcTape)
+static bool
+mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
{
unsigned int tuplen;
- SortTuple stup;
- int tupIndex;
- int64 priorAvail,
- spaceUsed;
if (!state->mergeactive[srcTape])
- return; /* tape's run is already exhausted */
-
- /*
- * Manage per-tape availMem. Only actually matters when batch memory not
- * in use.
- */
- priorAvail = state->availMem;
- state->availMem = state->mergeavailmem[srcTape];
+ return false; /* tape's run is already exhausted */
- /*
- * When batch memory is used if final on-the-fly merge, only mergeoverflow
- * test is relevant; otherwise, only LACKMEM() test is relevant.
- */
- while ((state->mergeavailslots[srcTape] > 0 &&
- state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
- state->mergenext[srcTape] == 0)
+ /* read next tuple, if any */
+ if ((tuplen = getlen(state, srcTape, true)) == 0)
{
- /* read next tuple, if any */
- if ((tuplen = getlen(state, srcTape, true)) == 0)
- {
- state->mergeactive[srcTape] = false;
- break;
- }
- READTUP(state, &stup, srcTape, tuplen);
- /* find a free slot in memtuples[] for it */
- tupIndex = state->mergefreelist;
- if (tupIndex)
- state->mergefreelist = state->memtuples[tupIndex].tupindex;
- else
- {
- tupIndex = state->mergefirstfree++;
- Assert(tupIndex < state->memtupsize);
- }
- state->mergeavailslots[srcTape]--;
- /* store tuple, append to list for its tape */
- stup.tupindex = 0;
- state->memtuples[tupIndex] = stup;
- if (state->mergelast[srcTape])
- state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
- else
- state->mergenext[srcTape] = tupIndex;
- state->mergelast[srcTape] = tupIndex;
+ state->mergeactive[srcTape] = false;
+ return false;
}
- /* update per-tape and global availmem counts */
- spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
- state->mergeavailmem[srcTape] = state->availMem;
- state->availMem = priorAvail - spaceUsed;
+ READTUP(state, stup, srcTape, tuplen);
+
+ return true;
}
/*
@@ -3861,14 +3515,33 @@ readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
{
if (state->batchUsed)
{
+ char *buf;
+ int bufsize;
+
/*
+ * Recycle the buffer that held the previous tuple returned from
+ * the sort. Enlarge it if it's not large enough to hold the new
+ * tuple.
+ *
* No USEMEM() call, because during final on-the-fly merge accounting
- * is based on tape-private state. ("Overflow" allocations are
- * detected as an indication that a new round or preloading is
- * required. Preloading marks existing contents of tape's batch buffer
- * for reuse.)
+ * is based on tape-private state.
*/
- return mergebatchalloc(state, tapenum, tuplen);
+ if (tuplen > state->mergelasttuplesize)
+ {
+ state->mergelasttuple = repalloc(state->mergelasttuple, tuplen);
+ state->mergelasttuplesize = tuplen;
+ }
+ buf = state->mergelasttuple;
+ bufsize = state->mergelasttuplesize;
+
+ /* we will return the previous tuple from this tape next. */
+ state->mergelasttuple = state->mergetuples[tapenum];
+ state->mergelasttuplesize = state->mergetuplesizes[tapenum];
+
+ state->mergetuples[tapenum] = buf;
+ state->mergetuplesizes[tapenum] = bufsize;
+
+ return buf;
}
else
{
--
2.9.3
On Tue, Sep 6, 2016 at 5:20 AM, Heikki Linnakangas <hlinnaka@iki.fi> wrote:
I wrote a quick patch to test that, attached. It seems to improve
performance, at least in this small test case:
create table lotsofints(i integer);
insert into lotsofints select random() * 1000000000.0 from
generate_series(1, 10000000);
vacuum freeze;
select count(*) FROM (select * from lotsofints order by i) t;
On my laptop, with default work_mem=4MB, that select takes 7.8 s on
unpatched master, and 6.2 s with the attached patch.
The benefits have a lot to do with OS read-ahead, and efficient use of
memory bandwidth during the merge, where we want to access the caller
tuples sequentially per tape (i.e. that's what the batch memory stuff
added -- it also made much better use of available memory). Note that
I've been benchmarking the parallel CREATE INDEX patch on a server
with many HDDs, since sequential performance is mostly all that
matters. I think that in 1999, the preloading had a lot more to do
with logtape.c's ability to aggressively recycle blocks during merges,
such that the total storage overhead does not exceed the original size
of the caller tuples (plus what it calls "trivial bookkeeping
overhead" IIRC). That's less important these days, but still matters
some (it's more of an issue when you can't complete the sort in one
pass, which is rare these days).
Offhand, I would think that taken together this is very important. I'd
certainly want to see cases in the hundreds of megabytes or gigabytes
of work_mem alongside your 4MB case, even just to be able to talk
informally about this. As you know, the default work_mem value is very
conservative.
--
Peter Geoghegan
On Tue, Sep 6, 2016 at 12:08 PM, Peter Geoghegan <pg@heroku.com> wrote:
Offhand, I would think that taken together this is very important. I'd
certainly want to see cases in the hundreds of megabytes or gigabytes
of work_mem alongside your 4MB case, even just to be able to talk
informally about this. As you know, the default work_mem value is very
conservative.
It looks like your benchmark relies on multiple passes, which can be
misleading. I bet it suffers some amount of problems from palloc()
fragmentation. When very memory constrained, that can get really bad.
Non-final merge passes (merges with more than one run -- real or dummy
-- on any given tape) can have uneven numbers of runs on each tape.
So, tuplesort.c needs to be prepared to dole out memory among tapes
*unevenly* there (same applies to memtuples "slots"). This is why
batch memory support is so hard for those cases (the fact that they're
so rare anyway also puts me off it). As you know, I wrote a patch that
adds batch memory support to cases that require randomAccess (final
output on a materialized tape), for their final merge. These final
merges happen to not be a final on-the-fly merge only due to this
randomAccess requirement from caller. It's possible to support these
cases in the future, with that patch, only because I am safe to assume
that each run/tape is the same size there (well, the assumption is
exactly as safe as it was for the 9.6 final on-the-fly merge, at
least).
My point about non-final merges is that you have to be very careful
that you're comparing apples to apples, memory accounting wise, when
looking into something like this. I'm not saying that you didn't, but
it's worth considering.
FWIW, I did try an increase in the buffer size in LogicalTape at one
time several months back, and saw no benefit there (at least, with no
other changes).
--
Peter Geoghegan
On 09/06/2016 10:26 PM, Peter Geoghegan wrote:
On Tue, Sep 6, 2016 at 12:08 PM, Peter Geoghegan <pg@heroku.com> wrote:
Offhand, I would think that taken together this is very important. I'd
certainly want to see cases in the hundreds of megabytes or gigabytes
of work_mem alongside your 4MB case, even just to be able to talk
informally about this. As you know, the default work_mem value is very
conservative.
I spent some more time polishing this up, and also added some code to
logtape.c, to use larger read buffers, to compensate for the fact that
we don't do pre-reading from tuplesort.c anymore. That should trigger
the OS read-ahead, and make the I/O more sequential, which was the
purpose of the old pre-reading code. But simpler. I haven't tested that
part much yet, but I plan to run some tests on larger data sets that
don't fit in RAM, to make the I/O effects visible.
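To give an idea of the concept (a self-contained, hand-wavy sketch with
made-up names -- not the actual logtape.c change, which has to go
through BufFile and follow the tape's block structure):

#include <stdio.h>
#include <string.h>

#define BLCKSZ 8192
#define READAHEAD_BLOCKS 32            /* 256 kB read buffer */

typedef struct
{
    FILE   *file;
    char    buffer[READAHEAD_BLOCKS * BLCKSZ];
    size_t  nbytes;                    /* valid bytes in buffer */
    size_t  pos;                       /* next byte to consume */
} BufferedTape;

/* Copy 'len' bytes from the tape into 'dst'; returns bytes copied. */
static size_t
buffered_read(BufferedTape *t, void *dst, size_t len)
{
    size_t  copied = 0;

    while (copied < len)
    {
        size_t  n;

        if (t->pos == t->nbytes)
        {
            /*
             * One large sequential read, instead of one block at a
             * time. This is what lets the OS read-ahead kick in.
             */
            t->nbytes = fread(t->buffer, 1, sizeof(t->buffer), t->file);
            t->pos = 0;
            if (t->nbytes == 0)
                break;                 /* EOF */
        }
        n = len - copied;
        if (n > t->nbytes - t->pos)
            n = t->nbytes - t->pos;
        memcpy((char *) dst + copied, t->buffer + t->pos, n);
        t->pos += n;
        copied += n;
    }
    return copied;
}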
I wrote a little testing toolkit, see third patch. I'm not proposing to
commit that, but that's what I used for testing. It creates four tables,
about 1GB in size each (it also creates smaller and larger tables, but I
used the "medium" sized ones for these tests). Two of the tables contain
integers, and two contain text strings. Two of the tables are completely
ordered, two are in random order. To measure, it runs ORDER BY queries
on the tables, with different work_mem settings.
Attached are the full results. In summary, these patches improve
performance in some of the tests, and are a wash on others. The patches
help in particular in the randomly ordered cases, with up to 512 MB of
work_mem.
For example, with work_mem=256MB, which is enough to get a single merge
pass:
with patches:
ordered_ints: 7078 ms, 6910 ms, 6849 ms
random_ints: 15639 ms, 15575 ms, 15625 ms
ordered_text: 11121 ms, 12318 ms, 10824 ms
random_text: 53462 ms, 53420 ms, 52949 ms
unpatched master:
ordered_ints: 6961 ms, 7040 ms, 7044 ms
random_ints: 19205 ms, 18797 ms, 18955 ms
ordered_text: 11045 ms, 11377 ms, 11203 ms
random_text: 57117 ms, 54489 ms, 54806 ms
(The same queries were run three times in a row; that's what the three
numbers on each row mean. I.e. the differences between the numbers on the
same row are noise.)
It looks like your benchmark relies on multiple passes, which can be
misleading. I bet it suffers some amount of problems from palloc()
fragmentation. When very memory constrained, that can get really bad.
Non-final merge passes (merges with more than one run -- real or dummy
-- on any given tape) can have uneven numbers of runs on each tape.
So, tuplesort.c needs to be prepared to dole out memory among tapes
*unevenly* there (same applies to memtuples "slots"). This is why
batch memory support is so hard for those cases (the fact that they're
so rare anyway also puts me off it). As you know, I wrote a patch that
adds batch memory support to cases that require randomAccess (final
output on a materialized tape), for their final merge. These final
merges happen to not be a final on-the-fly merge only due to this
randomAccess requirement from caller. It's possible to support these
cases in the future, with that patch, only because I am safe to assume
that each run/tape is the same size there (well, the assumption is
exactly as safe as it was for the 9.6 final on-the-fly merge, at
least).
My point about non-final merges is that you have to be very careful
that you're comparing apples to apples, memory accounting wise, when
looking into something like this. I'm not saying that you didn't, but
it's worth considering.
I'm not 100% sure I'm accounting for all the memory correctly. But I
didn't touch the way the initial quicksort works, nor the way the runs
are built. And the merge passes don't actually need or benefit from a
lot of memory, so I doubt it's very sensitive to that.
In this patch, the memory available for the read buffers is just divided
evenly across maxTapes. The buffers for the tapes that are not currently
active are wasted. It could be made smarter, by freeing the buffers of
the tapes that are not active at the moment.
Might do that later, but this is what I'm going to benchmark for now. I
don't think adding buffers is helpful beyond a certain point, so this is
probably good enough in practice. Although it would be nice to free the
memory we don't need earlier, in case there are other processes that
could make use of it.
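The split itself is trivial; something along these lines (illustrative
names only, this isn't lifted verbatim from the patch):

/*
 * Give every tape -- active or not -- an equal, block-aligned share of
 * the memory set aside for read buffers.
 */
static int64
read_buffer_size_per_tape(int64 availMem, int maxTapes)
{
    int64   share = availMem / maxTapes;

    /* round down to a whole number of blocks, but use at least one */
    share -= share % BLCKSZ;
    if (share < BLCKSZ)
        share = BLCKSZ;
    return share;
}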
FWIW, I did try an increase in the buffer size in LogicalTape at one
time several months back, and saw no benefit there (at least, with no
other changes).
Yeah, unless you get rid of the pre-reading in tuplesort.c, you're just
double-buffering.
- Heikki
Attachments:
0001-Don-t-bother-to-pre-read-tuples-into-SortTuple-slots.patch (text/x-diff)
From d4d89c88c5e26be70c976a756e874af65ad6ec55 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 8 Sep 2016 14:31:31 +0300
Subject: [PATCH 1/3] Don't bother to pre-read tuples into SortTuple slots
during merge.
That only seems to add overhead. We're doing the same number of READTUP()
calls either way, but we're spreading the memory usage over a larger area
if we try to pre-read, so it doesn't seem worth it.
The pre-reading can be helpful, to trigger the OS readahead of the
underlying tape, because it will make the read pattern appear more
sequential. But we'll fix that in the next patch, by teaching logtape.c to
read in larger chunks.
---
src/backend/utils/sort/tuplesort.c | 889 ++++++++++---------------------------
1 file changed, 223 insertions(+), 666 deletions(-)
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index c8fbcf8..b9fb99c 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -162,7 +162,7 @@ bool optimize_bounded_sort = true;
* The objects we actually sort are SortTuple structs. These contain
* a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
* which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree() (except during final on-the-fly merge,
+ * can be freed by a simple pfree() (except during merge,
* when memory is used in batch). SortTuples also contain the tuple's
* first key column in Datum/nullflag format, and an index integer.
*
@@ -203,6 +203,20 @@ typedef struct
int tupindex; /* see notes above */
} SortTuple;
+/*
+ * During merge, we use a pre-allocated set of fixed-size buffers to store
+ * tuples in, to avoid palloc/pfree overhead.
+ *
+ * 'nextfree' is valid when this chunk is in the free list. When in use, the
+ * buffer holds a tuple.
+ */
+#define MERGETUPLEBUFFER_SIZE 1024
+
+typedef union MergeTupleBuffer
+{
+ union MergeTupleBuffer *nextfree;
+ char buffer[MERGETUPLEBUFFER_SIZE];
+} MergeTupleBuffer;
/*
* Possible states of a Tuplesort object. These denote the states that
@@ -307,14 +321,6 @@ struct Tuplesortstate
int tapenum, unsigned int len);
/*
- * Function to move a caller tuple. This is usually implemented as a
- * memmove() shim, but function may also perform additional fix-up of
- * caller tuple where needed. Batch memory support requires the movement
- * of caller tuples from one location in memory to another.
- */
- void (*movetup) (void *dest, void *src, unsigned int len);
-
- /*
* This array holds the tuples now in sort memory. If we are in state
* INITIAL, the tuples are in no particular order; if we are in state
* SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
@@ -332,12 +338,40 @@ struct Tuplesortstate
/*
* Memory for tuples is sometimes allocated in batch, rather than
* incrementally. This implies that incremental memory accounting has
- * been abandoned. Currently, this only happens for the final on-the-fly
- * merge step. Large batch allocations can store tuples (e.g.
- * IndexTuples) without palloc() fragmentation and other overhead.
+ * been abandoned. Currently, this happens when we start merging.
+ * Large batch allocations can store tuples (e.g. IndexTuples) without
+ * palloc() fragmentation and other overhead.
+ *
+ * For the batch memory, we use one large allocation, divided into
+ * MERGETUPLEBUFFER_SIZE chunks. The allocation is sized to hold
+ * one chunk per tape, plus one additional chunk. We need that many
+ * chunks to hold all the tuples kept in the heap during merge, plus
+ * the one we have last returned from the sort.
+ *
+ * Initially, all the chunks are kept in a linked list, in freeBufferHead.
+ * When a tuple is read from a tape, it is put to the next available
+ * chunk, if it fits. If the tuple is larger than MERGETUPLEBUFFER_SIZE,
+ * it is palloc'd instead.
+ *
+ * When we're done processing a tuple, we return the chunk back to the
+ * free list, or pfree() if it was palloc'd. We know that a tuple was
+ * allocated from the batch memory arena, if its pointer value is between
+ * mergeTupleBuffersBegin and -End.
*/
bool batchUsed;
+ char *batchMemoryBegin; /* beginning of batch memory arena */
+ char *batchMemoryEnd; /* end of batch memory arena */
+ MergeTupleBuffer *freeBufferHead; /* head of free list */
+
+ /*
+ * When we return a tuple to the caller that came from a tape (that is,
+ * in TSS_SORTEDONTAPE or TSS_FINALMERGE modes), we remember the tuple
+ * in 'readlasttuple', so that we can recycle the memory on next
+ * gettuple call.
+ */
+ void *readlasttuple;
+
/*
* While building initial runs, this indicates if the replacement
* selection strategy is in use. When it isn't, then a simple hybrid
@@ -358,42 +392,11 @@ struct Tuplesortstate
*/
/*
- * These variables are only used during merge passes. mergeactive[i] is
+ * This variable is only used during merge passes. mergeactive[i] is
* true if we are reading an input run from (actual) tape number i and
- * have not yet exhausted that run. mergenext[i] is the memtuples index
- * of the next pre-read tuple (next to be loaded into the heap) for tape
- * i, or 0 if we are out of pre-read tuples. mergelast[i] similarly
- * points to the last pre-read tuple from each tape. mergeavailslots[i]
- * is the number of unused memtuples[] slots reserved for tape i, and
- * mergeavailmem[i] is the amount of unused space allocated for tape i.
- * mergefreelist and mergefirstfree keep track of unused locations in the
- * memtuples[] array. The memtuples[].tupindex fields link together
- * pre-read tuples for each tape as well as recycled locations in
- * mergefreelist. It is OK to use 0 as a null link in these lists, because
- * memtuples[0] is part of the merge heap and is never a pre-read tuple.
+ * have not yet exhausted that run.
*/
bool *mergeactive; /* active input run source? */
- int *mergenext; /* first preread tuple for each source */
- int *mergelast; /* last preread tuple for each source */
- int *mergeavailslots; /* slots left for prereading each tape */
- int64 *mergeavailmem; /* availMem for prereading each tape */
- int mergefreelist; /* head of freelist of recycled slots */
- int mergefirstfree; /* first slot never used in this merge */
-
- /*
- * Per-tape batch state, when final on-the-fly merge consumes memory from
- * just a few large allocations.
- *
- * Aside from the general benefits of performing fewer individual retail
- * palloc() calls, this also helps make merging more cache efficient,
- * since each tape's tuples must naturally be accessed sequentially (in
- * sorted order).
- */
- int64 spacePerTape; /* Space (memory) for tuples (not slots) */
- char **mergetuples; /* Each tape's memory allocation */
- char **mergecurrent; /* Current offset into each tape's memory */
- char **mergetail; /* Last item's start point for each tape */
- char **mergeoverflow; /* Retail palloc() "overflow" for each tape */
/*
* Variables for Algorithm D. Note that destTape is a "logical" tape
@@ -481,11 +484,33 @@ struct Tuplesortstate
#endif
};
+/*
+ * Is the given tuple allocated from the batch memory arena?
+ */
+#define IS_MERGETUPLE_BUFFER(state, tuple) \
+ ((char *) tuple >= state->batchMemoryBegin && \
+ (char *) tuple < state->batchMemoryEnd)
+
+/*
+ * Return the given tuple to the batch memory free list, or free it
+ * if it was palloc'd.
+ */
+#define RELEASE_MERGETUPLE_BUFFER(state, tuple) \
+ do { \
+ MergeTupleBuffer *buf = (MergeTupleBuffer *) tuple; \
+ \
+ if (IS_MERGETUPLE_BUFFER(state, tuple)) \
+ { \
+ buf->nextfree = state->freeBufferHead; \
+ state->freeBufferHead = buf; \
+ } else \
+ pfree(tuple); \
+ } while(0)
+
#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define MOVETUP(dest,src,len) ((*(state)->movetup) (dest, src, len))
#define LACKMEM(state) ((state)->availMem < 0 && !(state)->batchUsed)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
@@ -553,16 +578,8 @@ static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
-static void beginmerge(Tuplesortstate *state, bool finalMergeBatch);
-static void batchmemtuples(Tuplesortstate *state);
-static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
-static void mergebatchone(Tuplesortstate *state, int srcTape,
- SortTuple *stup, bool *should_free);
-static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
- SortTuple *rtup, bool *should_free);
-static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
-static void mergepreread(Tuplesortstate *state);
-static void mergeprereadone(Tuplesortstate *state, int srcTape);
+static void beginmerge(Tuplesortstate *state);
+static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void dumpbatch(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
@@ -574,7 +591,7 @@ static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
-static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
+static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -582,7 +599,6 @@ static void writetup_heap(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_heap(void *dest, void *src, unsigned int len);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -590,7 +606,6 @@ static void writetup_cluster(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_cluster(void *dest, void *src, unsigned int len);
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
@@ -600,7 +615,6 @@ static void writetup_index(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_index(void *dest, void *src, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -608,7 +622,6 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_datum(void *dest, void *src, unsigned int len);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/*
@@ -760,7 +773,6 @@ tuplesort_begin_heap(TupleDesc tupDesc,
state->copytup = copytup_heap;
state->writetup = writetup_heap;
state->readtup = readtup_heap;
- state->movetup = movetup_heap;
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
state->abbrevNext = 10;
@@ -833,7 +845,6 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
state->copytup = copytup_cluster;
state->writetup = writetup_cluster;
state->readtup = readtup_cluster;
- state->movetup = movetup_cluster;
state->abbrevNext = 10;
state->indexInfo = BuildIndexInfo(indexRel);
@@ -925,7 +936,6 @@ tuplesort_begin_index_btree(Relation heapRel,
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
- state->movetup = movetup_index;
state->abbrevNext = 10;
state->heapRel = heapRel;
@@ -993,7 +1003,6 @@ tuplesort_begin_index_hash(Relation heapRel,
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
- state->movetup = movetup_index;
state->heapRel = heapRel;
state->indexRel = indexRel;
@@ -1036,7 +1045,6 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
state->copytup = copytup_datum;
state->writetup = writetup_datum;
state->readtup = readtup_datum;
- state->movetup = movetup_datum;
state->abbrevNext = 10;
state->datumType = datumType;
@@ -1881,14 +1889,33 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
case TSS_SORTEDONTAPE:
Assert(forward || state->randomAccess);
Assert(!state->batchUsed);
- *should_free = true;
+
+ /*
+ * The buffer holding the tuple that we returned in previous
+ * gettuple call can now be reused.
+ */
+ if (state->readlasttuple)
+ {
+ RELEASE_MERGETUPLE_BUFFER(state, state->readlasttuple);
+ state->readlasttuple = NULL;
+ }
+
if (forward)
{
if (state->eof_reached)
return false;
+
if ((tuplen = getlen(state, state->result_tape, true)) != 0)
{
READTUP(state, stup, state->result_tape, tuplen);
+
+ /*
+ * Remember the tuple we return, so that we can recycle its
+ * memory on next call. (This can be NULL, in the Datum case).
+ */
+ state->readlasttuple = stup->tuple;
+
+ *should_free = false;
return true;
}
else
@@ -1962,6 +1989,14 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
tuplen))
elog(ERROR, "bogus tuple length in backward scan");
READTUP(state, stup, state->result_tape, tuplen);
+
+ /*
+ * Remember the tuple we return, so that we can recycle its
+ * memory on next call. (This can be NULL, in the Datum case).
+ */
+ state->readlasttuple = stup->tuple;
+
+ *should_free = false;
return true;
case TSS_FINALMERGE:
@@ -1971,13 +2006,22 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
*should_free = false;
/*
+ * The buffer holding the tuple that we returned in previous
+ * gettuple call can now be reused.
+ */
+ if (state->readlasttuple)
+ {
+ RELEASE_MERGETUPLE_BUFFER(state, state->readlasttuple);
+ state->readlasttuple = NULL;
+ }
+
+ /*
* This code should match the inner loop of mergeonerun().
*/
if (state->memtupcount > 0)
{
int srcTape = state->memtuples[0].tupindex;
- int tupIndex;
- SortTuple *newtup;
+ SortTuple newtup;
/*
* Returned tuple is still counted in our memory space most of
@@ -1988,42 +2032,22 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
*/
*stup = state->memtuples[0];
tuplesort_heap_siftup(state, false);
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- {
- /*
- * out of preloaded data on this tape, try to read more
- *
- * Unlike mergeonerun(), we only preload from the single
- * tape that's run dry, though not before preparing its
- * batch memory for a new round of sequential consumption.
- * See mergepreread() comments.
- */
- if (state->batchUsed)
- mergebatchone(state, srcTape, stup, should_free);
- mergeprereadone(state, srcTape);
+ /*
+ * Remember the tuple we return, so that we can recycle its
+ * memory on next call. (This can be NULL, in the Datum case).
+ */
+ state->readlasttuple = stup->tuple;
- /*
- * if still no data, we've reached end of run on this tape
- */
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- {
- /* Free tape's buffer, avoiding dangling pointer */
- if (state->batchUsed)
- mergebatchfreetape(state, srcTape, stup, should_free);
- return true;
- }
+ /* pull next tuple from tape, insert in heap */
+ if (!mergereadnext(state, srcTape, &newtup))
+ {
+ /* we've reached end of run on this tape */
+ return true;
}
- /* pull next preread tuple from list, insert in heap */
- newtup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = newtup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, newtup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- newtup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+
+ tuplesort_heap_insert(state, &newtup, srcTape, false);
+
return true;
}
return false;
@@ -2325,7 +2349,8 @@ inittapes(Tuplesortstate *state)
#endif
/*
- * Decrease availMem to reflect the space needed for tape buffers; but
+ * Decrease availMem to reflect the space needed for tape buffers, when
+ * writing the initial runs; but
* don't decrease it to the point that we have no room for tuples. (That
* case is only likely to occur if sorting pass-by-value Datums; in all
* other scenarios the memtuples[] array is unlikely to occupy more than
@@ -2350,14 +2375,6 @@ inittapes(Tuplesortstate *state)
state->tapeset = LogicalTapeSetCreate(maxTapes);
state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
- state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
- state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
- state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
- state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
@@ -2468,6 +2485,8 @@ mergeruns(Tuplesortstate *state)
svTape,
svRuns,
svDummy;
+ char *p;
+ int i;
Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
@@ -2504,6 +2523,36 @@ mergeruns(Tuplesortstate *state)
return;
}
+ /*
+ * We no longer need a large memtuples array; during the merge, one slot
+ * per tape is enough. Shrink the array, to make the memory available
+ * for other use.
+ */
+ pfree(state->memtuples);
+ FREEMEM(state, state->memtupsize * sizeof(SortTuple));
+ state->memtupsize = state->maxTapes;
+ state->memtuples = (SortTuple *) palloc(state->maxTapes * sizeof(SortTuple));
+ USEMEM(state, state->memtupsize * sizeof(SortTuple));
+
+ /*
+ * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to
+ * track memory usage.
+ */
+ state->batchUsed = true;
+
+ /* Initialize the merge tuple buffer arena. */
+ state->batchMemoryBegin = palloc((state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
+ state->batchMemoryEnd = state->batchMemoryBegin + (state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE;
+ state->freeBufferHead = (MergeTupleBuffer *) state->batchMemoryBegin;
+
+ p = state->batchMemoryBegin;
+ for (i = 0; i < state->maxTapes; i++)
+ {
+ ((MergeTupleBuffer *) p)->nextfree = (MergeTupleBuffer *) (p + MERGETUPLEBUFFER_SIZE);
+ p += MERGETUPLEBUFFER_SIZE;
+ }
+ ((MergeTupleBuffer *) p)->nextfree = NULL;
+
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
LogicalTapeRewind(state->tapeset, tapenum, false);
@@ -2534,7 +2583,7 @@ mergeruns(Tuplesortstate *state)
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
/* Initialize for the final merge pass */
- beginmerge(state, state->tuples);
+ beginmerge(state);
state->status = TSS_FINALMERGE;
return;
}
@@ -2617,16 +2666,12 @@ mergeonerun(Tuplesortstate *state)
{
int destTape = state->tp_tapenum[state->tapeRange];
int srcTape;
- int tupIndex;
- SortTuple *tup;
- int64 priorAvail,
- spaceFreed;
/*
* Start the merge by loading one tuple from each active source tape into
* the heap. We can also decrease the input run/dummy run counts.
*/
- beginmerge(state, false);
+ beginmerge(state);
/*
* Execute merge by repeatedly extracting lowest tuple in heap, writing it
@@ -2635,33 +2680,25 @@ mergeonerun(Tuplesortstate *state)
*/
while (state->memtupcount > 0)
{
+ SortTuple stup;
+
/* write the tuple to destTape */
- priorAvail = state->availMem;
srcTape = state->memtuples[0].tupindex;
WRITETUP(state, destTape, &state->memtuples[0]);
- /* writetup adjusted total free space, now fix per-tape space */
- spaceFreed = state->availMem - priorAvail;
- state->mergeavailmem[srcTape] += spaceFreed;
+
+ /* Recycle the buffer we just wrote out, for the next read */
+ RELEASE_MERGETUPLE_BUFFER(state, state->memtuples[0].tuple);
+
/* compact the heap */
tuplesort_heap_siftup(state, false);
- if ((tupIndex = state->mergenext[srcTape]) == 0)
+
+ /* pull next tuple from tape, insert in heap */
+ if (!mergereadnext(state, srcTape, &stup))
{
- /* out of preloaded data on this tape, try to read more */
- mergepreread(state);
- /* if still no data, we've reached end of run on this tape */
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- continue;
+ /* we've reached end of run on this tape */
+ continue;
}
- /* pull next preread tuple from list, insert in heap */
- tup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = tup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, tup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- tup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+ tuplesort_heap_insert(state, &stup, srcTape, false);
}
/*
@@ -2694,18 +2731,13 @@ mergeonerun(Tuplesortstate *state)
* which tapes contain active input runs in mergeactive[]. Then, load
* as many tuples as we can from each active input tape, and finally
* fill the merge heap with the first tuple from each active tape.
- *
- * finalMergeBatch indicates if this is the beginning of a final on-the-fly
- * merge where a batched allocation of tuple memory is required.
*/
static void
-beginmerge(Tuplesortstate *state, bool finalMergeBatch)
+beginmerge(Tuplesortstate *state)
{
int activeTapes;
int tapenum;
int srcTape;
- int slotsPerTape;
- int64 spacePerTape;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
@@ -2729,497 +2761,48 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch)
}
state->activeTapes = activeTapes;
- /* Clear merge-pass state variables */
- memset(state->mergenext, 0,
- state->maxTapes * sizeof(*state->mergenext));
- memset(state->mergelast, 0,
- state->maxTapes * sizeof(*state->mergelast));
- state->mergefreelist = 0; /* nothing in the freelist */
- state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
-
- if (finalMergeBatch)
- {
- /* Free outright buffers for tape never actually allocated */
- FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
-
- /*
- * Grow memtuples one last time, since the palloc() overhead no longer
- * incurred can make a big difference
- */
- batchmemtuples(state);
- }
-
- /*
- * Initialize space allocation to let each active input tape have an equal
- * share of preread space.
- */
Assert(activeTapes > 0);
- slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
- Assert(slotsPerTape > 0);
- spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- {
- if (state->mergeactive[srcTape])
- {
- state->mergeavailslots[srcTape] = slotsPerTape;
- state->mergeavailmem[srcTape] = spacePerTape;
- }
- }
-
- /*
- * Preallocate tuple batch memory for each tape. This is the memory used
- * for tuples themselves (not SortTuples), so it's never used by
- * pass-by-value datum sorts. Memory allocation is performed here at most
- * once per sort, just in advance of the final on-the-fly merge step.
- */
- if (finalMergeBatch)
- mergebatch(state, spacePerTape);
-
- /*
- * Preread as many tuples as possible (and at least one) from each active
- * tape
- */
- mergepreread(state);
/* Load the merge heap with the first tuple from each input tape */
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
- int tupIndex = state->mergenext[srcTape];
- SortTuple *tup;
-
- if (tupIndex)
- {
- tup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = tup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, tup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- tup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
-
-#ifdef TRACE_SORT
- if (trace_sort && finalMergeBatch)
- {
- int64 perTapeKB = (spacePerTape + 1023) / 1024;
- int64 usedSpaceKB;
- int usedSlots;
+ SortTuple tup;
- /*
- * Report how effective batchmemtuples() was in balancing the
- * number of slots against the need for memory for the
- * underlying tuples (e.g. IndexTuples). The big preread of
- * all tapes when switching to FINALMERGE state should be
- * fairly representative of memory utilization during the
- * final merge step, and in any case is the only point at
- * which all tapes are guaranteed to have depleted either
- * their batch memory allowance or slot allowance. Ideally,
- * both will be completely depleted for every tape by now.
- */
- usedSpaceKB = (state->mergecurrent[srcTape] -
- state->mergetuples[srcTape] + 1023) / 1024;
- usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
-
- elog(LOG, "tape %d initially used " INT64_FORMAT " KB of "
- INT64_FORMAT " KB batch (%2.3f) and %d out of %d slots "
- "(%2.3f)", srcTape,
- usedSpaceKB, perTapeKB,
- (double) usedSpaceKB / (double) perTapeKB,
- usedSlots, slotsPerTape,
- (double) usedSlots / (double) slotsPerTape);
- }
-#endif
- }
+ if (mergereadnext(state, srcTape, &tup))
+ tuplesort_heap_insert(state, &tup, srcTape, false);
}
}
/*
- * batchmemtuples - grow memtuples without palloc overhead
+ * mergereadnext - load tuple from one merge input tape
*
- * When called, availMem should be approximately the amount of memory we'd
- * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
- * that were allocated with palloc() overhead, and in doing so use up all
- * allocated slots. However, though slots and tuple memory is in balance
- * following the last grow_memtuples() call, that's predicated on the observed
- * average tuple size for the "final" grow_memtuples() call, which includes
- * palloc overhead. During the final merge pass, where we will arrange to
- * squeeze out the palloc overhead, we might need more slots in the memtuples
- * array.
- *
- * To make that happen, arrange for the amount of remaining memory to be
- * exactly equal to the palloc overhead multiplied by the current size of
- * the memtuples array, force the grow_memtuples flag back to true (it's
- * probably but not necessarily false on entry to this routine), and then
- * call grow_memtuples. This simulates loading enough tuples to fill the
- * whole memtuples array and then having some space left over because of the
- * elided palloc overhead. We expect that grow_memtuples() will conclude that
- * it can't double the size of the memtuples array but that it can increase
- * it by some percentage; but if it does decide to double it, that just means
- * that we've never managed to use many slots in the memtuples array, in which
- * case doubling it shouldn't hurt anything anyway.
- */
-static void
-batchmemtuples(Tuplesortstate *state)
-{
- int64 refund;
- int64 availMemLessRefund;
- int memtupsize = state->memtupsize;
-
- /* For simplicity, assume no memtuples are actually currently counted */
- Assert(state->memtupcount == 0);
-
- /*
- * Refund STANDARDCHUNKHEADERSIZE per tuple.
- *
- * This sometimes fails to make memory use perfectly balanced, but it
- * should never make the situation worse. Note that Assert-enabled builds
- * get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
- */
- refund = memtupsize * STANDARDCHUNKHEADERSIZE;
- availMemLessRefund = state->availMem - refund;
-
- /*
- * To establish balanced memory use after refunding palloc overhead,
- * temporarily have our accounting indicate that we've allocated all
- * memory we're allowed to less that refund, and call grow_memtuples() to
- * have it increase the number of slots.
- */
- state->growmemtuples = true;
- USEMEM(state, availMemLessRefund);
- (void) grow_memtuples(state);
- /* Should not matter, but be tidy */
- FREEMEM(state, availMemLessRefund);
- state->growmemtuples = false;
-
-#ifdef TRACE_SORT
- if (trace_sort)
- {
- Size OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
- Size NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
-
- elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
- (double) NewKb / (double) OldKb,
- memtupsize, OldKb,
- state->memtupsize, NewKb);
- }
-#endif
-}
-
-/*
- * mergebatch - initialize tuple memory in batch
- *
- * This allows sequential access to sorted tuples buffered in memory from
- * tapes/runs on disk during a final on-the-fly merge step. Note that the
- * memory is not used for SortTuples, but for the underlying tuples (e.g.
- * MinimalTuples).
- *
- * Note that when batch memory is used, there is a simple division of space
- * into large buffers (one per active tape). The conventional incremental
- * memory accounting (calling USEMEM() and FREEMEM()) is abandoned. Instead,
- * when each tape's memory budget is exceeded, a retail palloc() "overflow" is
- * performed, which is then immediately detected in a way that is analogous to
- * LACKMEM(). This keeps each tape's use of memory fair, which is always a
- * goal.
- */
-static void
-mergebatch(Tuplesortstate *state, int64 spacePerTape)
-{
- int srcTape;
-
- Assert(state->activeTapes > 0);
- Assert(state->tuples);
-
- /*
- * For the purposes of tuplesort's memory accounting, the batch allocation
- * is special, and regular memory accounting through USEMEM() calls is
- * abandoned (see mergeprereadone()).
- */
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- {
- char *mergetuples;
-
- if (!state->mergeactive[srcTape])
- continue;
-
- /* Allocate buffer for each active tape */
- mergetuples = MemoryContextAllocHuge(state->tuplecontext,
- spacePerTape);
-
- /* Initialize state for tape */
- state->mergetuples[srcTape] = mergetuples;
- state->mergecurrent[srcTape] = mergetuples;
- state->mergetail[srcTape] = mergetuples;
- state->mergeoverflow[srcTape] = NULL;
- }
-
- state->batchUsed = true;
- state->spacePerTape = spacePerTape;
-}
-
-/*
- * mergebatchone - prepare batch memory for one merge input tape
- *
- * This is called following the exhaustion of preread tuples for one input
- * tape. All that actually occurs is that the state for the source tape is
- * reset to indicate that all memory may be reused.
- *
- * This routine must deal with fixing up the tuple that is about to be returned
- * to the client, due to "overflow" allocations.
- */
-static void
-mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
- bool *should_free)
-{
- Assert(state->batchUsed);
-
- /*
- * Tuple about to be returned to caller ("stup") is final preread tuple
- * from tape, just removed from the top of the heap. Special steps around
- * memory management must be performed for that tuple, to make sure it
- * isn't overwritten early.
- */
- if (!state->mergeoverflow[srcTape])
- {
- Size tupLen;
-
- /*
- * Mark tuple buffer range for reuse, but be careful to move final,
- * tail tuple to start of space for next run so that it's available to
- * caller when stup is returned, and remains available at least until
- * the next tuple is requested.
- */
- tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
- state->mergecurrent[srcTape] = state->mergetuples[srcTape];
- MOVETUP(state->mergecurrent[srcTape], state->mergetail[srcTape],
- tupLen);
-
- /* Make SortTuple at top of the merge heap point to new tuple */
- rtup->tuple = (void *) state->mergecurrent[srcTape];
-
- state->mergetail[srcTape] = state->mergecurrent[srcTape];
- state->mergecurrent[srcTape] += tupLen;
- }
- else
- {
- /*
- * Handle an "overflow" retail palloc.
- *
- * This is needed when we run out of tuple memory for the tape.
- */
- state->mergecurrent[srcTape] = state->mergetuples[srcTape];
- state->mergetail[srcTape] = state->mergetuples[srcTape];
-
- if (rtup->tuple)
- {
- Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
- /* Caller should free palloc'd tuple */
- *should_free = true;
- }
- state->mergeoverflow[srcTape] = NULL;
- }
-}
-
-/*
- * mergebatchfreetape - handle final clean-up for batch memory once tape is
- * about to become exhausted
- *
- * All tuples are returned from tape, but a single final tuple, *rtup, is to be
- * passed back to caller. Free tape's batch allocation buffer while ensuring
- * that the final tuple is managed appropriately.
- */
-static void
-mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
- bool *should_free)
-{
- Assert(state->batchUsed);
- Assert(state->status == TSS_FINALMERGE);
-
- /*
- * Tuple may or may not already be an overflow allocation from
- * mergebatchone()
- */
- if (!*should_free && rtup->tuple)
- {
- /*
- * Final tuple still in tape's batch allocation.
- *
- * Return palloc()'d copy to caller, and have it freed in a similar
- * manner to overflow allocation. Otherwise, we'd free batch memory
- * and pass back a pointer to garbage. Note that we deliberately
- * allocate this in the parent tuplesort context, to be on the safe
- * side.
- */
- Size tuplen;
- void *oldTuple = rtup->tuple;
-
- tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
- rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
- MOVETUP(rtup->tuple, oldTuple, tuplen);
- *should_free = true;
- }
-
- /* Free spacePerTape-sized buffer */
- pfree(state->mergetuples[srcTape]);
-}
-
-/*
- * mergebatchalloc - allocate memory for one tuple using a batch memory
- * "logical allocation".
- *
- * This is used for the final on-the-fly merge phase only. READTUP() routines
- * receive memory from here in place of palloc() and USEMEM() calls.
- *
- * Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted,
- * contiguous order (while allowing safe reuse of memory made available to
- * each tape). This maximizes locality of access as tuples are returned by
- * final merge.
- *
- * Caller must not subsequently attempt to free memory returned here. In
- * general, only mergebatch* functions know about how memory returned from
- * here should be freed, and this function's caller must ensure that batch
- * memory management code will definitely have the opportunity to do the right
- * thing during the final on-the-fly merge.
- */
-static void *
-mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
-{
- Size reserve_tuplen = MAXALIGN(tuplen);
- char *ret;
-
- /* Should overflow at most once before mergebatchone() call: */
- Assert(state->mergeoverflow[tapenum] == NULL);
- Assert(state->batchUsed);
-
- /* It should be possible to use precisely spacePerTape memory at once */
- if (state->mergecurrent[tapenum] + reserve_tuplen <=
- state->mergetuples[tapenum] + state->spacePerTape)
- {
- /*
- * Usual case -- caller is returned pointer into its tape's buffer,
- * and an offset from that point is recorded as where tape has
- * consumed up to for current round of preloading.
- */
- ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
- state->mergecurrent[tapenum] += reserve_tuplen;
- }
- else
- {
- /*
- * Allocate memory, and record as tape's overflow allocation. This
- * will be detected quickly, in a similar fashion to a LACKMEM()
- * condition, and should not happen again before a new round of
- * preloading for caller's tape. Note that we deliberately allocate
- * this in the parent tuplesort context, to be on the safe side.
- *
- * Sometimes, this does not happen because merging runs out of slots
- * before running out of memory.
- */
- ret = state->mergeoverflow[tapenum] =
- MemoryContextAlloc(state->sortcontext, tuplen);
- }
-
- return ret;
-}
-
-/*
- * mergepreread - load tuples from merge input tapes
- *
- * This routine exists to improve sequentiality of reads during a merge pass,
- * as explained in the header comments of this file. Load tuples from each
- * active source tape until the tape's run is exhausted or it has used up
- * its fair share of available memory. In any case, we guarantee that there
- * is at least one preread tuple available from each unexhausted input tape.
- *
- * We invoke this routine at the start of a merge pass for initial load,
- * and then whenever any tape's preread data runs out. Note that we load
- * as much data as possible from all tapes, not just the one that ran out.
- * This is because logtape.c works best with a usage pattern that alternates
- * between reading a lot of data and writing a lot of data, so whenever we
- * are forced to read, we should fill working memory completely.
- *
- * In FINALMERGE state, we *don't* use this routine, but instead just preread
- * from the single tape that ran dry. There's no read/write alternation in
- * that state and so no point in scanning through all the tapes to fix one.
- * (Moreover, there may be quite a lot of inactive tapes in that state, since
- * we might have had many fewer runs than tapes. In a regular tape-to-tape
- * merge we can expect most of the tapes to be active. Plus, only
- * FINALMERGE state has to consider memory management for a batch
- * allocation.)
- */
-static void
-mergepreread(Tuplesortstate *state)
-{
- int srcTape;
-
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- mergeprereadone(state, srcTape);
-}
-
-/*
- * mergeprereadone - load tuples from one merge input tape
+ * Returns false on EOF.
- *
- * Read tuples from the specified tape until it has used up its free memory
- * or array slots; but ensure that we have at least one tuple, if any are
- * to be had.
 */
-static void
-mergeprereadone(Tuplesortstate *state, int srcTape)
+static bool
+mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
{
unsigned int tuplen;
- SortTuple stup;
- int tupIndex;
- int64 priorAvail,
- spaceUsed;
if (!state->mergeactive[srcTape])
- return; /* tape's run is already exhausted */
+ return false; /* tape's run is already exhausted */
- /*
- * Manage per-tape availMem. Only actually matters when batch memory not
- * in use.
- */
- priorAvail = state->availMem;
- state->availMem = state->mergeavailmem[srcTape];
-
- /*
- * When batch memory is used if final on-the-fly merge, only mergeoverflow
- * test is relevant; otherwise, only LACKMEM() test is relevant.
- */
- while ((state->mergeavailslots[srcTape] > 0 &&
- state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
- state->mergenext[srcTape] == 0)
+ /* read next tuple, if any */
+ if ((tuplen = getlen(state, srcTape, true)) == 0)
{
- /* read next tuple, if any */
- if ((tuplen = getlen(state, srcTape, true)) == 0)
- {
- state->mergeactive[srcTape] = false;
- break;
- }
- READTUP(state, &stup, srcTape, tuplen);
- /* find a free slot in memtuples[] for it */
- tupIndex = state->mergefreelist;
- if (tupIndex)
- state->mergefreelist = state->memtuples[tupIndex].tupindex;
- else
- {
- tupIndex = state->mergefirstfree++;
- Assert(tupIndex < state->memtupsize);
- }
- state->mergeavailslots[srcTape]--;
- /* store tuple, append to list for its tape */
- stup.tupindex = 0;
- state->memtuples[tupIndex] = stup;
- if (state->mergelast[srcTape])
- state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
- else
- state->mergenext[srcTape] = tupIndex;
- state->mergelast[srcTape] = tupIndex;
+ state->mergeactive[srcTape] = false;
+ return false;
}
- /* update per-tape and global availmem counts */
- spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
- state->mergeavailmem[srcTape] = state->availMem;
- state->availMem = priorAvail - spaceUsed;
+ READTUP(state, stup, srcTape, tuplen);
+
+ return true;
}
/*
@@ -3857,27 +3440,24 @@ markrunend(Tuplesortstate *state, int tapenum)
* routines.
*/
static void *
-readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+readtup_alloc(Tuplesortstate *state, Size tuplen)
{
- if (state->batchUsed)
- {
- /*
- * No USEMEM() call, because during final on-the-fly merge accounting
- * is based on tape-private state. ("Overflow" allocations are
- * detected as an indication that a new round or preloading is
- * required. Preloading marks existing contents of tape's batch buffer
- * for reuse.)
- */
- return mergebatchalloc(state, tapenum, tuplen);
- }
+ MergeTupleBuffer *buf;
+
+ /*
+ * We pre-allocate enough buffers in the arena that we should never run out.
+ */
+ Assert(state->freeBufferHead);
+
+ if (tuplen > MERGETUPLEBUFFER_SIZE || !state->freeBufferHead)
+ return MemoryContextAlloc(state->sortcontext, tuplen);
else
{
- char *ret;
+ buf = state->freeBufferHead;
+ /* Reuse this buffer */
+ state->freeBufferHead = buf->nextfree;
- /* Batch allocation yet to be performed */
- ret = MemoryContextAlloc(state->tuplecontext, tuplen);
- USEMEM(state, GetMemoryChunkSpace(ret));
- return ret;
+ return buf;
}
}
@@ -4046,8 +3626,11 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- heap_free_minimal_tuple(tuple);
+ if (!state->batchUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_free_minimal_tuple(tuple);
+ }
}
static void
@@ -4056,7 +3639,7 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
{
unsigned int tupbodylen = len - sizeof(int);
unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
- MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
+ MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
HeapTupleData htup;
@@ -4077,12 +3660,6 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
-static void
-movetup_heap(void *dest, void *src, unsigned int len)
-{
- memmove(dest, src, len);
-}
-
/*
* Routines specialized for the CLUSTER case (HeapTuple data, with
* comparisons per a btree index definition)
@@ -4289,8 +3866,11 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
LogicalTapeWrite(state->tapeset, tapenum,
&tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- heap_freetuple(tuple);
+ if (!state->batchUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_freetuple(tuple);
+ }
}
static void
@@ -4299,7 +3879,6 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
{
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
HeapTuple tuple = (HeapTuple) readtup_alloc(state,
- tapenum,
t_len + HEAPTUPLESIZE);
/* Reconstruct the HeapTupleData header */
@@ -4324,19 +3903,6 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
-static void
-movetup_cluster(void *dest, void *src, unsigned int len)
-{
- HeapTuple tuple;
-
- memmove(dest, src, len);
-
- /* Repoint the HeapTupleData header */
- tuple = (HeapTuple) dest;
- tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
-}
-
-
/*
* Routines specialized for IndexTuple case
*
@@ -4604,8 +4170,11 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- pfree(tuple);
+ if (!state->batchUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ pfree(tuple);
+ }
}
static void
@@ -4613,7 +4182,7 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
- IndexTuple tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
+ IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
LogicalTapeReadExact(state->tapeset, tapenum,
tuple, tuplen);
@@ -4628,12 +4197,6 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
-static void
-movetup_index(void *dest, void *src, unsigned int len)
-{
- memmove(dest, src, len);
-}
-
/*
* Routines specialized for DatumTuple case
*/
@@ -4700,7 +4263,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &writtenlen, sizeof(writtenlen));
- if (stup->tuple)
+ if (!state->batchUsed && stup->tuple)
{
FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
pfree(stup->tuple);
@@ -4730,7 +4293,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
}
else
{
- void *raddr = readtup_alloc(state, tapenum, tuplen);
+ void *raddr = readtup_alloc(state, tuplen);
LogicalTapeReadExact(state->tapeset, tapenum,
raddr, tuplen);
@@ -4744,12 +4307,6 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
&tuplen, sizeof(tuplen));
}
-static void
-movetup_datum(void *dest, void *src, unsigned int len)
-{
- memmove(dest, src, len);
-}
-
/*
* Convenience routine to free a tuple previously loaded into sort memory
*/
--
2.9.3
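
The buffer-recycling scheme that this patch introduces is easy to demonstrate
outside of tuplesort.c: a single big allocation is carved into fixed-size
chunks that are threaded onto a free list, and oversized tuples fall back to a
retail allocation. Below is a minimal standalone sketch of that idea. The
names (Arena, Buf, arena_alloc, and so on) are illustrative, and plain
malloc()/free() stand in for palloc()/pfree(); this is a sketch of the
technique, not the patch's code.

#include <stdlib.h>

#define BUF_SIZE 1024

/* Each chunk doubles as a free-list link while it is not holding a tuple. */
typedef union Buf
{
    union Buf  *nextfree;       /* valid while on the free list */
    char        space[BUF_SIZE];
} Buf;

typedef struct Arena
{
    char   *begin;              /* the single big allocation */
    char   *end;
    Buf    *freehead;           /* head of the free list */
} Arena;

static void
arena_init(Arena *a, int nbufs)
{
    char   *p;
    int     i;

    a->begin = malloc(nbufs * sizeof(Buf));
    a->end = a->begin + nbufs * sizeof(Buf);

    /* Link every chunk into the free list. */
    a->freehead = (Buf *) a->begin;
    p = a->begin;
    for (i = 0; i < nbufs - 1; i++)
    {
        ((Buf *) p)->nextfree = (Buf *) (p + sizeof(Buf));
        p += sizeof(Buf);
    }
    ((Buf *) p)->nextfree = NULL;
}

static void *
arena_alloc(Arena *a, size_t len)
{
    Buf    *buf;

    /* Oversized requests bypass the arena, as in the patched readtup_alloc() */
    if (len > BUF_SIZE || a->freehead == NULL)
        return malloc(len);

    /* Pop a recycled chunk off the free list */
    buf = a->freehead;
    a->freehead = buf->nextfree;
    return buf;
}

static void
arena_release(Arena *a, void *tuple)
{
    /* A pointer range test tells arena chunks apart from malloc'd tuples */
    if ((char *) tuple >= a->begin && (char *) tuple < a->end)
    {
        Buf    *buf = (Buf *) tuple;

        buf->nextfree = a->freehead;
        a->freehead = buf;
    }
    else
        free(tuple);
}

int
main(void)
{
    Arena   a;
    void   *t1, *t2;

    arena_init(&a, 8);          /* e.g. one chunk per tape, plus one */
    t1 = arena_alloc(&a, 100);  /* served from the arena */
    t2 = arena_alloc(&a, 4000); /* too big, falls back to malloc() */
    arena_release(&a, t1);      /* goes back on the free list */
    arena_release(&a, t2);      /* gets free()'d */
    free(a.begin);
    return 0;
}

The union makes the free-list pointer cost nothing while a chunk is in use:
it only occupies the space when the chunk is not holding a tuple.
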
0002-Use-larger-read-buffers-in-logtape.patch (text/x-diff)
From 379580dc600c079b8de3cc2f392376ad46429758 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 8 Sep 2016 20:34:06 +0300
Subject: [PATCH 2/3] Use larger read buffers in logtape.
This makes the access pattern appear more sequential to the OS, making it
more likely that the OS will do read-ahead for us. It will also ensure that
there are more sequential blocks available when writing, because we can
free more blocks in the underlying file at once. Sequential I/O is much
cheaper than random I/O.
We used to do pre-reading from each tape, in tuplesort.c, for the same
reasons. But it seems simpler to do it in logtape.c, reading the raw data
into a larger buffer, rather than converting every tuple to SortTuple format
while pre-reading, as tuplesort.c used to do.
---
src/backend/utils/sort/logtape.c | 134 +++++++++++++++++++++++++++++++------
src/backend/utils/sort/tuplesort.c | 35 +++++++++-
src/include/utils/logtape.h | 1 +
3 files changed, 147 insertions(+), 23 deletions(-)
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 7745207..05d7697 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -131,9 +131,12 @@ typedef struct LogicalTape
* reading.
*/
char *buffer; /* physical buffer (separately palloc'd) */
+ int buffer_size; /* allocated size of the buffer */
long curBlockNumber; /* this block's logical blk# within tape */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
+
+ int read_buffer_size; /* buffer size to use when reading */
} LogicalTape;
/*
@@ -228,6 +231,53 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
}
/*
+ * Read as many blocks as we can into the per-tape buffer.
+ *
+ * The caller can specify the next physical block number to read, in
+ * datablocknum, or -1 to fetch the next block number from the indirect block.
+ * If datablocknum == -1, the caller must've already set curBlockNumber.
+ *
+ * Returns true if anything was read, false on EOF.
+ */
+static bool
+ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt, long datablocknum)
+{
+ lt->pos = 0;
+ lt->nbytes = 0;
+
+ do
+ {
+ /* Fetch next block number (unless provided by caller) */
+ if (datablocknum == -1)
+ {
+ datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, lt->frozen);
+ if (datablocknum == -1L)
+ break; /* EOF */
+ lt->curBlockNumber++;
+ }
+
+ /* Read the block */
+ ltsReadBlock(lts, datablocknum, (void *) (lt->buffer + lt->nbytes));
+ if (!lt->frozen)
+ ltsReleaseBlock(lts, datablocknum);
+
+ if (lt->curBlockNumber < lt->numFullBlocks)
+ lt->nbytes += BLCKSZ;
+ else
+ {
+ /* EOF */
+ lt->nbytes += lt->lastBlockBytes;
+ break;
+ }
+
+ /* Advance to next block, if we have buffer space left */
+ datablocknum = -1;
+ } while (lt->nbytes < lt->buffer_size);
+
+ return (lt->nbytes > 0);
+}
+
+/*
* qsort comparator for sorting freeBlocks[] into decreasing order.
*/
static int
@@ -546,6 +596,8 @@ LogicalTapeSetCreate(int ntapes)
lt->numFullBlocks = 0L;
lt->lastBlockBytes = 0;
lt->buffer = NULL;
+ lt->buffer_size = 0;
+ lt->read_buffer_size = BLCKSZ;
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
@@ -628,7 +680,10 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
/* Allocate data buffer and first indirect block on first write */
if (lt->buffer == NULL)
+ {
lt->buffer = (char *) palloc(BLCKSZ);
+ lt->buffer_size = BLCKSZ;
+ }
if (lt->indirect == NULL)
{
lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
@@ -636,6 +691,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
lt->indirect->nextup = NULL;
}
+ Assert(lt->buffer_size == BLCKSZ);
while (size > 0)
{
if (lt->pos >= BLCKSZ)
@@ -709,18 +765,19 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
Assert(lt->frozen);
datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
}
+
+ /* Allocate a read buffer */
+ if (lt->buffer)
+ pfree(lt->buffer);
+ lt->buffer = palloc(lt->read_buffer_size);
+ lt->buffer_size = lt->read_buffer_size;
+
/* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
if (datablocknum != -1L)
- {
- ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
- if (!lt->frozen)
- ltsReleaseBlock(lts, datablocknum);
- lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
- BLCKSZ : lt->lastBlockBytes;
- }
+ ltsReadFillBuffer(lts, lt, datablocknum);
}
else
{
@@ -754,6 +811,13 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
+
+ if (lt->buffer)
+ {
+ pfree(lt->buffer);
+ lt->buffer = NULL;
+ lt->buffer_size = 0;
+ }
}
}
@@ -779,20 +843,8 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
if (lt->pos >= lt->nbytes)
{
/* Try to load more data into buffer. */
- long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
- lt->frozen);
-
- if (datablocknum == -1L)
+ if (!ltsReadFillBuffer(lts, lt, -1))
break; /* EOF */
- lt->curBlockNumber++;
- lt->pos = 0;
- ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
- if (!lt->frozen)
- ltsReleaseBlock(lts, datablocknum);
- lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
- BLCKSZ : lt->lastBlockBytes;
- if (lt->nbytes <= 0)
- break; /* EOF (possible here?) */
}
nthistime = lt->nbytes - lt->pos;
@@ -842,6 +894,22 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
lt->writing = false;
lt->frozen = true;
datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
+
+ /*
+ * The seek and backspace functions assume a single block read buffer.
+ * That's OK with current usage. A larger buffer is helpful to make the
+ * read pattern of the backing file look more sequential to the OS, when
+ * we're reading from multiple tapes. But at the end of a sort, when a
+ * tape is frozen, we only read from a single tape anyway.
+ */
+ if (!lt->buffer || lt->buffer_size != BLCKSZ)
+ {
+ if (lt->buffer)
+ pfree(lt->buffer);
+ lt->buffer = palloc(BLCKSZ);
+ lt->buffer_size = BLCKSZ;
+ }
+
/* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L;
lt->pos = 0;
@@ -875,6 +943,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
Assert(lt->frozen);
+ Assert(lt->buffer_size == BLCKSZ);
/*
* Easy case for seek within current block.
@@ -941,6 +1010,7 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
lt = &lts->tapes[tapenum];
Assert(lt->frozen);
Assert(offset >= 0 && offset <= BLCKSZ);
+ Assert(lt->buffer_size == BLCKSZ);
/*
* Easy case for seek within current block.
@@ -1000,6 +1070,9 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
{
LogicalTape *lt;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
+
+ /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
+ Assert(lt->buffer_size == BLCKSZ);
*blocknum = lt->curBlockNumber;
@@ -1014,3 +1087,24 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
return lts->nFileBlocks;
}
+
+/*
+ * Set buffer size to use, when reading from given tape.
+ */
+void
+LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem)
+{
+ LogicalTape *lt;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = &lts->tapes[tapenum];
+
+ /*
+ * The buffer size must be a multiple of BLCKSZ, so round the given
+ * value down to the nearest BLCKSZ. Make sure we have at least one page.
+ */
+ if (avail_mem < BLCKSZ)
+ avail_mem = BLCKSZ;
+ avail_mem -= avail_mem % BLCKSZ;
+ lt->read_buffer_size = avail_mem;
+}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index b9fb99c..dc35fcf 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2487,6 +2487,8 @@ mergeruns(Tuplesortstate *state)
svDummy;
char *p;
int i;
+ int per_tape, cutoff;
+ long avail_blocks;
Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
@@ -2535,15 +2537,17 @@ mergeruns(Tuplesortstate *state)
USEMEM(state, state->memtupsize * sizeof(SortTuple));
/*
- * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to
- * track memory usage.
+ * If we had fewer runs than tapes, refund buffers for tapes that were never
+ * allocated.
*/
- state->batchUsed = true;
+ if (state->currentRun < state->maxTapes)
+ FREEMEM(state, (state->maxTapes - state->currentRun) * TAPE_BUFFER_OVERHEAD);
/* Initialize the merge tuple buffer arena. */
state->batchMemoryBegin = palloc((state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
state->batchMemoryEnd = state->batchMemoryBegin + (state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE;
state->freeBufferHead = (MergeTupleBuffer *) state->batchMemoryBegin;
+ USEMEM(state, (state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
p = state->batchMemoryBegin;
for (i = 0; i < state->maxTapes; i++)
@@ -2553,6 +2557,31 @@ mergeruns(Tuplesortstate *state)
}
((MergeTupleBuffer *) p)->nextfree = NULL;
+ /*
+ * Use all the spare memory we have available for read buffers. Divide it
+ * evenly among all the tapes.
+ */
+ avail_blocks = state->availMem / BLCKSZ;
+ per_tape = avail_blocks / state->maxTapes;
+ cutoff = avail_blocks % state->maxTapes;
+ if (per_tape == 0)
+ {
+ per_tape = 1;
+ cutoff = 0;
+ }
+ for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
+ {
+ LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
+ (per_tape + (tapenum < cutoff ? 1 : 0)) * BLCKSZ);
+ }
+ USEMEM(state, avail_blocks * BLCKSZ);
+
+ /*
+ * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to
+ * track memory usage of individual tuples.
+ */
+ state->batchUsed = true;
+
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
LogicalTapeRewind(state->tapeset, tapenum, false);
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index fa1e992..03d0a6f 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -39,6 +39,7 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
long blocknum, int offset);
extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long *blocknum, int *offset);
+extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t bufsize);
extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
#endif /* LOGTAPE_H */
--
2.9.3
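
The way mergeruns() in this patch splits the leftover work_mem into per-tape
read buffers is plain integer arithmetic: every tape gets per_tape whole
blocks, and the first avail_blocks % ntapes tapes get one extra block so that
nothing is wasted. Here is a standalone sketch of just that division, with
simplified names (assign_read_buffers is hypothetical; this is not the
patched tuplesort.c code):

#include <stdio.h>

#define BLCKSZ 8192

static void
assign_read_buffers(long availMem, int ntapes)
{
    long    avail_blocks = availMem / BLCKSZ;
    long    per_tape = avail_blocks / ntapes;
    long    cutoff = avail_blocks % ntapes;
    int     tapenum;

    /* Guarantee at least one block per tape, even when memory is tight. */
    if (per_tape == 0)
    {
        per_tape = 1;
        cutoff = 0;
    }

    /* The first 'cutoff' tapes absorb the remainder, one block each. */
    for (tapenum = 0; tapenum < ntapes; tapenum++)
        printf("tape %d: %ld bytes\n", tapenum,
               (per_tape + (tapenum < cutoff ? 1 : 0)) * BLCKSZ);
}

int
main(void)
{
    /*
     * e.g. 4 MB of spare memory across 7 tapes: 512 blocks in total,
     * so the first tape gets 74 blocks and the remaining six get 73.
     */
    assign_read_buffers(4L * 1024 * 1024, 7);
    return 0;
}

Rounding down to whole blocks keeps every read aligned with the underlying
BLCKSZ pages, which is what lets logtape.c fetch several blocks in one go.
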
0003-Add-sorting-test-suite.patch (text/x-diff)
From c63cd34aa51941d5851dfd6d3d273415ad02a7fb Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 8 Sep 2016 21:42:55 +0300
Subject: [PATCH 3/3] Add sorting test suite
---
src/test/sorttestsuite/Makefile | 31 ++++++
src/test/sorttestsuite/correctness.c | 153 +++++++++++++++++++++++++++
src/test/sorttestsuite/generate.c | 198 +++++++++++++++++++++++++++++++++++
src/test/sorttestsuite/speed.c | 139 ++++++++++++++++++++++++
4 files changed, 521 insertions(+)
create mode 100644 src/test/sorttestsuite/Makefile
create mode 100644 src/test/sorttestsuite/correctness.c
create mode 100644 src/test/sorttestsuite/generate.c
create mode 100644 src/test/sorttestsuite/speed.c
diff --git a/src/test/sorttestsuite/Makefile b/src/test/sorttestsuite/Makefile
new file mode 100644
index 0000000..91c8ccd
--- /dev/null
+++ b/src/test/sorttestsuite/Makefile
@@ -0,0 +1,31 @@
+CFLAGS=-g -I/home/heikki/pgsql.master/include
+
+LDFLAGS=-L/home/heikki/pgsql.master/lib -lpq -lm
+
+TESTDB=sorttest
+
+# For testing quicksort.
+SCALE_SMALL=1024 # 1 MB
+
+# For testing external sort, while the dataset still fits in OS cache.
+SCALE_MEDIUM=1048576 # 1 GB
+
+# Does not fit in memory.
+SCALE_LARGE=20971520 # 20 GB
+#SCALE_LARGE=1500000 # 1.5 GB
+
+all: generate speed correctness
+
+generate: generate.c
+
+speed: speed.c
+
+correctness: correctness.c
+
+generate_testdata:
+ dropdb --if-exists $(TESTDB)
+ createdb $(TESTDB)
+ psql $(TESTDB) -c "CREATE SCHEMA small; CREATE SCHEMA medium; CREATE SCHEMA large;"
+ (echo "set search_path=small;"; ./generate all $(SCALE_SMALL)) | psql $(TESTDB)
+ (echo "set search_path=medium;"; ./generate all $(SCALE_MEDIUM)) | psql $(TESTDB)
+ (echo "set search_path=large;"; ./generate all $(SCALE_LARGE)) | psql $(TESTDB)
diff --git a/src/test/sorttestsuite/correctness.c b/src/test/sorttestsuite/correctness.c
new file mode 100644
index 0000000..b41aa2e
--- /dev/null
+++ b/src/test/sorttestsuite/correctness.c
@@ -0,0 +1,153 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <sys/time.h>
+
+#include <libpq-fe.h>
+
+static PGconn *conn;
+
+static void
+execute(const char *sql)
+{
+ int i;
+ PGresult *res;
+
+ fprintf(stderr, "%s\n", sql);
+
+ res = PQexec(conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr,"command failed: %s\n%s", sql, PQerrorMessage(conn));
+ PQclear(res);
+ exit(1);
+ }
+
+ PQclear(res);
+}
+
+static void
+check_sorted(const char *sql, int (*cmp)(const char *a, const char *b))
+{
+ int i;
+ PGresult *res;
+ PGresult *prevres = NULL;
+ int rowno;
+
+ fprintf(stderr, "running query: %s\n", sql);
+ if (!PQsendQuery(conn, sql))
+ {
+ fprintf(stderr,"query failed: %s\n%s", sql, PQerrorMessage(conn));
+ PQclear(res);
+ exit(1);
+ }
+ if (!PQsetSingleRowMode(conn))
+ {
+ fprintf(stderr,"setting single-row mode failed: %s", PQerrorMessage(conn));
+ PQclear(res);
+ exit(1);
+ }
+
+ rowno = 1;
+ while ((res = PQgetResult(conn)) != NULL)
+ {
+ if (PQresultStatus(res) == PGRES_TUPLES_OK)
+ continue;
+ if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
+ {
+ fprintf(stderr,"error while fetching: %d, %s\n%s", PQresultStatus(res), sql, PQerrorMessage(conn));
+ PQclear(res);
+ exit(1);
+ }
+
+ if (prevres)
+ {
+ if (!cmp(PQgetvalue(prevres, 0, 0), PQgetvalue(res, 0, 0)))
+ {
+ fprintf(stderr,"FAIL: result not sorted, row %d: %s, prev %s\n", rowno,
+ PQgetvalue(prevres, 0, 0), PQgetvalue(res, 0, 0));
+ PQclear(res);
+ exit(1);
+ }
+ PQclear(prevres);
+ }
+ prevres = res;
+
+ rowno++;
+ }
+
+ if (prevres)
+ PQclear(prevres);
+}
+
+
+static int
+compare_strings(const char *a, const char *b)
+{
+ return strcmp(a, b) <= 0;
+}
+
+static int
+compare_ints(const char *a, const char *b)
+{
+ return atoi(a) <= atoi(b);
+}
+
+int
+main(int argc, char **argv)
+{
+ double duration;
+ char buf[1000];
+
+ /* Make a connection to the database */
+ conn = PQconnectdb("");
+
+ /* Check to see that the backend connection was successfully made */
+ if (PQstatus(conn) != CONNECTION_OK)
+ {
+ fprintf(stderr, "Connection to database failed: %s",
+ PQerrorMessage(conn));
+ exit(1);
+ }
+ execute("set trace_sort=on");
+
+ execute("set work_mem = '4MB'");
+
+ check_sorted("SELECT * FROM small.ordered_ints ORDER BY i", compare_ints);
+ check_sorted("SELECT * FROM small.random_ints ORDER BY i", compare_ints);
+ check_sorted("SELECT * FROM small.ordered_text ORDER BY t", compare_strings);
+ check_sorted("SELECT * FROM small.random_text ORDER BY t", compare_strings);
+
+ execute("set work_mem = '16MB'");
+
+ check_sorted("SELECT * FROM medium.ordered_ints ORDER BY i", compare_ints);
+ check_sorted("SELECT * FROM medium.random_ints ORDER BY i", compare_ints);
+ check_sorted("SELECT * FROM medium.ordered_text ORDER BY t", compare_strings);
+ check_sorted("SELECT * FROM medium.random_text ORDER BY t", compare_strings);
+
+ execute("set work_mem = '256MB'");
+
+ check_sorted("SELECT * FROM medium.ordered_ints ORDER BY i", compare_ints);
+ check_sorted("SELECT * FROM medium.random_ints ORDER BY i", compare_ints);
+ check_sorted("SELECT * FROM medium.ordered_text ORDER BY t", compare_strings);
+ check_sorted("SELECT * FROM medium.random_text ORDER BY t", compare_strings);
+
+ execute("set work_mem = '512MB'");
+
+ check_sorted("SELECT * FROM medium.ordered_ints ORDER BY i", compare_ints);
+ check_sorted("SELECT * FROM medium.random_ints ORDER BY i", compare_ints);
+ check_sorted("SELECT * FROM medium.ordered_text ORDER BY t", compare_strings);
+ check_sorted("SELECT * FROM medium.random_text ORDER BY t", compare_strings);
+
+ execute("set work_mem = '2048MB'");
+
+ check_sorted("SELECT * FROM medium.ordered_ints ORDER BY i", compare_ints);
+ check_sorted("SELECT * FROM medium.random_ints ORDER BY i", compare_ints);
+ check_sorted("SELECT * FROM medium.ordered_text ORDER BY t", compare_strings);
+ check_sorted("SELECT * FROM medium.random_text ORDER BY t", compare_strings);
+
+ PQfinish(conn);
+
+ return 0;
+}
diff --git a/src/test/sorttestsuite/generate.c b/src/test/sorttestsuite/generate.c
new file mode 100644
index 0000000..f481189
--- /dev/null
+++ b/src/test/sorttestsuite/generate.c
@@ -0,0 +1,198 @@
+#include <math.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+static void
+generate_ordered_integers(int scale)
+{
+ int rows = ((double) scale) * 28.75;
+ int i;
+
+ printf("DROP TABLE IF EXISTS ordered_ints;\n");
+ printf("BEGIN;");
+ printf("CREATE TABLE ordered_ints (i int4);\n");
+ printf("COPY ordered_ints FROM STDIN WITH (FREEZE);\n");
+
+ for (i = 0; i < rows; i++)
+ printf("%d\n", i);
+
+ printf("\\.\n");
+ printf("COMMIT;\n");
+}
+
+static void
+generate_random_integers(int scale)
+{
+ int rows = ((double) scale) * 28.75;
+ int i;
+
+ printf("DROP TABLE IF EXISTS random_ints;\n");
+ printf("BEGIN;");
+ printf("CREATE TABLE random_ints (i int4);\n");
+ printf("COPY random_ints FROM STDIN WITH (FREEZE);\n");
+
+ for (i = 0; i < rows; i++)
+ printf("%d\n", random());
+
+ printf("\\.\n");
+ printf("COMMIT;\n");
+}
+
+#define ALPHABET_SIZE 26
+static const char alphabet[ALPHABET_SIZE + 1] = "abcdefghijklmnopqrstuvwxyz";
+
+#define TEXT_LEN 50
+
+static void
+random_string(char *buf, int len)
+{
+ int i;
+ long r;
+ long m;
+
+ m = 0;
+ for (i = 0; i < len; i++)
+ {
+ if (m / ALPHABET_SIZE < ALPHABET_SIZE)
+ {
+ m = RAND_MAX;
+ r = random();
+ }
+
+ *buf = alphabet[r % ALPHABET_SIZE];
+ m = m / ALPHABET_SIZE;
+ r = r / ALPHABET_SIZE;
+ buf++;
+ }
+ *buf = '\0';
+ return;
+}
+
+static void
+generate_random_text(int scale)
+{
+ int rows = ((double) scale) * 12.7;
+ int i;
+ char buf[TEXT_LEN + 1] = { 0 };
+
+ printf("DROP TABLE IF EXISTS random_text;\n");
+ printf("BEGIN;");
+ printf("CREATE TABLE random_text (t text);\n");
+ printf("COPY random_text FROM STDIN WITH (FREEZE);\n");
+
+ for (i = 0; i < rows; i++)
+ {
+ random_string(buf, TEXT_LEN);
+ printf("%s\n", buf);
+ }
+
+ printf("\\.\n");
+ printf("COMMIT;\n");
+}
+
+static void
+generate_ordered_text(int scale)
+{
+ int rows = ((double) scale) * 12.7;
+ int i;
+ int j;
+ char indexes[TEXT_LEN] = {0};
+ char buf[TEXT_LEN + 1];
+ double digits;
+
+ printf("DROP TABLE IF EXISTS ordered_text;\n");
+ printf("BEGIN;");
+ printf("CREATE TABLE ordered_text (t text);\n");
+ printf("COPY ordered_text FROM STDIN WITH (FREEZE);\n");
+
+ /*
+ * We don't want all the strings to have the same prefix.
+ * That makes the comparisons very expensive. That might be an
+ * interesting test case too, but not what we want here. To avoid
+ * that, figure out how many characters will change, with the #
+ * of rows we chose.
+ */
+ digits = ceil(log(rows) / log((double) ALPHABET_SIZE));
+
+ if (digits > TEXT_LEN)
+ digits = TEXT_LEN;
+
+ for (i = 0; i < rows; i++)
+ {
+ for (j = 0; j < TEXT_LEN; j++)
+ {
+ buf[j] = alphabet[indexes[j]];
+ }
+ buf[j] = '\0';
+ printf("%s\n", buf);
+
+ /* increment last character, carrying if needed */
+ for (j = digits - 1; j >= 0; j--)
+ {
+ indexes[j]++;
+ if (indexes[j] == ALPHABET_SIZE)
+ indexes[j] = 0;
+ else
+ break;
+ }
+ }
+
+ printf("\\.\n");
+ printf("COMMIT;\n");
+}
+
+
+struct
+{
+ char *name;
+ void (*generate_func)(int scale);
+} datasets[] =
+{
+ { "ordered_integers", generate_ordered_integers },
+ { "random_integers", generate_random_integers },
+ { "ordered_text", generate_ordered_text },
+ { "random_text", generate_random_text },
+ { NULL, NULL }
+};
+
+void
+usage()
+{
+ printf("Usage: generate <dataset name> [scale] [schema]");
+ exit(1);
+}
+
+int
+main(int argc, char **argv)
+{
+ int scale;
+ int i;
+ int found = 0;
+
+ if (argc < 2)
+ usage();
+
+ if (argc >= 3)
+ scale = atoi(argv[2]);
+ else
+ scale = 1024; /* 1 MB */
+
+ for (i = 0; datasets[i].name != NULL; i++)
+ {
+ if (strcmp(argv[1], datasets[i].name) == 0 ||
+ strcmp(argv[1], "all") == 0)
+ {
+ fprintf (stderr, "Generating %s for %d kB...\n", datasets[i].name, scale);
+ datasets[i].generate_func(scale);
+ found = 1;
+ }
+ }
+
+ if (!found)
+ {
+ fprintf(stderr, "unrecognized test name %s\n", argv[1]);
+ exit(1);
+ }
+ exit(0);
+}
diff --git a/src/test/sorttestsuite/speed.c b/src/test/sorttestsuite/speed.c
new file mode 100644
index 0000000..3ebc57c
--- /dev/null
+++ b/src/test/sorttestsuite/speed.c
@@ -0,0 +1,139 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <sys/time.h>
+
+#include <libpq-fe.h>
+
+#define REPETITIONS 3
+
+static PGconn *conn;
+
+/* returns duration in ms */
+static double
+execute(const char *sql)
+{
+ struct timeval before, after;
+ PGresult *res;
+
+ gettimeofday(&before, NULL);
+ res = PQexec(conn, sql);
+ gettimeofday(&after, NULL);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr,"command failed: %s\n%s", sql, PQerrorMessage(conn));
+ PQclear(res);
+ exit(1);
+ }
+ PQclear(res);
+
+ return (((double) (after.tv_sec - before.tv_sec)) * 1000.0 + ((double) (after.tv_usec - before.tv_usec) / 1000.0));
+}
+
+static void
+execute_test(const char *testname, const char *query)
+{
+ double duration;
+ char buf[100];
+ int i;
+
+ printf ("%s: ", testname);
+ fflush(stdout);
+ for (i = 0; i < REPETITIONS; i++)
+ {
+ duration = execute(query);
+
+ if (i > 0)
+ printf(", ");
+ printf("%.0f ms", duration);
+ fflush(stdout);
+ }
+ printf("\n");
+}
+
+int
+main(int argc, char **argv)
+{
+ double duration;
+ char buf[1000];
+
+ /* Make a connection to the database */
+ conn = PQconnectdb("");
+
+ /* Check to see that the backend connection was successfully made */
+ if (PQstatus(conn) != CONNECTION_OK)
+ {
+ fprintf(stderr, "Connection to database failed: %s",
+ PQerrorMessage(conn));
+ exit(1);
+ }
+
+ execute("set trace_sort=on");
+
+ printf("# Tests on small tables (1 MB), 4MB work_mem\n");
+ printf("# Performs a quicksort\n");
+ printf("-----\n");
+ execute("set work_mem='4MB'");
+ execute_test("ordered_ints,", "SELECT COUNT(*) FROM (SELECT * FROM small.ordered_ints ORDER BY i) t");
+ execute_test("random_ints", "SELECT COUNT(*) FROM (SELECT * FROM small.random_ints ORDER BY i) t");
+ execute_test("ordered_text", "SELECT COUNT(*) FROM (SELECT * FROM small.ordered_text ORDER BY t) t");
+ execute_test("random_text", "SELECT COUNT(*) FROM (SELECT * FROM small.random_text ORDER BY t) t");
+ printf("\n");
+
+ printf("# Tests on medium-sized tables (1 GB), 4MB work_mem\n");
+ printf("# Performs an external sort, but the table still fits in OS cache\n");
+ printf("# Needs a multi-stage merge\n");
+ printf("-----\n");
+ execute("set work_mem='4MB'");
+ execute_test("ordered_ints", "SELECT COUNT(*) FROM (SELECT * FROM medium.ordered_ints ORDER BY i) t");
+ execute_test("random_ints", "SELECT COUNT(*) FROM (SELECT * FROM medium.random_ints ORDER BY i) t");
+ execute_test("ordered_text", "SELECT COUNT(*) FROM (SELECT * FROM medium.ordered_text ORDER BY t) t");
+ execute_test("random_text", "SELECT COUNT(*) FROM (SELECT * FROM medium.random_text ORDER BY t) t");
+ printf("\n");
+
+ printf("# Tests on medium-sized tables (1 GB), 16MB work_mem\n");
+ printf("# Same as previous test, but with larger work_mem\n");
+ printf("-----\n");
+ execute("set work_mem='16MB'");
+ execute_test("ordered_ints", "SELECT COUNT(*) FROM (SELECT * FROM medium.ordered_ints ORDER BY i) t");
+ execute_test("random_ints", "SELECT COUNT(*) FROM (SELECT * FROM medium.random_ints ORDER BY i) t");
+ execute_test("ordered_text", "SELECT COUNT(*) FROM (SELECT * FROM medium.ordered_text ORDER BY t) t");
+ execute_test("random_text", "SELECT COUNT(*) FROM (SELECT * FROM medium.random_text ORDER BY t) t");
+ printf("\n");
+
+ printf("# Tests on medium-sized tables (1 GB), 256MB work_mem\n");
+ printf("# This works with a single merge pass\n");
+ printf("-----\n");
+ execute("set work_mem='256MB'");
+ execute_test("ordered_ints", "SELECT COUNT(*) FROM (SELECT * FROM medium.ordered_ints ORDER BY i) t");
+ execute_test("random_ints", "SELECT COUNT(*) FROM (SELECT * FROM medium.random_ints ORDER BY i) t");
+ execute_test("ordered_text", "SELECT COUNT(*) FROM (SELECT * FROM medium.ordered_text ORDER BY t) t");
+ execute_test("random_text", "SELECT COUNT(*) FROM (SELECT * FROM medium.random_text ORDER BY t) t");
+ printf("\n");
+
+ printf("# Tests on medium-sized tables (1 GB), 512MB work_mem\n");
+ printf("# This works with a single merge pass\n");
+ printf("-----\n");
+ execute("set work_mem='512MB'");
+ execute_test("ordered_ints", "SELECT COUNT(*) FROM (SELECT * FROM medium.ordered_ints ORDER BY i) t");
+ execute_test("random_ints", "SELECT COUNT(*) FROM (SELECT * FROM medium.random_ints ORDER BY i) t");
+ execute_test("ordered_text", "SELECT COUNT(*) FROM (SELECT * FROM medium.ordered_text ORDER BY t) t");
+ execute_test("random_text", "SELECT COUNT(*) FROM (SELECT * FROM medium.random_text ORDER BY t) t");
+ printf("\n");
+
+ printf("# Tests on medium-sized tables (1 GB), 2GB work_mem\n");
+ printf("# I thought 2GB would be enough to do a quicksort, but because of\n");
+ printf("# SortTuple overhead (?), it doesn't fit. Performs an external sort with two runs\n");
+ printf("-----\n");
+ execute("set work_mem='2048MB'");
+ execute_test("ordered_ints", "SELECT COUNT(*) FROM (SELECT * FROM medium.ordered_ints ORDER BY i) t");
+ execute_test("random_ints", "SELECT COUNT(*) FROM (SELECT * FROM medium.random_ints ORDER BY i) t");
+ execute_test("ordered_text", "SELECT COUNT(*) FROM (SELECT * FROM medium.ordered_text ORDER BY t) t");
+ execute_test("random_text", "SELECT COUNT(*) FROM (SELECT * FROM medium.random_text ORDER BY t) t");
+ printf("\n");
+
+ PQfinish(conn);
+
+ return 0;
+}
--
2.9.3
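
One detail of generate.c above worth calling out: generate_ordered_text() is
essentially a base-26 odometer, incrementing the least significant of the
varying character positions and carrying leftward on overflow, which yields
strings in ascending order. A standalone sketch of that increment loop
(shortened here to 8-character strings and 30 rows; the generator itself
varies only the leading positions of a 50-character string, which sorts the
same way):

#include <stdio.h>

#define ALPHABET_SIZE 26
#define TEXT_LEN 8

static const char alphabet[ALPHABET_SIZE + 1] = "abcdefghijklmnopqrstuvwxyz";

int
main(void)
{
    int     indexes[TEXT_LEN] = {0};
    char    buf[TEXT_LEN + 1];
    int     i, j;

    for (i = 0; i < 30; i++)
    {
        /* Render the current odometer state as a string. */
        for (j = 0; j < TEXT_LEN; j++)
            buf[j] = alphabet[indexes[j]];
        buf[TEXT_LEN] = '\0';
        printf("%s\n", buf);

        /* Increment the rightmost digit, carrying leftward on overflow. */
        for (j = TEXT_LEN - 1; j >= 0; j--)
        {
            if (++indexes[j] < ALPHABET_SIZE)
                break;
            indexes[j] = 0;
        }
    }
    return 0;
}
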
On 09/08/2016 09:59 PM, Heikki Linnakangas wrote:
> On 09/06/2016 10:26 PM, Peter Geoghegan wrote:
>> On Tue, Sep 6, 2016 at 12:08 PM, Peter Geoghegan <pg@heroku.com> wrote:
>>> Offhand, I would think that taken together this is very important. I'd
>>> certainly want to see cases in the hundreds of megabytes or gigabytes
>>> of work_mem alongside your 4MB case, even just to be able to talk
>>> informally about this. As you know, the default work_mem value is very
>>> conservative.
>
> I spent some more time polishing this up, and also added some code to
> logtape.c, to use larger read buffers, to compensate for the fact that
> we don't do pre-reading from tuplesort.c anymore. That should trigger
> the OS read-ahead, and make the I/O more sequential, like was the
> purpose of the old pre-reading code. But simpler. I haven't tested that
> part much yet, but I plan to run some tests on larger data sets that
> don't fit in RAM, to make the I/O effects visible.

Ok, I ran a few tests with 20 GB tables. I thought this would show any
differences in I/O behaviour, but in fact it was still completely CPU
bound, like the tests on smaller tables I posted yesterday. I guess I
need to point temp_tablespaces to a USB drive or something. But here we go.
It looks like there was a regression when sorting random text, with 256
MB work_mem. I suspect that was a fluke - I only ran these tests once
because they took so long. But I don't know for sure.
Claudio, if you could also repeat the tests you ran on Peter's patch set
on the other thread, with these patches, that'd be nice. These patches
are effectively a replacement for
0002-Use-tuplesort-batch-memory-for-randomAccess-sorts.patch. And review
would be much appreciated too, of course.
Attached are new versions. Compared to last set, they contain a few
comment fixes, and a change to the 2nd patch to not allocate tape
buffers for tapes that were completely unused.
- Heikki
Attachments:
0001-Don-t-bother-to-pre-read-tuples-into-SortTuple-slots.patch (text/x-diff)
From 90137ebfac0d5f2e80e2fb24cd12bfb664367f5d Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 9 Sep 2016 14:10:05 +0300
Subject: [PATCH 1/2] Don't bother to pre-read tuples into SortTuple slots
during merge.
That only seems to add overhead. We're doing the same number of READTUP()
calls either way, but we're spreading the memory usage over a larger area
if we try to pre-read, so it doesn't seem worth it.
The pre-reading can be helpful, to trigger the OS readahead of the
underlying tape, because it will make the read pattern appear more
sequential. But we'll fix that in the next patch, by teaching logtape.c to
read in larger chunks.
---
src/backend/utils/sort/tuplesort.c | 903 ++++++++++---------------------------
1 file changed, 226 insertions(+), 677 deletions(-)
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index c8fbcf8..a6d167a 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -162,7 +162,7 @@ bool optimize_bounded_sort = true;
* The objects we actually sort are SortTuple structs. These contain
* a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
* which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree() (except during final on-the-fly merge,
+ * can be freed by a simple pfree() (except during merge,
* when memory is used in batch). SortTuples also contain the tuple's
* first key column in Datum/nullflag format, and an index integer.
*
@@ -191,9 +191,8 @@ bool optimize_bounded_sort = true;
* it now only distinguishes RUN_FIRST and HEAP_RUN_NEXT, since replacement
* selection is always abandoned after the first run; no other run number
* should be represented here. During merge passes, we re-use it to hold the
- * input tape number that each tuple in the heap was read from, or to hold the
- * index of the next tuple pre-read from the same tape in the case of pre-read
- * entries. tupindex goes unused if the sort occurs entirely in memory.
+ * input tape number that each tuple in the heap was read from. tupindex goes
+ * unused if the sort occurs entirely in memory.
*/
typedef struct
{
@@ -203,6 +202,20 @@ typedef struct
int tupindex; /* see notes above */
} SortTuple;
+/*
+ * During merge, we use a pre-allocated set of fixed-size buffers to store
+ * tuples in, to avoid palloc/pfree overhead.
+ *
+ * 'nextfree' is valid when this chunk is in the free list. When in use, the
+ * buffer holds a tuple.
+ */
+#define MERGETUPLEBUFFER_SIZE 1024
+
+typedef union MergeTupleBuffer
+{
+ union MergeTupleBuffer *nextfree;
+ char buffer[MERGETUPLEBUFFER_SIZE];
+} MergeTupleBuffer;
/*
* Possible states of a Tuplesort object. These denote the states that
@@ -307,14 +320,6 @@ struct Tuplesortstate
int tapenum, unsigned int len);
/*
- * Function to move a caller tuple. This is usually implemented as a
- * memmove() shim, but function may also perform additional fix-up of
- * caller tuple where needed. Batch memory support requires the movement
- * of caller tuples from one location in memory to another.
- */
- void (*movetup) (void *dest, void *src, unsigned int len);
-
- /*
* This array holds the tuples now in sort memory. If we are in state
* INITIAL, the tuples are in no particular order; if we are in state
* SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
@@ -332,12 +337,40 @@ struct Tuplesortstate
/*
* Memory for tuples is sometimes allocated in batch, rather than
* incrementally. This implies that incremental memory accounting has
- * been abandoned. Currently, this only happens for the final on-the-fly
- * merge step. Large batch allocations can store tuples (e.g.
- * IndexTuples) without palloc() fragmentation and other overhead.
+ * been abandoned. Currently, this happens when we start merging.
+ * Large batch allocations can store tuples (e.g. IndexTuples) without
+ * palloc() fragmentation and other overhead.
+ *
+ * For the batch memory, we use one large allocation, divided into
+ * MERGETUPLEBUFFER_SIZE chunks. The allocation is sized to hold
+ * one chunk per tape, plus one additional chunk. We need that many
+ * chunks to hold all the tuples kept in the heap during merge, plus
+ * the one we have last returned from the sort.
+ *
+ * Initially, all the chunks are kept in a linked list, in freeBufferHead.
+ * When a tuple is read from a tape, it is put into the next available
+ * chunk, if it fits. If the tuple is larger than MERGETUPLEBUFFER_SIZE,
+ * it is palloc'd instead.
+ *
+ * When we're done processing a tuple, we return the chunk back to the
+ * free list, or pfree() if it was palloc'd. We know that a tuple was
+ * allocated from the batch memory arena, if its pointer value is between
+ * batchMemoryBegin and -End.
*/
bool batchUsed;
+ char *batchMemoryBegin; /* beginning of batch memory arena */
+ char *batchMemoryEnd; /* end of batch memory arena */
+ MergeTupleBuffer *freeBufferHead; /* head of free list */
+
+ /*
+ * When we return a tuple to the caller that came from a tape (that is,
+ * in TSS_SORTEDONTAPE or TSS_FINALMERGE modes), we remember the tuple
+ * in 'readlasttuple', so that we can recycle the memory on next
+ * gettuple call.
+ */
+ void *readlasttuple;
+
/*
* While building initial runs, this indicates if the replacement
* selection strategy is in use. When it isn't, then a simple hybrid
@@ -358,42 +391,11 @@ struct Tuplesortstate
*/
/*
- * These variables are only used during merge passes. mergeactive[i] is
+ * This variable is only used during merge passes. mergeactive[i] is
* true if we are reading an input run from (actual) tape number i and
- * have not yet exhausted that run. mergenext[i] is the memtuples index
- * of the next pre-read tuple (next to be loaded into the heap) for tape
- * i, or 0 if we are out of pre-read tuples. mergelast[i] similarly
- * points to the last pre-read tuple from each tape. mergeavailslots[i]
- * is the number of unused memtuples[] slots reserved for tape i, and
- * mergeavailmem[i] is the amount of unused space allocated for tape i.
- * mergefreelist and mergefirstfree keep track of unused locations in the
- * memtuples[] array. The memtuples[].tupindex fields link together
- * pre-read tuples for each tape as well as recycled locations in
- * mergefreelist. It is OK to use 0 as a null link in these lists, because
- * memtuples[0] is part of the merge heap and is never a pre-read tuple.
+ * have not yet exhausted that run.
*/
bool *mergeactive; /* active input run source? */
- int *mergenext; /* first preread tuple for each source */
- int *mergelast; /* last preread tuple for each source */
- int *mergeavailslots; /* slots left for prereading each tape */
- int64 *mergeavailmem; /* availMem for prereading each tape */
- int mergefreelist; /* head of freelist of recycled slots */
- int mergefirstfree; /* first slot never used in this merge */
-
- /*
- * Per-tape batch state, when final on-the-fly merge consumes memory from
- * just a few large allocations.
- *
- * Aside from the general benefits of performing fewer individual retail
- * palloc() calls, this also helps make merging more cache efficient,
- * since each tape's tuples must naturally be accessed sequentially (in
- * sorted order).
- */
- int64 spacePerTape; /* Space (memory) for tuples (not slots) */
- char **mergetuples; /* Each tape's memory allocation */
- char **mergecurrent; /* Current offset into each tape's memory */
- char **mergetail; /* Last item's start point for each tape */
- char **mergeoverflow; /* Retail palloc() "overflow" for each tape */
/*
* Variables for Algorithm D. Note that destTape is a "logical" tape
@@ -481,11 +483,33 @@ struct Tuplesortstate
#endif
};
+/*
+ * Is the given tuple allocated from the batch memory arena?
+ */
+#define IS_MERGETUPLE_BUFFER(state, tuple) \
+ ((char *) (tuple) >= (state)->batchMemoryBegin && \
+ (char *) (tuple) < (state)->batchMemoryEnd)
+
+/*
+ * Return the given tuple to the batch memory free list, or free it
+ * if it was palloc'd.
+ */
+#define RELEASE_MERGETUPLE_BUFFER(state, tuple) \
+ do { \
+ MergeTupleBuffer *buf = (MergeTupleBuffer *) tuple; \
+ \
+ if (IS_MERGETUPLE_BUFFER(state, tuple)) \
+ { \
+ buf->nextfree = state->freeBufferHead; \
+ state->freeBufferHead = buf; \
+ } \
+ else \
+ pfree(tuple); \
+ } while(0)
+
#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define MOVETUP(dest,src,len) ((*(state)->movetup) (dest, src, len))
#define LACKMEM(state) ((state)->availMem < 0 && !(state)->batchUsed)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
@@ -553,16 +577,8 @@ static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
-static void beginmerge(Tuplesortstate *state, bool finalMergeBatch);
-static void batchmemtuples(Tuplesortstate *state);
-static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
-static void mergebatchone(Tuplesortstate *state, int srcTape,
- SortTuple *stup, bool *should_free);
-static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
- SortTuple *rtup, bool *should_free);
-static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
-static void mergepreread(Tuplesortstate *state);
-static void mergeprereadone(Tuplesortstate *state, int srcTape);
+static void beginmerge(Tuplesortstate *state);
+static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void dumpbatch(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
@@ -574,7 +590,7 @@ static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
-static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
+static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -582,7 +598,6 @@ static void writetup_heap(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_heap(void *dest, void *src, unsigned int len);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -590,7 +605,6 @@ static void writetup_cluster(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_cluster(void *dest, void *src, unsigned int len);
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
@@ -600,7 +614,6 @@ static void writetup_index(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_index(void *dest, void *src, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -608,7 +621,6 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_datum(void *dest, void *src, unsigned int len);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/*
@@ -760,7 +772,6 @@ tuplesort_begin_heap(TupleDesc tupDesc,
state->copytup = copytup_heap;
state->writetup = writetup_heap;
state->readtup = readtup_heap;
- state->movetup = movetup_heap;
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
state->abbrevNext = 10;
@@ -833,7 +844,6 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
state->copytup = copytup_cluster;
state->writetup = writetup_cluster;
state->readtup = readtup_cluster;
- state->movetup = movetup_cluster;
state->abbrevNext = 10;
state->indexInfo = BuildIndexInfo(indexRel);
@@ -925,7 +935,6 @@ tuplesort_begin_index_btree(Relation heapRel,
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
- state->movetup = movetup_index;
state->abbrevNext = 10;
state->heapRel = heapRel;
@@ -993,7 +1002,6 @@ tuplesort_begin_index_hash(Relation heapRel,
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
- state->movetup = movetup_index;
state->heapRel = heapRel;
state->indexRel = indexRel;
@@ -1036,7 +1044,6 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
state->copytup = copytup_datum;
state->writetup = writetup_datum;
state->readtup = readtup_datum;
- state->movetup = movetup_datum;
state->abbrevNext = 10;
state->datumType = datumType;
@@ -1881,14 +1888,33 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
case TSS_SORTEDONTAPE:
Assert(forward || state->randomAccess);
Assert(!state->batchUsed);
- *should_free = true;
+
+ /*
+ * The buffer holding the tuple that we returned in the previous
+ * gettuple call can now be reused.
+ */
+ if (state->readlasttuple)
+ {
+ RELEASE_MERGETUPLE_BUFFER(state, state->readlasttuple);
+ state->readlasttuple = NULL;
+ }
+
if (forward)
{
if (state->eof_reached)
return false;
+
if ((tuplen = getlen(state, state->result_tape, true)) != 0)
{
READTUP(state, stup, state->result_tape, tuplen);
+
+ /*
+ * Remember the tuple we return, so that we can recycle its
+ * memory on next call. (This can be NULL, in the Datum case).
+ */
+ state->readlasttuple = stup->tuple;
+
+ *should_free = false;
return true;
}
else
@@ -1962,68 +1988,58 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
tuplen))
elog(ERROR, "bogus tuple length in backward scan");
READTUP(state, stup, state->result_tape, tuplen);
+
+ /*
+ * Remember the tuple we return, so that we can recycle its
+ * memory on next call. (This can be NULL, in the Datum case).
+ */
+ state->readlasttuple = stup->tuple;
+
+ *should_free = false;
return true;
case TSS_FINALMERGE:
Assert(forward);
Assert(state->batchUsed || !state->tuples);
- /* For now, assume tuple is stored in tape's batch memory */
+ /* We are managing memory ourselves, with the batch memory arena. */
*should_free = false;
/*
+ * The buffer holding the tuple that we returned in the previous
+ * gettuple call can now be reused.
+ */
+ if (state->readlasttuple)
+ {
+ RELEASE_MERGETUPLE_BUFFER(state, state->readlasttuple);
+ state->readlasttuple = NULL;
+ }
+
+ /*
* This code should match the inner loop of mergeonerun().
*/
if (state->memtupcount > 0)
{
int srcTape = state->memtuples[0].tupindex;
- int tupIndex;
- SortTuple *newtup;
+ SortTuple newtup;
- /*
- * Returned tuple is still counted in our memory space most of
- * the time. See mergebatchone() for discussion of why caller
- * may occasionally be required to free returned tuple, and
- * how preread memory is managed with regard to edge cases
- * more generally.
- */
*stup = state->memtuples[0];
tuplesort_heap_siftup(state, false);
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- {
- /*
- * out of preloaded data on this tape, try to read more
- *
- * Unlike mergeonerun(), we only preload from the single
- * tape that's run dry, though not before preparing its
- * batch memory for a new round of sequential consumption.
- * See mergepreread() comments.
- */
- if (state->batchUsed)
- mergebatchone(state, srcTape, stup, should_free);
- mergeprereadone(state, srcTape);
+ /*
+ * Remember the tuple we return, so that we can recycle its
+ * memory on next call. (This can be NULL, in the Datum case).
+ */
+ state->readlasttuple = stup->tuple;
- /*
- * if still no data, we've reached end of run on this tape
- */
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- {
- /* Free tape's buffer, avoiding dangling pointer */
- if (state->batchUsed)
- mergebatchfreetape(state, srcTape, stup, should_free);
- return true;
- }
+ /* pull next tuple from tape, insert in heap */
+ if (!mergereadnext(state, srcTape, &newtup))
+ {
+ /* if no more data, we've reached end of run on this tape */
+ return true;
}
- /* pull next preread tuple from list, insert in heap */
- newtup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = newtup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, newtup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- newtup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+
+ tuplesort_heap_insert(state, &newtup, srcTape, false);
+
return true;
}
return false;
@@ -2325,7 +2341,8 @@ inittapes(Tuplesortstate *state)
#endif
/*
- * Decrease availMem to reflect the space needed for tape buffers; but
+ * Decrease availMem to reflect the space needed for tape buffers when
+ * writing the initial runs; but
* don't decrease it to the point that we have no room for tuples. (That
* case is only likely to occur if sorting pass-by-value Datums; in all
* other scenarios the memtuples[] array is unlikely to occupy more than
@@ -2350,14 +2367,6 @@ inittapes(Tuplesortstate *state)
state->tapeset = LogicalTapeSetCreate(maxTapes);
state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
- state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
- state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
- state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
- state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
@@ -2468,6 +2477,8 @@ mergeruns(Tuplesortstate *state)
svTape,
svRuns,
svDummy;
+ char *p;
+ int i;
Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
@@ -2504,6 +2515,36 @@ mergeruns(Tuplesortstate *state)
return;
}
+ /*
+ * We no longer need a large memtuples array, only one slot per tape.
+ * Shrink it, to make the memory available for other use.
+ */
+ pfree(state->memtuples);
+ FREEMEM(state, state->memtupsize * sizeof(SortTuple));
+ state->memtupsize = state->maxTapes;
+ state->memtuples = (SortTuple *) palloc(state->maxTapes * sizeof(SortTuple));
+ USEMEM(state, state->memtupsize * sizeof(SortTuple));
+
+ /*
+ * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to
+ * track memory usage.
+ */
+ state->batchUsed = true;
+
+ /* Initialize the merge tuple buffer arena. */
+ state->batchMemoryBegin = palloc((state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
+ state->batchMemoryEnd = state->batchMemoryBegin + (state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE;
+ state->freeBufferHead = (MergeTupleBuffer *) state->batchMemoryBegin;
+
+ p = state->batchMemoryBegin;
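+ /* Chain all maxTapes + 1 chunks into a single free list, NULL-terminated. */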
+ for (i = 0; i < state->maxTapes; i++)
+ {
+ ((MergeTupleBuffer *) p)->nextfree = (MergeTupleBuffer *) (p + MERGETUPLEBUFFER_SIZE);
+ p += MERGETUPLEBUFFER_SIZE;
+ }
+ ((MergeTupleBuffer *) p)->nextfree = NULL;
+
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
LogicalTapeRewind(state->tapeset, tapenum, false);
@@ -2534,7 +2575,7 @@ mergeruns(Tuplesortstate *state)
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
/* Initialize for the final merge pass */
- beginmerge(state, state->tuples);
+ beginmerge(state);
state->status = TSS_FINALMERGE;
return;
}
@@ -2617,16 +2658,12 @@ mergeonerun(Tuplesortstate *state)
{
int destTape = state->tp_tapenum[state->tapeRange];
int srcTape;
- int tupIndex;
- SortTuple *tup;
- int64 priorAvail,
- spaceFreed;
/*
* Start the merge by loading one tuple from each active source tape into
* the heap. We can also decrease the input run/dummy run counts.
*/
- beginmerge(state, false);
+ beginmerge(state);
/*
* Execute merge by repeatedly extracting lowest tuple in heap, writing it
@@ -2635,33 +2672,25 @@ mergeonerun(Tuplesortstate *state)
*/
while (state->memtupcount > 0)
{
+ SortTuple stup;
+
/* write the tuple to destTape */
- priorAvail = state->availMem;
srcTape = state->memtuples[0].tupindex;
WRITETUP(state, destTape, &state->memtuples[0]);
- /* writetup adjusted total free space, now fix per-tape space */
- spaceFreed = state->availMem - priorAvail;
- state->mergeavailmem[srcTape] += spaceFreed;
+
+ /* Recycle the buffer we just wrote out, for the next read */
+ RELEASE_MERGETUPLE_BUFFER(state, state->memtuples[0].tuple);
+
/* compact the heap */
tuplesort_heap_siftup(state, false);
- if ((tupIndex = state->mergenext[srcTape]) == 0)
+
+ /* pull next tuple from tape, insert in heap */
+ if (!mergereadnext(state, srcTape, &stup))
{
- /* out of preloaded data on this tape, try to read more */
- mergepreread(state);
- /* if still no data, we've reached end of run on this tape */
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- continue;
+ /* we've reached end of run on this tape */
+ continue;
}
- /* pull next preread tuple from list, insert in heap */
- tup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = tup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, tup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- tup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+ tuplesort_heap_insert(state, &stup, srcTape, false);
}
/*
@@ -2694,18 +2723,13 @@ mergeonerun(Tuplesortstate *state)
* which tapes contain active input runs in mergeactive[]. Then, load
* as many tuples as we can from each active input tape, and finally
* fill the merge heap with the first tuple from each active tape.
- *
- * finalMergeBatch indicates if this is the beginning of a final on-the-fly
- * merge where a batched allocation of tuple memory is required.
*/
static void
-beginmerge(Tuplesortstate *state, bool finalMergeBatch)
+beginmerge(Tuplesortstate *state)
{
int activeTapes;
int tapenum;
int srcTape;
- int slotsPerTape;
- int64 spacePerTape;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
@@ -2729,497 +2753,48 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch)
}
state->activeTapes = activeTapes;
- /* Clear merge-pass state variables */
- memset(state->mergenext, 0,
- state->maxTapes * sizeof(*state->mergenext));
- memset(state->mergelast, 0,
- state->maxTapes * sizeof(*state->mergelast));
- state->mergefreelist = 0; /* nothing in the freelist */
- state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
-
- if (finalMergeBatch)
- {
- /* Free outright buffers for tape never actually allocated */
- FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
-
- /*
- * Grow memtuples one last time, since the palloc() overhead no longer
- * incurred can make a big difference
- */
- batchmemtuples(state);
- }
-
/*
* Initialize space allocation to let each active input tape have an equal
* share of preread space.
*/
Assert(activeTapes > 0);
- slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
- Assert(slotsPerTape > 0);
- spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- {
- if (state->mergeactive[srcTape])
- {
- state->mergeavailslots[srcTape] = slotsPerTape;
- state->mergeavailmem[srcTape] = spacePerTape;
- }
- }
-
- /*
- * Preallocate tuple batch memory for each tape. This is the memory used
- * for tuples themselves (not SortTuples), so it's never used by
- * pass-by-value datum sorts. Memory allocation is performed here at most
- * once per sort, just in advance of the final on-the-fly merge step.
- */
- if (finalMergeBatch)
- mergebatch(state, spacePerTape);
-
- /*
- * Preread as many tuples as possible (and at least one) from each active
- * tape
- */
- mergepreread(state);
/* Load the merge heap with the first tuple from each input tape */
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
- int tupIndex = state->mergenext[srcTape];
- SortTuple *tup;
-
- if (tupIndex)
- {
- tup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = tup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, tup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- tup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+ SortTuple tup;
-#ifdef TRACE_SORT
- if (trace_sort && finalMergeBatch)
- {
- int64 perTapeKB = (spacePerTape + 1023) / 1024;
- int64 usedSpaceKB;
- int usedSlots;
-
- /*
- * Report how effective batchmemtuples() was in balancing the
- * number of slots against the need for memory for the
- * underlying tuples (e.g. IndexTuples). The big preread of
- * all tapes when switching to FINALMERGE state should be
- * fairly representative of memory utilization during the
- * final merge step, and in any case is the only point at
- * which all tapes are guaranteed to have depleted either
- * their batch memory allowance or slot allowance. Ideally,
- * both will be completely depleted for every tape by now.
- */
- usedSpaceKB = (state->mergecurrent[srcTape] -
- state->mergetuples[srcTape] + 1023) / 1024;
- usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
-
- elog(LOG, "tape %d initially used " INT64_FORMAT " KB of "
- INT64_FORMAT " KB batch (%2.3f) and %d out of %d slots "
- "(%2.3f)", srcTape,
- usedSpaceKB, perTapeKB,
- (double) usedSpaceKB / (double) perTapeKB,
- usedSlots, slotsPerTape,
- (double) usedSlots / (double) slotsPerTape);
- }
-#endif
- }
+ if (mergereadnext(state, srcTape, &tup))
+ tuplesort_heap_insert(state, &tup, srcTape, false);
}
}
/*
- * batchmemtuples - grow memtuples without palloc overhead
+ * mergereadnext - load tuple from one merge input tape
*
- * When called, availMem should be approximately the amount of memory we'd
- * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
- * that were allocated with palloc() overhead, and in doing so use up all
- * allocated slots. However, though slots and tuple memory is in balance
- * following the last grow_memtuples() call, that's predicated on the observed
- * average tuple size for the "final" grow_memtuples() call, which includes
- * palloc overhead. During the final merge pass, where we will arrange to
- * squeeze out the palloc overhead, we might need more slots in the memtuples
- * array.
- *
- * To make that happen, arrange for the amount of remaining memory to be
- * exactly equal to the palloc overhead multiplied by the current size of
- * the memtuples array, force the grow_memtuples flag back to true (it's
- * probably but not necessarily false on entry to this routine), and then
- * call grow_memtuples. This simulates loading enough tuples to fill the
- * whole memtuples array and then having some space left over because of the
- * elided palloc overhead. We expect that grow_memtuples() will conclude that
- * it can't double the size of the memtuples array but that it can increase
- * it by some percentage; but if it does decide to double it, that just means
- * that we've never managed to use many slots in the memtuples array, in which
- * case doubling it shouldn't hurt anything anyway.
- */
-static void
-batchmemtuples(Tuplesortstate *state)
-{
- int64 refund;
- int64 availMemLessRefund;
- int memtupsize = state->memtupsize;
-
- /* For simplicity, assume no memtuples are actually currently counted */
- Assert(state->memtupcount == 0);
-
- /*
- * Refund STANDARDCHUNKHEADERSIZE per tuple.
- *
- * This sometimes fails to make memory use perfectly balanced, but it
- * should never make the situation worse. Note that Assert-enabled builds
- * get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
- */
- refund = memtupsize * STANDARDCHUNKHEADERSIZE;
- availMemLessRefund = state->availMem - refund;
-
- /*
- * To establish balanced memory use after refunding palloc overhead,
- * temporarily have our accounting indicate that we've allocated all
- * memory we're allowed to less that refund, and call grow_memtuples() to
- * have it increase the number of slots.
- */
- state->growmemtuples = true;
- USEMEM(state, availMemLessRefund);
- (void) grow_memtuples(state);
- /* Should not matter, but be tidy */
- FREEMEM(state, availMemLessRefund);
- state->growmemtuples = false;
-
-#ifdef TRACE_SORT
- if (trace_sort)
- {
- Size OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
- Size NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
-
- elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
- (double) NewKb / (double) OldKb,
- memtupsize, OldKb,
- state->memtupsize, NewKb);
- }
-#endif
-}
-
-/*
- * mergebatch - initialize tuple memory in batch
- *
- * This allows sequential access to sorted tuples buffered in memory from
- * tapes/runs on disk during a final on-the-fly merge step. Note that the
- * memory is not used for SortTuples, but for the underlying tuples (e.g.
- * MinimalTuples).
- *
- * Note that when batch memory is used, there is a simple division of space
- * into large buffers (one per active tape). The conventional incremental
- * memory accounting (calling USEMEM() and FREEMEM()) is abandoned. Instead,
- * when each tape's memory budget is exceeded, a retail palloc() "overflow" is
- * performed, which is then immediately detected in a way that is analogous to
- * LACKMEM(). This keeps each tape's use of memory fair, which is always a
- * goal.
- */
-static void
-mergebatch(Tuplesortstate *state, int64 spacePerTape)
-{
- int srcTape;
-
- Assert(state->activeTapes > 0);
- Assert(state->tuples);
-
- /*
- * For the purposes of tuplesort's memory accounting, the batch allocation
- * is special, and regular memory accounting through USEMEM() calls is
- * abandoned (see mergeprereadone()).
- */
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- {
- char *mergetuples;
-
- if (!state->mergeactive[srcTape])
- continue;
-
- /* Allocate buffer for each active tape */
- mergetuples = MemoryContextAllocHuge(state->tuplecontext,
- spacePerTape);
-
- /* Initialize state for tape */
- state->mergetuples[srcTape] = mergetuples;
- state->mergecurrent[srcTape] = mergetuples;
- state->mergetail[srcTape] = mergetuples;
- state->mergeoverflow[srcTape] = NULL;
- }
-
- state->batchUsed = true;
- state->spacePerTape = spacePerTape;
-}
-
-/*
- * mergebatchone - prepare batch memory for one merge input tape
- *
- * This is called following the exhaustion of preread tuples for one input
- * tape. All that actually occurs is that the state for the source tape is
- * reset to indicate that all memory may be reused.
- *
- * This routine must deal with fixing up the tuple that is about to be returned
- * to the client, due to "overflow" allocations.
- */
-static void
-mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
- bool *should_free)
-{
- Assert(state->batchUsed);
-
- /*
- * Tuple about to be returned to caller ("stup") is final preread tuple
- * from tape, just removed from the top of the heap. Special steps around
- * memory management must be performed for that tuple, to make sure it
- * isn't overwritten early.
- */
- if (!state->mergeoverflow[srcTape])
- {
- Size tupLen;
-
- /*
- * Mark tuple buffer range for reuse, but be careful to move final,
- * tail tuple to start of space for next run so that it's available to
- * caller when stup is returned, and remains available at least until
- * the next tuple is requested.
- */
- tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
- state->mergecurrent[srcTape] = state->mergetuples[srcTape];
- MOVETUP(state->mergecurrent[srcTape], state->mergetail[srcTape],
- tupLen);
-
- /* Make SortTuple at top of the merge heap point to new tuple */
- rtup->tuple = (void *) state->mergecurrent[srcTape];
-
- state->mergetail[srcTape] = state->mergecurrent[srcTape];
- state->mergecurrent[srcTape] += tupLen;
- }
- else
- {
- /*
- * Handle an "overflow" retail palloc.
- *
- * This is needed when we run out of tuple memory for the tape.
- */
- state->mergecurrent[srcTape] = state->mergetuples[srcTape];
- state->mergetail[srcTape] = state->mergetuples[srcTape];
-
- if (rtup->tuple)
- {
- Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
- /* Caller should free palloc'd tuple */
- *should_free = true;
- }
- state->mergeoverflow[srcTape] = NULL;
- }
-}
-
-/*
- * mergebatchfreetape - handle final clean-up for batch memory once tape is
- * about to become exhausted
- *
- * All tuples are returned from tape, but a single final tuple, *rtup, is to be
- * passed back to caller. Free tape's batch allocation buffer while ensuring
- * that the final tuple is managed appropriately.
- */
-static void
-mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
- bool *should_free)
-{
- Assert(state->batchUsed);
- Assert(state->status == TSS_FINALMERGE);
-
- /*
- * Tuple may or may not already be an overflow allocation from
- * mergebatchone()
- */
- if (!*should_free && rtup->tuple)
- {
- /*
- * Final tuple still in tape's batch allocation.
- *
- * Return palloc()'d copy to caller, and have it freed in a similar
- * manner to overflow allocation. Otherwise, we'd free batch memory
- * and pass back a pointer to garbage. Note that we deliberately
- * allocate this in the parent tuplesort context, to be on the safe
- * side.
- */
- Size tuplen;
- void *oldTuple = rtup->tuple;
-
- tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
- rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
- MOVETUP(rtup->tuple, oldTuple, tuplen);
- *should_free = true;
- }
-
- /* Free spacePerTape-sized buffer */
- pfree(state->mergetuples[srcTape]);
-}
-
-/*
- * mergebatchalloc - allocate memory for one tuple using a batch memory
- * "logical allocation".
- *
- * This is used for the final on-the-fly merge phase only. READTUP() routines
- * receive memory from here in place of palloc() and USEMEM() calls.
- *
- * Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted,
- * contiguous order (while allowing safe reuse of memory made available to
- * each tape). This maximizes locality of access as tuples are returned by
- * final merge.
- *
- * Caller must not subsequently attempt to free memory returned here. In
- * general, only mergebatch* functions know about how memory returned from
- * here should be freed, and this function's caller must ensure that batch
- * memory management code will definitely have the opportunity to do the right
- * thing during the final on-the-fly merge.
- */
-static void *
-mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
-{
- Size reserve_tuplen = MAXALIGN(tuplen);
- char *ret;
-
- /* Should overflow at most once before mergebatchone() call: */
- Assert(state->mergeoverflow[tapenum] == NULL);
- Assert(state->batchUsed);
-
- /* It should be possible to use precisely spacePerTape memory at once */
- if (state->mergecurrent[tapenum] + reserve_tuplen <=
- state->mergetuples[tapenum] + state->spacePerTape)
- {
- /*
- * Usual case -- caller is returned pointer into its tape's buffer,
- * and an offset from that point is recorded as where tape has
- * consumed up to for current round of preloading.
- */
- ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
- state->mergecurrent[tapenum] += reserve_tuplen;
- }
- else
- {
- /*
- * Allocate memory, and record as tape's overflow allocation. This
- * will be detected quickly, in a similar fashion to a LACKMEM()
- * condition, and should not happen again before a new round of
- * preloading for caller's tape. Note that we deliberately allocate
- * this in the parent tuplesort context, to be on the safe side.
- *
- * Sometimes, this does not happen because merging runs out of slots
- * before running out of memory.
- */
- ret = state->mergeoverflow[tapenum] =
- MemoryContextAlloc(state->sortcontext, tuplen);
- }
-
- return ret;
-}
-
-/*
- * mergepreread - load tuples from merge input tapes
- *
- * This routine exists to improve sequentiality of reads during a merge pass,
- * as explained in the header comments of this file. Load tuples from each
- * active source tape until the tape's run is exhausted or it has used up
- * its fair share of available memory. In any case, we guarantee that there
- * is at least one preread tuple available from each unexhausted input tape.
- *
- * We invoke this routine at the start of a merge pass for initial load,
- * and then whenever any tape's preread data runs out. Note that we load
- * as much data as possible from all tapes, not just the one that ran out.
- * This is because logtape.c works best with a usage pattern that alternates
- * between reading a lot of data and writing a lot of data, so whenever we
- * are forced to read, we should fill working memory completely.
- *
- * In FINALMERGE state, we *don't* use this routine, but instead just preread
- * from the single tape that ran dry. There's no read/write alternation in
- * that state and so no point in scanning through all the tapes to fix one.
- * (Moreover, there may be quite a lot of inactive tapes in that state, since
- * we might have had many fewer runs than tapes. In a regular tape-to-tape
- * merge we can expect most of the tapes to be active. Plus, only
- * FINALMERGE state has to consider memory management for a batch
- * allocation.)
- */
-static void
-mergepreread(Tuplesortstate *state)
-{
- int srcTape;
-
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- mergeprereadone(state, srcTape);
-}
-
-/*
- * mergeprereadone - load tuples from one merge input tape
+ * Returns false on EOF.
*
- * Read tuples from the specified tape until it has used up its free memory
- * or array slots; but ensure that we have at least one tuple, if any are
- * to be had.
+ * Read the next tuple from the given tape into *stup.
*/
-static void
-mergeprereadone(Tuplesortstate *state, int srcTape)
+static bool
+mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
{
unsigned int tuplen;
- SortTuple stup;
- int tupIndex;
- int64 priorAvail,
- spaceUsed;
if (!state->mergeactive[srcTape])
- return; /* tape's run is already exhausted */
+ return false; /* tape's run is already exhausted */
- /*
- * Manage per-tape availMem. Only actually matters when batch memory not
- * in use.
- */
- priorAvail = state->availMem;
- state->availMem = state->mergeavailmem[srcTape];
-
- /*
- * When batch memory is used if final on-the-fly merge, only mergeoverflow
- * test is relevant; otherwise, only LACKMEM() test is relevant.
- */
- while ((state->mergeavailslots[srcTape] > 0 &&
- state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
- state->mergenext[srcTape] == 0)
+ /* read next tuple, if any */
+ if ((tuplen = getlen(state, srcTape, true)) == 0)
{
- /* read next tuple, if any */
- if ((tuplen = getlen(state, srcTape, true)) == 0)
- {
- state->mergeactive[srcTape] = false;
- break;
- }
- READTUP(state, &stup, srcTape, tuplen);
- /* find a free slot in memtuples[] for it */
- tupIndex = state->mergefreelist;
- if (tupIndex)
- state->mergefreelist = state->memtuples[tupIndex].tupindex;
- else
- {
- tupIndex = state->mergefirstfree++;
- Assert(tupIndex < state->memtupsize);
- }
- state->mergeavailslots[srcTape]--;
- /* store tuple, append to list for its tape */
- stup.tupindex = 0;
- state->memtuples[tupIndex] = stup;
- if (state->mergelast[srcTape])
- state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
- else
- state->mergenext[srcTape] = tupIndex;
- state->mergelast[srcTape] = tupIndex;
+ state->mergeactive[srcTape] = false;
+ return false;
}
- /* update per-tape and global availmem counts */
- spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
- state->mergeavailmem[srcTape] = state->availMem;
- state->availMem = priorAvail - spaceUsed;
+ READTUP(state, stup, srcTape, tuplen);
+
+ return true;
}
/*
@@ -3857,27 +3432,24 @@ markrunend(Tuplesortstate *state, int tapenum)
* routines.
*/
static void *
-readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+readtup_alloc(Tuplesortstate *state, Size tuplen)
{
- if (state->batchUsed)
- {
- /*
- * No USEMEM() call, because during final on-the-fly merge accounting
- * is based on tape-private state. ("Overflow" allocations are
- * detected as an indication that a new round or preloading is
- * required. Preloading marks existing contents of tape's batch buffer
- * for reuse.)
- */
- return mergebatchalloc(state, tapenum, tuplen);
- }
+ MergeTupleBuffer *buf;
+
+ /*
+ * We pre-allocate enough buffers in the arena that we should never run out.
+ */
+ Assert(state->freeBufferHead);
+
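+ /* An oversized tuple doesn't fit in a fixed-size chunk; fall back to a retail palloc. */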
+ if (tuplen > MERGETUPLEBUFFER_SIZE || !state->freeBufferHead)
+ return MemoryContextAlloc(state->sortcontext, tuplen);
else
{
- char *ret;
+ buf = state->freeBufferHead;
+ /* Reuse this buffer */
+ state->freeBufferHead = buf->nextfree;
- /* Batch allocation yet to be performed */
- ret = MemoryContextAlloc(state->tuplecontext, tuplen);
- USEMEM(state, GetMemoryChunkSpace(ret));
- return ret;
+ return buf;
}
}
@@ -4046,8 +3618,11 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- heap_free_minimal_tuple(tuple);
+ if (!state->batchUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_free_minimal_tuple(tuple);
+ }
}
static void
@@ -4056,7 +3631,7 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
{
unsigned int tupbodylen = len - sizeof(int);
unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
- MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
+ MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
HeapTupleData htup;
@@ -4077,12 +3652,6 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
-static void
-movetup_heap(void *dest, void *src, unsigned int len)
-{
- memmove(dest, src, len);
-}
-
/*
* Routines specialized for the CLUSTER case (HeapTuple data, with
* comparisons per a btree index definition)
@@ -4289,8 +3858,11 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
LogicalTapeWrite(state->tapeset, tapenum,
&tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- heap_freetuple(tuple);
+ if (!state->batchUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_freetuple(tuple);
+ }
}
static void
@@ -4299,7 +3871,6 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
{
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
HeapTuple tuple = (HeapTuple) readtup_alloc(state,
- tapenum,
t_len + HEAPTUPLESIZE);
/* Reconstruct the HeapTupleData header */
@@ -4324,19 +3895,6 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
-static void
-movetup_cluster(void *dest, void *src, unsigned int len)
-{
- HeapTuple tuple;
-
- memmove(dest, src, len);
-
- /* Repoint the HeapTupleData header */
- tuple = (HeapTuple) dest;
- tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
-}
-
-
/*
* Routines specialized for IndexTuple case
*
@@ -4604,8 +4162,11 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- pfree(tuple);
+ if (!state->batchUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ pfree(tuple);
+ }
}
static void
@@ -4613,7 +4174,7 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
- IndexTuple tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
+ IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
LogicalTapeReadExact(state->tapeset, tapenum,
tuple, tuplen);
@@ -4628,12 +4189,6 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
-static void
-movetup_index(void *dest, void *src, unsigned int len)
-{
- memmove(dest, src, len);
-}
-
/*
* Routines specialized for DatumTuple case
*/
@@ -4700,7 +4255,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &writtenlen, sizeof(writtenlen));
- if (stup->tuple)
+ if (!state->batchUsed && stup->tuple)
{
FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
pfree(stup->tuple);
@@ -4730,7 +4285,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
}
else
{
- void *raddr = readtup_alloc(state, tapenum, tuplen);
+ void *raddr = readtup_alloc(state, tuplen);
LogicalTapeReadExact(state->tapeset, tapenum,
raddr, tuplen);
@@ -4744,12 +4299,6 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
&tuplen, sizeof(tuplen));
}
-static void
-movetup_datum(void *dest, void *src, unsigned int len)
-{
- memmove(dest, src, len);
-}
-
/*
* Convenience routine to free a tuple previously loaded into sort memory
*/
--
2.9.3
0002-Use-larger-read-buffers-in-logtape.patchtext/x-diff; name=0002-Use-larger-read-buffers-in-logtape.patchDownload
From d28de3cab15ceae31ba1e8d469dc41302470df88 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 8 Sep 2016 20:34:06 +0300
Subject: [PATCH 2/2] Use larger read buffers in logtape.
This makes the access pattern appear more sequential to the OS, making it
more likely that the OS will do read-ahead for us. It will also ensure that
there are more sequential blocks available when writing, because we can
free more blocks in the underlying file at once. Sequential I/O is much
cheaper than random I/O.
We used to do pre-reading from each tape, in tuplesort.c, for the same
reasons. But it seems simpler to do it in logtape.c, reading the raw data
into a larger buffer, than converting every tuple to SortTuple format when
pre-reading, like tuplesort.c used to do.
---
src/backend/utils/sort/logtape.c | 134 +++++++++++++++++++++++++++++++------
src/backend/utils/sort/tuplesort.c | 53 +++++++++++++--
src/include/utils/logtape.h | 1 +
3 files changed, 162 insertions(+), 26 deletions(-)
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 7745207..05d7697 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -131,9 +131,12 @@ typedef struct LogicalTape
* reading.
*/
char *buffer; /* physical buffer (separately palloc'd) */
+ int buffer_size; /* allocated size of the buffer */
long curBlockNumber; /* this block's logical blk# within tape */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
+
+ int read_buffer_size; /* buffer size to use when reading */
} LogicalTape;
/*
@@ -228,6 +231,53 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
}
/*
+ * Read as many blocks as we can into the per-tape buffer.
+ *
+ * The caller can specify the next physical block number to read, in
+ * datablocknum, or -1 to fetch the next block number from the indirect block.
+ * If datablocknum == -1, the caller must've already set curBlockNumber.
+ *
+ * Returns true if anything was read, false on EOF.
+ */
+static bool
+ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt, long datablocknum)
+{
+ lt->pos = 0;
+ lt->nbytes = 0;
+
+ do
+ {
+ /* Fetch next block number (unless provided by caller) */
+ if (datablocknum == -1)
+ {
+ datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, lt->frozen);
+ if (datablocknum == -1L)
+ break; /* EOF */
+ lt->curBlockNumber++;
+ }
+
+ /* Read the block */
+ ltsReadBlock(lts, datablocknum, (void *) (lt->buffer + lt->nbytes));
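+ /* A frozen tape may be re-read later, so only recycle the block if not frozen. */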
+ if (!lt->frozen)
+ ltsReleaseBlock(lts, datablocknum);
+
+ if (lt->curBlockNumber < lt->numFullBlocks)
+ lt->nbytes += BLCKSZ;
+ else
+ {
+ /* EOF */
+ lt->nbytes += lt->lastBlockBytes;
+ break;
+ }
+
+ /* Advance to next block, if we have buffer space left */
+ datablocknum = -1;
+ } while (lt->nbytes < lt->buffer_size);
+
+ return (lt->nbytes > 0);
+}
+
+/*
* qsort comparator for sorting freeBlocks[] into decreasing order.
*/
static int
@@ -546,6 +596,8 @@ LogicalTapeSetCreate(int ntapes)
lt->numFullBlocks = 0L;
lt->lastBlockBytes = 0;
lt->buffer = NULL;
+ lt->buffer_size = 0;
+ lt->read_buffer_size = BLCKSZ;
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
@@ -628,7 +680,10 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
/* Allocate data buffer and first indirect block on first write */
if (lt->buffer == NULL)
+ {
lt->buffer = (char *) palloc(BLCKSZ);
+ lt->buffer_size = BLCKSZ;
+ }
if (lt->indirect == NULL)
{
lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
@@ -636,6 +691,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
lt->indirect->nextup = NULL;
}
+ Assert(lt->buffer_size == BLCKSZ);
while (size > 0)
{
if (lt->pos >= BLCKSZ)
@@ -709,18 +765,19 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
Assert(lt->frozen);
datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
}
+
+ /* Allocate a read buffer */
+ if (lt->buffer)
+ pfree(lt->buffer);
+ lt->buffer = palloc(lt->read_buffer_size);
+ lt->buffer_size = lt->read_buffer_size;
+
/* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
if (datablocknum != -1L)
- {
- ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
- if (!lt->frozen)
- ltsReleaseBlock(lts, datablocknum);
- lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
- BLCKSZ : lt->lastBlockBytes;
- }
+ ltsReadFillBuffer(lts, lt, datablocknum);
}
else
{
@@ -754,6 +811,13 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
+
+ if (lt->buffer)
+ {
+ pfree(lt->buffer);
+ lt->buffer = NULL;
+ lt->buffer_size = 0;
+ }
}
}
@@ -779,20 +843,8 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
if (lt->pos >= lt->nbytes)
{
/* Try to load more data into buffer. */
- long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
- lt->frozen);
-
- if (datablocknum == -1L)
+ if (!ltsReadFillBuffer(lts, lt, -1))
break; /* EOF */
- lt->curBlockNumber++;
- lt->pos = 0;
- ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
- if (!lt->frozen)
- ltsReleaseBlock(lts, datablocknum);
- lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
- BLCKSZ : lt->lastBlockBytes;
- if (lt->nbytes <= 0)
- break; /* EOF (possible here?) */
}
nthistime = lt->nbytes - lt->pos;
@@ -842,6 +894,22 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
lt->writing = false;
lt->frozen = true;
datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
+
+ /*
+ * The seek and backspace functions assume a single block read buffer.
+ * That's OK with current usage. A larger buffer is helpful to make the
+ * read pattern of the backing file look more sequential to the OS, when
+ * we're reading from multiple tapes. But at the end of a sort, when a
+ * tape is frozen, we only read from a single tape anyway.
+ */
+ if (!lt->buffer || lt->buffer_size != BLCKSZ)
+ {
+ if (lt->buffer)
+ pfree(lt->buffer);
+ lt->buffer = palloc(BLCKSZ);
+ lt->buffer_size = BLCKSZ;
+ }
+
/* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L;
lt->pos = 0;
@@ -875,6 +943,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
Assert(lt->frozen);
+ Assert(lt->buffer_size == BLCKSZ);
/*
* Easy case for seek within current block.
@@ -941,6 +1010,7 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
lt = &lts->tapes[tapenum];
Assert(lt->frozen);
Assert(offset >= 0 && offset <= BLCKSZ);
+ Assert(lt->buffer_size == BLCKSZ);
/*
* Easy case for seek within current block.
@@ -1000,6 +1070,9 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
{
LogicalTape *lt;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
+
+ /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
+ Assert(lt->buffer_size == BLCKSZ);
*blocknum = lt->curBlockNumber;
@@ -1014,3 +1087,24 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
return lts->nFileBlocks;
}
+
+/*
+ * Set buffer size to use, when reading from given tape.
+ */
+void
+LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem)
+{
+ LogicalTape *lt;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = &lts->tapes[tapenum];
+
+ /*
+ * The buffer size must be a multiple of BLCKSZ, so round the given value
+ * down to the nearest BLCKSZ. Make sure we have at least one page.
+ */
+ if (avail_mem < BLCKSZ)
+ avail_mem = BLCKSZ;
+ avail_mem -= avail_mem % BLCKSZ;
+ lt->read_buffer_size = avail_mem;
+}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index a6d167a..7f5e165 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2479,6 +2479,9 @@ mergeruns(Tuplesortstate *state)
svDummy;
char *p;
int i;
+ int per_tape, cutoff;
+ long avail_blocks;
+ int maxTapes;
Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
@@ -2527,24 +2530,62 @@ mergeruns(Tuplesortstate *state)
USEMEM(state, state->memtupsize * sizeof(SortTuple));
/*
- * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to
- * track memory usage.
+ * If we had fewer runs than tapes, refund buffers for tapes that were never
+ * allocated.
*/
- state->batchUsed = true;
+ maxTapes = state->maxTapes;
+ if (state->currentRun < maxTapes)
+ {
+ FREEMEM(state, (maxTapes - state->currentRun) * TAPE_BUFFER_OVERHEAD);
+ maxTapes = state->currentRun;
+ }
/* Initialize the merge tuple buffer arena. */
- state->batchMemoryBegin = palloc((state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
- state->batchMemoryEnd = state->batchMemoryBegin + (state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE;
+ state->batchMemoryBegin = palloc((maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
+ state->batchMemoryEnd = state->batchMemoryBegin + (maxTapes + 1) * MERGETUPLEBUFFER_SIZE;
state->freeBufferHead = (MergeTupleBuffer *) state->batchMemoryBegin;
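+ /* Count the arena against availMem, so the read buffer sizing below only uses what remains. */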
+ USEMEM(state, (maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
p = state->batchMemoryBegin;
- for (i = 0; i < state->maxTapes; i++)
+ for (i = 0; i < maxTapes; i++)
{
((MergeTupleBuffer *) p)->nextfree = (MergeTupleBuffer *) (p + MERGETUPLEBUFFER_SIZE);
p += MERGETUPLEBUFFER_SIZE;
}
((MergeTupleBuffer *) p)->nextfree = NULL;
+ /*
+ * Use all the spare memory we have available for read buffers. Divide it
+ * evenly among all the tapes.
+ */
+ avail_blocks = state->availMem / BLCKSZ;
+ per_tape = avail_blocks / maxTapes;
+ cutoff = avail_blocks % maxTapes;
+ if (per_tape == 0)
+ {
+ per_tape = 1;
+ cutoff = 0;
+ }
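+ /* Distribute the remainder: tapes below 'cutoff' get one extra block each. */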
+ for (tapenum = 0; tapenum < maxTapes; tapenum++)
+ {
+ LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
+ (per_tape + (tapenum < cutoff ? 1 : 0)) * BLCKSZ);
+ }
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "using %d kB of memory for read buffers in %d tapes, %d kB per tape",
+ (int) (state->availMem / 1024), maxTapes, (int) (per_tape * BLCKSZ) / 1024);
+#endif
+
+ USEMEM(state, avail_blocks * BLCKSZ);
+
+ /*
+ * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to
+ * track memory usage of individual tuples.
+ */
+ state->batchUsed = true;
+
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
LogicalTapeRewind(state->tapeset, tapenum, false);
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index fa1e992..03d0a6f 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -39,6 +39,7 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
long blocknum, int offset);
extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long *blocknum, int *offset);
+extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t bufsize);
extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
#endif /* LOGTAPE_H */
--
2.9.3
On 09/09/2016 02:13 PM, Heikki Linnakangas wrote:
On 09/08/2016 09:59 PM, Heikki Linnakangas wrote:
On 09/06/2016 10:26 PM, Peter Geoghegan wrote:
On Tue, Sep 6, 2016 at 12:08 PM, Peter Geoghegan <pg@heroku.com> wrote:
Offhand, I would think that taken together this is very important. I'd
certainly want to see cases in the hundreds of megabytes or gigabytes
of work_mem alongside your 4MB case, even just to be able to talk
informally about this. As you know, the default work_mem value is very
conservative.

I spent some more time polishing this up, and also added some code to
logtape.c, to use larger read buffers, to compensate for the fact that
we don't do pre-reading from tuplesort.c anymore. That should trigger
the OS read-ahead, and make the I/O more sequential, like was the
purpose of the old pre-reading code. But simpler. I haven't tested that
part much yet, but I plan to run some tests on larger data sets that
don't fit in RAM, to make the I/O effects visible.

Ok, I ran a few tests with 20 GB tables. I thought this would show any
differences in I/O behaviour, but in fact it was still completely CPU
bound, like the tests on smaller tables I posted yesterday. I guess I
need to point temp_tablespaces to a USB drive or something. But here we go.
I took a different tack to demonstrate the I/O pattern effects. I
added some instrumentation code to logtape.c that prints a line to a
file whenever it reads a block, with the block number. I ran the same
query with master and with these patches, and plotted the access pattern
with gnuplot.
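(A gnuplot script along the following lines is enough to draw such a
trace. This is just a sketch, not the attached script, assuming the
trace format from trace-logtape.patch below, where each line is
"<0|1> <blocknum>", 0 for a read and 1 for a write:

    set xlabel "access number (time)"
    set ylabel "block number"
    plot "logtape-trace" using 0:2 with dots title "block accesses"

Plotting access order against block number makes sequential runs show
up as diagonal segments.)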
I'm happy with what it looks like. We are in fact getting a more
sequential access pattern with these patches, because we're not
expanding the pre-read tuples into SortTuples. Keeping densely-packed
blocks in memory, instead of SortTuples, allows caching more data overall.
Attached is the patch I used to generate these traces, the gnuplot
script, and traces from I got from sorting a 1 GB table of random
integers, with work_mem=16MB.
Note that in the big picture, what appear to be individual dots are
actually clusters of many dots. So the access pattern is a lot
more sequential than it looks at first glance, with or without
these patches. The zoomed-in pictures show that. If you want to inspect
these in more detail, I recommend running gnuplot in interactive mode,
so that you can zoom in and out easily.
I'm happy with the amount of testing I've done now, and the results.
Does anyone want to throw out any more test cases where there might be a
regression? If not, let's get these reviewed and committed.
- Heikki
Attachments:
trace-logtape.patchtext/x-diff; name=trace-logtape.patchDownload
commit 2d7524e2fa2810fee5c63cb84cae70b8317bf1d5
Author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri Sep 9 14:08:29 2016 +0300
temp file access tracing
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 05d7697..cededac 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -77,8 +77,20 @@
#include "postgres.h"
+/* #define TRACE_BUFFER_WRITES */
+#define TRACE_BUFFER_READS
+
+#if defined(TRACE_BUFFER_WRITES) || defined(TRACE_BUFFER_READS)
+#define TRACE_BUFFER_ACCESS
+#endif
+
+
#include "storage/buffile.h"
#include "utils/logtape.h"
+#ifdef TRACE_BUFFER_ACCESS
+#include "storage/fd.h"
+#include "tcop/tcopprot.h"
+#endif
/*
* Block indexes are "long"s, so we can fit this many per indirect block.
@@ -169,6 +181,10 @@ struct LogicalTapeSet
int nFreeBlocks; /* # of currently free blocks */
int freeBlocksLen; /* current allocated length of freeBlocks[] */
+#ifdef TRACE_BUFFER_ACCESS
+ FILE *tracefile;
+#endif
+
/* The array of logical tapes. */
int nTapes; /* # of logical tapes in set */
LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */
@@ -211,6 +227,9 @@ ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
(errcode_for_file_access(),
errmsg("could not write block %ld of temporary file: %m",
blocknum)));
+#ifdef TRACE_BUFFER_WRITES
+ fprintf(lts->tracefile, "1 %ld\n", blocknum);
+#endif
}
/*
@@ -228,6 +247,10 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
(errcode_for_file_access(),
errmsg("could not read block %ld of temporary file: %m",
blocknum)));
+
+#ifdef TRACE_BUFFER_READS
+ fprintf(lts->tracefile, "0 %ld\n", blocknum);
+#endif
}
/*
@@ -602,6 +625,16 @@ LogicalTapeSetCreate(int ntapes)
lt->pos = 0;
lt->nbytes = 0;
}
+
+#ifdef TRACE_BUFFER_ACCESS
+ lts->tracefile = AllocateFile("logtape-trace", "w+");
+ if (lts->tracefile == NULL)
+ elog(ERROR, "could not open file \"logtape-trace\": %m");
+
+ fprintf(lts->tracefile, "# LogTapeSet with %d tapes allocated\n", ntapes);
+ fprintf(lts->tracefile, "# Query: %s\n", debug_query_string);
+#endif
+
return lts;
}
@@ -630,6 +663,10 @@ LogicalTapeSetClose(LogicalTapeSet *lts)
}
+#ifdef TRACE_BUFFER_ACCESS
+ (void) FreeFile(lts->tracefile);
+#endif
+
pfree(lts->freeBlocks);
pfree(lts);
}
/*
logtape-master.pngimage/png; name=logtape-master.pngDownload