From 6b9989da160c8a96a8e70ae276796b460c205ff0 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 22 Jul 2023 17:31:54 +1200
Subject: [PATCH v3 12/13] Streaming Read API

---
 contrib/pg_prewarm/pg_prewarm.c          |  40 +-
 src/backend/access/transam/xlogutils.c   |   2 +-
 src/backend/postmaster/bgwriter.c        |   8 +-
 src/backend/postmaster/checkpointer.c    |  15 +-
 src/backend/storage/Makefile             |   2 +-
 src/backend/storage/aio/Makefile         |  14 +
 src/backend/storage/aio/meson.build      |   5 +
 src/backend/storage/aio/streaming_read.c | 435 ++++++++++++++++++
 src/backend/storage/buffer/bufmgr.c      | 560 +++++++++++++++--------
 src/backend/storage/buffer/localbuf.c    |  14 +-
 src/backend/storage/meson.build          |   1 +
 src/backend/storage/smgr/smgr.c          |  49 +-
 src/include/storage/bufmgr.h             |  22 +
 src/include/storage/smgr.h               |   4 +-
 src/include/storage/streaming_read.h     |  45 ++
 src/include/utils/rel.h                  |   6 -
 src/tools/pgindent/typedefs.list         |   2 +
 17 files changed, 986 insertions(+), 238 deletions(-)
 create mode 100644 src/backend/storage/aio/Makefile
 create mode 100644 src/backend/storage/aio/meson.build
 create mode 100644 src/backend/storage/aio/streaming_read.c
 create mode 100644 src/include/storage/streaming_read.h

diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c
index 8541e4d6e46..9617bf130bd 100644
--- a/contrib/pg_prewarm/pg_prewarm.c
+++ b/contrib/pg_prewarm/pg_prewarm.c
@@ -20,6 +20,7 @@
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
+#include "storage/streaming_read.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -38,6 +39,25 @@ typedef enum
 
 static PGIOAlignedBlock blockbuffer;
 
+struct pg_prewarm_streaming_read_private
+{
+	BlockNumber blocknum;
+	int64		last_block;
+};
+
+static BlockNumber
+pg_prewarm_streaming_read_next(PgStreamingRead *pgsr,
+							   void *pgsr_private,
+							   void *per_buffer_data)
+{
+	struct pg_prewarm_streaming_read_private *p = pgsr_private;
+
+	if (p->blocknum <= p->last_block)
+		return p->blocknum++;
+
+	return InvalidBlockNumber;
+}
+
 /*
  * pg_prewarm(regclass, mode text, fork text,
  *			  first_block int8, last_block int8)
@@ -183,18 +203,36 @@ pg_prewarm(PG_FUNCTION_ARGS)
 	}
 	else if (ptype == PREWARM_BUFFER)
 	{
+		struct pg_prewarm_streaming_read_private p;
+		PgStreamingRead *pgsr;
+
 		/*
 		 * In buffer mode, we actually pull the data into shared_buffers.
 		 */
+
+		/* Set up the private state for our streaming buffer read callback. */
+		p.blocknum = first_block;
+		p.last_block = last_block;
+
+		pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+											  &p,
+											  0,
+											  NULL,
+											  BMR_REL(rel),
+											  forkNumber,
+											  pg_prewarm_streaming_read_next);
+
 		for (block = first_block; block <= last_block; ++block)
 		{
 			Buffer		buf;
 
 			CHECK_FOR_INTERRUPTS();
-			buf = ReadBufferExtended(rel, forkNumber, block, RBM_NORMAL, NULL);
+			buf = pg_streaming_read_buffer_get_next(pgsr, NULL);
 			ReleaseBuffer(buf);
 			++blocks_done;
 		}
+		Assert(pg_streaming_read_buffer_get_next(pgsr, NULL) == InvalidBuffer);
+		pg_streaming_read_free(pgsr);
 	}
 
 	/* Close relation, release lock. */
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index aa8667abd10..8775b5789be 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -657,7 +657,7 @@ XLogDropDatabase(Oid dbid)
 	 * This is unnecessarily heavy-handed, as it will close SMgrRelation
 	 * objects for other databases as well. DROP DATABASE occurs seldom enough
 	 * that it's not worth introducing a variant of smgrclose for just this
-	 * purpose. XXX: Or should we rather leave the smgr entries dangling?
+	 * purpose.
 	 */
 	smgrcloseall();
 
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index d7d6cc0cd7b..13e5376619e 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -246,10 +246,12 @@ BackgroundWriterMain(void)
 		if (FirstCallSinceLastCheckpoint())
 		{
 			/*
-			 * After any checkpoint, close all smgr files.  This is so we
-			 * won't hang onto smgr references to deleted files indefinitely.
+			 * After any checkpoint, free all smgr objects.  Otherwise we
+			 * would never do so for dropped relations, as the bgwriter does
+			 * not process shared invalidation messages or call
+			 * AtEOXact_SMgr().
 			 */
-			smgrcloseall();
+			smgrdestroyall();
 		}
 
 		/*
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 5e949fc885b..5d843b61426 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -469,10 +469,12 @@ CheckpointerMain(void)
 				ckpt_performed = CreateRestartPoint(flags);
 
 			/*
-			 * After any checkpoint, close all smgr files.  This is so we
-			 * won't hang onto smgr references to deleted files indefinitely.
+			 * After any checkpoint, free all smgr objects.  Otherwise we
+			 * would never do so for dropped relations, as the checkpointer
+			 * does not process shared invalidation messages or call
+			 * AtEOXact_SMgr().
 			 */
-			smgrcloseall();
+			smgrdestroyall();
 
 			/*
 			 * Indicate checkpoint completion to any waiting backends.
@@ -958,11 +960,8 @@ RequestCheckpoint(int flags)
 		 */
 		CreateCheckPoint(flags | CHECKPOINT_IMMEDIATE);
 
-		/*
-		 * After any checkpoint, close all smgr files.  This is so we won't
-		 * hang onto smgr references to deleted files indefinitely.
-		 */
-		smgrcloseall();
+		/* Free all smgr objects, as CheckpointerMain() normally would. */
+		smgrdestroyall();
 
 		return;
 	}
diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca20..eec03f6f2b4 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = aio buffer file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 00000000000..bcab44c802f
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+	streaming_read.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 00000000000..39aef2a84a2
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+backend_sources += files(
+  'streaming_read.c',
+)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
new file mode 100644
index 00000000000..19605090fea
--- /dev/null
+++ b/src/backend/storage/aio/streaming_read.c
@@ -0,0 +1,435 @@
+#include "postgres.h"
+
+#include "storage/streaming_read.h"
+#include "utils/rel.h"
+
+/*
+ * Element type for PgStreamingRead's circular array of block ranges.
+ *
+ * For hits, need_to_complete is false and there is just one block per
+ * range, already pinned and ready for use.
+ *
+ * For misses, need_to_complete is true and buffers[] holds a range of
+ * blocks that are contiguous in storage (though the buffers may not be
+ * contiguous in memory), so we can complete them with a single call to
+ * CompleteReadBuffers().
+ */
+typedef struct PgStreamingReadRange
+{
+	bool		advice_issued;
+	bool		need_complete;
+	BlockNumber blocknum;
+	int			nblocks;
+	int			per_buffer_data_index[MAX_BUFFERS_PER_TRANSFER];
+	Buffer		buffers[MAX_BUFFERS_PER_TRANSFER];
+} PgStreamingReadRange;
+
+struct PgStreamingRead
+{
+	int			max_ios;
+	int			ios_in_progress;
+	int			ios_in_progress_trigger;
+	int			max_pinned_buffers;
+	int			pinned_buffers;
+	int			pinned_buffers_trigger;
+	int			next_tail_buffer;
+	bool		finished;
+	void	   *pgsr_private;
+	PgStreamingReadBufferCB callback;
+	BufferAccessStrategy strategy;
+	BufferManagerRelation bmr;
+	ForkNumber	forknum;
+
+	bool		advice_enabled;
+
+	/* Next expected block, for detecting sequential access. */
+	BlockNumber seq_blocknum;
+
+	/* Space for optional per-buffer private data. */
+	size_t		per_buffer_data_size;
+	void	   *per_buffer_data;
+	int			per_buffer_data_next;
+
+	/* Circular buffer of ranges. */
+	int			size;
+	int			head;
+	int			tail;
+	PgStreamingReadRange ranges[FLEXIBLE_ARRAY_MEMBER];
+};
+
+static PgStreamingRead *
+pg_streaming_read_buffer_alloc_internal(int flags,
+										void *pgsr_private,
+										size_t per_buffer_data_size,
+										BufferAccessStrategy strategy)
+{
+	PgStreamingRead *pgsr;
+	int			size;
+	int			max_ios;
+	uint32		max_pinned_buffers;
+
+
+	/*
+	 * Decide how many assumed I/Os we will allow to run concurrently.  That
+	 * is, advice to the kernel to tell it that we will soon read.  This
+	 * number also affects how far we look ahead for opportunities to start
+	 * more I/Os.
+	 */
+	if (flags & PGSR_FLAG_MAINTENANCE)
+		max_ios = maintenance_io_concurrency;
+	else
+		max_ios = effective_io_concurrency;
+
+	/*
+	 * The desired level of I/O concurrency controls how far ahead we are
+	 * willing to look ahead.  We also clamp it to at least
+	 * MAX_BUFFER_PER_TRANFER so that we can have a chance to build up a full
+	 * sized read, even when max_ios is zero.
+	 */
+	max_pinned_buffers = Max(max_ios * 4, MAX_BUFFERS_PER_TRANSFER);
+
+	/*
+	 * The *_io_concurrency GUCs, we might have 0.  We want to allow at least
+	 * one, to keep our gating logic simple.
+	 */
+	max_ios = Max(max_ios, 1);
+
+	/*
+	 * Don't allow this backend to pin too many buffers.  For now we'll apply
+	 * the limit for the shared buffer pool and the local buffer pool, without
+	 * worrying which it is.
+	 */
+	LimitAdditionalPins(&max_pinned_buffers);
+	LimitAdditionalLocalPins(&max_pinned_buffers);
+	Assert(max_pinned_buffers > 0);
+
+	/*
+	 * pgsr->ranges is a circular buffer.  When it is empty, head == tail.
+	 * When it is full, there is an empty element between head and tail.  Head
+	 * can also be empty (nblocks == 0), therefore we need two extra elements
+	 * for non-occupied ranges, on top of max_pinned_buffers to allow for the
+	 * maxmimum possible number of occupied ranges of the smallest possible
+	 * size of one.
+	 */
+	size = max_pinned_buffers + 2;
+
+	pgsr = (PgStreamingRead *)
+		palloc0(offsetof(PgStreamingRead, ranges) +
+				sizeof(pgsr->ranges[0]) * size);
+
+	pgsr->max_ios = max_ios;
+	pgsr->per_buffer_data_size = per_buffer_data_size;
+	pgsr->max_pinned_buffers = max_pinned_buffers;
+	pgsr->pgsr_private = pgsr_private;
+	pgsr->strategy = strategy;
+	pgsr->size = size;
+
+#ifdef USE_PREFETCH
+
+	/*
+	 * This system supports prefetching advice.  As long as direct I/O isn't
+	 * enabled, and the caller hasn't promised sequential access, we can use
+	 * it.
+	 */
+	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+		(flags & PGSR_FLAG_SEQUENTIAL) == 0)
+		pgsr->advice_enabled = true;
+#endif
+
+	/*
+	 * We want to avoid creating ranges that are smaller than they could be
+	 * just because we hit max_pinned_buffers.  We only look ahead when the
+	 * number of pinned buffers falls below this trigger number, or put
+	 * another way, we stop looking ahead when we wouldn't be able to build a
+	 * "full sized" range.
+	 */
+	pgsr->pinned_buffers_trigger =
+		Max(1, (int) max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER);
+
+	/* Space the callback to store extra data along with each block. */
+	if (per_buffer_data_size)
+		pgsr->per_buffer_data = palloc(per_buffer_data_size * max_pinned_buffers);
+
+	return pgsr;
+}
+
+/*
+ * Create a new streaming read object that can be used to perform the
+ * equivalent of a series of ReadBuffer() calls for one fork of one relation.
+ * Internally, it generates larger vectored reads where possible by looking
+ * ahead.
+ */
+PgStreamingRead *
+pg_streaming_read_buffer_alloc(int flags,
+							   void *pgsr_private,
+							   size_t per_buffer_data_size,
+							   BufferAccessStrategy strategy,
+							   BufferManagerRelation bmr,
+							   ForkNumber forknum,
+							   PgStreamingReadBufferCB next_block_cb)
+{
+	PgStreamingRead *result;
+
+	result = pg_streaming_read_buffer_alloc_internal(flags,
+													 pgsr_private,
+													 per_buffer_data_size,
+													 strategy);
+	result->callback = next_block_cb;
+	result->bmr = bmr;
+	result->forknum = forknum;
+
+	return result;
+}
+
+/*
+ * Start building a new range.  This is called after the previous one
+ * reached maximum size, or the callback's next block can't be merged with it.
+ *
+ * Since the previous head range has now reached its full potential size, this
+ * is also a good time to issue 'prefetch' advice, because we know that'll
+ * soon be reading.  In future, we could start an actual I/O here.
+ */
+static PgStreamingReadRange *
+pg_streaming_read_new_range(PgStreamingRead *pgsr)
+{
+	PgStreamingReadRange *head_range;
+
+	head_range = &pgsr->ranges[pgsr->head];
+	Assert(head_range->nblocks > 0);
+
+	/*
+	 * If a call to CompleteReadBuffers() will be needed, and we can issue
+	 * advice to the kernel to get the read started.  We suppress it if the
+	 * access pattern appears to be completely sequential, though, because on
+	 * some systems that interfers with the kernel's own sequential read ahead
+	 * heurstics and hurts performance.
+	 */
+	if (pgsr->advice_enabled)
+	{
+		BlockNumber blocknum = head_range->blocknum;
+		int			nblocks = head_range->nblocks;
+
+		if (head_range->need_complete && blocknum != pgsr->seq_blocknum)
+		{
+			SMgrRelation smgr =
+				pgsr->bmr.smgr ? pgsr->bmr.smgr :
+				RelationGetSmgr(pgsr->bmr.rel);
+
+			Assert(!head_range->advice_issued);
+
+			smgrprefetch(smgr, pgsr->forknum, blocknum, nblocks);
+
+			/*
+			 * Count this as an I/O that is concurrently in progress, though
+			 * we don't really know if the kernel generates a physical I/O.
+			 */
+			head_range->advice_issued = true;
+			pgsr->ios_in_progress++;
+		}
+
+		/* Remember the block after this range, for sequence detection. */
+		pgsr->seq_blocknum = blocknum + nblocks;
+	}
+
+	/* Create a new head range.  There must be space. */
+	Assert(pgsr->size > pgsr->max_pinned_buffers);
+	Assert((pgsr->head + 1) % pgsr->size != pgsr->tail);
+	if (++pgsr->head == pgsr->size)
+		pgsr->head = 0;
+	head_range = &pgsr->ranges[pgsr->head];
+	head_range->nblocks = 0;
+
+	return head_range;
+}
+
+static void
+pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
+{
+	/*
+	 * If we're finished or can't start more I/O, then don't look ahead.
+	 */
+	if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios)
+		return;
+
+	/*
+	 * We'll also wait until the number of pinned buffers falls below our
+	 * trigger level, so that we have the chance to create a full range.
+	 */
+	if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+		return;
+
+	do
+	{
+		BufferManagerRelation bmr;
+		ForkNumber	forknum;
+		BlockNumber blocknum;
+		Buffer		buffer;
+		bool		found;
+		bool		need_complete;
+		PgStreamingReadRange *head_range;
+		void	   *per_buffer_data;
+
+		/* Do we have a full-sized range? */
+		head_range = &pgsr->ranges[pgsr->head];
+		if (head_range->nblocks == lengthof(head_range->buffers))
+		{
+			Assert(head_range->need_complete);
+			head_range = pg_streaming_read_new_range(pgsr);
+
+			/*
+			 * Give up now if I/O is saturated, or we wouldn't be able form
+			 * another full range after this due to the pin limit.
+			 */
+			if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger ||
+				pgsr->ios_in_progress == pgsr->max_ios)
+				break;
+		}
+
+		per_buffer_data = (char *) pgsr->per_buffer_data +
+			pgsr->per_buffer_data_size * pgsr->per_buffer_data_next;
+
+		/* Find out which block the callback wants to read next. */
+		blocknum = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data);
+		if (blocknum == InvalidBlockNumber)
+		{
+			pgsr->finished = true;
+			break;
+		}
+		bmr = pgsr->bmr;
+		forknum = pgsr->forknum;
+
+		Assert(pgsr->pinned_buffers < pgsr->max_pinned_buffers);
+
+		buffer = PrepareReadBuffer(bmr,
+								   forknum,
+								   blocknum,
+								   pgsr->strategy,
+								   &found);
+		pgsr->pinned_buffers++;
+
+		need_complete = !found;
+
+		/* Is there a head range that we can't extend? */
+		head_range = &pgsr->ranges[pgsr->head];
+		if (head_range->nblocks > 0 &&
+			(!need_complete ||
+			 !head_range->need_complete ||
+			 head_range->blocknum + head_range->nblocks != blocknum))
+		{
+			/* Yes, time to start building a new one. */
+			head_range = pg_streaming_read_new_range(pgsr);
+			Assert(head_range->nblocks == 0);
+		}
+
+		if (head_range->nblocks == 0)
+		{
+			/* Initialize a new range beginning at this block. */
+			head_range->blocknum = blocknum;
+			head_range->need_complete = need_complete;
+			head_range->advice_issued = false;
+		}
+		else
+		{
+			/* We can extend an existing range by one block. */
+			Assert(head_range->blocknum + head_range->nblocks == blocknum);
+			Assert(head_range->need_complete);
+		}
+
+		head_range->per_buffer_data_index[head_range->nblocks] = pgsr->per_buffer_data_next++;
+		head_range->buffers[head_range->nblocks] = buffer;
+		head_range->nblocks++;
+
+		if (pgsr->per_buffer_data_next == pgsr->max_pinned_buffers)
+			pgsr->per_buffer_data_next = 0;
+
+	} while (pgsr->pinned_buffers < pgsr->max_pinned_buffers &&
+			 pgsr->ios_in_progress < pgsr->max_ios);
+
+	if (pgsr->ranges[pgsr->head].nblocks > 0)
+		pg_streaming_read_new_range(pgsr);
+}
+
+Buffer
+pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_data)
+{
+	pg_streaming_read_look_ahead(pgsr);
+
+	/* See if we have one buffer to return. */
+	while (pgsr->tail != pgsr->head)
+	{
+		PgStreamingReadRange *tail_range;
+
+		tail_range = &pgsr->ranges[pgsr->tail];
+
+		/*
+		 * Do we need to perform an I/O before returning the buffers from this
+		 * range?
+		 */
+		if (tail_range->need_complete)
+		{
+			CompleteReadBuffers(pgsr->bmr,
+								tail_range->buffers,
+								pgsr->forknum,
+								tail_range->blocknum,
+								tail_range->nblocks,
+								false,
+								pgsr->strategy);
+			tail_range->need_complete = false;
+
+			/*
+			 * We don't really know if the kernel generated an physical I/O
+			 * when we issued advice, let alone when it finished, but it has
+			 * certainly finished after a read call returns.
+			 */
+			if (tail_range->advice_issued)
+				pgsr->ios_in_progress--;
+		}
+
+		/* Are there more buffers available in this range? */
+		if (pgsr->next_tail_buffer < tail_range->nblocks)
+		{
+			int			buffer_index;
+			Buffer		buffer;
+
+			buffer_index = pgsr->next_tail_buffer++;
+			buffer = tail_range->buffers[buffer_index];
+
+			Assert(BufferIsValid(buffer));
+
+			/* We are giving away ownership of this pinned buffer. */
+			Assert(pgsr->pinned_buffers > 0);
+			pgsr->pinned_buffers--;
+
+			if (per_buffer_data)
+				*per_buffer_data = (char *) pgsr->per_buffer_data +
+					tail_range->per_buffer_data_index[buffer_index] *
+					pgsr->per_buffer_data_size;
+
+			return buffer;
+		}
+
+		/* Advance tail to next range, if there is one. */
+		if (++pgsr->tail == pgsr->size)
+			pgsr->tail = 0;
+		pgsr->next_tail_buffer = 0;
+	}
+
+	Assert(pgsr->pinned_buffers == 0);
+
+	return InvalidBuffer;
+}
+
+void
+pg_streaming_read_free(PgStreamingRead *pgsr)
+{
+	Buffer		buffer;
+
+	/* Stop looking ahead, and unpin anything that wasn't consumed. */
+	pgsr->finished = true;
+	while ((buffer = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer)
+		ReleaseBuffer(buffer);
+
+	if (pgsr->per_buffer_data)
+		pfree(pgsr->per_buffer_data);
+	pfree(pgsr);
+}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7d601bef6dd..2157a97b973 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -472,7 +472,7 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
 )
 
 
