From 7278c354d80e4d1f21c6fa0d810a723789f2d722 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 26 Feb 2024 23:48:31 +1300
Subject: [PATCH v3 1/2] Streaming read API changes that are not committed to
 master yet

Discussion: https://www.postgresql.org/message-id/CA%2BhUKGJkOiOCa%2Bmag4BF%2BzHo7qo%3Do9CFheB8%3Dg6uT5TUm2gkvA%40mail.gmail.com
---
 src/include/storage/bufmgr.h                  |  45 +-
 src/include/storage/streaming_read.h          |  50 ++
 src/backend/storage/Makefile                  |   2 +-
 src/backend/storage/aio/Makefile              |  14 +
 src/backend/storage/aio/meson.build           |   5 +
 src/backend/storage/aio/streaming_read.c      | 678 +++++++++++++++++
 src/backend/storage/buffer/bufmgr.c           | 706 ++++++++++++------
 src/backend/storage/buffer/localbuf.c         |  14 +-
 src/backend/storage/meson.build               |   1 +
 src/backend/utils/misc/guc_tables.c           |  14 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 doc/src/sgml/config.sgml                      |  14 +
 src/tools/pgindent/typedefs.list              |   2 +
 13 files changed, 1309 insertions(+), 237 deletions(-)
 create mode 100644 src/include/storage/streaming_read.h
 create mode 100644 src/backend/storage/aio/Makefile
 create mode 100644 src/backend/storage/aio/meson.build
 create mode 100644 src/backend/storage/aio/streaming_read.c

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index d51d46d3353..1cc198bde21 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -14,6 +14,7 @@
 #ifndef BUFMGR_H
 #define BUFMGR_H
 
+#include "port/pg_iovec.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -133,6 +134,10 @@ extern PGDLLIMPORT bool track_io_timing;
 extern PGDLLIMPORT int effective_io_concurrency;
 extern PGDLLIMPORT int maintenance_io_concurrency;
 
+#define MAX_BUFFER_IO_SIZE PG_IOV_MAX
+#define DEFAULT_BUFFER_IO_SIZE Min(MAX_BUFFER_IO_SIZE, (128 * 1024) / BLCKSZ)
+extern PGDLLIMPORT int buffer_io_size;
+
 extern PGDLLIMPORT int checkpoint_flush_after;
 extern PGDLLIMPORT int backend_flush_after;
 extern PGDLLIMPORT int bgwriter_flush_after;
@@ -158,7 +163,6 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 #define BUFFER_LOCK_SHARE		1
 #define BUFFER_LOCK_EXCLUSIVE	2
 
-
 /*
  * prototypes for functions in bufmgr.c
  */
@@ -177,6 +181,42 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
 										ForkNumber forkNum, BlockNumber blockNum,
 										ReadBufferMode mode, BufferAccessStrategy strategy,
 										bool permanent);
