From 30e59a9eee0ac1b10c8016ad3a4e524202f0ce5e Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 10 Jan 2024 02:20:07 +1300
Subject: [PATCH v5 06/10] Allow streaming reads to ramp up in size.

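Instead of assuming that the consumer of buffers will consume all blocks
until the end of the stream, start by looking ahead just one block,
doubling the look-ahead distance each time until we reach the full
possible distance.  Each time the current limit is reached, wait for the
buffers already pinned to be consumed before looking further ahead.
Callers that already know they will read all available blocks, for
example pg_prewarm, can disable the ramp-up with PGSR_FLAG_FULL.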
---
 contrib/pg_prewarm/pg_prewarm.c          |  2 +-
 src/backend/storage/aio/streaming_read.c | 39 +++++++++++++++++++++++-
 src/include/storage/streaming_read.h     |  7 +++++
 3 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c
index 9617bf130b..1cc84bcb0c 100644
--- a/contrib/pg_prewarm/pg_prewarm.c
+++ b/contrib/pg_prewarm/pg_prewarm.c
@@ -214,7 +214,7 @@ pg_prewarm(PG_FUNCTION_ARGS)
 		p.blocknum = first_block;
 		p.last_block = last_block;
 
-		pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+		pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_FULL,
 											  &p,
 											  0,
 											  NULL,
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index 19605090fe..bb5b863be7 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -33,6 +33,8 @@ struct PgStreamingRead
 	int			pinned_buffers;
 	int			pinned_buffers_trigger;
 	int			next_tail_buffer;
+	int			ramp_up_pin_limit;
+	int			ramp_up_pin_stall;
 	bool		finished;
 	void	   *pgsr_private;
 	PgStreamingReadBufferCB callback;
@@ -136,6 +138,16 @@ pg_streaming_read_buffer_alloc_internal(int flags,
 		pgsr->advice_enabled = true;
 #endif
 
+	/*
+	 * Start with a look-ahead limit of one pinned buffer and double it each
+	 * time it is reached, for users that don't know how far ahead they'll
+	 * read.  Users that will read everything can pass PGSR_FLAG_FULL.
+	 */
+	if (flags & PGSR_FLAG_FULL)
+		pgsr->ramp_up_pin_limit = INT_MAX;
+	else
+		pgsr->ramp_up_pin_limit = 1;
+
 	/*
 	 * We want to avoid creating ranges that are smaller than they could be
 	 * just because we hit max_pinned_buffers.  We only look ahead when the
@@ -245,6 +257,16 @@ pg_streaming_read_new_range(PgStreamingRead *pgsr)
 static void
 pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
 {
+	/*
+	 * If we're still ramping up, we may have to stall to wait for buffers to
+	 * be consumed first before we do any more prefetching.
+	 */
+	if (pgsr->ramp_up_pin_stall > 0)
+	{
+		Assert(pgsr->pinned_buffers > 0);
+		return;
+	}
+
 	/*
 	 * If we're finished or can't start more I/O, then don't look ahead.
 	 */
@@ -343,7 +365,19 @@ pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
 			pgsr->per_buffer_data_next = 0;
 
 	} while (pgsr->pinned_buffers < pgsr->max_pinned_buffers &&
-			 pgsr->ios_in_progress < pgsr->max_ios);
+			 pgsr->ios_in_progress < pgsr->max_ios &&
+			 pgsr->pinned_buffers < pgsr->ramp_up_pin_limit);
+
+	/* If we've hit the ramp-up limit, insert a stall. */
+	if (pgsr->pinned_buffers >= pgsr->ramp_up_pin_limit)
+	{
+		/* Can't get here if an earlier stall hasn't finished. */
+		Assert(pgsr->ramp_up_pin_stall == 0);
+		/* Don't do any more prefetching until these buffers are consumed. */
+		pgsr->ramp_up_pin_stall = pgsr->ramp_up_pin_limit;
+		/* Double it.  Once above max_pinned_buffers it no longer applies. */
+		pgsr->ramp_up_pin_limit *= 2;
+	}
 
 	if (pgsr->ranges[pgsr->head].nblocks > 0)
 		pg_streaming_read_new_range(pgsr);
@@ -400,6 +434,9 @@ pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_data)
 			Assert(pgsr->pinned_buffers > 0);
 			pgsr->pinned_buffers--;
 
+			if (pgsr->ramp_up_pin_stall > 0)
+				pgsr->ramp_up_pin_stall--;
+
 			if (per_buffer_data)
 				*per_buffer_data = (char *) pgsr->per_buffer_data +
 					tail_range->per_buffer_data_index[buffer_index] *
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
index 40c3408c54..c4d3892bb2 100644
--- a/src/include/storage/streaming_read.h
+++ b/src/include/storage/streaming_read.h
@@ -22,6 +22,13 @@
  */
 #define PGSR_FLAG_SEQUENTIAL 0x02
 
+/*
+ * We usually ramp up from smaller reads to larger ones, to support users who
+ * don't know if it's worth reading lots of buffers yet.  This flag disables
+ * that, declaring ahead of time that we'll be reading all available buffers.
+ */
+#define PGSR_FLAG_FULL 0x04
+
 struct PgStreamingRead;
 typedef struct PgStreamingRead PgStreamingRead;
 
-- 
2.39.2