-static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
+static Buffer ReadBuffer_common(BufferManagerRelation bmr,
 								ForkNumber forkNum, BlockNumber blockNum,
 								ReadBufferMode mode, BufferAccessStrategy strategy,
 								bool *hit);
@@ -501,7 +501,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
 static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
 						  WritebackContext *wb_context);
 static void WaitIO(BufferDesc *buf);
-static bool StartBufferIO(BufferDesc *buf, bool forInput);
+static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
 							  uint32 set_flag_bits, bool forget_owner);
 static void AbortBufferIO(Buffer buffer);
@@ -795,15 +795,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot access temporary tables of other sessions")));
 
-	/*
-	 * Read the buffer, and update pgstat counters to reflect a cache hit or
-	 * miss.
-	 */
-	pgstat_count_buffer_read(reln);
-	buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence,
+	buf = ReadBuffer_common(BMR_REL(reln),
 							forkNum, blockNum, mode, strategy, &hit);
-	if (hit)
-		pgstat_count_buffer_hit(reln);
+
 	return buf;
 }
 
@@ -827,8 +821,9 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
 
 	SMgrRelation smgr = smgropen(rlocator, InvalidBackendId);
 
-	return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
-							 RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
+	return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+									  RELPERSISTENCE_UNLOGGED),
+							 forkNum, blockNum,
 							 mode, strategy, &hit);
 }
 