+
+#define READ_BUFFERS_ZERO_ON_ERROR 0x01
+#define READ_BUFFERS_ISSUE_ADVICE 0x02
+
+/*
+ * Private state used by StartReadBuffers() and WaitReadBuffers().  Declared
+ * in public header only to allow inclusion in other structs, but contents
+ * should not be accessed.
+ */
+struct ReadBuffersOperation
+{
+	/* Parameters passed in to StartReadBuffers(). */
+	BufferManagerRelation bmr;
+	Buffer	   *buffers;
+	ForkNumber	forknum;
+	BlockNumber blocknum;
+	int16		nblocks;
+	BufferAccessStrategy strategy;
+	int			flags;
+
+	/* Range of buffers, if we need to perform a read. */
+	int16		io_buffers_len;
+};
+
+typedef struct ReadBuffersOperation ReadBuffersOperation;
+
+extern bool StartReadBuffers(BufferManagerRelation bmr,
+							 Buffer *buffers,
+							 ForkNumber forknum,
+							 BlockNumber blocknum,
+							 int *nblocks,
+							 BufferAccessStrategy strategy,
+							 int flags,
+							 ReadBuffersOperation *operation);
+extern void WaitReadBuffers(ReadBuffersOperation *operation);
+
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern bool BufferIsExclusiveLocked(Buffer buffer);
@@ -250,6 +290,9 @@ extern bool HoldingBufferPinThatDelaysRecovery(void);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern void LimitAdditionalPins(uint32 *additional_pins);
+extern void LimitAdditionalLocalPins(uint32 *additional_pins);
+
 /* in buf_init.c */
 extern void InitBufferPool(void);
 extern Size BufferShmemSize(void);
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
new file mode 100644
index 00000000000..7991402631a
--- /dev/null
+++ b/src/include/storage/streaming_read.h
@@ -0,0 +1,50 @@
+#ifndef STREAMING_READ_H
+#define STREAMING_READ_H
+
+#include "storage/bufmgr.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+
+/* Default tuning, reasonable for many users. */
+#define STREAMING_READ_DEFAULT 0x00
+
+/*
+ * I/O streams that are performing maintenance work on behalf of potentially
+ * many users.
+ */
+#define STREAMING_READ_MAINTENANCE 0x01
+
+/*
+ * We usually avoid issuing prefetch advice automatically when sequential
+ * access is detected, but this flag explicitly disables it, for cases that
+ * might not be correctly detected.  Explicit advice is known to perform worse
+ * than letting the kernel (at least Linux) detect sequential access.
+ */
+#define STREAMING_READ_SEQUENTIAL 0x02
+
+/*
+ * We usually ramp up from smaller reads to larger ones, to support users who
+ * don't know if it's worth reading lots of buffers yet.  This flag disables
+ * that, declaring ahead of time that we'll be reading all available buffers.
+ */
+#define STREAMING_READ_FULL 0x04
+
+struct StreamingRead;
+typedef struct StreamingRead StreamingRead;
+
+/* Callback that returns the next block number to read. */
+typedef BlockNumber (*StreamingReadBufferCB) (StreamingRead *stream,
+											  void *callback_private_data,
+											  void *per_buffer_data);
+
+extern StreamingRead *streaming_read_buffer_begin(int flags,
+												  BufferAccessStrategy strategy,
+												  BufferManagerRelation bmr,
+												  ForkNumber forknum,
+												  StreamingReadBufferCB callback,
+												  void *callback_private_data,
+												  size_t per_buffer_data_size);
+extern Buffer streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_private);
+extern void streaming_read_buffer_end(StreamingRead *stream);
+
+#endif
diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca20..eec03f6f2b4 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = aio buffer file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 00000000000..bcab44c802f
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+	streaming_read.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 00000000000..39aef2a84a2
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+backend_sources += files(
+  'streaming_read.c',
+)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
new file mode 100644
index 00000000000..760a231500a
--- /dev/null
+++ b/src/backend/storage/aio/streaming_read.c
@@ -0,0 +1,678 @@
+/*-------------------------------------------------------------------------
+ *
+ * streaming_read.c
+ *	  Mechanism for buffer access with look-ahead
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * Code that needs to access relation data typically pins blocks one at a
+ * time, often in a predictable order that might be sequential or data-driven.
+ * Calling the simple ReadBuffer() function for each block is inefficient,
+ * because blocks that are not yet in the buffer pool require I/O operations
+ * that are small and might stall waiting for storage.  This mechanism looks
+ * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
+ * neighboring blocks together and ahead of time, with an adaptive look-ahead
+ * distance.
+ *
+ * A user-provided callback generates a stream of block numbers that is used
+ * to form reads of up to size buffer_io_size, by attempting to merge them
+ * with a pending read.  When that isn't possible, the existing pending read
+ * is sent to StartReadBuffers() so that a new one can begin to form.
+ *
+ * The algorithm for controlling the look-ahead distance tries to classify the
+ * stream into three ideal behaviors:
+ *
+ * A) No I/O is necessary, because the requested blocks are fully cached
+ * already.  There is no benefit to looking ahead more than one block, so
+ * distance is 1.  This is the default initial assumption.
+ *
+ * B) I/O is necessary, but fadvise is undesirable because the access is
+ * sequential, or impossible because direct I/O is enabled or the system
+ * doesn't support advice.  There is no benefit in looking ahead more than
+ * buffer_io_size (the GUC controlling physical read size), because in this
+ * case only goal is larger read system calls.  Looking further ahead would
+ * pin many buffers and perform speculative work looking ahead for no benefit.
+ *
+ * C) I/O is necesssary, it appears random, and this system supports fadvise.
+ * We'll look further ahead in order to reach the configured level of I/O
+ * concurrency.
+ *
+ * The distance increases rapidly and decays slowly, so that it moves towards
+ * those levels as different I/O patterns are discovered.  For example, a
+ * sequential scan of fully cached data doesn't bother looking ahead, but a
+ * sequential scan that hits a region of uncached blocks will start issuing
+ * increasingly wide read calls until it plateaus at buffer_io_size.
+ *
+ * The main data structure is a circular queue of buffers of size
+ * max_pinned_buffers, ready to be returned by streaming_read_buffer_next().
+ * Each buffer also has an optional variable sized object that is passed from
+ * the callback to the consumer of buffers.  A third array records whether
+ * WaitReadBuffers() must be called before returning the buffer, and if so,
+ * points to the relevant ReadBuffersOperation object.
+ *
+ * For example, if the callback return block numbers 10, 42, 43, 60 in
+ * successive calls, then these data structures might appear as follows:
+ *
+ *                          buffers buf/data buf/io       ios
+ *
+ *                          +----+  +-----+  +---+        +--------+
+ *                          |    |  |     |  |   |  +---->| 42..44 |
+ *                          +----+  +-----+  +---+  |     +--------+
+ *   oldest_buffer_index -> | 10 |  |  ?  |  |   |  | +-->| 60..60 |
+ *                          +----+  +-----+  +---+  | |   +--------+
+ *                          | 42 |  |  ?  |  | 0 +--+ |   |        |
+ *                          +----+  +-----+  +---+    |   +--------+
+ *                          | 43 |  |  ?  |  |   |    |   |        |
+ *                          +----+  +-----+  +---+    |   +--------+
+ *                          | 44 |  |  ?  |  |   |    |   |        |
+ *                          +----+  +-----+  +---+    |   +--------+
+ *                          | 60 |  |  ?  |  | 1 +----+
+ *                          +----+  +-----+  +---+
+ *     next_buffer_index -> |    |  |     |  |   |
+ *                          +----+  +-----+  +---+
+ *
+ * In the example, 5 buffers are pinned, and the next buffer to be streamed to
+ * the client is block 10.  Block 10 was a hit and has no associated I/O, but
+ * the range 42..44 requires an I/O wait before its buffers are returned, as
+ * does block 60.
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/storage/aio/streaming_read.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_tablespace.h"
+#include "miscadmin.h"
+#include "storage/streaming_read.h"
+#include "utils/rel.h"
+#include "utils/spccache.h"
+
+/*
+ * Streaming read object.
+ */
+struct StreamingRead
+{
+	int16		max_ios;
+	int16		ios_in_progress;
+	int16		max_pinned_buffers;
+	int16		pinned_buffers;
+	int16		distance;
+	bool		started;
+	bool		finished;
+	bool		advice_enabled;
+
+	/*
+	 * The callback that will tell us which block numbers to read, and an
+	 * opaque pointer that will be pass to it for its own purposes.
+	 */
+	StreamingReadBufferCB callback;
+	void	   *callback_private_data;
+
+	/* The relation we will read. */
+	BufferAccessStrategy strategy;
+	BufferManagerRelation bmr;
+	ForkNumber	forknum;
+
+	/* Sometimes we need to buffer one block for flow control. */
+	BlockNumber unget_blocknum;
+	void	   *unget_per_buffer_data;
+
+	/* Next expected block, for detecting sequential access. */
+	BlockNumber seq_blocknum;
+
+	/* The read operation we are currently preparing. */
+	BlockNumber pending_read_blocknum;
+	int16		pending_read_nblocks;
+
+	/* Space for buffers and optional per-buffer private data. */
+	Buffer	   *buffers;
+	size_t		per_buffer_data_size;
+	void	   *per_buffer_data;
+	int16	   *buffer_io_indexes;
+
+	/* Read operations that have been started by not waited for yet. */
+	ReadBuffersOperation *ios;
+	int16		next_io_index;
+
+	/* Head and tail of the circular queue of buffers. */
+	int16		oldest_buffer_index;	/* Next pinned buffer to return */
+	int16		next_buffer_index;	/* Index of next buffer to pin */
+};
+
+/*
+ * Return a pointer to the per-buffer data by index.
+ */
+static void *
+get_per_buffer_data(StreamingRead *stream, int16 buffer_index)
+{
+	return (char *) stream->per_buffer_data +
+		stream->per_buffer_data_size * buffer_index;
+}
+
+/*
+ * Ask the callback which block it would like us to read next, with a small
+ * buffer in front to allow streaming_unget_block() to work.
+ */
+static BlockNumber
+streaming_read_get_block(StreamingRead *stream, void *per_buffer_data)
+{
+	BlockNumber result;
+
+	if (unlikely(stream->unget_blocknum != InvalidBlockNumber))
+	{
+		/*
+		 * If we had to unget a block, now it is time to return that one
+		 * again.
+		 */
+		result = stream->unget_blocknum;
+		stream->unget_blocknum = InvalidBlockNumber;
+
+		/*
+		 * The same per_buffer_data element must have been used, and still
+		 * contains whatever data the callback wrote into it.  So we just
+		 * sanity-check that we were called with the value that
+		 * streaming_unget_block() pushed back.
+		 */
+		Assert(per_buffer_data == stream->unget_per_buffer_data);
+	}
+	else
+	{
+		/* Use the installed callback directly. */
+		result = stream->callback(stream,
+								  stream->callback_private_data,
+								  per_buffer_data);
+	}
+
+	return result;
+}
+
+/*
+ * In order to deal with short reads in StartReadBuffers(), we sometimes need
+ * to defer handling of a block until later.  This *must* be called with the
+ * last value returned by streaming_get_block().
+ */
+static void
+streaming_read_unget_block(StreamingRead *stream, BlockNumber blocknum, void *per_buffer_data)
+{
+	Assert(stream->unget_blocknum == InvalidBlockNumber);
+	stream->unget_blocknum = blocknum;
+	stream->unget_per_buffer_data = per_buffer_data;
+}
+
+static void
+streaming_read_start_pending_read(StreamingRead *stream)
+{
+	bool		need_wait;
+	int			nblocks;
+	int16		io_index;
+	int16		overflow;
+	int			flags;
+
+	/* This should only be called with a pending read. */
+	Assert(stream->pending_read_nblocks > 0);
+	Assert(stream->pending_read_nblocks <= buffer_io_size);
+
+	/* We had better not exceed the pin limit by starting this read. */
+	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
+		   stream->max_pinned_buffers);
+
+	/* We had better not be overwriting an existing pinned buffer. */
+	if (stream->pinned_buffers > 0)
+		Assert(stream->next_buffer_index != stream->oldest_buffer_index);
+	else
+		Assert(stream->next_buffer_index == stream->oldest_buffer_index);
+
+	/*
+	 * If advice hasn't been suppressed, and this system supports it, this
+	 * isn't a strictly sequential pattern, then we'll issue advice.
+	 */
+	if (stream->advice_enabled &&
+		stream->started &&
+		stream->pending_read_blocknum != stream->seq_blocknum)
+		flags = READ_BUFFERS_ISSUE_ADVICE;
+	else
+		flags = 0;
+
+	/* Suppress advice on the first call, because it's too late to benefit. */
+	if (!stream->started)
+		stream->started = true;
+
+	/* We say how many blocks we want to read, but may be smaller on return. */
+	nblocks = stream->pending_read_nblocks;
+	need_wait =
+		StartReadBuffers(stream->bmr,
+						 &stream->buffers[stream->next_buffer_index],
+						 stream->forknum,
+						 stream->pending_read_blocknum,
+						 &nblocks,
+						 stream->strategy,
+						 flags,
+						 &stream->ios[stream->next_io_index]);
+	stream->pinned_buffers += nblocks;
+
+	/* Remember whether we need to wait before returning this buffer. */
+	if (!need_wait)
+	{
+		io_index = -1;
+
+		/* Look-ahead distance decays, no I/O necessary (behavior A). */
+		if (stream->distance > 1)
+			stream->distance--;
+	}
+	else
+	{
+		/*
+		 * Remember to call WaitReadBuffers() before returning head buffer.
+		 * Look-ahead distance will be adjusted after waiting.
+		 */
+		io_index = stream->next_io_index;
+		if (++stream->next_io_index == stream->max_ios)
+			stream->next_io_index = 0;
+
+		Assert(stream->ios_in_progress < stream->max_ios);
+		stream->ios_in_progress++;
+	}
+
+	/* Set up the pointer to the I/O for the head buffer, if there is one. */
+	stream->buffer_io_indexes[stream->next_buffer_index] = io_index;
+
+	/*
+	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
+	 * we want it to wrap around at max_pinned_buffers.  Move values that
+	 * overflowed into the extra space.  At the same time, put -1 in the I/O
+	 * slots for the rest of the buffers to indicate no I/O.  They are covered
+	 * by the head buffer's I/O, if there is one.  We avoid a % operator.
+	 */
+	overflow = (stream->next_buffer_index + nblocks) - stream->max_pinned_buffers;
+	if (overflow > 0)
+	{
+		memmove(&stream->buffers[0],
+				&stream->buffers[stream->max_pinned_buffers],
+				sizeof(stream->buffers[0]) * overflow);
+		for (int i = 0; i < overflow; ++i)
+			stream->buffer_io_indexes[i] = -1;
+		for (int i = 1; i < nblocks - overflow; ++i)
+			stream->buffer_io_indexes[stream->next_buffer_index + i] = -1;
+	}
+	else
+	{
+		for (int i = 1; i < nblocks; ++i)
+			stream->buffer_io_indexes[stream->next_buffer_index + i] = -1;
+	}
+
+	/*
+	 * Remember where the next block would be after that, so we can detect
+	 * sequential access next time and suppress advice.
+	 */
+	stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
+
+	/* Compute location of start of next read, without using % operator. */
+	stream->next_buffer_index += nblocks;
+	if (stream->next_buffer_index >= stream->max_pinned_buffers)
+		stream->next_buffer_index -= stream->max_pinned_buffers;
+	Assert(stream->next_buffer_index >= 0);
+	Assert(stream->next_buffer_index < stream->max_pinned_buffers);
+
+	/* Adjust the pending read to cover the remaining portion, if any. */
+	stream->pending_read_blocknum += nblocks;
+	stream->pending_read_nblocks -= nblocks;
+}
+
+static void
+streaming_read_look_ahead(StreamingRead *stream)
+{
+	while (!stream->finished &&
+		   stream->ios_in_progress < stream->max_ios &&
+		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+	{
+		BlockNumber blocknum;
+		int16		buffer_index;
+		void	   *per_buffer_data;
+
+		if (stream->pending_read_nblocks == buffer_io_size)
+		{
+			streaming_read_start_pending_read(stream);
+			continue;
+		}
+
+		/*
+		 * See which block the callback wants next in the stream.  We need to
+		 * compute the index of the Nth block of the pending read including
+		 * wrap-around, but we don't want to use the expensive % operator.
+		 */
+		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
+		if (buffer_index > stream->max_pinned_buffers)
+			buffer_index -= stream->max_pinned_buffers;
+		per_buffer_data = get_per_buffer_data(stream, buffer_index);
+		blocknum = streaming_read_get_block(stream, per_buffer_data);
+		if (blocknum == InvalidBlockNumber)
+		{
+			stream->finished = true;
+			continue;
+		}
+
+		/* Can we merge it with the pending read? */
+		if (stream->pending_read_nblocks > 0 &&
+			stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
+		{
+			stream->pending_read_nblocks++;
+			continue;
+		}
+
+		/* We have to start the pending read before we can build another. */
+		if (stream->pending_read_nblocks > 0)
+		{
+			streaming_read_start_pending_read(stream);
+			if (stream->ios_in_progress == stream->max_ios)
+			{
+				/* And we've hit the limit.  Rewind, and stop here. */
+				streaming_read_unget_block(stream, blocknum, per_buffer_data);
+				return;
+			}
+		}
+
+		/* This is the start of a new pending read. */
+		stream->pending_read_blocknum = blocknum;
+		stream->pending_read_nblocks = 1;
+	}
+
+	/*
+	 * Normally we don't start the pending read just because we've hit a
+	 * limit, preferring to give it another chance to grow to a larger size
+	 * once more buffers have been consumed.  However, in cases where that
+	 * can't possibly happen, we might as well start the read immediately.
+	 */
+	if (((stream->pending_read_nblocks > 0 && stream->finished) ||
+		 (stream->pending_read_nblocks == stream->distance)) &&
+		stream->ios_in_progress < stream->max_ios)
+		streaming_read_start_pending_read(stream);
+}
+
+/*
+ * Create a new streaming read object that can be used to perform the
+ * equivalent of a series of ReadBuffer() calls for one fork of one relation.
+ * Internally, it generates larger vectored reads where possible by looking
+ * ahead.  The callback should return block numbers or InvalidBlockNumber to
+ * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
+ * write extra data for each block into the space provided to it.  It will
+ * also receive callback_private_data for its own purposes.
+ */
+StreamingRead *
+streaming_read_buffer_begin(int flags,
+							BufferAccessStrategy strategy,
+							BufferManagerRelation bmr,
+							ForkNumber forknum,
+							StreamingReadBufferCB callback,
+							void *callback_private_data,
+							size_t per_buffer_data_size)
+{
+	StreamingRead *stream;
+	int16		max_ios;
+	uint32		max_pinned_buffers;
+	Oid			tablespace_id;
+
+	/*
+	 * Make sure our bmr's smgr and persistent are populated.  The caller
+	 * asserts that the storage manager will remain valid.
+	 */
+	if (!bmr.smgr)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+	}
+
+	/*
+	 * Decide how many assumed I/Os we will allow to run concurrently.  That
+	 * is, advice to the kernel to tell it that we will soon read.  This
+	 * number also affects how far we look ahead for opportunities to start
+	 * more I/Os.
+	 */
+	tablespace_id = bmr.smgr->smgr_rlocator.locator.spcOid;
+	if (!OidIsValid(MyDatabaseId) ||
+		(bmr.rel && IsCatalogRelation(bmr.rel)) ||
+		IsCatalogRelationOid(bmr.smgr->smgr_rlocator.locator.relNumber))
+	{
+		/*
+		 * Avoid circularity while trying to look up tablespace settings or
+		 * before spccache.c is ready.
+		 */
+		max_ios = effective_io_concurrency;
+	}
+	else if (flags & STREAMING_READ_MAINTENANCE)
+		max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
+	else
+		max_ios = get_tablespace_io_concurrency(tablespace_id);
+
+	/*
+	 * Choose a maximum number of buffers we're prepared to pin.  We try to
+	 * pin fewer if we can, though.  We clamp it to at least buffer_io_size so
+	 * that we can have a chance to build up a full sized read, even when
+	 * max_ios is zero.
+	 */
+	max_pinned_buffers = Max(max_ios * 4, buffer_io_size);
+
+	/* Don't allow this backend to pin more than its share of buffers. */
+	if (SmgrIsTemp(bmr.smgr))
+		LimitAdditionalLocalPins(&max_pinned_buffers);
+	else
+		LimitAdditionalPins(&max_pinned_buffers);
+	Assert(max_pinned_buffers > 0);
+
+	stream = (StreamingRead *) palloc0(sizeof(StreamingRead));
+
+#ifdef USE_PREFETCH
+
+	/*
+	 * This system supports prefetching advice.  We can use it as long as
+	 * direct I/O isn't enabled, the caller hasn't promised sequential access
+	 * (overriding our detection heuristics), and max_ios hasn't been set to
+	 * zero.
+	 */
+	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+		(flags & STREAMING_READ_SEQUENTIAL) == 0 &&
+		max_ios > 0)
+		stream->advice_enabled = true;
+#endif
+
+	/*
+	 * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
+	 * above.  If we had real asynchronous I/O we might need a slightly
+	 * different definition.
+	 */
+	if (max_ios == 0)
+		max_ios = 1;
+
+	stream->max_ios = max_ios;
+	stream->per_buffer_data_size = per_buffer_data_size;
+	stream->max_pinned_buffers = max_pinned_buffers;
+	stream->strategy = strategy;
+
+	stream->bmr = bmr;
+	stream->forknum = forknum;
+	stream->callback = callback;
+	stream->callback_private_data = callback_private_data;
+
+	stream->unget_blocknum = InvalidBlockNumber;
+
+	/*
+	 * Skip the initial ramp-up phase if the caller says we're going to be
+	 * reading the whole relation.  This way we start out doing full-sized
+	 * reads.
+	 */
+	if (flags & STREAMING_READ_FULL)
+		stream->distance = stream->max_pinned_buffers;
+	else
+		stream->distance = 1;
+
+	/*
+	 * Space for the buffers we pin.  Though we never pin more than
+	 * max_pinned_buffers, we want to be able to assume that all the buffers
+	 * for a single read are contiguous (i.e. don't wrap around halfway
+	 * through), so we let the final one run past that position temporarily by
+	 * allocating an extra buffer_io_size - 1 elements.
+	 */
+	stream->buffers = palloc((max_pinned_buffers + buffer_io_size - 1) *
+							 sizeof(stream->buffers[0]));
+
+	/* Space for per-buffer data, if configured. */
+	if (per_buffer_data_size)
+		stream->per_buffer_data =
+			palloc(per_buffer_data_size * (max_pinned_buffers +
+										   buffer_io_size - 1));
+
+	/* Space for the IOs that we might run. */
+	stream->buffer_io_indexes = palloc(max_pinned_buffers * sizeof(stream->buffer_io_indexes[0]));
+	stream->ios = palloc(max_ios * sizeof(ReadBuffersOperation));
+
+	return stream;
+}
+
+/*
+ * Pull one pinned buffer out of a stream created with
+ * streaming_read_buffer_begin().  Each call returns successive blocks in the
+ * order specified by the callback.  If per_buffer_data_size was set to a
+ * non-zero size, *per_buffer_data receives a pointer to the extra per-buffer
+ * data that the callback had a chance to populate.  When the stream runs out
+ * of data, InvalidBuffer is returned.  The caller may decide to end the
+ * stream early at any time by calling streaming_read_end().
+ */
+Buffer
+streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data)
+{
+	Buffer		buffer;
+	int16		io_index;
+	int16		oldest_buffer_index;
+
+	if (unlikely(stream->pinned_buffers == 0))
+	{
+		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
+
+		if (stream->finished)
+			return InvalidBuffer;
+
+		/*
+		 * The usual order of operations is that we look ahead at the bottom
+		 * of this function after potentially finishing an I/O and making
+		 * space for more, but we need a special case to prime the stream when
+		 * we're getting started.
+		 */
+		Assert(!stream->started);
+		streaming_read_look_ahead(stream);
+		if (stream->pinned_buffers == 0)
+			return InvalidBuffer;
+	}
+
+	/* Grab the oldest pinned buffer and associated per-buffer data. */
+	oldest_buffer_index = stream->oldest_buffer_index;
+	Assert(oldest_buffer_index >= 0 &&
+		   oldest_buffer_index < stream->max_pinned_buffers);
+	buffer = stream->buffers[oldest_buffer_index];
+	if (per_buffer_data)
+		*per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
+
+	Assert(BufferIsValid(buffer));
+
+	/* Do we have to wait for an associated I/O first? */
+	io_index = stream->buffer_io_indexes[oldest_buffer_index];
+	Assert(io_index >= -1 && io_index < stream->max_ios);
+	if (io_index >= 0)
+	{
+		int			distance;
+
+		/* Sanity check that we still agree on the buffers. */
+		Assert(stream->ios[io_index].buffers == &stream->buffers[oldest_buffer_index]);
+
+		WaitReadBuffers(&stream->ios[io_index]);
+
+		Assert(stream->ios_in_progress > 0);
+		stream->ios_in_progress--;
+
+		if (stream->ios[io_index].flags & READ_BUFFERS_ISSUE_ADVICE)
+		{
+			/* Distance ramps up fast (behavior C). */
+			distance = stream->distance * 2;
+			distance = Min(distance, stream->max_pinned_buffers);
+			stream->distance = distance;
+		}
+		else
+		{
+			/* No advice; move towards full I/O size (behavior B). */
+			if (stream->distance > buffer_io_size)
+			{
+				stream->distance--;
+			}
+			else
+			{
+				distance = stream->distance * 2;
+				distance = Min(distance, buffer_io_size);
+				distance = Min(distance, stream->max_pinned_buffers);
+				stream->distance = distance;
+			}
+		}
+	}
+
+	/* Advance the oldest buffer, but clobber it first for debugging. */
+#ifdef USE_ASSERT_CHECKING
+	stream->buffers[oldest_buffer_index] = InvalidBuffer;
+	stream->buffer_io_indexes[oldest_buffer_index] = -1;
+	if (stream->per_buffer_data)
+		memset(get_per_buffer_data(stream, oldest_buffer_index),
+			   0xff,
+			   stream->per_buffer_data_size);
+#endif
+	if (++stream->oldest_buffer_index == stream->max_pinned_buffers)
+		stream->oldest_buffer_index = 0;
+
+	/* We are transferring ownership of the pin to the caller. */
+	Assert(stream->pinned_buffers > 0);
+	stream->pinned_buffers--;
+
+	/*
+	 * When distance is minimal, we finish up with no queued buffers.  As a
+	 * micro-optimization, we can then reset our circular queues, so that
+	 * all-cached streams re-use the same elements instead of rotating through
+	 * memory.
+	 */
+	if (stream->pinned_buffers == 0)
+	{
+		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
+		stream->oldest_buffer_index = 0;
+		stream->next_buffer_index = 0;
+		stream->next_io_index = 0;
+	}
+
+	/* Prepare for the next call. */
+	streaming_read_look_ahead(stream);
+
+	return buffer;
+}
+
+/*
+ * Finish streaming blocks and release all resources.
+ */
+void
+streaming_read_buffer_end(StreamingRead *stream)
+{
+	Buffer		buffer;
+
+	/* Stop looking ahead. */
+	stream->finished = true;
+
+	/* Unpin anything that wasn't consumed. */
+	while ((buffer = streaming_read_buffer_next(stream, NULL)) != InvalidBuffer)
+		ReleaseBuffer(buffer);
+
+	Assert(stream->pinned_buffers == 0);
+	Assert(stream->ios_in_progress == 0);
+
+	/* Release memory. */
+	pfree(stream->buffers);
+	if (stream->per_buffer_data)
+		pfree(stream->per_buffer_data);
+	pfree(stream->ios);
+
+	pfree(stream);
+}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f0f8d4259c5..b5347678726 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -19,6 +19,11 @@
  *		and pin it so that no one can destroy it while this process
  *		is using it.
  *
