From d4a86ee55c0a1d4e77a5de06df9f625e37863371 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 6 Apr 2024 13:28:28 +1300
Subject: [PATCH v2 3/4] Add READ_STREAM_OUT_OF_ORDER.

If the stream consumer authorizes it, already cached buffers can jump
the queue and be emitted in LIFO order if there are IOs running.  This
gives the storage more time to complete IOs, and the consumer something
to work on immediately.
---
 src/backend/storage/aio/read_stream.c | 214 +++++++++++++++++++++++---
 src/include/storage/read_stream.h     |   8 +
 2 files changed, 202 insertions(+), 20 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index a98213d4df2..e90db1546fd 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -105,6 +105,7 @@ struct ReadStream
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
 	bool		advice_enabled;
 	bool		temporary;
+	bool		out_of_order_enabled;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -154,6 +155,27 @@ get_per_buffer_data(ReadStream *stream, int16 buffer_index)
 		stream->per_buffer_data_size * buffer_index;
 }
 
+/*
+ * To help catch bugs, wipe the per-buffer data object at 'buffer_index' when
+ * it shouldn't be accessed again by client code.  This has no effect unless
+ * CLOBBER_FREED_MEMORY or USE_VALGRIND is defined.  After clobbering, this
+ * per-buffer data object must be explicitly marked as undefined or defined
+ * before it is accessed again.
+ */
+static void
+clobber_per_buffer_data(ReadStream *stream, int16 buffer_index)
+{
+#ifdef CLOBBER_FREED_MEMORY
+	/* This also tells Valgrind the memory is "noaccess". */
+	wipe_mem(get_per_buffer_data(stream, buffer_index),
+			 stream->per_buffer_data_size);
+#else
+	/* Tell it ourselves. */
+	VALGRIND_MAKE_MEM_NOACCESS(get_per_buffer_data(stream, buffer_index),
+							   stream->per_buffer_data_size);
+#endif
+}
+
 /*
  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
  * blocks [current_blocknum, last_exclusive).
@@ -277,6 +299,158 @@ read_stream_advance_io(ReadStream *stream, int16 *index)
 		*index = 0;
 }
 
+/*
+ * Relocate the single valid buffer stored at next_buffer_index (FIFO position)
+ * to just before oldest_buffer_index (LIFO position), so that it will be
+ * consumed next.  Requires out-of-order mode and at least one IO in progress.
+ *
+ * Without per-buffer data this is cheap: we relocate one buffer and adjust an
+ * index.  Per-buffer data is more expensive: the objects for the blocks of
+ * the current pending read lie ahead of us and must be shifted down; their
+ * size is user-controlled but expected to be very small.  (It's not yet clear
+ * if a more complex data structure is worth the overheads to avoid this.)
+ */
+static void
+read_stream_reorder(ReadStream *stream)
+{
+	int16		gap_index = stream->next_buffer_index;
+	int16		new_index = stream->oldest_buffer_index;	/* retreated below */
+	size_t		per_buffer_data_size = stream->per_buffer_data_size;
+	int16		per_buffer_data_count = 0;
+	int16		forwarded_buffers = stream->forwarded_buffers;
+
+	Assert(BufferIsValid(stream->buffers[gap_index]));
+	Assert(stream->ios_in_progress > 0);
+	Assert(stream->out_of_order_enabled);
+
+	/* Move "gap" entry to "new" entry. */
+	read_stream_retreat_buffer(stream, &new_index);
+	Assert(new_index >= stream->initialized_buffers ||
+		   stream->buffers[new_index] == InvalidBuffer);
+	stream->buffers[new_index] = stream->buffers[gap_index];
+	stream->buffers[gap_index] = InvalidBuffer;
+	stream->oldest_buffer_index = new_index;
+
+	/* If there are forwarded buffers (rare), then we'll have to shift them. */
+	if (forwarded_buffers > 0)
+	{
+		int16		zap_index = gap_index + forwarded_buffers;
+
+		Assert(forwarded_buffers < stream->io_combine_limit);
+
+		/* Shift to fill in the gap created above, and zap the final one. */
+		memmove(&stream->buffers[gap_index],
+				&stream->buffers[gap_index + 1],
+				sizeof(stream->buffers[gap_index]) * forwarded_buffers);
+		stream->buffers[zap_index] = InvalidBuffer;
+
+		/*
+		 * If some are in the overflow zone, also shift their copies that
+		 * begin at index 0 (sized in bytes, not elements) and zap the last.
+		 */
+		if (zap_index >= stream->queue_size)
+		{
+			int16		copies = zap_index - stream->queue_size;
+
+			memmove(&stream->buffers[0], &stream->buffers[1],
+					sizeof(stream->buffers[0]) * copies);
+			stream->buffers[copies] = InvalidBuffer;
+		}
+	}
+
+	/*
+	 * If there is per-buffer data, we also need to relocate one element from
+	 * the "gap" to the "new" index, to keep it in sync with the buffer index.
+	 */
+	if (per_buffer_data_size > 0)
+	{
+		void	   *src = get_per_buffer_data(stream, gap_index);
+		void	   *dst = get_per_buffer_data(stream, new_index);
+
+		/*
+		 * We don't know how much of the per-buffer data was filled in by the
+		 * callback (that's a private matter between the callback and the
+		 * consumer).  We falsely claim that we know the source is entirely
+		 * initialized to avoid Valgrind errors from memcpy(), and also mark
+		 * the destination as defined, because it was previously unused and
+		 * clobbered ("noaccess").
+		 */
+		VALGRIND_MAKE_MEM_DEFINED(src, per_buffer_data_size);
+		VALGRIND_MAKE_MEM_DEFINED(dst, per_buffer_data_size);
+		memcpy(dst, src, per_buffer_data_size);
+
+		/*
+		 * We also have per-buffer data for every block of the current pending
+		 * read, filled in by the callback while it was being built up.
+		 */
+		per_buffer_data_count = stream->pending_read_nblocks;
+
+		/*
+		 * We might also have one extra object beyond that, if we had to
+		 * "unget" a block that couldn't be combined.
+		 */
+		if (stream->buffered_blocknum != InvalidBlockNumber)
+			per_buffer_data_count++;
+
+		Assert(forwarded_buffers < stream->io_combine_limit);
+		Assert(forwarded_buffers <= per_buffer_data_count);
+	}
+
+	/* Are there additional per-buffer objects to shift? */
+	if (per_buffer_data_count > 0)
+	{
+		int16		remaining = per_buffer_data_count;
+		int16		contiguous;
+		void	   *src;
+		void	   *dst;
+
+		/*
+		 * This first memmove() should often shift all of them.  (See note
+		 * above about Valgrind vs partial initialization.)
+		 */
+		contiguous = Min(remaining, stream->queue_size - (gap_index + 1));
+		dst = get_per_buffer_data(stream, gap_index);
+		src = get_per_buffer_data(stream, gap_index + 1);
+		VALGRIND_MAKE_MEM_DEFINED(src, contiguous * per_buffer_data_size);
+		memmove(dst, src, contiguous * per_buffer_data_size);
+		remaining -= contiguous;
+
+		/*
+		 * If it doesn't, then we need to deal with wraparound.  Start by
+		 * rotating one object from the physical start to the physical end.
+		 */
+		if (remaining > 0)
+		{
+			dst = get_per_buffer_data(stream, stream->queue_size - 1);
+			src = get_per_buffer_data(stream, 0);
+			VALGRIND_MAKE_MEM_DEFINED(src, per_buffer_data_size);
+			memcpy(dst, src, per_buffer_data_size);
+			remaining -= 1;
+
+			/*
+			 * If any are left after that, they must be contiguous at the
+			 * physical start of the queue, so shift them too.
+			 */
+			if (remaining > 0)
+			{
+				dst = get_per_buffer_data(stream, 0);
+				src = get_per_buffer_data(stream, 1);
+				VALGRIND_MAKE_MEM_DEFINED(src, remaining * per_buffer_data_size);
+				memmove(dst, src, remaining * per_buffer_data_size);
+			}
+		}
+	}
+
+	/* Clobber the final per-buffer-data object. */
+	if (per_buffer_data_size > 0)
+	{
+		int16		zap_index = gap_index;
+
+		read_stream_advance_buffer_n(stream, &zap_index, per_buffer_data_count);
+		clobber_per_buffer_data(stream, zap_index);
+	}
+}
+
 /*
  * Start as much of the current pending read as we can.  If we have to split it
  * because of the per-backend buffer limit, or the buffer manager decides to
@@ -450,9 +624,21 @@ read_stream_start_pending_read(ReadStream *stream)
 			   sizeof(stream->buffers[0]) * overflow);
 	}
 
-	/* Move to the location of start of next read. */
-	read_stream_advance_buffer_n(stream, &buffer_index, nblocks);
-	stream->next_buffer_index = buffer_index;
+	if (stream->out_of_order_enabled &&
+		stream->ios_in_progress > 0 &&
+		!need_wait &&
+		nblocks == 1)
+	{
+		/* Promote cached block to LIFO order, jumping in front of IOs. */
+		read_stream_reorder(stream);
+	}
+	else
+	{
+		/* Advance to the position of next read, maintaining FIFO order. */
+		read_stream_advance_buffer_n(stream,
+									 &stream->next_buffer_index,
+									 nblocks);
+	}
 
 	/* Adjust the pending read to cover the remaining portion, if any. */
 	stream->pending_read_blocknum += nblocks;
