From ab0e0c198b5299cf015a7c9ef357d8651a776aef Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 30 Mar 2026 12:25:07 -0400
Subject: [PATCH v3 08/10] WIP: read stream: Split decision about look ahead
 for AIO and combining

Previous commits caused a regression due to this conflation. This is a
first attempt at fixing the problem.  Needs significant reordering and
splitting if it works out.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/aio/read_stream.c | 242 +++++++++++++++++++++-----
 1 file changed, 195 insertions(+), 47 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 144b3613c92..1c375edad1d 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -98,10 +98,14 @@ struct ReadStream
 	int16		max_pinned_buffers;
 	int16		forwarded_buffers;
 	int16		pinned_buffers;
-	int16		distance;
+	/* limit of how far to read ahead for IO combining */
+	int16		combine_distance;
+	/* limit of how far to read ahead for starting IO early */
+	int16		readahead_distance;
 	uint16		distance_decay_holdoff;
 	int16		initialized_buffers;
-	int16		resume_distance;
+	int16		resume_readahead_distance;
+	int16		resume_combine_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
@@ -332,8 +336,8 @@ read_stream_start_pending_read(ReadStream *stream)
 
 		/* Shrink distance: no more look-ahead until buffers are released. */
 		new_distance = stream->pinned_buffers + buffer_limit;
-		if (stream->distance > new_distance)
-			stream->distance = new_distance;
+		if (stream->readahead_distance > new_distance)
+			stream->readahead_distance = new_distance;
 
 		/* Unless we have nothing to give the consumer, stop here. */
 		if (stream->pinned_buffers > 0)
@@ -374,12 +378,23 @@ read_stream_start_pending_read(ReadStream *stream)
 		 * perform IO asynchronously when starting out with a small look-ahead
 		 * distance.
 		 */
-		if (stream->distance > 1 && stream->ios_in_progress == 0)
+		if (stream->ios_in_progress == 0)
 		{
-			if (stream->distance_decay_holdoff == 0)
-				stream->distance--;
-			else
+			if (stream->distance_decay_holdoff > 0)
 				stream->distance_decay_holdoff--;
+			else
+			{
+				if (stream->readahead_distance > 1)
+					stream->readahead_distance--;
+
+				/*
+				 * XXX: Should we actually reduce this at any time other than
+				 * a reset? For now we have to, as this is also a condition
+				 * for re-enabling fast_path.
+				 */
+				if (stream->combine_distance > 1)
+					stream->combine_distance--;
+			}
 		}
 	}
 	else
@@ -440,6 +455,101 @@ read_stream_start_pending_read(ReadStream *stream)
 	return true;
 }
 