+ * StartReadBuffers() -- as above, but for multiple contiguous blocks in
+ *		two steps.
+ *
+ * WaitReadBuffers() -- second step of StartReadBuffers().
+ *
  * ReleaseBuffer() -- unpin a buffer
  *
  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
@@ -160,6 +165,12 @@ int			checkpoint_flush_after = DEFAULT_CHECKPOINT_FLUSH_AFTER;
 int			bgwriter_flush_after = DEFAULT_BGWRITER_FLUSH_AFTER;
 int			backend_flush_after = DEFAULT_BACKEND_FLUSH_AFTER;
 
+/*
+ * How many buffers should be coalesced into single I/O operations where
+ * possible.
+ */
+int			buffer_io_size = DEFAULT_BUFFER_IO_SIZE;
+
 /* local state for LockBufferForCleanup */
 static BufferDesc *PinCountWaitBuf = NULL;
 
@@ -471,10 +482,9 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
 )
 
 
-static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
+static Buffer ReadBuffer_common(BufferManagerRelation bmr,
 								ForkNumber forkNum, BlockNumber blockNum,
-								ReadBufferMode mode, BufferAccessStrategy strategy,
-								bool *hit);
+								ReadBufferMode mode, BufferAccessStrategy strategy);
 static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr,
 										   ForkNumber fork,
 										   BufferAccessStrategy strategy,
