From 89ac3d5aae4f36c3c536dcd6b198e23b9f9e2024 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Fri, 28 Jul 2023 22:35:32 +1200
Subject: [PATCH v1 06/14] Provide basic streaming read API.

"Streaming reads" can be used as a more efficient replacement for
ReadBuffer() calls.

The client code supplies a callback that can say which block to read
next, and then consumes individual buffers one at a time.  This division
allows the PgStreamingRead object to build larger calls to
CompleteReadBuffers(), which in turn produce large preadv() calls
instead of traditional single-block pread() calls.  Unless the read is
purely sequential, posix_fadvise() calls are also issued.

This API is intended as a stepping stone, allowing for true asynchronous
implementation in later work.  Code that adapts to the streaming read
API would automatically benefit.

Author: Thomas Munro <thomas.munro@gmail.com>
---
 src/backend/storage/Makefile             |   2 +-
 src/backend/storage/aio/Makefile         |  14 +
 src/backend/storage/aio/meson.build      |   5 +
 src/backend/storage/aio/streaming_read.c | 436 +++++++++++++++++++++++
 src/backend/storage/buffer/bufmgr.c      |   2 +-
 src/backend/storage/meson.build          |   1 +
 src/include/storage/bufmgr.h             |   1 +
 src/include/storage/streaming_read.h     |  38 ++
 src/tools/pgindent/typedefs.list         |   2 +
 9 files changed, 499 insertions(+), 2 deletions(-)
 create mode 100644 src/backend/storage/aio/Makefile
 create mode 100644 src/backend/storage/aio/meson.build
 create mode 100644 src/backend/storage/aio/streaming_read.c
 create mode 100644 src/include/storage/streaming_read.h

diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca2..eec03f6f2b 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = aio buffer file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 0000000000..bcab44c802
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+	streaming_read.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 0000000000..156e87cab7
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+backend_sources += files(
+  'streaming_read.c',
+)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
new file mode 100644
index 0000000000..ee5a7e961d
--- /dev/null
+++ b/src/backend/storage/aio/streaming_read.c
@@ -0,0 +1,436 @@
+#include "postgres.h"
+
+#include "storage/streaming_read.h"
+#include "utils/rel.h"
+
+/*
+ * Element type for PgStreamingRead's circular array of clusters of buffers.
+ *
+ * For hits and RBM_WILL_ZERO, need_complete is false, and we have just one
+ * buffer in each cluster, already pinned and ready for use.
+ *
+ * For misses that require a physical read, need_complete is true, and
+ * buffers[] holds a group of neighboring blocks, so we can complete them
+ * with a single call to CompleteReadBuffers().  We can also issue a single
+ * prefetch for it as soon as it has grown to its largest possible size, if
+ * our random access heuristics determine that is a good idea.
+ */
+typedef struct PgStreamingReadCluster
+{
+	bool		advice_issued;
+	bool		need_complete;
+
+	BufferManagerRelation bmr;
+	ForkNumber	forknum;
+	BlockNumber blocknum;
+	int			nblocks;
+
+	int			per_io_data_index[MAX_BUFFERS_PER_TRANSFER];
+	bool		need_advice[MAX_BUFFERS_PER_TRANSFER];
+	Buffer		buffers[MAX_BUFFERS_PER_TRANSFER];
+} PgStreamingReadCluster;
+
+struct PgStreamingRead
+{
+	int			max_ios;
+	int			ios_in_progress;
+	int			ios_in_progress_trigger;
+	int			max_pinned_buffers;
+	int			pinned_buffers;
+	int			pinned_buffers_trigger;
+	int			next_tail_buffer;
+	bool		finished;
+	uintptr_t	pgsr_private;
+	PgStreamingReadBufferDetermineNextCB next_cb;
+	BufferAccessStrategy strategy;
+
+	/* Next expected prefetch, for sequential prefetch avoidance. */
+	BufferManagerRelation seq_bmr;
+	ForkNumber	seq_forknum;
+	BlockNumber seq_blocknum;
+
+	/* Space for optional per-I/O private data. */
+	size_t		per_io_data_size;
+	void	   *per_io_data;
+	int			per_io_data_next;
+
+	/* Circular buffer of clusters. */
+	int			size;
+	int			head;
+	int			tail;
+	PgStreamingReadCluster clusters[FLEXIBLE_ARRAY_MEMBER];
+};
+
+PgStreamingRead *
+pg_streaming_read_buffer_alloc(int max_ios,
+							   size_t per_io_data_size,
+							   uintptr_t pgsr_private,
+							   BufferAccessStrategy strategy,
+							   PgStreamingReadBufferDetermineNextCB determine_next_cb)
+{
+	PgStreamingRead *pgsr;
+	int			size;
+	int			max_pinned_buffers;
+
+	Assert(max_ios > 0);
+
+	/*
+	 * We allow twice as many buffers to be pinned as I/Os.  This allows us to
+	 * look further ahead for blocks that need to be read in.
+	 */
+	max_pinned_buffers = max_ios * 2;
+
+	/* Don't allow this backend to pin too many buffers. */
+	LimitAdditionalPins((uint32 *) &max_pinned_buffers);
+	max_pinned_buffers = Max(2, max_pinned_buffers);
+	max_ios = max_pinned_buffers / 2;
+	Assert(max_ios > 0);
+	Assert(max_pinned_buffers > 0);
+	Assert(max_pinned_buffers > max_ios);
+
+	/*
+	 * pgsr->clusters is a circular buffer.  When it is empty, head == tail.
+	 * When it is full, there is an empty element between head and tail.  Head
+	 * can also be empty (nblocks == 0).  So we need two extra elements.
+	 */
+	size = max_pinned_buffers + 2;
+
+	pgsr = (PgStreamingRead *)
+		palloc0(offsetof(PgStreamingRead, clusters) +
+				sizeof(pgsr->clusters[0]) * size);
+
+	pgsr->per_io_data_size = per_io_data_size;
+	pgsr->max_ios = max_ios;
+	pgsr->max_pinned_buffers = max_pinned_buffers;
+	pgsr->pgsr_private = pgsr_private;
+	pgsr->strategy = strategy;
+	pgsr->next_cb = determine_next_cb;
+	pgsr->size = size;
+
+	/*
+	 * We look ahead when the number of pinned buffers falls below this
+	 * number.  This encourages the formation of large vectored reads.
+	 */
+	pgsr->pinned_buffers_trigger =
+		Max(max_ios, max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER);
+
+	/* Space for the callback to store extra data along with each block. */
+	if (per_io_data_size)
+		pgsr->per_io_data = palloc(per_io_data_size * max_pinned_buffers);
+
+	return pgsr;
+}
+
+/*
+ * Issue WILLNEED advice for the head cluster, and allocate a new head
+ * cluster.
+ *
+ * We don't have true asynchronous I/O to actually submit, but this is
+ * equivalent because it might start I/O on systems that understand WILLNEED
+ * advice.  We count it as an I/O in progress.
+ */
+static PgStreamingReadCluster *
+pg_streaming_read_submit(PgStreamingRead *pgsr)
+{
+	PgStreamingReadCluster *head_cluster;
+
+	head_cluster = &pgsr->clusters[pgsr->head];
+	Assert(head_cluster->nblocks > 0);
+
+#ifdef USE_PREFETCH
+
+	/*
+	 * Don't bother with advice if there will be no call to
+	 * CompleteReadBuffers() or direct I/O is enabled.
+	 */
+	if (head_cluster->need_complete &&
+		(io_direct_flags & IO_DIRECT_DATA) == 0)
+	{
+		/*
+		 * Purely sequential advice is known to hurt performance on some
+		 * systems, so only issue it if this looks random.
+		 */
+		if (head_cluster->bmr.smgr != pgsr->seq_bmr.smgr ||
+			head_cluster->bmr.rel != pgsr->seq_bmr.rel ||
+			head_cluster->forknum != pgsr->seq_forknum ||
+			head_cluster->blocknum != pgsr->seq_blocknum)
+		{
+			SMgrRelation smgr =
+				head_cluster->bmr.smgr ? head_cluster->bmr.smgr
+				: RelationGetSmgr(head_cluster->bmr.rel);
+
+			Assert(!head_cluster->advice_issued);
+
+			for (int i = 0; i < head_cluster->nblocks; i++)
+			{
+				if (head_cluster->need_advice[i])
+				{
+					BlockNumber first_blocknum = head_cluster->blocknum + i;
+					int			nblocks = 1;
+
+					/*
+					 * How many adjacent blocks can we merge with to reduce
+					 * system calls?  Usually this is all of them, unless
+					 * there are overlapping reads and our timing is unlucky.
+					 */
+					while ((i + 1) < head_cluster->nblocks &&
+						   head_cluster->need_advice[i + 1])
+					{
+						nblocks++;
+						i++;
+					}
+
+					smgrprefetch(smgr,
+								 head_cluster->forknum,
+								 first_blocknum,
+								 nblocks);
+				}
+
+			}
+
+			/*
+			 * Count this as an I/O that is concurrently in progress.  We
+			 * might have called smgrprefetch() more than once, if some of the
+			 * buffers in the range were already in buffer pool but not valid
+			 * yet, because of a concurrent read, but for now we choose to
+			 * track this as one I/O.
+			 */
+			head_cluster->advice_issued = true;
+			pgsr->ios_in_progress++;
+		}
+
+		/* Remember the point after this, for the above heuristics. */
+		pgsr->seq_bmr = head_cluster->bmr;
+		pgsr->seq_forknum = head_cluster->forknum;
+		pgsr->seq_blocknum = head_cluster->blocknum + head_cluster->nblocks;
+	}
+#endif
+
+	/* Create a new head cluster.  There must be space. */
+	Assert(pgsr->size > pgsr->max_pinned_buffers);
+	Assert((pgsr->head + 1) % pgsr->size != pgsr->tail);
+	if (++pgsr->head == pgsr->size)
+		pgsr->head = 0;
+	head_cluster = &pgsr->clusters[pgsr->head];
+	head_cluster->nblocks = 0;
+
+	return head_cluster;
+}
+
+void
+pg_streaming_read_prefetch(PgStreamingRead *pgsr)
+{
+	/* If we're finished or can't start one more I/O, then no prefetching. */
+	if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios)
+		return;
+
+	/*
+	 * We'll also wait until the number of pinned buffers falls below our
+	 * trigger level, so that we have the chance to create a large cluster.
+	 */
+	if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+		return;
+
+	do
+	{
+		BufferManagerRelation bmr;
+		ForkNumber	forknum;
+		BlockNumber blocknum;
+		ReadBufferMode mode;
+		Buffer		buffer;
+		bool		found;
+		bool		allocated;
+		bool		need_complete;
+		PgStreamingReadCluster *head_cluster;
+		void	   *per_io_data;
+
+		/* Do we have a full-sized cluster? */
+		head_cluster = &pgsr->clusters[pgsr->head];
+		if (head_cluster->nblocks == lengthof(head_cluster->buffers))
+		{
+			Assert(head_cluster->need_complete);
+			head_cluster = pg_streaming_read_submit(pgsr);
+
+			/*
+			 * Give up now if I/O is saturated or we couldn't form another
+			 * full cluster after this.
+			 */
+			if (pgsr->ios_in_progress == pgsr->max_ios ||
+				pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+				break;
+		}
+
+		per_io_data = (char *) pgsr->per_io_data +
+			pgsr->per_io_data_size * pgsr->per_io_data_next;
+
+		/*
+		 * Try to find out which block the callback wants to read next.  False
+		 * indicates end-of-stream (but the client can restart).
+		 */
+		if (!pgsr->next_cb(pgsr, pgsr->pgsr_private, per_io_data,
+						   &bmr, &forknum, &blocknum, &mode))
+		{
+			pgsr->finished = true;
+			break;
+		}
+
+		Assert(mode == RBM_NORMAL || mode == RBM_WILL_ZERO);
+		Assert(pgsr->pinned_buffers < pgsr->max_pinned_buffers);
+
+		buffer = PrepareReadBuffer(bmr,
+								   forknum,
+								   blocknum,
+								   pgsr->strategy,
+								   &found,
+								   &allocated);
+		pgsr->pinned_buffers++;
+
+		need_complete = !found && mode != RBM_WILL_ZERO;
+
+		/* Is there a head cluster that we can't extend? */
+		head_cluster = &pgsr->clusters[pgsr->head];
+		if (head_cluster->nblocks > 0 &&
+			(!need_complete ||
+			 !head_cluster->need_complete ||
+			 head_cluster->bmr.smgr != bmr.smgr ||
+			 head_cluster->bmr.rel != bmr.rel ||
+			 head_cluster->forknum != forknum ||
+			 head_cluster->blocknum + head_cluster->nblocks != blocknum))
+		{
+			/* Submit it so we can start a new one. */
+			head_cluster = pg_streaming_read_submit(pgsr);
+			Assert(head_cluster->nblocks == 0);
+		}
+
+		if (head_cluster->nblocks == 0)
+		{
+			/* Initialize the cluster. */
+			head_cluster->bmr = bmr;
+			head_cluster->forknum = forknum;
+			head_cluster->blocknum = blocknum;
+			head_cluster->advice_issued = false;
+			head_cluster->need_complete = need_complete;
+		}
+		else
+		{
+			/* We'll extend an existing cluster by one buffer. */
+			Assert(head_cluster->bmr.smgr == bmr.smgr);
+			Assert(head_cluster->bmr.rel == bmr.rel);
+			Assert(head_cluster->forknum == forknum);
+			Assert(head_cluster->blocknum + head_cluster->nblocks == blocknum);
+			Assert(head_cluster->need_complete);
+		}
+
+		head_cluster->per_io_data_index[head_cluster->nblocks] = pgsr->per_io_data_next++;
+		head_cluster->need_advice[head_cluster->nblocks] = allocated;
+		head_cluster->buffers[head_cluster->nblocks] = buffer;
+		head_cluster->nblocks++;
+
+		if (pgsr->per_io_data_next == pgsr->max_pinned_buffers)
+			pgsr->per_io_data_next = 0;
+
+	} while (pgsr->ios_in_progress < pgsr->max_ios &&
+			 pgsr->pinned_buffers < pgsr->max_pinned_buffers);
+
+	/*
+	 * Initiate as soon as we can if we can't prepare any more reads right
+	 * now.  This makes sure we issue the advice as soon as possible, since
+	 * any other backend that tries to read the same block won't do that.
+	 */
+	if (pgsr->clusters[pgsr->head].nblocks > 0)
+		pg_streaming_read_submit(pgsr);
+}
+
+void
+pg_streaming_read_reset(PgStreamingRead *pgsr)
+{
+	pgsr->finished = false;
+}
+
+Buffer
+pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_io_data)
+{
+	pg_streaming_read_prefetch(pgsr);
+
+	/* See if we have one buffer to return. */
+	while (pgsr->tail != pgsr->head)
+	{
+		PgStreamingReadCluster *tail_cluster;
+
+		tail_cluster = &pgsr->clusters[pgsr->tail];
+
+		/*
+		 * Do we need to perform an I/O before returning the buffers from this
+		 * cluster?
+		 */
+		if (tail_cluster->need_complete)
+		{
+			CompleteReadBuffers(tail_cluster->bmr,
+								tail_cluster->buffers,
+								tail_cluster->forknum,
+								tail_cluster->blocknum,
+								tail_cluster->nblocks,
+								false,
+								pgsr->strategy);
+			tail_cluster->need_complete = false;
+
+			/* We only counted this I/O as running if we issued advice. */
+			if (tail_cluster->advice_issued)
+				pgsr->ios_in_progress--;
+		}
+
+		/* Are there more buffers available in this cluster? */
+		if (pgsr->next_tail_buffer < tail_cluster->nblocks)
+		{
+			/* We are giving away ownership of this pinned buffer. */
+			Assert(pgsr->pinned_buffers > 0);
+			pgsr->pinned_buffers--;
+
+			if (per_io_data)
+				*per_io_data = (char *) pgsr->per_io_data +
+					tail_cluster->per_io_data_index[pgsr->next_tail_buffer] *
+					pgsr->per_io_data_size;
+
+			return tail_cluster->buffers[pgsr->next_tail_buffer++];
+		}
+
+		/* Advance tail to next cluster, if there is one. */
+		if (++pgsr->tail == pgsr->size)
+			pgsr->tail = 0;
+		pgsr->next_tail_buffer = 0;
+	}
+
+	return InvalidBuffer;
+}
+
+void
+pg_streaming_read_free(PgStreamingRead *pgsr)
+{
+	Buffer		buffer;
+
+	/* Stop reading ahead, and unpin anything that wasn't consumed. */
+	pgsr->finished = true;
+	for (;;)
+	{
+		buffer = pg_streaming_read_buffer_get_next(pgsr, NULL);
+		if (buffer == InvalidBuffer)
+			break;
+		ReleaseBuffer(buffer);
+	}
+
+	if (pgsr->per_io_data)
+		pfree(pgsr->per_io_data);
+	pfree(pgsr);
+}
+
+int
+pg_streaming_read_ios(PgStreamingRead *pgsr)
+{
+	return pgsr->ios_in_progress;
+}
+
+int
+pg_streaming_read_pins(PgStreamingRead *pgsr)
+{
+	return pgsr->pinned_buffers;
+}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index d647708f7f..678e3239a9 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1878,7 +1878,7 @@ again:
  * pessimistic, but outside of toy-sized shared_buffers it should allow
  * sufficient pins.
  */
