From 8027a52d95b5865700b879581c8c4848fcf2a501 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 10 Jan 2024 00:36:14 +1300
Subject: [PATCH v5 05/10] Issue advice for random streaming reads.

For random access to blocks that are not yet in our buffer pool, issue
POSIX_FADV_WILLNEED advice as soon as we have formed the largest range
we can.  (This is the point at which we'd start a real asynchronous
read, given more infrastructure.)

The total number of assumed-to-be-running I/Os is tracked and kept at or
below the requested number.

We already try to avoid generating hints for purely sequential access,
based on evidence that such hints are slower than just letting the
kernel (at least on Linux) detect sequential access itself.  Even so, at
least one future user needs a way to disable advice explicitly, so
provide a flag for that.
---
 src/backend/storage/aio/streaming_read.c | 100 ++++++++++++++++++++---
 src/include/storage/streaming_read.h     |   8 ++
 2 files changed, 97 insertions(+), 11 deletions(-)

diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index 3279956754..19605090fe 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -16,6 +16,7 @@
  */
 typedef struct PgStreamingReadRange
 {
+	bool		advice_issued;
 	bool		need_complete;
 	BlockNumber blocknum;
 	int			nblocks;
@@ -25,6 +26,9 @@ typedef struct PgStreamingReadRange
 
 struct PgStreamingRead
 {
+	int			max_ios;
+	int			ios_in_progress;
+	int			ios_in_progress_trigger;
 	int			max_pinned_buffers;
 	int			pinned_buffers;
 	int			pinned_buffers_trigger;
@@ -36,6 +40,11 @@ struct PgStreamingRead
 	BufferManagerRelation bmr;
 	ForkNumber	forknum;
 
+	bool		advice_enabled;
+
+	/* Next expected block, for detecting sequential access. */
+	BlockNumber seq_blocknum;
+
 	/* Space for optional per-buffer private data. */
 	size_t		per_buffer_data_size;
 	void	   *per_buffer_data;
@@ -59,9 +68,12 @@ pg_streaming_read_buffer_alloc_internal(int flags,
 	int			max_ios;
 	uint32		max_pinned_buffers;
 
+
 	/*
-	 * For now, max_ios is a nominal value because we don't generate I/O
-	 * concurrency yet.  Later this will serve more purpose.
+	 * Decide how many assumed I/Os we will allow to run concurrently.  That
+	 * is, advice to the kernel to tell it that we will soon read.  This
+	 * number also affects how far we look ahead for opportunities to start
+	 * more I/Os.
 	 */
 	if (flags & PGSR_FLAG_MAINTENANCE)
 		max_ios = maintenance_io_concurrency;
@@ -76,6 +88,12 @@ pg_streaming_read_buffer_alloc_internal(int flags,
 	 */
 	max_pinned_buffers = Max(max_ios * 4, MAX_BUFFERS_PER_TRANSFER);
 
+	/*
+	 * The *_io_concurrency GUCs might be set to 0.  We want to allow at
+	 * least one, to keep our gating logic simple.
+	 */
+	max_ios = Max(max_ios, 1);
+
 	/*
 	 * Don't allow this backend to pin too many buffers.  For now we'll apply
 	 * the limit for the shared buffer pool and the local buffer pool, without
@@ -99,12 +117,25 @@ pg_streaming_read_buffer_alloc_internal(int flags,
 		palloc0(offsetof(PgStreamingRead, ranges) +
 				sizeof(pgsr->ranges[0]) * size);
 
+	pgsr->max_ios = max_ios;
 	pgsr->per_buffer_data_size = per_buffer_data_size;
 	pgsr->max_pinned_buffers = max_pinned_buffers;
 	pgsr->pgsr_private = pgsr_private;
 	pgsr->strategy = strategy;
 	pgsr->size = size;
 
+#ifdef USE_PREFETCH
+
+	/*
+	 * This system supports prefetching advice.  As long as direct I/O isn't
+	 * enabled, and the caller hasn't promised sequential access, we can use
+	 * it.
+	 */
+	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+		(flags & PGSR_FLAG_SEQUENTIAL) == 0)
+		pgsr->advice_enabled = true;
+#endif
+
 	/*
 	 * We want to avoid creating ranges that are smaller than they could be
 	 * just because we hit max_pinned_buffers.  We only look ahead when the
@@ -154,9 +185,9 @@ pg_streaming_read_buffer_alloc(int flags,
  * Start building a new range.  This is called after the previous one
  * reached maximum size, or the callback's next block can't be merged with it.
  *
- * Since the previous head range has now reached its full potential size,
- * this is also the place where we would issue advice or start an asynchronous
- * I/O, in future development.
+ * Since the previous head range has now reached its full potential size, this
+ * is also a good time to issue 'prefetch' advice, because we know we'll soon
+ * be reading it.  In future, we could start an actual I/O here.
  */
 static PgStreamingReadRange *
 pg_streaming_read_new_range(PgStreamingRead *pgsr)
@@ -166,6 +197,40 @@ pg_streaming_read_new_range(PgStreamingRead *pgsr)
 	head_range = &pgsr->ranges[pgsr->head];
 	Assert(head_range->nblocks > 0);
 
+	/*
+	 * If a call to CompleteReadBuffers() will be needed, we can issue
+	 * advice to the kernel to get the read started.  We suppress it if the
+	 * access pattern appears to be completely sequential, though, because on
+	 * some systems that interferes with the kernel's own sequential
+	 * read-ahead heuristics and hurts performance.
+	 */
+	if (pgsr->advice_enabled)
+	{
+		BlockNumber blocknum = head_range->blocknum;
+		int			nblocks = head_range->nblocks;
+
+		if (head_range->need_complete && blocknum != pgsr->seq_blocknum)
+		{
+			SMgrRelation smgr =
+				pgsr->bmr.smgr ? pgsr->bmr.smgr :
+				RelationGetSmgr(pgsr->bmr.rel);
+
+			Assert(!head_range->advice_issued);
+
+			smgrprefetch(smgr, pgsr->forknum, blocknum, nblocks);
+
+			/*
+			 * Count this as an I/O that is concurrently in progress, though
+			 * we don't really know if the kernel generates a physical I/O.
+			 */
+			head_range->advice_issued = true;
+			pgsr->ios_in_progress++;
+		}
+
+		/* Remember the block after this range, for sequence detection. */
+		pgsr->seq_blocknum = blocknum + nblocks;
+	}
+
 	/* Create a new head range.  There must be space. */
 	Assert(pgsr->size > pgsr->max_pinned_buffers);
 	Assert((pgsr->head + 1) % pgsr->size != pgsr->tail);
@@ -180,8 +245,10 @@ pg_streaming_read_new_range(PgStreamingRead *pgsr)
 static void
 pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
 {
-	/* If we're finished, then we can't look ahead. */
-	if (pgsr->finished)
+	/*
+	 * If we're finished or can't start more I/O, then don't look ahead.
+	 */
+	if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios)
 		return;
 
 	/*
@@ -210,10 +277,11 @@ pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
 			head_range = pg_streaming_read_new_range(pgsr);
 
 			/*
-			 * Give up now if we wouldn't be able form another full range
-			 * after this due to the pin limit.
+			 * Give up now if I/O is saturated, or if we wouldn't be able to
+			 * form another full range after this due to the pin limit.
 			 */
-			if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+			if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger ||
+				pgsr->ios_in_progress == pgsr->max_ios)
 				break;
 		}
 
@@ -258,6 +326,7 @@ pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
 			/* Initialize a new range beginning at this block. */
 			head_range->blocknum = blocknum;
 			head_range->need_complete = need_complete;
+			head_range->advice_issued = false;
 		}
 		else
 		{
@@ -273,7 +342,8 @@ pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
 		if (pgsr->per_buffer_data_next == pgsr->max_pinned_buffers)
 			pgsr->per_buffer_data_next = 0;
 
-	} while (pgsr->pinned_buffers < pgsr->max_pinned_buffers);
+	} while (pgsr->pinned_buffers < pgsr->max_pinned_buffers &&
+			 pgsr->ios_in_progress < pgsr->max_ios);
 
 	if (pgsr->ranges[pgsr->head].nblocks > 0)
 		pg_streaming_read_new_range(pgsr);
@@ -305,6 +375,14 @@ pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_data)
 								false,
 								pgsr->strategy);
 			tail_range->need_complete = false;
+
+			/*
+			 * We don't really know if the kernel generated a physical I/O
+			 * when we issued advice, let alone when it finished, but it has
+			 * certainly finished after a read call returns.
+			 */
+			if (tail_range->advice_issued)
+				pgsr->ios_in_progress--;
 		}
 
 		/* Are there more buffers available in this range? */
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
index a445ea5c45..40c3408c54 100644
--- a/src/include/storage/streaming_read.h
+++ b/src/include/storage/streaming_read.h
@@ -14,6 +14,14 @@
  */
 #define PGSR_FLAG_MAINTENANCE 0x01
 
+/*
+ * We usually avoid issuing prefetch advice automatically when sequential
+ * access is detected, but this flag explicitly disables it, for cases that
+ * might not be correctly detected.  Explicit advice is known to perform worse
+ * than letting the kernel (at least Linux) detect sequential access.
+ */
+#define PGSR_FLAG_SEQUENTIAL 0x02
+
 struct PgStreamingRead;
 typedef struct PgStreamingRead PgStreamingRead;
 
-- 
2.39.2