@@ -709,6 +895,9 @@ read_stream_begin_impl(int flags,
 		stream->advice_enabled = true;
 #endif
 
+	if (flags & READ_STREAM_OUT_OF_ORDER)
+		stream->out_of_order_enabled = true;
+
 	/*
 	 * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
 	 * we still need to allocate space to combine and run one I/O.  Bump it up
@@ -925,7 +1114,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	buffer = stream->buffers[oldest_buffer_index];
 	if (per_buffer_data)
 		*per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
-
 	Assert(BufferIsValid(buffer));
 
 	/* Do we have to wait for an associated I/O first? */
@@ -971,8 +1159,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->buffers[stream->queue_size + oldest_buffer_index] =
 			InvalidBuffer;
 
-#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
-
 	/*
 	 * The caller will get access to the per-buffer data, until the next call.
 	 * We wipe the one before, which is never occupied because queue_size
@@ -981,23 +1167,11 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	 */
 	if (stream->per_buffer_data)
 	{
-		int16		index;
-		void	   *per_buffer_data;
+		int16		index = oldest_buffer_index;
 
-		index = oldest_buffer_index;
 		read_stream_retreat_buffer(stream, &index);
-		per_buffer_data = get_per_buffer_data(stream, index);
-
-#if defined(CLOBBER_FREED_MEMORY)
-		/* This also tells Valgrind the memory is "noaccess". */
-		wipe_mem(per_buffer_data, stream->per_buffer_data_size);
-#elif defined(USE_VALGRIND)
-		/* Tell it ourselves. */
-		VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
-								   stream->per_buffer_data_size);
-#endif
+		clobber_per_buffer_data(stream, index);
 	}
-#endif
 
 	/* Pin transferred to caller. */
 	Assert(stream->pinned_buffers > 0);
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..5af2f1d0ec7 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -63,6 +63,14 @@
  */
 #define READ_STREAM_USE_BATCHING 0x08
 
+/*
+ * Blocks are usually streamed in FIFO order.  This flag allows reordering,
+ * for consumers that can deal with out-of-order buffers.  Whenever IOs are
+ * running, an already-cached buffer found in the look-ahead window jumps
+ * directly to the front of the queue (LIFO), ready to be consumed next.
+ */
+#define READ_STREAM_OUT_OF_ORDER 0x10
+
 struct ReadStream;
 typedef struct ReadStream ReadStream;
 
-- 
2.39.5