-static void
+void
 LimitAdditionalPins(uint32 *additional_pins)
 {
 	uint32		max_backends;
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 6ea9faa439..b196d9f885 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
 # Copyright (c) 2022-2023, PostgreSQL Global Development Group
 
+subdir('aio')
 subdir('buffer')
 subdir('file')
 subdir('freespace')
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 5a2f66ed47..435239955e 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -271,6 +271,7 @@ extern void AbortBufferIO(Buffer buffer);
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
 extern void TestForOldSnapshot_impl(Snapshot snapshot, Relation relation);
+extern void LimitAdditionalPins(uint32 *additional_pins);
 
 /* in buf_init.c */
 extern void InitBufferPool(void);
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
new file mode 100644
index 0000000000..7717cdf056
--- /dev/null
+++ b/src/include/storage/streaming_read.h
@@ -0,0 +1,38 @@
+#ifndef STREAMING_READ_H
+#define STREAMING_READ_H
+
+#include "storage/bufmgr.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+
+/*
+ * For most sequential access, callers can use this size to build full-sized
+ * reads without pinning many extra buffers.
+ */
+#define PG_STREAMING_READ_DEFAULT_MAX_IOS MAX_BUFFERS_PER_TRANSFER
+
+struct PgStreamingRead;
+typedef struct PgStreamingRead PgStreamingRead;
+
+typedef bool (*PgStreamingReadBufferDetermineNextCB) (PgStreamingRead *pgsr,
+													  uintptr_t pgsr_private,
+													  void *per_io_private,
+													  BufferManagerRelation *bmr,
+													  ForkNumber *forkNum,
+													  BlockNumber *blockNum,
+													  ReadBufferMode *mode);
+
+extern PgStreamingRead *pg_streaming_read_buffer_alloc(int max_ios,
+													   size_t per_io_private_size,
+													   uintptr_t pgsr_private,
+													   BufferAccessStrategy strategy,
+													   PgStreamingReadBufferDetermineNextCB determine_next_cb);
+extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
+extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_io_private);
+extern void pg_streaming_read_reset(PgStreamingRead *pgsr);
+extern void pg_streaming_read_free(PgStreamingRead *pgsr);
+
+extern int pg_streaming_read_ios(PgStreamingRead *pgsr);
+extern int pg_streaming_read_pins(PgStreamingRead *pgsr);
+
+#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 49a33c0387..a0752fa30e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2076,6 +2076,8 @@ PgStat_TableCounts
 PgStat_TableStatus
 PgStat_TableXactStatus
 PgStat_WalStats
+PgStreamingRead
+PgStreamingReadCluster
 PgXmlErrorContext
 PgXmlStrictness
 Pg_finfo_record
-- 
2.39.2