@@ -1002,7 +997,7 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
 		bool		hit;
 
 		Assert(extended_by == 0);
-		buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence,
+		buffer = ReadBuffer_common(bmr,
 								   fork, extend_to - 1, mode, strategy,
 								   &hit);
 	}
@@ -1016,18 +1011,11 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
  * *hit is set to true if the request was satisfied from shared buffer cache.
  */
 static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum,
 				  BlockNumber blockNum, ReadBufferMode mode,
 				  BufferAccessStrategy strategy, bool *hit)
 {
-	BufferDesc *bufHdr;
-	Block		bufBlock;
-	bool		found;
-	IOContext	io_context;
-	IOObject	io_object;
-	bool		isLocalBuf = SmgrIsTemp(smgr);
-
-	*hit = false;
+	Buffer		buffer;
 
 	/*
 	 * Backward compatibility path, most code should use ExtendBufferedRel()
@@ -1046,175 +1034,339 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
 			flags |= EB_LOCK_FIRST;
 
-		return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence),
-								 forkNum, strategy, flags);
+		*hit = false;
+
+		return ExtendBufferedRel(bmr, forkNum, strategy, flags);
 	}
 
-	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
-									   smgr->smgr_rlocator.locator.spcOid,
-									   smgr->smgr_rlocator.locator.dbOid,
-									   smgr->smgr_rlocator.locator.relNumber,
-									   smgr->smgr_rlocator.backend);
+	buffer = PrepareReadBuffer(bmr,
+							   forkNum,
+							   blockNum,
+							   strategy,
+							   hit);
+
+	/* At this point we do NOT hold any locks. */
 
+	if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK)
+	{
+		/* if we just want zeroes and a lock, we're done */
+		ZeroBuffer(buffer, mode);
+	}
+	else if (!*hit)
+	{
+		/* we might need to perform I/O */
+		CompleteReadBuffers(bmr,
+							&buffer,
+							forkNum,
+							blockNum,
+							1,
+							mode == RBM_ZERO_ON_ERROR,
+							strategy);
+	}
+
+	return buffer;
+}
+
+/*
+ * Prepare to read a block.  The buffer is pinned.  If this is a 'hit', then
+ * the returned buffer can be used immediately.  Otherwise, a physical read
+ * should be completed with CompleteReadBuffers(), or the buffer should be
+ * zeroed with ZeroBuffer().  PrepareReadBuffer() followed by
+ * CompleteReadBuffers() or ZeroBuffer() is equivalent to ReadBuffer(), but
+ * the caller has the opportunity to combine reads of multiple neighboring
+ * blocks into one CompleteReadBuffers() call.
+ *
+ * *foundPtr is set to true for a hit, and false for a miss.
+ */
+Buffer
+PrepareReadBuffer(BufferManagerRelation bmr,
+				  ForkNumber forkNum,
+				  BlockNumber blockNum,
+				  BufferAccessStrategy strategy,
+				  bool *foundPtr)
+{
+	BufferDesc *bufHdr;
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
+
+	Assert(blockNum != P_NEW);
+
+	if (bmr.rel)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+	}
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
 	if (isLocalBuf)
 	{
-		/*
-		 * We do not use a BufferAccessStrategy for I/O of temporary tables.
-		 * However, in some cases, the "strategy" may not be NULL, so we can't
-		 * rely on IOContextForStrategy() to set the right IOContext for us.
-		 * This may happen in cases like CREATE TEMPORARY TABLE AS...
-		 */
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
-		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
-		if (found)
-			pgBufferUsage.local_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.local_blks_read++;
 	}
 	else
 	{
-		/*
-		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
-		 * not currently in memory.
-		 */
 		io_context = IOContextForStrategy(strategy);
 		io_object = IOOBJECT_RELATION;
-		bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
-							 strategy, &found, io_context);
-		if (found)
-			pgBufferUsage.shared_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.shared_blks_read++;
 	}
 