@@ -500,7 +510,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
 static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
 						  WritebackContext *wb_context);
 static void WaitIO(BufferDesc *buf);
-static bool StartBufferIO(BufferDesc *buf, bool forInput);
+static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
 							  uint32 set_flag_bits, bool forget_owner);
 static void AbortBufferIO(Buffer buffer);
@@ -781,7 +791,6 @@ Buffer
 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 				   ReadBufferMode mode, BufferAccessStrategy strategy)
 {
-	bool		hit;
 	Buffer		buf;
 
 	/*
@@ -794,15 +803,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot access temporary tables of other sessions")));
 
-	/*
-	 * Read the buffer, and update pgstat counters to reflect a cache hit or
-	 * miss.
-	 */
-	pgstat_count_buffer_read(reln);
-	buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence,
-							forkNum, blockNum, mode, strategy, &hit);
-	if (hit)
-		pgstat_count_buffer_hit(reln);
+	buf = ReadBuffer_common(BMR_REL(reln),
+							forkNum, blockNum, mode, strategy);
+
 	return buf;
 }
 
@@ -822,13 +825,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
 						  BlockNumber blockNum, ReadBufferMode mode,
 						  BufferAccessStrategy strategy, bool permanent)
 {
-	bool		hit;
-
 	SMgrRelation smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
 
-	return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
-							 RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
-							 mode, strategy, &hit);
+	return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+									  RELPERSISTENCE_UNLOGGED),
+							 forkNum, blockNum,
+							 mode, strategy);
 }
 
 /*
@@ -994,35 +996,66 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
 	 */
 	if (buffer == InvalidBuffer)
 	{
-		bool		hit;
-
 		Assert(extended_by == 0);
-		buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence,
-								   fork, extend_to - 1, mode, strategy,
-								   &hit);
+		buffer = ReadBuffer_common(bmr, fork, extend_to - 1, mode, strategy);
 	}
 
 	return buffer;
 }
 
 /*
- * ReadBuffer_common -- common logic for all ReadBuffer variants
- *
- * *hit is set to true if the request was satisfied from shared buffer cache.
+ * Zero a buffer and lock it, as part of the implementation of
+ * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK.  The buffer must be already
+ * pinned.  It does not have to be valid, but it is valid and locked on
+ * return.
  */