+/*
+ * Should we continue to perform look ahead?  The look ahead may allow us to
+ * make the pending IO larger via IO combining or to issue more read ahead.
+ */
+static bool
+read_stream_should_look_ahead(ReadStream *stream)
+{
+	/* never start more IOs than our cap */
+	if (stream->ios_in_progress >= stream->max_ios)
+		return false;
+
+	/* If the callback has signaled end-of-stream, we're done */
+	if (stream->readahead_distance <= 0)
+		return false;
+
+	/* never pin more buffers than allowed */
+	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->max_pinned_buffers)
+		return false;
+
+	/*
+	 * Allow looking further ahead if we are in the process of building a
+	 * larger IO, the IO is not yet big enough and we don't yet have IO in
+	 * flight.  Note that this is allowed even if we are reaching the
+	 * readahead limit (but not the buffer pin limit).
+	 *
+	 * This is important for cases where either effective_io_concurrency is
+	 * low or we never need to wait for IO and thus are not increasing the
+	 * distance. Without this we would end up with lots of small IOs.
+	 */
+	if (stream->pending_read_nblocks > 0 &&
+		stream->pinned_buffers == 0 &&
+		stream->pending_read_nblocks < stream->combine_distance)
+		return true;
+
+	/*
+	 * Don't start more readahead if that'd put us over the limit for doing
+	 * readahead.
+	 */
+	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
+		return false;
+
+	return true;
+}
+
+
+/*
+ * We don't start the pending read just because we've hit the distance limit,
+ * preferring to give it another chance to grow to full io_combine_limit size
+ * once more buffers have been consumed.  But this is not desirable in all
+ * situations - see below.
+ */
+static bool
+read_stream_should_issue_now(ReadStream *stream)
+{
+	int16		pending_read_nblocks = stream->pending_read_nblocks;
+
+	/* no IO to issue */
+	if (pending_read_nblocks == 0)
+		return false;
+
+	/* never start more IOs than our cap */
+	if (stream->ios_in_progress >= stream->max_ios)
+		return false;
+
+	/*
+	 * If the callback has signaled end-of-stream, start the read
+	 * immediately. There's no deferring it for later.
+	 */
+	if (stream->readahead_distance <= 0)
+		return true;
+
+	/*
+	 * If we've already reached io_combine_limit, there's no chance of growing
+	 * the read further.
+	 */
+	if (pending_read_nblocks >= stream->io_combine_limit)
+		return true;
+
+	/* same if capped not by io_combine_limit but combine_distance */
+	if (stream->combine_distance > 0 &&
+		pending_read_nblocks >= stream->combine_distance)
+		return true;
+
+	/*
+	 * If we currently have no reads in flight or prepared, issue the IO
+	 * once we have stopped looking ahead. This ensures there's always at
+	 * least one IO prepared.
+	 */
+	if (stream->pinned_buffers == 0 &&
+		!read_stream_should_look_ahead(stream))
+		return true;
+
+	return false;
+}
+
 static void
 read_stream_look_ahead(ReadStream *stream)
 {
@@ -452,14 +562,13 @@ read_stream_look_ahead(ReadStream *stream)
 	if (stream->batch_mode)
 		pgaio_enter_batchmode();
 
-	while (stream->ios_in_progress < stream->max_ios &&
-		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+	while (read_stream_should_look_ahead(stream))
 	{
 		BlockNumber blocknum;
 		int16		buffer_index;
 		void	   *per_buffer_data;
 
-		if (stream->pending_read_nblocks == stream->io_combine_limit)
+		if (read_stream_should_issue_now(stream))
 		{
 			read_stream_start_pending_read(stream);
 			continue;
@@ -479,7 +588,8 @@ read_stream_look_ahead(ReadStream *stream)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
+			stream->combine_distance = 0;
 			break;
 		}
 
@@ -511,21 +621,13 @@ read_stream_look_ahead(ReadStream *stream)
 	}
 
 	/*
-	 * We don't start the pending read just because we've hit the distance
-	 * limit, preferring to give it another chance to grow to full
-	 * io_combine_limit size once more buffers have been consumed.  However,
-	 * if we've already reached io_combine_limit, or we've reached the
-	 * distance limit and there isn't anything pinned yet, or the callback has
-	 * signaled end-of-stream, we start the read immediately.  Note that the
-	 * pending read can exceed the distance goal, if the latter was reduced
-	 * after hitting the per-backend buffer limit.
+	 * Check if the pending read should be issued now, or if we should give it
+	 * another chance to grow to the full size.
+	 *
+	 * Note that the pending read can exceed the distance goal, if the latter
+	 * was reduced after hitting the per-backend buffer limit.
 	 */
-	if (stream->pending_read_nblocks > 0 &&
-		(stream->pending_read_nblocks == stream->io_combine_limit ||
-		 (stream->pending_read_nblocks >= stream->distance &&
-		  stream->pinned_buffers == 0) ||
-		 stream->distance <= 0) &&
-		stream->ios_in_progress < stream->max_ios)
+	if (read_stream_should_issue_now(stream))
 		read_stream_start_pending_read(stream);
 
 	/*
@@ -534,7 +636,7 @@ read_stream_look_ahead(ReadStream *stream)
 	 * stream.  In the worst case we can always make progress one buffer at a
 	 * time.
 	 */
-	Assert(stream->pinned_buffers > 0 || stream->distance <= 0);
+	Assert(stream->pinned_buffers > 0 || stream->readahead_distance <= 0);
 
 	if (stream->batch_mode)
 		pgaio_exit_batchmode();
@@ -724,10 +826,17 @@ read_stream_begin_impl(int flags,
 	 * doing full io_combine_limit sized reads.
 	 */
 	if (flags & READ_STREAM_FULL)
-		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
+	{
+		stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+		stream->combine_distance = stream->io_combine_limit;
+	}
 	else
-		stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	{
+		stream->readahead_distance = 1;
+		stream->combine_distance = 1;
+	}
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -826,7 +935,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->ios_in_progress == 0);
 		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
-		Assert(stream->distance == 1);
+		Assert(stream->readahead_distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
 		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
@@ -900,7 +1009,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		else
 		{
 			/* No more blocks, end of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
 			stream->buffers[oldest_buffer_index] = InvalidBuffer;
@@ -916,7 +1025,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
 		/* End of stream reached?  */
-		if (stream->distance <= 0)
+		if (stream->readahead_distance <= 0)
 			return InvalidBuffer;
 
 		/*
@@ -930,7 +1039,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		/* End of stream reached? */
 		if (stream->pinned_buffers == 0)
 		{
-			Assert(stream->distance <= 0);
+			Assert(stream->readahead_distance <= 0);
 			return InvalidBuffer;
 		}
 	}
@@ -951,7 +1060,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
 	{
 		int16		io_index = stream->oldest_io_index;
-		int32		distance;	/* wider temporary value, clamped below */
 		bool		needed_wait;
 
 		/* Sanity check that we still agree on the buffers. */
@@ -962,7 +1070,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		 * If the stream has been reset, don't even wait for the IO, just
 		 * discard it.
 		 */
-		if (stream->distance < 0)
+		if (stream->readahead_distance < 0)
 		{
 			if (pgaio_wref_valid(&stream->ios[io_index].op.io_wref) &&
 				!stream->ios[io_index].op.foreign_io)
@@ -1011,11 +1119,38 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		 * the stream, as stream->distance == 0 is used to keep track of
 		 * having reached the end.
 		 */
-		if (stream->distance > 0 && needed_wait)
+		if (stream->readahead_distance > 0 && needed_wait)
 		{
-			distance = stream->distance * 2;
-			distance = Min(distance, stream->max_pinned_buffers);
-			stream->distance = distance;
+			/* wider temporary value, due to overflow risk */
+			int32		readahead_distance;
+
+			readahead_distance = stream->readahead_distance * 2;
+			readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+			stream->readahead_distance = readahead_distance;
+		}
+
+		/*
+		 * Whether we needed to wait or not, allow for more IO combining if we
+		 * needed to do IO. The reason to do so independently of needing to wait
+		 * is that when the data is resident in the kernel page cache, IO
+		 * combining reduces the syscall / dispatch overhead, making it
+		 * worthwhile regardless of needing to wait.
+		 *
+		 * It is also important with io_uring as it will never signal the need
+		 * to wait for reads if all the data is in the page cache. There are
+		 * heuristics to deal with that in method_io_uring.c, but they only
+		 * work when the IO gets large enough.
+		 */
+		if (stream->combine_distance > 0 &&
+			stream->combine_distance < stream->io_combine_limit)
+		{
+			/* wider temporary value, due to overflow risk */
+			int32		combine_distance;
+
+			combine_distance = stream->combine_distance * 2;
+			combine_distance = Min(combine_distance, stream->io_combine_limit);
+			combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+			stream->combine_distance = combine_distance;
 		}
 
 		/*
@@ -1094,10 +1229,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
+	/*
+	 * FIXME: It's way too easy to wrongly fast path. I'm pretty sure there's
+	 * several pre-existing cases where it triggers because we are not issuing
+	 * additional prefetching (e.g. because of a small
+	 * effective_io_concurrency) and thus stream->pinned_buffers stays at 1
+	 * after read_stream_look_ahead().
+	 */
 	if (stream->ios_in_progress == 0 &&
 		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
-		stream->distance == 1 &&
+		stream->readahead_distance == 1 &&
+		stream->combine_distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
 		stream->per_buffer_data_size == 0)
 	{
@@ -1143,8 +1286,9 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 BlockNumber
 read_stream_pause(ReadStream *stream)
 {
-	stream->resume_distance = stream->distance;
-	stream->distance = 0;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
+	stream->readahead_distance = 0;
 	return InvalidBlockNumber;
 }
 
@@ -1156,7 +1300,8 @@ read_stream_pause(ReadStream *stream)
 void
 read_stream_resume(ReadStream *stream)
 {
-	stream->distance = stream->resume_distance;
+	stream->readahead_distance = stream->resume_readahead_distance;
+	stream->combine_distance = stream->resume_combine_distance;
 }
 
 /*
@@ -1172,7 +1317,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
-	stream->distance = -1;
+	stream->readahead_distance = -1;
+	stream->combine_distance = -1;
 
 	/* Forget buffered block number and fast path state. */
 	stream->buffered_blocknum = InvalidBlockNumber;
@@ -1204,8 +1350,10 @@ read_stream_reset(ReadStream *stream)
 	Assert(stream->ios_in_progress == 0);
 
 	/* Start off assuming data is cached. */
-	stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	stream->readahead_distance = 1;
+	stream->combine_distance = 1;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 	stream->distance_decay_holdoff = 0;
 }
 
-- 
2.53.0.1.gb2826b52eb

