From 3b51bfa51eac42157c8177437fb6993ed349c0f3 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 31 Aug 2024 21:39:30 -0400
Subject: [PATCH v2.1 16/20] aio: Very-WIP: read_stream.c adjustments for real
 AIO

---
 src/include/storage/bufmgr.h          |  2 ++
 src/backend/storage/aio/read_stream.c | 29 +++++++++++++++++++++------
 src/backend/storage/buffer/bufmgr.c   |  3 ++-
 3 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index a075a40b2ed..ac6496bb1eb 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -117,6 +117,8 @@ typedef struct BufferManagerRelation
 #define READ_BUFFERS_ZERO_ON_ERROR (1 << 0)
 /* Call smgrprefetch() if I/O necessary. */
 #define READ_BUFFERS_ISSUE_ADVICE (1 << 1)
+/* Caller will issue more I/O; don't submit the staged I/O yet. */
+#define READ_BUFFERS_MORE_MORE_MORE (1 << 2)
 
 /*
  * FIXME: PgAioReturn is defined in aio.h. It'd be much better if we didn't
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 7f0e07d9586..7ff2d6a2071 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -91,6 +91,7 @@
 
 #include "catalog/pg_tablespace.h"
 #include "miscadmin.h"
+#include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/smgr.h"
 #include "storage/read_stream.h"
@@ -241,14 +242,18 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	/*
 	 * If advice hasn't been suppressed, this system supports it, and this
 	 * isn't a strictly sequential pattern, then we'll issue advice.
+	 *
+	 * XXX: Used to also check stream->pending_read_blocknum !=
+	 * stream->seq_blocknum; the sentence above is now stale in that respect.
 	 */
 	if (!suppress_advice &&
-		stream->advice_enabled &&
-		stream->pending_read_blocknum != stream->seq_blocknum)
+		stream->advice_enabled)
 		flags = READ_BUFFERS_ISSUE_ADVICE;
 	else
 		flags = 0;
 
+	flags |= READ_BUFFERS_MORE_MORE_MORE;
+
 	/* We say how many blocks we want to read, but may be smaller on return. */
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
@@ -307,6 +312,14 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 static void
 read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 {
+	if (stream->distance > (io_combine_limit * 8))
+	{
+		if (stream->pinned_buffers + stream->pending_read_nblocks > ((stream->distance * 3) / 4))
+		{
+			return;
+		}
+	}
+
 	while (stream->ios_in_progress < stream->max_ios &&
 		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
 	{
@@ -356,6 +369,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 			{
 				/* And we've hit the limit.  Rewind, and stop here. */
 				read_stream_unget_block(stream, blocknum);
+				pgaio_submit_staged();
 				return;
 			}
 		}
@@ -380,6 +394,8 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		 stream->distance == 0) &&
 		stream->ios_in_progress < stream->max_ios)
 		read_stream_start_pending_read(stream, suppress_advice);
+
+	pgaio_submit_staged();
 }
 
 /*
@@ -494,10 +510,11 @@ read_stream_begin_impl(int flags,
 	 * direct I/O isn't enabled, the caller hasn't promised sequential access
 	 * (overriding our detection heuristics), and max_ios hasn't been set to
 	 * zero.
+	 *
+	 * FIXME: Used to also check (io_direct_flags & IO_DIRECT_DATA) == 0 &&
+	 * (flags & READ_STREAM_SEQUENTIAL) == 0; the comment above is now stale.
 	 */
-	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
-		(flags & READ_STREAM_SEQUENTIAL) == 0 &&
-		max_ios > 0)
+	if (max_ios > 0)
 		stream->advice_enabled = true;
 #endif
 
@@ -728,7 +745,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		if (++stream->oldest_io_index == stream->max_ios)
 			stream->oldest_io_index = 0;
 
-		if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
+		if (stream->ios[io_index].op.flags & (READ_BUFFERS_ISSUE_ADVICE | READ_BUFFERS_MORE_MORE_MORE))
 		{
 			/* Distance ramps up fast (behavior C). */
 			distance = stream->distance * 2;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 4914c71d41e..ed384fa1a44 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1638,7 +1638,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation,
 
 	if (did_start_io_overall)
 	{
-		pgaio_submit_staged();
+		if (!(flags & READ_BUFFERS_MORE_MORE_MORE))
+			pgaio_submit_staged();
 		return true;
 	}
 	else
-- 
2.45.2.827.g557ae147e6