-static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
-				  BlockNumber blockNum, ReadBufferMode mode,
-				  BufferAccessStrategy strategy, bool *hit)
+static void
+ZeroBuffer(Buffer buffer, ReadBufferMode mode)
 {
 	BufferDesc *bufHdr;
-	Block		bufBlock;
-	bool		found;
-	IOContext	io_context;
-	IOObject	io_object;
-	bool		isLocalBuf = SmgrIsTemp(smgr);
+	uint32		buf_state;
 
-	*hit = false;
+	Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
+
+	if (BufferIsLocal(buffer))
+		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+	else
+	{
+		bufHdr = GetBufferDescriptor(buffer - 1);
+		if (mode == RBM_ZERO_AND_LOCK)
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		else
+			LockBufferForCleanup(buffer);
+	}
+
+	memset(BufferGetPage(buffer), 0, BLCKSZ);
+
+	if (BufferIsLocal(buffer))
+	{
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state |= BM_VALID;
+		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+	}
+	else
+	{
+		buf_state = LockBufHdr(bufHdr);
+		buf_state |= BM_VALID;
+		UnlockBufHdr(bufHdr, buf_state);
+	}
+}
+
+/*
+ * ReadBuffer_common -- common logic for all ReadBuffer variants
+ */
+static Buffer
+ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum,
+				  BlockNumber blockNum, ReadBufferMode mode,
+				  BufferAccessStrategy strategy)
+{
+	ReadBuffersOperation operation;
+	Buffer		buffer;
+	int			nblocks;
+	int			flags;
 
 	/*
 	 * Backward compatibility path, most code should use ExtendBufferedRel()
@@ -1041,181 +1074,413 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
 			flags |= EB_LOCK_FIRST;
 
-		return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence),
-								 forkNum, strategy, flags);
+		return ExtendBufferedRel(bmr, forkNum, strategy, flags);
 	}
 
-	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
-									   smgr->smgr_rlocator.locator.spcOid,
-									   smgr->smgr_rlocator.locator.dbOid,
-									   smgr->smgr_rlocator.locator.relNumber,
-									   smgr->smgr_rlocator.backend);
+	nblocks = 1;
+	if (mode == RBM_ZERO_ON_ERROR)
+		flags = READ_BUFFERS_ZERO_ON_ERROR;
+	else
+		flags = 0;
+	if (StartReadBuffers(bmr,
+						 &buffer,
+						 forkNum,
+						 blockNum,
+						 &nblocks,
+						 strategy,
+						 flags,
+						 &operation))
+		WaitReadBuffers(&operation);
+	Assert(nblocks == 1);		/* single block can't be short */
 
+	if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK)
+		ZeroBuffer(buffer, mode);
+
+	return buffer;
+}
+
+/*
+ * Pin a buffer for a given block.  *foundPtr is set to true if the block was
+ * already present, or false if more work is required to either read it in or
+ * zero it.
+ */
+static inline Buffer
+PinBufferForBlock(BufferManagerRelation bmr,
+				  ForkNumber forkNum,
+				  BlockNumber blockNum,
+				  BufferAccessStrategy strategy,
+				  bool *foundPtr)
+{
+	BufferDesc *bufHdr;
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
+
+	Assert(blockNum != P_NEW);
+
+	Assert(bmr.smgr);
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
 	if (isLocalBuf)
 	{
-		/*
-		 * We do not use a BufferAccessStrategy for I/O of temporary tables.
-		 * However, in some cases, the "strategy" may not be NULL, so we can't
-		 * rely on IOContextForStrategy() to set the right IOContext for us.
-		 * This may happen in cases like CREATE TEMPORARY TABLE AS...
-		 */
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
-		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
-		if (found)
-			pgBufferUsage.local_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.local_blks_read++;
 	}
 	else
 	{
-		/*
-		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
-		 * not currently in memory.
-		 */
 		io_context = IOContextForStrategy(strategy);
 		io_object = IOOBJECT_RELATION;
-		bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
-							 strategy, &found, io_context);
-		if (found)
-			pgBufferUsage.shared_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.shared_blks_read++;
 	}
 
