From e0d086eba083b105dfc59120d9a8d043602c1cb7 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 31 Mar 2026 12:45:41 -0400
Subject: [PATCH v4 06/14] WIP: read stream: Split decision about look ahead
 for AIO and combining

"read_stream: Only increase distance when waiting for IO" caused a regression
due to this conflation, because we ended up not allowing IO combining when
never needing to wait for IO (as the distance ended up too small to allow for
that), which increased CPU overhead.

If we go this way, it probably should be moved ahead of the aforementioned
commit.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/aio/read_stream.c | 147 ++++++++++++++++++++------
 1 file changed, 115 insertions(+), 32 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 5d7bfb8fa00..9d22a119bef 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -98,10 +98,14 @@ struct ReadStream
 	int16		max_pinned_buffers;
 	int16		forwarded_buffers;
 	int16		pinned_buffers;
-	int16		distance;
+	/* limit of how far to look ahead for IO combining */
+	int16		combine_distance;
+	/* limit of how far to look ahead for starting IO early */
+	int16		readahead_distance;
 	uint16		distance_decay_holdoff;
 	int16		initialized_buffers;
-	int16		resume_distance;
+	int16		resume_readahead_distance;
+	int16		resume_combine_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
@@ -332,8 +336,8 @@ read_stream_start_pending_read(ReadStream *stream)
 
 		/* Shrink distance: no more look-ahead until buffers are released. */
 		new_distance = stream->pinned_buffers + buffer_limit;
-		if (stream->distance > new_distance)
-			stream->distance = new_distance;
+		if (stream->readahead_distance > new_distance)
+			stream->readahead_distance = new_distance;
 
 		/* Unless we have nothing to give the consumer, stop here. */
 		if (stream->pinned_buffers > 0)
@@ -374,12 +378,23 @@ read_stream_start_pending_read(ReadStream *stream)
 		 * perform IO asynchronously when starting out with a small look-ahead
 		 * distance.
 		 */
-		if (stream->distance > 1 && stream->ios_in_progress == 0)
+		if (stream->ios_in_progress == 0)
 		{
-			if (stream->distance_decay_holdoff == 0)
-				stream->distance--;
-			else
+			if (stream->distance_decay_holdoff > 0)
 				stream->distance_decay_holdoff--;
+			else
+			{
+				if (stream->readahead_distance > 1)
+					stream->readahead_distance--;
+
+				/*
+				 * XXX: Should we actually reduce this at any time other than
+				 * a reset? For now we have to, as this is also a condition
+				 * for re-enabling fast_path.
+				 */
+				if (stream->combine_distance > 1)
+					stream->combine_distance--;
+			}
 		}
 	}
 	else
@@ -452,18 +467,33 @@ read_stream_should_look_ahead(ReadStream *stream)
 		return false;
 
 	/* If the callback has signaled end-of-stream, we're done */
-	if (stream->distance == 0)
+	if (stream->readahead_distance == 0)
 		return false;
 
 	/* never pin more buffers than allowed */
 	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->max_pinned_buffers)
 		return false;
 
+	/*
+	 * Allow looking further ahead if we are in the process of building a
+	 * larger IO, the IO is not yet big enough and we don't yet have IO in
+	 * flight.  Note that this is allowed even if we are reaching the
+	 * read-ahead limit (but not the buffer pin limit).
+	 *
+	 * This is important for cases where either effective_io_concurrency is
+	 * low or we never need to wait for IO and thus are not increasing the
+	 * distance. Without this we would end up with lots of small IOs.
+	 */
+	if (stream->pending_read_nblocks > 0 &&
+		stream->pinned_buffers == 0 &&
+		stream->pending_read_nblocks < stream->combine_distance)
+		return true;
+
 	/*
 	 * Don't start more read-ahead if that'd put us over the distance limit
 	 * for doing read-ahead.
 	 */
-	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance)
+	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
 		return false;
 
 	return true;
@@ -492,7 +522,7 @@ read_stream_should_issue_now(ReadStream *stream)
 	 * If the callback has signaled end-of-stream, start the pending read
 	 * immediately. There is no further potential for IO combining.
 	 */
-	if (stream->distance == 0)
+	if (stream->readahead_distance == 0)
 		return true;
 
 	/*
@@ -502,6 +532,11 @@ read_stream_should_issue_now(ReadStream *stream)
 	if (pending_read_nblocks >= stream->io_combine_limit)
 		return true;
 
+	/* same if capped not by io_combine_limit but combine_distance */
+	if (stream->combine_distance > 0 &&
+		pending_read_nblocks >= stream->combine_distance)
+		return true;
+
 	/*
 	 * If we currently have no reads in flight or prepared, issue the IO once
 	 * we are not looking ahead further. This ensures there's always at least
@@ -552,7 +587,8 @@ read_stream_look_ahead(ReadStream *stream)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
+			stream->combine_distance = 0;
 			break;
 		}
 
@@ -599,7 +635,7 @@ read_stream_look_ahead(ReadStream *stream)
 	 * stream.  In the worst case we can always make progress one buffer at a
 	 * time.
 	 */