-	/* At this point we do NOT hold any locks. */
+	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
+									   bmr.smgr->smgr_rlocator.locator.spcOid,
+									   bmr.smgr->smgr_rlocator.locator.dbOid,
+									   bmr.smgr->smgr_rlocator.locator.relNumber,
+									   bmr.smgr->smgr_rlocator.backend);
 
-	/* if it was already in the buffer pool, we're done */
-	if (found)
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	if (isLocalBuf)
+	{
+		bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr);
+		if (*foundPtr)
+			pgBufferUsage.local_blks_hit++;
+	}
+	else
+	{
+		bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum,
+							 strategy, foundPtr, io_context);
+		if (*foundPtr)
+			pgBufferUsage.shared_blks_hit++;
+	}
+	if (bmr.rel)
+	{
+		/*
+		 * While pgBufferUsage's "read" counter isn't bumped unless we reach
+		 * CompleteReadBuffers() (so, not for hits, and not for buffers that
+		 * are zeroed instead), the per-relation stats always count them.
+		 */
+		pgstat_count_buffer_read(bmr.rel);
+		if (*foundPtr)
+			pgstat_count_buffer_hit(bmr.rel);
+	}
+	if (*foundPtr)
 	{
-		/* Just need to update stats before we exit */
-		*hit = true;
 		VacuumPageHit++;
 		pgstat_count_io_op(io_object, io_context, IOOP_HIT);
-
 		if (VacuumCostActive)
 			VacuumCostBalance += VacuumCostPageHit;
 
 		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-										  smgr->smgr_rlocator.locator.spcOid,
-										  smgr->smgr_rlocator.locator.dbOid,
-										  smgr->smgr_rlocator.locator.relNumber,
-										  smgr->smgr_rlocator.backend,
-										  found);
+										  bmr.smgr->smgr_rlocator.locator.spcOid,
+										  bmr.smgr->smgr_rlocator.locator.dbOid,
+										  bmr.smgr->smgr_rlocator.locator.relNumber,
+										  bmr.smgr->smgr_rlocator.backend,
+										  true);
+	}
 
-		/*
-		 * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
-		 * on return.
-		 */
-		if (!isLocalBuf)
-		{
-			if (mode == RBM_ZERO_AND_LOCK)
-				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
-							  LW_EXCLUSIVE);
-			else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
-				LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
-		}
+	return BufferDescriptorGetBuffer(bufHdr);
+}
 
-		return BufferDescriptorGetBuffer(bufHdr);
+static inline bool
+CompleteReadBuffersCanStartIO(Buffer buffer, bool nowait)
+{
+	if (BufferIsLocal(buffer))
+	{
+		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+
+		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
 	}
+	else
+		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
+}
 
-	/*
-	 * if we have gotten to this point, we have allocated a buffer for the
-	 * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
-	 * if it's a shared buffer.
-	 */
-	Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));	/* spinlock not needed */
+/*
+ * Complete a set reads prepared with PrepareReadBuffers().  The buffers must
+ * cover a cluster of neighboring block numbers.
+ *
+ * Typically this performs one physical vector read covering the block range,
+ * but if some of the buffers have already been read in the meantime by any
+ * backend, zero or multiple reads may be performed.
+ */
+void
+CompleteReadBuffers(BufferManagerRelation bmr,
+					Buffer *buffers,
+					ForkNumber forknum,
+					BlockNumber blocknum,
+					int nblocks,
+					bool zero_on_error,
+					BufferAccessStrategy strategy)
+{
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
 
-	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
+	if (bmr.rel)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+	}
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
+	if (isLocalBuf)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(strategy);
+		io_object = IOOBJECT_RELATION;
+	}
 
 	/*
-	 * Read in the page, unless the caller intends to overwrite it and just
-	 * wants us to allocate a buffer.
+	 * We count all these blocks as read by this backend.  This is traditional
+	 * behavior, but might turn out to be not true if we find that someone
+	 * else has beaten us and completed the read of some of these blocks.  In
+	 * that case the system globally double-counts, but we traditionally don't
+	 * count this as a "hit", and we don't have a separate counter for "miss,
+	 * but another backend completed the read".
 	 */
-	if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
-		MemSet((char *) bufBlock, 0, BLCKSZ);
+	if (isLocalBuf)
+		pgBufferUsage.local_blks_read += nblocks;
 	else
+		pgBufferUsage.shared_blks_read += nblocks;
+
+	for (int i = 0; i < nblocks; ++i)
 	{
-		instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
+		int			io_buffers_len;
+		Buffer		io_buffers[MAX_BUFFERS_PER_TRANSFER];
+		void	   *io_pages[MAX_BUFFERS_PER_TRANSFER];
+		instr_time	io_start;
+		BlockNumber io_first_block;
 
-		smgrread(smgr, forkNum, blockNum, bufBlock);
+#ifdef USE_ASSERT_CHECKING
 
-		pgstat_count_io_op_time(io_object, io_context,
-								IOOP_READ, io_start, 1);
+		/*
+		 * We could get all the information from buffer headers, but it can be
+		 * expensive to access buffer header cache lines so we make the caller
+		 * provide all the information we need, and assert that it is
+		 * consistent.
+		 */
+		{
+			RelFileLocator xlocator;
+			ForkNumber	xforknum;
+			BlockNumber xblocknum;
+
+			BufferGetTag(buffers[i], &xlocator, &xforknum, &xblocknum);
+			Assert(RelFileLocatorEquals(bmr.smgr->smgr_rlocator.locator, xlocator));
+			Assert(xforknum == forknum);
+			Assert(xblocknum == blocknum + i);
+		}
+#endif
+
+		/*
+		 * Skip this block if someone else has already completed it.  If an
+		 * I/O is already in progress in another backend, this will wait for
+		 * the outcome: either done, or something went wrong and we will
+		 * retry.
+		 */
+		if (!CompleteReadBuffersCanStartIO(buffers[i], false))
+		{
+			/*
+			 * Report this as a 'hit' for this backend, even though it must
+			 * have started out as a miss in PrepareReadBuffer().
+			 */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  true);
+			continue;
+		}
+
+		/* We found a buffer that we need to read in. */
+		io_buffers[0] = buffers[i];
+		io_pages[0] = BufferGetBlock(buffers[i]);
+		io_first_block = blocknum + i;
+		io_buffers_len = 1;
 
