From ad0efa2e2b14894aba4d5a5995bd8006304b3a72 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 21 Dec 2016 14:12:41 +0200
Subject: [PATCH 2/2] Pause/resume support.

When a tape is "paused", we write the final block to disk, and release the
buffer, but still allow writing to it again later. This saves memory
in the initial run building phase, as we can release the buffer of all
tapes except the one we're currently writing to, which frees up the memory
for the quicksort. This is particularly important because we used to
reserve the memory for maxTapes, i.e. the number of tapes that we might
want to use, not the actual number of tapes used.
---
 src/backend/utils/sort/logtape.c   | 69 +++++++++++++++++++++++++++++++++-----
 src/backend/utils/sort/tuplesort.c | 22 +++++++-----
 src/include/utils/logtape.h        |  1 +
 3 files changed, 76 insertions(+), 16 deletions(-)

diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 07d147dc47..1f540f05d1 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -119,6 +119,7 @@ typedef struct TapeBlockTrailer
 typedef struct LogicalTape
 {
 	bool		writing;		/* T while in write phase */
+	bool		paused;			/* T if the tape is paused */
 	bool		frozen;			/* T if blocks should not be freed when read */
 	bool		dirty;			/* does buffer need to be written? */
 
@@ -380,6 +381,7 @@ LogicalTapeSetCreate(int ntapes)
 		lt->writing = true;
 		lt->frozen = false;
 		lt->dirty = false;
+		lt->paused = false;
 		lt->firstBlockNumber = -1L;
 		lt->curBlockNumber = -1L;
 		lt->buffer = NULL;
@@ -446,6 +448,22 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 	{
 		lt->buffer = (char *) palloc(BLCKSZ);
 		lt->buffer_size = BLCKSZ;
+
+		/* if the tape was paused, resume it now. */
+		if (lt->paused)
+		{
+			ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+
+			/* 'pos' and 'nbytes' should still be valid */
+			Assert(lt->nbytes == TapeBlockGetNBytes(lt->buffer));
+			lt->paused = false;
+		}
+		else
+		{
+			Assert(lt->curBlockNumber == -1);
+			lt->pos = 0;
+			lt->nbytes = 0;
+		}
 	}
 	if (lt->curBlockNumber == -1)
 	{
@@ -466,12 +484,6 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 			/* Buffer full, dump it out */
 			long		nextBlockNumber;
 
-			if (!lt->dirty)
-			{
-				/* Hmm, went directly from reading to writing? */
-				elog(ERROR, "invalid logtape state: should be dirty");
-			}
-
 			/*
 			 * First allocate the next block, so that we can store it in the
 			 * 'next' pointer of this block.
@@ -547,7 +559,17 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
 		buffer_size -= buffer_size % BLCKSZ;
 	}
 
-	if (lt->writing)
+	if (lt->paused)
+	{
+		/*
+		 * A paused tape is flushed to disk already, we just have to change
+		 * the state in memory to indicate that we're reading it now, and
+		 * allocate a buffer for the reading.
+		 */
+		Assert(lt->writing);
+		lt->paused = false;
+	}
+	else if (lt->writing)
 	{
 		/*
 		 * Completion of a write phase.  Flush last partial data block, and
@@ -558,7 +580,6 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
 			TapeBlockSetNBytes(lt->buffer, lt->nbytes);
 			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
 		}
-		lt->writing = false;
 	}
 	else
 	{
@@ -568,6 +589,7 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
 		 */
 		Assert(lt->frozen);
 	}
+	lt->writing = false;
 
 	/* Allocate a read buffer (unless the tape is empty) */
 	if (lt->buffer)
@@ -617,6 +639,36 @@ LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
 }
 
 /*
+ * Pause writing to a tape.
+ *
+ * This writes the last partial data block to disk, and releases the memory
+ * allocated for the write buffer, but does not rewind it.  If you you call
+ * LogicalTapeWrite() on a paused tape, it will be un-paused implicitly,
+ * and the last partial block is read back into memory.
+ */
+void
+LogicalTapePause(LogicalTapeSet *lts, int tapenum)
+{
+	LogicalTape *lt;
+
+	Assert(tapenum >= 0 && tapenum < lts->nTapes);
+	lt = &lts->tapes[tapenum];
+
+	Assert(lt->writing);
+
+	/* Flush last partial data block. */
+	TapeBlockSetNBytes(lt->buffer, lt->nbytes);
+	ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+	lt->dirty = false;
+
+	pfree(lt->buffer);
+	lt->buffer = NULL;
+
+	lt->paused = true;
+	/* Note: leave 'pos' and 'nbytes' untouched */
+}
+
+/*
  * Read from a logical tape.
  *
  * Early EOF is indicated by return value less than #bytes requested.
@@ -689,6 +741,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
 		lt->writing = false;
 	}
 	lt->writing = false;
+	lt->paused = false;
 	lt->frozen = true;
 
 	/*
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 54c9eb58cc..9295ac6a34 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2363,15 +2363,15 @@ inittapes(Tuplesortstate *state)
 #endif
 
 	/*
-	 * Decrease availMem to reflect the space needed for tape buffers, when
-	 * writing the initial runs; but don't decrease it to the point that we
-	 * have no room for tuples.  (That case is only likely to occur if sorting
-	 * pass-by-value Datums; in all other scenarios the memtuples[] array is
-	 * unlikely to occupy more than half of allowedMem.  In the pass-by-value
-	 * case it's not important to account for tuple space, so we don't care if
-	 * LACKMEM becomes inaccurate.)
+	 * Decrease availMem to reflect the space needed for tape buffer of the
+	 * output tape, when writing the initial runs; but don't decrease it to
+	 * the point that we have no room for tuples.  (That case is only likely
+	 * to occur if sorting pass-by-value Datums; in all other scenarios the
+	 * memtuples[] array is unlikely to occupy more than half of allowedMem.
+	 * In the pass-by-value case it's not important to account for tuple
+	 * space, so we don't care if LACKMEM becomes inaccurate.)
 	 */
-	tapeSpace = (int64) maxTapes *TAPE_BUFFER_OVERHEAD;
+	tapeSpace = (int64) TAPE_BUFFER_OVERHEAD;
 
 	if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
 		USEMEM(state, tapeSpace);
@@ -2463,6 +2463,12 @@ selectnewtape(Tuplesortstate *state)
 	int			j;
 	int			a;
 
+	/*
+	 * Pause the old tape, to release the memory that was used for its buffer,
+	 * so that we can use it for building the next run.
+	 */
+	LogicalTapePause(state->tapeset, state->destTape);
+
 	/* Step D3: advance j (destTape) */
 	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
 	{
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index f580aade53..3a51cfbed9 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -34,6 +34,7 @@ extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
 						 size_t buffer_size);
 extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
+extern void LogicalTapePause(LogicalTapeSet *lts, int tapenum);
 extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
 extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
 					 size_t size);
-- 
2.11.0