-	Assert(stream->pinned_buffers > 0 || stream->distance == 0);
+	Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
 
 	if (stream->batch_mode)
 		pgaio_exit_batchmode();
@@ -789,10 +825,17 @@ read_stream_begin_impl(int flags,
 	 * doing full io_combine_limit sized reads.
 	 */
 	if (flags & READ_STREAM_FULL)
-		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
+	{
+		stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+		stream->combine_distance = stream->io_combine_limit;
+	}
 	else
-		stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	{
+		stream->readahead_distance = 1;
+		stream->combine_distance = 1;
+	}
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -891,7 +934,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->ios_in_progress == 0);
 		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
-		Assert(stream->distance == 1);
+		Assert(stream->readahead_distance == 1);
+		Assert(stream->combine_distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
 		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
@@ -965,7 +1009,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		else
 		{
 			/* No more blocks, end of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
 			stream->buffers[oldest_buffer_index] = InvalidBuffer;
@@ -981,7 +1025,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
 		/* End of stream reached?  */
-		if (stream->distance == 0)
+		if (stream->readahead_distance == 0)
 			return InvalidBuffer;
 
 		/*
@@ -995,7 +1039,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		/* End of stream reached? */
 		if (stream->pinned_buffers == 0)
 		{
-			Assert(stream->distance == 0);
+			Assert(stream->readahead_distance == 0);
 			return InvalidBuffer;
 		}
 	}
@@ -1016,7 +1060,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
 	{
 		int16		io_index = stream->oldest_io_index;
-		int32		distance;	/* wider temporary value, clamped below */
 		bool		needed_wait;
 
 		/* Sanity check that we still agree on the buffers. */
@@ -1056,11 +1099,38 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		 * the stream, as stream->distance == 0 is used to keep track of
 		 * having reached the end.
 		 */
-		if (stream->distance > 0 && needed_wait)
+		if (stream->readahead_distance > 0 && needed_wait)
 		{
-			distance = stream->distance * 2;
-			distance = Min(distance, stream->max_pinned_buffers);
-			stream->distance = distance;
+			/* wider temporary value, due to overflow risk */
+			int32		readahead_distance;
+
+			readahead_distance = stream->readahead_distance * 2;
+			readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+			stream->readahead_distance = readahead_distance;
+		}
+
+		/*
+		 * Whether we needed to wait or not, allow for more IO combining if we
+		 * needed to do IO. The reason to do so independent of needing to wait
+		 * is that when the data is resident in the kernel page cache, IO
+		 * combining reduces the syscall / dispatch overhead, making it
+		 * worthwhile regardless of needing to wait.
+		 *
+		 * It is also important with io_uring as it will never signal the need
+		 * to wait for reads if all the data is in the page cache. There are
+		 * heuristics to deal with that in method_io_uring.c, but they only
+		 * work when the IO gets large enough.
+		 */
+		if (stream->combine_distance > 0 &&
+			stream->combine_distance < stream->io_combine_limit)
+		{
+			/* wider temporary value, due to overflow risk */
+			int32		combine_distance;
+
+			combine_distance = stream->combine_distance * 2;
+			combine_distance = Min(combine_distance, stream->io_combine_limit);
+			combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+			stream->combine_distance = combine_distance;
 		}
 
 		/*
@@ -1139,10 +1209,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
+	/*
+	 * FIXME: It's way too easy to wrongly fast path. I'm pretty sure there's
+	 * several pre-existing cases where it triggers because we are not issuing
+	 * additional prefetching (e.g. because of a small
+	 * effective_io_concurrency) and thus stream->pinned_buffers stays at 1
+	 * after read_stream_look_ahead().
+	 */
 	if (stream->ios_in_progress == 0 &&
 		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
-		stream->distance == 1 &&
+		stream->readahead_distance == 1 &&
+		stream->combine_distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
 		stream->per_buffer_data_size == 0)
 	{
@@ -1188,8 +1266,9 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 BlockNumber
 read_stream_pause(ReadStream *stream)
 {
-	stream->resume_distance = stream->distance;
-	stream->distance = 0;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
+	stream->readahead_distance = 0;
 	return InvalidBlockNumber;
 }
 
@@ -1201,7 +1280,8 @@ read_stream_pause(ReadStream *stream)
 void
 read_stream_resume(ReadStream *stream)
 {
-	stream->distance = stream->resume_distance;
+	stream->readahead_distance = stream->resume_readahead_distance;
+	stream->combine_distance = stream->resume_combine_distance;
 }
 
 /*
@@ -1217,7 +1297,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
-	stream->distance = 0;
+	stream->readahead_distance = 0;
+	stream->combine_distance = 0;
 
 	/* Forget buffered block number and fast path state. */
 	stream->buffered_blocknum = InvalidBlockNumber;
@@ -1249,8 +1330,10 @@ read_stream_reset(ReadStream *stream)
 	Assert(stream->ios_in_progress == 0);
 
 	/* Start off assuming data is cached. */
-	stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	stream->readahead_distance = 1;
+	stream->combine_distance = 1;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 	stream->distance_decay_holdoff = 0;
 }
 
-- 
2.53.0.1.gb2826b52eb