-		/* check for garbage data */
-		if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
-									PIV_LOG_WARNING | PIV_REPORT_STAT))
+		/*
+		 * How many neighboring-on-disk blocks can we can scatter-read into
+		 * other buffers at the same time?  In this case we don't wait if we
+		 * see an I/O already in progress.  We already hold BM_IO_IN_PROGRESS
+		 * for the head block, so we should get on with that I/O as soon as
+		 * possible.  We'll come back to this block again, above.
+		 */
+		while ((i + 1) < nblocks &&
+			   CompleteReadBuffersCanStartIO(buffers[i + 1], true))
+		{
+			/* Must be consecutive block numbers. */
+			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
+				   BufferGetBlockNumber(buffers[i]) + 1);
+
+			io_buffers[io_buffers_len] = buffers[++i];
+			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
+		}
+
+		io_start = pgstat_prepare_io_time(track_io_timing);
+		smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len);
+		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
+								io_buffers_len);
+
+		/* Verify each block we read, and terminate the I/O. */
+		for (int j = 0; j < io_buffers_len; ++j)
 		{
-			if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
+			BufferDesc *bufHdr;
+			Block		bufBlock;
+
+			if (isLocalBuf)
 			{
-				ereport(WARNING,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s; zeroing out page",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
-				MemSet((char *) bufBlock, 0, BLCKSZ);
+				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
+				bufBlock = LocalBufHdrGetBlock(bufHdr);
 			}
 			else
-				ereport(ERROR,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
-		}
-	}
-
-	/*
-	 * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer
-	 * content lock before marking the page as valid, to make sure that no
-	 * other backend sees the zeroed page before the caller has had a chance
-	 * to initialize it.
-	 *
-	 * Since no-one else can be looking at the page contents yet, there is no
-	 * difference between an exclusive lock and a cleanup-strength lock. (Note
-	 * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
-	 * they assert that the buffer is already valid.)
-	 */
-	if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
-		!isLocalBuf)
-	{
-		LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
-	}
+			{
+				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
+				bufBlock = BufHdrGetBlock(bufHdr);
+			}
 
-	if (isLocalBuf)
-	{
-		/* Only need to adjust flags */
-		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+			/* check for garbage data */
+			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
+										PIV_LOG_WARNING | PIV_REPORT_STAT))
+			{
+				if (zero_on_error || zero_damaged_pages)
+				{
+					ereport(WARNING,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s; zeroing out page",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+					memset(bufBlock, 0, BLCKSZ);
+				}
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+			}
 
-		buf_state |= BM_VALID;
-		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-	}
-	else
-	{
-		/* Set BM_VALID, terminate IO, and wake up any waiters */
-		TerminateBufferIO(bufHdr, false, BM_VALID, true);
-	}
+			/* Terminate I/O and set BM_VALID. */
+			if (isLocalBuf)
+			{
+				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
 
-	VacuumPageMiss++;
-	if (VacuumCostActive)
-		VacuumCostBalance += VacuumCostPageMiss;
+				buf_state |= BM_VALID;
+				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+			}
+			else
+			{
+				/* Set BM_VALID, terminate IO, and wake up any waiters */
+				TerminateBufferIO(bufHdr, false, BM_VALID, true);
+			}
 
-	TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-									  smgr->smgr_rlocator.locator.spcOid,
-									  smgr->smgr_rlocator.locator.dbOid,
-									  smgr->smgr_rlocator.locator.relNumber,
-									  smgr->smgr_rlocator.backend,
-									  found);
+			/* Report I/Os as completing individually. */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  false);
+		}
 
-	return BufferDescriptorGetBuffer(bufHdr);
+		VacuumPageMiss += io_buffers_len;
+		if (VacuumCostActive)
+			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+	}
 }
 
 /*
@@ -1228,11 +1380,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  *
  * The returned buffer is pinned and is already marked as holding the
  * desired page.  If it already did have the desired page, *foundPtr is
- * set true.  Otherwise, *foundPtr is set false and the buffer is marked
- * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
- *
- * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
- * we keep it for simplicity in ReadBuffer.
+ * set true.  Otherwise, *foundPtr is set false.  A read should be
+ * performed with CompleteReadBuffers().
  *
  * io_context is passed as an output parameter to avoid calling
  * IOContextForStrategy() when there is a shared buffers hit and no IO
@@ -1291,19 +1440,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called PrepareReadBuffer() but not yet CompleteReadBuffers().
 			 */
-			if (StartBufferIO(buf, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return buf;
@@ -1368,19 +1508,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called PrepareReadBuffer() but not yet CompleteReadBuffers().
 			 */
-			if (StartBufferIO(existing_buf_hdr, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return existing_buf_hdr;
@@ -1412,15 +1543,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	LWLockRelease(newPartitionLock);
 
 	/*
-	 * Buffer contents are currently invalid.  Try to obtain the right to
-	 * start I/O.  If StartBufferIO returns false, then someone else managed
-	 * to read it before we did, so there's nothing left for BufferAlloc() to
-	 * do.
+	 * Buffer contents are currently invalid.
 	 */
-	if (StartBufferIO(victim_buf_hdr, true))
-		*foundPtr = false;
-	else
-		*foundPtr = true;
+	*foundPtr = false;
 
 	return victim_buf_hdr;
 }
@@ -1774,7 +1899,7 @@ again:
  * pessimistic, but outside of toy-sized shared_buffers it should allow
  * sufficient pins.
  */
-static void
+void
 LimitAdditionalPins(uint32 *additional_pins)
 {
 	uint32		max_backends;
@@ -2043,7 +2168,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 
 				buf_state &= ~BM_VALID;
 				UnlockBufHdr(existing_hdr, buf_state);
-			} while (!StartBufferIO(existing_hdr, true));
+			} while (!StartBufferIO(existing_hdr, true, false));
 		}
 		else
 		{
@@ -2066,7 +2191,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			LWLockRelease(partition_lock);
 
 			/* XXX: could combine the locked operations in it with the above */
-			StartBufferIO(victim_buf_hdr, true);
+			StartBufferIO(victim_buf_hdr, true, false);
 		}
 	}
 
@@ -2381,7 +2506,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 	else
 	{
 		/*
-		 * If we previously pinned the buffer, it must surely be valid.
+		 * If we previously pinned the buffer, it is likely to be valid, but
+		 * it may not be if PrepareReadBuffer() was called and
+		 * CompleteReadBuffers() hasn't been called yet.  We'll check by
+		 * loading the flags without locking.  This is racy, but it's OK to
+		 * return false spuriously: when CompleteReadBuffers() calls
+		 * StartBufferIO(), it'll see that it's now valid.
 		 *
 		 * Note: We deliberately avoid a Valgrind client request here.
 		 * Individual access methods can optionally superimpose buffer page
@@ -2390,7 +2520,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 		 * that the buffer page is legitimately non-accessible here.  We
 		 * cannot meddle with that.
 		 */
-		result = true;
+		result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
 	}
 
 	ref->refcount++;
@@ -3458,7 +3588,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * someone else flushed the buffer before we could, so we need not do
 	 * anything.
 	 */
-	if (!StartBufferIO(buf, false))
+	if (!StartBufferIO(buf, false, false))
 		return;
 
 	/* Setup error traceback support for ereport() */
