From 3b460cb102c8af49a0af0d3b08bf86a436564fd3 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 15 Feb 2025 14:47:25 +1300
Subject: [PATCH v2 1/4] Move read stream modulo arithmetic into functions.

Several places have open-coded circular index arithmetic.  Make some
common functions for better readability and consistent assertion
checking.  This avoids adding yet more open-coding in later patches.
---
 src/backend/storage/aio/read_stream.c | 93 +++++++++++++++++++++------
 1 file changed, 74 insertions(+), 19 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 0e7f5557f5c..0d3dd65bfed 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -216,6 +216,67 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
 	stream->buffered_blocknum = blocknum;
 }
 
+/*
+ * Step *index forward by one, wrapping to 0 when it reaches queue_size.
+ */
+static inline void
+read_stream_advance_buffer(ReadStream *stream, int16 *index)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+
+	*index += 1;
+	if (*index == stream->queue_size)
+		*index = 0;
+}
+
+/*
+ * Advance *index by n (asserted <= io_combine_limit), wrapping at queue_size.
+ */
+static inline void
+read_stream_advance_buffer_n(ReadStream *stream, int16 *index, int16 n)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+	Assert(n <= stream->io_combine_limit);
+
+	*index += n;
+	if (*index >= stream->queue_size)
+		*index -= stream->queue_size;
+
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+}
+
+/*
+ * Step *index back by one, wrapping from 0 around to queue_size - 1.
+ */
+static inline void
+read_stream_retreat_buffer(ReadStream *stream, int16 *index)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+
+	if (*index == 0)
+		*index = stream->queue_size - 1;
+	else
+		*index -= 1;
+}
+
+/*
+ * Increment IO index, wrapping around at max_ios (not the buffer queue size).
+ */
+static inline void
+read_stream_advance_io(ReadStream *stream, int16 *index)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->max_ios);
+
+	*index += 1;
+	if (*index == stream->max_ios)
+		*index = 0;
+}
+
 /*
  * Start as much of the current pending read as we can.  If we have to split it
  * because of the per-backend buffer limit, or the buffer manager decides to
@@ -353,8 +414,7 @@ read_stream_start_pending_read(ReadStream *stream)
 		 * Look-ahead distance will be adjusted after waiting.
 		 */
 		stream->ios[io_index].buffer_index = buffer_index;
-		if (++stream->next_io_index == stream->max_ios)
-			stream->next_io_index = 0;
+		read_stream_advance_io(stream, &stream->next_io_index);
 		Assert(stream->ios_in_progress < stream->max_ios);
 		stream->ios_in_progress++;
 		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
@@ -390,11 +450,8 @@ read_stream_start_pending_read(ReadStream *stream)
 			   sizeof(stream->buffers[0]) * overflow);
 	}
 
-	/* Compute location of start of next read, without using % operator. */
-	buffer_index += nblocks;
-	if (buffer_index >= stream->queue_size)
-		buffer_index -= stream->queue_size;
-	Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+	/* Move to the start location of the next read. */
+	read_stream_advance_buffer_n(stream, &buffer_index, nblocks);
 	stream->next_buffer_index = buffer_index;
 
 	/* Adjust the pending read to cover the remaining portion, if any. */
@@ -432,12 +489,12 @@ read_stream_look_ahead(ReadStream *stream)
 		/*
 		 * See which block the callback wants next in the stream.  We need to
 		 * compute the index of the Nth block of the pending read including
-		 * wrap-around, but we don't want to use the expensive % operator.
+		 * wrap-around.
 		 */
-		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
-		if (buffer_index >= stream->queue_size)
-			buffer_index -= stream->queue_size;
-		Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+		buffer_index = stream->next_buffer_index;
+		read_stream_advance_buffer_n(stream,
+									 &buffer_index,
+									 stream->pending_read_nblocks);
 		per_buffer_data = get_per_buffer_data(stream, buffer_index);
 		blocknum = read_stream_get_block(stream, per_buffer_data);
 		if (blocknum == InvalidBlockNumber)
@@ -940,12 +997,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	 */
 	if (stream->per_buffer_data)
 	{
+		int16		index;
 		void	   *per_buffer_data;
 
-		per_buffer_data = get_per_buffer_data(stream,
-											  oldest_buffer_index == 0 ?
-											  stream->queue_size - 1 :
-											  oldest_buffer_index - 1);
+		index = oldest_buffer_index;
+		read_stream_retreat_buffer(stream, &index);
+		per_buffer_data = get_per_buffer_data(stream, index);
 
 #if defined(CLOBBER_FREED_MEMORY)
 		/* This also tells Valgrind the memory is "noaccess". */
@@ -963,9 +1020,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	stream->pinned_buffers--;
 
 	/* Advance oldest buffer, with wrap-around. */
-	stream->oldest_buffer_index++;
-	if (stream->oldest_buffer_index == stream->queue_size)
-		stream->oldest_buffer_index = 0;
+	read_stream_advance_buffer(stream, &stream->oldest_buffer_index);
 
 	/* Prepare for the next call. */
 	read_stream_look_ahead(stream);
-- 
2.39.5