-	/* At this point we do NOT hold any locks. */
+	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
+									   bmr.smgr->smgr_rlocator.locator.spcOid,
+									   bmr.smgr->smgr_rlocator.locator.dbOid,
+									   bmr.smgr->smgr_rlocator.locator.relNumber,
+									   bmr.smgr->smgr_rlocator.backend);
 
-	/* if it was already in the buffer pool, we're done */
-	if (found)
+	if (isLocalBuf)
+	{
+		bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr);
+		if (*foundPtr)
+			pgBufferUsage.local_blks_hit++;
+	}
+	else
+	{
+		bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum,
+							 strategy, foundPtr, io_context);
+		if (*foundPtr)
+			pgBufferUsage.shared_blks_hit++;
+	}
+	if (bmr.rel)
+	{
+		/*
+		 * While pgBufferUsage's "read" counter isn't bumped unless we reach
+		 * WaitReadBuffers() (so, not for hits, and not for buffers that are
+		 * zeroed instead), the per-relation stats always count them.
+		 */
+		pgstat_count_buffer_read(bmr.rel);
+		if (*foundPtr)
+			pgstat_count_buffer_hit(bmr.rel);
+	}
+	if (*foundPtr)
 	{
-		/* Just need to update stats before we exit */
-		*hit = true;
 		VacuumPageHit++;
 		pgstat_count_io_op(io_object, io_context, IOOP_HIT);
-
 		if (VacuumCostActive)
 			VacuumCostBalance += VacuumCostPageHit;
 
 		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-										  smgr->smgr_rlocator.locator.spcOid,
-										  smgr->smgr_rlocator.locator.dbOid,
-										  smgr->smgr_rlocator.locator.relNumber,
-										  smgr->smgr_rlocator.backend,
-										  found);
-
-		/*
-		 * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
-		 * on return.
-		 */
-		if (!isLocalBuf)
-		{
-			if (mode == RBM_ZERO_AND_LOCK)
-				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
-							  LW_EXCLUSIVE);
-			else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
-				LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
-		}
-
-		return BufferDescriptorGetBuffer(bufHdr);
+										  bmr.smgr->smgr_rlocator.locator.spcOid,
+										  bmr.smgr->smgr_rlocator.locator.dbOid,
+										  bmr.smgr->smgr_rlocator.locator.relNumber,
+										  bmr.smgr->smgr_rlocator.backend,
+										  true);
 	}
 