@@ -4845,6 +4975,46 @@ ConditionalLockBuffer(Buffer buffer)
 									LW_EXCLUSIVE);
 }
 
+/*
+ * Zero a buffer, and lock it as RBM_ZERO_AND_LOCK or
+ * RBM_ZERO_AND_CLEANUP_LOCK would.  The buffer must be already pinned.  It
+ * does not have to be valid, but it is valid and locked on return.
+ */
+void
+ZeroBuffer(Buffer buffer, ReadBufferMode mode)
+{
+	BufferDesc *bufHdr;
+	uint32		buf_state;
+
+	Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
+
+	if (BufferIsLocal(buffer))
+		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+	else
+	{
+		bufHdr = GetBufferDescriptor(buffer - 1);
+		if (mode == RBM_ZERO_AND_LOCK)
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		else
+			LockBufferForCleanup(buffer);
+	}
+
+	memset(BufferGetPage(buffer), 0, BLCKSZ);
+
+	if (BufferIsLocal(buffer))
+	{
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state |= BM_VALID;
+		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+	}
+	else
+	{
+		buf_state = LockBufHdr(bufHdr);
+		buf_state |= BM_VALID;
+		UnlockBufHdr(bufHdr, buf_state);
+	}
+}
+
 /*
  * Verify that this backend is pinning the buffer exactly once.
  *
@@ -5197,9 +5367,15 @@ WaitIO(BufferDesc *buf)
  *
  * Returns true if we successfully marked the buffer as I/O busy,
  * false if someone else already did the work.
+ *
+ * If nowait is true, then we don't wait for an I/O to be finished by another
+ * backend.  In that case, false indicates either that the I/O was already
+ * finished, or is still in progress.  This is useful for callers that want to
+ * find out if they can perform the I/O as part of a larger operation, without
+ * waiting for the answer or distinguishing the reasons why not.
  */
 static bool
-StartBufferIO(BufferDesc *buf, bool forInput)
+StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 {
 	uint32		buf_state;
 
@@ -5212,6 +5388,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
 		UnlockBufHdr(buf, buf_state);
+		if (nowait)
+			return false;
 		WaitIO(buf);
 	}
 
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 1be4f4f8daf..717b8f58daf 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -109,10 +109,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
  * LocalBufferAlloc -
  *	  Find or create a local buffer for the given page of the given relation.
  *
- * API is similar to bufmgr.c's BufferAlloc, except that we do not need
- * to do any locking since this is all local.   Also, IO_IN_PROGRESS
- * does not get set.  Lastly, we support only default access strategy
- * (hence, usage_count is always advanced).
+ * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
+ * any locking since this is all local.  We support only default access
+ * strategy (hence, usage_count is always advanced).
  */
 BufferDesc *
 LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
@@ -288,7 +287,7 @@ GetLocalVictimBuffer(void)
 }
 
 /* see LimitAdditionalPins() */
-static void
+void
 LimitAdditionalLocalPins(uint32 *additional_pins)
 {
 	uint32		max_pins;
@@ -298,9 +297,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins)
 
 	/*
 	 * In contrast to LimitAdditionalPins() other backends don't play a role
-	 * here. We can allow up to NLocBuffer pins in total.
+	 * here. We can allow up to NLocBuffer pins in total, but it might not be
+	 * initialized yet so read num_temp_buffers.
 	 */
-	max_pins = (NLocBuffer - NLocalPinnedBuffers);
+	max_pins = (num_temp_buffers - NLocalPinnedBuffers);
 
 	if (*additional_pins >= max_pins)
 		*additional_pins = max_pins;
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 40345bdca27..739d13293fb 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
+subdir('aio')
 subdir('buffer')
 subdir('file')
 subdir('freespace')
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 563a0be5c74..0d7272e796e 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -147,7 +147,9 @@ smgrshutdown(int code, Datum arg)
 /*
  * smgropen() -- Return an SMgrRelation object, creating it if need be.
  *
- * This does not attempt to actually open the underlying file.
+ * This does not attempt to actually open the underlying files.  The returned
+ * object remains valid at least until AtEOXact_SMgr() is called, or until
+ * smgrdestroy() is called in non-transaction backends.
  */
 SMgrRelation
 smgropen(RelFileLocator rlocator, BackendId backend)
@@ -259,10 +261,10 @@ smgrexists(SMgrRelation reln, ForkNumber forknum)
 }
 
 /*
- * smgrclose() -- Close and delete an SMgrRelation object.
+ * smgrdestroy() -- Delete an SMgrRelation object.
  */
 void
-smgrclose(SMgrRelation reln)
+smgrdestroy(SMgrRelation reln)
 {
 	SMgrRelation *owner;
 	ForkNumber	forknum;
@@ -289,12 +291,14 @@ smgrclose(SMgrRelation reln)
 }
 
 /*
- * smgrrelease() -- Release all resources used by this object.
+ * smgrclose() -- Release all resources used by this object.
  *
- * The object remains valid.
+ * The object remains valid, but is moved to the unknown list where it will
+ * be destroyed by AtEOXact_SMgr().  It may be re-owned if it is accessed by a
+ * relation before then.
  */
 void
-smgrrelease(SMgrRelation reln)
+smgrclose(SMgrRelation reln)
 {
 	for (ForkNumber forknum = 0; forknum <= MAX_FORKNUM; forknum++)
 	{
@@ -302,15 +306,20 @@ smgrrelease(SMgrRelation reln)
 		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
 	}
 	reln->smgr_targblock = InvalidBlockNumber;
+
+	if (reln->smgr_owner)
+	{
+		*reln->smgr_owner = NULL;
+		reln->smgr_owner = NULL;
+		dlist_push_tail(&unowned_relns, &reln->node);
+	}
 }
 
 /*
- * smgrreleaseall() -- Release resources used by all objects.
- *
- * This is called for PROCSIGNAL_BARRIER_SMGRRELEASE.
+ * smgrcloseall() -- Close all objects.
  */
 void
-smgrreleaseall(void)
+smgrcloseall(void)
 {
 	HASH_SEQ_STATUS status;
 	SMgrRelation reln;
@@ -322,14 +331,17 @@ smgrreleaseall(void)
 	hash_seq_init(&status, SMgrRelationHash);
 
 	while ((reln = (SMgrRelation) hash_seq_search(&status)) != NULL)
-		smgrrelease(reln);
+		smgrclose(reln);
 }
 
 /*
- * smgrcloseall() -- Close all existing SMgrRelation objects.
+ * smgrdestroyall() -- Destroy all SMgrRelation objects.
+ *
+ * It must be known that there are no pointers to SMgrRelations, other than
+ * those registered with smgrsetowner().
  */
 void
