From 379580dc600c079b8de3cc2f392376ad46429758 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 8 Sep 2016 20:34:06 +0300
Subject: [PATCH 2/3] Use larger read buffers in logtape.

This makes the access pattern appear more sequential to the OS, making it
more likely that the OS will do read-ahead for us. It will also ensure that
there are more sequential blocks available when writing, because we can
free more blocks in the underlying file at once. Sequential I/O is much
cheaper than random I/O.

We used to do pre-reading from each tape, in tuplesort.c, for the same
reasons. But it seems simpler to do it in logtape.c, reading the raw data
into a larger buffer, than converting every tuple to SortTuple format when
pre-reading, like tuplesort.c used to do.
---
 src/backend/utils/sort/logtape.c   | 134 +++++++++++++++++++++++++++++++------
 src/backend/utils/sort/tuplesort.c |  35 +++++++++-
 src/include/utils/logtape.h        |   1 +
 3 files changed, 147 insertions(+), 23 deletions(-)

diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 7745207..05d7697 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -131,9 +131,12 @@ typedef struct LogicalTape
 	 * reading.
 	 */
 	char	   *buffer;			/* physical buffer (separately palloc'd) */
+	int			buffer_size;	/* allocated size of the buffer */
 	long		curBlockNumber; /* this block's logical blk# within tape */
 	int			pos;			/* next read/write position in buffer */
 	int			nbytes;			/* total # of valid bytes in buffer */
+
+	int			read_buffer_size;	/* buffer size to use when reading */
 } LogicalTape;
 
 /*
@@ -228,6 +231,53 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
 }
 
 /*
+ * Read as many blocks as fit into the per-tape buffer, resetting pos/nbytes.
+ *
+ * datablocknum is the physical block to read first, or -1 to get the next
+ * block number from ltsRecallNextBlockNum().  curBlockNumber is used as-is
+ * for a caller-supplied block, and incremented for each block fetched here.
+ *
+ * Returns true if anything was read, false on EOF.
+ */
+static bool
+ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt, long datablocknum)
+{
+	lt->pos = 0;
+	lt->nbytes = 0;
+
+	do
+	{
+		/* Fetch next block number (unless provided by caller) */
+		if (datablocknum == -1)
+		{
+			datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, lt->frozen);
+			if (datablocknum == -1L)
+				break;			/* EOF */
+			lt->curBlockNumber++;
+		}
+
+		/* Read the block, appending after the data already buffered */
+		ltsReadBlock(lts, datablocknum, (void *) (lt->buffer + lt->nbytes));
+		if (!lt->frozen)
+			ltsReleaseBlock(lts, datablocknum);
+
+		if (lt->curBlockNumber < lt->numFullBlocks)
+			lt->nbytes += BLCKSZ;
+		else
+		{
+			/* Last block: only lastBlockBytes of it are valid */
+			lt->nbytes += lt->lastBlockBytes;
+			break;
+		}
+
+		/* Advance to next block, if we have buffer space left */
+		datablocknum = -1;
+	} while (lt->nbytes < lt->buffer_size);
+
+	return (lt->nbytes > 0);
+}
+
+/*
  * qsort comparator for sorting freeBlocks[] into decreasing order.
  */
 static int
@@ -546,6 +596,8 @@ LogicalTapeSetCreate(int ntapes)
 		lt->numFullBlocks = 0L;
 		lt->lastBlockBytes = 0;
 		lt->buffer = NULL;
+		lt->buffer_size = 0;
+		lt->read_buffer_size = BLCKSZ;
 		lt->curBlockNumber = 0L;
 		lt->pos = 0;
 		lt->nbytes = 0;
@@ -628,7 +680,10 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 
 	/* Allocate data buffer and first indirect block on first write */
 	if (lt->buffer == NULL)
+	{
 		lt->buffer = (char *) palloc(BLCKSZ);
+		lt->buffer_size = BLCKSZ;
+	}
 	if (lt->indirect == NULL)
 	{
 		lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
@@ -636,6 +691,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 		lt->indirect->nextup = NULL;
 	}
 
+	Assert(lt->buffer_size == BLCKSZ);
 	while (size > 0)
 	{
 		if (lt->pos >= BLCKSZ)
@@ -709,18 +765,19 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 			Assert(lt->frozen);
 			datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
 		}
+
+		/* Allocate a read buffer */
+		if (lt->buffer)
+			pfree(lt->buffer);
+		lt->buffer = palloc(lt->read_buffer_size);
+		lt->buffer_size = lt->read_buffer_size;
+
 		/* Read the first block, or reset if tape is empty */
 		lt->curBlockNumber = 0L;
 		lt->pos = 0;
 		lt->nbytes = 0;
 		if (datablocknum != -1L)
-		{
-			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
-			if (!lt->frozen)
-				ltsReleaseBlock(lts, datablocknum);
-			lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
-				BLCKSZ : lt->lastBlockBytes;
-		}
+			ltsReadFillBuffer(lts, lt, datablocknum);
 	}
 	else
 	{
@@ -754,6 +811,13 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 		lt->curBlockNumber = 0L;
 		lt->pos = 0;
 		lt->nbytes = 0;
+
+		if (lt->buffer)
+		{
+			pfree(lt->buffer);
+			lt->buffer = NULL;
+			lt->buffer_size = 0;
+		}
 	}
 }
 
@@ -779,20 +843,8 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
 		if (lt->pos >= lt->nbytes)
 		{
 			/* Try to load more data into buffer. */
-			long		datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
-															 lt->frozen);
-
-			if (datablocknum == -1L)
+			if (!ltsReadFillBuffer(lts, lt, -1))
 				break;			/* EOF */
