From b0bb4b478b27c2a38bf819ee927be9167e551d28 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 31 Aug 2024 21:39:30 -0400
Subject: [PATCH v2.3 22/30] aio: Very-WIP: read_stream.c adjustments for real
 AIO

Things that need to be fixed / are fixed in this:
- max pinned buffers should scale with io_combine_limit (max_ios *
  io_combine_limit), not with a hard-coded max_ios * 4
- overflow distance
- pins need to be limited in more places
---
 src/include/storage/bufmgr.h          |  2 ++
 src/backend/storage/aio/read_stream.c | 31 +++++++++++++++++++++------
 src/backend/storage/buffer/bufmgr.c   |  3 ++-
 3 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 46ee957e99c..f205643c4ef 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -119,6 +119,8 @@ typedef struct BufferManagerRelation
 #define READ_BUFFERS_ISSUE_ADVICE (1 << 1)
 /* IO will immediately be waited for */
 #define READ_BUFFERS_SYNCHRONOUSLY (1 << 2)
+/* caller will issue more IO and submit it itself, don't submit staged IO */
+#define READ_BUFFERS_MORE_MORE_MORE (1 << 3)
 
 
 struct ReadBuffersOperation
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index e4414b2e915..c2211cab02a 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -90,6 +90,7 @@
 #include "postgres.h"
 
 #include "miscadmin.h"
+#include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/smgr.h"
 #include "storage/read_stream.h"
@@ -240,14 +241,18 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	/*
 	 * If advice hasn't been suppressed, this system supports it, and this
 	 * isn't a strictly sequential pattern, then we'll issue advice.
+	 *
+	 * XXX: Used to also check stream->pending_read_blocknum !=
+	 * stream->seq_blocknum, so sequential patterns are no longer excluded.
 	 */
 	if (!suppress_advice &&
-		stream->advice_enabled &&
-		stream->pending_read_blocknum != stream->seq_blocknum)
+		stream->advice_enabled)
 		flags = READ_BUFFERS_ISSUE_ADVICE;
 	else
 		flags = 0;
 
+	flags |= READ_BUFFERS_MORE_MORE_MORE;
+
 	/* We say how many blocks we want to read, but may be smaller on return. */
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
@@ -306,6 +311,14 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 static void
 read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 {
+	if (stream->distance > (io_combine_limit * 8))
+	{
+		if (stream->pinned_buffers + stream->pending_read_nblocks > ((stream->distance * 3) / 4))
+		{
+			return;
+		}
+	}
+
 	while (stream->ios_in_progress < stream->max_ios &&
 		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
 	{
@@ -355,6 +368,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 			{
 				/* And we've hit the limit.  Rewind, and stop here. */
 				read_stream_unget_block(stream, blocknum);
+				pgaio_submit_staged();
 				return;
 			}
 		}
@@ -379,6 +393,8 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		 stream->distance == 0) &&
 		stream->ios_in_progress < stream->max_ios)
 		read_stream_start_pending_read(stream, suppress_advice);
+
+	pgaio_submit_staged();
 }
 
 /*
@@ -442,7 +458,7 @@ read_stream_begin_impl(int flags,
 	 * overflow (even though that's not possible with the current GUC range
 	 * limits), allowing also for the spare entry and the overflow space.
 	 */
-	max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
+	max_pinned_buffers = Max(max_ios * io_combine_limit, io_combine_limit);
 	max_pinned_buffers = Min(max_pinned_buffers,
 							 PG_INT16_MAX - io_combine_limit - 1);
 
@@ -493,10 +509,11 @@ read_stream_begin_impl(int flags,
 	 * direct I/O isn't enabled, the caller hasn't promised sequential access
 	 * (overriding our detection heuristics), and max_ios hasn't been set to
 	 * zero.
+	 *
+	 * FIXME: Used to also check (io_direct_flags & IO_DIRECT_DATA) == 0 &&
+	 * (flags & READ_STREAM_SEQUENTIAL) == 0; only max_ios is checked now.
 	 */
-	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
-		(flags & READ_STREAM_SEQUENTIAL) == 0 &&
-		max_ios > 0)
+	if (max_ios > 0)
 		stream->advice_enabled = true;
 #endif
 
@@ -727,7 +744,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		if (++stream->oldest_io_index == stream->max_ios)
 			stream->oldest_io_index = 0;
 
-		if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
+		if (stream->ios[io_index].op.flags & (READ_BUFFERS_ISSUE_ADVICE | READ_BUFFERS_MORE_MORE_MORE))
 		{
 			/* Distance ramps up fast (behavior C). */
 			distance = stream->distance * 2;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 70f1da84083..118a6e1ca31 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1752,7 +1752,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation,
 
 	if (did_start_io_overall)
 	{
-		pgaio_submit_staged();
+		if (!(flags & READ_BUFFERS_MORE_MORE_MORE))
+			pgaio_submit_staged();
 		return true;
 	}
 	else
-- 
2.48.1.76.g4e746b1a31.dirty