-smgrcloseall(void)
+smgrdestroyall(void)
 {
 	HASH_SEQ_STATUS status;
 	SMgrRelation reln;
@@ -341,7 +353,7 @@ smgrcloseall(void)
 	hash_seq_init(&status, SMgrRelationHash);
 
 	while ((reln = (SMgrRelation) hash_seq_search(&status)) != NULL)
-		smgrclose(reln);
+		smgrdestroy(reln);
 }
 
 /*
@@ -733,7 +745,8 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
  * AtEOXact_SMgr
  *
  * This routine is called during transaction commit or abort (it doesn't
- * particularly care which).  All transient SMgrRelation objects are closed.
+ * particularly care which).  All transient SMgrRelation objects are
+ * destroyed.
  *
  * We do this as a compromise between wanting transient SMgrRelations to
  * live awhile (to amortize the costs of blind writes of multiple blocks)
@@ -747,7 +760,7 @@ AtEOXact_SMgr(void)
 	dlist_mutable_iter iter;
 
 	/*
-	 * Zap all unowned SMgrRelations.  We rely on smgrclose() to remove each
+	 * Zap all unowned SMgrRelations.  We rely on smgrdestroy() to remove each
 	 * one from the list.
 	 */
 	dlist_foreach_modify(iter, &unowned_relns)
@@ -757,7 +770,7 @@ AtEOXact_SMgr(void)
 
 		Assert(rel->smgr_owner == NULL);
 
-		smgrclose(rel);
+		smgrdestroy(rel);
 	}
 }
 
@@ -768,6 +781,6 @@ AtEOXact_SMgr(void)
 bool
 ProcessBarrierSmgrRelease(void)
 {
-	smgrreleaseall();
+	smgrcloseall();
 	return true;
 }
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index d51d46d3353..a38f1acb37a 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -14,6 +14,7 @@
 #ifndef BUFMGR_H
 #define BUFMGR_H
 
+#include "port/pg_iovec.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -158,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 #define BUFFER_LOCK_SHARE		1
 #define BUFFER_LOCK_EXCLUSIVE	2
 
+/*
+ * Maximum number of buffers for multi-buffer I/O functions.  This is set to
+ * allow 128kB transfers, unless BLCKSZ and IOV_MAX imply a a smaller maximum.
+ */
+#define MAX_BUFFERS_PER_TRANSFER Min(PG_IOV_MAX, (128 * 1024) / BLCKSZ)
 
 /*
  * prototypes for functions in bufmgr.c
@@ -177,6 +183,18 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
 										ForkNumber forkNum, BlockNumber blockNum,
 										ReadBufferMode mode, BufferAccessStrategy strategy,
 										bool permanent);
+extern Buffer PrepareReadBuffer(BufferManagerRelation bmr,
+								ForkNumber forkNum,
+								BlockNumber blockNum,
+								BufferAccessStrategy strategy,
+								bool *foundPtr);
+extern void CompleteReadBuffers(BufferManagerRelation bmr,
+								Buffer *buffers,
+								ForkNumber forknum,
+								BlockNumber blocknum,
+								int nblocks,
+								bool zero_on_error,
+								BufferAccessStrategy strategy);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern bool BufferIsExclusiveLocked(Buffer buffer);
@@ -247,9 +265,13 @@ extern void LockBufferForCleanup(Buffer buffer);
 extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool IsBufferCleanupOK(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
+extern void ZeroBuffer(Buffer buffer, ReadBufferMode mode);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern void LimitAdditionalPins(uint32 *additional_pins);
+extern void LimitAdditionalLocalPins(uint32 *additional_pins);
+
 /* in buf_init.c */
 extern void InitBufferPool(void);
 extern Size BufferShmemSize(void);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 527cd2a0568..d8ffe397faf 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -85,8 +85,8 @@ extern void smgrclearowner(SMgrRelation *owner, SMgrRelation reln);
 extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrcloserellocator(RelFileLocatorBackend rlocator);
-extern void smgrrelease(SMgrRelation reln);
-extern void smgrreleaseall(void);
+extern void smgrdestroy(SMgrRelation reln);
+extern void smgrdestroyall(void);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
new file mode 100644
index 00000000000..40c3408c541
--- /dev/null
+++ b/src/include/storage/streaming_read.h
@@ -0,0 +1,45 @@
+#ifndef STREAMING_READ_H
+#define STREAMING_READ_H
+
+#include "storage/bufmgr.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+
+/* Default tuning, reasonable for many users. */
+#define PGSR_FLAG_DEFAULT 0x00
+
+/*
+ * I/O streams that are performing maintenance work on behalf of potentially
+ * many users.
+ */
+#define PGSR_FLAG_MAINTENANCE 0x01
+
+/*
+ * We usually avoid issuing prefetch advice automatically when sequential
+ * access is detected, but this flag explicitly disables it, for cases that
+ * might not be correctly detected.  Explicit advice is known to perform worse
+ * than letting the kernel (at least Linux) detect sequential access.
+ */
+#define PGSR_FLAG_SEQUENTIAL 0x02
+
+struct PgStreamingRead;
+typedef struct PgStreamingRead PgStreamingRead;
+
+/* Callback that returns the next block number to read. */
+typedef BlockNumber (*PgStreamingReadBufferCB) (PgStreamingRead *pgsr,
+												void *pgsr_private,
+												void *per_buffer_private);
+
+extern PgStreamingRead *pg_streaming_read_buffer_alloc(int flags,
+													   void *pgsr_private,
+													   size_t per_buffer_private_size,
+													   BufferAccessStrategy strategy,
+													   BufferManagerRelation bmr,
+													   ForkNumber forknum,
+													   PgStreamingReadBufferCB next_block_cb);
+
+extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
+extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private);
+extern void pg_streaming_read_free(PgStreamingRead *pgsr);
+
+#endif
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index a584b1ddff3..6636cc82c09 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -561,12 +561,6 @@ typedef struct ViewOptions
  *
  * Very little code is authorized to touch rel->rd_smgr directly.  Instead
  * use this function to fetch its value.
- *
- * Note: since a relcache flush can cause the file handle to be closed again,
- * it's unwise to hold onto the pointer returned by this function for any
- * long period.  Recommended practice is to just re-execute RelationGetSmgr
- * each time you need to access the SMgrRelation.  It's quite cheap in
- * comparison to whatever an smgr function is going to do.
  */
 static inline SMgrRelation
 RelationGetSmgr(Relation rel)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 91433d439b7..8007f17320a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2094,6 +2094,8 @@ PgStat_TableCounts
 PgStat_TableStatus
 PgStat_TableXactStatus
 PgStat_WalStats
+PgStreamingRead
+PgStreamingReadRange
 PgXmlErrorContext
 PgXmlStrictness
 Pg_finfo_record
-- 
2.37.2