-	/*
-	 * if we have gotten to this point, we have allocated a buffer for the
-	 * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
-	 * if it's a shared buffer.
-	 */
-	Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));	/* spinlock not needed */
-
-	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
-
-	/*
-	 * Read in the page, unless the caller intends to overwrite it and just
-	 * wants us to allocate a buffer.
-	 */
-	if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
-		MemSet((char *) bufBlock, 0, BLCKSZ);
-	else
-	{
-		instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
-
-		smgrread(smgr, forkNum, blockNum, bufBlock);
-
-		pgstat_count_io_op_time(io_object, io_context,
-								IOOP_READ, io_start, 1);
-
-		/* check for garbage data */
-		if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
-									PIV_LOG_WARNING | PIV_REPORT_STAT))
-		{
-			if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
-			{
-				ereport(WARNING,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s; zeroing out page",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
-				MemSet((char *) bufBlock, 0, BLCKSZ);
-			}
-			else
-				ereport(ERROR,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
-		}
-	}
-
-	/*
-	 * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer
-	 * content lock before marking the page as valid, to make sure that no
-	 * other backend sees the zeroed page before the caller has had a chance
-	 * to initialize it.
-	 *
-	 * Since no-one else can be looking at the page contents yet, there is no
-	 * difference between an exclusive lock and a cleanup-strength lock. (Note
-	 * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
-	 * they assert that the buffer is already valid.)
-	 */
-	if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
-		!isLocalBuf)
-	{
-		LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
-	}
-
-	if (isLocalBuf)
-	{
-		/* Only need to adjust flags */
-		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
-
-		buf_state |= BM_VALID;
-		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-	}
-	else
-	{
-		/* Set BM_VALID, terminate IO, and wake up any waiters */
-		TerminateBufferIO(bufHdr, false, BM_VALID, true);
-	}
-
-	VacuumPageMiss++;
-	if (VacuumCostActive)
-		VacuumCostBalance += VacuumCostPageMiss;
-
-	TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-									  smgr->smgr_rlocator.locator.spcOid,
-									  smgr->smgr_rlocator.locator.dbOid,
-									  smgr->smgr_rlocator.locator.relNumber,
-									  smgr->smgr_rlocator.backend,
-									  found);
-
 	return BufferDescriptorGetBuffer(bufHdr);
 }
 
 /*
- * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
- *		buffer.  If no buffer exists already, selects a replacement
- *		victim and evicts the old page, but does NOT read in new page.
+ * Begin reading a range of blocks beginning at blockNum and extending for
+ * *nblocks.  On return, up to *nblocks pinned buffers holding those blocks
+ * are written into the buffers array, and *nblocks is updated to contain the
+ * actual number, which may be fewer than requested.
+ *
+ * If false is returned, no I/O is necessary and WaitReadBuffers() is not
+ * necessary.  If true is returned, one I/O has been started, and
+ * WaitReadBuffers() must be called with the same operation object before the
+ * buffers are accessed.  Along with the operation object, the caller-supplied
+ * array of buffers must remain valid until WaitReadBuffers() is called.
+ *
+ * Currently the I/O is only started with optional operating system advice,
+ * and the real I/O happens in WaitReadBuffers().  In future work, true I/O
+ * could be initiated here.
+ */
+bool
+StartReadBuffers(BufferManagerRelation bmr,
+				 Buffer *buffers,
+				 ForkNumber forkNum,
+				 BlockNumber blockNum,
+				 int *nblocks,
+				 BufferAccessStrategy strategy,
+				 int flags,
+				 ReadBuffersOperation *operation)
+{
+	int			actual_nblocks = *nblocks;
+	int			io_buffers_len = 0;
+
+	Assert(*nblocks > 0);
+	Assert(*nblocks <= MAX_BUFFER_IO_SIZE);
+
+	if (bmr.rel)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+	}
+
+	for (int i = 0; i < actual_nblocks; ++i)
+	{
+		bool		found;
+
+		buffers[i] = PinBufferForBlock(bmr,
+									   forkNum,
+									   blockNum + i,
+									   strategy,
+									   &found);
+
+		if (found)
+		{
+			/*
+			 * Terminate the read as soon as we get a hit.  It could be a
+			 * single buffer hit, or it could be a hit that follows a readable
+			 * range.  We don't want to create more than one readable range,
+			 * so we stop here.
+			 */
+			actual_nblocks = operation->nblocks = *nblocks = i + 1;
+			break;
+		}
+		else
+		{
+			/* Extend the readable range to cover this block. */
+			io_buffers_len++;
+		}
+	}
+
+	if (io_buffers_len > 0)
+	{
+		/* Populate extra information needed for I/O. */
+		operation->io_buffers_len = io_buffers_len;
+		operation->blocknum = blockNum;
+		operation->buffers = buffers;
+		operation->nblocks = actual_nblocks;
+		operation->bmr = bmr;
+		operation->forknum = forkNum;
+		operation->strategy = strategy;
+		operation->flags = flags;
+
+		if (flags & READ_BUFFERS_ISSUE_ADVICE)
+		{
+			/*
+			 * In theory we should only do this if PinBufferForBlock() had to
+			 * allocate new buffers above.  That way, if two calls to
+			 * StartReadBuffers() were made for the same blocks before
+			 * WaitReadBuffers(), only the first would issue the advice.
+			 * That'd be a better simulation of true asynchronous I/O, which
+			 * would only start the I/O once, but isn't done here for
+			 * simplicity.  Note also that the following call might actually
+			 * issue two advice calls if we cross a segment boundary; in a
+			 * true asynchronous version we might choose to process only one
+			 * real I/O at a time in that case.
+			 */
+			smgrprefetch(bmr.smgr, forkNum, blockNum, operation->io_buffers_len);
+		}
+
+		/* Indicate that WaitReadBuffers() should be called. */
+		return true;
+	}
+	else
+	{
+		return false;
+	}
+}
+
+static inline bool
+WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+{
+	if (BufferIsLocal(buffer))
+	{
+		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+
+		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
+	}
+	else
+		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
+}
+
+void
+WaitReadBuffers(ReadBuffersOperation *operation)
+{
+	BufferManagerRelation bmr;
+	Buffer	   *buffers;
+	int			nblocks;
+	BlockNumber blocknum;
+	ForkNumber	forknum;
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
+
+	/*
+	 * Currently operations are only allowed to include a read of some range,
+	 * with an optional extra buffer that is already pinned at the end.  So
+	 * nblocks can be at most one more than io_buffers_len.
+	 */
+	Assert((operation->nblocks == operation->io_buffers_len) ||
+		   (operation->nblocks == operation->io_buffers_len + 1));
+
+	/* Find the range of the physical read we need to perform. */
+	nblocks = operation->io_buffers_len;
+	if (nblocks == 0)
+		return;					/* nothing to do */
+
+	buffers = &operation->buffers[0];
+	blocknum = operation->blocknum;
+	forknum = operation->forknum;
+	bmr = operation->bmr;
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
+	if (isLocalBuf)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
+	}
+
+	/*
+	 * We count all these blocks as read by this backend.  This is traditional
+	 * behavior, but might turn out to be not true if we find that someone
+	 * else has beaten us and completed the read of some of these blocks.  In
+	 * that case the system globally double-counts, but we traditionally don't
+	 * count this as a "hit", and we don't have a separate counter for "miss,
+	 * but another backend completed the read".
+	 */
+	if (isLocalBuf)
+		pgBufferUsage.local_blks_read += nblocks;
+	else
+		pgBufferUsage.shared_blks_read += nblocks;
+
+	for (int i = 0; i < nblocks; ++i)
+	{
+		int			io_buffers_len;
+		Buffer		io_buffers[MAX_BUFFER_IO_SIZE];
+		void	   *io_pages[MAX_BUFFER_IO_SIZE];
+		instr_time	io_start;
+		BlockNumber io_first_block;
+
+		/*
+		 * Skip this block if someone else has already completed it.  If an
+		 * I/O is already in progress in another backend, this will wait for
+		 * the outcome: either done, or something went wrong and we will
+		 * retry.
+		 */
+		if (!WaitReadBuffersCanStartIO(buffers[i], false))
+		{
+			/*
+			 * Report this as a 'hit' for this backend, even though it must
+			 * have started out as a miss in PinBufferForBlock().
+			 */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  true);
+			continue;
+		}
+
+		/* We found a buffer that we need to read in. */
+		io_buffers[0] = buffers[i];
+		io_pages[0] = BufferGetBlock(buffers[i]);
+		io_first_block = blocknum + i;
+		io_buffers_len = 1;
+
+		/*
+		 * How many neighboring-on-disk blocks can we can scatter-read into
+		 * other buffers at the same time?  In this case we don't wait if we
+		 * see an I/O already in progress.  We already hold BM_IO_IN_PROGRESS
+		 * for the head block, so we should get on with that I/O as soon as
+		 * possible.  We'll come back to this block again, above.
+		 */
+		while ((i + 1) < nblocks &&
+			   WaitReadBuffersCanStartIO(buffers[i + 1], true))
+		{
+			/* Must be consecutive block numbers. */
+			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
+				   BufferGetBlockNumber(buffers[i]) + 1);
+
+			io_buffers[io_buffers_len] = buffers[++i];
+			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
+		}
+
+		io_start = pgstat_prepare_io_time(track_io_timing);
+		smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len);
+		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
+								io_buffers_len);
+
+		/* Verify each block we read, and terminate the I/O. */
+		for (int j = 0; j < io_buffers_len; ++j)
+		{
+			BufferDesc *bufHdr;
+			Block		bufBlock;
+
+			if (isLocalBuf)
+			{
+				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
+				bufBlock = LocalBufHdrGetBlock(bufHdr);
+			}
+			else
+			{
+				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
+				bufBlock = BufHdrGetBlock(bufHdr);
+			}
+
+			/* check for garbage data */
+			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
+										PIV_LOG_WARNING | PIV_REPORT_STAT))
+			{
+				if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
+				{
+					ereport(WARNING,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s; zeroing out page",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+					memset(bufBlock, 0, BLCKSZ);
+				}
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+			}
+
+			/* Terminate I/O and set BM_VALID. */
+			if (isLocalBuf)
+			{
+				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+				buf_state |= BM_VALID;
+				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+			}
+			else
+			{
+				/* Set BM_VALID, terminate IO, and wake up any waiters */
+				TerminateBufferIO(bufHdr, false, BM_VALID, true);
+			}
+
+			/* Report I/Os as completing individually. */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  false);
+		}
+
+		VacuumPageMiss += io_buffers_len;
+		if (VacuumCostActive)
+			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+	}
+}
+
+/*
+ * BufferAlloc -- subroutine for PinBufferForBlock.  Handles lookup of a shared
+ *		buffer.  If no buffer exists already, selects a replacement victim and
+ *		evicts the old page, but does NOT read in new page.
  *
  * "strategy" can be a buffer replacement strategy object, or NULL for
  * the default strategy.  The selected buffer's usage_count is advanced when
@@ -1223,11 +1488,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  *
  * The returned buffer is pinned and is already marked as holding the
  * desired page.  If it already did have the desired page, *foundPtr is
- * set true.  Otherwise, *foundPtr is set false and the buffer is marked
- * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
- *
- * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
- * we keep it for simplicity in ReadBuffer.
+ * set true.  Otherwise, *foundPtr is set false.
  *
  * io_context is passed as an output parameter to avoid calling
  * IOContextForStrategy() when there is a shared buffers hit and no IO
@@ -1286,19 +1547,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called StartReadBuffers() but not yet WaitReadBuffers().
 			 */
