Some read stream improvements

Started by Thomas Munro, 11 months ago, 18 messages
#1 Thomas Munro
thomas.munro@gmail.com
6 attachment(s)

Hi,

Here are some patches that address some of Andres's feedback since the
AIO v2 rebase[1], anticipate out-of-order streams, and make some other
minor improvements. They are independent of the main AIO patch set
and apply to master, hence separate thread.

0001-Refactor-read_stream.c-s-circular-arithmetic.patch

This just replaces open-coded arithmetic with inline functions. They
will be used a lot more in later work, and provide central places to
put assertions that were not checked as uniformly as I would like.

0002-Allow-more-buffers-for-sequential-read-streams.patch

Currently we're only generating I/O concurrency for random I/O, and I
originally guesstimated that random I/O wouldn't typically be able to
combine more than about 4 blocks on average when capping the buffer
queue. Debatable perhaps, but soon obsolete: for true asynchronous
I/O, we'll need full concurrency * full combine limit, so we'll
potentially want to pin more buffers. This just changes that
hard-coded 4 to io_combine_limit.
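
To make the arithmetic concrete with example settings (the later
clamps for the strategy pin limit and PG_INT16_MAX still apply), with
max_ios = 16 (e.g. from effective_io_concurrency) and
io_combine_limit = 32:

    before:  max_pinned_buffers = Max(max_ios * 4, io_combine_limit)
                                = Max(16 * 4, 32) = 64
    after:   max_pinned_buffers = Max(max_ios, 1) * io_combine_limit
                                = 16 * 32 = 512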

0003-Improve-buffer-pool-API-for-per-backend-pin-limits.patch

In preparation for the next patch, provide some new bufmgr APIs.
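
For reference, these are the declarations the patch adds to bufmgr.h:

    extern uint32 GetSoftPinLimit(void);
    extern uint32 GetSoftLocalPinLimit(void);
    extern uint32 GetAdditionalPinLimit(void);
    extern uint32 GetAdditionalLocalPinLimit(void);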

0004-Respect-pin-limits-accurately-in-read_stream.c.patch

The current coding only computes the remaining "fair share" of the
buffer pool for this backend at stream initialisation. It's hard, but
not impossible, for one backend to pin more than 1/max_connections of
the buffer pool (the intended cap) when it uses multiple streams at
the same time. This patch moves the time of check to the time of use,
so the limit is respected strictly. I avoided adding any changes to
the fast path for all-cached streams, which only pin one buffer at a
time.
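
Condensed from the patch, the look-ahead code now recomputes its
allowance each time it considers starting a read, roughly like this:

    static int16
    read_stream_get_buffer_limit(ReadStream *stream)
    {
        uint32      buffers;

        /* Check how many local or shared pins we could acquire. */
        if (stream->temporary)
            buffers = GetAdditionalLocalPinLimit();
        else
            buffers = GetAdditionalPinLimit();

        /* Always allow one pin, so the stream can make progress. */
        if (buffers == 0 && stream->pinned_buffers == 0)
            return 1;

        return Min(buffers, INT16_MAX);
    }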

This is basically a TOCTOU bug, and could in theory be back-patched,
but I don't personally think it's necessary yet, since it's so hard to
actually break anything with v17's rather limited use of streams. The
only way I can think of right now to see a concrete problem would be
to get many backends all to create many CURSORs that stream cold data,
and FETCH in a particular order only after they've all been
constructed, which is also a recipe for running out of buffers without
using streams, albeit not quite as quickly.

0005-Support-buffer-forwarding-in-read_stream.c.patch
0006-Support-buffer-forwarding-in-StartReadBuffers.patch

Background: StartReadBuffers() reports a short read when it finds a
cached block that divides a potential I/O. For example, if you ask to
read 8 blocks, and the 6th one turns out to be a hit, ie already valid
in the buffer pool, then we only need to read the first 5 from disk.
Perhaps the 7th and 8th blocks will also need I/O, but we don't want
StartReadBuffers() to start *two* I/Os, so the 6th block terminates
our search. The question is what to do with that 6th block.
Currently we'll tell the caller that the size of the operation is 6
blocks -- that is, we'll silently include the hit that prevented
further combining -- even though we'll only actually do the read for
the first 5 blocks. If someone else manages to make any of the other
buffers valid before WaitReadBuffers() is called, we'll just silently
skip over them and loop.

That was simple enough, but having hits that are invisibly mixed up
with misses prevents us from implementing fully general reordering of
hits, an important optimisation that reduces I/O stalls for consumers
that can cope with out-of-order data (I'll post a new patch for that
separately).

A simple solution we rejected was to unpin such buffers and then repin
later, but that would waste cycles, allow eviction and potentially
mess up the usage count. The AIO patch set also introduces another
case involving potentially many buffers: it moves the
BM_IO_IN_PROGRESS negotiation into StartReadBuffers(), and reports a
short read then too for middle blocks, but it has already acquired all
the pins.

The solution we agreed on is to introduce a way for StartReadBuffers()
to communicate with future calls, and "forward" pinned buffers between
calls. The function arguments don't change, but its "buffers"
argument becomes an in/out array: one StartReadBuffers() call can
leave extra pinned buffers after the ones that were included in a
short read (*nblocks), and then when you retry (or possibly extend)
the rest of the read, you have to pass them back in. That is easy for
the read stream code, as it can just leave them in its circular queue
for the next call to take as input. It only needs to be aware of them
for pin limit accounting and stream reset (including early shutdown).
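
In caller terms the contract looks roughly like this (just a sketch,
not code from the patches; setting up the other ReadBuffersOperation
fields and releasing the returned buffers is omitted):

    ReadBuffersOperation op;        /* rel/smgr, forknum etc. set up elsewhere */
    Buffer      buffers[MAX_IO_COMBINE_LIMIT];
    BlockNumber blocknum = first_block;
    int         remaining = total_blocks;

    /* Entries must start out InvalidBuffer unless they hold forwarded
     * buffers from an earlier call. */
    for (int i = 0; i < lengthof(buffers); i++)
        buffers[i] = InvalidBuffer;

    while (remaining > 0)
    {
        int         nblocks = Min(remaining, io_combine_limit);

        /* Forwarded pins from the previous call are already at the
         * front of buffers[] and cover blocks from blocknum onwards. */
        if (StartReadBuffers(&op, buffers, blocknum, &nblocks, 0))
            WaitReadBuffers(&op);

        /* Consume buffers[0 .. nblocks - 1], then move any extra pinned
         * (forwarded) buffers after that range to the front before the
         * next call.  read_stream.c gets that step for free by leaving
         * them where they are in its circular queue. */

        blocknum += nblocks;
        remaining -= nblocks;
    }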

One goal was to avoid introducing any new instructions into
ReadBuffer() or the read stream fast path, so StartReadBuffer(), the
single-block specialisation, doesn't support forwarding (it can't
split reads anyway, since they are only one block, but it also doesn't
accept forwarded buffers as input).

I plan to share some new C-level tests and illustrations of the
internal states of read_stream.c separately, as the complexity is
obviously increasing a bit (especially with out-of-order streams,
about which more soon).

[1]: /messages/by-id/CA+hUKGLxH1tsUgzZfng4BU6GqnS6bKF2ThvxH1_w5c7-sLRKQw@mail.gmail.com

Attachments:

v1-0001-Refactor-read_stream.c-s-circular-arithmetic.patch (text/x-patch)
From 57cc7f12e4a7799c581d4ced054dfbb2663c5b81 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 15 Feb 2025 14:47:25 +1300
Subject: [PATCH v1 1/6] Refactor read_stream.c's circular arithmetic.

Several places have open-coded circular index arithmetic.  Make some
common functions for better readability and consistent assertion
checking.

This avoids adding yet more open-coded duplication in later patches, and
standardizes on the vocabulary "advance" and "retreat" as used elsewhere
in PostgreSQL.
---
 src/backend/storage/aio/read_stream.c | 78 +++++++++++++++++++++------
 1 file changed, 61 insertions(+), 17 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 4b499dfb441..eadfe88c35a 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -224,6 +224,55 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
 	stream->buffered_blocknum = blocknum;
 }
 
+/*
+ * Increment index, wrapping around at queue size.
+ */
+static inline void
+read_stream_index_advance(ReadStream *stream, int16 *index)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+
+	*index += 1;
+	if (*index == stream->queue_size)
+		*index = 0;
+}
+
+/*
+ * Increment index by n, wrapping around at queue size.
+ */
+static inline void
+read_stream_index_advance_n(ReadStream *stream, int16 *index, int16 n)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+	Assert(n <= MAX_IO_COMBINE_LIMIT);
+
+	*index += n;
+	if (*index >= stream->queue_size)
+		*index -= stream->queue_size;
+
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+}
+
+#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
+/*
+ * Decrement index, wrapping around at queue size.
+ */
+static inline void
+read_stream_index_retreat(ReadStream *stream, int16 *index)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+
+	if (*index == 0)
+		*index = stream->queue_size - 1;
+	else
+		*index -= 1;
+}
+#endif
+
 static void
 read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 {
@@ -302,11 +351,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 				&stream->buffers[stream->queue_size],
 				sizeof(stream->buffers[0]) * overflow);
 
-	/* Compute location of start of next read, without using % operator. */
-	buffer_index += nblocks;
-	if (buffer_index >= stream->queue_size)
-		buffer_index -= stream->queue_size;
-	Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+	/* Move to the location of start of next read. */
+	read_stream_index_advance_n(stream, &buffer_index, nblocks);
 	stream->next_buffer_index = buffer_index;
 
 	/* Adjust the pending read to cover the remaining portion, if any. */
@@ -334,12 +380,12 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		/*
 		 * See which block the callback wants next in the stream.  We need to
 		 * compute the index of the Nth block of the pending read including
-		 * wrap-around, but we don't want to use the expensive % operator.
+		 * wrap-around.
 		 */
-		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
-		if (buffer_index >= stream->queue_size)
-			buffer_index -= stream->queue_size;
-		Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+		buffer_index = stream->next_buffer_index;
+		read_stream_index_advance_n(stream,
+									&buffer_index,
+									stream->pending_read_nblocks);
 		per_buffer_data = get_per_buffer_data(stream, buffer_index);
 		blocknum = read_stream_get_block(stream, per_buffer_data);
 		if (blocknum == InvalidBlockNumber)
@@ -777,12 +823,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	 */
 	if (stream->per_buffer_data)
 	{
+		int16		index;
 		void	   *per_buffer_data;
 
-		per_buffer_data = get_per_buffer_data(stream,
-											  oldest_buffer_index == 0 ?
-											  stream->queue_size - 1 :
-											  oldest_buffer_index - 1);
+		index = oldest_buffer_index;
+		read_stream_index_retreat(stream, &index);
+		per_buffer_data = get_per_buffer_data(stream, index);
 
 #if defined(CLOBBER_FREED_MEMORY)
 		/* This also tells Valgrind the memory is "noaccess". */
@@ -800,9 +846,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	stream->pinned_buffers--;
 
 	/* Advance oldest buffer, with wrap-around. */
-	stream->oldest_buffer_index++;
-	if (stream->oldest_buffer_index == stream->queue_size)
-		stream->oldest_buffer_index = 0;
+	read_stream_index_advance(stream, &stream->oldest_buffer_index);
 
 	/* Prepare for the next call. */
 	read_stream_look_ahead(stream, false);
-- 
2.48.1

v1-0002-Allow-more-buffers-for-sequential-read-streams.patch (text/x-patch)
From cb597bdb53904a82cd27d2f67b8bad9873066f14 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 21 Jan 2025 08:08:08 +1300
Subject: [PATCH v1 2/6] Allow more buffers for sequential read streams.

Read streams currently only start concurrent I/Os (via read-ahead
advice) for random access, with a hard-coded guesstimate that their
average size is likely to be at most 4 blocks when planning the size of
the buffer queue.  Sequential streams benefit from kernel readahead when
using buffered I/O, and read-ahead advice doesn't exist for direct I/O
by definition, so we didn't need to look ahead more than
io_combine_limit in that case.

Proposed patches need more buffers to be able to start multiple
asynchronous I/O operations even for sequential access.  Adjust the
arithmetic in preparation, replacing "4" with io_combine_limit, though
there is no benefit yet, just some wasted queue space.

As of the time of writing, the maximum GUC values for
effective_io_concurrency (1000) and io_combine_limit (32) imply a queue
with around 32K entries (slightly more for technical reasons), though
those numbers are likely to change.  That requires a wider type in one
place that has an intermediate value that might overflow before clamping.
---
 src/backend/storage/aio/read_stream.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index eadfe88c35a..cb308948b6e 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -499,7 +499,7 @@ read_stream_begin_impl(int flags,
 	 * overflow (even though that's not possible with the current GUC range
 	 * limits), allowing also for the spare entry and the overflow space.
 	 */
-	max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
+	max_pinned_buffers = Max(max_ios, 1) * io_combine_limit;
 	max_pinned_buffers = Min(max_pinned_buffers,
 							 PG_INT16_MAX - io_combine_limit - 1);
 
@@ -771,7 +771,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
 	{
 		int16		io_index = stream->oldest_io_index;
-		int16		distance;
+		int32		distance;	/* wider temporary value, clamped below */
 
 		/* Sanity check that we still agree on the buffers. */
 		Assert(stream->ios[io_index].op.buffers ==
-- 
2.48.1

v1-0003-Improve-buffer-pool-API-for-per-backend-pin-limit.patch (text/x-patch)
From d98dbce9475d4640305bee2e6f536512de2d20d3 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Fri, 24 Jan 2025 10:59:39 +1300
Subject: [PATCH v1 3/6] Improve buffer pool API for per-backend pin limits.

Previously the support functions assumed that you needed one additional
pin to make progress, and could optionally use some more.  Add a couple
more functions for callers that want to know:

* what the maximum possible number could be, for space planning
  purposes, called the "soft pin limit"

* how many additional pins they could acquire right now, without the
  special case allowing one pin (ie for users that already hold pins and
  can already make progress even if zero extra pins are available now)

These APIs are better suited to read_stream.c, which will be adjusted in
a follow-up patch.  Also move the computation of each backend's fair
share of the buffer pool to backend initialization time, since the
answer doesn't change and we don't want to perform a division operation
every time we compute availability.
---
 src/backend/storage/buffer/bufmgr.c   | 75 ++++++++++++++++++++-------
 src/backend/storage/buffer/localbuf.c | 16 ++++++
 src/include/storage/bufmgr.h          |  4 ++
 3 files changed, 77 insertions(+), 18 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 80b0d0c5ded..2ca641204fb 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -211,6 +211,8 @@ static int32 PrivateRefCountOverflowed = 0;
 static uint32 PrivateRefCountClock = 0;
 static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
 
+static uint32 MaxProportionalPins;
+
 static void ReservePrivateRefCountEntry(void);
 static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer);
 static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move);
@@ -2097,6 +2099,46 @@ again:
 	return buf;
 }
 
+/*
+ * Return the maximum number of buffers that this backend should try to pin at
+ * once, to avoid pinning more than its fair share.  This is the highest value
+ * that GetAdditionalPinLimit() and LimitAdditionalPins() could ever return.
+ *
+ * It's called a soft limit because nothing stops a backend from trying to
+ * acquire more pins than this this with ReadBuffer(), but code that wants more
+ * for I/O optimizations should respect this per-backend limit when it can
+ * still make progress without them.
+ */
+uint32
+GetSoftPinLimit(void)
+{
+	return MaxProportionalPins;
+}
+
+/*
+ * Return the maximum number of additional buffers that this backend should
+ * pin if it wants to stay under the per-backend soft limit, considering the
+ * number of buffers it has already pinned.
+ */
+uint32
+GetAdditionalPinLimit(void)
+{
+	uint32		estimated_pins_held;
+
+	/*
+	 * We get the number of "overflowed" pins for free, but don't know the
+	 * number of pins in PrivateRefCountArray.  The cost of calculating that
+	 * exactly doesn't seem worth it, so just assume the max.
+	 */
+	estimated_pins_held = PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;
+
+	/* Is this backend already holding more than its fair share? */
+	if (estimated_pins_held > MaxProportionalPins)
+		return 0;
+
+	return MaxProportionalPins - estimated_pins_held;
+}
+
 /*
  * Limit the number of pins a batch operation may additionally acquire, to
  * avoid running out of pinnable buffers.
@@ -2112,28 +2154,15 @@ again:
 void
 LimitAdditionalPins(uint32 *additional_pins)
 {
-	uint32		max_backends;
-	int			max_proportional_pins;
+	uint32		limit;
 
 	if (*additional_pins <= 1)
 		return;
 
-	max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
-	max_proportional_pins = NBuffers / max_backends;
-
-	/*
-	 * Subtract the approximate number of buffers already pinned by this
-	 * backend. We get the number of "overflowed" pins for free, but don't
-	 * know the number of pins in PrivateRefCountArray. The cost of
-	 * calculating that exactly doesn't seem worth it, so just assume the max.
-	 */
-	max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;
-
-	if (max_proportional_pins <= 0)
-		max_proportional_pins = 1;
-
-	if (*additional_pins > max_proportional_pins)
-		*additional_pins = max_proportional_pins;
+	limit = GetAdditionalPinLimit();
+	limit = Max(limit, 1);
+	if (limit < *additional_pins)
+		*additional_pins = limit;
 }
 
 /*
@@ -3574,6 +3603,16 @@ InitBufferManagerAccess(void)
 {
 	HASHCTL		hash_ctl;
 
+	/*
+	 * The soft limit on the number of pins each backend should respect, based
+	 * on shared_buffers and the maximum number of connections possible.
+	 * That's very pessimistic, but outside toy-sized shared_buffers it should
+	 * allow plenty of pins.  Higher level code that pins non-trivial numbers
+	 * of buffers should use LimitAdditionalPins() or GetAdditionalPinLimit()
+	 * to stay under this limit.
+	 */
+	MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS);
+
 	memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));
 
 	hash_ctl.keysize = sizeof(int32);
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 64931efaa75..3c055f6ec8b 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -286,6 +286,22 @@ GetLocalVictimBuffer(void)
 	return BufferDescriptorGetBuffer(bufHdr);
 }
 
+/* see GetSoftPinLimit() */
+uint32
+GetSoftLocalPinLimit(void)
+{
+	/* Every backend has its own temporary buffers, and can pin them all. */
+	return num_temp_buffers;
+}
+
+/* see GetAdditionalPinLimit() */
+uint32
+GetAdditionalLocalPinLimit(void)
+{
+	Assert(NLocalPinnedBuffers <= num_temp_buffers);
+	return num_temp_buffers - NLocalPinnedBuffers;
+}
+
 /* see LimitAdditionalPins() */
 void
 LimitAdditionalLocalPins(uint32 *additional_pins)
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 7c1e4316dde..597ecb97897 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -290,6 +290,10 @@ extern bool HoldingBufferPinThatDelaysRecovery(void);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern uint32 GetSoftPinLimit(void);
+extern uint32 GetSoftLocalPinLimit(void);
+extern uint32 GetAdditionalPinLimit(void);
+extern uint32 GetAdditionalLocalPinLimit(void);
 extern void LimitAdditionalPins(uint32 *additional_pins);
 extern void LimitAdditionalLocalPins(uint32 *additional_pins);
 
-- 
2.48.1

v1-0004-Respect-pin-limits-accurately-in-read_stream.c.patch (text/x-patch)
From b71e09f174c61adf475355e5fff4c01b6a0af399 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Fri, 24 Jan 2025 23:52:53 +1300
Subject: [PATCH v1 4/6] Respect pin limits accurately in read_stream.c.

Read streams pin multiple buffers at once as required to combine I/O.
This also avoids having to unpin and repin later when issuing read-ahead
advice, and will be needed for proposed work that starts "real"
asynchronous I/O.

To avoid pinning too much of the buffer pool at once, we previously used
LimitAdditionalPins() to avoid pinning more than this backend's fair
share of the pool as a cap.  The coding was naive and only checked the
cap once at stream initialization.

This commit moves the check to the time of use with new bufmgr APIs from
an earlier commit, since the result might change due to pins acquired
later outside this stream.  No extra CPU cycles are added to
the all-buffered fast-path code (it only pins one buffer at a time), but
the I/O-starting path now re-checks the limit every time using simple
arithmetic.

In practice it was difficult to exceed the limit, but you could contrive
a workload to do it using multiple CURSORs and FETCHing from sequential
scans in round-robin fashion, so that each underlying stream computes
its limit before all the others have ramped up to their full look-ahead
distance.  Therefore, no back-patch for now.

Per code review from Andres, in the course of his AIO work.

Reported-by: Andres Freund <andres@anarazel.de>
---
 src/backend/storage/aio/read_stream.c | 111 ++++++++++++++++++++++----
 1 file changed, 95 insertions(+), 16 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index cb308948b6e..dc5ae60a089 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -115,6 +115,7 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	bool		advice_enabled;
+	bool		temporary;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -274,7 +275,9 @@ read_stream_index_retreat(ReadStream *stream, int16 *index)
 #endif
 
 static void
-read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
+read_stream_start_pending_read(ReadStream *stream,
+							   int16 buffer_limit,
+							   bool suppress_advice)
 {
 	bool		need_wait;
 	int			nblocks;
@@ -308,10 +311,14 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	else
 		flags = 0;
 
-	/* We say how many blocks we want to read, but may be smaller on return. */
+	/*
+	 * We say how many blocks we want to read, but may be smaller on return.
+	 * On memory-constrained systems we may be also have to ask for a smaller
+	 * read ourselves.
+	 */
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
-	nblocks = stream->pending_read_nblocks;
+	nblocks = Min(buffer_limit, stream->pending_read_nblocks);
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -360,11 +367,60 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	stream->pending_read_nblocks -= nblocks;
 }
 
+/*
+ * How many more buffers could we use, while respecting the soft limit?
+ */
+static int16
+read_stream_get_buffer_limit(ReadStream *stream)
+{
+	uint32		buffers;
+
+	/* Check how many local or shared pins we could acquire. */
+	if (stream->temporary)
+		buffers = GetAdditionalLocalPinLimit();
+	else
+		buffers = GetAdditionalPinLimit();
+
+	/*
+	 * Each stream is always allowed to try to acquire one pin if it doesn't
+	 * hold one already.  This is needed to guarantee progress, and just like
+	 * the simple ReadBuffer() operation in code that is not using this stream
+	 * API, if a buffer can't be pinned we'll raise an error when trying to
+	 * pin, ie the buffer pool is simply too small for the workload.
+	 */
+	if (buffers == 0 && stream->pinned_buffers == 0)
+		return 1;
+
+	/*
+	 * Otherwise, see how many additional pins the backend can currently pin,
+	 * which may be zero.  As above, this only guarantees that this backend
+	 * won't use more than its fair share if all backends can respect the soft
+	 * limit, not that a pin can actually be acquired without error.
+	 */
+	return Min(buffers, INT16_MAX);
+}
+
 static void
 read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 {
+	int16		buffer_limit;
+
+	/*
+	 * Check how many pins we could acquire now.  We do this here rather than
+	 * pushing it down into read_stream_start_pending_read(), because it
+	 * allows more flexibility in behavior when we run out of allowed pins.
+	 * Currently the policy is to start an I/O when we've run out of allowed
+	 * pins only if we have to to make progress, and otherwise to stop looking
+	 * ahead until more pins become available, so that we don't start issuing
+	 * a lot of smaller I/Os, prefering to build the largest ones we can. This
+	 * a lot of smaller I/Os, preferring to build the largest ones we can. This
+	 * choice is debatable, but it should only really come up when the buffer
+	 */
+	buffer_limit = read_stream_get_buffer_limit(stream);
+
 	while (stream->ios_in_progress < stream->max_ios &&
-		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+		   stream->pinned_buffers + stream->pending_read_nblocks <
+		   Min(stream->distance, buffer_limit))
 	{
 		BlockNumber blocknum;
 		int16		buffer_index;
@@ -372,7 +428,9 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 
 		if (stream->pending_read_nblocks == io_combine_limit)
 		{
-			read_stream_start_pending_read(stream, suppress_advice);
+			read_stream_start_pending_read(stream, buffer_limit,
+										   suppress_advice);
+			buffer_limit = read_stream_get_buffer_limit(stream);
 			suppress_advice = false;
 			continue;
 		}
@@ -406,11 +464,12 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		/* We have to start the pending read before we can build another. */
 		while (stream->pending_read_nblocks > 0)
 		{
-			read_stream_start_pending_read(stream, suppress_advice);
+			read_stream_start_pending_read(stream, buffer_limit, suppress_advice);
+			buffer_limit = read_stream_get_buffer_limit(stream);
 			suppress_advice = false;
-			if (stream->ios_in_progress == stream->max_ios)
+			if (stream->ios_in_progress == stream->max_ios || buffer_limit == 0)
 			{
-				/* And we've hit the limit.  Rewind, and stop here. */
+				/* And we've hit a limit.  Rewind, and stop here. */
 				read_stream_unget_block(stream, blocknum);
 				return;
 			}
@@ -426,16 +485,17 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 	 * limit, preferring to give it another chance to grow to full
 	 * io_combine_limit size once more buffers have been consumed.  However,
 	 * if we've already reached io_combine_limit, or we've reached the
-	 * distance limit and there isn't anything pinned yet, or the callback has
-	 * signaled end-of-stream, we start the read immediately.
+	 * distance limit or buffer limit and there isn't anything pinned yet, or
+	 * the callback has signaled end-of-stream, we start the read immediately.
 	 */
 	if (stream->pending_read_nblocks > 0 &&
 		(stream->pending_read_nblocks == io_combine_limit ||
-		 (stream->pending_read_nblocks == stream->distance &&
+		 ((stream->pending_read_nblocks == stream->distance ||
+		   stream->pending_read_nblocks == buffer_limit) &&
 		  stream->pinned_buffers == 0) ||
 		 stream->distance == 0) &&
 		stream->ios_in_progress < stream->max_ios)
-		read_stream_start_pending_read(stream, suppress_advice);
+		read_stream_start_pending_read(stream, buffer_limit, suppress_advice);
 }
 
 /*
@@ -464,6 +524,7 @@ read_stream_begin_impl(int flags,
 	int			max_ios;
 	int			strategy_pin_limit;
 	uint32		max_pinned_buffers;
+	uint32		max_possible_buffer_limit;
 	Oid			tablespace_id;
 
 	/*
@@ -507,12 +568,23 @@ read_stream_begin_impl(int flags,
 	strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
 	max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
 
-	/* Don't allow this backend to pin more than its share of buffers. */
+	/*
+	 * Also limit by the maximum possible number of pins we could be allowed
+	 * to acquire according to bufmgr.  We may not be able to use them all due
+	 * to other pins held by this backend, but we'll enforce the dynamic limit
+	 * later when starting I/O.
+	 */
 	if (SmgrIsTemp(smgr))
-		LimitAdditionalLocalPins(&max_pinned_buffers);
+		max_possible_buffer_limit = GetSoftLocalPinLimit();
 	else
-		LimitAdditionalPins(&max_pinned_buffers);
-	Assert(max_pinned_buffers > 0);
+		max_possible_buffer_limit = GetSoftPinLimit();
+	max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
+
+	/*
+	 * The soft limit might be zero on a system configured with more
+	 * connections than buffers.  We need at least one.
+	 */
+	max_pinned_buffers = Max(1, max_pinned_buffers);
 
 	/*
 	 * We need one extra entry for buffers and per-buffer data, because users
@@ -572,6 +644,7 @@ read_stream_begin_impl(int flags,
 	stream->callback = callback;
 	stream->callback_private_data = callback_private_data;
 	stream->buffered_blocknum = InvalidBlockNumber;
+	stream->temporary = SmgrIsTemp(smgr);
 
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
@@ -700,6 +773,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			 * arbitrary I/O entry (they're all free).  We don't have to
 			 * adjust pinned_buffers because we're transferring one to caller
 			 * but pinning one more.
+			 *
+			 * In the fast path we don't need to check the pin limit.  We're
+			 * always allowed at least one pin so that progress can be made,
+			 * and that's all we need here.  Although two pins are momentarily
+			 * held at the same time, the model used here is that the stream
+			 * holds only one, and the other now belongs to the caller.
 			 */
 			if (likely(!StartReadBuffer(&stream->ios[0].op,
 										&stream->buffers[oldest_buffer_index],
-- 
2.48.1

v1-0005-Support-buffer-forwarding-in-read_stream.c.patch (text/x-patch)
From 4e2250ad385572c560c89ed0cf3e345c47651b8c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 30 Jan 2025 11:42:03 +1300
Subject: [PATCH v1 5/6] Support buffer forwarding in read_stream.c.

In preparation for a following change to the buffer manager, teach read
stream to keep track of buffers that were "forwarded" from one call to
StartReadBuffers() to the next.

Since the buffers argument of StartReadBuffers() will become an in/out
argument, we need to initialize the buffer queue entries with
InvalidBuffer.  We don't want to do that up front, because we try to
keep stream initialization cheap and code that uses the fast path stays
in one single buffer queue element.  Satisfy both goals by initializing
the queue incrementally on the first cycle.
---
 src/backend/storage/aio/read_stream.c | 108 ++++++++++++++++++++++----
 1 file changed, 94 insertions(+), 14 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index dc5ae60a089..049fda98257 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -112,8 +112,10 @@ struct ReadStream
 	int16		ios_in_progress;
 	int16		queue_size;
 	int16		max_pinned_buffers;
+	int16		forwarded_buffers;
 	int16		pinned_buffers;
 	int16		distance;
+	int16		initialized_buffers;
 	bool		advice_enabled;
 	bool		temporary;
 
@@ -280,7 +282,9 @@ read_stream_start_pending_read(ReadStream *stream,
 							   bool suppress_advice)
 {
 	bool		need_wait;
+	int			requested_nblocks;
 	int			nblocks;
+	int			forwarded;
 	int			flags;
 	int16		io_index;
 	int16		overflow;
@@ -312,13 +316,34 @@ read_stream_start_pending_read(ReadStream *stream,
 		flags = 0;
 
 	/*
-	 * We say how many blocks we want to read, but may be smaller on return.
-	 * On memory-constrained systems we may be also have to ask for a smaller
-	 * read ourselves.
+	 * On buffer-constrained systems we may need to limit the I/O size by the
+	 * available pin count.
 	 */
+	requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks);
+	nblocks = requested_nblocks;
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
-	nblocks = Min(buffer_limit, stream->pending_read_nblocks);
+
+	/*
+	 * The first time around the queue we initialize it as we go, including
+	 * the overflow zone, because otherwise the entries would appear as
+	 * forwarded buffers.  This avoids initializing the whole queue up front
+	 * in cases where it is large but we don't ever use it due to the
+	 * all-cached fast path or small scans.
+	 */
+	while (stream->initialized_buffers < buffer_index + nblocks)
+		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
+
+	/*
+	 * Start the I/O.  Any buffers that are not InvalidBuffer will be
+	 * interpreted as already pinned, forwarded by an earlier call to
+	 * StartReadBuffers(), and must map to the expected blocks.  The nblocks
+	 * value may be smaller on return indicating the size of the I/O that
+	 * could be started.  Buffers beyond the output nblocks number may also
+	 * have been pinned without starting I/O due to various edge cases.  In
+	 * that case we'll just leave them in the queue ahead of us, "forwarded"
+	 * to the next call, avoiding the need to unpin/repin.
+	 */
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -347,16 +372,35 @@ read_stream_start_pending_read(ReadStream *stream,
 		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
 	}
 
+	/*
+	 * How many pins were acquired but forwarded to the next call?  These need
+	 * to be passed to the next StartReadBuffers() call, or released if the
+	 * stream ends early.  We need the number for accounting purposes, since
+	 * they are not counted in stream->pinned_buffers but we already hold
+	 * them.
+	 */
+	forwarded = 0;
+	while (nblocks + forwarded < requested_nblocks &&
+		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
+		forwarded++;
+	stream->forwarded_buffers = forwarded;
+
 	/*
 	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
-	 * we want it to wrap around at queue_size.  Slide overflowing buffers to
-	 * the front of the array.
+	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
+	 * the front of the array where they'll be consumed, but also leave a copy
+	 * in the overflow zone which the I/O operation has a pointer to (it needs
+	 * a contiguous array).  Both copies will be cleared when the buffers are
+	 * handed to the consumer.
 	 */
-	overflow = (buffer_index + nblocks) - stream->queue_size;
+	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
 	if (overflow > 0)
-		memmove(&stream->buffers[0],
-				&stream->buffers[stream->queue_size],
-				sizeof(stream->buffers[0]) * overflow);
+	{
+		Assert(overflow < stream->queue_size);	/* can't overlap */
+		memcpy(&stream->buffers[0],
+			   &stream->buffers[stream->queue_size],
+			   sizeof(stream->buffers[0]) * overflow);
+	}
 
 	/* Move to the location of start of next read. */
 	read_stream_index_advance_n(stream, &buffer_index, nblocks);
@@ -381,6 +425,15 @@ read_stream_get_buffer_limit(ReadStream *stream)
 	else
 		buffers = GetAdditionalPinLimit();
 
+	/*
+	 * If we already have some forwarded buffers, we can certainly use those.
+	 * They are already pinned, and are mapped to the starting blocks of the
+	 * pending read, they just don't have any I/O started yet and are not
+	 * counted in stream->pinned_buffers.
+	 */
+	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
+	buffers += stream->forwarded_buffers;
+
 	/*
 	 * Each stream is always allowed to try to acquire one pin if it doesn't
 	 * hold one already.  This is needed to guarantee progress, and just like
@@ -389,7 +442,7 @@ read_stream_get_buffer_limit(ReadStream *stream)
 	 * pin, ie the buffer pool is simply too small for the workload.
 	 */
 	if (buffers == 0 && stream->pinned_buffers == 0)
-		return 1;
+		buffers = 1;
 
 	/*
 	 * Otherwise, see how many additional pins the backend can currently pin,
@@ -751,10 +804,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		/* Fast path assumptions. */
 		Assert(stream->ios_in_progress == 0);
+		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
 		Assert(stream->distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
+		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
 
 		/* We're going to return the buffer we pinned last time. */
 		oldest_buffer_index = stream->oldest_buffer_index;
@@ -803,6 +858,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
+			stream->buffers[oldest_buffer_index] = InvalidBuffer;
 		}
 
 		stream->fast_path = false;
@@ -887,10 +943,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		}
 	}
 
-#ifdef CLOBBER_FREED_MEMORY
-	/* Clobber old buffer for debugging purposes. */
+	/*
+	 * We must zap this queue entry, or else it would appear as a forwarded
+	 * buffer.  If it's potentially in the overflow zone (ie it wrapped around
+	 * the queue), also zap that copy.
+	 */
 	stream->buffers[oldest_buffer_index] = InvalidBuffer;
-#endif
+	if (oldest_buffer_index < io_combine_limit - 1)
+		stream->buffers[stream->queue_size + oldest_buffer_index] =
+			InvalidBuffer;
 
 #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
 
@@ -933,6 +994,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
 	if (stream->ios_in_progress == 0 &&
+		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
 		stream->distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
@@ -968,6 +1030,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 void
 read_stream_reset(ReadStream *stream)
 {
+	int16		index;
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
@@ -981,6 +1044,23 @@ read_stream_reset(ReadStream *stream)
 	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 		ReleaseBuffer(buffer);
 
+	/* Unpin any unused forwarded buffers. */
+	index = stream->next_buffer_index;
+	while (index < stream->initialized_buffers &&
+		   (buffer = stream->buffers[index]) != InvalidBuffer)
+	{
+		Assert(stream->forwarded_buffers > 0);
+		stream->forwarded_buffers--;
+		ReleaseBuffer(buffer);
+
+		stream->buffers[index] = InvalidBuffer;
+		if (index < io_combine_limit - 1)
+			stream->buffers[stream->queue_size + index] = InvalidBuffer;
+
+		read_stream_index_advance(stream, &index);
+	}
+
+	Assert(stream->forwarded_buffers == 0);
 	Assert(stream->pinned_buffers == 0);
 	Assert(stream->ios_in_progress == 0);
 
-- 
2.48.1

v1-0006-Support-buffer-forwarding-in-StartReadBuffers.patch (text/x-patch)
From 37316501bfbe9fc6fc86f72351e18d6f103b9077 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 10 Feb 2025 21:55:40 +1300
Subject: [PATCH v1 6/6] Support buffer forwarding in StartReadBuffers().

Sometimes we have to perform a short read because we hit a cached block
that ends a contiguous run of blocks requiring I/O.  We don't want
StartReadBuffers() to have to start more than one I/O, so we stop there.
We also don't want to have to unpin the cached block (and repin it
later), so previously we'd silently pretend the hit was part of the I/O,
and just leave it out of the read from disk.  Now, we'll "forward" it to
the next call.  We still write it to the buffers[] array for the caller
to pass back to us later, but it's not included in *nblocks.

This policy means that we no longer mix hits and misses in a single
operation's results, so we avoid the requirement to call
WaitReadBuffers(), which might stall, before the caller can make use of
the hits.  The caller will get the hit in the next call instead, and
know that it doesn't have to wait.  That's important for later work on
out-of-order read streams that minimize I/O stalls.

This also makes life easier for proposed work on true AIO, which
occasionally needs to split a large I/O after pinning all the buffers,
while the current coding only ever forwards a single bookending hit.

This API is natural for read_stream.c: it just leaves forwarded buffers
where they are in its circular queue, where the next call will pick them
up and continue, minimizing pin churn.

If we ever think of a good reason to disable this feature, i.e. for
other users of StartReadBuffers() that don't want to deal with forwarded
buffers, then we could add a flag for that.  For now read_stream.c is the
only user.
---
 src/backend/storage/buffer/bufmgr.c | 128 ++++++++++++++++++++--------
 src/include/storage/bufmgr.h        |   1 -
 2 files changed, 91 insertions(+), 38 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 2ca641204fb..6fe70328e38 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1257,10 +1257,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 					 Buffer *buffers,
 					 BlockNumber blockNum,
 					 int *nblocks,
-					 int flags)
+					 int flags,
+					 bool allow_forwarding)
 {
 	int			actual_nblocks = *nblocks;
-	int			io_buffers_len = 0;
 	int			maxcombine = 0;
 
 	Assert(*nblocks > 0);
@@ -1270,30 +1270,80 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	{
 		bool		found;
 
-		buffers[i] = PinBufferForBlock(operation->rel,
-									   operation->smgr,
-									   operation->persistence,
-									   operation->forknum,
-									   blockNum + i,
-									   operation->strategy,
-									   &found);
+		if (allow_forwarding && buffers[i] != InvalidBuffer)
+		{
+			BufferDesc *bufHdr;
+
+			/*
+			 * This is a buffer that was pinned by an earlier call to
+			 * StartReadBuffers(), but couldn't be handled in one operation at
+			 * that time.  The operation was split, and the caller has passed
+			 * an already pinned buffer back to us to handle the rest of the
+			 * operation.  It must continue at the expected block number.
+			 */
+			Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i);
+
+			/*
+			 * It might be an already valid buffer (a hit) that followed the
+			 * final contiguous block of an earlier I/O (a miss) marking the
+			 * end of it, or a buffer that some other backend has since made
+			 * valid by performing the I/O for us, in which case we can handle
+			 * it as a hit now.  It is safe to check for a BM_VALID flag with
+			 * a relaxed load, because we got a fresh view of it while pinning
+			 * it in the previous call.
+			 *
+			 * On the other hand if we don't see BM_VALID yet, it must be an
+			 * I/O that was split by the previous call and we need to try to
+			 * start a new I/O from this block.  We're also racing against any
+			 * other backend that might start the I/O or even manage to mark
+			 * it BM_VALID after this check, but
+			 * StartBufferIO() will handle those cases.
+			 */
+			if (BufferIsLocal(buffers[i]))
+				bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1);
+			else
+				bufHdr = GetBufferDescriptor(buffers[i] - 1);
+			found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
+		}
+		else
+		{
+			buffers[i] = PinBufferForBlock(operation->rel,
+										   operation->smgr,
+										   operation->persistence,
+										   operation->forknum,
+										   blockNum + i,
+										   operation->strategy,
+										   &found);
+		}
 
 		if (found)
 		{
 			/*
-			 * Terminate the read as soon as we get a hit.  It could be a
-			 * single buffer hit, or it could be a hit that follows a readable
-			 * range.  We don't want to create more than one readable range,
-			 * so we stop here.
+			 * We have a hit.  If it's the first block in the requested range,
+			 * we can return it immediately and report that WaitReadBuffers()
+			 * does not need to be called.  If the initial value of *nblocks
+			 * was larger, the caller will have to call again for the rest.
 			 */
-			actual_nblocks = i + 1;
+			if (i == 0)
+			{
+				*nblocks = 1;
+				return false;
+			}
+
+			/*
+			 * Otherwise we already have an I/O to perform, but this block
+			 * can't be included as it is already valid.  Split the I/O here.
+			 * There may or may not be more blocks requiring I/O after this
+			 * one, we haven't checked, but it can't be contiguous with this
+			 * hit in the way.  We'll leave this buffer pinned, forwarding it
+			 * to the next call, avoiding the need to unpin it here and re-pin
+			 * it in the next call.
+			 */
+			actual_nblocks = i;
 			break;
 		}
 		else
 		{
-			/* Extend the readable range to cover this block. */
-			io_buffers_len++;
-
 			/*
 			 * Check how many blocks we can cover with the same IO. The smgr
 			 * implementation might e.g. be limited due to a segment boundary.
@@ -1314,15 +1364,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	}
 	*nblocks = actual_nblocks;
 
-	if (likely(io_buffers_len == 0))
-		return false;
-
 	/* Populate information needed for I/O. */
 	operation->buffers = buffers;
 	operation->blocknum = blockNum;
 	operation->flags = flags;
 	operation->nblocks = actual_nblocks;
-	operation->io_buffers_len = io_buffers_len;
 
 	if (flags & READ_BUFFERS_ISSUE_ADVICE)
 	{
@@ -1337,7 +1383,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 		smgrprefetch(operation->smgr,
 					 operation->forknum,
 					 blockNum,
-					 operation->io_buffers_len);
+					 actual_nblocks);
 	}
 
 	/* Indicate that WaitReadBuffers() should be called. */
@@ -1351,11 +1397,21 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
  * actual number, which may be fewer than requested.  Caller sets some of the
  * members of operation; see struct definition.
  *
+ * The initial contents of the elements of buffers up to *nblocks should
+ * either be InvalidBuffer or an already-pinned buffer that was left by an
+ * preceding call to StartReadBuffers() that had to be split.  On return, some
+ * elements of buffers may hold pinned buffers beyond the number indicated by
+ * the updated value of *nblocks.  Operations are split on boundaries known to
+ * smgr (eg md.c segment boundaries that require crossing into a different
+ * underlying file), or when already cached blocks are found in the buffer
+ * that prevent the formation of a contiguous read.
+ *
  * If false is returned, no I/O is necessary.  If true is returned, one I/O
  * has been started, and WaitReadBuffers() must be called with the same
  * operation object before the buffers are accessed.  Along with the operation
  * object, the caller-supplied array of buffers must remain valid until
- * WaitReadBuffers() is called.
+ * WaitReadBuffers() is called, and any forwarded buffers must also be
+ * preserved for a future call unless explicitly released.
  *
  * Currently the I/O is only started with optional operating system advice if
  * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
@@ -1369,13 +1425,18 @@ StartReadBuffers(ReadBuffersOperation *operation,
 				 int *nblocks,
 				 int flags)
 {
-	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags);
+	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags,
+								true /* expect forwarded buffers */ );
 }
 
 /*
  * Single block version of the StartReadBuffers().  This might save a few
  * instructions when called from another translation unit, because it is
  * specialized for nblocks == 1.
+ *
+ * This version does not support "forwarded" buffers: they cannot be created
+ * by reading only one block, and the current contents of *buffer is ignored
+ * on entry.
  */
 bool
 StartReadBuffer(ReadBuffersOperation *operation,
@@ -1386,7 +1447,8 @@ StartReadBuffer(ReadBuffersOperation *operation,
 	int			nblocks = 1;
 	bool		result;
 
-	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
+	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags,
+								  false /* single block, no forwarding */ );
 	Assert(nblocks == 1);		/* single block can't be short */
 
 	return result;
@@ -1416,24 +1478,16 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	IOObject	io_object;
 	char		persistence;
 
-	/*
-	 * Currently operations are only allowed to include a read of some range,
-	 * with an optional extra buffer that is already pinned at the end.  So
-	 * nblocks can be at most one more than io_buffers_len.
-	 */
-	Assert((operation->nblocks == operation->io_buffers_len) ||
-		   (operation->nblocks == operation->io_buffers_len + 1));
-
 	/* Find the range of the physical read we need to perform. */
-	nblocks = operation->io_buffers_len;
-	if (nblocks == 0)
-		return;					/* nothing to do */
-
+	nblocks = operation->nblocks;
 	buffers = &operation->buffers[0];
 	blocknum = operation->blocknum;
 	forknum = operation->forknum;
 	persistence = operation->persistence;
 
+	Assert(nblocks > 0);
+	Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
+
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
 		io_context = IOCONTEXT_NORMAL;
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 597ecb97897..4a035f59a7d 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -130,7 +130,6 @@ struct ReadBuffersOperation
 	BlockNumber blocknum;
 	int			flags;
 	int16		nblocks;
-	int16		io_buffers_len;
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;
-- 
2.48.1

#2 Kirill Reshke
reshkekirill@gmail.com
In reply to: Thomas Munro (#1)
Re: Some read stream improvements

On Mon, 17 Feb 2025 at 09:55, Thomas Munro <thomas.munro@gmail.com> wrote:

Hi,

Here are some patches that address some of Andres's feedback since the
AIO v2 rebase[1], anticipate out-of-order streams, and make some other
minor improvements. They are independent of the main AIO patch set
and apply to master, hence separate thread.

Hi, great!

0001-Refactor-read_stream.c-s-circular-arithmetic.patch

This just replaced open-coded arithmetic with inline functions. They
will be used a lot more in later work, and provide central places to
put assertions that were not checked as uniformly as I would like.

Just out of curiosity, should we `Assert(*index + n <
stream->queue_size);` in `read_stream_index_advance_n`?

--
Best regards,
Kirill Reshke

#3 Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#1)
Re: Some read stream improvements

On Mon, Feb 17, 2025 at 5:55 PM Thomas Munro <thomas.munro@gmail.com> wrote:

The solution we agreed on is to introduce a way for StartReadBuffers()
to communicate with future calls, and "forward" pinned buffers between
calls. The function arguments don't change, but its "buffers"
argument becomes an in/out array: one StartReadBuffers() call can
leave extra pinned buffers after the ones that were included in a
short read (*nblocks), and then when you retry (or possibly extend)
the rest of the read, you have to pass them back in. That is easy for
the read stream code, as it can just leave them in its circular queue
for the next call to take as input. It only needs to be aware of them
for pin limit accounting and stream reset (including early shutdown).

BTW here's a small historical footnote about that: I had another
solution to the same problem in the original stream proposal[1], which
used a three-step bufmgr API. You had to call PrepareReadBuffer() for
each block you intended to read, and then StartReadBuffers() for each
cluster of adjacent misses it reported, and finally WaitReadBuffers().
Hits would require only the first step. That was intended to allow
the stream to manage the prepared buffers itself, short reads would
leave some of them for a later call. Based on review feedback, I
simplified to arrive at the two-step API, ie just start and wait, and
PrepareReadBuffer() became the private helper function
PinBufferForBlock(). I had to invent the "hide-the-trailing-hit"
trick to avoid an unpin/repin sequence in the two-step API, thinking we
might eventually want to consider the three-step API again later.
This new design achieves the same end result: buffers that can't be
part of an I/O stay pinned and in the stream's queue ready for the
next call, just like "prepared" buffers in the early prototypes, but
it keeps the two-step bufmgr API and calls them "forwarded".

[1]: /messages/by-id/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6uT5TUm2gkvA@mail.gmail.com

#4 Thomas Munro
thomas.munro@gmail.com
In reply to: Kirill Reshke (#2)
Re: Some read stream improvements

On Mon, Feb 17, 2025 at 6:55 PM Kirill Reshke <reshkekirill@gmail.com> wrote:

Just out of curiosity, should we `Assert(*index + n <
stream->queue_size);` in `read_stream_index_advance_n`?

No: it is allowed to be >= queue_size temporarily, but if so we
subtract queue_size. The result should be equal to (index + n) %
queue_size, assuming small values of n, except we don't want to use %
in hot code. Perhaps we should assert that though!
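
For example, the extra assertion could look something like this (just
sketching the idea):

    static inline void
    read_stream_index_advance_n(ReadStream *stream, int16 *index, int16 n)
    {
        int16       old_index PG_USED_FOR_ASSERTS_ONLY = *index;

        Assert(*index >= 0);
        Assert(*index < stream->queue_size);
        Assert(n <= MAX_IO_COMBINE_LIMIT);

        *index += n;
        if (*index >= stream->queue_size)
            *index -= stream->queue_size;

        /* The cheap wrap-around must agree with the modulo definition. */
        Assert(*index == (old_index + n) % stream->queue_size);
    }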

#5 Andres Freund
andres@anarazel.de
In reply to: Thomas Munro (#1)
Re: Some read stream improvements

Hi,

On 2025-02-17 17:55:09 +1300, Thomas Munro wrote:

0004-Respect-pin-limits-accurately-in-read_stream.c.patch

The current coding only computes the remaining "fair share" of the
buffer pool for this backend at stream initialisation. It's hard, but
not impossible, to get one backend to pin more than 1/max_connections
of the buffer pool (the intended cap), when using multiple streams at
the same time in one backend. This patch moves the time of check to
the time of use, so it respects the limit strictly. I avoid adding
any changes to the fast path for all-cached streams, which only pin
one buffer at a time.

I was working on expanding tests for AIO and as part of that wrote a test for
temp tables -- our coverage is fairly awful, there were many times during AIO
development where I knew I had trivially reachable temp table specific bugs
but all tests passed.

The test for that does trigger the problem described above and is fixed by the
patches in this thread (which I included in the other thread):

/messages/by-id/knr4aazlaa4nj3xnpe4tu6plwayovzxhmteatcpry2j6a6kc4v@aonkl53s2ecs

Just linked instead of attached to not trigger cfbot.

Greetings,

Andres Freund

#6 Thomas Munro
thomas.munro@gmail.com
In reply to: Andres Freund (#5)
Re: Some read stream improvements

On Wed, Feb 26, 2025 at 10:55 PM Andres Freund <andres@anarazel.de> wrote:

I was working on expanding tests for AIO and as part of that wrote a test for
temp tables -- our coverage is fairly awful, there were many times during AIO
development where I knew I had trivially reachable temp table specific bugs
but all tests passed.

The test for that does trigger the problem described above and is fixed by the
patches in this thread (which I included in the other thread):

Thanks. Alright, I'm assuming that you don't have any objections to
the way I restyled that API, so I'm going to go ahead and push some of
these shortly, and then follow up with a few newer patches that
simplify and improve the look-ahead and advice control. More very
soon.

#7 Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#6)
Re: Some read stream improvements

On Thu, Feb 27, 2025 at 11:19 AM Thomas Munro <thomas.munro@gmail.com> wrote:

On Wed, Feb 26, 2025 at 10:55 PM Andres Freund <andres@anarazel.de> wrote:

I was working on expanding tests for AIO and as part of that wrote a test for
temp tables -- our coverage is fairly awful, there were many times during AIO
development where I knew I had trivially reachable temp table specific bugs
but all tests passed.

The test for that does trigger the problem described above and is fixed by the
patches in this thread (which I included in the other thread):

Thanks. Alright, I'm assuming that you don't have any objections to
the way I restyled that API, so I'm going to go ahead and push some of
these shortly, and then follow up with a few newer patches that
simplify and improve the look-ahead and advice control. More very
soon.

Ugh, I realised in another round of self-review that that version
could exceed the soft limit by a small amount if the registered
callback pins more buffers underneath it, so not pushed yet. I think
I see how to fix that (namely the alternative design that a comment
already contemplated), more soon...

#8 Andres Freund
andres@anarazel.de
In reply to: Thomas Munro (#6)
Re: Some read stream improvements

On 2025-02-27 11:19:55 +1300, Thomas Munro wrote:

On Wed, Feb 26, 2025 at 10:55 PM Andres Freund <andres@anarazel.de> wrote:

I was working on expanding tests for AIO and as part of that wrote a test for
temp tables -- our coverage is fairly awful, there were many times during AIO
development where I knew I had trivially reachable temp table specific bugs
but all tests passed.

The test for that does trigger the problem described above and is fixed by the
patches in this thread (which I included in the other thread):

Thanks. Alright, I'm assuming that you don't have any objections to
the way I restyled that API, so I'm going to go ahead and push some of
these shortly, and then follow up with a few newer patches that
simplify and improve the look-ahead and advice control. More very
soon.

Indeed, no objections, rather the opposite. Thanks!

Greetings,

Andres Freund

#9Thomas Munro
thomas.munro@gmail.com
In reply to: Andres Freund (#8)
4 attachment(s)
Re: Some read stream improvements

On Thu, Feb 27, 2025 at 11:20 PM Andres Freund <andres@anarazel.de> wrote:

On 2025-02-27 11:19:55 +1300, Thomas Munro wrote:

On Wed, Feb 26, 2025 at 10:55 PM Andres Freund <andres@anarazel.de> wrote:

I was working on expanding tests for AIO and as part of that wrote a test for
temp tables -- our coverage is fairly awful, there were many times during AIO
development where I knew I had trivially reachable temp table specific bugs
but all tests passed.

The test for that does trigger the problem described above and is fixed by the
patches in this thread (which I included in the other thread):

Here is a subset of those patches again:

1. Per-backend buffer limit, take III. Now the check is in
read_stream_start_pending_read() so TOC == TOU.

Annoyingly, test cases like the one below still fail, despite
following the rules. The other streams eat all the buffers and then
one gets an allowance of zero, but uses its right to take one pin
anyway to make progress, and there isn't one. I wonder if we should
use temp_buffers - 100? Then leave the minimum GUC value at 100
still, so you have an easy way to test with 0, 1, ... additional
buffers?

2. It shouldn't give up issuing random advice immediately after a
jump, or it could stall on (say) the second 128kB of a 256kB
sequential chunk (ie the strace you showed on the BHS thread). It
only makes sense to assume kernel readahead takes over once you've
actually *read* sequentially. In practice this makes it a lot more
aggressive about advice (like the BHS code in master): it only gives
up if the whole look-ahead window is sequential.

3. Change the distance algorithm to care only about hits and misses,
not sequential heuristics. It made at least some sense before, but it
doesn't make sense for AIO, and even in synchronous mode it means that
you hit random jumps with insufficient look-ahead, so I don't think we
should keep it.

I also realised that the sequential heuristics are confused by that
hidden trailing block thing, so in contrived pattern testing a
hit-miss-hit-miss... sequence would be considered sequential, and even
if you fix that (the forwarding patches above do), an exact
hit-miss-hit-miss pattern also gets stuck between distances 1 and 2
(double, decrement, double, ...).  It might be worth waiting a bit
longer before decrementing, IDK.
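
To make the oscillation concrete, here's a toy model of that feedback
(not the real read_stream.c code, just the double-on-miss /
decrement-on-hit rule):

#include <stdio.h>

int
main(void)
{
	int			distance = 1;	/* look-ahead distance */
	int			max_distance = 16;
	const char *pattern = "MHMHMHMH";	/* miss, hit, miss, hit, ... */

	for (const char *p = pattern; *p; p++)
	{
		if (*p == 'M')			/* I/O needed: distance doubles (capped) */
			distance = distance * 2 > max_distance ? max_distance : distance * 2;
		else if (distance > 1)	/* cached hit: distance decays */
			distance--;
		printf("%c -> distance %d\n", *p, distance);
	}
	/* prints 2, 1, 2, 1, ...: the look-ahead window never grows. */
	return 0;
}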

I'll rebase the others and post soon.

set io_combine_limit = 32;
set temp_buffers = 100;

create temp table t1 as select generate_series(1, 10000);
create temp table t2 as select generate_series(1, 10000);
create temp table t3 as select generate_series(1, 10000);
create temp table t4 as select generate_series(1, 10000);
create temp table t5 as select generate_series(1, 10000);

do
$$
declare
c1 cursor for select * from t1;
c2 cursor for select * from t2;
c3 cursor for select * from t3;
c4 cursor for select * from t4;
c5 cursor for select * from t5;
x record;
begin
open c1;
open c2;
open c3;
open c4;
open c5;
loop
fetch next from c1 into x;
exit when not found;
fetch next from c2 into x;
exit when not found;
fetch next from c3 into x;
exit when not found;
fetch next from c4 into x;
exit when not found;
fetch next from c5 into x;
exit when not found;
end loop;
end;
$$;

Attachments:

v2-0001-Improve-buffer-manager-API-for-backend-pin-limits.patch (text/x-patch)
From dd6b334cb744e71fc042057624e4953e3c039629 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Thu, 27 Feb 2025 21:03:39 +1300
Subject: [PATCH v2 1/4] Improve buffer manager API for backend pin limits.

Previously the support functions assumed that the caller needed one pin
to make progress, and could optionally use some more.  Add a couple more
functions for callers that want to know:

* what the maximum possible number could be irrespective of currently
  held pins, for space planning purposes, called the "soft pin limit"

* how many additional pins they could acquire right now, without the
  special case allowing one pin, for users that already hold pins and
  could make progress even if zero extra pins are available

These APIs are better suited to read_stream.c, which will be improved in
a follow-up patch.  Also compute MaxProportionalPins up front, to avoid
performing division whenever we check the balance.

Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c   | 83 +++++++++++++++++++--------
 src/backend/storage/buffer/localbuf.c | 16 ++++++
 src/include/storage/bufmgr.h          |  4 ++
 3 files changed, 78 insertions(+), 25 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7915ed624c1..3f44681b1fe 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -211,6 +211,8 @@ static int32 PrivateRefCountOverflowed = 0;
 static uint32 PrivateRefCountClock = 0;
 static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
 
+static uint32 MaxProportionalPins;
+
 static void ReservePrivateRefCountEntry(void);
 static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer);
 static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move);
@@ -2097,43 +2099,65 @@ again:
 	return buf;
 }
 
+/*
+ * Return the maximum number of buffers that this backend should try to pin at
+ * once, to avoid pinning more than its fair share.  This is the highest value
+ * that GetAdditionalPinLimit() and LimitAdditionalPins() could ever return.
+ *
+ * It's called a soft limit because nothing stops a backend from trying to
+ * acquire more pins than this if it needs them to make progress, but code that
+ * wants optional extra buffers for optimizations should respect this
+ * per-backend limit.
+ */
+uint32
+GetSoftPinLimit(void)
+{
+	return MaxProportionalPins;
+}
+
+/*
+ * Return the maximum number of additional buffers that this backend should
+ * pin if it wants to stay under the per-backend soft limit, considering the
+ * number of buffers it has already pinned.
+ */
+uint32
+GetAdditionalPinLimit(void)
+{
+	uint32		estimated_pins_held;
+
+	/*
+	 * We get the number of "overflowed" pins for free, but don't know the
+	 * number of pins in PrivateRefCountArray.  The cost of calculating that
+	 * exactly doesn't seem worth it, so just assume the max.
+	 */
+	estimated_pins_held = PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;
+
+	/* Is this backend already holding more than its fair share? */
+	if (estimated_pins_held > MaxProportionalPins)
+		return 0;
+
+	return MaxProportionalPins - estimated_pins_held;
+}
+
 /*
  * Limit the number of pins a batch operation may additionally acquire, to
  * avoid running out of pinnable buffers.
  *
- * One additional pin is always allowed, as otherwise the operation likely
- * cannot be performed at all.
- *
- * The number of allowed pins for a backend is computed based on
- * shared_buffers and the maximum number of connections possible. That's very
- * pessimistic, but outside of toy-sized shared_buffers it should allow
- * sufficient pins.
+ * One additional pin is always allowed, on the assumption that the operation
+ * requires at least one to make progress.
  */
 void
 LimitAdditionalPins(uint32 *additional_pins)
 {
-	uint32		max_backends;
-	int			max_proportional_pins;
+	uint32		limit;
 
 	if (*additional_pins <= 1)
 		return;
 
-	max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
-	max_proportional_pins = NBuffers / max_backends;
-
-	/*
-	 * Subtract the approximate number of buffers already pinned by this
-	 * backend. We get the number of "overflowed" pins for free, but don't
-	 * know the number of pins in PrivateRefCountArray. The cost of
-	 * calculating that exactly doesn't seem worth it, so just assume the max.
-	 */
-	max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;
-
-	if (max_proportional_pins <= 0)
-		max_proportional_pins = 1;
-
-	if (*additional_pins > max_proportional_pins)
-		*additional_pins = max_proportional_pins;
+	limit = GetAdditionalPinLimit();
+	limit = Max(limit, 1);
+	if (limit < *additional_pins)
+		*additional_pins = limit;
 }
 
 /*
@@ -3575,6 +3599,15 @@ InitBufferManagerAccess(void)
 {
 	HASHCTL		hash_ctl;
 
+	/*
+	 * The soft limit on the number of pins each backend should respect, based
+	 * on shared_buffers and the maximum number of connections possible.
+	 * That's very pessimistic, but outside toy-sized shared_buffers it should
+	 * allow plenty of pins.  LimitAdditionalPins() or GetAdditionalPinLimit()
+	 * can be used to check the remaining balance.
+	 */
+	MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS);
+
 	memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));
 
 	hash_ctl.keysize = sizeof(int32);
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 80b83444eb2..5378ba84316 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -286,6 +286,22 @@ GetLocalVictimBuffer(void)
 	return BufferDescriptorGetBuffer(bufHdr);
 }
 
+/* see GetSoftPinLimit() */
+uint32
+GetSoftLocalPinLimit(void)
+{
+	/* Every backend has its own temporary buffers, and can pin them all. */
+	return num_temp_buffers;
+}
+
+/* see GetAdditionalPinLimit() */
+uint32
+GetAdditionalLocalPinLimit(void)
+{
+	Assert(NLocalPinnedBuffers <= num_temp_buffers);
+	return num_temp_buffers - NLocalPinnedBuffers;
+}
+
 /* see LimitAdditionalPins() */
 void
 LimitAdditionalLocalPins(uint32 *additional_pins)
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 7c1e4316dde..597ecb97897 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -290,6 +290,10 @@ extern bool HoldingBufferPinThatDelaysRecovery(void);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern uint32 GetSoftPinLimit(void);
+extern uint32 GetSoftLocalPinLimit(void);
+extern uint32 GetAdditionalPinLimit(void);
+extern uint32 GetAdditionalLocalPinLimit(void);
 extern void LimitAdditionalPins(uint32 *additional_pins);
 extern void LimitAdditionalLocalPins(uint32 *additional_pins);
 
-- 
2.39.5

v2-0002-Respect-pin-limits-accurately-in-read_stream.c.patch (text/x-patch)
From 812bc196c977fb3610d3ea8320988d6bd4b00f29 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Thu, 27 Feb 2025 21:42:05 +1300
Subject: [PATCH v2 2/4] Respect pin limits accurately in read_stream.c.

To avoid pinning too much of the buffer pool at once, we previously used
LimitAdditionalPins().  The coding was naive, and only considered the
available buffers at stream construction time.

This commit checks at the time of use with new buffer manager APIs.  The
result might change dynamically due to pins acquired outside this stream
by the same backend.  No extra CPU cycles are added to the all-buffered
fast-path code, but the I/O-starting path now considers the up-to-date
remaining buffer limit when making look-ahead decisions.

In practice it was very difficult to exceed the limit in v17, so no
back-patch, but changes due to land soon make it easy.

Per code review from Andres, in the course of testing his AIO patches.

Reported-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 108 ++++++++++++++++++++++----
 1 file changed, 94 insertions(+), 14 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 04bdb5e6d4b..6c2b4ec011d 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -115,6 +115,7 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	bool		advice_enabled;
+	bool		temporary;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -224,7 +225,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
 	stream->buffered_blocknum = blocknum;
 }
 
-static void
+/*
+ * Start as much of the current pending read as we can.  If we have to split it
+ * because of the per-backend buffer limit, or the buffer manager decides to
+ * split it, then the pending read is adjusted to hold the remaining portion.
+ *
+ * We can always start a read of at least size one if we have no progress yet.
+ * Otherwise it's possible that we can't start a read at all because of a lack
+ * of buffers, and then false is returned.  Buffer shortages also reduce the
+ * distance to a level that prevents look-ahead until buffers are released.
+ */
+static bool
 read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 {
 	bool		need_wait;
@@ -233,12 +244,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	int16		io_index;
 	int16		overflow;
 	int16		buffer_index;
+	int16		buffer_limit;
 
 	/* This should only be called with a pending read. */
 	Assert(stream->pending_read_nblocks > 0);
 	Assert(stream->pending_read_nblocks <= io_combine_limit);
 
-	/* We had better not exceed the pin limit by starting this read. */
+	/* We had better not exceed the per-stream buffer limit with this read. */
 	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
 		   stream->max_pinned_buffers);
 
@@ -259,10 +271,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	else
 		flags = 0;
 
-	/* We say how many blocks we want to read, but may be smaller on return. */
+	/* Compute the remaining portion of the per-backend buffer limit. */
+	if (stream->temporary)
+		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+	else
+		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+	if (buffer_limit == 0 && stream->pinned_buffers == 0)
+		buffer_limit = 1;		/* guarantee progress */
+
+	/* Does the per-backend buffer limit affect this read? */
+	nblocks = stream->pending_read_nblocks;
+	if (buffer_limit < nblocks)
+	{
+		int16		new_distance;
+
+		/* Shrink distance: no more look-ahead until buffers are released. */
+		new_distance = stream->pinned_buffers + buffer_limit;
+		if (stream->distance > new_distance)
+			stream->distance = new_distance;
+
+		/* If we've already made progress, just give up and wait for buffers. */
+		if (stream->pinned_buffers > 0)
+			return false;
+
+		/* A short read is required to make progress. */
+		nblocks = buffer_limit;
+	}
+
+	/*
+	 * We say how many blocks we want to read, but it may be smaller on return
+	 * if the buffer manager decides it needs a short read at its level.
+	 */
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
-	nblocks = stream->pending_read_nblocks;
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -312,19 +353,27 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	/* Adjust the pending read to cover the remaining portion, if any. */
 	stream->pending_read_blocknum += nblocks;
 	stream->pending_read_nblocks -= nblocks;
+
+	return true;
 }
 
 static void
 read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 {
 	while (stream->ios_in_progress < stream->max_ios &&
-		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+		   ((stream->pinned_buffers == 0 && stream->distance > 0) ||
+			stream->pinned_buffers + stream->pending_read_nblocks < stream->distance))
 	{
 		BlockNumber blocknum;
 		int16		buffer_index;
 		void	   *per_buffer_data;
 
-		if (stream->pending_read_nblocks == io_combine_limit)
+		/* If have a pending read that can't be extended, start it now. */
+		Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
+			   stream->max_pinned_buffers);
+		if (stream->pending_read_nblocks == io_combine_limit ||
+			(stream->pinned_buffers == 0 &&
+			 stream->pending_read_nblocks == stream->max_pinned_buffers))
 		{
 			read_stream_start_pending_read(stream, suppress_advice);
 			suppress_advice = false;
@@ -360,14 +409,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		/* We have to start the pending read before we can build another. */
 		while (stream->pending_read_nblocks > 0)
 		{
-			read_stream_start_pending_read(stream, suppress_advice);
-			suppress_advice = false;
-			if (stream->ios_in_progress == stream->max_ios)
+			if (!read_stream_start_pending_read(stream, suppress_advice) ||
+				stream->ios_in_progress == stream->max_ios)
 			{
-				/* And we've hit the limit.  Rewind, and stop here. */
+				/* And we've hit a buffer or I/O limit.  Rewind and wait. */
 				read_stream_unget_block(stream, blocknum);
 				return;
 			}
+
+			suppress_advice = false;
 		}
 
 		/* This is the start of a new pending read. */
@@ -390,6 +440,14 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		 stream->distance == 0) &&
 		stream->ios_in_progress < stream->max_ios)
 		read_stream_start_pending_read(stream, suppress_advice);
+
+	/*
+	 * There should always be something pinned when we leave this function,
+	 * whether started by this call or not, unless we've hit the end of the
+	 * stream.  In the worst case we can always make progress one buffer at a
+	 * time.
+	 */
+	Assert(stream->pinned_buffers > 0 || stream->distance == 0);
 }
 
 /*
@@ -418,6 +476,7 @@ read_stream_begin_impl(int flags,
 	int			max_ios;
 	int			strategy_pin_limit;
 	uint32		max_pinned_buffers;
+	uint32		max_possible_buffer_limit;
 	Oid			tablespace_id;
 
 	/*
@@ -465,12 +524,23 @@ read_stream_begin_impl(int flags,
 	strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
 	max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
 
-	/* Don't allow this backend to pin more than its share of buffers. */
+	/*
+	 * Also limit our queue to the maximum number of pins we could possibly
+	 * ever be allowed to acquire according to the buffer manager.  We may not
+	 * really be able to use them all due to other pins held by this backend,
+	 * but we'll check that later in read_stream_start_pending_read().
+	 */
 	if (SmgrIsTemp(smgr))
-		LimitAdditionalLocalPins(&max_pinned_buffers);
+		max_possible_buffer_limit = GetSoftLocalPinLimit();
 	else
-		LimitAdditionalPins(&max_pinned_buffers);
-	Assert(max_pinned_buffers > 0);
+		max_possible_buffer_limit = GetSoftPinLimit();
+	max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
+
+	/*
+	 * The soft limit might be zero on a system configured with more
+	 * connections than buffers.  We need at least one to make progress.
+	 */
+	max_pinned_buffers = Max(1, max_pinned_buffers);
 
 	/*
 	 * We need one extra entry for buffers and per-buffer data, because users
@@ -530,6 +600,7 @@ read_stream_begin_impl(int flags,
 	stream->callback = callback;
 	stream->callback_private_data = callback_private_data;
 	stream->buffered_blocknum = InvalidBlockNumber;
+	stream->temporary = SmgrIsTemp(smgr);
 
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
@@ -658,6 +729,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			 * arbitrary I/O entry (they're all free).  We don't have to
 			 * adjust pinned_buffers because we're transferring one to caller
 			 * but pinning one more.
+			 *
+			 * In the fast path we don't need to check the pin limit.  We're
+			 * always allowed at least one pin so that progress can be made,
+			 * and that's all we need here.  Although two pins are momentarily
+			 * held at the same time, the model used here is that the stream
+			 * holds only one, and the other now belongs to the caller.
 			 */
 			if (likely(!StartReadBuffer(&stream->ios[0].op,
 										&stream->buffers[oldest_buffer_index],
@@ -858,6 +935,9 @@ read_stream_reset(ReadStream *stream)
 	stream->buffered_blocknum = InvalidBlockNumber;
 	stream->fast_path = false;
 
+	/* There is no point in reading whatever was pending. */
+	stream->pending_read_nblocks = 0;
+
 	/* Unpin anything that wasn't consumed. */
 	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 		ReleaseBuffer(buffer);
-- 
2.39.5

v2-0003-Improve-read-stream-advice-for-larger-random-read.patch (text/x-patch)
From f4d0c7c81c1b960e71d7f76f1262279b329eac07 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 18 Feb 2025 15:59:13 +1300
Subject: [PATCH v2 3/4] Improve read stream advice for larger random reads.

read_stream.c tries not to issue advice when it thinks the kernel's
readahead should be active, ie when using buffered I/O and reading
sequential blocks.  It previously gave up a little too easily: it
should issue advice until it has started running sequential pread()
calls, not just when it's planning to.  The simpler strategy worked for
random regions of size <= io_combine_limit and large sequential regions,
but not so well when reading random regions of size
> io_combine_limit.  For a 256kB chunk of data far away
from recent access, it would issue advice for the first half (assuming
io_combine_limit=128kB) and then suffer an I/O stall for the second
half.

Discovered by Tomas Vondra's regression testing of many data clustering
patterns using Melanie Plageman's streaming Bitmap Heap Scan patch, with
analysis of the I/O stall-producing pattern from Andres Freund.

Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
Discussion: https://postgr.es/m/CA%2BhUKGJ3HSWciQCz8ekP1Zn7N213RfA4nbuotQawfpq23%2Bw-5Q%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 44 +++++++++++++++++++++------
 1 file changed, 35 insertions(+), 9 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 6c2b4ec011d..a028217a08e 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -132,6 +132,7 @@ struct ReadStream
 
 	/* Next expected block, for detecting sequential access. */
 	BlockNumber seq_blocknum;
+	BlockNumber seq_start;
 
 	/* The read operation we are currently preparing. */
 	BlockNumber pending_read_blocknum;
@@ -260,16 +261,30 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	else
 		Assert(stream->next_buffer_index == stream->oldest_buffer_index);
 
-	/*
-	 * If advice hasn't been suppressed, this system supports it, and this
-	 * isn't a strictly sequential pattern, then we'll issue advice.
-	 */
-	if (!suppress_advice &&
-		stream->advice_enabled &&
-		stream->pending_read_blocknum != stream->seq_blocknum)
+	/* Do we need to issue read-ahead advice? */
+	flags = 0;
+	if (stream->advice_enabled)
+	{
 		flags = READ_BUFFERS_ISSUE_ADVICE;
-	else
-		flags = 0;
+
+		if (stream->pending_read_blocknum == stream->seq_blocknum)
+		{
+			/*
+			 * Suppress advice if our WaitReadBuffers() calls have caught up
+			 * with the first advice we issued for this sequential run.
+			 */
+			if (stream->seq_start == InvalidBlockNumber)
+				suppress_advice = true;
+		}
+		else
+		{
+			/* Random jump, so start a new sequential run. */
+			stream->seq_start = stream->pending_read_blocknum;
+		}
+
+		if (suppress_advice)
+			flags = 0;
+	}
 
 	/* Compute the remaining portion of the per-backend buffer limit. */
 	if (stream->temporary)
@@ -601,6 +616,8 @@ read_stream_begin_impl(int flags,
 	stream->callback_private_data = callback_private_data;
 	stream->buffered_blocknum = InvalidBlockNumber;
 	stream->temporary = SmgrIsTemp(smgr);
+	stream->seq_blocknum = InvalidBlockNumber;
+	stream->seq_start = InvalidBlockNumber;
 
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
@@ -825,6 +842,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			distance = stream->distance * 2;
 			distance = Min(distance, stream->max_pinned_buffers);
 			stream->distance = distance;
+
+			/*
+			 * If we've caught up with the first advice issued for the current
+			 * sequential run, cancel further advice until the next random
+			 * jump.  The kernel should be able to see the pattern now that
+			 * we're issuing sequential preadv() calls.
+			 */
+			if (stream->ios[io_index].op.blocknum == stream->seq_start)
+				stream->seq_start = InvalidBlockNumber;
 		}
 		else
 		{
-- 
2.39.5

v2-0004-Look-ahead-more-when-sequential-in-read_stream.c.patch (text/x-patch)
From cdafc8a7ec64dd669ad9dff26a5e7a63fc64fee2 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 19 Feb 2025 01:25:40 +1300
Subject: [PATCH v2 4/4] Look ahead more when sequential in read_stream.c.

Previously, sequential reads would cause the look-ahead distance to
fall back to io_combine_limit, on the basis that kernel read-ahead
should start helping.  It also meant that we'd have to ramp the distance
back up when a sequential region was followed by a burst of random
jumps, with little hope of avoiding a stall, which is not a good
trade-off and is incompatible with AIO plans (you have to look ahead if
you have to start real I/O).

Simplify the algorithm: now only cache hits make the look-ahead distance
drop off, and cache misses still make it grow rapidly.  Random vs
sequential heuristics are no longer taken into consideration while
making that decision.

Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 92 ++++++++++-----------------
 1 file changed, 33 insertions(+), 59 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index a028217a08e..170ec805b4c 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -17,30 +17,12 @@
  * pending read.  When that isn't possible, the existing pending read is sent
  * to StartReadBuffers() so that a new one can begin to form.
  *
- * The algorithm for controlling the look-ahead distance tries to classify the
- * stream into three ideal behaviors:
+ * The algorithm for controlling the look-ahead distance is based on recent
+ * cache hits and misses:
  *
- * A) No I/O is necessary, because the requested blocks are fully cached
- * already.  There is no benefit to looking ahead more than one block, so
- * distance is 1.  This is the default initial assumption.
- *
- * B) I/O is necessary, but read-ahead advice is undesirable because the
- * access is sequential and we can rely on the kernel's read-ahead heuristics,
- * or impossible because direct I/O is enabled, or the system doesn't support
- * read-ahead advice.  There is no benefit in looking ahead more than
- * io_combine_limit, because in this case the only goal is larger read system
- * calls.  Looking further ahead would pin many buffers and perform
- * speculative work for no benefit.
- *
- * C) I/O is necessary, it appears to be random, and this system supports
- * read-ahead advice.  We'll look further ahead in order to reach the
- * configured level of I/O concurrency.
- *
- * The distance increases rapidly and decays slowly, so that it moves towards
- * those levels as different I/O patterns are discovered.  For example, a
- * sequential scan of fully cached data doesn't bother looking ahead, but a
- * sequential scan that hits a region of uncached blocks will start issuing
- * increasingly wide read calls until it plateaus at io_combine_limit.
+ * When no I/O is necessary, there is no point in looking ahead more than one
+ * block.  This is the default initial assumption.  Otherwise rapidly increase
+ * the distance to try to benefit from I/O combining and I/O concurrency.
  *
  * The main data structure is a circular queue of buffers of size
  * max_pinned_buffers plus some extra space for technical reasons, ready to be
@@ -329,7 +311,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	/* Remember whether we need to wait before returning this buffer. */
 	if (!need_wait)
 	{
-		/* Look-ahead distance decays, no I/O necessary (behavior A). */
+		/* Look-ahead distance decays, no I/O necessary. */
 		if (stream->distance > 1)
 			stream->distance--;
 	}
@@ -516,6 +498,15 @@ read_stream_begin_impl(int flags,
 	else
 		max_ios = get_tablespace_io_concurrency(tablespace_id);
 
+	/*
+	 * XXX Since we don't have asynchronous I/O yet, if direct I/O is enabled
+	 * then just behave as though I/O concurrency is set to 0.  Otherwise we
+	 * would look ahead pinning many buffers for no benefit, for lack of
+	 * advice and AIO.
+	 */
+	if (io_direct_flags & IO_DIRECT_DATA)
+		max_ios = 0;
+
 	/* Cap to INT16_MAX to avoid overflowing below */
 	max_ios = Min(max_ios, PG_INT16_MAX);
 
@@ -622,7 +613,7 @@ read_stream_begin_impl(int flags,
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
 	 * reading the whole relation.  This way we start out assuming we'll be
-	 * doing full io_combine_limit sized reads (behavior B).
+	 * doing full io_combine_limit sized reads.
 	 */
 	if (flags & READ_STREAM_FULL)
 		stream->distance = Min(max_pinned_buffers, io_combine_limit);
@@ -713,10 +704,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 
 	/*
-	 * A fast path for all-cached scans (behavior A).  This is the same as the
-	 * usual algorithm, but it is specialized for no I/O and no per-buffer
-	 * data, so we can skip the queue management code, stay in the same buffer
-	 * slot and use singular StartReadBuffer().
+	 * A fast path for all-cached scans.  This is the same as the usual
+	 * algorithm, but it is specialized for no I/O and no per-buffer data, so
+	 * we can skip the queue management code, stay in the same buffer slot and
+	 * use singular StartReadBuffer().
 	 */
 	if (likely(stream->fast_path))
 	{
@@ -836,37 +827,20 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		if (++stream->oldest_io_index == stream->max_ios)
 			stream->oldest_io_index = 0;
 
-		if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
-		{
-			/* Distance ramps up fast (behavior C). */
-			distance = stream->distance * 2;
-			distance = Min(distance, stream->max_pinned_buffers);
-			stream->distance = distance;
+		/* Look-ahead distance ramps up quickly after we do I/O. */
+		distance = stream->distance * 2;
+		distance = Min(distance, stream->max_pinned_buffers);
+		stream->distance = distance;
 
-			/*
-			 * If we've caught up with the first advice issued for the current
-			 * sequential run, cancel further advice until the next random
-			 * jump.  The kernel should be able to see the pattern now that
-			 * we're issuing sequential preadv() calls.
-			 */
-			if (stream->ios[io_index].op.blocknum == stream->seq_start)
-				stream->seq_start = InvalidBlockNumber;
-		}
-		else
-		{
-			/* No advice; move towards io_combine_limit (behavior B). */
-			if (stream->distance > io_combine_limit)
-			{
-				stream->distance--;
-			}
-			else
-			{
-				distance = stream->distance * 2;
-				distance = Min(distance, io_combine_limit);
-				distance = Min(distance, stream->max_pinned_buffers);
-				stream->distance = distance;
-			}
-		}
+		/*
+		 * If we've caught up with the first advice issued for the current
+		 * sequential run, cancel further advice until the next random jump.
+		 * The kernel should be able to see the pattern now that we're issuing
+		 * sequential preadv() calls.
+		 */
+		if (stream->advice_enabled &&
+			stream->ios[io_index].op.blocknum == stream->seq_start)
+			stream->seq_start = InvalidBlockNumber;
 	}
 
 #ifdef CLOBBER_FREED_MEMORY
-- 
2.39.5

#10Andres Freund
andres@anarazel.de
In reply to: Thomas Munro (#9)
Re: Some read stream improvements

Hi,

On 2025-03-12 07:35:46 +1300, Thomas Munro wrote:

On Thu, Feb 27, 2025 at 11:20 PM Andres Freund <andres@anarazel.de> wrote:

On 2025-02-27 11:19:55 +1300, Thomas Munro wrote:

On Wed, Feb 26, 2025 at 10:55 PM Andres Freund <andres@anarazel.de> wrote:

I was working on expanding tests for AIO and as part of that wrote a test for
temp tables -- our coverage is fairly awful, there were many times during AIO
development where I knew I had trivially reachable temp table specific bugs
but all tests passed.

The test for that does trigger the problem described above and is fixed by the
patches in this thread (which I included in the other thread):

Here is a subset of those patches again:

1. Per-backend buffer limit, take III. Now the check is in
read_stream_start_pending_read() so TOC == TOU.

Annoyingly, test cases like the one below still fail, despite
following the rules. The other streams eat all the buffers and then
one gets an allowance of zero, but uses its right to take one pin
anyway to make progress, and there isn't one.

I think that may be ok. If there are no unpinned buffers, it seems to be
expected that starting a new stream will fail. That's not the same as what
we did previously, namely failing in a read stream that did start
successfully because we issued large IOs even though there were only a
small number of unpinned buffers.

I wonder if we should use temp_buffers - 100? Then leave the minimum GUC
value at 100 still, so you have an easy way to test with 0, 1,
... additional buffers?

I think that just makes it harder to test the exhaustion scenario without
really fixing anything?

2. It shouldn't give up issuing random advice immediately after a
jump, or it could stall on (say) the second 128kB of a 256kB
sequential chunk (ie the strace you showed on the BHS thread). It
only makes sense to assume kernel readahead takes over once you've
actually *read* sequentially. In practice this makes it a lot more
aggressive about advice (like the BHS code in master): it only gives
up if the whole look-ahead window is sequential.

3. Change the distance algorithm to care only about hits and misses,
not sequential heuristics. It made at least some sense before, but it
doesn't make sense for AIO, and even in synchronous mode it means that
you hit random jumps with insufficient look-ahead, so I don't think we
should keep it.

I also realised that the sequential heuristics are confused by that
hidden trailing block thing, so in contrived pattern testing with
hit-miss-hit-miss... would be considered sequential, and even if you
fix that (the forwarding patches above fix that), an exact
hit-miss-hit-miss pattern also gets stuck between distances 1 and 2
(double, decrement, double, ... might be worth waiting a bit longer
before decrementing, IDK.

I'll rebase the others and post soon.

+
+/* see GetAdditionalPinLimit() */
+uint32
+GetAdditionalLocalPinLimit(void)
+{
+	Assert(NLocalPinnedBuffers <= num_temp_buffers);
+	return num_temp_buffers - NLocalPinnedBuffers;
+}

This doesn't behave quite the way GetAdditionalPinLimit() does - it can return
0. Which makes some sense, pinning an additional buffer will always
fail. Perhaps worth calling out though?

static void
read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
{
while (stream->ios_in_progress < stream->max_ios &&
-		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+		   ((stream->pinned_buffers == 0 && stream->distance > 0) ||
+			stream->pinned_buffers + stream->pending_read_nblocks < stream->distance))

What does the new "stream->pinned_buffers == 0 && stream->distance > 0" really
mean? And when would it be true when the pre-existing condition wouldn't
already be true?

{
BlockNumber blocknum;
int16 buffer_index;
void *per_buffer_data;

-		if (stream->pending_read_nblocks == io_combine_limit)
+		/* If have a pending read that can't be extended, start it now. */
+		Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
+			   stream->max_pinned_buffers);
+		if (stream->pending_read_nblocks == io_combine_limit ||
+			(stream->pinned_buffers == 0 &&
+			 stream->pending_read_nblocks == stream->max_pinned_buffers))
{
read_stream_start_pending_read(stream, suppress_advice);
suppress_advice = false;
@@ -360,14 +409,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
/* We have to start the pending read before we can build another. */
while (stream->pending_read_nblocks > 0)
{
-			read_stream_start_pending_read(stream, suppress_advice);
-			suppress_advice = false;
-			if (stream->ios_in_progress == stream->max_ios)
+			if (!read_stream_start_pending_read(stream, suppress_advice) ||
+				stream->ios_in_progress == stream->max_ios)
{
-				/* And we've hit the limit.  Rewind, and stop here. */
+				/* And we've hit a buffer or I/O limit.  Rewind and wait. */
read_stream_unget_block(stream, blocknum);
return;
}
+
+			suppress_advice = false;
}

If read_stream_start_pending_read() returns false because we hit the pin
limit, does it really help to call read_stream_unget_block()? IIUC that'll
defer one block for later - but what about the other buffers in a multi-block
read?

@@ -260,16 +261,30 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
else
Assert(stream->next_buffer_index == stream->oldest_buffer_index);

-	/*
-	 * If advice hasn't been suppressed, this system supports it, and this
-	 * isn't a strictly sequential pattern, then we'll issue advice.
-	 */
-	if (!suppress_advice &&
-		stream->advice_enabled &&
-		stream->pending_read_blocknum != stream->seq_blocknum)
+	/* Do we need to issue read-ahead advice? */
+	flags = 0;
+	if (stream->advice_enabled)
+	{
flags = READ_BUFFERS_ISSUE_ADVICE;
-	else
-		flags = 0;
+
+		if (stream->pending_read_blocknum == stream->seq_blocknum)
+		{
+			/*
+			 * Suppress advice if our WaitReadBuffers() calls have caught up
+			 * with the first advice we issued for this sequential run.
+			 */
+			if (stream->seq_start == InvalidBlockNumber)
+				suppress_advice = true;
+		}
+		else
+		{
+			/* Random jump, so start a new sequential run. */
+			stream->seq_start = stream->pending_read_blocknum;
+		}
+
+		if (suppress_advice)
+			flags = 0;
+	}

Seems a bit confusing to first set
flags = READ_BUFFERS_ISSUE_ADVICE
to then later unset it again. Maybe just set it in if (!suppress_advice)?

* Skip the initial ramp-up phase if the caller says we're going to be
@@ -825,6 +842,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
distance = stream->distance * 2;
distance = Min(distance, stream->max_pinned_buffers);
stream->distance = distance;
+
+			/*
+			 * If we've caught up with the first advice issued for the current
+			 * sequential run, cancel further advice until the next random
+			 * jump.  The kernel should be able to see the pattern now that
+			 * we're issuing sequential preadv() calls.
+			 */
+			if (stream->ios[io_index].op.blocknum == stream->seq_start)
+				stream->seq_start = InvalidBlockNumber;

So stream->seq_start doesn't really denote the start of sequentialness, it
denotes up to where the caller needs to process before we disable sequential
access. Maybe add a comment to it and rename it to something like
->seq_until_processed?

Other than this the approach seems to make sense!

Greetings,

Andres Freund

#11Thomas Munro
thomas.munro@gmail.com
In reply to: Andres Freund (#10)
6 attachment(s)
Re: Some read stream improvements

On Wed, Mar 12, 2025 at 8:29 AM Andres Freund <andres@anarazel.de> wrote:

On 2025-03-12 07:35:46 +1300, Thomas Munro wrote:

On Thu, Feb 27, 2025 at 11:20 PM Andres Freund <andres@anarazel.de> wrote:

On 2025-02-27 11:19:55 +1300, Thomas Munro wrote:

I wonder if we should use temp_buffers - 100? Then leave the minimum GUC
value at 100 still, so you have an easy way to test with 0, 1,
... additional buffers?

I think that just makes it harder to test the exhaustion scenario without
really fixing anything?

Not sure I agree yet but I'll come back to this in a bit (I think
there might be something worth thinking about some more here but it's
not in the way of committing these patches).

+/* see GetAdditionalPinLimit() */
+uint32
+GetAdditionalLocalPinLimit(void)
+{
+     Assert(NLocalPinnedBuffers <= num_temp_buffers);
+     return num_temp_buffers - NLocalPinnedBuffers;
+}

This doesn't behave quite the way GetAdditionalPinLimit() does - it can return
0. Which makes some sense, pinning an additional buffer will always
fail. Perhaps worth calling out though?

No, GetAdditionalPinLimit() works that way too. It's only
LimitAdditional[Local]Pins() that has the special "you can always
have one" logic that I needed to escape from. But yes I should
highlight that in a comment: done above GetAdditionalPinLimit().
GetAdditionalLocalPinLimit() just tells you to see the shared version.
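
For illustration, the intended calling pattern looks roughly like this
(hypothetical caller, not committed code); read_stream.c applies the same
rule inside read_stream_start_pending_read():

#include "postgres.h"
#include "storage/bufmgr.h"

/*
 * Illustrative sketch only: GetAdditionalPinLimit() may report zero, so a
 * caller that must make progress applies the "at least one pin" rule itself.
 */
static uint32
plan_read_size(uint32 wanted_nblocks, uint32 already_pinned)
{
	uint32		limit = GetAdditionalPinLimit();

	if (limit == 0 && already_pinned == 0)
		limit = 1;				/* always entitled to one pin for progress */

	return Min(wanted_nblocks, limit);
}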

static void
read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
{
while (stream->ios_in_progress < stream->max_ios &&
-                stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+                ((stream->pinned_buffers == 0 && stream->distance > 0) ||
+                     stream->pinned_buffers + stream->pending_read_nblocks < stream->distance))

What does the new "stream->pinned_buffers == 0 && stream->distance > 0" really
mean? And when would it be true when the pre-existing condition wouldn't
already be true?

Well the reason is basically that the distance can get chomped lower
than pending_read_nblocks if you recently hit the pin limit, and we
have to be able to start your I/O anyway (at least one block of it) to
make progress. But I realised that was a stupid way to handle that,
and actually I had screwed up further down, and the right way is just:

@@ -382,15 +435,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
         * io_combine_limit size once more buffers have been consumed.  However,
         * if we've already reached io_combine_limit, or we've reached the
         * distance limit and there isn't anything pinned yet, or the callback has
-        * signaled end-of-stream, we start the read immediately.
+        * signaled end-of-stream, we start the read immediately.  Note that the
+        * pending read could even exceed the distance goal, if the latter was
+        * reduced on buffer limit exhaustion.
         */
        if (stream->pending_read_nblocks > 0 &&
                (stream->pending_read_nblocks == stream->io_combine_limit ||
-                (stream->pending_read_nblocks == stream->distance &&
+                (stream->pending_read_nblocks >= stream->distance &&
                  stream->pinned_buffers == 0) ||
                 stream->distance == 0) &&
                stream->ios_in_progress < stream->max_ios)
                read_stream_start_pending_read(stream, suppress_advice);

{
BlockNumber blocknum;
int16 buffer_index;
void *per_buffer_data;

-             if (stream->pending_read_nblocks == io_combine_limit)
+             /* If have a pending read that can't be extended, start it now. */
+             Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
+                        stream->max_pinned_buffers);
+             if (stream->pending_read_nblocks == io_combine_limit ||
+                     (stream->pinned_buffers == 0 &&
+                      stream->pending_read_nblocks == stream->max_pinned_buffers))
{
read_stream_start_pending_read(stream, suppress_advice);
suppress_advice = false;
@@ -360,14 +409,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
/* We have to start the pending read before we can build another. */
while (stream->pending_read_nblocks > 0)
{
-                     read_stream_start_pending_read(stream, suppress_advice);
-                     suppress_advice = false;
-                     if (stream->ios_in_progress == stream->max_ios)
+                     if (!read_stream_start_pending_read(stream, suppress_advice) ||
+                             stream->ios_in_progress == stream->max_ios)
{
-                             /* And we've hit the limit.  Rewind, and stop here. */
+                             /* And we've hit a buffer or I/O limit.  Rewind and wait. */
read_stream_unget_block(stream, blocknum);
return;
}
+
+                     suppress_advice = false;
}

If read_stream_start_pending_read() returns false because we hit the pin
limit, does it really help to call read_stream_unget_block()? IIUC that'll
defer one block for later - but what about the other buffers in a multi-block
read?

They are already represented by (pending_read_blocknum,
pending_read_nblocks). We were unable to append this block number to
the pending read because it's already full-sized or the newly acquired
block number isn't consecutive, but we were also unable to start the
pending read to get it out of the way. That was a pre-existing edge
case that you could already hit if it turned out to be a short read:
ie the remaining part of the pending read is still in your way, and
now you've reached stream->max_ios so you can't start it. So we had
to put it aside for later.

This change piggy-backs on that approach for buffer starvation:
read_stream_start_pending_read() can now decline to start even a partial
read.  In fact it usually declines, unless it is forced to accept a short
read because stream->pinned_buffers == 0 (ie, we have to do whatever we
can to make progress).

It's OK that pending_read_nblocks exceeds what we can start right now:
we still remember it, and we can always start it one block at a time if
it comes to it.  Make sense?

@@ -260,16 +261,30 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
else
Assert(stream->next_buffer_index == stream->oldest_buffer_index);

-     /*
-      * If advice hasn't been suppressed, this system supports it, and this
-      * isn't a strictly sequential pattern, then we'll issue advice.
-      */
-     if (!suppress_advice &&
-             stream->advice_enabled &&
-             stream->pending_read_blocknum != stream->seq_blocknum)
+     /* Do we need to issue read-ahead advice? */
+     flags = 0;
+     if (stream->advice_enabled)
+     {
flags = READ_BUFFERS_ISSUE_ADVICE;
-     else
-             flags = 0;
+
+             if (stream->pending_read_blocknum == stream->seq_blocknum)
+             {
+                     /*
+                      * Suppress advice if our WaitReadBuffers() calls have caught up
+                      * with the first advice we issued for this sequential run.
+                      */
+                     if (stream->seq_start == InvalidBlockNumber)
+                             suppress_advice = true;
+             }
+             else
+             {
+                     /* Random jump, so start a new sequential run. */
+                     stream->seq_start = stream->pending_read_blocknum;
+             }
+
+             if (suppress_advice)
+                     flags = 0;
+     }

Seems a bit confusing to first set
flags = READ_BUFFERS_ISSUE_ADVICE
to then later unset it again. Maybe just set it in if (!suppress_advice)?

Yeah that was a bit too tangly. I found a better expression of that
logic, which also removed that annoying suppress_advice function
argument. I hope this is much clearer.

* Skip the initial ramp-up phase if the caller says we're going to be
@@ -825,6 +842,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
distance = stream->distance * 2;
distance = Min(distance, stream->max_pinned_buffers);
stream->distance = distance;
+
+                     /*
+                      * If we've caught up with the first advice issued for the current
+                      * sequential run, cancel further advice until the next random
+                      * jump.  The kernel should be able to see the pattern now that
+                      * we're issuing sequential preadv() calls.
+                      */
+                     if (stream->ios[io_index].op.blocknum == stream->seq_start)
+                             stream->seq_start = InvalidBlockNumber;

So stream->seq_start doesn't really denote the start of sequentialness, it
denotes up to where the caller needs to process before we disable sequential
access. Maybe add a comment to it and rename it to something like
->seq_until_processed?

WFM.

Other than this the approach seems to make sense!

Cool, so I'm planning to start pushing the earlier ones tomorrow.
Here also are the buffer forwarding ones, rebased on top.

Here's some strace art showing the old and new advice for patch 0003.
I traced ANALYZE io_combine_limit=8 and used different
default_statistics_targets to find interesting test cases. The
"bracket" on the right shows a sequential range of blocks. Master
only calls fadvise once per sequential chunk:

fadvise ●──────────────────────╮││ 3 0.000006
││╰─► pread 1 676..676 2 0.000007
fadvise ●─────────────────────╮││ 3 0.000006
││╰──► pread 1 678..678 2 0.000007
fadvise ●────────────────────╮││ 3 0.000007
││╰───► pread 3 680..682 2 0.000031
│╰────► pread 6 684..689 1 0.000015
╰─────► pread 8 691..698 ─╮ 0 0.000018
pread 8 699..706 │ 0 0.000016
fadvise ●────────────────────────╮ │ 1 0.000007
│ pread 8 707..714 │ 1 0.000019
│ pread 7 715..721 ─╯ 1 0.000017
╰─► pread 8 723..730 ─╮ 0 0.000016
pread 8 731..738 │ 0 0.000019
fadvise ●────────────────────────╮ │ 1 0.000007
│ pread 8 739..746 │ 1 0.000018
│ pread 5 747..751 ─╯ 1 0.000013

The patch can call fadvise three times when that's the configured I/O
concurrency level, because that controls when the pread() calls catch
up with the first block:

fadvise ●────────────────────╮││ 3 0.000007
││╰───► pread 2 255..256 2 0.000014
fadvise ●───────────────────╮││ 3 0.000007
││╰────► pread 8 258..265 ─╮ 2 0.000035
│╰─────► preadv 8 266..273 │ 1 0.000021
╰──────► pread 8 274..281 │ 0 0.000017
fadvise ●────────────────────────╮ │ 1 0.000007
│ pread 8 282..289 │ 1 0.000017
fadvise ●───────────────────────╮│ │ 2 0.000007
││ pread 6 290..295 ─╯ 2 0.000015
fadvise ●──────────────────────╮││ 3 0.000007
││╰─► pread 8 297..304 ─╮ 2 0.000015
fadvise ●─────────────────────╮││ │ 3 0.000007
││╰──► pread 8 305..312 │ 2 0.000017

Purely sequential streams still get none:

pread 1 0..0 ─╮ 0 0.000016
pread 2 1..2 │ 0 0.000014
pread 4 3..6 │ 0 0.000021
pread 8 7..14 │ 0 0.000034
... blah blah blah ...
pread 8 4455..4462 │ 0 0.000029
pread 8 4463..4470 │ 0 0.000026
pread 8 4471..4478 │ 0 0.000020
pread 1 4479..4479 ─╯ 0 0.000010

Attachments:

v3-0001-Improve-buffer-manager-API-for-backend-pin-limits.patch (text/x-patch)
From a493ef860337641678ab1c39c48758d038481196 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Thu, 27 Feb 2025 21:03:39 +1300
Subject: [PATCH v3 1/6] Improve buffer manager API for backend pin limits.

Previously the support functions assumed that the caller needed one pin
to make progress, and could optionally use some more.  Add a couple more
functions for callers that want to know:

* what the maximum possible number could be irrespective of currently
  held pins, for space planning purposes, called the "soft pin limit"

* how many additional pins they could acquire right now, without the
  special case allowing one pin, for users that already hold pins and
  could make progress even if zero extra pins are available

These APIs are better suited to read_stream.c, which will be improved in
a follow-up patch.  Also compute MaxProportionalPins up front, to avoid
performing division whenever we check the balance.

Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c   | 85 +++++++++++++++++++--------
 src/backend/storage/buffer/localbuf.c | 16 +++++
 src/include/storage/bufmgr.h          |  4 ++
 3 files changed, 80 insertions(+), 25 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7915ed624c1..a6138e79306 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -211,6 +211,8 @@ static int32 PrivateRefCountOverflowed = 0;
 static uint32 PrivateRefCountClock = 0;
 static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
 
+static uint32 MaxProportionalPins;
+
 static void ReservePrivateRefCountEntry(void);
 static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer);
 static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move);
@@ -2097,43 +2099,67 @@ again:
 	return buf;
 }
 
+/*
+ * Return the maximum number of buffers that this backend should try to pin at
+ * once, to avoid pinning more than its fair share.  This is the highest value
+ * that GetAdditionalPinLimit() and LimitAdditionalPins() could ever return.
+ *
+ * It's called a soft limit because nothing stops a backend from trying to
+ * acquire more pins than this if it needs them to make progress, but code that
+ * wants optional extra buffers for optimizations should respect this
+ * per-backend limit.
+ */
+uint32
+GetSoftPinLimit(void)
+{
+	return MaxProportionalPins;
+}
+
+/*
+ * Return the maximum number of additional buffers that this backend should
+ * pin if it wants to stay under the per-backend soft limit, considering the
+ * number of buffers it has already pinned.  Unlike LimitAdditionalPins(), the
+ * result can be zero, so the caller is expected to adjust it if required to
+ * make progress.
+ */
+uint32
+GetAdditionalPinLimit(void)
+{
+	uint32		estimated_pins_held;
+
+	/*
+	 * We get the number of "overflowed" pins for free, but don't know the
+	 * number of pins in PrivateRefCountArray.  The cost of calculating that
+	 * exactly doesn't seem worth it, so just assume the max.
+	 */
+	estimated_pins_held = PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;
+
+	/* Is this backend already holding more than its fair share? */
+	if (estimated_pins_held > MaxProportionalPins)
+		return 0;
+
+	return MaxProportionalPins - estimated_pins_held;
+}
+
 /*
  * Limit the number of pins a batch operation may additionally acquire, to
  * avoid running out of pinnable buffers.
  *
- * One additional pin is always allowed, as otherwise the operation likely
- * cannot be performed at all.
- *
- * The number of allowed pins for a backend is computed based on
- * shared_buffers and the maximum number of connections possible. That's very
- * pessimistic, but outside of toy-sized shared_buffers it should allow
- * sufficient pins.
+ * One additional pin is always allowed, on the assumption that the operation
+ * requires at least one to make progress.
  */
 void
 LimitAdditionalPins(uint32 *additional_pins)
 {
-	uint32		max_backends;
-	int			max_proportional_pins;
+	uint32		limit;
 
 	if (*additional_pins <= 1)
 		return;
 
-	max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
-	max_proportional_pins = NBuffers / max_backends;
-
-	/*
-	 * Subtract the approximate number of buffers already pinned by this
-	 * backend. We get the number of "overflowed" pins for free, but don't
-	 * know the number of pins in PrivateRefCountArray. The cost of
-	 * calculating that exactly doesn't seem worth it, so just assume the max.
-	 */
-	max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;
-
-	if (max_proportional_pins <= 0)
-		max_proportional_pins = 1;
-
-	if (*additional_pins > max_proportional_pins)
-		*additional_pins = max_proportional_pins;
+	limit = GetAdditionalPinLimit();
+	limit = Max(limit, 1);
+	if (limit < *additional_pins)
+		*additional_pins = limit;
 }
 
 /*
@@ -3575,6 +3601,15 @@ InitBufferManagerAccess(void)
 {
 	HASHCTL		hash_ctl;
 
+	/*
+	 * The soft limit on the number of pins each backend should respect, based
+	 * on shared_buffers and the maximum number of connections possible.
+	 * That's very pessimistic, but outside toy-sized shared_buffers it should
+	 * allow plenty of pins.  LimitAdditionalPins() or GetAdditionalPinLimit()
+	 * can be used to check the remaining balance.
+	 */
+	MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS);
+
 	memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));
 
 	hash_ctl.keysize = sizeof(int32);
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 80b83444eb2..5378ba84316 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -286,6 +286,22 @@ GetLocalVictimBuffer(void)
 	return BufferDescriptorGetBuffer(bufHdr);
 }
 
+/* see GetSoftPinLimit() */
+uint32
+GetSoftLocalPinLimit(void)
+{
+	/* Every backend has its own temporary buffers, and can pin them all. */
+	return num_temp_buffers;
+}
+
+/* see GetAdditionalPinLimit() */
+uint32
+GetAdditionalLocalPinLimit(void)
+{
+	Assert(NLocalPinnedBuffers <= num_temp_buffers);
+	return num_temp_buffers - NLocalPinnedBuffers;
+}
+
 /* see LimitAdditionalPins() */
 void
 LimitAdditionalLocalPins(uint32 *additional_pins)
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index b204e4731c1..74b5afe8a1a 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -290,6 +290,10 @@ extern bool HoldingBufferPinThatDelaysRecovery(void);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern uint32 GetSoftPinLimit(void);
+extern uint32 GetSoftLocalPinLimit(void);
+extern uint32 GetAdditionalPinLimit(void);
+extern uint32 GetAdditionalLocalPinLimit(void);
 extern void LimitAdditionalPins(uint32 *additional_pins);
 extern void LimitAdditionalLocalPins(uint32 *additional_pins);
 
-- 
2.39.5

v3-0002-Respect-pin-limits-accurately-in-read_stream.c.patch
From b168a9a7369302bcd92288d48cde8e0b6792760d Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Thu, 27 Feb 2025 21:42:05 +1300
Subject: [PATCH v3 2/6] Respect pin limits accurately in read_stream.c.

To avoid pinning too much of the buffer pool at once, we previously used
LimitAdditionalPins().  The coding was naive, and only considered the
available buffers at stream construction time.

This commit checks at the time of use with new buffer manager APIs.  The
result might change dynamically due to pins acquired outside this stream
by the same backend.  No extra CPU cycles are added to the all-buffered
fast-path code, but the I/O-starting path now considers the up-to-date
remaining buffer limit when making look-ahead decisions.

In practice it was very difficult to exceed the limit in v17, so no
back-patch, but changes due to land soon will make it easy.

Per code review from Andres, in the course of testing his AIO patches.

Reviewed-by: Andres Freund <andres@anarazel.de>
Reported-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 104 ++++++++++++++++++++++----
 1 file changed, 90 insertions(+), 14 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 36fb9fe152c..11ee16ec228 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -116,6 +116,7 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	bool		advice_enabled;
+	bool		temporary;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -225,7 +226,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
 	stream->buffered_blocknum = blocknum;
 }
 
-static void
+/*
+ * Start as much of the current pending read as we can.  If we have to split it
+ * because of the per-backend buffer limit, or the buffer manager decides to
+ * split it, then the pending read is adjusted to hold the remaining portion.
+ *
+ * We can always start a read of at least size one if we have no progress yet.
+ * Otherwise it's possible that we can't start a read at all because of a lack
+ * of buffers, and then false is returned.  Buffer shortages also reduce the
+ * distance to a level that prevents look-ahead until buffers are released.
+ */
+static bool
 read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 {
 	bool		need_wait;
@@ -234,12 +245,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	int16		io_index;
 	int16		overflow;
 	int16		buffer_index;
+	int16		buffer_limit;
 
 	/* This should only be called with a pending read. */
 	Assert(stream->pending_read_nblocks > 0);
 	Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
 
-	/* We had better not exceed the pin limit by starting this read. */
+	/* We had better not exceed the per-stream buffer limit with this read. */
 	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
 		   stream->max_pinned_buffers);
 
@@ -260,10 +272,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	else
 		flags = 0;
 
-	/* We say how many blocks we want to read, but may be smaller on return. */
+	/* Compute the remaining portion of the per-backend buffer limit. */
+	if (stream->temporary)
+		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+	else
+		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+	if (buffer_limit == 0 && stream->pinned_buffers == 0)
+		buffer_limit = 1;		/* guarantee progress */
+
+	/* Does the per-backend buffer limit affect this read? */
+	nblocks = stream->pending_read_nblocks;
+	if (buffer_limit < nblocks)
+	{
+		int16		new_distance;
+
+		/* Shrink distance: no more look-ahead until buffers are released. */
+		new_distance = stream->pinned_buffers + buffer_limit;
+		if (stream->distance > new_distance)
+			stream->distance = new_distance;
+
+		/* If we've already made progress, just give up and wait for buffers. */
+		if (stream->pinned_buffers > 0)
+			return false;
+
+		/* A short read is required to make progress. */
+		nblocks = buffer_limit;
+	}
+
+	/*
+	 * We say how many blocks we want to read, but it may be smaller on return
+	 * if the buffer manager decides it needs a short read at its level.
+	 */
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
-	nblocks = stream->pending_read_nblocks;
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -313,6 +354,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	/* Adjust the pending read to cover the remaining portion, if any. */
 	stream->pending_read_blocknum += nblocks;
 	stream->pending_read_nblocks -= nblocks;
+
+	return true;
 }
 
 static void
@@ -361,14 +404,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		/* We have to start the pending read before we can build another. */
 		while (stream->pending_read_nblocks > 0)
 		{
-			read_stream_start_pending_read(stream, suppress_advice);
-			suppress_advice = false;
-			if (stream->ios_in_progress == stream->max_ios)
+			if (!read_stream_start_pending_read(stream, suppress_advice) ||
+				stream->ios_in_progress == stream->max_ios)
 			{
-				/* And we've hit the limit.  Rewind, and stop here. */
+				/* And we've hit a buffer or I/O limit.  Rewind and wait. */
 				read_stream_unget_block(stream, blocknum);
 				return;
 			}
+
+			suppress_advice = false;
 		}
 
 		/* This is the start of a new pending read. */
@@ -382,15 +426,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 	 * io_combine_limit size once more buffers have been consumed.  However,
 	 * if we've already reached io_combine_limit, or we've reached the
 	 * distance limit and there isn't anything pinned yet, or the callback has
-	 * signaled end-of-stream, we start the read immediately.
+	 * signaled end-of-stream, we start the read immediately.  Note that the
+	 * pending read could even exceed the distance goal, if the latter was
+	 * reduced on buffer limit exhaustion.
 	 */
 	if (stream->pending_read_nblocks > 0 &&
 		(stream->pending_read_nblocks == stream->io_combine_limit ||
-		 (stream->pending_read_nblocks == stream->distance &&
+		 (stream->pending_read_nblocks >= stream->distance &&
 		  stream->pinned_buffers == 0) ||
 		 stream->distance == 0) &&
 		stream->ios_in_progress < stream->max_ios)
 		read_stream_start_pending_read(stream, suppress_advice);
+
+	/*
+	 * There should always be something pinned when we leave this function,
+	 * whether started by this call or not, unless we've hit the end of the
+	 * stream.  In the worst case we can always make progress one buffer at a
+	 * time.
+	 */
+	Assert(stream->pinned_buffers > 0 || stream->distance == 0);
 }
 
 /*
@@ -420,6 +474,7 @@ read_stream_begin_impl(int flags,
 	int			max_ios;
 	int			strategy_pin_limit;
 	uint32		max_pinned_buffers;
+	uint32		max_possible_buffer_limit;
 	Oid			tablespace_id;
 
 	/*
@@ -475,12 +530,23 @@ read_stream_begin_impl(int flags,
 	strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
 	max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
 
-	/* Don't allow this backend to pin more than its share of buffers. */
+	/*
+	 * Also limit our queue to the maximum number of pins we could possibly
+	 * ever be allowed to acquire according to the buffer manager.  We may not
+	 * really be able to use them all due to other pins held by this backend,
+	 * but we'll check that later in read_stream_start_pending_read().
+	 */
 	if (SmgrIsTemp(smgr))
-		LimitAdditionalLocalPins(&max_pinned_buffers);
+		max_possible_buffer_limit = GetSoftLocalPinLimit();
 	else
-		LimitAdditionalPins(&max_pinned_buffers);
-	Assert(max_pinned_buffers > 0);
+		max_possible_buffer_limit = GetSoftPinLimit();
+	max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
+
+	/*
+	 * The soft limit might be zero on a system configured with more
+	 * connections than buffers.  We need at least one to make progress.
+	 */
+	max_pinned_buffers = Max(1, max_pinned_buffers);
 
 	/*
 	 * We need one extra entry for buffers and per-buffer data, because users
@@ -546,6 +612,7 @@ read_stream_begin_impl(int flags,
 	stream->callback = callback;
 	stream->callback_private_data = callback_private_data;
 	stream->buffered_blocknum = InvalidBlockNumber;
+	stream->temporary = SmgrIsTemp(smgr);
 
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
@@ -674,6 +741,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			 * arbitrary I/O entry (they're all free).  We don't have to
 			 * adjust pinned_buffers because we're transferring one to caller
 			 * but pinning one more.
+			 *
+			 * In the fast path we don't need to check the pin limit.  We're
+			 * always allowed at least one pin so that progress can be made,
+			 * and that's all we need here.  Although two pins are momentarily
+			 * held at the same time, the model used here is that the stream
+			 * holds only one, and the other now belongs to the caller.
 			 */
 			if (likely(!StartReadBuffer(&stream->ios[0].op,
 										&stream->buffers[oldest_buffer_index],
@@ -874,6 +947,9 @@ read_stream_reset(ReadStream *stream)
 	stream->buffered_blocknum = InvalidBlockNumber;
 	stream->fast_path = false;
 
+	/* There is no point in reading whatever was pending. */
+	stream->pending_read_nblocks = 0;
+
 	/* Unpin anything that wasn't consumed. */
 	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 		ReleaseBuffer(buffer);
-- 
2.39.5

v3-0003-Improve-read-stream-advice-for-large-random-chunk.patch
From 2dd3d12fa75248bf8473b4b69884dd056bdc9163 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 18 Feb 2025 15:59:13 +1300
Subject: [PATCH v3 3/6] Improve read stream advice for large random chunks.

read_stream.c tries not to issue advice when it thinks the kernel's
readahead should be active, ie when using buffered I/O and reading
sequential blocks.  It previously gave up a little too easily: it should
issue advice until it has started running sequential pread() calls, not
just when it's planning to.  The simpler strategy worked for random
chunks of size <= io_combine_limit and entirely sequential streams, but
not so well when reading random chunks > io_combine_limit.  For example,
a 256kB chunk of sequential data would benefit from only one fadvise(),
but (assuming io_combine_limit=128kB) could suffer an I/O stall for the
second half of it.

Keep issuing advice until the pread() calls catch up with the start of
the region we're currently issuing advice for, if ever.  In practice, if
there are any jumps in the lookahead window, we'll never stop issuing
advice, and if the whole lookahead window becomes sequential we'll
finally stop issuing advice.

Discovered by Tomas Vondra's regression testing of many data clustering
patterns using Melanie Plageman's streaming Bitmap Heap Scan patch, with
analysis of the I/O stall-producing pattern from Andres Freund.

Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
Discussion: https://postgr.es/m/CA%2BhUKGJ3HSWciQCz8ekP1Zn7N213RfA4nbuotQawfpq23%2Bw-5Q%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 71 +++++++++++++++++++--------
 1 file changed, 50 insertions(+), 21 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 11ee16ec228..a8a96baf8c1 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -133,6 +133,7 @@ struct ReadStream
 
 	/* Next expected block, for detecting sequential access. */
 	BlockNumber seq_blocknum;
+	BlockNumber seq_until_processed;
 
 	/* The read operation we are currently preparing. */
 	BlockNumber pending_read_blocknum;
@@ -237,11 +238,11 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
  * distance to a level that prevents look-ahead until buffers are released.
  */
 static bool
-read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
+read_stream_start_pending_read(ReadStream *stream)
 {
 	bool		need_wait;
 	int			nblocks;
-	int			flags;
+	int			flags = 0;
 	int16		io_index;
 	int16		overflow;
 	int16		buffer_index;
@@ -261,16 +262,36 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	else
 		Assert(stream->next_buffer_index == stream->oldest_buffer_index);
 
-	/*
-	 * If advice hasn't been suppressed, this system supports it, and this
-	 * isn't a strictly sequential pattern, then we'll issue advice.
-	 */
-	if (!suppress_advice &&
-		stream->advice_enabled &&
-		stream->pending_read_blocknum != stream->seq_blocknum)
-		flags = READ_BUFFERS_ISSUE_ADVICE;
-	else
-		flags = 0;
+	/* Do we need to issue read-ahead advice? */
+	if (stream->advice_enabled)
+	{
+		bool		no_wait;
+
+		/*
+		 * We only issue advice if we won't immediately have to call
+		 * WaitReadBuffers().
+		 */
+		no_wait = stream->pinned_buffers > 0 ||
+			stream->pending_read_nblocks < stream->distance;
+
+		if (stream->pending_read_blocknum == stream->seq_blocknum)
+		{
+			/*
+			 * Sequential: issue advice only until the WaitReadBuffers() calls
+			 * catch up with the first advice issued for this sequential
+			 * region, so the kernel can see sequential access.
+			 */
+			if (stream->seq_until_processed != InvalidBlockNumber && no_wait)
+				flags = READ_BUFFERS_ISSUE_ADVICE;
+		}
+		else
+		{
+			/* Random jump: start tracking new region. */
+			stream->seq_until_processed = stream->pending_read_blocknum;
+			if (no_wait)
+				flags = READ_BUFFERS_ISSUE_ADVICE;
+		}
+	}
 
 	/* Compute the remaining portion of the per-backend buffer limit. */
 	if (stream->temporary)
@@ -359,7 +380,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 }
 
 static void
-read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
+read_stream_look_ahead(ReadStream *stream)
 {
 	while (stream->ios_in_progress < stream->max_ios &&
 		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
@@ -370,8 +391,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 
 		if (stream->pending_read_nblocks == stream->io_combine_limit)
 		{
-			read_stream_start_pending_read(stream, suppress_advice);
-			suppress_advice = false;
+			read_stream_start_pending_read(stream);
 			continue;
 		}
 
@@ -404,15 +424,13 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		/* We have to start the pending read before we can build another. */
 		while (stream->pending_read_nblocks > 0)
 		{
-			if (!read_stream_start_pending_read(stream, suppress_advice) ||
+			if (!read_stream_start_pending_read(stream) ||
 				stream->ios_in_progress == stream->max_ios)
 			{
 				/* And we've hit a buffer or I/O limit.  Rewind and wait. */
 				read_stream_unget_block(stream, blocknum);
 				return;
 			}
-
-			suppress_advice = false;
 		}
 
 		/* This is the start of a new pending read. */
@@ -436,7 +454,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		  stream->pinned_buffers == 0) ||
 		 stream->distance == 0) &&
 		stream->ios_in_progress < stream->max_ios)
-		read_stream_start_pending_read(stream, suppress_advice);
+		read_stream_start_pending_read(stream);
 
 	/*
 	 * There should always be something pinned when we leave this function,
@@ -612,6 +630,8 @@ read_stream_begin_impl(int flags,
 	stream->callback = callback;
 	stream->callback_private_data = callback_private_data;
 	stream->buffered_blocknum = InvalidBlockNumber;
+	stream->seq_blocknum = InvalidBlockNumber;
+	stream->seq_until_processed = InvalidBlockNumber;
 	stream->temporary = SmgrIsTemp(smgr);
 
 	/*
@@ -792,7 +812,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		 * space for more, but if we're just starting up we'll need to crank
 		 * the handle to get started.
 		 */
-		read_stream_look_ahead(stream, true);
+		read_stream_look_ahead(stream);
 
 		/* End of stream reached? */
 		if (stream->pinned_buffers == 0)
@@ -837,6 +857,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			distance = stream->distance * 2;
 			distance = Min(distance, stream->max_pinned_buffers);
 			stream->distance = distance;
+
+			/*
+			 * If we've caught up with the first advice issued for the current
+			 * sequential region, cancel further advice until the next random
+			 * jump.  The kernel should be able to see the pattern now that
+			 * we're actually making sequential preadv() calls.
+			 */
+			if (stream->ios[io_index].op.blocknum == stream->seq_until_processed)
+				stream->seq_until_processed = InvalidBlockNumber;
 		}
 		else
 		{
@@ -898,7 +927,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->oldest_buffer_index = 0;
 
 	/* Prepare for the next call. */
-	read_stream_look_ahead(stream, false);
+	read_stream_look_ahead(stream);
 
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
-- 
2.39.5

v3-0004-Look-ahead-more-when-sequential-in-read_stream.c.patch
From 5457ef3b17fd28be63c1ba31fcfc1d845a3010ca Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 19 Feb 2025 01:25:40 +1300
Subject: [PATCH v3 4/6] Look ahead more when sequential in read_stream.c.

Previously, sequential reads would cause the look-ahead distance to
fall back to io_combine_limit, on the basis that kernel read-ahead
should start helping.  It also meant that we'd have to ramp the distance
back up when a sequential region was followed by a burst of random
jumps, with little hope of avoiding a stall, which is not a good
trade-off and is incompatible with AIO plans (you have to look ahead if
you have to start real I/O).

Simplify the algorithm: now only cache hits make the look-ahead distance
drop off, and cache misses still make it grow rapidly.  Random vs
sequential heuristics are no longer taken into consideration while
making that decision.

Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 92 ++++++++++-----------------
 1 file changed, 33 insertions(+), 59 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index a8a96baf8c1..57cde89cfdc 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -17,30 +17,12 @@
  * pending read.  When that isn't possible, the existing pending read is sent
  * to StartReadBuffers() so that a new one can begin to form.
  *
- * The algorithm for controlling the look-ahead distance tries to classify the
- * stream into three ideal behaviors:
+ * The algorithm for controlling the look-ahead distance is based on recent
+ * cache hits and misses:
  *
- * A) No I/O is necessary, because the requested blocks are fully cached
- * already.  There is no benefit to looking ahead more than one block, so
- * distance is 1.  This is the default initial assumption.
- *
- * B) I/O is necessary, but read-ahead advice is undesirable because the
- * access is sequential and we can rely on the kernel's read-ahead heuristics,
- * or impossible because direct I/O is enabled, or the system doesn't support
- * read-ahead advice.  There is no benefit in looking ahead more than
- * io_combine_limit, because in this case the only goal is larger read system
- * calls.  Looking further ahead would pin many buffers and perform
- * speculative work for no benefit.
- *
- * C) I/O is necessary, it appears to be random, and this system supports
- * read-ahead advice.  We'll look further ahead in order to reach the
- * configured level of I/O concurrency.
- *
- * The distance increases rapidly and decays slowly, so that it moves towards
- * those levels as different I/O patterns are discovered.  For example, a
- * sequential scan of fully cached data doesn't bother looking ahead, but a
- * sequential scan that hits a region of uncached blocks will start issuing
- * increasingly wide read calls until it plateaus at io_combine_limit.
+ * When no I/O is necessary, there is no point in looking ahead more than one
+ * block.  This is the default initial assumption.  Otherwise rapidly increase
+ * the distance to try to benefit from I/O combining and I/O concurrency.
  *
  * The main data structure is a circular queue of buffers of size
  * max_pinned_buffers plus some extra space for technical reasons, ready to be
@@ -336,7 +318,7 @@ read_stream_start_pending_read(ReadStream *stream)
 	/* Remember whether we need to wait before returning this buffer. */
 	if (!need_wait)
 	{
-		/* Look-ahead distance decays, no I/O necessary (behavior A). */
+		/* Look-ahead distance decays, no I/O necessary. */
 		if (stream->distance > 1)
 			stream->distance--;
 	}
@@ -517,6 +499,15 @@ read_stream_begin_impl(int flags,
 	else
 		max_ios = get_tablespace_io_concurrency(tablespace_id);
 
+	/*
+	 * XXX Since we don't have asynchronous I/O yet, if direct I/O is enabled
+	 * then just behave as though I/O concurrency is set to 0.  Otherwise we
+	 * would look ahead pinning many buffers for no benefit, for lack of
+	 * advice and AIO.
+	 */
+	if (io_direct_flags & IO_DIRECT_DATA)
+		max_ios = 0;
+
 	/* Cap to INT16_MAX to avoid overflowing below */
 	max_ios = Min(max_ios, PG_INT16_MAX);
 
@@ -637,7 +628,7 @@ read_stream_begin_impl(int flags,
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
 	 * reading the whole relation.  This way we start out assuming we'll be
-	 * doing full io_combine_limit sized reads (behavior B).
+	 * doing full io_combine_limit sized reads.
 	 */
 	if (flags & READ_STREAM_FULL)
 		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
@@ -728,10 +719,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 
 	/*
-	 * A fast path for all-cached scans (behavior A).  This is the same as the
-	 * usual algorithm, but it is specialized for no I/O and no per-buffer
-	 * data, so we can skip the queue management code, stay in the same buffer
-	 * slot and use singular StartReadBuffer().
+	 * A fast path for all-cached scans.  This is the same as the usual
+	 * algorithm, but it is specialized for no I/O and no per-buffer data, so
+	 * we can skip the queue management code, stay in the same buffer slot and
+	 * use singular StartReadBuffer().
 	 */
 	if (likely(stream->fast_path))
 	{
@@ -851,37 +842,20 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		if (++stream->oldest_io_index == stream->max_ios)
 			stream->oldest_io_index = 0;
 
-		if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
-		{
-			/* Distance ramps up fast (behavior C). */
-			distance = stream->distance * 2;
-			distance = Min(distance, stream->max_pinned_buffers);
-			stream->distance = distance;
+		/* Look-ahead distance ramps up quickly after we do I/O. */
+		distance = stream->distance * 2;
+		distance = Min(distance, stream->max_pinned_buffers);
+		stream->distance = distance;
 
-			/*
-			 * If we've caught up with the first advice issued for the current
-			 * sequential region, cancel further advice until the next random
-			 * jump.  The kernel should be able to see the pattern now that
-			 * we're actually making sequential preadv() calls.
-			 */
-			if (stream->ios[io_index].op.blocknum == stream->seq_until_processed)
-				stream->seq_until_processed = InvalidBlockNumber;
-		}
-		else
-		{
-			/* No advice; move towards io_combine_limit (behavior B). */
-			if (stream->distance > stream->io_combine_limit)
-			{
-				stream->distance--;
-			}
-			else
-			{
-				distance = stream->distance * 2;
-				distance = Min(distance, stream->io_combine_limit);
-				distance = Min(distance, stream->max_pinned_buffers);
-				stream->distance = distance;
-			}
-		}
+		/*
+		 * If we've caught up with the first advice issued for the current
+		 * sequential region, cancel further advice until the next random
+		 * jump.  The kernel should be able to see the pattern now that we're
+		 * actually making sequential preadv() calls.
+		 */
+		if (stream->advice_enabled &&
+			stream->ios[io_index].op.blocknum == stream->seq_until_processed)
+			stream->seq_until_processed = InvalidBlockNumber;
 	}
 
 #ifdef CLOBBER_FREED_MEMORY
-- 
2.39.5

v3-0005-Support-buffer-forwarding-in-read_stream.c.patch
From b8573757a4c61d72004e805974dc2a8900ed3eef Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 30 Jan 2025 11:42:03 +1300
Subject: [PATCH v3 5/6] Support buffer forwarding in read_stream.c.

In preparation for a following change to the buffer manager, teach read
stream to keep track of buffers that were "forwarded" from one call to
StartReadBuffers() to the next.

Since StartReadBuffers()'s buffers argument will become an in/out
argument, we need to initialize the buffer queue entries with
InvalidBuffer.  We don't want to do that up front, because we try to
keep stream initialization cheap and code that uses the fast path stays
in one single buffer queue element.  Satisfy both goals by initializing
the queue incrementally on the first cycle.

Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 102 +++++++++++++++++++++++---
 1 file changed, 92 insertions(+), 10 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 57cde89cfdc..51c15330117 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -95,8 +95,10 @@ struct ReadStream
 	int16		ios_in_progress;
 	int16		queue_size;
 	int16		max_pinned_buffers;
+	int16		forwarded_buffers;
 	int16		pinned_buffers;
 	int16		distance;
+	int16		initialized_buffers;
 	bool		advice_enabled;
 	bool		temporary;
 
@@ -223,8 +225,10 @@ static bool
 read_stream_start_pending_read(ReadStream *stream)
 {
 	bool		need_wait;
+	int			requested_nblocks;
 	int			nblocks;
 	int			flags = 0;
+	int			forwarded;
 	int16		io_index;
 	int16		overflow;
 	int16		buffer_index;
@@ -275,11 +279,19 @@ read_stream_start_pending_read(ReadStream *stream)
 		}
 	}
 
-	/* Compute the remaining portion of the per-backend buffer limit. */
+	/*
+	 * Compute the remaining portion of the per-backend buffer limit.  If we
+	 * already have some forwarded buffers, we can certainly use those.  They
+	 * read; they just don't have any I/O started yet and are not counted in
+	 * read, they just don't have any I/O started yet and are not counted in
+	 * stream->pinned_buffers.
+	 */
 	if (stream->temporary)
 		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
 	else
 		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
+	buffer_limit += stream->forwarded_buffers;
 	if (buffer_limit == 0 && stream->pinned_buffers == 0)
 		buffer_limit = 1;		/* guarantee progress */
 
@@ -306,8 +318,31 @@ read_stream_start_pending_read(ReadStream *stream)
 	 * We say how many blocks we want to read, but it may be smaller on return
 	 * if the buffer manager decides it needs a short read at its level.
 	 */
+	requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks);
+	nblocks = requested_nblocks;
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
+
+	/*
+	 * The first time around the queue we initialize it as we go, including
+	 * the overflow zone, because otherwise the entries would appear as
+	 * forwarded buffers.  This avoids initializing the whole queue up front
+	 * in cases where it is large but we don't ever use it due to the
+	 * all-cached fast path or small scans.
+	 */
+	while (stream->initialized_buffers < buffer_index + nblocks)
+		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
+
+	/*
+	 * Start the I/O.  Any buffers that are not InvalidBuffer will be
+	 * interpreted as already pinned, forwarded by an earlier call to
+	 * StartReadBuffers(), and must map to the expected blocks.  The nblocks
+	 * value may be smaller on return indicating the size of the I/O that
+	 * could be started.  Buffers beyond the output nblocks number may also
+	 * have been pinned without starting I/O due to various edge cases.  In
+	 * that case we'll just leave them in the queue ahead of us, "forwarded"
+	 * to the next call, avoiding the need to unpin/repin.
+	 */
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -336,16 +371,35 @@ read_stream_start_pending_read(ReadStream *stream)
 		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
 	}
 
+	/*
+	 * How many pins were acquired but forwarded to the next call?  These need
+	 * to be passed to the next StartReadBuffers() call, or released if the
+	 * stream ends early.  We need the number for accounting purposes, since
+	 * they are not counted in stream->pinned_buffers but we already hold
+	 * them.
+	 */
+	forwarded = 0;
+	while (nblocks + forwarded < requested_nblocks &&
+		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
+		forwarded++;
+	stream->forwarded_buffers = forwarded;
+
 	/*
 	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
-	 * we want it to wrap around at queue_size.  Slide overflowing buffers to
-	 * the front of the array.
+	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
+	 * the front of the array where they'll be consumed, but also leave a copy
+	 * in the overflow zone which the I/O operation has a pointer to (it needs
+	 * a contiguous array).  Both copies will be cleared when the buffers are
+	 * handed to the consumer.
 	 */
-	overflow = (buffer_index + nblocks) - stream->queue_size;
+	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
 	if (overflow > 0)
-		memmove(&stream->buffers[0],
-				&stream->buffers[stream->queue_size],
-				sizeof(stream->buffers[0]) * overflow);
+	{
+		Assert(overflow < stream->queue_size);	/* can't overlap */
+		memcpy(&stream->buffers[0],
+			   &stream->buffers[stream->queue_size],
+			   sizeof(stream->buffers[0]) * overflow);
+	}
 
 	/* Compute location of start of next read, without using % operator. */
 	buffer_index += nblocks;
@@ -730,10 +784,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		/* Fast path assumptions. */
 		Assert(stream->ios_in_progress == 0);
+		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
 		Assert(stream->distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
+		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
 
 		/* We're going to return the buffer we pinned last time. */
 		oldest_buffer_index = stream->oldest_buffer_index;
@@ -782,6 +838,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
+			stream->buffers[oldest_buffer_index] = InvalidBuffer;
 		}
 
 		stream->fast_path = false;
@@ -858,10 +915,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->seq_until_processed = InvalidBlockNumber;
 	}
 
-#ifdef CLOBBER_FREED_MEMORY
-	/* Clobber old buffer for debugging purposes. */
+	/*
+	 * We must zap this queue entry, or else it would appear as a forwarded
+	 * buffer.  If it's potentially in the overflow zone (ie it wrapped around
+	 * the queue), also zap that copy.
+	 */
 	stream->buffers[oldest_buffer_index] = InvalidBuffer;
-#endif
+	if (oldest_buffer_index < io_combine_limit - 1)
+		stream->buffers[stream->queue_size + oldest_buffer_index] =
+			InvalidBuffer;
 
 #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
 
@@ -906,6 +968,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
 	if (stream->ios_in_progress == 0 &&
+		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
 		stream->distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
@@ -941,6 +1004,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 void
 read_stream_reset(ReadStream *stream)
 {
+	int16		index;
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
@@ -957,6 +1021,24 @@ read_stream_reset(ReadStream *stream)
 	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 		ReleaseBuffer(buffer);
 
+	/* Unpin any unused forwarded buffers. */
+	index = stream->next_buffer_index;
+	while (index < stream->initialized_buffers &&
+		   (buffer = stream->buffers[index]) != InvalidBuffer)
+	{
+		Assert(stream->forwarded_buffers > 0);
+		stream->forwarded_buffers--;
+		ReleaseBuffer(buffer);
+
+		stream->buffers[index] = InvalidBuffer;
+		if (index < io_combine_limit - 1)
+			stream->buffers[stream->queue_size + index] = InvalidBuffer;
+
+		if (++index == stream->queue_size)
+			index = 0;
+	}
+
+	Assert(stream->forwarded_buffers == 0);
 	Assert(stream->pinned_buffers == 0);
 	Assert(stream->ios_in_progress == 0);
 
-- 
2.39.5

v3-0006-Support-buffer-forwarding-in-StartReadBuffers.patch
From 02cd8f3f977de017760ff8f4b89fcc83c5aa36ab Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 10 Feb 2025 21:55:40 +1300
Subject: [PATCH v3 6/6] Support buffer forwarding in StartReadBuffers().

Sometimes we have to perform a short read because we hit a cached block
that ends a contiguous run of blocks requiring I/O.  We don't want
StartReadBuffers() to have to start more than one I/O, so we stop there.
We also don't want to have to unpin the cached block (and repin it
later), so previously we'd silently pretend the hit was part of the I/O,
and just leave it out of the read from disk.  Now, we'll "forward" it to
the next call.  We still write it to the buffers[] array for the caller
to pass back to us later, but it's not included in *nblocks.

This policy means that we no longer mix hits and misses in a single
operation's results, so we avoid the requirement to call
WaitReadBuffers(), which might stall, before the caller can make use of
the hits.  The caller will get the hit in the next call instead, and
know that it doesn't have to wait.  That's important for later work on
out-of-order read streams that minimize I/O stalls.

This also makes life easier for proposed work on true AIO, which
occasionally needs to split a large I/O after pinning all the buffers,
while the current coding only ever forwards a single bookending hit.

This API is natural for read_stream.c: it just leaves forwarded buffers
where they are in its circular queue, where the next call will pick them
up and continue, minimizing pin churn.

If we ever think of a good reason to disable this feature, i.e. for
other users of StartReadBuffers() that don't want to deal with forwarded
buffers, then we could add a flag for that.  For now read_steam.c is the
only user.

Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c | 128 ++++++++++++++++++++--------
 src/include/storage/bufmgr.h        |   1 -
 2 files changed, 91 insertions(+), 38 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a6138e79306..d56bff96cec 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1257,10 +1257,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 					 Buffer *buffers,
 					 BlockNumber blockNum,
 					 int *nblocks,
-					 int flags)
+					 int flags,
+					 bool allow_forwarding)
 {
 	int			actual_nblocks = *nblocks;
-	int			io_buffers_len = 0;
 	int			maxcombine = 0;
 
 	Assert(*nblocks > 0);
@@ -1270,30 +1270,80 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	{
 		bool		found;
 
-		buffers[i] = PinBufferForBlock(operation->rel,
-									   operation->smgr,
-									   operation->persistence,
-									   operation->forknum,
-									   blockNum + i,
-									   operation->strategy,
-									   &found);
+		if (allow_forwarding && buffers[i] != InvalidBuffer)
+		{
+			BufferDesc *bufHdr;
+
+			/*
+			 * This is a buffer that was pinned by an earlier call to
+			 * StartReadBuffers(), but couldn't be handled in one operation at
+			 * that time.  The operation was split, and the caller has passed
+			 * an already pinned buffer back to us to handle the rest of the
+			 * operation.  It must continue at the expected block number.
+			 */
+			Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i);
+
+			/*
+			 * It might be an already valid buffer (a hit) that followed the
+			 * final contiguous block of an earlier I/O (a miss) marking the
+			 * end of it, or a buffer that some other backend has since made
+			 * valid by performing the I/O for us, in which case we can handle
+			 * it as a hit now.  It is safe to check for a BM_VALID flag with
+			 * a relaxed load, because we got a fresh view of it while pinning
+			 * it in the previous call.
+			 *
+			 * On the other hand if we don't see BM_VALID yet, it must be an
+			 * I/O that was split by the previous call and we need to try to
+			 * start a new I/O from this block.  We're also racing against any
+			 * other backend that might start the I/O or even manage to mark
+			 * it BM_VALID after this check, but
+			 * StartBufferIO() will handle those cases.
+			 */
+			if (BufferIsLocal(buffers[i]))
+				bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1);
+			else
+				bufHdr = GetBufferDescriptor(buffers[i] - 1);
+			found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
+		}
+		else
+		{
+			buffers[i] = PinBufferForBlock(operation->rel,
+										   operation->smgr,
+										   operation->persistence,
+										   operation->forknum,
+										   blockNum + i,
+										   operation->strategy,
+										   &found);
+		}
 
 		if (found)
 		{
 			/*
-			 * Terminate the read as soon as we get a hit.  It could be a
-			 * single buffer hit, or it could be a hit that follows a readable
-			 * range.  We don't want to create more than one readable range,
-			 * so we stop here.
+			 * We have a hit.  If it's the first block in the requested range,
+			 * we can return it immediately and report that WaitReadBuffers()
+			 * does not need to be called.  If the initial value of *nblocks
+			 * was larger, the caller will have to call again for the rest.
 			 */
-			actual_nblocks = i + 1;
+			if (i == 0)
+			{
+				*nblocks = 1;
+				return false;
+			}
+
+			/*
+			 * Otherwise we already have an I/O to perform, but this block
+			 * can't be included as it is already valid.  Split the I/O here.
+			 * There may or may not be more blocks requiring I/O after this
+			 * one (we haven't checked), but they can't be combined with this
+			 * hit in the way.  We'll leave this buffer pinned, forwarding it
+			 * to the next call, avoiding the need to unpin it here and re-pin
+			 * it in the next call.
+			 */
+			actual_nblocks = i;
 			break;
 		}
 		else
 		{
-			/* Extend the readable range to cover this block. */
-			io_buffers_len++;
-
 			/*
 			 * Check how many blocks we can cover with the same IO. The smgr
 			 * implementation might e.g. be limited due to a segment boundary.
@@ -1314,15 +1364,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	}
 	*nblocks = actual_nblocks;
 
-	if (likely(io_buffers_len == 0))
-		return false;
-
 	/* Populate information needed for I/O. */
 	operation->buffers = buffers;
 	operation->blocknum = blockNum;
 	operation->flags = flags;
 	operation->nblocks = actual_nblocks;
-	operation->io_buffers_len = io_buffers_len;
 
 	if (flags & READ_BUFFERS_ISSUE_ADVICE)
 	{
@@ -1337,7 +1383,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 		smgrprefetch(operation->smgr,
 					 operation->forknum,
 					 blockNum,
-					 operation->io_buffers_len);
+					 actual_nblocks);
 	}
 
 	/* Indicate that WaitReadBuffers() should be called. */
@@ -1351,11 +1397,21 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
  * actual number, which may be fewer than requested.  Caller sets some of the
  * members of operation; see struct definition.
  *
+ * The initial contents of the elements of buffers up to *nblocks should
+ * either be InvalidBuffer or an already-pinned buffer that was left by a
+ * preceding call to StartReadBuffers() that had to be split.  On return, some
+ * elements of buffers may hold pinned buffers beyond the number indicated by
+ * the updated value of *nblocks.  Operations are split on boundaries known to
+ * smgr (eg md.c segment boundaries that require crossing into a different
+ * underlying file), or when already cached blocks are found in the buffer
+ * that prevent the formation of a contiguous read.
+ *
  * If false is returned, no I/O is necessary.  If true is returned, one I/O
  * has been started, and WaitReadBuffers() must be called with the same
  * operation object before the buffers are accessed.  Along with the operation
  * object, the caller-supplied array of buffers must remain valid until
- * WaitReadBuffers() is called.
+ * WaitReadBuffers() is called, and any forwarded buffers must also be
+ * preserved for a future call unless explicitly released.
  *
  * Currently the I/O is only started with optional operating system advice if
  * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
@@ -1369,13 +1425,18 @@ StartReadBuffers(ReadBuffersOperation *operation,
 				 int *nblocks,
 				 int flags)
 {
-	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags);
+	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags,
+								true /* expect forwarded buffers */ );
 }
 
 /*
  * Single block version of the StartReadBuffers().  This might save a few
  * instructions when called from another translation unit, because it is
  * specialized for nblocks == 1.
+ *
+ * This version does not support "forwarded" buffers: they cannot be created
+ * by reading only one block, and the current contents of *buffer are ignored
+ * on entry.
  */
 bool
 StartReadBuffer(ReadBuffersOperation *operation,
@@ -1386,7 +1447,8 @@ StartReadBuffer(ReadBuffersOperation *operation,
 	int			nblocks = 1;
 	bool		result;
 
-	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
+	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags,
+								  false /* single block, no forwarding */ );
 	Assert(nblocks == 1);		/* single block can't be short */
 
 	return result;
@@ -1416,24 +1478,16 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	IOObject	io_object;
 	char		persistence;
 
-	/*
-	 * Currently operations are only allowed to include a read of some range,
-	 * with an optional extra buffer that is already pinned at the end.  So
-	 * nblocks can be at most one more than io_buffers_len.
-	 */
-	Assert((operation->nblocks == operation->io_buffers_len) ||
-		   (operation->nblocks == operation->io_buffers_len + 1));
-
 	/* Find the range of the physical read we need to perform. */
-	nblocks = operation->io_buffers_len;
-	if (nblocks == 0)
-		return;					/* nothing to do */
-
+	nblocks = operation->nblocks;
 	buffers = &operation->buffers[0];
 	blocknum = operation->blocknum;
 	forknum = operation->forknum;
 	persistence = operation->persistence;
 
+	Assert(nblocks > 0);
+	Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
+
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
 		io_context = IOCONTEXT_NORMAL;
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 74b5afe8a1a..307f36af384 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -130,7 +130,6 @@ struct ReadBuffersOperation
 	BlockNumber blocknum;
 	int			flags;
 	int16		nblocks;
-	int16		io_buffers_len;
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;
-- 
2.39.5

#12Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#11)
1 attachment(s)
Re: Some read stream improvements

I have pushed the new pin limit patches, after some more testing and
copy editing. I dropped an unnecessary hunk (in read_stream_reset(), a
change I'd like to make but it didn't belong in this commit) and
dropped the word "Soft" from GetSoftPinLimit() as it wasn't helping
comprehension and isn't even true for the local buffer version (which
I still think might be an issue, will come back to that, but again it
seemed independent).

For the record, here's a way to see 92fc6856^ or v17 misbehave and pin
too many buffers without involving any other proposed patches, only
v17 streaming seq scan: with shared_buffers=16MB, max_connections=4,
which gives MaxProportionalPins == 44, the attached shows three
cursors each pinning io_combine_limit = 32 buffers, for a total of 96
at peak. That's because each cursor starts a stream and sees (the
only time it would check) that it is allowed 44, and then we fetch in
round-robin order until they all ramp up to full I/O size. In v17 we
can't really do much worse than that, and it requires pretty contrived
settings and workload for it to be a real issue AFAIK, but obviously
and hopefully we'll soon have more stream users, eg BHS and more.
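
The attached test.sql has the actual script; just to illustrate the shape
of the workload described above, it goes roughly like this (table name and
fetch sizes here are made up for illustration, not copied from the
attachment):

  -- a table somewhat larger than shared_buffers=16MB
  CREATE TABLE big AS SELECT generate_series(1, 1000000) AS i;

  BEGIN;
  DECLARE c1 CURSOR FOR SELECT * FROM big;
  DECLARE c2 CURSOR FOR SELECT * FROM big;
  DECLARE c3 CURSOR FOR SELECT * FROM big;

  -- Fetch round-robin so that each cursor's stream ramps its look-ahead
  -- distance up until all three are doing io_combine_limit-sized reads,
  -- each holding that many buffers pinned at once.
  FETCH 256 FROM c1;
  FETCH 256 FROM c2;
  FETCH 256 FROM c3;
  -- ... repeat, then observe the pinned buffers, eg with pg_buffercache.
  COMMIT;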

Attachments:

test.sql
#13Andres Freund
andres@anarazel.de
In reply to: Thomas Munro (#12)
Re: Some read stream improvements

Hi,

On 2025-03-14 22:03:15 +1300, Thomas Munro wrote:

> I have pushed the new pin limit patches, after some more testing and
> copy editing. I dropped an unnecessary hunk (in read_stream_reset(), a
> change I'd like to make but it didn't belong in this commit) and
> dropped the word "Soft" from GetSoftPinLimit() as it wasn't helping
> comprehension and isn't even true for the local buffer version (which
> I still think might be an issue, will come back to that, but again it
> seemed independent).

Something odd:

I was looking to push a test that increases localbuf.c coverage, which passed
with a previous version of these changes. However, it failed, and it did so
on FreeBSD alone. The same test doesn't fail when tested together with the
rest of the AIO work.

https://cirrus-ci.com/build/5951090857869312
https://cirrus-ci.com/task/6177637229395968

I do not understand what could be OS dependent here. I tried to build with
exactly the same options as CI on freebsd, but couldn't repro the issue.

It's perhaps worth noting that this failed even before my recent localbuf:
commits.

I ran CI with a patch that adds a bunch of debug information and just runs the
relevant test:
https://cirrus-ci.com/task/6516935619248128

2025-03-17 16:19:14.270 UTC client backend[5526] pg_regress/temp LOG: statement: SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp;
2025-03-17 16:19:14.271 UTC client backend[5526] pg_regress/temp DEBUG: NlocalPinnedBuffers=98++
2025-03-17 16:19:14.271 UTC client backend[5526] pg_regress/temp DEBUG: NlocalPinnedBuffers=99--
2025-03-17 16:19:14.271 UTC client backend[5526] pg_regress/temp DEBUG: pgsr create 0xde34f1f57f8: test_temp, max_pinned: 100, NLocPin: 98
2025-03-17 16:19:14.271 UTC client backend[5526] pg_regress/temp DEBUG: pgsr 0xde34f1f57f8: buffer_limit: 2, pinned_buffers: 0, max_pinned: 100, ios_i_p: 0, distance: 1, pending_read_nblocks: 1, NLocPin: 98

comparing that with a local run:

2025-03-17 12:18:55.989 EDT client backend[4117093] pg_regress/temp LOG: statement: SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp;
2025-03-17 12:18:55.989 EDT client backend[4117093] pg_regress/temp DEBUG: pgsr create 0x56029555cad8: test_temp, max_pinned: 100, NLocPin: 97
2025-03-17 12:18:55.989 EDT client backend[4117093] pg_regress/temp DEBUG: pgsr 0x56029555cad8: buffer_limit: 3, pinned_buffers: 0, max_pinned: 100, ios_i_p: 0, distance: 1, pending_read_nblocks: 1, NLocPin: 97
2025-03-17 12:18:55.989 EDT client backend[4117093] pg_regress/temp DEBUG: pgsr 0x56029555cad8: StartReadBuffers: wait: 0, pinned: 0 += 1, distance: 1
2025-03-17 12:18:55.989 EDT client backend[4117093] pg_regress/temp DEBUG: pgsr 0x56029555cad8: buffer_limit: 3, pinned_buffers: 0, max_pinned: 100, ios_i_p: 0, distance: 1, pending_read_nblocks: 1, NLocPin: 97
2025-03-17 12:18:55.989 EDT client backend[4117093] pg_regress/temp DEBUG: pgsr 0x56029555cad8: StartReadBuffers: wait: 0, pinned: 0 += 1, distance: 1

So one thing is that the pin count differs by 1 at the start of the scan. No
idea why.

But knowing that, I was able to repro the issue locally too, by just ensuring
the pin count is one higher at the start of the query.

I still don't know what drives the difference between freebsd and the rest,
but IIUC the reason this fails is just that we are holding too many buffers
pinned, due to some buffers being pinned outside of read_stream.c.

Greetings,

Andres Freund

#14Thomas Munro
thomas.munro@gmail.com
In reply to: Andres Freund (#13)
Re: Some read stream improvements

On Tue, Mar 18, 2025 at 5:56 AM Andres Freund <andres@anarazel.de> wrote:

> So one thing is that the pin count differs by 1 at the start of the scan. No
> idea why.
>
> I still don't know what drives the difference between freebsd and the rest,
> but IIUC the reason this fails is just that we are holding too many buffers
> pinned, due to some buffers being pinned outside of read_stream.c.

I couldn't reproduce this on my local FreeBSD box, but I think I see
one part of the problem: the cursor a few lines up has a stream with a
higher distance holding a couple of pins. Not sure how the local
buffer pool got into a state that caused that if it isn't doing the
same on other machines, but anyway, if I read the test right you
intend to pin strictly one page per cursor, so I tried saying so
explicitly:

-       query = format($q$DECLARE %I CURSOR FOR SELECT ctid FROM test_temp WHERE ctid >= '( %s, 1)'::tid $q$, cursorname, i);
+       query = format($q$DECLARE %I CURSOR FOR SELECT ctid FROM test_temp WHERE ctid between '(%s, 1)'::tid and '(%s, 9999)'::tid $q$, cursorname, i, i);

That passed on the FreeBSD CI task.

#15Andres Freund
andres@anarazel.de
In reply to: Thomas Munro (#12)
Re: Some read stream improvements

Hi,

On 2025-03-14 22:03:15 +1300, Thomas Munro wrote:

> I have pushed the new pin limit patches, after some more testing and
> copy editing. I dropped an unnecessary hunk (in read_stream_reset(), a
> change I'd like to make but it didn't belong in this commit) and
> dropped the word "Soft" from GetSoftPinLimit() as it wasn't helping
> comprehension and isn't even true for the local buffer version (which
> I still think might be an issue, will come back to that, but again it
> seemed independent).

I found an easy-to-fix issue in read_stream.c. If the limit returned by
GetAdditionalPinLimit() is very large, the buffer_limit variable in
read_stream_start_pending_read() can overflow.

While the code is careful to limit buffer_limit PG_INT16_MAX, we subsequently
add the number of forwarded buffers:

if (stream->temporary)
	buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
else
	buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
buffer_limit += stream->forwarded_buffers;

I think there's a second, theoretical, overflow issue shortly after:

/* Shrink distance: no more look-ahead until buffers are released. */
new_distance = stream->pinned_buffers + buffer_limit;
if (stream->distance > new_distance)
	stream->distance = new_distance;

While that was hit in the case I was looking at, it was only hit because
buffer_limit had already wrapped around into the negative. I don't think
nblocks can be big enough to really hit this.

I don't actually see any reason for buffer_limit to be a 16bit quantity? It's
just to clamp things down, right?
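
For illustration, one way to avoid that wraparound would be to do the
arithmetic in a wider type and clamp only at the end, something along
these lines (just a sketch against the code quoted above, not a proposed
patch; the local variable name is made up):

	uint32		limit;

	/* Ask the buffer manager for the remaining per-backend allowance. */
	if (stream->temporary)
		limit = GetAdditionalLocalPinLimit();
	else
		limit = GetAdditionalPinLimit();

	/* Forwarded buffers are already pinned, so they extend the budget. */
	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
	limit += stream->forwarded_buffers;

	/* Clamp once, after all additions, so the int16 can't wrap around. */
	buffer_limit = Min(limit, PG_INT16_MAX);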

Greetings,

Andres Freund

#16Thomas Munro
thomas.munro@gmail.com
In reply to: Andres Freund (#15)
Re: Some read stream improvements

On Thu, Apr 3, 2025 at 11:17 AM Andres Freund <andres@anarazel.de> wrote:

> I don't actually see any reason for buffer_limit to be a 16bit quantity? It's
> just to clamp things down, right?

Ugh. It might be worth just flipping this whole thing over to ints,
let me look into that...

#17Andres Freund
andres@anarazel.de
In reply to: Thomas Munro (#16)
Re: Some read stream improvements

Hi,

On 2025-04-03 14:43:40 +1300, Thomas Munro wrote:

> On Thu, Apr 3, 2025 at 11:17 AM Andres Freund <andres@anarazel.de> wrote:
>
> > I don't actually see any reason for buffer_limit to be a 16bit quantity? It's
> > just to clamp things down, right?
>
> Ugh. It might be worth just flipping this whole thing over to ints,
> let me look into that...

If you don't mind I'll push the obvious minimal fix soon - it's a confusing
crash when one encounters this issue...

Greetings,

Andres Freund

#18Thomas Munro
thomas.munro@gmail.com
In reply to: Andres Freund (#17)
Re: Some read stream improvements

On Mon, Apr 7, 2025 at 1:29 PM Andres Freund <andres@anarazel.de> wrote:

> On 2025-04-03 14:43:40 +1300, Thomas Munro wrote:
>
> > On Thu, Apr 3, 2025 at 11:17 AM Andres Freund <andres@anarazel.de> wrote:
> >
> > > I don't actually see any reason for buffer_limit to be a 16bit quantity? It's
> > > just to clamp things down, right?
> >
> > Ugh. It might be worth just flipping this whole thing over to ints,
> > let me look into that...
>
> If you don't mind I'll push the obvious minimal fix soon - it's a confusing
> crash when one encounters this issue...

Please go ahead, sorry I got distracted trying to fix something else...