Polyphase merge is obsolete
The beauty of the polyphase merge algorithm is that it allows reusing
input tapes as output tapes efficiently. So if you have N tape drives,
you can keep them all busy throughout the merge.
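As a refresher, that property comes from distributing the initial runs across the input tapes in a generalized Fibonacci pattern (Knuth's Algorithm 5.4.2D). A rough sketch of the distribution arithmetic, as a hypothetical standalone helper rather than anything in tuplesort.c:

```c
/*
 * Total number of runs in a "perfect" polyphase distribution for the
 * given number of input tapes, at the given merge level.  The per-tape
 * targets follow a generalized Fibonacci sequence (Knuth 5.4.2D).
 * Assumes nInputTapes <= 8; a sketch, not the tuplesort.c code.
 */
int
polyphase_total_runs(int nInputTapes, int level)
{
	int			A[8] = {0};		/* target run count per input tape */
	int			total = 0;

	/* level 1: one run on each input tape */
	for (int j = 0; j < nInputTapes; j++)
		A[j] = 1;

	/* each further level: A[j] <- A[j+1] + A[0], with A[nInputTapes] = 0 */
	while (--level > 0)
	{
		int			a = A[0];

		for (int j = 0; j < nInputTapes; j++)
			A[j] = (j + 1 < nInputTapes ? A[j + 1] : 0) + a;
	}

	for (int j = 0; j < nInputTapes; j++)
		total += A[j];
	return total;
}
```

With two input tapes the totals go 2, 3, 5, 8, 13: the classic Fibonacci numbers, which is what lets each merge pass read from all-but-one tape while writing the other.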
That doesn't matter when we can easily have as many "tape drives" as we
want. In PostgreSQL, a tape drive consumes just a few kB of memory, for
the buffers. With the patch being discussed to allow a tape to be
"paused" between write passes [1], we don't even keep the tape buffers
around when a tape is not actively being read or written to, so all it
consumes is the memory needed for the LogicalTape struct.
The number of *input* tapes we can use in each merge pass is still
limited by the memory needed for the tape buffers and the merge heap,
but only one output tape is active at any time. The inactive output
tapes consume very few resources. So the whole idea of trying to
efficiently reuse input tapes as output tapes is pointless.
Let's switch over to a simple k-way balanced merge. Because it's
simpler. If you're severely limited on memory, like when sorting 1GB of
data with work_mem='1MB' or less, it's also slightly faster. I'm not too
excited about the performance aspect, because in the typical case of a
single-pass merge, there's no difference. But it would be worth changing
on simplicity grounds, since we're mucking around in tuplesort.c anyway.
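The balanced merge itself is just the textbook heap-based k-way merge. Stripped to its essence (plain arrays standing in for logical tapes, and a linear scan standing in for the merge heap, which a real implementation would use for O(log k) selection), it looks something like this. This is an illustrative sketch, not the actual tuplesort.c code:

```c
#include <assert.h>
#include <stddef.h>

/* One "tape": a sorted run and a read position. */
typedef struct
{
	const int  *run;			/* sorted elements of this run */
	size_t		pos;			/* next element to read */
	size_t		len;			/* number of elements in the run */
} Tape;

/*
 * Merge k sorted runs into 'out'.  Repeatedly pick the input tape with
 * the smallest current element and emit it; a run drops out of the
 * merge once it is exhausted.  Returns the number of elements emitted.
 */
size_t
kway_merge(Tape *tapes, int k, int *out)
{
	size_t		nout = 0;

	for (;;)
	{
		int			best = -1;

		/* find the input tape with the smallest current element */
		for (int i = 0; i < k; i++)
		{
			if (tapes[i].pos >= tapes[i].len)
				continue;		/* this run is exhausted */
			if (best < 0 ||
				tapes[i].run[tapes[i].pos] < tapes[best].run[tapes[best].pos])
				best = i;
		}
		if (best < 0)
			break;				/* all runs exhausted */
		out[nout++] = tapes[best].run[tapes[best].pos++];
	}
	return nout;
}
```

Each pass merges up to k input runs into one output run, so the number of runs shrinks by a factor of k per pass, and with enough tapes a single pass suffices.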
I came up with the attached patch to do that. This patch applies on top
of my latest "Logical tape pause/resume" patches [1]. It includes
changes to the logtape.c interface that make it possible to create and
destroy LogicalTapes in a tapeset on the fly. I believe this will also
come in handy for Peter's parallel tuplesort patch set.
[1]: Logical tape pause/resume, /messages/by-id/55b3b7ae-8dec-b188-b8eb-e07604052351@iki.fi
PS. I finally bit the bullet and got myself a copy of The Art of Computer
Programming, Vol 3, 2nd edition. In section 5.4 on External Sorting,
Knuth says:
"
When this book was first written, magnetic tapes were abundant and disk
drives were expensive. But disks became enormously better during the
1980s, and by the late 1990s they had almost completely replaced
magnetic tape units on most of the world's computer systems. Therefore
the once-crucial topic of tape merging has become of limited relevance
to current needs.
Yet many of the patterns are quite beautiful, and the associated
algorithms reflect some of the best research done in computer science
during its early days; the techniques are just too nice to be discarded
abruptly onto the rubbish heap of history. Indeed, the ways in which
these methods blend theory with practice are especially instructive.
Therefore merging patterns are discussed carefully and completely below,
in what may be their last grand appearance before they accept the final
curtain call.
"
Yep, the polyphase and other merge patterns are beautiful. I enjoyed
reading through those sections. Now let's put them to rest in PostgreSQL.
- Heikki
Attachments:
0001-Replace-polyphase-merge-algorithm-with-a-simple-bala.patch (text/x-diff)
From 15169f32f99a69401d3565c8d0ff0d532d6b6638 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 12 Oct 2016 20:04:17 +0300
Subject: [PATCH 1/1] Replace polyphase merge algorithm with a simple balanced
k-way merge.
The advantage of polyphase merge is that it can reuse the input tapes as
output tapes efficiently, but that is irrelevant on modern hardware, when
we can easily emulate any number of tape drives. The number of input tapes
we can/should use during merging is limited by work_mem, but output tapes
that we are not currently writing to only cost a little bit of memory, so
there is no need to skimp on them.
Refactor LogicalTapeSet/LogicalTape interface. All the tape functions,
like LogicalTapeRead and LogicalTapeWrite, take a LogicalTape as argument,
instead of LogicalTapeSet+tape number. You can create any number of
LogicalTapes in a single LogicalTapeSet, and you don't need to decide the
number upfront, when you create the tape set.
---
src/backend/utils/sort/logtape.c | 223 +++++--------
src/backend/utils/sort/tuplesort.c | 665 +++++++++++++++----------------------
src/include/utils/logtape.h | 37 ++-
3 files changed, 366 insertions(+), 559 deletions(-)
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 1f540f0..2370fa5 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -118,6 +118,8 @@ typedef struct TapeBlockTrailer
*/
typedef struct LogicalTape
{
+ LogicalTapeSet *tapeSet; /* tape set this tape is part of */
+
bool writing; /* T while in write phase */
bool paused; /* T if the tape is paused */
bool frozen; /* T if blocks should not be freed when read */
@@ -173,10 +175,6 @@ struct LogicalTapeSet
long *freeBlocks; /* resizable array */
int nFreeBlocks; /* # of currently free blocks */
int freeBlocksLen; /* current allocated length of freeBlocks[] */
-
- /* The array of logical tapes. */
- int nTapes; /* # of logical tapes in set */
- LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */
};
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
@@ -228,7 +226,7 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
* Returns true if anything was read, 'false' on EOF.
*/
static bool
-ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsReadFillBuffer(LogicalTape *lt)
{
lt->pos = 0;
lt->nbytes = 0;
@@ -242,9 +240,9 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
break; /* EOF */
/* Read the block */
- ltsReadBlock(lts, lt->nextBlockNumber, (void *) thisbuf);
+ ltsReadBlock(lt->tapeSet, lt->nextBlockNumber, (void *) thisbuf);
if (!lt->frozen)
- ltsReleaseBlock(lts, lt->nextBlockNumber);
+ ltsReleaseBlock(lt->tapeSet, lt->nextBlockNumber);
lt->curBlockNumber = lt->nextBlockNumber;
lt->nbytes += TapeBlockGetNBytes(thisbuf);
@@ -348,18 +346,14 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
* Each tape is initialized in write state.
*/
LogicalTapeSet *
-LogicalTapeSetCreate(int ntapes)
+LogicalTapeSetCreate(void)
{
LogicalTapeSet *lts;
- LogicalTape *lt;
- int i;
/*
* Create top-level struct including per-tape LogicalTape structs.
*/
- Assert(ntapes > 0);
- lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
- ntapes * sizeof(LogicalTape));
+ lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
lts->pfile = BufFileCreateTemp(false);
lts->nFileBlocks = 0L;
lts->forgetFreeSpace = false;
@@ -367,52 +361,71 @@ LogicalTapeSetCreate(int ntapes)
lts->freeBlocksLen = 32; /* reasonable initial guess */
lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
lts->nFreeBlocks = 0;
- lts->nTapes = ntapes;
- /*
- * Initialize per-tape structs. Note we allocate the I/O buffer and the
- * first block for a tape only when it is first actually written to. This
- * avoids wasting memory space when tuplesort.c overestimates the number
- * of tapes needed.
- */
- for (i = 0; i < ntapes; i++)
- {
- lt = &lts->tapes[i];
- lt->writing = true;
- lt->frozen = false;
- lt->dirty = false;
- lt->paused = false;
- lt->firstBlockNumber = -1L;
- lt->curBlockNumber = -1L;
- lt->buffer = NULL;
- lt->buffer_size = 0;
- lt->pos = 0;
- lt->nbytes = 0;
- }
return lts;
}
/*
* Close a logical tape set and release all resources.
+ *
+ * NOTE: This doesn't close any of the tapes! You must close them
+ * first, or you can let them be destroyed along with the memory context.
*/
void
LogicalTapeSetClose(LogicalTapeSet *lts)
{
- LogicalTape *lt;
- int i;
-
BufFileClose(lts->pfile);
- for (i = 0; i < lts->nTapes; i++)
- {
- lt = &lts->tapes[i];
- if (lt->buffer)
- pfree(lt->buffer);
- }
pfree(lts->freeBlocks);
pfree(lts);
}
/*
+ * Create a logical tape in the given tapeset.
+ */
+LogicalTape *
+LogicalTapeCreate(LogicalTapeSet *lts)
+{
+ LogicalTape *lt;
+
+ lt = palloc(sizeof(LogicalTape));
+
+ /*
+ * Initialize per-tape structs. Note we allocate the I/O buffer and the
+ * first block for a tape only when it is first actually written to. This
+ * avoids wasting memory space when tuplesort.c overestimates the number
+ * of tapes needed. (XXX: that doesn't really happen anymore).
+ */
+ lt->tapeSet = lts;
+ lt->writing = true;
+ lt->frozen = false;
+ lt->dirty = false;
+ lt->paused = false;
+ lt->firstBlockNumber = -1L;
+ lt->curBlockNumber = -1L;
+ lt->buffer = NULL;
+ lt->buffer_size = 0;
+ lt->pos = 0;
+ lt->nbytes = 0;
+
+ return lt;
+}
+
+/*
+ * Close a logical tape.
+ *
+ * Note: This doesn't return any blocks to the free list! You must
+ * read the tape to the end first, to reuse the space. In current use,
+ * though, we only close tapes after fully reading them.
+ */
+void
+LogicalTapeClose(LogicalTape *lt)
+{
+ if (lt->buffer)
+ pfree(lt->buffer);
+ pfree(lt);
+}
+
+/*
* Mark a logical tape set as not needing management of free space anymore.
*
* This should be called if the caller does not intend to write any more data
@@ -428,19 +441,24 @@ LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
}
/*
+ * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
+ */
+long
+LogicalTapeSetBlocks(LogicalTapeSet *lts)
+{
+ return lts->nFileBlocks;
+}
+
+/*
* Write to a logical tape.
*
* There are no error returns; we ereport() on failure.
*/
void
-LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size)
+LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size)
{
- LogicalTape *lt;
size_t nthistime;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->writing);
/* Allocate data buffer and first block on first write */
@@ -452,7 +470,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
/* if the tape was paused, resume it now. */
if (lt->paused)
{
- ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
/* 'pos' and 'nbytes' should still be valid */
Assert(lt->nbytes == TapeBlockGetNBytes(lt->buffer));
@@ -470,7 +488,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
Assert(lt->firstBlockNumber == -1);
Assert(lt->pos == 0);
- lt->curBlockNumber = ltsGetFreeBlock(lts);
+ lt->curBlockNumber = ltsGetFreeBlock(lt->tapeSet);
lt->firstBlockNumber = lt->curBlockNumber;
TapeBlockGetTrailer(lt->buffer)->prev = -1L;
@@ -488,11 +506,11 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
* First allocate the next block, so that we can store it in the
* 'next' pointer of this block.
*/
- nextBlockNumber = ltsGetFreeBlock(lts);
+ nextBlockNumber = ltsGetFreeBlock(lt->tapeSet);
/* set the next-pointer and dump the current block. */
TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
- ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
/* initialize the prev-pointer of the next block */
TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
@@ -530,13 +548,8 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
* byte buffer is used.
*/
void
-LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
+LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
-
/*
* Round and cap buffer_size if needed.
*/
@@ -578,7 +591,7 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
if (lt->dirty)
{
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
- ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
}
}
else
@@ -606,36 +619,7 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
lt->nextBlockNumber = lt->firstBlockNumber;
lt->pos = 0;
lt->nbytes = 0;
- ltsReadFillBuffer(lts, lt);
-}
-
-/*
- * Rewind logical tape and switch from reading to writing.
- *
- * NOTE: we assume the caller has read the tape to the end; otherwise
- * untouched data and indirect blocks will not have been freed. We
- * could add more code to free any unread blocks, but in current usage
- * of this module it'd be useless code.
- */
-void
-LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
-{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
-
- Assert(!lt->writing && !lt->frozen);
- lt->writing = true;
- lt->dirty = false;
- lt->firstBlockNumber = -1L;
- lt->curBlockNumber = -1L;
- lt->pos = 0;
- lt->nbytes = 0;
- if (lt->buffer)
- pfree(lt->buffer);
- lt->buffer = NULL;
- lt->buffer_size = 0;
+ ltsReadFillBuffer(lt);
}
/*
@@ -647,18 +631,13 @@ LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
* and the last partial block is read back into memory.
*/
void
-LogicalTapePause(LogicalTapeSet *lts, int tapenum)
+LogicalTapePause(LogicalTape *lt)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
-
Assert(lt->writing);
/* Flush last partial data block. */
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
- ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
lt->dirty = false;
pfree(lt->buffer);
@@ -674,15 +653,11 @@ LogicalTapePause(LogicalTapeSet *lts, int tapenum)
* Early EOF is indicated by return value less than #bytes requested.
*/
size_t
-LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size)
+LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
{
- LogicalTape *lt;
size_t nread = 0;
size_t nthistime;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(!lt->writing);
while (size > 0)
@@ -690,7 +665,7 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
if (lt->pos >= lt->nbytes)
{
/* Try to load more data into buffer. */
- if (!ltsReadFillBuffer(lts, lt))
+ if (!ltsReadFillBuffer(lt))
break; /* EOF */
}
@@ -722,12 +697,8 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
* for-read call is OK but not necessary.
*/
void
-LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
+LogicalTapeFreeze(LogicalTape *lt)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->writing);
/*
@@ -737,8 +708,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
if (lt->dirty)
{
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
- ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
- lt->writing = false;
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
}
lt->writing = false;
lt->paused = false;
@@ -766,7 +736,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
if (lt->firstBlockNumber == -1L)
lt->nextBlockNumber = -1L;
- ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
if (TapeBlockIsLast(lt->buffer))
lt->nextBlockNumber = -1L;
else
@@ -788,13 +758,10 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
* that case.
*/
size_t
-LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
+LogicalTapeBackspace(LogicalTape *lt, size_t size)
{
- LogicalTape *lt;
size_t seeked = 0;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->frozen);
Assert(lt->buffer_size == BLCKSZ);
@@ -826,7 +793,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
return seeked;
}
- ltsReadBlock(lts, prev, (void *) lt->buffer);
+ ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer);
if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
@@ -859,20 +826,15 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
* LogicalTapeTell().
*/
void
-LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
- long blocknum, int offset)
+LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->frozen);
Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
Assert(lt->buffer_size == BLCKSZ);
if (blocknum != lt->curBlockNumber)
{
- ltsReadBlock(lts, blocknum, (void *) lt->buffer);
+ ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer);
lt->curBlockNumber = blocknum;
lt->nbytes = TapeBlockPayloadSize;
lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
@@ -890,26 +852,11 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
* the position for a seek after freezing. Not clear if anyone needs that.
*/
void
-LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
- long *blocknum, int *offset)
+LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
-
/* With a larger buffer, 'pos' wouldn't be the same as offset within page */
Assert(lt->buffer_size == BLCKSZ);
*blocknum = lt->curBlockNumber;
*offset = lt->pos;
}
-
-/*
- * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
- */
-long
-LogicalTapeSetBlocks(LogicalTapeSet *lts)
-{
- return lts->nFileBlocks;
-}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index eebfb3d..fd4c4a8 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -10,16 +10,22 @@
* amounts are sorted using temporary files and a standard external sort
* algorithm.
*
- * See Knuth, volume 3, for more than you want to know about the external
- * sorting algorithm. Historically, we divided the input into sorted runs
- * using replacement selection, in the form of a priority tree implemented
- * as a heap (essentially his Algorithm 5.2.3H), but now we only do that
- * for the first run, and only if the run would otherwise end up being very
- * short. We merge the runs using polyphase merge, Knuth's Algorithm
- * 5.4.2D. The logical "tapes" used by Algorithm D are implemented by
- * logtape.c, which avoids space wastage by recycling disk space as soon
- * as each block is read from its "tape".
+ * See Knuth, volume 3, for more than you want to know about external
+ * sorting algorithms. The algorithm we use is a balanced k-way merge.
+ * Before PostgreSQL 10, we used the polyphase merge algorithm (Knuth's
+ * Algorithm 5.4.2D), but with modern hardware, a straightforward
+ * balanced merge is better. Knuth is assuming that tape drives are
+ * expensive beasts, and in particular that there will always be many more
+ * runs than tape drives. The polyphase merge algorithm was good at keeping
+ * all the tape drives busy, but in our implementation a "tape drive"
+ * doesn't cost much more than a few Kb of memory buffers, so we can afford
+ * to have lots of them. In particular, if we can have as many tape drives
+ * as sorted runs, we can eliminate any repeated I/O at all.
*
+ * Historically, we divided the input into sorted runs using replacement
+ * selection, in the form of a priority tree implemented as a heap
+ * (essentially Knuth's Algorithm 5.2.3H), but now we only do that for the
+ * first run, and only if the run would otherwise end up being very short.
* We do not use Knuth's recommended data structure (Algorithm 5.4.1R) for
* the replacement selection, because it uses a fixed number of records
* in memory at all times. Since we are dealing with tuples that may vary
@@ -63,10 +69,9 @@
* tuples just by scanning the tuple array sequentially. If we do exceed
* workMem, we begin to emit tuples into sorted runs in temporary tapes.
* When tuples are dumped in batch after quicksorting, we begin a new run
- * with a new output tape (selected per Algorithm D). After the end of the
- * input is reached, we dump out remaining tuples in memory into a final run
- * (or two, when replacement selection is still used), then merge the runs
- * using Algorithm D.
+ * with a new output tape. After the end of the input is reached, we dump
+ * out remaining tuples in memory into a final run (or two, when replacement
+ * selection is still used), then merge the runs.
*
* When merging runs, we use a heap containing just the frontmost tuple from
* each source run; we repeatedly output the smallest tuple and replace it
@@ -89,6 +94,14 @@
* accesses. The pre-reading is handled by logtape.c, we just tell it how
* much memory to use for the buffers.
*
+ * In the current code we determine the number of input tapes M on the basis
+ * of workMem: we want workMem/M to be large enough that we read a fair
+ * amount of data each time we read from a tape, so as to maintain the
+ * locality of access described above. Nonetheless, with large workMem we
+ * can have many tapes. The logical "tapes" are implemented by logtape.c,
+ * which avoids space wastage by recycling disk space as soon as each block
+ * is read from its "tape".
+ *
* When the caller requests random access to the sort result, we form
* the final sorted run on a logical tape which is then "frozen", so
* that we can access it randomly. When the caller does not need random
@@ -97,19 +110,6 @@
* on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
* saves one cycle of writing all the data out to disk and reading it in.
*
- * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
- * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
- * to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that
- * tape drives are expensive beasts, and in particular that there will always
- * be many more runs than tape drives. In our implementation a "tape drive"
- * doesn't cost much more than a few Kb of memory buffers, so we can afford
- * to have lots of them. In particular, if we can have as many tape drives
- * as sorted runs, we can eliminate any repeated I/O at all. In the current
- * code we determine the number of tapes M on the basis of workMem: we want
- * workMem/M to be large enough that we read a fair amount of data each time
- * we preread from a tape, so as to maintain the locality of access described
- * above. Nonetheless, with large workMem we can have many tapes.
- *
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -277,8 +277,7 @@ struct Tuplesortstate
bool tuples; /* Can SortTuple.tuple ever be set? */
int64 availMem; /* remaining memory available, in bytes */
int64 allowedMem; /* total memory allowed, in bytes */
- int maxTapes; /* number of tapes (Knuth's T) */
- int tapeRange; /* maxTapes-1 (Knuth's P) */
+ int maxInputTapes; /* max number of input tapes */
MemoryContext sortcontext; /* memory context holding most sort data */
MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
@@ -310,7 +309,7 @@ struct Tuplesortstate
* SortTuple struct!), and increase state->availMem by the amount of
* memory space thereby released.
*/
- void (*writetup) (Tuplesortstate *state, int tapenum,
+ void (*writetup) (Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
/*
@@ -319,7 +318,7 @@ struct Tuplesortstate
* from the slab memory arena, or is palloc'd, see readtup_alloc().
*/
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
/*
* This array holds the tuples now in sort memory. If we are in state
@@ -366,8 +365,8 @@ struct Tuplesortstate
char *slabMemoryEnd; /* end of slab memory arena */
SlabSlot *slabFreeHead; /* head of free list */
- /* Buffer size to use for reading input tapes, during merge. */
- size_t read_buffer_size;
+ /* Memory to use for input tape buffers, during merge. */
+ size_t read_buffer_mem;
/*
* When we return a tuple to the caller in tuplesort_gettuple_XXX, that
@@ -392,36 +391,30 @@ struct Tuplesortstate
int currentRun;
/*
- * Unless otherwise noted, all pointer variables below are pointers to
- * arrays of length maxTapes, holding per-tape data.
+ * Logical tapes.
+ *
+ * The initial runs are written in the output tapes. In each merge pass,
+ * the output tapes of the previous pass become the input tapes, and
+ * new output tapes are allocated as needed. If the number of input runs
+ * is equal to the number of input tapes, there is only one merge pass
+ * left.
*/
+ LogicalTape **inputTapes;
+ int nInputTapes;
+ int nInputRuns;
- /*
- * This variable is only used during merge passes. mergeactive[i] is true
- * if we are reading an input run from (actual) tape number i and have not
- * yet exhausted that run.
- */
- bool *mergeactive; /* active input run source? */
+ LogicalTape **outputTapes;
+ int nOutputTapes;
+ int nOutputRuns;
- /*
- * Variables for Algorithm D. Note that destTape is a "logical" tape
- * number, ie, an index into the tp_xxx[] arrays. Be careful to keep
- * "logical" and "actual" tape numbers straight!
- */
- int Level; /* Knuth's l */
- int destTape; /* current output tape (Knuth's j, less 1) */
- int *tp_fib; /* Target Fibonacci run counts (A[]) */
- int *tp_runs; /* # of real runs on each tape */
- int *tp_dummy; /* # of dummy runs for each tape (D[]) */
- int *tp_tapenum; /* Actual tape numbers (TAPE[]) */
- int activeTapes; /* # of active input tapes in merge pass */
+ LogicalTape *destTape; /* current output tape */
/*
* These variables are used after completion of sorting to keep track of
* the next tuple to return. (In the tape case, the tape's current read
* position is also critical state.)
*/
- int result_tape; /* actual tape number of finished output */
+ LogicalTape *result_tape; /* actual tape of finished output */
int current; /* array index (only used if SORTEDINMEM) */
bool eof_reached; /* reached EOF (needed for cursors) */
@@ -568,9 +561,9 @@ struct Tuplesortstate
*/
/* When using this macro, beware of double evaluation of len */
-#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
+#define LogicalTapeReadExact(tape, ptr, len) \
do { \
- if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
+ if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \
elog(ERROR, "unexpected end of data"); \
} while(0)
@@ -585,7 +578,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
-static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
+static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void dumpbatch(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
@@ -597,39 +590,39 @@ static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple,
bool checkIndex);
static void tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex);
static void reversedirection(Tuplesortstate *state);
-static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
-static void markrunend(Tuplesortstate *state, int tapenum);
+static unsigned int getlen(Tuplesortstate *state, LogicalTape *tape, bool eofOK);
+static void markrunend(Tuplesortstate *state, LogicalTape *tape);
static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_heap(Tuplesortstate *state, int tapenum,
+static void writetup_heap(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_cluster(Tuplesortstate *state, int tapenum,
+static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_index(Tuplesortstate *state, int tapenum,
+static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_datum(Tuplesortstate *state, int tapenum,
+static void writetup_datum(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/*
@@ -736,11 +729,11 @@ tuplesort_begin_common(int workMem, bool randomAccess)
state->currentRun = RUN_FIRST;
/*
- * maxTapes, tapeRange, and Algorithm D variables will be initialized by
- * inittapes(), if needed
+ * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
+ * inittapes(), if needed.
*/
- state->result_tape = -1; /* flag that result tape has not been formed */
+ state->result_tape = NULL; /* flag that result tape has not been formed */
MemoryContextSwitchTo(oldcontext);
@@ -1171,6 +1164,9 @@ tuplesort_end(Tuplesortstate *state)
*
* Note: want to include this in reported total cost of sort, hence need
* for two #ifdef TRACE_SORT sections.
+ *
+ * We don't bother to destroy the individual tapes here, they will go away
+ * with the sortcontext.
*/
if (state->tapeset)
LogicalTapeSetClose(state->tapeset);
@@ -1825,7 +1821,7 @@ tuplesort_performsort(Tuplesortstate *state)
{
if (state->status == TSS_FINALMERGE)
elog(LOG, "performsort done (except %d-way final merge): %s",
- state->activeTapes,
+ state->nInputTapes,
pg_rusage_show(&state->ru_start));
else
elog(LOG, "performsort done: %s",
@@ -1949,8 +1945,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* end of file; back up to fetch last tuple's ending length
* word. If seek fails we must have a completely empty file.
*/
- seeked = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ seeked = LogicalTapeBackspace(state->result_tape,
2 * sizeof(unsigned int));
if (seeked == 0)
return false;
@@ -1964,8 +1959,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* Back up and fetch previously-returned tuple's ending length
* word. If seek fails, assume we are at start of file.
*/
- seeked = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ seeked = LogicalTapeBackspace(state->result_tape,
sizeof(unsigned int));
if (seeked == 0)
return false;
@@ -1976,8 +1970,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
/*
* Back up to get ending length word of tuple before it.
*/
- seeked = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ seeked = LogicalTapeBackspace(state->result_tape,
tuplen + 2 * sizeof(unsigned int));
if (seeked == tuplen + sizeof(unsigned int))
{
@@ -2001,8 +1994,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* Note: READTUP expects we are positioned after the initial
* length word of the tuple, so back up to that point.
*/
- if (!LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ if (!LogicalTapeBackspace(state->result_tape,
tuplen))
elog(ERROR, "bogus tuple length in backward scan");
READTUP(state, stup, state->result_tape, tuplen);
@@ -2037,7 +2029,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
*/
if (state->memtupcount > 0)
{
- int srcTape = state->memtuples[0].tupindex;
+ int srcTapeIndex = state->memtuples[0].tupindex;
+ LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
SortTuple newtup;
*stup = state->memtuples[0];
@@ -2059,16 +2052,16 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* Remove the top node from the heap.
*/
tuplesort_heap_delete_top(state, false);
+ state->nInputRuns--;
/*
- * Rewind to free the read buffer. It'd go away at the
- * end of the sort anyway, but better to release the
- * memory early.
+ * Close the tape. It'd go away at the end of the sort
+ * anyway, but better to release the memory early.
*/
- LogicalTapeRewindForWrite(state->tapeset, srcTape);
+ LogicalTapeClose(srcTape);
return true;
}
- newtup.tupindex = srcTape;
+ newtup.tupindex = srcTapeIndex;
tuplesort_heap_replace_top(state, &newtup, false);
return true;
}
@@ -2347,20 +2340,16 @@ useselection(Tuplesortstate *state)
static void
inittapes(Tuplesortstate *state)
{
- int maxTapes,
- j;
+ int j;
int64 tapeSpace;
- /* Compute number of tapes to use: merge order plus 1 */
- maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
-
- state->maxTapes = maxTapes;
- state->tapeRange = maxTapes - 1;
+ /* Compute number of input tapes to use (merge order) */
+ state->maxInputTapes = tuplesort_merge_order(state->allowedMem);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "switching to external sort with %d tapes: %s",
- maxTapes, pg_rusage_show(&state->ru_start));
+ state->maxInputTapes, pg_rusage_show(&state->ru_start));
#endif
/*
@@ -2384,15 +2373,10 @@ inittapes(Tuplesortstate *state)
PrepareTempTablespaces();
/*
- * Create the tape set and allocate the per-tape data arrays.
+ * Create the tape set. It is initially empty; the tapes are created as
+ * needed.
*/
- state->tapeset = LogicalTapeSetCreate(maxTapes);
-
- state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
+ state->tapeset = LogicalTapeSetCreate();
/*
* Give replacement selection a try based on user setting. There will be
@@ -2434,63 +2418,50 @@ inittapes(Tuplesortstate *state)
state->currentRun = RUN_FIRST;
/*
- * Initialize variables of Algorithm D (step D1).
+ * Initialize logical tape variables.
*/
- for (j = 0; j < maxTapes; j++)
- {
- state->tp_fib[j] = 1;
- state->tp_runs[j] = 0;
- state->tp_dummy[j] = 1;
- state->tp_tapenum[j] = j;
- }
- state->tp_fib[state->tapeRange] = 0;
- state->tp_dummy[state->tapeRange] = 0;
+ state->inputTapes = NULL;
+ state->nInputTapes = 0;
+ state->nInputRuns = 0;
+
+ state->outputTapes = palloc0(state->maxInputTapes * sizeof(LogicalTape *));
+ state->nOutputTapes = 0;
+ state->nOutputRuns = 0;
- state->Level = 1;
- state->destTape = 0;
+ state->destTape = NULL;
state->status = TSS_BUILDRUNS;
+
+ selectnewtape(state);
}
/*
- * selectnewtape -- select new tape for new initial run.
+ * selectnewtape -- select next tape to output to.
*
* This is called after finishing a run when we know another run
- * must be started. This implements steps D3, D4 of Algorithm D.
+ * must be started. This is used both when building the initial
+ * runs and during merge passes.
*/
static void
selectnewtape(Tuplesortstate *state)
{
- int j;
- int a;
-
- /*
- * Pause the old tape, to release the memory that was used for its buffer,
- * so that we can use it for building the next run.
- */
- LogicalTapePause(state->tapeset, state->destTape);
-
- /* Step D3: advance j (destTape) */
- if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
- {
- state->destTape++;
- return;
- }
- if (state->tp_dummy[state->destTape] != 0)
+ if (state->nOutputRuns < state->maxInputTapes)
{
- state->destTape = 0;
- return;
+ /* Create a new tape to hold the next run */
+ Assert(state->outputTapes[state->nOutputRuns] == NULL);
+ Assert(state->nOutputRuns == state->nOutputTapes);
+ state->outputTapes[state->nOutputRuns] = LogicalTapeCreate(state->tapeset);
+ state->nOutputTapes++;
}
-
- /* Step D4: increase level */
- state->Level++;
- a = state->tp_fib[0];
- for (j = 0; j < state->tapeRange; j++)
+ else
{
- state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
- state->tp_fib[j] = a + state->tp_fib[j + 1];
+ /*
+ * We have reached the max number of tapes. Append to an existing
+ * tape.
+ */
}
- state->destTape = 0;
+ state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
+ state->nOutputRuns++;
}
/*
@@ -2529,18 +2500,14 @@ init_slab_allocator(Tuplesortstate *state, int numSlots)
/*
* mergeruns -- merge all the completed initial runs.
*
- * This implements steps D5, D6 of Algorithm D. All input data has
+ * This implements the Balanced k-Way Merge Algorithm. All input data has
* already been written to initial runs on tape (see dumptuples).
*/
static void
mergeruns(Tuplesortstate *state)
{
- int tapenum,
- svTape,
- svRuns,
- svDummy;
- int numTapes;
- int numInputTapes;
+ int tapenum;
+ int64 tape_buffer_mem;
Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
@@ -2577,37 +2544,19 @@ mergeruns(Tuplesortstate *state)
state->memtuples = NULL;
/*
- * If we had fewer runs than tapes, refund the memory that we imagined we
- * would need for the tape buffers of the unused tapes.
- *
- * numTapes and numInputTapes reflect the actual number of tapes we will
- * use. Note that the output tape's tape number is maxTapes - 1, so the
- * tape numbers of the used tapes are not consecutive, and you cannot just
- * loop from 0 to numTapes to visit all used tapes!
- */
- if (state->Level == 1)
- {
- numInputTapes = state->currentRun;
- numTapes = numInputTapes + 1;
- FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
- }
- else
- {
- numInputTapes = state->tapeRange;
- numTapes = state->maxTapes;
- }
-
- /*
* Initialize the slab allocator. We need one slab slot per input tape,
* for the tuples in the heap, plus one to hold the tuple last returned
* from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
* however, we don't need to allocate anything.)
*
+ * In a multi-pass merge, we could shrink this allocation between
+ * passes, when the number of tapes is reduced, but we don't bother.
+ *
* From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
* to track memory usage of individual tuples.
*/
if (state->tuples)
- init_slab_allocator(state, numInputTapes + 1);
+ init_slab_allocator(state, state->nOutputTapes + 1);
else
init_slab_allocator(state, 0);
@@ -2616,13 +2565,13 @@ mergeruns(Tuplesortstate *state)
* volume is between 1X and 2X workMem when replacement selection is used,
* but something we particularly count on when input is presorted), we can
* just use that tape as the finished output, rather than doing a useless
- * merge. (This obvious optimization is not in Knuth's algorithm.)
+ * merge.
*/
if (state->currentRun == RUN_SECOND)
{
- state->result_tape = state->tp_tapenum[state->destTape];
+ state->result_tape = state->outputTapes[0];
/* must freeze and rewind the finished output tape */
- LogicalTapeFreeze(state->tapeset, state->result_tape);
+ LogicalTapeFreeze(state->result_tape);
state->status = TSS_SORTEDONTAPE;
return;
}
@@ -2630,65 +2579,73 @@ mergeruns(Tuplesortstate *state)
/*
* Use all the spare memory we have available for read buffers among the
* input tapes.
- *
- * We do this only after checking for the case that we produced only one
- * initial run, because there is no need to use a large read buffer when
- * we're reading from a single tape. With one tape, the I/O pattern will
- * be the same regardless of the buffer size.
- *
- * We don't try to "rebalance" the memory among tapes, when we start a new
- * merge phase, even if some tapes are inactive in the new phase. That
- * would be hard, because logtape.c doesn't know where one run ends and
- * another begins. When a new merge phase begins, and a tape doesn't
- * participate in it, its buffer nevertheless already contains tuples from
- * the next run on same tape, so we cannot release the buffer. That's OK
- * in practice, merge performance isn't that sensitive to the amount of
- * buffers used, and most merge phases use all or almost all tapes,
- * anyway.
*/
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
- (state->availMem) / 1024, numInputTapes);
+ (state->availMem) / 1024, state->nOutputTapes);
#endif
- state->read_buffer_size = state->availMem / numInputTapes;
+ state->read_buffer_mem = state->availMem;
USEMEM(state, state->availMem);
/*
* Allocate a new 'memtuples' array, for the heap. It will hold one tuple
* from each input tape.
+ *
+ * We could shrink this, too, between passes in a multi-pass merge, but
+ * we don't bother. (The initial input tapes are still in outputTapes.
+ * The number of input tapes will not increase between passes.)
*/
- state->memtupsize = numInputTapes;
- state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
+ state->memtupsize = state->nOutputTapes;
+ state->memtuples = (SortTuple *) palloc(state->nOutputTapes * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
- /* End of step D2: rewind all output tapes to prepare for merging */
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
+ /* We will use all remaining memory for read buffers */
+ tape_buffer_mem = state->availMem;
+ USEMEM(state, tape_buffer_mem);
for (;;)
{
/*
- * At this point we know that tape[T] is empty. If there's just one
- * (real or dummy) run left on each input tape, then only one merge
- * pass remains. If we don't have to produce a materialized sorted
- * tape, we can stop at this point and do the final merge on-the-fly.
+ * On the first iteration, or if we have read all the runs from the
+ * input tapes in a multi-pass merge, it's time to start a new pass.
+ * Rewind all the output tapes, and make them inputs for the next pass.
*/
- if (!state->randomAccess)
+ if (state->nInputRuns == 0)
{
- bool allOneRun = true;
-
- Assert(state->tp_runs[state->tapeRange] == 0);
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
+ /* Close the old, now-empty input tapes */
+ if (state->nInputTapes > 0)
{
- if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
- {
- allOneRun = false;
- break;
- }
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeClose(state->inputTapes[tapenum]);
+ pfree(state->inputTapes);
}
- if (allOneRun)
+
+ /* Previous pass's outputs become next pass's inputs. */
+ state->inputTapes = state->outputTapes;
+ state->nInputTapes = state->nOutputTapes;
+ state->nInputRuns = state->nOutputRuns;
+
+ /*
+ * Reset output tape variables. (The actual LogicalTapes will be created
+ * as needed; we just allocate a large-enough array for them here.)
+ */
+ state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
+ state->nOutputTapes = 0;
+ state->nOutputRuns = 0;
+
+ /* Prepare the new input tapes for merge pass. */
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeRewindForRead(state->inputTapes[tapenum],
+ state->read_buffer_mem / state->nInputTapes);
+
+ /*
+ * If there's at most one run left on each input tape, then only one merge pass
+ * remains. If we don't have to produce a materialized sorted tape, we can
+ * stop at this point and do the final merge on-the-fly.
+ */
+ if (!state->randomAccess && state->nInputRuns <= state->nInputTapes)
{
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
@@ -2699,93 +2656,46 @@ mergeruns(Tuplesortstate *state)
}
}
- /* Step D5: merge runs onto tape[T] until tape[P] is empty */
- while (state->tp_runs[state->tapeRange - 1] ||
- state->tp_dummy[state->tapeRange - 1])
- {
- bool allDummy = true;
-
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- {
- if (state->tp_dummy[tapenum] == 0)
- {
- allDummy = false;
- break;
- }
- }
+ /* Select an output tape */
+ selectnewtape(state);
- if (allDummy)
- {
- state->tp_dummy[state->tapeRange]++;
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- state->tp_dummy[tapenum]--;
- }
- else
- mergeonerun(state);
- }
+ /* Merge one run from each input tape. */
+ mergeonerun(state);
- /* Step D6: decrease level */
- if (--state->Level == 0)
- break;
- /* rewind output tape T to use as new input */
- LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
- state->read_buffer_size);
- /* rewind used-up input tape P, and prepare it for write pass */
- LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
- state->tp_runs[state->tapeRange - 1] = 0;
+ LogicalTapePause(state->destTape);
/*
- * reassign tape units per step D6; note we no longer care about A[]
+ * If the input tapes are empty and we have produced only one output
+ * run, we're done. The current output tape contains the final result.
*/
- svTape = state->tp_tapenum[state->tapeRange];
- svDummy = state->tp_dummy[state->tapeRange];
- svRuns = state->tp_runs[state->tapeRange];
- for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
- {
- state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
- state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
- state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
- }
- state->tp_tapenum[0] = svTape;
- state->tp_dummy[0] = svDummy;
- state->tp_runs[0] = svRuns;
+ if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
+ break;
}
/*
- * Done. Knuth says that the result is on TAPE[1], but since we exited
- * the loop without performing the last iteration of step D6, we have not
- * rearranged the tape unit assignment, and therefore the result is on
- * TAPE[T]. We need to do it this way so that we can freeze the final
- * output tape while rewinding it. The last iteration of step D6 would be
- * a waste of cycles anyway...
+ * Done.
*/
- state->result_tape = state->tp_tapenum[state->tapeRange];
- LogicalTapeFreeze(state->tapeset, state->result_tape);
+ state->result_tape = state->outputTapes[0];
+ LogicalTapeFreeze(state->result_tape);
state->status = TSS_SORTEDONTAPE;
- /* Release the read buffers of all the other tapes, by rewinding them. */
- for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
- {
- if (tapenum != state->result_tape)
- LogicalTapeRewindForWrite(state->tapeset, tapenum);
- }
+ /* Release the read buffers of all the now-empty input tapes. */
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeClose(state->inputTapes[tapenum]);
}
/*
- * Merge one run from each input tape, except ones with dummy runs.
- *
- * This is the inner loop of Algorithm D step D5. We know that the
- * output tape is TAPE[T].
+ * Merge one run from each input tape.
*/
static void
mergeonerun(Tuplesortstate *state)
{
- int destTape = state->tp_tapenum[state->tapeRange];
- int srcTape;
+ int srcTapeIndex;
+ LogicalTape *srcTape;
/*
* Start the merge by loading one tuple from each active source tape into
- * the heap. We can also decrease the input run/dummy run counts.
+ * the heap.
*/
beginmerge(state);
@@ -2799,8 +2709,9 @@ mergeonerun(Tuplesortstate *state)
SortTuple stup;
/* write the tuple to destTape */
- srcTape = state->memtuples[0].tupindex;
- WRITETUP(state, destTape, &state->memtuples[0]);
+ srcTapeIndex = state->memtuples[0].tupindex;
+ srcTape = state->inputTapes[srcTapeIndex];
+ WRITETUP(state, state->destTape, &state->memtuples[0]);
/* recycle the slot of the tuple we just wrote out, for the next read */
RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
@@ -2811,24 +2722,26 @@ mergeonerun(Tuplesortstate *state)
*/
if (mergereadnext(state, srcTape, &stup))
{
- stup.tupindex = srcTape;
+ stup.tupindex = srcTapeIndex;
tuplesort_heap_replace_top(state, &stup, false);
}
else
+ {
tuplesort_heap_delete_top(state, false);
+ state->nInputRuns--;
+ }
}
/*
* When the heap empties, we're done. Write an end-of-run marker on the
- * output tape, and increment its count of real runs.
+ * output tape.
*/
- markrunend(state, destTape);
- state->tp_runs[state->tapeRange]++;
+ markrunend(state, state->destTape);
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
+ elog(LOG, "finished %d-way merge step: %s", state->nInputTapes,
pg_rusage_show(&state->ru_start));
#endif
}
@@ -2836,48 +2749,26 @@ mergeonerun(Tuplesortstate *state)
/*
* beginmerge - initialize for a merge pass
*
- * We decrease the counts of real and dummy runs for each tape, and mark
- * which tapes contain active input runs in mergeactive[]. Then, fill the
- * merge heap with the first tuple from each active tape.
+ * Fill the merge heap with the first tuple from each input tape.
*/
static void
beginmerge(Tuplesortstate *state)
{
int activeTapes;
- int tapenum;
- int srcTape;
+ int srcTapeIndex;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
- /* Adjust run counts and mark the active tapes */
- memset(state->mergeactive, 0,
- state->maxTapes * sizeof(*state->mergeactive));
- activeTapes = 0;
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- {
- if (state->tp_dummy[tapenum] > 0)
- state->tp_dummy[tapenum]--;
- else
- {
- Assert(state->tp_runs[tapenum] > 0);
- state->tp_runs[tapenum]--;
- srcTape = state->tp_tapenum[tapenum];
- state->mergeactive[srcTape] = true;
- activeTapes++;
- }
- }
- Assert(activeTapes > 0);
- state->activeTapes = activeTapes;
+ activeTapes = Min(state->nInputTapes, state->nInputRuns);
- /* Load the merge heap with the first tuple from each input tape */
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+ for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
{
SortTuple tup;
- if (mergereadnext(state, srcTape, &tup))
+ if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
{
- tup.tupindex = srcTape;
+ tup.tupindex = srcTapeIndex;
tuplesort_heap_insert(state, &tup, false);
}
}
@@ -2889,19 +2780,13 @@ beginmerge(Tuplesortstate *state)
* Returns false on EOF.
*/
static bool
-mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
+mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
{
unsigned int tuplen;
- if (!state->mergeactive[srcTape])
- return false; /* tape's run is already exhausted */
-
/* read next tuple, if any */
if ((tuplen = getlen(state, srcTape, true)) == 0)
- {
- state->mergeactive[srcTape] = false;
return false;
- }
READTUP(state, stup, srcTape, tuplen);
return true;
@@ -2944,7 +2829,7 @@ dumptuples(Tuplesortstate *state, bool alltuples)
* Dump the heap's frontmost entry, and remove it from the heap.
*/
Assert(state->memtupcount > 0);
- WRITETUP(state, state->tp_tapenum[state->destTape],
+ WRITETUP(state, state->destTape,
&state->memtuples[0]);
tuplesort_heap_delete_top(state, true);
}
@@ -2964,17 +2849,21 @@ dumptuples(Tuplesortstate *state, bool alltuples)
if (state->memtupcount == 0 ||
state->memtuples[0].tupindex == HEAP_RUN_NEXT)
{
- markrunend(state, state->tp_tapenum[state->destTape]);
+ markrunend(state, state->destTape);
Assert(state->currentRun == RUN_FIRST);
state->currentRun++;
- state->tp_runs[state->destTape]++;
- state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+
+ /*
+ * Pause the old tape, to release the memory that was used for
+ * its buffer, so that we can use it for building the next run.
+ */
+ LogicalTapePause(state->destTape);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "finished incrementally writing %s run %d to tape %d: %s",
(state->memtupcount == 0) ? "only" : "first",
- state->currentRun, state->destTape,
+ state->currentRun, (state->nOutputRuns - 1) % state->nOutputTapes,
pg_rusage_show(&state->ru_start));
#endif
@@ -3035,16 +2924,10 @@ dumpbatch(Tuplesortstate *state, bool alltuples)
* In general, short final runs are quite possible. Rather than allowing
* a special case where there was a superfluous selectnewtape() call (i.e.
* a call with no subsequent run actually written to destTape), we prefer
- * to write out a 0 tuple run.
- *
- * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
- * the tape inactive for the merge when called from beginmerge(). This
- * case is therefore similar to the case where mergeonerun() finds a dummy
- * run for the tape, and so doesn't need to merge a run from the tape (or
- * conceptually "merges" the dummy run, if you prefer). According to
- * Knuth, Algorithm D "isn't strictly optimal" in its method of
- * distribution and dummy run assignment; this edge case seems very
- * unlikely to make that appreciably worse.
+ * to write out a 0 tuple run. In the worst case, that could add another
+ * merge pass, if that pushes us over the threshold, but it's unlikely
+ * enough not to warrant a special case. (XXX: Actually, I think some
+ * refactoring to avoid that would be in order...)
*/
Assert(state->status == TSS_BUILDRUNS);
@@ -3081,8 +2964,7 @@ dumpbatch(Tuplesortstate *state, bool alltuples)
memtupwrite = state->memtupcount;
for (i = 0; i < memtupwrite; i++)
{
- WRITETUP(state, state->tp_tapenum[state->destTape],
- &state->memtuples[i]);
+ WRITETUP(state, state->destTape, &state->memtuples[i]);
state->memtupcount--;
}
@@ -3095,14 +2977,18 @@ dumpbatch(Tuplesortstate *state, bool alltuples)
*/
MemoryContextReset(state->tuplecontext);
- markrunend(state, state->tp_tapenum[state->destTape]);
- state->tp_runs[state->destTape]++;
- state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+ markrunend(state, state->destTape);
+
+ /*
+ * Pause the old tape, to release the memory that was used for its buffer,
+ * so that we can use it for building the next run.
+ */
+ LogicalTapePause(state->destTape);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "finished writing run %d to tape %d: %s",
- state->currentRun, state->destTape,
+ state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
pg_rusage_show(&state->ru_start));
#endif
@@ -3129,9 +3015,7 @@ tuplesort_rescan(Tuplesortstate *state)
state->markpos_eof = false;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeRewindForRead(state->tapeset,
- state->result_tape,
- 0);
+ LogicalTapeRewindForRead(state->result_tape, 0);
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
@@ -3162,8 +3046,7 @@ tuplesort_markpos(Tuplesortstate *state)
state->markpos_eof = state->eof_reached;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeTell(state->tapeset,
- state->result_tape,
+ LogicalTapeTell(state->result_tape,
&state->markpos_block,
&state->markpos_offset);
state->markpos_eof = state->eof_reached;
@@ -3194,8 +3077,7 @@ tuplesort_restorepos(Tuplesortstate *state)
state->eof_reached = state->markpos_eof;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeSeek(state->tapeset,
- state->result_tape,
+ LogicalTapeSeek(state->result_tape,
state->markpos_block,
state->markpos_offset);
state->eof_reached = state->markpos_eof;
@@ -3527,11 +3409,11 @@ reversedirection(Tuplesortstate *state)
*/
static unsigned int
-getlen(Tuplesortstate *state, int tapenum, bool eofOK)
+getlen(Tuplesortstate *state, LogicalTape *tape, bool eofOK)
{
unsigned int len;
- if (LogicalTapeRead(state->tapeset, tapenum,
+ if (LogicalTapeRead(tape,
&len, sizeof(len)) != sizeof(len))
elog(ERROR, "unexpected end of tape");
if (len == 0 && !eofOK)
@@ -3540,11 +3422,11 @@ getlen(Tuplesortstate *state, int tapenum, bool eofOK)
}
static void
-markrunend(Tuplesortstate *state, int tapenum)
+markrunend(Tuplesortstate *state, LogicalTape *tape)
{
unsigned int len = 0;
- LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
+ LogicalTapeWrite(tape, (void *) &len, sizeof(len));
}
/*
@@ -3722,7 +3604,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
}
static void
-writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
MinimalTuple tuple = (MinimalTuple) stup->tuple;
@@ -3733,13 +3615,10 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
/* total on-disk footprint: */
unsigned int tuplen = tupbodylen + sizeof(int);
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) tupbody, tupbodylen);
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) tupbody, tupbodylen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
if (!state->slabAllocatorUsed)
{
@@ -3750,7 +3629,7 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
static void
readtup_heap(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len)
+ LogicalTape *tape, unsigned int len)
{
unsigned int tupbodylen = len - sizeof(int);
unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
@@ -3760,11 +3639,9 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
/* read in the tuple proper */
tuple->t_len = tuplen;
- LogicalTapeReadExact(state->tapeset, tapenum,
- tupbody, tupbodylen);
+ LogicalTapeReadExact(tape, tupbody, tupbodylen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value */
htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
@@ -3965,21 +3842,17 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
}
static void
-writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
HeapTuple tuple = (HeapTuple) stup->tuple;
unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
/* We need to store t_self, but not other fields of HeapTupleData */
- LogicalTapeWrite(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
- LogicalTapeWrite(state->tapeset, tapenum,
- &tuple->t_self, sizeof(ItemPointerData));
- LogicalTapeWrite(state->tapeset, tapenum,
- tuple->t_data, tuple->t_len);
+ LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData));
+ LogicalTapeWrite(tape, tuple->t_data, tuple->t_len);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
if (!state->slabAllocatorUsed)
{
@@ -3990,7 +3863,7 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
static void
readtup_cluster(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int tuplen)
+ LogicalTape *tape, unsigned int tuplen)
{
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
HeapTuple tuple = (HeapTuple) readtup_alloc(state,
@@ -3999,16 +3872,13 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
/* Reconstruct the HeapTupleData header */
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
tuple->t_len = t_len;
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuple->t_self, sizeof(ItemPointerData));
+ LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData));
/* We don't currently bother to reconstruct t_tableOid */
tuple->t_tableOid = InvalidOid;
/* Read in the tuple body */
- LogicalTapeReadExact(state->tapeset, tapenum,
- tuple->t_data, tuple->t_len);
+ LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value, if it's a simple column */
if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
@@ -4271,19 +4141,16 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
}
static void
-writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
IndexTuple tuple = (IndexTuple) stup->tuple;
unsigned int tuplen;
tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) tuple, IndexTupleSize(tuple));
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple));
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
if (!state->slabAllocatorUsed)
{
@@ -4294,16 +4161,14 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
static void
readtup_index(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len)
+ LogicalTape *tape, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
- LogicalTapeReadExact(state->tapeset, tapenum,
- tuple, tuplen);
+ LogicalTapeReadExact(tape, tuple, tuplen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value */
stup->datum1 = index_getattr(tuple,
@@ -4345,7 +4210,7 @@ copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
}
static void
-writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
void *waddr;
unsigned int tuplen;
@@ -4370,13 +4235,10 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
writtenlen = tuplen + sizeof(unsigned int);
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &writtenlen, sizeof(writtenlen));
- LogicalTapeWrite(state->tapeset, tapenum,
- waddr, tuplen);
+ LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
+ LogicalTapeWrite(tape, waddr, tuplen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &writtenlen, sizeof(writtenlen));
+ LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
if (!state->slabAllocatorUsed && stup->tuple)
{
@@ -4387,7 +4249,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
static void
readtup_datum(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len)
+ LogicalTape *tape, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
@@ -4401,8 +4263,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
else if (!state->tuples)
{
Assert(tuplen == sizeof(Datum));
- LogicalTapeReadExact(state->tapeset, tapenum,
- &stup->datum1, tuplen);
+ LogicalTapeReadExact(tape, &stup->datum1, tuplen);
stup->isnull1 = false;
stup->tuple = NULL;
}
@@ -4410,16 +4271,14 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
{
void *raddr = readtup_alloc(state, tuplen);
- LogicalTapeReadExact(state->tapeset, tapenum,
- raddr, tuplen);
+ LogicalTapeReadExact(tape, raddr, tuplen);
stup->datum1 = PointerGetDatum(raddr);
stup->isnull1 = false;
stup->tuple = raddr;
}
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
}
/*
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 3a51cfb..6add0a5 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -16,32 +16,33 @@
#ifndef LOGTAPE_H
#define LOGTAPE_H
-/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
-
+/*
+ * LogicalTapeSet and LogicalTape are opaque types whose details are not
+ * known outside logtape.c.
+ */
typedef struct LogicalTapeSet LogicalTapeSet;
+typedef struct LogicalTape LogicalTape;
+
/*
* prototypes for functions in logtape.c
*/
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes);
+extern LogicalTapeSet *LogicalTapeSetCreate(void);
extern void LogicalTapeSetClose(LogicalTapeSet *lts);
extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
-extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size);
-extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size);
-extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
- size_t buffer_size);
-extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapePause(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
-extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
- size_t size);
-extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
- long blocknum, int offset);
-extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
- long *blocknum, int *offset);
extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeCreate(LogicalTapeSet *lts);
+extern void LogicalTapeClose(LogicalTape *lt);
+extern size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size);
+extern void LogicalTapePause(LogicalTape *lt);
+extern void LogicalTapeFreeze(LogicalTape *lt);
+extern size_t LogicalTapeBackspace(LogicalTape *lt, size_t size);
+extern void LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset);
+extern void LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset);
+extern void LogicalTapeAssignReadBufferSize(LogicalTape *lt, size_t bufsize);
+
#endif /* LOGTAPE_H */
--
2.9.3
Heikki Linnakangas <hlinnaka@iki.fi> writes:
The beauty of the polyphase merge algorithm is that it allows reusing
input tapes as output tapes efficiently ... So the whole idea of trying to
efficiently reuse input tapes as output tapes is pointless.
It's been awhile since I looked at that code, but I'm quite certain that
it *never* thought it was dealing with actual tapes. Rather, the point of
sticking with polyphase merge was that it allowed efficient incremental
re-use of temporary disk files, so that the maximum on-disk footprint was
only about equal to the volume of data to be sorted, rather than being a
multiple of that. Have we thrown that property away?
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 10/12/2016 08:27 PM, Tom Lane wrote:
Heikki Linnakangas <hlinnaka@iki.fi> writes:
The beauty of the polyphase merge algorithm is that it allows reusing
input tapes as output tapes efficiently ... So the whole idea of trying to
efficiently reuse input tapes as output tapes is pointless.
It's been awhile since I looked at that code, but I'm quite certain that
it *never* thought it was dealing with actual tapes. Rather, the point of
sticking with polyphase merge was that it allowed efficient incremental
re-use of temporary disk files, so that the maximum on-disk footprint was
only about equal to the volume of data to be sorted, rather than being a
multiple of that. Have we thrown that property away?
No, there's no difference to that behavior. logtape.c takes care of
incremental re-use of disk space, regardless of the merging pattern.
- Heikki
On Wed, Oct 12, 2016 at 10:16 AM, Heikki Linnakangas <hlinnaka@iki.fi> wrote:
Let's switch over to a simple k-way balanced merge. Because it's simpler. If
you're severely limited on memory, like when sorting 1GB of data with
work_mem='1MB' or less, it's also slightly faster. I'm not too excited about
the performance aspect, because in the typical case of a single-pass merge,
there's no difference. But it would be worth changing on simplicity grounds,
since we're mucking around in tuplesort.c anyway.
This analysis seems sound. I suppose we might as well simplify things
while we're at it.
--
Peter Geoghegan
On Wed, Oct 12, 2016 at 10:16 AM, Heikki Linnakangas <hlinnaka@iki.fi> wrote:
The number of *input* tapes we can use in each merge pass is still limited,
by the memory needed for the tape buffers and the merge heap, but only one
output tape is active at any time. The inactive output tapes consume very
few resources. So the whole idea of trying to efficiently reuse input tapes
as output tapes is pointless
I picked this up again. The patch won't apply cleanly. Can you rebase?
Also, please look at my bugfix for logtape.c free block management [1]
before doing so, as that might be prerequisite. Finally, I don't think
that the Logical tape pause/resume idea is compelling. Is it hard to
not do that, but still do everything else that you propose here?
That's what I lean towards doing right now.
Anyway, efficient use of tapes certainly mattered a lot more when
rewinding meant sitting around for a magnetic tape deck to physically
rewind. There is another algorithm in AoCP Vol III that lets us write
to tapes backwards, actually, which is motivated by similar obsolete
considerations about hardware. Why not write while we rewind, to avoid
doing nothing else while rewinding?!
Perhaps this patch should make a clean break from the "rewinding"
terminology. Perhaps you should rename LogicalTapeRewindForRead() to
LogicalTapePrepareForRead(), and so on. It's already a bit awkward
that that routine is called LogicalTapeRewindForRead(), because it
behaves significantly differently when a tape is frozen, and because
the whole point of logtape.c is space reuse that is completely
dissimilar to rewinding. (Space reuse is thus totally unlike how
polyphase merge is supposed to reuse space, which is all about
rewinding, and isn't nearly as eager. Same applies to K-way balanced
merge, of course.)
I think that the "rewinding" terminology does more harm than good, now
that it doesn't even help the Knuth reader to match Knuth's remarks to
what's going on in tuplesort.c. Just a thought.
Let's switch over to a simple k-way balanced merge. Because it's simpler. If
you're severely limited on memory, like when sorting 1GB of data with
work_mem='1MB' or less, it's also slightly faster. I'm not too excited about
the performance aspect, because in the typical case of a single-pass merge,
there's no difference. But it would be worth changing on simplicity grounds,
since we're mucking around in tuplesort.c anyway.
I actually think that the discontinuities in the merge scheduling are
worse than you suggest here. There doesn't have to be as extreme a
difference between work_mem and the size of input as you describe
here. As an example:
create table seq_tab(t int);
insert into seq_tab select generate_series(1, 10000000);
set work_mem = '4MB';
select count(distinct t) from seq_tab;
The trace_sort output ends like this:
30119/2017-01-16 17:17:05 PST LOG: begin datum sort: workMem = 4096,
randomAccess = f
30119/2017-01-16 17:17:05 PST LOG: switching to external sort with 16
tapes: CPU: user: 0.07 s, system: 0.00 s, elapsed: 0.06 s
30119/2017-01-16 17:17:05 PST LOG: starting quicksort of run 1: CPU:
user: 0.07 s, system: 0.00 s, elapsed: 0.06 s
30119/2017-01-16 17:17:05 PST LOG: finished quicksort of run 1: CPU:
user: 0.07 s, system: 0.00 s, elapsed: 0.07 s
*** SNIP ***
30119/2017-01-16 17:17:08 PST LOG: finished writing run 58 to tape 0:
CPU: user: 2.50 s, system: 0.27 s, elapsed: 2.78 s
30119/2017-01-16 17:17:08 PST LOG: using 4095 KB of memory for read
buffers among 15 input tapes
30119/2017-01-16 17:17:08 PST LOG: finished 1-way merge step: CPU:
user: 2.52 s, system: 0.28 s, elapsed: 2.80 s
30119/2017-01-16 17:17:08 PST LOG: finished 4-way merge step: CPU:
user: 2.58 s, system: 0.30 s, elapsed: 2.89 s
30119/2017-01-16 17:17:08 PST LOG: finished 14-way merge step: CPU:
user: 2.86 s, system: 0.34 s, elapsed: 3.20 s
30119/2017-01-16 17:17:08 PST LOG: finished 14-way merge step: CPU:
user: 3.09 s, system: 0.41 s, elapsed: 3.51 s
30119/2017-01-16 17:17:09 PST LOG: finished 15-way merge step: CPU:
user: 3.61 s, system: 0.52 s, elapsed: 4.14 s
30119/2017-01-16 17:17:09 PST LOG: performsort done (except 15-way
final merge): CPU: user: 3.61 s, system: 0.52 s, elapsed: 4.14 s
30119/2017-01-16 17:17:10 PST LOG: external sort ended, 14678 disk
blocks used: CPU: user: 4.93 s, system: 0.57 s, elapsed: 5.51 s
(This is the test case that Cy posted earlier today, for the bug that
was just fixed in master.)
The first 1-way merge step is clearly kind of a waste of time. We
incur no actual comparisons during this "merge", since there is only
one real run from each input tape (all other active tapes contain only
dummy runs). We are, in effect, just shoveling the tuples from that
single run from one tape to another (from one range in the underlying
logtape.c BufFile space to another). I've seen this quite a lot
before, over the years, while working on sorting. It's not that big of
a deal, but it's a degenerate case that illustrates how polyphase
merge can do badly. What's probably of more concern here is that the
final on-the-fly merge we see merges 15 input runs, but 10 of the
inputs only have runs that are sized according to how much we could
quicksort at once (actually, I should say 11 out of 15, what with that
1-way merge). So, 4 of those runs are far larger, which is not great.
ISTM that having the merge be "balanced" is pretty important, at least
as far as multi-pass sorts in the year 2017 go.
Still, I agree with you that this patch is much more about refactoring
than about performance, and I agree that the refactoring is
worthwhile. I just think that you may have somewhat underestimated how
much this could help when we're low on memory, but not ridiculously
so. It doesn't seem necessary to prove this, though. (We need only
verify that there are no regressions.)
That's all I have right now.
[1]: https://commitfest.postgresql.org/13/955/
--
Peter Geoghegan
On Mon, Jan 16, 2017 at 5:56 PM, Peter Geoghegan <pg@heroku.com> wrote:
On Wed, Oct 12, 2016 at 10:16 AM, Heikki Linnakangas <hlinnaka@iki.fi> wrote:
The number of *input* tapes we can use in each merge pass is still limited,
by the memory needed for the tape buffers and the merge heap, but only one
output tape is active at any time. The inactive output tapes consume very
few resources. So the whole idea of trying to efficiently reuse input tapes
as output tapes is pointless
I picked this up again. The patch won't apply cleanly. Can you rebase?
Since we have an awful lot of stuff in the last CF, and this patch
doesn't seem particularly strategic, I've marked it "Returned with
Feedback".
Thanks
--
Peter Geoghegan
On Mon, Feb 27, 2017 at 2:45 PM, Peter Geoghegan <pg@bowt.ie> wrote:
Since we have an awful lot of stuff in the last CF, and this patch
doesn't seem particularly strategic, I've marked it "Returned with
Feedback".
I noticed that this is in the upcoming CF 1 for v11. I'm signed up to review.
I'd like to point out that replacement selection is also obsolete,
which is something I brought up recently [1]. I don't actually have
any feature-driven reason to want to kill replacement selection - it's
just an annoyance at this point. I do think that RS is more deserving
of being killed than Polyphase merge, because it actually costs users
something to continue to support it. The replacement_sort_tuples GUC
particularly deserves to be removed.
It would be nice if killing RS was put in scope here. I'd appreciate
it, at least, since it would simplify the heap routines noticeably.
The original analysis that led to adding replacement_sort_tuples was
based on certain performance characteristics of merging that have
since changed by quite a bit, due to our work for v10.
[1]: postgr.es/m/CAH2-WzmmNjG_K0R9nqYwMq3zjyJJK+hCbiZYNGhAy-Zyjs64GQ@mail.gmail.com
--
Peter Geoghegan
On Wed, Aug 30, 2017 at 2:54 PM, Peter Geoghegan <pg@bowt.ie> wrote:
I noticed that this is in the upcoming CF 1 for v11. I'm signed up to review.
I'd like to point out that replacement selection is also obsolete,
which is something I brought up recently [1]. I don't actually have
any feature-driven reason to want to kill replacement selection - it's
just an annoyance at this point. I do think that RS is more deserving
of being killed than Polyphase merge, because it actually costs users
something to continue to support it. The replacement_sort_tuples GUC
particularly deserves to be removed.
It would be nice if killing RS was put in scope here. I'd appreciate
it, at least, since it would simplify the heap routines noticeably.
The original analysis that led to adding replacement_sort_tuples was
based on certain performance characteristics of merging that have
since changed by quite a bit, due to our work for v10.
These are separate topics. They should each be discussed on their own
thread. Please don't hijack this thread to talk about something else.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Wed, Aug 30, 2017 at 12:48 PM, Robert Haas <robertmhaas@gmail.com> wrote:
These are separate topics. They should each be discussed on their own
thread. Please don't hijack this thread to talk about something else.
I don't think that that is a fair summary.
Heikki has done a number of things in this area, of which this is only
the latest. I'm saying "hey, have you thought about RS too?". Whether
or not I'm "hijacking" this thread is, at best, ambiguous.
--
Peter Geoghegan
Hi,
I planned to do some benchmarking on this patch, but apparently the
patch no longer applies. Rebase please?
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 11/09/2017 13:37, Tomas Vondra wrote:
I planned to do some benchmarking on this patch, but apparently the
patch no longer applies. Rebase please?
Here's a rebase of this. Sorry to keep you waiting :-).
I split the patch into two, one patch to refactor the logtape.c APIs,
and another to change the merge algorithm. With the new logtape.c API,
you can create and destroy LogicalTapes in a tape set on the fly, which
simplified the new hash agg spilling code somewhat. I think that's nice,
even without the sort merge algorithm change.
I did some benchmarking too. I used four data sets: ordered integers,
random integers, ordered text, and random text, with 1 GB of data in
each. I sorted those datasets with different work_mem settings, and
plotted the results. I only ran each combination once, and all on my
laptop, so there's a fair amount of noise in individual data points. The
trend is clear, though.
The results show that this makes sorting faster with small work_mem
settings. As work_mem grows so that there's only one merge pass, there's
no difference. The rightmost data points are with work_mem='2500 MB', so
that the data fits completely in work_mem and you get a quicksort.
That's unaffected by the patch.
I've attached the test scripts I used for the plots, although they are
quite messy and not very automated. I don't expect them to be very
helpful to anyone, but might as well have it archived.
- Heikki
Attachments:
v2-0001-Refactor-LogicalTapeSet-LogicalTape-interface.patchtext/x-patch; charset=UTF-8; name=v2-0001-Refactor-LogicalTapeSet-LogicalTape-interface.patchDownload
From 8549855bd7d8516a2575ac962bb9104d6ac7f5a0 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 21 Oct 2020 21:57:42 +0300
Subject: [PATCH v2 1/2] Refactor LogicalTapeSet/LogicalTape interface.
All the tape functions, like LogicalTapeRead and LogicalTapeWrite, now
take a LogicalTape as argument, instead of LogicalTapeSet+tape number.
You can create any number of LogicalTapes in a single LogicalTapeSet, and
you don't need to decide the number upfront, when you create the tape set.
This makes the tape management in hash agg spilling in nodeAgg.c simpler.
---
src/backend/executor/nodeAgg.c | 187 ++++--------
src/backend/utils/sort/logtape.c | 456 ++++++++++++-----------------
src/backend/utils/sort/tuplesort.c | 229 +++++++--------
src/include/nodes/execnodes.h | 3 +-
src/include/utils/logtape.h | 37 ++-
5 files changed, 360 insertions(+), 552 deletions(-)
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 75e5bbf209d..f680f74ed5a 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -208,7 +208,16 @@
*
* Spilled data is written to logical tapes. These provide better control
* over memory usage, disk space, and the number of files than if we were
- * to use a BufFile for each spill.
+ * to use a BufFile for each spill. We don't know the number of tapes needed
+ * at the start of the algorithm (because it can recurse), so a tape set is
+ * allocated at the beginning, and individual tapes are created as needed.
+ * As a particular tape is read, logtape.c recycles its disk space. When a
+ * tape is read to completion, it is destroyed entirely.
+ *
+ * Tapes' buffers can take up substantial memory when many tapes are open at
+ * once. We only need one tape open at a time in read mode (using a buffer
+ * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
+ * requiring a buffer of size BLCKSZ) for each partition.
*
* Note that it's possible for transition states to start small but then
* grow very large; for instance in the case of ARRAY_AGG. In such cases,
@@ -311,27 +320,6 @@
*/
#define CHUNKHDRSZ 16
-/*
- * Track all tapes needed for a HashAgg that spills. We don't know the maximum
- * number of tapes needed at the start of the algorithm (because it can
- * recurse), so one tape set is allocated and extended as needed for new
- * tapes. When a particular tape is already read, rewind it for write mode and
- * put it in the free list.
- *
- * Tapes' buffers can take up substantial memory when many tapes are open at
- * once. We only need one tape open at a time in read mode (using a buffer
- * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
- * requiring a buffer of size BLCKSZ) for each partition.
- */
-typedef struct HashTapeInfo
-{
- LogicalTapeSet *tapeset;
- int ntapes;
- int *freetapes;
- int nfreetapes;
- int freetapes_alloc;
-} HashTapeInfo;
-
/*
* Represents partitioned spill data for a single hashtable. Contains the
* necessary information to route tuples to the correct partition, and to
@@ -343,9 +331,8 @@ typedef struct HashTapeInfo
*/
typedef struct HashAggSpill
{
- LogicalTapeSet *tapeset; /* borrowed reference to tape set */
int npartitions; /* number of partitions */
- int *partitions; /* spill partition tape numbers */
+ LogicalTape **partitions; /* spill partition tapes */
int64 *ntuples; /* number of tuples in each partition */
uint32 mask; /* mask to find partition from hash value */
int shift; /* after masking, shift by this amount */
@@ -365,8 +352,7 @@ typedef struct HashAggBatch
{
int setno; /* grouping set */
int used_bits; /* number of bits of hash already used */
- LogicalTapeSet *tapeset; /* borrowed reference to tape set */
- int input_tapenum; /* input partition tape */
+ LogicalTape *input_tape; /* input partition tape */
int64 input_tuples; /* number of tuples in this batch */
double input_card; /* estimated group cardinality */
} HashAggBatch;
@@ -442,22 +428,17 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
int npartitions);
static void hashagg_finish_initial_spills(AggState *aggstate);
static void hashagg_reset_spill_state(AggState *aggstate);
-static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
- int input_tapenum, int setno,
+static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
int64 input_tuples, double input_card,
int used_bits);
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
-static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
+static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *lts,
int used_bits, double input_groups,
double hashentrysize);
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
TupleTableSlot *slot, uint32 hash);
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
int setno);
-static void hashagg_tapeinfo_init(AggState *aggstate);
-static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
- int ndest);
-static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
AggState *aggstate, EState *estate,
@@ -1881,12 +1862,12 @@ hash_agg_enter_spill_mode(AggState *aggstate)
if (!aggstate->hash_ever_spilled)
{
- Assert(aggstate->hash_tapeinfo == NULL);
+ Assert(aggstate->hash_tapeset == NULL);
Assert(aggstate->hash_spills == NULL);
aggstate->hash_ever_spilled = true;
- hashagg_tapeinfo_init(aggstate);
+ aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
@@ -1895,7 +1876,7 @@ hash_agg_enter_spill_mode(AggState *aggstate)
AggStatePerHash perhash = &aggstate->perhash[setno];
HashAggSpill *spill = &aggstate->hash_spills[setno];
- hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+ hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
perhash->aggnode->numGroups,
aggstate->hashentrysize);
}
@@ -1937,9 +1918,9 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
aggstate->hash_mem_peak = total_mem;
/* update disk usage */
- if (aggstate->hash_tapeinfo != NULL)
+ if (aggstate->hash_tapeset != NULL)
{
- uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
+ uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
if (aggstate->hash_disk_used < disk_used)
aggstate->hash_disk_used = disk_used;
@@ -2123,7 +2104,7 @@ lookup_hash_entries(AggState *aggstate)
TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
if (spill->partitions == NULL)
- hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+ hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
perhash->aggnode->numGroups,
aggstate->hashentrysize);
@@ -2588,7 +2569,7 @@ agg_refill_hash_table(AggState *aggstate)
HashAggBatch *batch;
AggStatePerHash perhash;
HashAggSpill spill;
- HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
+ LogicalTapeSet *tapeset = aggstate->hash_tapeset;
bool spill_initialized = false;
if (aggstate->hash_batches == NIL)
@@ -2680,7 +2661,7 @@ agg_refill_hash_table(AggState *aggstate)
* that we don't assign tapes that will never be used.
*/
spill_initialized = true;
- hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
+ hashagg_spill_init(&spill, tapeset, batch->used_bits,
batch->input_card, aggstate->hashentrysize);
}
/* no memory for a new group, spill */
@@ -2696,7 +2677,7 @@ agg_refill_hash_table(AggState *aggstate)
ResetExprContext(aggstate->tmpcontext);
}
- hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
+ LogicalTapeClose(batch->input_tape);
/* change back to phase 0 */
aggstate->current_phase = 0;
@@ -2871,67 +2852,6 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate)
return NULL;
}
-/*
- * Initialize HashTapeInfo
- */
-static void
-hashagg_tapeinfo_init(AggState *aggstate)
-{
- HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
- int init_tapes = 16; /* expanded dynamically */
-
- tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
- tapeinfo->ntapes = init_tapes;
- tapeinfo->nfreetapes = init_tapes;
- tapeinfo->freetapes_alloc = init_tapes;
- tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
- for (int i = 0; i < init_tapes; i++)
- tapeinfo->freetapes[i] = i;
-
- aggstate->hash_tapeinfo = tapeinfo;
-}
-
-/*
- * Assign unused tapes to spill partitions, extending the tape set if
- * necessary.
- */
-static void
-hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
- int npartitions)
-{
- int partidx = 0;
-
- /* use free tapes if available */
- while (partidx < npartitions && tapeinfo->nfreetapes > 0)
- partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
-
- if (partidx < npartitions)
- {
- LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
-
- while (partidx < npartitions)
- partitions[partidx++] = tapeinfo->ntapes++;
- }
-}
-
-/*
- * After a tape has already been written to and then read, this function
- * rewinds it for writing and adds it to the free list.
- */
-static void
-hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
-{
- /* rewinding frees the buffer while not in use */
- LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
- if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
- {
- tapeinfo->freetapes_alloc <<= 1;
- tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
- tapeinfo->freetapes_alloc * sizeof(int));
- }
- tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
-}
-
/*
* hashagg_spill_init
*
@@ -2939,7 +2859,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
* of partitions to create, and initializes them.
*/
static void
-hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
+hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
double input_groups, double hashentrysize)
{
int npartitions;
@@ -2948,13 +2868,13 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
used_bits, &partition_bits);
- spill->partitions = palloc0(sizeof(int) * npartitions);
+ spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
spill->ntuples = palloc0(sizeof(int64) * npartitions);
spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
- hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
+ for (int i = 0; i < npartitions; i++)
+ spill->partitions[i] = LogicalTapeCreate(tapeset);
- spill->tapeset = tapeinfo->tapeset;
spill->shift = 32 - used_bits - partition_bits;
spill->mask = (npartitions - 1) << spill->shift;
spill->npartitions = npartitions;
@@ -2973,11 +2893,10 @@ static Size
hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
TupleTableSlot *inputslot, uint32 hash)
{
- LogicalTapeSet *tapeset = spill->tapeset;
TupleTableSlot *spillslot;
int partition;
MinimalTuple tuple;
- int tapenum;
+ LogicalTape *tape;
int total_written = 0;
bool shouldFree;
@@ -3016,12 +2935,12 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
*/
addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
- tapenum = spill->partitions[partition];
+ tape = spill->partitions[partition];
- LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
+ LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32));
total_written += sizeof(uint32);
- LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
+ LogicalTapeWrite(tape, (void *) tuple, tuple->t_len);
total_written += tuple->t_len;
if (shouldFree)
@@ -3037,15 +2956,14 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
* be done.
*/
static HashAggBatch *
-hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
+hashagg_batch_new(LogicalTape *input_tape, int setno,
int64 input_tuples, double input_card, int used_bits)
{
HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
batch->setno = setno;
batch->used_bits = used_bits;
- batch->tapeset = tapeset;
- batch->input_tapenum = tapenum;
+ batch->input_tape = input_tape;
batch->input_tuples = input_tuples;
batch->input_card = input_card;
@@ -3059,42 +2977,41 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
static MinimalTuple
hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
{
- LogicalTapeSet *tapeset = batch->tapeset;
- int tapenum = batch->input_tapenum;
+ LogicalTape *tape = batch->input_tape;
MinimalTuple tuple;
uint32 t_len;
size_t nread;
uint32 hash;
- nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
+ nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
if (nread == 0)
return NULL;
if (nread != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
- tapenum, sizeof(uint32), nread)));
+ errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+ tape, sizeof(uint32), nread)));
if (hashp != NULL)
*hashp = hash;
- nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
+ nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
if (nread != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
- tapenum, sizeof(uint32), nread)));
+ errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+ tape, sizeof(uint32), nread)));
tuple = (MinimalTuple) palloc(t_len);
tuple->t_len = t_len;
- nread = LogicalTapeRead(tapeset, tapenum,
+ nread = LogicalTapeRead(tape,
(void *) ((char *) tuple + sizeof(uint32)),
t_len - sizeof(uint32));
if (nread != t_len - sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
- tapenum, t_len - sizeof(uint32), nread)));
+ errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+ tape, t_len - sizeof(uint32), nread)));
return tuple;
}
@@ -3151,8 +3068,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
for (i = 0; i < spill->npartitions; i++)
{
- LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
- int tapenum = spill->partitions[i];
+ LogicalTape *tape = spill->partitions[i];
HashAggBatch *new_batch;
double cardinality;
@@ -3164,10 +3080,9 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
freeHyperLogLog(&spill->hll_card[i]);
/* rewinding frees the buffer while not in use */
- LogicalTapeRewindForRead(tapeset, tapenum,
- HASHAGG_READ_BUFFER_SIZE);
+ LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
- new_batch = hashagg_batch_new(tapeset, tapenum, setno,
+ new_batch = hashagg_batch_new(tape, setno,
spill->ntuples[i], cardinality,
used_bits);
aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
@@ -3214,14 +3129,10 @@ hashagg_reset_spill_state(AggState *aggstate)
aggstate->hash_batches = NIL;
/* close tape set */
- if (aggstate->hash_tapeinfo != NULL)
+ if (aggstate->hash_tapeset != NULL)
{
- HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
-
- LogicalTapeSetClose(tapeinfo->tapeset);
- pfree(tapeinfo->freetapes);
- pfree(tapeinfo);
- aggstate->hash_tapeinfo = NULL;
+ LogicalTapeSetClose(aggstate->hash_tapeset);
+ aggstate->hash_tapeset = NULL;
}
}
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 28905124f96..ecefa9bac96 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -9,8 +9,7 @@
* there is an annoying problem: the peak space usage is at least twice
* the volume of actual data to be sorted. (This must be so because each
* datum will appear in both the input and output tapes of the final
- * merge pass. For seven-tape polyphase merge, which is otherwise a
- * pretty good algorithm, peak usage is more like 4x actual data volume.)
+ * merge pass.)
*
* We can work around this problem by recognizing that any one tape
* dataset (with the possible exception of the final output) is written
@@ -137,6 +136,8 @@ typedef struct TapeBlockTrailer
*/
typedef struct LogicalTape
{
+ LogicalTapeSet *tapeSet; /* tape set this tape is part of */
+
bool writing; /* T while in write phase */
bool frozen; /* T if blocks should not be freed when read */
bool dirty; /* does buffer need to be written? */
@@ -180,11 +181,14 @@ typedef struct LogicalTape
* This data structure represents a set of related "logical tapes" sharing
* space in a single underlying file. (But that "file" may be multiple files
* if needed to escape OS limits on file size; buffile.c handles that for us.)
- * The number of tapes is fixed at creation.
+ * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
+ * demand.
*/
struct LogicalTapeSet
{
BufFile *pfile; /* underlying file for whole tape set */
+ SharedFileSet *fileset;
+ int worker; /* worker # if shared, -1 for leader/serial */
/*
* File size tracking. nBlocksWritten is the size of the underlying file,
@@ -213,22 +217,16 @@ struct LogicalTapeSet
long nFreeBlocks; /* # of currently free blocks */
Size freeBlocksLen; /* current allocated length of freeBlocks[] */
bool enable_prealloc; /* preallocate write blocks? */
-
- /* The array of logical tapes. */
- int nTapes; /* # of logical tapes in set */
- LogicalTape *tapes; /* has nTapes nentries */
};
+static LogicalTape *ltsCreateTape(LogicalTapeSet *lts);
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
-static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
- SharedFileSet *fileset);
-static void ltsInitTape(LogicalTape *lt);
-static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);
+static void ltsInitReadBuffer(LogicalTape *lt);
/*
@@ -304,7 +302,7 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
* Returns true if anything was read, 'false' on EOF.
*/
static bool
-ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsReadFillBuffer(LogicalTape *lt)
{
lt->pos = 0;
lt->nbytes = 0;
@@ -321,9 +319,9 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
datablocknum += lt->offsetBlockNumber;
/* Read the block */
- ltsReadBlock(lts, datablocknum, (void *) thisbuf);
+ ltsReadBlock(lt->tapeSet, datablocknum, (void *) thisbuf);
if (!lt->frozen)
- ltsReleaseBlock(lts, datablocknum);
+ ltsReleaseBlock(lt->tapeSet, datablocknum);
lt->curBlockNumber = lt->nextBlockNumber;
lt->nbytes += TapeBlockGetNBytes(thisbuf);
@@ -530,126 +528,13 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
}
}
-/*
- * Claim ownership of a set of logical tapes from existing shared BufFiles.
- *
- * Caller should be leader process. Though tapes are marked as frozen in
- * workers, they are not frozen when opened within leader, since unfrozen tapes
- * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
- * for random access.)
- */
-static void
-ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
- SharedFileSet *fileset)
-{
- LogicalTape *lt = NULL;
- long tapeblocks = 0L;
- long nphysicalblocks = 0L;
- int i;
-
- /* Should have at least one worker tape, plus leader's tape */
- Assert(lts->nTapes >= 2);
-
- /*
- * Build concatenated view of all BufFiles, remembering the block number
- * where each source file begins. No changes are needed for leader/last
- * tape.
- */
- for (i = 0; i < lts->nTapes - 1; i++)
- {
- char filename[MAXPGPATH];
- BufFile *file;
- int64 filesize;
-
- lt = &lts->tapes[i];
-
- pg_itoa(i, filename);
- file = BufFileOpenShared(fileset, filename, O_RDONLY);
- filesize = BufFileSize(file);
-
- /*
- * Stash first BufFile, and concatenate subsequent BufFiles to that.
- * Store block offset into each tape as we go.
- */
- lt->firstBlockNumber = shared[i].firstblocknumber;
- if (i == 0)
- {
- lts->pfile = file;
- lt->offsetBlockNumber = 0L;
- }
- else
- {
- lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
- }
- /* Don't allocate more for read buffer than could possibly help */
- lt->max_size = Min(MaxAllocSize, filesize);
- tapeblocks = filesize / BLCKSZ;
- nphysicalblocks += tapeblocks;
- }
-
- /*
- * Set # of allocated blocks, as well as # blocks written. Use extent of
- * new BufFile space (from 0 to end of last worker's tape space) for this.
- * Allocated/written blocks should include space used by holes left
- * between concatenated BufFiles.
- */
- lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
- lts->nBlocksWritten = lts->nBlocksAllocated;
-
- /*
- * Compute number of hole blocks so that we can later work backwards, and
- * instrument number of physical blocks. We don't simply use physical
- * blocks directly for instrumentation because this would break if we ever
- * subsequently wrote to the leader tape.
- *
- * Working backwards like this keeps our options open. If shared BufFiles
- * ever support being written to post-export, logtape.c can automatically
- * take advantage of that. We'd then support writing to the leader tape
- * while recycling space from worker tapes, because the leader tape has a
- * zero offset (write routines won't need to have extra logic to apply an
- * offset).
- *
- * The only thing that currently prevents writing to the leader tape from
- * working is the fact that BufFiles opened using BufFileOpenShared() are
- * read-only by definition, but that could be changed if it seemed
- * worthwhile. For now, writing to the leader tape will raise a "Bad file
- * descriptor" error, so tuplesort must avoid writing to the leader tape
- * altogether.
- */
- lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
-}
-
-/*
- * Initialize per-tape struct. Note we allocate the I/O buffer lazily.
- */
-static void
-ltsInitTape(LogicalTape *lt)
-{
- lt->writing = true;
- lt->frozen = false;
- lt->dirty = false;
- lt->firstBlockNumber = -1L;
- lt->curBlockNumber = -1L;
- lt->nextBlockNumber = -1L;
- lt->offsetBlockNumber = 0L;
- lt->buffer = NULL;
- lt->buffer_size = 0;
- /* palloc() larger than MaxAllocSize would fail */
- lt->max_size = MaxAllocSize;
- lt->pos = 0;
- lt->nbytes = 0;
- lt->prealloc = NULL;
- lt->nprealloc = 0;
- lt->prealloc_size = 0;
-}
-
/*
* Lazily allocate and initialize the read buffer. This avoids waste when many
* tapes are open at once, but not all are active between rewinding and
* reading.
*/
static void
-ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsInitReadBuffer(LogicalTape *lt)
{
Assert(lt->buffer_size > 0);
lt->buffer = palloc(lt->buffer_size);
@@ -658,40 +543,32 @@ ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
lt->nextBlockNumber = lt->firstBlockNumber;
lt->pos = 0;
lt->nbytes = 0;
- ltsReadFillBuffer(lts, lt);
+ ltsReadFillBuffer(lt);
}
/*
- * Create a set of logical tapes in a temporary underlying file.
+ * Create a tape set, backed by a temporary underlying file.
*
- * Each tape is initialized in write state. Serial callers pass ntapes,
- * NULL argument for shared, and -1 for worker. Parallel worker callers
- * pass ntapes, a shared file handle, NULL shared argument, and their own
- * worker number. Leader callers, which claim shared worker tapes here,
- * must supply non-sentinel values for all arguments except worker number,
- * which should be -1.
+ * The tape set is initially empty. Use LogicalTapeCreate() to create
+ * tapes in it.
*
- * Leader caller is passing back an array of metadata each worker captured
- * when LogicalTapeFreeze() was called for their final result tapes. Passed
- * tapes array is actually sized ntapes - 1, because it includes only
- * worker tapes, whereas leader requires its own leader tape. Note that we
- * rely on the assumption that reclaimed worker tapes will only be read
- * from once by leader, and never written to again (tapes are initialized
- * for writing, but that's only to be consistent). Leader may not write to
- * its own tape purely due to a restriction in the shared buffile
- * infrastructure that may be lifted in the future.
+ * Serial callers pass NULL argument for shared, and -1 for worker. Parallel
+ * worker callers pass a shared file handle and their own worker number.
+ *
+ * Leader callers pass a shared file handle and -1 for worker. After creating
+ * the tape set, use LogicalTapeImport() to import the worker tapes into it.
+ *
+ * Currently, the leader will only import worker tapes into the set; it does
+ * not create tapes of its own, although in principle that should work.
*/
LogicalTapeSet *
-LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
- SharedFileSet *fileset, int worker)
+LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
{
LogicalTapeSet *lts;
- int i;
/*
* Create top-level struct including per-tape LogicalTape structs.
*/
- Assert(ntapes > 0);
lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
lts->nBlocksAllocated = 0L;
lts->nBlocksWritten = 0L;
@@ -701,22 +578,21 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
lts->nFreeBlocks = 0;
lts->enable_prealloc = preallocate;
- lts->nTapes = ntapes;
- lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape));
- for (i = 0; i < ntapes; i++)
- ltsInitTape(&lts->tapes[i]);
+ lts->fileset = fileset;
+ lts->worker = worker;
/*
* Create temp BufFile storage as required.
*
- * Leader concatenates worker tapes, which requires special adjustment to
- * final tapeset data. Things are simpler for the worker case and the
+ * In leader, we hijack the BufFile of the first tape that's imported, and
+ * concatenate the BufFiles of any subsequent tapes to that. Hence don't
+ * create a BufFile here. Things are simpler for the worker case and the
* serial case, though. They are generally very similar -- workers use a
* shared fileset, whereas serial sorts use a conventional serial BufFile.
*/
- if (shared)
- ltsConcatWorkerTapes(lts, shared, fileset);
+ if (fileset && worker == -1)
+ lts->pfile = NULL;
else if (fileset)
{
char filename[MAXPGPATH];
@@ -730,27 +606,147 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
return lts;
}
+
/*
- * Close a logical tape set and release all resources.
+ * Claim ownership of a logical tape from an existing shared BufFile.
+ *
+ * Caller should be leader process. Though tapes are marked as frozen in
+ * workers, they are not frozen when opened within leader, since unfrozen tapes
+ * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
+ * for random access.)
*/
-void
-LogicalTapeSetClose(LogicalTapeSet *lts)
+LogicalTape *
+LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
{
LogicalTape *lt;
- int i;
+ long tapeblocks;
+ char filename[MAXPGPATH];
+ BufFile *file;
+ int64 filesize;
- BufFileClose(lts->pfile);
- for (i = 0; i < lts->nTapes; i++)
+ lt = ltsCreateTape(lts);
+
+ /*
+ * Build concatenated view of all BufFiles, remembering the block number
+ * where each source file begins.
+ */
+ pg_itoa(worker, filename);
+ file = BufFileOpenShared(lts->fileset, filename, O_RDONLY);
+ filesize = BufFileSize(file);
+
+ /*
+ * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
+ * block offset into each tape as we go.
+ */
+ lt->firstBlockNumber = shared->firstblocknumber;
+ if (lts->pfile == NULL)
{
- lt = &lts->tapes[i];
- if (lt->buffer)
- pfree(lt->buffer);
+ lts->pfile = file;
+ lt->offsetBlockNumber = 0L;
}
- pfree(lts->tapes);
+ else
+ {
+ lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
+ }
+ /* Don't allocate more for read buffer than could possibly help */
+ lt->max_size = Min(MaxAllocSize, filesize);
+ tapeblocks = filesize / BLCKSZ;
+
+ /*
+ * Update # of allocated blocks and # blocks written to reflect the
+ * imported BufFile. Allocated/written blocks include space used by holes
+ * left between concatenated BufFiles. Also track the number of hole
+ * blocks so that we can later work backwards to calculate the number of
+ * physical blocks for instrumentation.
+ */
+ lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;
+
+ lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
+ lts->nBlocksWritten = lts->nBlocksAllocated;
+
+ return lt;
+}
+
+/*
+ * Close a logical tape set and release all resources.
+ *
+ * NOTE: This doesn't close any of the tapes! You must close them
+ * first, or you can let them be destroyed along with the memory context.
+ */
+void
+LogicalTapeSetClose(LogicalTapeSet *lts)
+{
+ BufFileClose(lts->pfile);
pfree(lts->freeBlocks);
pfree(lts);
}
+/*
+ * Create a logical tape in the given tapeset.
+ *
+ * The tape is initialized in write state.
+ */
+LogicalTape *
+LogicalTapeCreate(LogicalTapeSet *lts)
+{
+ /*
+ * The only thing that currently prevents creating new tapes in leader is
+ * the fact that BufFiles opened using BufFileOpenShared() are read-only
+ * by definition, but that could be changed if it seemed worthwhile. For
+ * now, writing to the leader tape will raise a "Bad file descriptor"
+ * error, so tuplesort must avoid writing to the leader tape altogether.
+ */
+ if (lts->fileset && lts->worker == -1)
+ elog(ERROR, "cannot create new tapes in leader process");
+
+ return ltsCreateTape(lts);
+}
+
+static LogicalTape *
+ltsCreateTape(LogicalTapeSet *lts)
+{
+ LogicalTape *lt;
+
+ /*
+ * Create per-tape struct. Note we allocate the I/O buffer lazily.
+ */
+ lt = palloc(sizeof(LogicalTape));
+ lt->tapeSet = lts;
+ lt->writing = true;
+ lt->frozen = false;
+ lt->dirty = false;
+ lt->firstBlockNumber = -1L;
+ lt->curBlockNumber = -1L;
+ lt->nextBlockNumber = -1L;
+ lt->offsetBlockNumber = 0L;
+ lt->buffer = NULL;
+ lt->buffer_size = 0;
+ /* palloc() larger than MaxAllocSize would fail */
+ lt->max_size = MaxAllocSize;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ lt->prealloc = NULL;
+ lt->nprealloc = 0;
+ lt->prealloc_size = 0;
+
+ return lt;
+}
+
+/*
+ * Close a logical tape.
+ *
+ * Note: This doesn't return any blocks to the free list! You must read
+ * the tape to the end first, to reuse the space. In current use, though,
+ * we only close tapes after fully reading them.
+ */
+void
+LogicalTapeClose(LogicalTape *lt)
+{
+ if (lt->buffer)
+ pfree(lt->buffer);
+ pfree(lt);
+}
+
/*
* Mark a logical tape set as not needing management of free space anymore.
*
@@ -772,14 +768,11 @@ LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
* There are no error returns; we ereport() on failure.
*/
void
-LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size)
+LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size)
{
- LogicalTape *lt;
+ LogicalTapeSet *lts = lt->tapeSet;
size_t nthistime;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->writing);
Assert(lt->offsetBlockNumber == 0L);
@@ -818,11 +811,11 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
* First allocate the next block, so that we can store it in the
* 'next' pointer of this block.
*/
- nextBlockNumber = ltsGetBlock(lts, lt);
+ nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
/* set the next-pointer and dump the current block. */
TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
- ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
/* initialize the prev-pointer of the next block */
TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
@@ -860,12 +853,9 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
* byte buffer is used.
*/
void
-LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
+LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
+ LogicalTapeSet *lts = lt->tapeSet;
/*
* Round and cap buffer_size if needed.
@@ -907,7 +897,7 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
lt->buffer_size - lt->nbytes);
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
- ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
}
lt->writing = false;
}
@@ -939,61 +929,28 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
}
}
-/*
- * Rewind logical tape and switch from reading to writing.
- *
- * NOTE: we assume the caller has read the tape to the end; otherwise
- * untouched data will not have been freed. We could add more code to free
- * any unread blocks, but in current usage of this module it'd be useless
- * code.
- */
-void
-LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
-{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
-
- Assert(!lt->writing && !lt->frozen);
- lt->writing = true;
- lt->dirty = false;
- lt->firstBlockNumber = -1L;
- lt->curBlockNumber = -1L;
- lt->pos = 0;
- lt->nbytes = 0;
- if (lt->buffer)
- pfree(lt->buffer);
- lt->buffer = NULL;
- lt->buffer_size = 0;
-}
-
/*
* Read from a logical tape.
*
* Early EOF is indicated by return value less than #bytes requested.
*/
size_t
-LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size)
+LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
{
- LogicalTape *lt;
size_t nread = 0;
size_t nthistime;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(!lt->writing);
if (lt->buffer == NULL)
- ltsInitReadBuffer(lts, lt);
+ ltsInitReadBuffer(lt);
while (size > 0)
{
if (lt->pos >= lt->nbytes)
{
/* Try to load more data into buffer. */
- if (!ltsReadFillBuffer(lts, lt))
+ if (!ltsReadFillBuffer(lt))
break; /* EOF */
}
@@ -1031,12 +988,10 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
* Serial sorts should set share to NULL.
*/
void
-LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
+LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
{
- LogicalTape *lt;
+ LogicalTapeSet *lts = lt->tapeSet;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->writing);
Assert(lt->offsetBlockNumber == 0L);
@@ -1058,8 +1013,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
lt->buffer_size - lt->nbytes);
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
- ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
- lt->writing = false;
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
}
lt->writing = false;
lt->frozen = true;
@@ -1086,7 +1040,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
if (lt->firstBlockNumber == -1L)
lt->nextBlockNumber = -1L;
- ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+ ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
if (TapeBlockIsLast(lt->buffer))
lt->nextBlockNumber = -1L;
else
@@ -1101,25 +1055,6 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
}
}
-/*
- * Add additional tapes to this tape set. Not intended to be used when any
- * tapes are frozen.
- */
-void
-LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
-{
- int i;
- int nTapesOrig = lts->nTapes;
-
- lts->nTapes += nAdditional;
-
- lts->tapes = (LogicalTape *) repalloc(lts->tapes,
- lts->nTapes * sizeof(LogicalTape));
-
- for (i = nTapesOrig; i < lts->nTapes; i++)
- ltsInitTape(&lts->tapes[i]);
-}
-
/*
* Backspace the tape a given number of bytes. (We also support a more
* general seek interface, see below.)
@@ -1134,18 +1069,15 @@ LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
* that case.
*/
size_t
-LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
+LogicalTapeBackspace(LogicalTape *lt, size_t size)
{
- LogicalTape *lt;
size_t seekpos = 0;
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->frozen);
Assert(lt->buffer_size == BLCKSZ);
if (lt->buffer == NULL)
- ltsInitReadBuffer(lts, lt);
+ ltsInitReadBuffer(lt);
/*
* Easy case for seek within current block.
@@ -1175,7 +1107,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
return seekpos;
}
- ltsReadBlock(lts, prev, (void *) lt->buffer);
+ ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer);
if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
@@ -1208,23 +1140,18 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
* LogicalTapeTell().
*/
void
-LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
- long blocknum, int offset)
+LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
Assert(lt->frozen);
Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
Assert(lt->buffer_size == BLCKSZ);
if (lt->buffer == NULL)
- ltsInitReadBuffer(lts, lt);
+ ltsInitReadBuffer(lt);
if (blocknum != lt->curBlockNumber)
{
- ltsReadBlock(lts, blocknum, (void *) lt->buffer);
+ ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer);
lt->curBlockNumber = blocknum;
lt->nbytes = TapeBlockPayloadSize;
lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
@@ -1242,16 +1169,10 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
* the position for a seek after freezing. Not clear if anyone needs that.
*/
void
-LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
- long *blocknum, int *offset)
+LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = &lts->tapes[tapenum];
-
if (lt->buffer == NULL)
- ltsInitReadBuffer(lts, lt);
+ ltsInitReadBuffer(lt);
Assert(lt->offsetBlockNumber == 0L);
@@ -1271,12 +1192,5 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long
LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
-#ifdef USE_ASSERT_CHECKING
- for (int i = 0; i < lts->nTapes; i++)
- {
- LogicalTape *lt = &lts->tapes[i];
- Assert(!lt->writing || lt->buffer == NULL);
- }
-#endif
return lts->nBlocksWritten - lts->nHoleBlocks;
}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index d0cc04a878a..07b6c92f54d 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -262,6 +262,7 @@ struct Tuplesortstate
MemoryContext sortcontext; /* memory context holding most sort data */
MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
+ LogicalTape **tapes; /* logtape.c objects, one per tape */
/*
* These function pointers decouple the routines that must know what kind
@@ -290,7 +291,7 @@ struct Tuplesortstate
* SortTuple struct!), and increase state->availMem by the amount of
* memory space thereby released.
*/
- void (*writetup) (Tuplesortstate *state, int tapenum,
+ void (*writetup) (Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
/*
@@ -299,7 +300,7 @@ struct Tuplesortstate
* from the slab memory arena, or is palloc'd, see readtup_alloc().
*/
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
/*
* This array holds the tuples now in sort memory. If we are in state
@@ -393,7 +394,7 @@ struct Tuplesortstate
* the next tuple to return. (In the tape case, the tape's current read
* position is also critical state.)
*/
- int result_tape; /* actual tape number of finished output */
+ LogicalTape *result_tape; /* tape of finished output */
int current; /* array index (only used if SORTEDINMEM) */
bool eof_reached; /* reached EOF (needed for cursors) */
@@ -599,9 +600,9 @@ struct Sharedsort
*/
/* When using this macro, beware of double evaluation of len */
-#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
+#define LogicalTapeReadExact(tape, ptr, len) \
do { \
- if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
+ if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \
elog(ERROR, "unexpected end of data"); \
} while(0)
@@ -619,7 +620,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
-static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
+static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
@@ -628,39 +629,39 @@ static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_delete_top(Tuplesortstate *state);
static void reversedirection(Tuplesortstate *state);
-static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
-static void markrunend(Tuplesortstate *state, int tapenum);
+static unsigned int getlen(LogicalTape *tape, bool eofOK);
+static void markrunend(LogicalTape *tape);
static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_heap(Tuplesortstate *state, int tapenum,
+static void writetup_heap(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_cluster(Tuplesortstate *state, int tapenum,
+static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_index(Tuplesortstate *state, int tapenum,
+static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_datum(Tuplesortstate *state, int tapenum,
+static void writetup_datum(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ LogicalTape *tape, unsigned int len);
static int worker_get_identifier(Tuplesortstate *state);
static void worker_freeze_result_tape(Tuplesortstate *state);
static void worker_nomergeruns(Tuplesortstate *state);
@@ -869,7 +870,7 @@ tuplesort_begin_batch(Tuplesortstate *state)
* inittapes(), if needed
*/
- state->result_tape = -1; /* flag that result tape has not been formed */
+ state->result_tape = NULL; /* flag that result tape has not been formed */
MemoryContextSwitchTo(oldcontext);
}
@@ -2202,7 +2203,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
if (state->eof_reached)
return false;
- if ((tuplen = getlen(state, state->result_tape, true)) != 0)
+ if ((tuplen = getlen(state->result_tape, true)) != 0)
{
READTUP(state, stup, state->result_tape, tuplen);
@@ -2235,8 +2236,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* end of file; back up to fetch last tuple's ending length
* word. If seek fails we must have a completely empty file.
*/
- nmoved = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ nmoved = LogicalTapeBackspace(state->result_tape,
2 * sizeof(unsigned int));
if (nmoved == 0)
return false;
@@ -2250,20 +2250,18 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* Back up and fetch previously-returned tuple's ending length
* word. If seek fails, assume we are at start of file.
*/
- nmoved = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ nmoved = LogicalTapeBackspace(state->result_tape,
sizeof(unsigned int));
if (nmoved == 0)
return false;
else if (nmoved != sizeof(unsigned int))
elog(ERROR, "unexpected tape position");
- tuplen = getlen(state, state->result_tape, false);
+ tuplen = getlen(state->result_tape, false);
/*
* Back up to get ending length word of tuple before it.
*/
- nmoved = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ nmoved = LogicalTapeBackspace(state->result_tape,
tuplen + 2 * sizeof(unsigned int));
if (nmoved == tuplen + sizeof(unsigned int))
{
@@ -2280,15 +2278,14 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
elog(ERROR, "bogus tuple length in backward scan");
}
- tuplen = getlen(state, state->result_tape, false);
+ tuplen = getlen(state->result_tape, false);
/*
* Now we have the length of the prior tuple, back up and read it.
* Note: READTUP expects we are positioned after the initial
* length word of the tuple, so back up to that point.
*/
- nmoved = LogicalTapeBackspace(state->tapeset,
- state->result_tape,
+ nmoved = LogicalTapeBackspace(state->result_tape,
tuplen);
if (nmoved != tuplen)
elog(ERROR, "bogus tuple length in backward scan");
@@ -2346,11 +2343,10 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
tuplesort_heap_delete_top(state);
/*
- * Rewind to free the read buffer. It'd go away at the
- * end of the sort anyway, but better to release the
- * memory early.
+ * Close the tape. It'd go away at the end of the sort
+ * anyway, but better to release the memory early.
*/
- LogicalTapeRewindForWrite(state->tapeset, srcTape);
+ LogicalTapeClose(state->tapes[srcTape]);
return true;
}
newtup.srctape = srcTape;
@@ -2648,9 +2644,12 @@ inittapes(Tuplesortstate *state, bool mergeruns)
/* Create the tape set and allocate the per-tape data arrays */
inittapestate(state, maxTapes);
state->tapeset =
- LogicalTapeSetCreate(maxTapes, false, NULL,
+ LogicalTapeSetCreate(false,
state->shared ? &state->shared->fileset : NULL,
state->worker);
+ state->tapes = palloc(maxTapes * sizeof(LogicalTape *));
+ for (j = 0; j < maxTapes; j++)
+ state->tapes[j] = LogicalTapeCreate(state->tapeset);
state->currentRun = 0;
@@ -2900,7 +2899,7 @@ mergeruns(Tuplesortstate *state)
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
+ LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size);
for (;;)
{
@@ -2962,11 +2961,14 @@ mergeruns(Tuplesortstate *state)
/* Step D6: decrease level */
if (--state->Level == 0)
break;
+
/* rewind output tape T to use as new input */
- LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
+ LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]],
state->read_buffer_size);
- /* rewind used-up input tape P, and prepare it for write pass */
- LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
+
+ /* close used-up input tape P, and create a new one for write pass */
+ LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]);
+ state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset);
state->tp_runs[state->tapeRange - 1] = 0;
/*
@@ -2994,18 +2996,21 @@ mergeruns(Tuplesortstate *state)
* output tape while rewinding it. The last iteration of step D6 would be
* a waste of cycles anyway...
*/
- state->result_tape = state->tp_tapenum[state->tapeRange];
+ state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]];
if (!WORKER(state))
- LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+ LogicalTapeFreeze(state->result_tape, NULL);
else
worker_freeze_result_tape(state);
state->status = TSS_SORTEDONTAPE;
- /* Release the read buffers of all the other tapes, by rewinding them. */
+ /* Close all the other tapes, to release their read buffers. */
for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
{
- if (tapenum != state->result_tape)
- LogicalTapeRewindForWrite(state->tapeset, tapenum);
+ if (state->tapes[tapenum] != state->result_tape)
+ {
+ LogicalTapeClose(state->tapes[tapenum]);
+ state->tapes[tapenum] = NULL;
+ }
}
}
@@ -3018,7 +3023,8 @@ mergeruns(Tuplesortstate *state)
static void
mergeonerun(Tuplesortstate *state)
{
- int destTape = state->tp_tapenum[state->tapeRange];
+ int destTapeNum = state->tp_tapenum[state->tapeRange];
+ LogicalTape *destTape = state->tapes[destTapeNum];
int srcTape;
/*
@@ -3061,7 +3067,7 @@ mergeonerun(Tuplesortstate *state)
* When the heap empties, we're done. Write an end-of-run marker on the
* output tape, and increment its count of real runs.
*/
- markrunend(state, destTape);
+ markrunend(destTape);
state->tp_runs[state->tapeRange]++;
#ifdef TRACE_SORT
@@ -3127,17 +3133,18 @@ beginmerge(Tuplesortstate *state)
* Returns false on EOF.
*/
static bool
-mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
+mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
{
+ LogicalTape *srcTape = state->tapes[srcTapeIndex];
unsigned int tuplen;
- if (!state->mergeactive[srcTape])
+ if (!state->mergeactive[srcTapeIndex])
return false; /* tape's run is already exhausted */
/* read next tuple, if any */
- if ((tuplen = getlen(state, srcTape, true)) == 0)
+ if ((tuplen = getlen(srcTape, true)) == 0)
{
- state->mergeactive[srcTape] = false;
+ state->mergeactive[srcTapeIndex] = false;
return false;
}
READTUP(state, stup, srcTape, tuplen);
@@ -3154,6 +3161,7 @@ mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
+ LogicalTape *destTape;
int memtupwrite;
int i;
@@ -3220,10 +3228,10 @@ dumptuples(Tuplesortstate *state, bool alltuples)
#endif
memtupwrite = state->memtupcount;
+ destTape = state->tapes[state->tp_tapenum[state->destTape]];
for (i = 0; i < memtupwrite; i++)
{
- WRITETUP(state, state->tp_tapenum[state->destTape],
- &state->memtuples[i]);
+ WRITETUP(state, destTape, &state->memtuples[i]);
state->memtupcount--;
}
@@ -3236,7 +3244,7 @@ dumptuples(Tuplesortstate *state, bool alltuples)
*/
MemoryContextReset(state->tuplecontext);
- markrunend(state, state->tp_tapenum[state->destTape]);
+ markrunend(destTape);
state->tp_runs[state->destTape]++;
state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
@@ -3270,9 +3278,7 @@ tuplesort_rescan(Tuplesortstate *state)
state->markpos_eof = false;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeRewindForRead(state->tapeset,
- state->result_tape,
- 0);
+ LogicalTapeRewindForRead(state->result_tape, 0);
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
@@ -3303,8 +3309,7 @@ tuplesort_markpos(Tuplesortstate *state)
state->markpos_eof = state->eof_reached;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeTell(state->tapeset,
- state->result_tape,
+ LogicalTapeTell(state->result_tape,
&state->markpos_block,
&state->markpos_offset);
state->markpos_eof = state->eof_reached;
@@ -3335,8 +3340,7 @@ tuplesort_restorepos(Tuplesortstate *state)
state->eof_reached = state->markpos_eof;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeSeek(state->tapeset,
- state->result_tape,
+ LogicalTapeSeek(state->result_tape,
state->markpos_block,
state->markpos_offset);
state->eof_reached = state->markpos_eof;
@@ -3678,11 +3682,11 @@ reversedirection(Tuplesortstate *state)
*/
static unsigned int
-getlen(Tuplesortstate *state, int tapenum, bool eofOK)
+getlen(LogicalTape *tape, bool eofOK)
{
unsigned int len;
- if (LogicalTapeRead(state->tapeset, tapenum,
+ if (LogicalTapeRead(tape,
&len, sizeof(len)) != sizeof(len))
elog(ERROR, "unexpected end of tape");
if (len == 0 && !eofOK)
@@ -3691,11 +3695,11 @@ getlen(Tuplesortstate *state, int tapenum, bool eofOK)
}
static void
-markrunend(Tuplesortstate *state, int tapenum)
+markrunend(LogicalTape *tape)
{
unsigned int len = 0;
- LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
+ LogicalTapeWrite(tape, (void *) &len, sizeof(len));
}
/*
@@ -3873,7 +3877,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
}
static void
-writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
MinimalTuple tuple = (MinimalTuple) stup->tuple;
@@ -3884,13 +3888,10 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
/* total on-disk footprint: */
unsigned int tuplen = tupbodylen + sizeof(int);
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) tupbody, tupbodylen);
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) tupbody, tupbodylen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
if (!state->slabAllocatorUsed)
{
@@ -3901,7 +3902,7 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
static void
readtup_heap(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len)
+ LogicalTape *tape, unsigned int len)
{
unsigned int tupbodylen = len - sizeof(int);
unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
@@ -3911,11 +3912,9 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
/* read in the tuple proper */
tuple->t_len = tuplen;
- LogicalTapeReadExact(state->tapeset, tapenum,
- tupbody, tupbodylen);
+ LogicalTapeReadExact(tape, tupbody, tupbodylen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value */
htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
@@ -4116,21 +4115,17 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
}
static void
-writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
HeapTuple tuple = (HeapTuple) stup->tuple;
unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
/* We need to store t_self, but not other fields of HeapTupleData */
- LogicalTapeWrite(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
- LogicalTapeWrite(state->tapeset, tapenum,
- &tuple->t_self, sizeof(ItemPointerData));
- LogicalTapeWrite(state->tapeset, tapenum,
- tuple->t_data, tuple->t_len);
+ LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData));
+ LogicalTapeWrite(tape, tuple->t_data, tuple->t_len);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
if (!state->slabAllocatorUsed)
{
@@ -4141,7 +4136,7 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
static void
readtup_cluster(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int tuplen)
+ LogicalTape *tape, unsigned int tuplen)
{
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
HeapTuple tuple = (HeapTuple) readtup_alloc(state,
@@ -4150,16 +4145,13 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
/* Reconstruct the HeapTupleData header */
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
tuple->t_len = t_len;
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuple->t_self, sizeof(ItemPointerData));
+ LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData));
/* We don't currently bother to reconstruct t_tableOid */
tuple->t_tableOid = InvalidOid;
/* Read in the tuple body */
- LogicalTapeReadExact(state->tapeset, tapenum,
- tuple->t_data, tuple->t_len);
+ LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value, if it's a simple column */
if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
@@ -4373,19 +4365,16 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
}
static void
-writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
IndexTuple tuple = (IndexTuple) stup->tuple;
unsigned int tuplen;
tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) tuple, IndexTupleSize(tuple));
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple));
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
if (!state->slabAllocatorUsed)
{
@@ -4396,16 +4385,14 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
static void
readtup_index(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len)
+ LogicalTape *tape, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
- LogicalTapeReadExact(state->tapeset, tapenum,
- tuple, tuplen);
+ LogicalTapeReadExact(tape, tuple, tuplen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value */
stup->datum1 = index_getattr(tuple,
@@ -4447,7 +4434,7 @@ copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
}
static void
-writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
void *waddr;
unsigned int tuplen;
@@ -4472,13 +4459,10 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
writtenlen = tuplen + sizeof(unsigned int);
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &writtenlen, sizeof(writtenlen));
- LogicalTapeWrite(state->tapeset, tapenum,
- waddr, tuplen);
+ LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
+ LogicalTapeWrite(tape, waddr, tuplen);
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeWrite(state->tapeset, tapenum,
- (void *) &writtenlen, sizeof(writtenlen));
+ LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
if (!state->slabAllocatorUsed && stup->tuple)
{
@@ -4489,7 +4473,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
static void
readtup_datum(Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len)
+ LogicalTape *tape, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
@@ -4503,8 +4487,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
else if (!state->tuples)
{
Assert(tuplen == sizeof(Datum));
- LogicalTapeReadExact(state->tapeset, tapenum,
- &stup->datum1, tuplen);
+ LogicalTapeReadExact(tape, &stup->datum1, tuplen);
stup->isnull1 = false;
stup->tuple = NULL;
}
@@ -4512,16 +4495,14 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
{
void *raddr = readtup_alloc(state, tuplen);
- LogicalTapeReadExact(state->tapeset, tapenum,
- raddr, tuplen);
+ LogicalTapeReadExact(tape, raddr, tuplen);
stup->datum1 = PointerGetDatum(raddr);
stup->isnull1 = false;
stup->tuple = raddr;
}
if (state->randomAccess) /* need trailing length word? */
- LogicalTapeReadExact(state->tapeset, tapenum,
- &tuplen, sizeof(tuplen));
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
}
/*
@@ -4633,7 +4614,7 @@ worker_freeze_result_tape(Tuplesortstate *state)
TapeShare output;
Assert(WORKER(state));
- Assert(state->result_tape != -1);
+ Assert(state->result_tape != NULL);
Assert(state->memtupcount == 0);
/*
@@ -4649,7 +4630,7 @@ worker_freeze_result_tape(Tuplesortstate *state)
* Parallel worker requires result tape metadata, which is to be stored in
* shared memory for leader
*/
- LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+ LogicalTapeFreeze(state->result_tape, &output);
/* Store properties of output tape, and update finished worker count */
SpinLockAcquire(&shared->mutex);
@@ -4668,9 +4649,9 @@ static void
worker_nomergeruns(Tuplesortstate *state)
{
Assert(WORKER(state));
- Assert(state->result_tape == -1);
+ Assert(state->result_tape == NULL);
- state->result_tape = state->tp_tapenum[state->destTape];
+ state->result_tape = state->tapes[state->tp_tapenum[state->destTape]];
worker_freeze_result_tape(state);
}
@@ -4714,9 +4695,13 @@ leader_takeover_tapes(Tuplesortstate *state)
* randomAccess is disallowed for parallel sorts.
*/
inittapestate(state, nParticipants + 1);
- state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
- shared->tapes, &shared->fileset,
+ state->tapeset = LogicalTapeSetCreate(false,
+ &shared->fileset,
state->worker);
+ state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *));
+ for (j = 0; j < nParticipants; j++)
+ state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
+ /* tapes[nParticipants] represents the "leader tape", which is not used */
/* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
state->currentRun = nParticipants;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6c0a7d68d61..16378209f08 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -40,6 +40,7 @@ struct ExprContext;
struct RangeTblEntry; /* avoid including parsenodes.h here */
struct ExprEvalStep; /* avoid including execExpr.h everywhere */
struct CopyMultiInsertBuffer;
+struct LogicalTapeSet;
/* ----------------
@@ -2183,7 +2184,7 @@ typedef struct AggState
bool table_filled; /* hash table filled yet? */
int num_hashes;
MemoryContext hash_metacxt; /* memory for hash table itself */
- struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
+ struct LogicalTapeSet *hash_tapeset; /* tape set for hash spill tapes */
struct HashAggSpill *hash_spills; /* HashAggSpill for each grouping set,
* exists only during first pass */
TupleTableSlot *hash_spill_rslot; /* for reading spill files */
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index da5159e4c6c..97bc967902d 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -18,9 +18,13 @@
#include "storage/sharedfileset.h"
-/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
-
+/*
+ * LogicalTapeSet and LogicalTape are opaque types whose details are not
+ * known outside logtape.c.
+ */
typedef struct LogicalTapeSet LogicalTapeSet;
+typedef struct LogicalTape LogicalTape;
+
/*
* The approach tuplesort.c takes to parallel external sorts is that workers,
@@ -54,27 +58,20 @@ typedef struct TapeShare
* prototypes for functions in logtape.c
*/
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, bool preallocate,
- TapeShare *shared,
+extern LogicalTapeSet *LogicalTapeSetCreate(bool preallocate,
SharedFileSet *fileset, int worker);
+extern void LogicalTapeClose(LogicalTape *lt);
extern void LogicalTapeSetClose(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeCreate(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared);
extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
-extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size);
-extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
- void *ptr, size_t size);
-extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
- size_t buffer_size);
-extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
- TapeShare *share);
-extern void LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional);
-extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
- size_t size);
-extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
- long blocknum, int offset);
-extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
- long *blocknum, int *offset);
+extern size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size);
+extern void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share);
+extern size_t LogicalTapeBackspace(LogicalTape *lt, size_t size);
+extern void LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset);
+extern void LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset);
extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
#endif /* LOGTAPE_H */
--
2.20.1
Attachment: v2-0002-Replace-polyphase-merge-algorithm-with-a-simple-b.patch (text/x-patch)
From 75fedd54dd3a93026e63b03105db552327583d26 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 22 Oct 2020 14:34:48 +0300
Subject: [PATCH v2 2/2] Replace polyphase merge algorithm with a simple
balanced k-way merge.
The advantage of polyphase merge is that it can reuse the input tapes as
output tapes efficiently, but that is irrelevant on modern hardware, when
we can easily emulate any number of tape drives. The number of input tapes
we can/should use during merging is limited by work_mem, but output tapes
that we are not currently writing to only cost a little bit of memory, so
there is no need to skimp on them.
This makes sorts that need multiple merge passes faster.
---
src/backend/utils/sort/tuplesort.c | 662 +++++++++++++----------------
1 file changed, 293 insertions(+), 369 deletions(-)
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 07b6c92f54d..3a6c97f8f53 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -10,14 +10,22 @@
* amounts are sorted using temporary files and a standard external sort
* algorithm.
*
- * See Knuth, volume 3, for more than you want to know about the external
- * sorting algorithm. Historically, we divided the input into sorted runs
- * using replacement selection, in the form of a priority tree implemented
- * as a heap (essentially his Algorithm 5.2.3H), but now we always use
- * quicksort for run generation. We merge the runs using polyphase merge,
- * Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D are
- * implemented by logtape.c, which avoids space wastage by recycling disk
- * space as soon as each block is read from its "tape".
+ * See Knuth, volume 3, for more than you want to know about external
+ * sorting algorithms. The algorithm we use is a balanced k-way merge.
+ * Before PostgreSQL 14, we used the polyphase merge algorithm (Knuth's
+ * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
+ * merge is better. Knuth is assuming that tape drives are expensive
+ * beasts, and in particular that there will always be many more runs than
+ * tape drives. The polyphase merge algorithm was good at keeping all the
+ * tape drives busy, but in our implementation a "tape drive" doesn't cost
+ * much more than a few Kb of memory buffers, so we can afford to have
+ * lots of them. In particular, if we can have as many tape drives as
+ * sorted runs, we can eliminate any repeated I/O at all.
+ *
+ * Historically, we divided the input into sorted runs using replacement
+ * selection, in the form of a priority tree implemented as a heap
+ * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
+ * for run generation.
*
* The approximate amount of memory allowed for any one sort operation
* is specified in kilobytes by the caller (most pass work_mem). Initially,
@@ -27,9 +35,11 @@
* tuples just by scanning the tuple array sequentially. If we do exceed
* workMem, we begin to emit tuples into sorted runs in temporary tapes.
* When tuples are dumped in batch after quicksorting, we begin a new run
- * with a new output tape (selected per Algorithm D). After the end of the
- * input is reached, we dump out remaining tuples in memory into a final run,
- * then merge the runs using Algorithm D.
+ * with a new output tape. If we reach the max number of tapes, we write
+ * subsequent runs on the existing tapes in a round-robin fashion. We will
+ * need multiple merge passes to finish the merge in that case. After the
+ * end of the input is reached, we dump out remaining tuples in memory into
+ * a final run, then merge the runs.
*
* When merging runs, we use a heap containing just the frontmost tuple from
* each source run; we repeatedly output the smallest tuple and replace it
@@ -52,6 +62,14 @@
* accesses. The pre-reading is handled by logtape.c, we just tell it how
* much memory to use for the buffers.
*
+ * In the current code we determine the number of input tapes M on the basis
+ * of workMem: we want workMem/M to be large enough that we read a fair
+ * amount of data each time we read from a tape, so as to maintain the
+ * locality of access described above. Nonetheless, with large workMem we
+ * can have many tapes. The logical "tapes" are implemented by logtape.c,
+ * which avoids space wastage by recycling disk space as soon as each block
+ * is read from its "tape".
+ *
* When the caller requests random access to the sort result, we form
* the final sorted run on a logical tape which is then "frozen", so
* that we can access it randomly. When the caller does not need random
@@ -60,20 +78,6 @@
* on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
* saves one cycle of writing all the data out to disk and reading it in.
*
- * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
- * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
- * to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that
- * tape drives are expensive beasts, and in particular that there will always
- * be many more runs than tape drives. In our implementation a "tape drive"
- * doesn't cost much more than a few Kb of memory buffers, so we can afford
- * to have lots of them. In particular, if we can have as many tape drives
- * as sorted runs, we can eliminate any repeated I/O at all. In the current
- * code we determine the number of tapes M on the basis of workMem: we want
- * workMem/M to be large enough that we read a fair amount of data each time
- * we preread from a tape, so as to maintain the locality of access described
- * above. Nonetheless, with large workMem we can have many tapes (but not
- * too many -- see the comments in tuplesort_merge_order).
- *
* This module supports parallel sorting. Parallel sorts involve coordination
* among one or more worker processes, and a leader process, each with its own
* tuplesort state. The leader process (or, more accurately, the
@@ -223,8 +227,9 @@ typedef enum
* worth of buffer space. This ignores the overhead of all the other data
* structures needed for each tape, but it's probably close enough.
*
- * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
- * tape during a preread cycle (see discussion at top of file).
+ * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
+ * input tape, for pre-reading (see discussion at top of file). This is *in
+ * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
*/
#define MINORDER 6 /* minimum merge order */
#define MAXORDER 500 /* maximum merge order */
@@ -249,8 +254,8 @@ struct Tuplesortstate
bool tuples; /* Can SortTuple.tuple ever be set? */
int64 availMem; /* remaining memory available, in bytes */
int64 allowedMem; /* total memory allowed, in bytes */
- int maxTapes; /* number of tapes (Knuth's T) */
- int tapeRange; /* maxTapes-1 (Knuth's P) */
+ int maxTapes; /* max number of input tapes to merge in each
+ * pass */
int64 maxSpace; /* maximum amount of space occupied among sort
* of groups, either in-memory or on-disk */
bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk
@@ -262,7 +267,6 @@ struct Tuplesortstate
MemoryContext sortcontext; /* memory context holding most sort data */
MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
- LogicalTape **tapes;
/*
* These function pointers decouple the routines that must know what kind
@@ -347,8 +351,8 @@ struct Tuplesortstate
char *slabMemoryEnd; /* end of slab memory arena */
SlabSlot *slabFreeHead; /* head of free list */
- /* Buffer size to use for reading input tapes, during merge. */
- size_t read_buffer_size;
+ /* Memory used for input and output tape buffers. */
+ size_t tape_buffer_mem;
/*
* When we return a tuple to the caller in tuplesort_gettuple_XXX, that
@@ -365,36 +369,29 @@ struct Tuplesortstate
int currentRun;
/*
- * Unless otherwise noted, all pointer variables below are pointers to
- * arrays of length maxTapes, holding per-tape data.
+ * Logical tapes, for merging.
+ *
+ * The initial runs are written in the output tapes. In each merge pass,
+ * the output tapes of the previous pass become the input tapes, and new
+ * output tapes are created as needed. When nInputTapes equals
+ * nInputRuns, there is only one merge pass left.
*/
+ LogicalTape **inputTapes;
+ int nInputTapes;
+ int nInputRuns;
- /*
- * This variable is only used during merge passes. mergeactive[i] is true
- * if we are reading an input run from (actual) tape number i and have not
- * yet exhausted that run.
- */
- bool *mergeactive; /* active input run source? */
+ LogicalTape **outputTapes;
+ int nOutputTapes;
+ int nOutputRuns;
- /*
- * Variables for Algorithm D. Note that destTape is a "logical" tape
- * number, ie, an index into the tp_xxx[] arrays. Be careful to keep
- * "logical" and "actual" tape numbers straight!
- */
- int Level; /* Knuth's l */
- int destTape; /* current output tape (Knuth's j, less 1) */
- int *tp_fib; /* Target Fibonacci run counts (A[]) */
- int *tp_runs; /* # of real runs on each tape */
- int *tp_dummy; /* # of dummy runs for each tape (D[]) */
- int *tp_tapenum; /* Actual tape numbers (TAPE[]) */
- int activeTapes; /* # of active input tapes in merge pass */
+ LogicalTape *destTape; /* current output tape */
/*
* These variables are used after completion of sorting to keep track of
* the next tuple to return. (In the tape case, the tape's current read
* position is also critical state.)
*/
- LogicalTape *result_tape; /* tape of finished output */
+ LogicalTape *result_tape; /* actual tape of finished output */
int current; /* array index (only used if SORTEDINMEM) */
bool eof_reached; /* reached EOF (needed for cursors) */
@@ -415,8 +412,9 @@ struct Tuplesortstate
*
* nParticipants is the number of worker Tuplesortstates known by the
* leader to have actually been launched, which implies that they must
- * finish a run leader can merge. Typically includes a worker state held
- * by the leader process itself. Set in the leader Tuplesortstate only.
+ * finish a run that the leader needs to merge. Typically includes a
+ * worker state held by the leader process itself. Set in the leader
+ * Tuplesortstate only.
*/
int worker;
Sharedsort *shared;
@@ -620,7 +618,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
-static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup);
+static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
@@ -866,8 +864,8 @@ tuplesort_begin_batch(Tuplesortstate *state)
state->currentRun = 0;
/*
- * maxTapes, tapeRange, and Algorithm D variables will be initialized by
- * inittapes(), if needed
+ * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
+ * inittapes(), if needed.
*/
state->result_tape = NULL; /* flag that result tape has not been formed */
@@ -1389,6 +1387,10 @@ tuplesort_free(Tuplesortstate *state)
*
* Note: want to include this in reported total cost of sort, hence need
* for two #ifdef TRACE_SORT sections.
+ *
+ * We don't bother to destroy the individual tapes here. They will go away
+ * with the sortcontext. (In TSS_FINALMERGE state, we have closed
+ * finished tapes already.)
*/
if (state->tapeset)
LogicalTapeSetClose(state->tapeset);
@@ -2111,7 +2113,7 @@ tuplesort_performsort(Tuplesortstate *state)
{
if (state->status == TSS_FINALMERGE)
elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
- state->worker, state->activeTapes,
+ state->worker, state->nInputTapes,
pg_rusage_show(&state->ru_start));
else
elog(LOG, "performsort of worker %d done: %s",
@@ -2319,7 +2321,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
*/
if (state->memtupcount > 0)
{
- int srcTape = state->memtuples[0].srctape;
+ int srcTapeIndex = state->memtuples[0].srctape;
+ LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
SortTuple newtup;
*stup = state->memtuples[0];
@@ -2341,15 +2344,16 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* Remove the top node from the heap.
*/
tuplesort_heap_delete_top(state);
+ state->nInputRuns--;
/*
* Close the tape. It'd go away at the end of the sort
* anyway, but better to release the memory early.
*/
- LogicalTapeClose(state->tapes[srcTape]);
+ LogicalTapeClose(srcTape);
return true;
}
- newtup.srctape = srcTape;
+ newtup.srctape = srcTapeIndex;
tuplesort_heap_replace_top(state, &newtup);
return true;
}
@@ -2580,18 +2584,29 @@ tuplesort_merge_order(int64 allowedMem)
{
int mOrder;
- /*
- * We need one tape for each merge input, plus another one for the output,
- * and each of these tapes needs buffer space. In addition we want
- * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
- * count).
+ /*----------
+ * In the merge phase, we need buffer space for each input and output tape.
+ * Each pass in the balanced merge algorithm reads from M input tapes, and
+ * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
+ * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
+ * input tape.
+ *
+ * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
+ * N * TAPE_BUFFER_OVERHEAD
+ *
+ * Except for the last and next-to-last merge passes, where there can be
+ * fewer tapes left to process, M = N. We choose M so that we have the
+ * desired amount of memory available for the input buffers
+ * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
+ * available for the tape buffers (allowedMem).
*
* Note: you might be thinking we need to account for the memtuples[]
* array in this calculation, but we effectively treat that as part of the
* MERGE_BUFFER_SIZE workspace.
+ *----------
*/
- mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
- (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
+ mOrder = allowedMem /
+ (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
/*
* Even in minimum memory, use at least a MINORDER merge. On the other
@@ -2601,7 +2616,7 @@ tuplesort_merge_order(int64 allowedMem)
* which in turn can cause the same sort to need more runs, which makes
* merging slower even if it can still be done in a single pass. Also,
* high order merges are quite slow due to CPU cache effects; it can be
- * faster to pay the I/O cost of a polyphase merge than to perform a
+ * faster to pay the I/O cost of a multi-pass merge than to perform a
* single merge pass across many hundreds of tapes.
*/
mOrder = Max(mOrder, MINORDER);
@@ -2610,6 +2625,42 @@ tuplesort_merge_order(int64 allowedMem)
return mOrder;
}
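As a standalone illustration (not part of the patch), the new merge-order arithmetic can be sketched in isolation. The constants below are assumptions mirroring the usual defaults (TAPE_BUFFER_OVERHEAD = BLCKSZ = 8192, MERGE_BUFFER_SIZE = 32 * BLCKSZ, MINORDER = 6, MAXORDER = 500), not values taken from this diff:

```c
#include <stdint.h>

/* Assumed stand-ins; in tuplesort.c these derive from BLCKSZ */
#define TAPE_BUFFER_OVERHEAD 8192
#define MERGE_BUFFER_SIZE (32 * 8192)
#define MINORDER 6
#define MAXORDER 500

/* Mirror of the patched tuplesort_merge_order() arithmetic: each of the
 * M input tapes needs a tape buffer plus merge workspace, and we budget
 * one matching tape buffer per output tape (M = N in full passes). */
static int
sketch_merge_order(int64_t allowedMem)
{
    int mOrder = (int) (allowedMem /
                        (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE));

    if (mOrder < MINORDER)
        mOrder = MINORDER;
    if (mOrder > MAXORDER)
        mOrder = MAXORDER;
    return mOrder;
}
```

With 64MB of memory this yields a 240-way merge; at 1MB the MINORDER clamp kicks in.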
+/*
+ * Helper function to calculate how much memory to allocate for the read buffer
+ * of each input tape in a merge pass.
+ *
+ * 'avail_mem' is the amount of memory available for the buffers of all the
+ * tapes, both input and output.
+ * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
+ * 'maxOutputTapes' is the max. number of output tapes we should produce.
+ */
+static int64
+merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
+ int maxOutputTapes)
+{
+ int nOutputRuns;
+ int nOutputTapes;
+
+ /*
+ * How many output tapes will we produce in this pass?
+ *
+ * This is nInputRuns / nInputTapes, rounded up.
+ */
+ nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
+
+ nOutputTapes = Min(nOutputRuns, maxOutputTapes);
+
+ /*
+ * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
+ * remaining memory is divided evenly between the input tapes.
+ *
+ * This also follows from the formula in tuplesort_merge_order, but here
+ * we derive the input buffer size from the amount of memory available,
+ * and M and N.
+ */
+ return (avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes;
+}
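To see the memory division in action, here is the same arithmetic as a standalone sketch (TAPE_BUFFER_OVERHEAD = 8192 is an assumed stand-in for BLCKSZ): reserve one buffer per output tape, then split the remainder evenly over the input tapes.

```c
#include <stdint.h>

#define TAPE_BUFFER_OVERHEAD 8192   /* assumed: BLCKSZ */

/* Mirror of merge_read_buffer_size(): after reserving one tape buffer
 * per output tape, the rest of avail_mem goes to the input tapes. */
static int64_t
sketch_read_buffer_size(int64_t avail_mem, int nInputTapes,
                        int nInputRuns, int maxOutputTapes)
{
    /* nInputRuns / nInputTapes, rounded up */
    int nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
    int nOutputTapes = (nOutputRuns < maxOutputTapes) ?
        nOutputRuns : maxOutputTapes;

    return (avail_mem - (int64_t) TAPE_BUFFER_OVERHEAD * nOutputTapes) /
        nInputTapes;
}
```

For example, merging 15 runs from 6 tapes produces 3 output tapes, so only 3 output buffers are charged against the budget; in the final pass (one output run) almost all memory goes to the input side.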
+
/*
* inittapes - initialize for tape sorting.
*
@@ -2618,58 +2669,49 @@ tuplesort_merge_order(int64 allowedMem)
static void
inittapes(Tuplesortstate *state, bool mergeruns)
{
- int maxTapes,
- j;
-
Assert(!LEADER(state));
if (mergeruns)
{
- /* Compute number of tapes to use: merge order plus 1 */
- maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+ /* Compute number of input tapes to use when merging */
+ state->maxTapes = tuplesort_merge_order(state->allowedMem);
}
else
{
/* Workers can sometimes produce single run, output without merge */
Assert(WORKER(state));
- maxTapes = MINORDER + 1;
+ state->maxTapes = MINORDER;
}
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "worker %d switching to external sort with %d tapes: %s",
- state->worker, maxTapes, pg_rusage_show(&state->ru_start));
+ state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
#endif
- /* Create the tape set and allocate the per-tape data arrays */
- inittapestate(state, maxTapes);
+ /* Create the tape set */
+ inittapestate(state, state->maxTapes);
state->tapeset =
LogicalTapeSetCreate(false,
state->shared ? &state->shared->fileset : NULL,
state->worker);
- state->tapes = palloc(maxTapes * sizeof(LogicalTape *));
- for (j = 0; j < maxTapes; j++)
- state->tapes[j] = LogicalTapeCreate(state->tapeset);
state->currentRun = 0;
/*
- * Initialize variables of Algorithm D (step D1).
+ * Initialize logical tape arrays.
*/
- for (j = 0; j < maxTapes; j++)
- {
- state->tp_fib[j] = 1;
- state->tp_runs[j] = 0;
- state->tp_dummy[j] = 1;
- state->tp_tapenum[j] = j;
- }
- state->tp_fib[state->tapeRange] = 0;
- state->tp_dummy[state->tapeRange] = 0;
+ state->inputTapes = NULL;
+ state->nInputTapes = 0;
+ state->nInputRuns = 0;
- state->Level = 1;
- state->destTape = 0;
+ state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
+ state->nOutputTapes = 0;
+ state->nOutputRuns = 0;
state->status = TSS_BUILDRUNS;
+
+ selectnewtape(state);
}
/*
@@ -2700,52 +2742,37 @@ inittapestate(Tuplesortstate *state, int maxTapes)
* called already, but it doesn't matter if it is called a second time.
*/
PrepareTempTablespaces();
-
- state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
-
- /* Record # of tapes allocated (for duration of sort) */
- state->maxTapes = maxTapes;
- /* Record maximum # of tapes usable as inputs when merging */
- state->tapeRange = maxTapes - 1;
}
/*
- * selectnewtape -- select new tape for new initial run.
+ * selectnewtape -- select next tape to output to.
*
* This is called after finishing a run when we know another run
- * must be started. This implements steps D3, D4 of Algorithm D.
+ * must be started. This is used both when building the initial
+ * runs, and during merge passes.
*/
static void
selectnewtape(Tuplesortstate *state)
{
- int j;
- int a;
-
- /* Step D3: advance j (destTape) */
- if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
- {
- state->destTape++;
- return;
- }
- if (state->tp_dummy[state->destTape] != 0)
+ if (state->nOutputRuns < state->maxTapes)
{
- state->destTape = 0;
- return;
+ /* Create a new tape to hold the next run */
+ Assert(state->outputTapes[state->nOutputRuns] == NULL);
+ Assert(state->nOutputRuns == state->nOutputTapes);
+ state->destTape = LogicalTapeCreate(state->tapeset);
+ state->outputTapes[state->nOutputRuns] = state->destTape;
+ state->nOutputTapes++;
+ state->nOutputRuns++;
}
-
- /* Step D4: increase level */
- state->Level++;
- a = state->tp_fib[0];
- for (j = 0; j < state->tapeRange; j++)
+ else
{
- state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
- state->tp_fib[j] = a + state->tp_fib[j + 1];
+ /*
+ * We have reached the max number of tapes. Append to an existing
+ * tape.
+ */
+ state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
+ state->nOutputRuns++;
}
- state->destTape = 0;
}
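The net effect of the rewritten selectnewtape() is a plain round-robin distribution of runs over at most maxTapes tapes. A sketch of the resulting run-to-tape mapping (hypothetical helper, not in the patch):

```c
/* Which tape does run number runNo (0-based) land on? The first
 * maxTapes runs each get a freshly created tape; later runs are
 * appended round-robin to the existing tapes. */
static int
sketch_dest_tape(int runNo, int maxTapes)
{
    if (runNo < maxTapes)
        return runNo;           /* create a new tape for this run */
    return runNo % maxTapes;    /* append to an existing tape */
}
```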
/*
@@ -2784,18 +2811,13 @@ init_slab_allocator(Tuplesortstate *state, int numSlots)
/*
* mergeruns -- merge all the completed initial runs.
*
- * This implements steps D5, D6 of Algorithm D. All input data has
+ * This implements the Balanced k-Way Merge Algorithm. All input data has
* already been written to initial runs on tape (see dumptuples).
*/
static void
mergeruns(Tuplesortstate *state)
{
- int tapenum,
- svTape,
- svRuns,
- svDummy;
- int numTapes;
- int numInputTapes;
+ int tapenum;
Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
@@ -2830,99 +2852,111 @@ mergeruns(Tuplesortstate *state)
pfree(state->memtuples);
state->memtuples = NULL;
- /*
- * If we had fewer runs than tapes, refund the memory that we imagined we
- * would need for the tape buffers of the unused tapes.
- *
- * numTapes and numInputTapes reflect the actual number of tapes we will
- * use. Note that the output tape's tape number is maxTapes - 1, so the
- * tape numbers of the used tapes are not consecutive, and you cannot just
- * loop from 0 to numTapes to visit all used tapes!
- */
- if (state->Level == 1)
- {
- numInputTapes = state->currentRun;
- numTapes = numInputTapes + 1;
- FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
- }
- else
- {
- numInputTapes = state->tapeRange;
- numTapes = state->maxTapes;
- }
-
/*
* Initialize the slab allocator. We need one slab slot per input tape,
* for the tuples in the heap, plus one to hold the tuple last returned
* from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
* however, we don't need to do allocate anything.)
*
+ * In a multi-pass merge, we could shrink this allocation for the last
+ * merge pass, if it has fewer tapes than previous passes, but we don't
+ * bother.
+ *
* From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
* to track memory usage of individual tuples.
*/
if (state->tuples)
- init_slab_allocator(state, numInputTapes + 1);
+ init_slab_allocator(state, state->nOutputTapes + 1);
else
init_slab_allocator(state, 0);
/*
* Allocate a new 'memtuples' array, for the heap. It will hold one tuple
* from each input tape.
+ *
+ * We could shrink this, too, between passes in a multi-pass merge, but we
+ * don't bother. (The initial input tapes are still in outputTapes. The
+ * number of input tapes will not increase between passes.)
*/
- state->memtupsize = numInputTapes;
+ state->memtupsize = state->nOutputTapes;
state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
- numInputTapes * sizeof(SortTuple));
+ state->nOutputTapes * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
/*
- * Use all the remaining memory we have available for read buffers among
- * the input tapes.
- *
- * We don't try to "rebalance" the memory among tapes, when we start a new
- * merge phase, even if some tapes are inactive in the new phase. That
- * would be hard, because logtape.c doesn't know where one run ends and
- * another begins. When a new merge phase begins, and a tape doesn't
- * participate in it, its buffer nevertheless already contains tuples from
- * the next run on same tape, so we cannot release the buffer. That's OK
- * in practice, merge performance isn't that sensitive to the amount of
- * buffers used, and most merge phases use all or almost all tapes,
- * anyway.
+ * Use all the remaining memory we have available for tape buffers among
+ * all the input tapes. At the beginning of each merge pass, we will
+ * divide this memory between the input and output tapes in the pass.
*/
+ state->tape_buffer_mem = state->availMem;
+ USEMEM(state, state->availMem);
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
- state->worker, state->availMem / 1024, numInputTapes);
+ elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for tape buffers",
+ state->worker, state->tape_buffer_mem / 1024);
#endif
- state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
- USEMEM(state, state->read_buffer_size * numInputTapes);
-
- /* End of step D2: rewind all output tapes to prepare for merging */
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size);
-
for (;;)
{
/*
- * At this point we know that tape[T] is empty. If there's just one
- * (real or dummy) run left on each input tape, then only one merge
- * pass remains. If we don't have to produce a materialized sorted
- * tape, we can stop at this point and do the final merge on-the-fly.
+ * On the first iteration, or if we have read all the runs from the
+ * input tapes in a multi-pass merge, it's time to start a new pass.
+ * Rewind all the output tapes, and make them inputs for the next
+ * pass.
*/
- if (!state->randomAccess && !WORKER(state))
+ if (state->nInputRuns == 0 && !WORKER(state))
{
- bool allOneRun = true;
+ int64 input_buffer_size;
- Assert(state->tp_runs[state->tapeRange] == 0);
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
+ /* Close the old, emptied, input tapes */
+ if (state->nInputTapes > 0)
{
- if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
- {
- allOneRun = false;
- break;
- }
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeClose(state->inputTapes[tapenum]);
+ pfree(state->inputTapes);
}
- if (allOneRun)
+
+ /* Previous pass's outputs become next pass's inputs. */
+ state->inputTapes = state->outputTapes;
+ state->nInputTapes = state->nOutputTapes;
+ state->nInputRuns = state->nOutputRuns;
+
+ /*
+ * Reset output tape variables. The actual LogicalTapes will be
+ * created as needed, here we only allocate the array to hold
+ * them.
+ */
+ state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
+ state->nOutputTapes = 0;
+ state->nOutputRuns = 0;
+
+ /*
+ * Redistribute the memory allocated for tape buffers, among the
+ * new input and output tapes.
+ */
+ input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
+ state->nInputTapes,
+ state->nInputRuns,
+ state->maxTapes);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
+ state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
+ pg_rusage_show(&state->ru_start));
+#endif
+
+ /* Prepare the new input tapes for merge pass. */
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
+
+ /*
+ * If there's just one run left on each input tape, then only one
+ * merge pass remains. If we don't have to produce a materialized
+ * sorted tape, we can stop at this point and do the final merge
+ * on-the-fly.
+ */
+ if (!state->randomAccess && state->nInputRuns <= state->nInputTapes)
{
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
@@ -2933,103 +2967,47 @@ mergeruns(Tuplesortstate *state)
}
}
- /* Step D5: merge runs onto tape[T] until tape[P] is empty */
- while (state->tp_runs[state->tapeRange - 1] ||
- state->tp_dummy[state->tapeRange - 1])
- {
- bool allDummy = true;
-
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- {
- if (state->tp_dummy[tapenum] == 0)
- {
- allDummy = false;
- break;
- }
- }
-
- if (allDummy)
- {
- state->tp_dummy[state->tapeRange]++;
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- state->tp_dummy[tapenum]--;
- }
- else
- mergeonerun(state);
- }
-
- /* Step D6: decrease level */
- if (--state->Level == 0)
- break;
-
- /* rewind output tape T to use as new input */
- LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]],
- state->read_buffer_size);
+ /* Select an output tape */
+ selectnewtape(state);
- /* close used-up input tape P, and create a new one for write pass */
- LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]);
- state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset);
- state->tp_runs[state->tapeRange - 1] = 0;
+ /* Merge one run from each input tape. */
+ mergeonerun(state);
/*
- * reassign tape units per step D6; note we no longer care about A[]
+ * If the input tapes are empty, and we output only one output run,
+ * we're done. The current output tape contains the final result.
*/
- svTape = state->tp_tapenum[state->tapeRange];
- svDummy = state->tp_dummy[state->tapeRange];
- svRuns = state->tp_runs[state->tapeRange];
- for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
- {
- state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
- state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
- state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
- }
- state->tp_tapenum[0] = svTape;
- state->tp_dummy[0] = svDummy;
- state->tp_runs[0] = svRuns;
+ if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
+ break;
}
/*
- * Done. Knuth says that the result is on TAPE[1], but since we exited
- * the loop without performing the last iteration of step D6, we have not
- * rearranged the tape unit assignment, and therefore the result is on
- * TAPE[T]. We need to do it this way so that we can freeze the final
- * output tape while rewinding it. The last iteration of step D6 would be
- * a waste of cycles anyway...
+ * Done. The result is on a single run on a single tape.
*/
- state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]];
+ state->result_tape = state->outputTapes[0];
if (!WORKER(state))
LogicalTapeFreeze(state->result_tape, NULL);
else
worker_freeze_result_tape(state);
state->status = TSS_SORTEDONTAPE;
- /* Close all the other tapes, to release their read buffers. */
- for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
- {
- if (state->tapes[tapenum] != state->result_tape)
- {
- LogicalTapeClose(state->tapes[tapenum]);
- state->tapes[tapenum] = NULL;
- }
- }
+ /* Close all the now-empty input tapes, to release their read buffers. */
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeClose(state->inputTapes[tapenum]);
}
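Stepping back, the pass structure of the balanced merge is easy to model: each pass turns nInputRuns runs into ceil(nInputRuns / nInputTapes) output runs, so the run count shrinks geometrically. A small sketch of that convergence (illustrative only, not from the patch):

```c
/* How many merge passes does a balanced k-way merge need to reduce
 * 'runs' initial runs to one, merging from nTapes input tapes? */
static int
sketch_merge_passes(int runs, int nTapes)
{
    int passes = 0;

    while (runs > 1)
    {
        runs = (runs + nTapes - 1) / nTapes;    /* one k-way merge pass */
        passes++;
    }
    return passes;
}
```

E.g. 55 initial runs on 6 tapes take three passes (55 -> 10 -> 2 -> 1), while anything up to nTapes runs merges in a single pass, which is the typical case.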
/*
- * Merge one run from each input tape, except ones with dummy runs.
- *
- * This is the inner loop of Algorithm D step D5. We know that the
- * output tape is TAPE[T].
+ * Merge one run from each input tape.
*/
static void
mergeonerun(Tuplesortstate *state)
{
- int destTapeNum = state->tp_tapenum[state->tapeRange];
- LogicalTape *destTape = state->tapes[destTapeNum];
- int srcTape;
+ int srcTapeIndex;
+ LogicalTape *srcTape;
/*
* Start the merge by loading one tuple from each active source tape into
- * the heap. We can also decrease the input run/dummy run counts.
+ * the heap.
*/
beginmerge(state);
@@ -3043,8 +3021,9 @@ mergeonerun(Tuplesortstate *state)
SortTuple stup;
/* write the tuple to destTape */
- srcTape = state->memtuples[0].srctape;
- WRITETUP(state, destTape, &state->memtuples[0]);
+ srcTapeIndex = state->memtuples[0].srctape;
+ srcTape = state->inputTapes[srcTapeIndex];
+ WRITETUP(state, state->destTape, &state->memtuples[0]);
/* recycle the slot of the tuple we just wrote out, for the next read */
if (state->memtuples[0].tuple)
@@ -3056,72 +3035,47 @@ mergeonerun(Tuplesortstate *state)
*/
if (mergereadnext(state, srcTape, &stup))
{
- stup.srctape = srcTape;
+ stup.srctape = srcTapeIndex;
tuplesort_heap_replace_top(state, &stup);
+
}
else
+ {
tuplesort_heap_delete_top(state);
+ state->nInputRuns--;
+ }
}
/*
* When the heap empties, we're done. Write an end-of-run marker on the
- * output tape, and increment its count of real runs.
+ * output tape.
*/
- markrunend(destTape);
- state->tp_runs[state->tapeRange]++;
-
-#ifdef TRACE_SORT
- if (trace_sort)
- elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
- state->activeTapes, pg_rusage_show(&state->ru_start));
-#endif
+ markrunend(state->destTape);
}
/*
* beginmerge - initialize for a merge pass
*
- * We decrease the counts of real and dummy runs for each tape, and mark
- * which tapes contain active input runs in mergeactive[]. Then, fill the
- * merge heap with the first tuple from each active tape.
+ * Fill the merge heap with the first tuple from each input tape.
*/
static void
beginmerge(Tuplesortstate *state)
{
int activeTapes;
- int tapenum;
- int srcTape;
+ int srcTapeIndex;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
- /* Adjust run counts and mark the active tapes */
- memset(state->mergeactive, 0,
- state->maxTapes * sizeof(*state->mergeactive));
- activeTapes = 0;
- for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- {
- if (state->tp_dummy[tapenum] > 0)
- state->tp_dummy[tapenum]--;
- else
- {
- Assert(state->tp_runs[tapenum] > 0);
- state->tp_runs[tapenum]--;
- srcTape = state->tp_tapenum[tapenum];
- state->mergeactive[srcTape] = true;
- activeTapes++;
- }
- }
- Assert(activeTapes > 0);
- state->activeTapes = activeTapes;
+ activeTapes = Min(state->nInputTapes, state->nInputRuns);
- /* Load the merge heap with the first tuple from each input tape */
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+ for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
{
SortTuple tup;
- if (mergereadnext(state, srcTape, &tup))
+ if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
{
- tup.srctape = srcTape;
+ tup.srctape = srcTapeIndex;
tuplesort_heap_insert(state, &tup);
}
}
@@ -3133,20 +3087,13 @@ beginmerge(Tuplesortstate *state)
* Returns false on EOF.
*/
static bool
-mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
+mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
{
- LogicalTape *srcTape = state->tapes[srcTapeIndex];
unsigned int tuplen;
- if (!state->mergeactive[srcTapeIndex])
- return false; /* tape's run is already exhausted */
-
/* read next tuple, if any */
if ((tuplen = getlen(srcTape, true)) == 0)
- {
- state->mergeactive[srcTapeIndex] = false;
return false;
- }
READTUP(state, stup, srcTape, tuplen);
return true;
@@ -3161,7 +3108,6 @@ mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
- LogicalTape *destTape;
int memtupwrite;
int i;
@@ -3177,22 +3123,13 @@ dumptuples(Tuplesortstate *state, bool alltuples)
* Final call might require no sorting, in rare cases where we just so
* happen to have previously LACKMEM()'d at the point where exactly all
* remaining tuples are loaded into memory, just before input was
- * exhausted.
- *
- * In general, short final runs are quite possible. Rather than allowing
- * a special case where there was a superfluous selectnewtape() call (i.e.
- * a call with no subsequent run actually written to destTape), we prefer
- * to write out a 0 tuple run.
- *
- * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
- * the tape inactive for the merge when called from beginmerge(). This
- * case is therefore similar to the case where mergeonerun() finds a dummy
- * run for the tape, and so doesn't need to merge a run from the tape (or
- * conceptually "merges" the dummy run, if you prefer). According to
- * Knuth, Algorithm D "isn't strictly optimal" in its method of
- * distribution and dummy run assignment; this edge case seems very
- * unlikely to make that appreciably worse.
+ * exhausted. In general, short final runs are quite possible, but avoid
+ * creating a completely empty run. In a worker, though, we must produce
+ * at least one tape, even if it's empty.
*/
+ if (state->memtupcount == 0 && state->currentRun > 0)
+ return;
+
Assert(state->status == TSS_BUILDRUNS);
/*
@@ -3205,6 +3142,9 @@ dumptuples(Tuplesortstate *state, bool alltuples)
errmsg("cannot have more than %d runs for an external sort",
INT_MAX)));
+ if (state->currentRun > 0)
+ selectnewtape(state);
+
state->currentRun++;
#ifdef TRACE_SORT
@@ -3228,10 +3168,9 @@ dumptuples(Tuplesortstate *state, bool alltuples)
#endif
memtupwrite = state->memtupcount;
- destTape = state->tapes[state->tp_tapenum[state->destTape]];
for (i = 0; i < memtupwrite; i++)
{
- WRITETUP(state, destTape, &state->memtuples[i]);
+ WRITETUP(state, state->destTape, &state->memtuples[i]);
state->memtupcount--;
}
@@ -3244,19 +3183,14 @@ dumptuples(Tuplesortstate *state, bool alltuples)
*/
MemoryContextReset(state->tuplecontext);
- markrunend(destTape);
- state->tp_runs[state->destTape]++;
- state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+ markrunend(state->destTape);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "worker %d finished writing run %d to tape %d: %s",
- state->worker, state->currentRun, state->destTape,
+ state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
pg_rusage_show(&state->ru_start));
#endif
-
- if (!alltuples)
- selectnewtape(state);
}
/*
@@ -4650,8 +4584,9 @@ worker_nomergeruns(Tuplesortstate *state)
{
Assert(WORKER(state));
Assert(state->result_tape == NULL);
+ Assert(state->nOutputRuns == 1);
- state->result_tape = state->tapes[state->tp_tapenum[state->destTape]];
+ state->result_tape = state->destTape;
worker_freeze_result_tape(state);
}
@@ -4688,47 +4623,36 @@ leader_takeover_tapes(Tuplesortstate *state)
* Create the tapeset from worker tapes, including a leader-owned tape at
* the end. Parallel workers are far more expensive than logical tapes,
* so the number of tapes allocated here should never be excessive.
- *
- * We still have a leader tape, though it's not possible to write to it
- * due to restrictions in the shared fileset infrastructure used by
- * logtape.c. It will never be written to in practice because
- * randomAccess is disallowed for parallel sorts.
*/
- inittapestate(state, nParticipants + 1);
- state->tapeset = LogicalTapeSetCreate(false,
- &shared->fileset,
- state->worker);
- state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *));
- for (j = 0; j < nParticipants; j++)
- state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
- /* tapes[nParticipants] represents the "leader tape", which is not used */
+ inittapestate(state, nParticipants);
+ state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
- /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+ /*
+ * Set currentRun to reflect the number of runs we will merge (it's not
+ * used for anything, this is just pro forma)
+ */
state->currentRun = nParticipants;
/*
- * Initialize variables of Algorithm D to be consistent with runs from
- * workers having been generated in the leader.
+ * Initialize the state to look the same as after building the initial
+ * runs.
*
* There will always be exactly 1 run per worker, and exactly one input
* tape per run, because workers always output exactly 1 run, even when
* there were no input tuples for workers to sort.
*/
- for (j = 0; j < state->maxTapes; j++)
+ state->inputTapes = NULL;
+ state->nInputTapes = 0;
+ state->nInputRuns = 0;
+
+ state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
+ state->nOutputTapes = nParticipants;
+ state->nOutputRuns = nParticipants;
+
+ for (j = 0; j < nParticipants; j++)
{
- /* One real run; no dummy runs for worker tapes */
- state->tp_fib[j] = 1;
- state->tp_runs[j] = 1;
- state->tp_dummy[j] = 0;
- state->tp_tapenum[j] = j;
+ state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
}
- /* Leader tape gets one dummy run, and no real runs */
- state->tp_fib[state->tapeRange] = 0;
- state->tp_runs[state->tapeRange] = 0;
- state->tp_dummy[state->tapeRange] = 1;
-
- state->Level = 1;
- state->destTape = 0;
state->status = TSS_BUILDRUNS;
}
--
2.20.1
[Attachment: ordered_ints.png (image/png)]