From a40f9adf9e0867ab208d15a80b5326aa1ca68d48 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Tue, 5 Aug 2025 14:35:09 +1200 Subject: [PATCH] Fix and refactor read_stream.c's split IO handling. If a circular queue wraparound, a multi-block IO split and a transition to the fast path happened in a certain sequence, a buffer forwarded from one StartReadBuffers() call to the next would not be cleared out. That could confuse a later queue-wrapping StartReadBuffers() call by passing it a random buffer. Fixing that rare case would be a one-liner, but this slightly larger refactoring avoids the whole class of problems and saves a few cycles. Remove the need to initialize and clear the stream->buffers[] elements that will be passed to StartReadBuffers(), which was done only so that it could distinguish forwarded buffers. Instead, add an explicit npinned argument to tell it how many buffers are already pinned. It is an in/out argument, and the output value can be used directly as input to the following call along with the buffers it counts. This also removes the need to identify and count them when StartReadBuffers() returns. This number is now stored in stream->pending_read_npinned, the number of leading buffers that are already pinned. Defect in commit ed0b87ca. 
XXX WIP Bug: 19006 Backpatch-through: 18 Reported-by: Alexander Lakhin Reviewed-by: Discussion: https://postgr.es/m/19006-80fcaaf69000377e%40postgresql.org --- src/backend/storage/aio/read_stream.c | 68 ++++++--------------------- src/backend/storage/buffer/bufmgr.c | 44 +++++++++++------ src/include/storage/bufmgr.h | 1 + 3 files changed, 45 insertions(+), 68 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 0e7f5557f5c..1b327d0ca7a 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -96,10 +96,8 @@ struct ReadStream int16 ios_in_progress; int16 queue_size; int16 max_pinned_buffers; - int16 forwarded_buffers; int16 pinned_buffers; int16 distance; - int16 initialized_buffers; int read_buffers_flags; bool sync_mode; /* using io_method=sync */ bool batch_mode; /* READ_STREAM_USE_BATCHING */ @@ -126,6 +124,7 @@ struct ReadStream /* The read operation we are currently preparing. */ BlockNumber pending_read_blocknum; int16 pending_read_nblocks; + int pending_read_npinned; /* Space for buffers and optional per-buffer private data. 
*/ size_t per_buffer_data_size; @@ -230,10 +229,8 @@ static bool read_stream_start_pending_read(ReadStream *stream) { bool need_wait; - int requested_nblocks; int nblocks; int flags; - int forwarded; int16 io_index; int16 overflow; int16 buffer_index; @@ -293,9 +290,9 @@ read_stream_start_pending_read(ReadStream *stream) buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX); else buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX); - Assert(stream->forwarded_buffers <= stream->pending_read_nblocks); + Assert(stream->pending_read_npinned <= stream->pending_read_nblocks); - buffer_limit += stream->forwarded_buffers; + buffer_limit += stream->pending_read_npinned; buffer_limit = Min(buffer_limit, PG_INT16_MAX); if (buffer_limit == 0 && stream->pinned_buffers == 0) @@ -322,20 +319,15 @@ read_stream_start_pending_read(ReadStream *stream) /* * We say how many blocks we want to read, but it may be smaller on return - * if the buffer manager decides to shorten the read. Initialize buffers - * to InvalidBuffer (= not a forwarded buffer) as input on first use only, - * and keep the original nblocks number so we can check for forwarded - * buffers as output, below. + * if the buffer manager decides to shorten the read. */ buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; - while (stream->initialized_buffers < buffer_index + nblocks) - stream->buffers[stream->initialized_buffers++] = InvalidBuffer; - requested_nblocks = nblocks; need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, &nblocks, + &stream->pending_read_npinned, flags); stream->pinned_buffers += nblocks; @@ -360,28 +352,15 @@ read_stream_start_pending_read(ReadStream *stream) stream->seq_blocknum = stream->pending_read_blocknum + nblocks; } - /* - * How many pins were acquired but forwarded to the next call? 
These need - * to be passed to the next StartReadBuffers() call by leaving them - * exactly where they are in the queue, or released if the stream ends - * early. We need the number for accounting purposes, since they are not - * counted in stream->pinned_buffers but we already hold them. - */ - forwarded = 0; - while (nblocks + forwarded < requested_nblocks && - stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer) - forwarded++; - stream->forwarded_buffers = forwarded; - /* * We gave a contiguous range of buffer space to StartReadBuffers(), but * we want it to wrap around at queue_size. Copy overflowing buffers to * the front of the array where they'll be consumed, but also leave a copy * in the overflow zone which the I/O operation has a pointer to (it needs - * a contiguous array). Both copies will be cleared when the buffers are - * handed to the consumer. + * a contiguous array). */ - overflow = (buffer_index + nblocks + forwarded) - stream->queue_size; + overflow = (buffer_index + nblocks + stream->pending_read_npinned) - + stream->queue_size; if (overflow > 0) { Assert(overflow < stream->queue_size); /* can't overlap */ @@ -400,6 +379,7 @@ read_stream_start_pending_read(ReadStream *stream) /* Adjust the pending read to cover the remaining portion, if any. */ stream->pending_read_blocknum += nblocks; stream->pending_read_nblocks -= nblocks; + Assert(stream->pending_read_nblocks >= stream->pending_read_npinned); return true; } @@ -786,12 +766,11 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* Fast path assumptions. */ Assert(stream->ios_in_progress == 0); - Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); Assert(stream->distance == 1); Assert(stream->pending_read_nblocks == 0); + Assert(stream->pending_read_npinned == 0); Assert(stream->per_buffer_data_size == 0); - Assert(stream->initialized_buffers > stream->oldest_buffer_index); /* We're going to return the buffer we pinned last time. 
*/ oldest_buffer_index = stream->oldest_buffer_index; @@ -844,7 +823,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; - stream->buffers[oldest_buffer_index] = InvalidBuffer; } stream->fast_path = false; @@ -920,16 +898,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->seq_until_processed = InvalidBlockNumber; } - /* - * We must zap this queue entry, or else it would appear as a forwarded - * buffer. If it's potentially in the overflow zone (ie from a - * multi-block I/O that wrapped around the queue), also zap the copy. - */ - stream->buffers[oldest_buffer_index] = InvalidBuffer; - if (oldest_buffer_index < stream->io_combine_limit - 1) - stream->buffers[stream->queue_size + oldest_buffer_index] = - InvalidBuffer; - #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND) /* @@ -973,10 +941,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ if (stream->ios_in_progress == 0 && - stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && stream->distance == 1 && stream->pending_read_nblocks == 0 && + stream->pending_read_npinned == 0 && stream->per_buffer_data_size == 0) { stream->fast_path = true; @@ -1025,22 +993,14 @@ read_stream_reset(ReadStream *stream) /* Unpin any unused forwarded buffers. 
*/ index = stream->next_buffer_index; - while (index < stream->initialized_buffers && - (buffer = stream->buffers[index]) != InvalidBuffer) + while (stream->pending_read_npinned > 0) { - Assert(stream->forwarded_buffers > 0); - stream->forwarded_buffers--; - ReleaseBuffer(buffer); - - stream->buffers[index] = InvalidBuffer; - if (index < stream->io_combine_limit - 1) - stream->buffers[stream->queue_size + index] = InvalidBuffer; - + ReleaseBuffer(stream->buffers[index]); + stream->pending_read_npinned--; if (++index == stream->queue_size) index = 0; } - Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 0); Assert(stream->ios_in_progress == 0); diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 6c9ae83a7a8..e58a1539f37 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1263,14 +1263,18 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, + int *npinned, int flags, bool allow_forwarding) { int actual_nblocks = *nblocks; + int actual_npinned = *npinned; int maxcombine = 0; bool did_start_io; + Assert(*npinned == 0 || allow_forwarding); Assert(*nblocks == 1 || allow_forwarding); + Assert(*npinned <= *nblocks); Assert(*nblocks > 0); Assert(*nblocks <= MAX_IO_COMBINE_LIMIT); @@ -1278,7 +1282,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, { bool found; - if (allow_forwarding && buffers[i] != InvalidBuffer) + if (allow_forwarding && i < actual_npinned) { BufferDesc *bufHdr; @@ -1323,6 +1327,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, blockNum + i, operation->strategy, &found); + actual_npinned++; } if (found) @@ -1336,6 +1341,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, if (i == 0) { *nblocks = 1; + *npinned = actual_npinned - 1; #ifdef USE_ASSERT_CHECKING @@ -1459,22 +1465,30 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, CheckReadBuffersOperation(operation, 
!did_start_io); + /* Pinned buffers not included in final nblocks, forwarded to next call. */ + *npinned = actual_npinned - *nblocks; + return did_start_io; } /* * Begin reading a range of blocks beginning at blockNum and extending for - * *nblocks. *nblocks and the buffers array are in/out parameters. On entry, - * the buffers elements covered by *nblocks must hold either InvalidBuffer or - * buffers forwarded by an earlier call to StartReadBuffers() that was split - * and is now being continued. On return, *nblocks holds the number of blocks - * accepted by this operation. If it is less than the original number then - * this operation has been split, but buffer elements up to the original - * requested size may hold forwarded buffers to be used for a continuing - * operation. The caller must either start a new I/O beginning at the block - * immediately following the blocks accepted by this call and pass those - * buffers back in, or release them if it chooses not to. It shouldn't make - * any other use of or assumptions about forwarded buffers. + * *nblocks. *nblocks, *npinned and the buffers array are in/out parameters. + * + * On entry, *nblocks is the desire number of block to read. On exit, it may + * be a smaller number if the operation was split. + * + * On entry, *npinned is the number of pinned buffers forwarded by an earlier + * operation that was split. On exit, it is the number forwarded by this call, + * which should be passed to the following call when continuing to read the + * same sequence of blocks, along with the corresponding buffers. + * + * When buffers are forwarded, as reported by *npinned, the caller must either + * start a new I/O beginning at the block immediately following the blocks + * accepted by this call (*nblocks on exit) and pass those buffers back in head + * position, or release them if it chooses not to. They are located in the + * buffers array beginning at index *nblocks (on exit). 
It shouldn't make any + * other use of or assumptions about forwarded buffers. * * If false is returned, no I/O is necessary and the buffers covered by * *nblocks on exit are valid and ready to be accessed. If true is returned, @@ -1490,9 +1504,10 @@ StartReadBuffers(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, + int *npinned, int flags) { - return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags, + return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, npinned, flags, true /* expect forwarded buffers */ ); } @@ -1511,9 +1526,10 @@ StartReadBuffer(ReadBuffersOperation *operation, int flags) { int nblocks = 1; + int npinned = 0; bool result; - result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags, + result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, &npinned, flags, false /* single block, no forwarding */ ); Assert(nblocks == 1); /* single block can't be short */ diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 41fdc1e7693..e5e8d8129d4 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -225,6 +225,7 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, + int *npinned, int flags); extern void WaitReadBuffers(ReadBuffersOperation *operation); -- 2.39.5 (Apple Git-154)