-			lt->curBlockNumber++;
-			lt->pos = 0;
-			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
-			if (!lt->frozen)
-				ltsReleaseBlock(lts, datablocknum);
-			lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
-				BLCKSZ : lt->lastBlockBytes;
-			if (lt->nbytes <= 0)
-				break;			/* EOF (possible here?) */
 		}
 
 		nthistime = lt->nbytes - lt->pos;
@@ -842,6 +894,22 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
 	lt->writing = false;
 	lt->frozen = true;
 	datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
+
+	/*
+	 * The seek and backspace functions assume a single block read buffer.
+	 * That's OK with current usage. A larger buffer is helpful to make the
+	 * read pattern of the backing file look more sequential to the OS, when
+	 * we're reading from multiple tapes. But at the end of a sort, when a
+	 * tape is frozen, we only read from a single tape anyway.
+	 */
+	if (!lt->buffer || lt->buffer_size != BLCKSZ)
+	{
+		if (lt->buffer)
+			pfree(lt->buffer);
+		lt->buffer = palloc(BLCKSZ);
+		lt->buffer_size = BLCKSZ;
+	}
+
 	/* Read the first block, or reset if tape is empty */
 	lt->curBlockNumber = 0L;
 	lt->pos = 0;
@@ -875,6 +943,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
 	lt = &lts->tapes[tapenum];
 	Assert(lt->frozen);
+	Assert(lt->buffer_size == BLCKSZ);
 
 	/*
 	 * Easy case for seek within current block.
@@ -941,6 +1010,7 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 	lt = &lts->tapes[tapenum];
 	Assert(lt->frozen);
 	Assert(offset >= 0 && offset <= BLCKSZ);
+	Assert(lt->buffer_size == BLCKSZ);
 
 	/*
 	 * Easy case for seek within current block.
@@ -1000,6 +1070,9 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
 {
 	LogicalTape *lt;
 
 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
 	lt = &lts->tapes[tapenum];
+
+	/* With a larger buffer, 'pos' wouldn't be the same as offset within page */
+	Assert(lt->buffer_size == BLCKSZ);
 	*blocknum = lt->curBlockNumber;
@@ -1014,3 +1087,24 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts)
 {
 	return lts->nFileBlocks;
 }
+
+/*
+ * Record the read buffer size for a tape; LogicalTapeRewind() allocates it.
+ */
+void
+LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem)
+{
+	LogicalTape *lt;
+
+	Assert(tapenum >= 0 && tapenum < lts->nTapes);
+	lt = &lts->tapes[tapenum];
+
+	/*
+	 * Use at least one block, and round down to a multiple of BLCKSZ.
+	 * NOTE(review): size_t is narrowed to int below with no upper cap; verify.
+	 */
+	if (avail_mem < BLCKSZ)
+		avail_mem = BLCKSZ;
+	avail_mem -= avail_mem % BLCKSZ;
+	lt->read_buffer_size = avail_mem;
+}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index b9fb99c..dc35fcf 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2487,6 +2487,8 @@ mergeruns(Tuplesortstate *state)
 				svDummy;
 	char	   *p;
 	int			i;
+	int			per_tape, cutoff;
+	long		avail_blocks;
 
 	Assert(state->status == TSS_BUILDRUNS);
 	Assert(state->memtupcount == 0);
@@ -2535,15 +2537,17 @@ mergeruns(Tuplesortstate *state)
 	USEMEM(state, state->memtupsize * sizeof(SortTuple));
 
 	/*
-	 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to
-	 * track memory usage.
+	 * If we had fewer runs than tapes, refund buffers for tapes that were never
+	 * allocated.
 	 */
-	state->batchUsed = true;
+	if (state->currentRun < state->maxTapes)
+		FREEMEM(state, (state->maxTapes - state->currentRun) * TAPE_BUFFER_OVERHEAD);
 
 	/* Initialize the merge tuple buffer arena.  */
 	state->batchMemoryBegin = palloc((state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
 	state->batchMemoryEnd = state->batchMemoryBegin + (state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE;
 	state->freeBufferHead = (MergeTupleBuffer *) state->batchMemoryBegin;
+	USEMEM(state, (state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE);
 
 	p = state->batchMemoryBegin;
 	for (i = 0; i < state->maxTapes; i++)
@@ -2553,6 +2557,31 @@ mergeruns(Tuplesortstate *state)
 	}
 	((MergeTupleBuffer *) p)->nextfree = NULL;
 
+	/*
+	 * Use all the spare memory we have available for read buffers. Divide it
+	 * evenly among all the tapes.
+	 */
+	avail_blocks = state->availMem / BLCKSZ;
+	per_tape = avail_blocks / state->maxTapes;
+	cutoff = avail_blocks % state->maxTapes;
+	if (per_tape == 0)
+	{
+		per_tape = 1;
+		cutoff = 0;
+	}
+	for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
+	{
+		LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
+										(per_tape + (tapenum < cutoff ? 1 : 0)) * BLCKSZ);
+	}
+	USEMEM(state, avail_blocks * BLCKSZ);
+
+	/*
+	 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to
+	 * track memory usage of individual tuples.
+	 */
+	state->batchUsed = true;
+
 	/* End of step D2: rewind all output tapes to prepare for merging */
 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
 		LogicalTapeRewind(state->tapeset, tapenum, false);
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index fa1e992..03d0a6f 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -39,6 +39,7 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 				long blocknum, int offset);
 extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
 				long *blocknum, int *offset);
+extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t bufsize);
 extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
 
 #endif   /* LOGTAPE_H */
-- 
2.9.3

