From 8ead766192b863a8738208e358f07d845a91677a Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 18 Mar 2025 14:40:06 -0400
Subject: [PATCH v2.10 09/28] Support buffer forwarding in read_stream.c.

In preparation for a following change to the buffer manager, teach
streams to keep track of buffers that were "forwarded" from one call to
StartReadBuffers() to the next.

Since the buffers argument of StartReadBuffers() will become an in/out
argument, we need to initialize the buffer queue entries with
InvalidBuffer.  We don't want to do that up front, because we try to
keep stream initialization cheap for code that uses the fast path and
stays within a single buffer queue element.  Satisfy both goals by
initializing the queue incrementally during the first cycle around it.

Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 103 +++++++++++++++++++++++---
 1 file changed, 93 insertions(+), 10 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index d65fa07b44c..cdf4b5a86a2 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -95,8 +95,10 @@ struct ReadStream
 	int16		ios_in_progress;
 	int16		queue_size;
 	int16		max_pinned_buffers;
+	int16		forwarded_buffers;
 	int16		pinned_buffers;
 	int16		distance;
+	int16		initialized_buffers;
 	bool		advice_enabled;
 	bool		temporary;
 
@@ -224,8 +226,10 @@ static bool
 read_stream_start_pending_read(ReadStream *stream)
 {
 	bool		need_wait;
+	int			requested_nblocks;
 	int			nblocks;
 	int			flags;
+	int			forwarded;
 	int16		io_index;
 	int16		overflow;
 	int16		buffer_index;
@@ -272,11 +276,20 @@ read_stream_start_pending_read(ReadStream *stream)
 		}
 	}
 
-	/* How many more buffers is this backend allowed? */
+	/*
+	 * How many more buffers is this backend allowed?
+	 *
+	 * If we already have some forwarded buffers, we can certainly use those.
+	 * They are already pinned, and are mapped to the starting blocks of the
+	 * pending read, they just don't have any I/O started yet and are not
+	 * counted in stream->pinned_buffers.
+	 */
 	if (stream->temporary)
 		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
 	else
 		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
+	buffer_limit += stream->forwarded_buffers;
 	if (buffer_limit == 0 && stream->pinned_buffers == 0)
 		buffer_limit = 1;		/* guarantee progress */
 
@@ -303,8 +316,31 @@ read_stream_start_pending_read(ReadStream *stream)
 	 * We say how many blocks we want to read, but it may be smaller on return
 	 * if the buffer manager decides to shorten the read.
 	 */
+	requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks);
+	nblocks = requested_nblocks;
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
+
+	/*
+	 * The first time around the queue we initialize it as we go, including
+	 * the overflow zone, because otherwise the entries would appear as
+	 * forwarded buffers.  This avoids initializing the whole queue up front
+	 * in cases where it is large but we don't ever use it due to the
+	 * all-cached fast path or small scans.
+	 */
+	while (stream->initialized_buffers < buffer_index + nblocks)
+		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
+
+	/*
+	 * Start the I/O.  Any buffers that are not InvalidBuffer will be
+	 * interpreted as already pinned, forwarded by an earlier call to
+	 * StartReadBuffers(), and must map to the expected blocks.  The nblocks
+	 * value may be smaller on return indicating the size of the I/O that
+	 * could be started.  Buffers beyond the output nblocks number may also
+	 * have been pinned without starting I/O due to various edge cases.  In
+	 * that case we'll just leave them in the queue ahead of us, "forwarded"
+	 * to the next call, avoiding the need to unpin/repin.
+	 */
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -333,16 +369,35 @@ read_stream_start_pending_read(ReadStream *stream)
 		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
 	}
 
+	/*
+	 * How many pins were acquired but forwarded to the next call?  These need
+	 * to be passed to the next StartReadBuffers() call, or released if the
+	 * stream ends early.  We need the number for accounting purposes, since
+	 * they are not counted in stream->pinned_buffers but we already hold
+	 * them.
+	 */
+	forwarded = 0;
+	while (nblocks + forwarded < requested_nblocks &&
+		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
+		forwarded++;
+	stream->forwarded_buffers = forwarded;
+
 	/*
 	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
-	 * we want it to wrap around at queue_size.  Slide overflowing buffers to
-	 * the front of the array.
+	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
+	 * the front of the array where they'll be consumed, but also leave a copy
+	 * in the overflow zone which the I/O operation has a pointer to (it needs
+	 * a contiguous array).  Both copies will be cleared when the buffers are
+	 * handed to the consumer.
 	 */
-	overflow = (buffer_index + nblocks) - stream->queue_size;
+	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
 	if (overflow > 0)
-		memmove(&stream->buffers[0],
-				&stream->buffers[stream->queue_size],
-				sizeof(stream->buffers[0]) * overflow);
+	{
+		Assert(overflow < stream->queue_size);	/* can't overlap */
+		memcpy(&stream->buffers[0],
+			   &stream->buffers[stream->queue_size],
+			   sizeof(stream->buffers[0]) * overflow);
+	}
 
 	/* Compute location of start of next read, without using % operator. */
 	buffer_index += nblocks;
@@ -718,10 +773,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		/* Fast path assumptions. */
 		Assert(stream->ios_in_progress == 0);
+		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
 		Assert(stream->distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
+		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
 
 		/* We're going to return the buffer we pinned last time. */
 		oldest_buffer_index = stream->oldest_buffer_index;
@@ -770,6 +827,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
+			stream->buffers[oldest_buffer_index] = InvalidBuffer;
 		}
 
 		stream->fast_path = false;
@@ -845,10 +903,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->seq_until_processed = InvalidBlockNumber;
 	}
 
-#ifdef CLOBBER_FREED_MEMORY
-	/* Clobber old buffer for debugging purposes. */
+	/*
+	 * We must zap this queue entry, or else it would appear as a forwarded
+	 * buffer.  If it's potentially in the overflow zone (ie it wrapped around
+	 * the queue), also zap that copy.
+	 */
 	stream->buffers[oldest_buffer_index] = InvalidBuffer;
-#endif
+	if (oldest_buffer_index < io_combine_limit - 1)
+		stream->buffers[stream->queue_size + oldest_buffer_index] =
+			InvalidBuffer;
 
 #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
 
@@ -893,6 +956,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
 	if (stream->ios_in_progress == 0 &&
+		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
 		stream->distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
@@ -928,6 +992,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 void
 read_stream_reset(ReadStream *stream)
 {
+	int16		index;
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
@@ -941,6 +1006,24 @@ read_stream_reset(ReadStream *stream)
 	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 		ReleaseBuffer(buffer);
 
+	/* Unpin any unused forwarded buffers. */
+	index = stream->next_buffer_index;
+	while (index < stream->initialized_buffers &&
+		   (buffer = stream->buffers[index]) != InvalidBuffer)
+	{
+		Assert(stream->forwarded_buffers > 0);
+		stream->forwarded_buffers--;
+		ReleaseBuffer(buffer);
+
+		stream->buffers[index] = InvalidBuffer;
+		if (index < io_combine_limit - 1)
+			stream->buffers[stream->queue_size + index] = InvalidBuffer;
+
+		if (++index == stream->queue_size)
+			index = 0;
+	}
+
+	Assert(stream->forwarded_buffers == 0);
 	Assert(stream->pinned_buffers == 0);
 	Assert(stream->ios_in_progress == 0);
 
-- 
2.48.1.76.g4e746b1a31.dirty