-			if (StartBufferIO(buf, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return buf;
@@ -1363,19 +1615,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called StartReadBuffers() but not yet WaitReadBuffers().
 			 */
-			if (StartBufferIO(existing_buf_hdr, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return existing_buf_hdr;
@@ -1407,15 +1650,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	LWLockRelease(newPartitionLock);
 
 	/*
-	 * Buffer contents are currently invalid.  Try to obtain the right to
-	 * start I/O.  If StartBufferIO returns false, then someone else managed
-	 * to read it before we did, so there's nothing left for BufferAlloc() to
-	 * do.
+	 * Buffer contents are currently invalid.
 	 */
-	if (StartBufferIO(victim_buf_hdr, true))
-		*foundPtr = false;
-	else
-		*foundPtr = true;
+	*foundPtr = false;
 
 	return victim_buf_hdr;
 }
@@ -1769,7 +2006,7 @@ again:
  * pessimistic, but outside of toy-sized shared_buffers it should allow
  * sufficient pins.
  */
-static void
+void
 LimitAdditionalPins(uint32 *additional_pins)
 {
 	uint32		max_backends;
@@ -2034,7 +2271,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 
 				buf_state &= ~BM_VALID;
 				UnlockBufHdr(existing_hdr, buf_state);
-			} while (!StartBufferIO(existing_hdr, true));
+			} while (!StartBufferIO(existing_hdr, true, false));
 		}
 		else
 		{
@@ -2057,7 +2294,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			LWLockRelease(partition_lock);
 
 			/* XXX: could combine the locked operations in it with the above */
-			StartBufferIO(victim_buf_hdr, true);
+			StartBufferIO(victim_buf_hdr, true, false);
 		}
 	}
 
@@ -2372,7 +2609,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 	else
 	{
 		/*
-		 * If we previously pinned the buffer, it must surely be valid.
+		 * If we previously pinned the buffer, it is likely to be valid, but
+		 * it may not be if StartReadBuffers() was called and
+		 * WaitReadBuffers() hasn't been called yet.  We'll check by loading
+		 * the flags without locking.  This is racy, but it's OK to return
+		 * false spuriously: when WaitReadBuffers() calls StartBufferIO(),
+		 * it'll see that it's now valid.
 		 *
 		 * Note: We deliberately avoid a Valgrind client request here.
 		 * Individual access methods can optionally superimpose buffer page
@@ -2381,7 +2623,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 		 * that the buffer page is legitimately non-accessible here.  We
 		 * cannot meddle with that.
 		 */
-		result = true;
+		result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
 	}
 
 	ref->refcount++;
@@ -3449,7 +3691,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * someone else flushed the buffer before we could, so we need not do
 	 * anything.
 	 */
-	if (!StartBufferIO(buf, false))
+	if (!StartBufferIO(buf, false, false))
 		return;
 
 	/* Setup error traceback support for ereport() */
@@ -5184,9 +5426,15 @@ WaitIO(BufferDesc *buf)
  *
  * Returns true if we successfully marked the buffer as I/O busy,
  * false if someone else already did the work.
+ *
+ * If nowait is true, then we don't wait for an I/O to be finished by another
+ * backend.  In that case, false indicates either that the I/O was already
+ * finished, or is still in progress.  This is useful for callers that want to
+ * find out if they can perform the I/O as part of a larger operation, without
+ * waiting for the answer or distinguishing the reasons why not.
  */
 static bool
-StartBufferIO(BufferDesc *buf, bool forInput)
+StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 {
 	uint32		buf_state;
 
@@ -5199,6 +5447,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
 		UnlockBufHdr(buf, buf_state);
+		if (nowait)
+			return false;
 		WaitIO(buf);
 	}
 
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index fcfac335a57..985a2c7049c 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -108,10 +108,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
  * LocalBufferAlloc -
  *	  Find or create a local buffer for the given page of the given relation.
  *
- * API is similar to bufmgr.c's BufferAlloc, except that we do not need
- * to do any locking since this is all local.   Also, IO_IN_PROGRESS
- * does not get set.  Lastly, we support only default access strategy
- * (hence, usage_count is always advanced).
+ * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
+ * any locking since this is all local.  We support only default access
+ * strategy (hence, usage_count is always advanced).
  */
 BufferDesc *
 LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
@@ -287,7 +286,7 @@ GetLocalVictimBuffer(void)
 }
 
 /* see LimitAdditionalPins() */
-static void
+void
 LimitAdditionalLocalPins(uint32 *additional_pins)
 {
 	uint32		max_pins;
@@ -297,9 +296,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins)
 
 	/*
 	 * In contrast to LimitAdditionalPins() other backends don't play a role
-	 * here. We can allow up to NLocBuffer pins in total.
+	 * here. We can allow up to NLocBuffer pins in total, but it might not be
+	 * initialized yet so read num_temp_buffers.
 	 */
-	max_pins = (NLocBuffer - NLocalPinnedBuffers);
+	max_pins = (num_temp_buffers - NLocalPinnedBuffers);
 
 	if (*additional_pins >= max_pins)
 		*additional_pins = max_pins;
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 40345bdca27..739d13293fb 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
+subdir('aio')
 subdir('buffer')
 subdir('file')
 subdir('freespace')
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 1e71e7db4a0..71889471266 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3112,6 +3112,20 @@ struct config_int ConfigureNamesInt[] =
 		NULL
 	},
 
+	{
+		{"buffer_io_size",
+			PGC_USERSET,
+			RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Target size for coalescing reads and writes of buffered data blocks."),
+			NULL,
+			GUC_UNIT_BLOCKS
+		},
+		&buffer_io_size,
+		DEFAULT_BUFFER_IO_SIZE,
+		1, MAX_BUFFER_IO_SIZE,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"backend_flush_after", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
 			gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 2244ee52f79..b7a4143df21 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -203,6 +203,7 @@
 #backend_flush_after = 0		# measured in pages, 0 disables
 #effective_io_concurrency = 1		# 1-1000; 0 disables prefetching
 #maintenance_io_concurrency = 10	# 1-1000; 0 disables prefetching
+#buffer_io_size = 128kB
 #max_worker_processes = 8		# (change requires restart)
 #max_parallel_workers_per_gather = 2	# limited by max_parallel_workers
 #max_parallel_maintenance_workers = 2	# limited by max_parallel_workers
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 65a6e6c4086..3af86c59384 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2719,6 +2719,20 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-buffer-io-size" xreflabel="buffer_io_size">
+       <term><varname>buffer_io_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>buffer_io_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Controls the target I/O size in operations that coalesce buffer I/O.
+         The default is 128kB.
+        </para>
+       </listitem>
+      </varlistentry>
+
       <varlistentry id="guc-max-worker-processes" xreflabel="max_worker_processes">
        <term><varname>max_worker_processes</varname> (<type>integer</type>)
        <indexterm>
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 4679660837c..a603b52ed70 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2287,6 +2287,7 @@ ReInitializeDSMForeignScan_function
 ReScanForeignScan_function
 ReadBufPtrType
 ReadBufferMode
+ReadBuffersOperation
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
@@ -2693,6 +2694,7 @@ StopList
 StrategyNumber
 StreamCtl
 StreamStopReason
+StreamingRead
 String
 StringInfo
 StringInfoData
-- 
2.43.0

