From 2dd3d12fa75248bf8473b4b69884dd056bdc9163 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 18 Feb 2025 15:59:13 +1300
Subject: [PATCH v3 3/6] Improve read stream advice for large random chunks.

read_stream.c tries not to issue advice when it thinks the kernel's
readahead should be active, i.e. when using buffered I/O and reading
sequential blocks.  It previously gave up a little too easily: it should
issue advice until it has started running sequential pread() calls, not
just until it's planning to.  The simpler strategy worked for random
chunks of size <= io_combine_limit and entirely sequential streams, but
not so well when reading random chunks > io_combine_limit.  For example,
a 256kB chunk of sequential data would benefit from only one fadvise(),
but (assuming io_combine_limit=128kB) could suffer an I/O stall for the
second half of it.

Keep issuing advice until the pread() calls catch up with the start of
the region we're currently issuing advice for, if ever.  In practice, if
there are any jumps in the lookahead window, we'll never stop issuing
advice, and if the whole lookahead window becomes sequential we'll
finally stop issuing advice.

Discovered by Tomas Vondra's regression testing of many data clustering
patterns using Melanie Plageman's streaming Bitmap Heap Scan patch, with
analysis of the I/O stall-producing pattern from Andres Freund.

Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
Discussion: https://postgr.es/m/CA%2BhUKGJ3HSWciQCz8ekP1Zn7N213RfA4nbuotQawfpq23%2Bw-5Q%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 71 +++++++++++++++++++--------
 1 file changed, 50 insertions(+), 21 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 11ee16ec228..a8a96baf8c1 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -133,6 +133,7 @@ struct ReadStream
 
 	/* Next expected block, for detecting sequential access. */
 	BlockNumber seq_blocknum;
+	BlockNumber seq_until_processed;
 
 	/* The read operation we are currently preparing. */
 	BlockNumber pending_read_blocknum;
@@ -237,11 +238,11 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
  * distance to a level that prevents look-ahead until buffers are released.
  */
 static bool
-read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
+read_stream_start_pending_read(ReadStream *stream)
 {
 	bool		need_wait;
 	int			nblocks;
-	int			flags;
+	int			flags = 0;
 	int16		io_index;
 	int16		overflow;
 	int16		buffer_index;
@@ -261,16 +262,36 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	else
 		Assert(stream->next_buffer_index == stream->oldest_buffer_index);
 
-	/*
-	 * If advice hasn't been suppressed, this system supports it, and this
-	 * isn't a strictly sequential pattern, then we'll issue advice.
-	 */
-	if (!suppress_advice &&
-		stream->advice_enabled &&
-		stream->pending_read_blocknum != stream->seq_blocknum)
-		flags = READ_BUFFERS_ISSUE_ADVICE;
-	else
-		flags = 0;
+	/* Do we need to issue read-ahead advice? */
+	if (stream->advice_enabled)
+	{
+		bool		no_wait;
+
+		/*
+		 * We only issue advice if we won't immediately have to call
+		 * WaitReadBuffers().
+		 */
+		no_wait = stream->pinned_buffers > 0 ||
+			stream->pending_read_nblocks < stream->distance;
+
+		if (stream->pending_read_blocknum == stream->seq_blocknum)
+		{
+			/*
+			 * Sequential: issue advice only until the WaitReadBuffers() calls
+			 * catch up with the first advice issued for this sequential
+			 * region, so the kernel can see sequential access.
+			 */
+			if (stream->seq_until_processed != InvalidBlockNumber && no_wait)
+				flags = READ_BUFFERS_ISSUE_ADVICE;
+		}
+		else
+		{
+			/* Random jump: start tracking new region. */
+			stream->seq_until_processed = stream->pending_read_blocknum;
+			if (no_wait)
+				flags = READ_BUFFERS_ISSUE_ADVICE;
+		}
+	}
 
 	/* Compute the remaining portion of the per-backend buffer limit. */
 	if (stream->temporary)
@@ -359,7 +380,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 }
 
 static void
-read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
+read_stream_look_ahead(ReadStream *stream)
 {
 	while (stream->ios_in_progress < stream->max_ios &&
 		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
@@ -370,8 +391,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 
 		if (stream->pending_read_nblocks == stream->io_combine_limit)
 		{
-			read_stream_start_pending_read(stream, suppress_advice);
-			suppress_advice = false;
+			read_stream_start_pending_read(stream);
 			continue;
 		}
 
@@ -404,15 +424,13 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		/* We have to start the pending read before we can build another. */
 		while (stream->pending_read_nblocks > 0)
 		{
-			if (!read_stream_start_pending_read(stream, suppress_advice) ||
+			if (!read_stream_start_pending_read(stream) ||
 				stream->ios_in_progress == stream->max_ios)
 			{
 				/* And we've hit a buffer or I/O limit.  Rewind and wait. */
 				read_stream_unget_block(stream, blocknum);
 				return;
 			}
-
-			suppress_advice = false;
 		}
 
 		/* This is the start of a new pending read. */
@@ -436,7 +454,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		  stream->pinned_buffers == 0) ||
 		 stream->distance == 0) &&
 		stream->ios_in_progress < stream->max_ios)
-		read_stream_start_pending_read(stream, suppress_advice);
+		read_stream_start_pending_read(stream);
 
 	/*
 	 * There should always be something pinned when we leave this function,
@@ -612,6 +630,8 @@ read_stream_begin_impl(int flags,
 	stream->callback = callback;
 	stream->callback_private_data = callback_private_data;
 	stream->buffered_blocknum = InvalidBlockNumber;
+	stream->seq_blocknum = InvalidBlockNumber;
+	stream->seq_until_processed = InvalidBlockNumber;
 	stream->temporary = SmgrIsTemp(smgr);
 
 	/*
@@ -792,7 +812,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		 * space for more, but if we're just starting up we'll need to crank
 		 * the handle to get started.
 		 */
-		read_stream_look_ahead(stream, true);
+		read_stream_look_ahead(stream);
 
 		/* End of stream reached? */
 		if (stream->pinned_buffers == 0)
@@ -837,6 +857,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			distance = stream->distance * 2;
 			distance = Min(distance, stream->max_pinned_buffers);
 			stream->distance = distance;
+
+			/*
+			 * If we've caught up with the first advice issued for the current
+			 * sequential region, cancel further advice until the next random
+			 * jump.  The kernel should be able to see the pattern now that
+			 * we're actually making sequential preadv() calls.
+			 */
+			if (stream->ios[io_index].op.blocknum == stream->seq_until_processed)
+				stream->seq_until_processed = InvalidBlockNumber;
 		}
 		else
 		{
@@ -898,7 +927,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->oldest_buffer_index = 0;
 
 	/* Prepare for the next call. */
-	read_stream_look_ahead(stream, false);
+	read_stream_look_ahead(stream);
 
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
-- 
2.39.5

