From ff6da44df5418d73ad9ec1911c87b66897f3b086 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 15 Jun 2024 14:37:26 +1200
Subject: [PATCH 1/2] Introduce read_stream_{pause,resume,yield}().

---
 src/backend/storage/aio/read_stream.c | 50 ++++++++++++++++++++++++++-
 src/include/storage/read_stream.h     |  3 ++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..964e1aa281c 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,11 +100,13 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	int16		initialized_buffers;
+	int16		resume_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
 	bool		advice_enabled;
 	bool		temporary;
+	bool		yielded;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -879,7 +881,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		/* End of stream reached?  */
 		if (stream->distance == 0)
-			return InvalidBuffer;
+		{
+			if (!stream->yielded)
+				return InvalidBuffer;
+
+			/* The callback yielded.  Resume. */
+			stream->yielded = false;
+			read_stream_resume(stream);
+			Assert(stream->distance != 0);
+		}
 
 		/*
 		 * The usual order of operations is that we look ahead at the bottom
@@ -1034,6 +1044,44 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 	return read_stream_get_block(stream, NULL);
 }
 
+/*
+ * Temporarily stop consuming block numbers from the block number callback.  If
+ * called inside the block number callback, its return value should be
+ * returned by the callback.
+ */
+BlockNumber
+read_stream_pause(ReadStream *stream)
+{
+	stream->resume_distance = stream->distance;
+	stream->distance = 0;
+	return InvalidBlockNumber;
+}
+
+/*
+ * Resume looking ahead after the block number callback reported end-of-stream.
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+	stream->distance = stream->resume_distance;
+}
+
+/*
+ * Called from inside a block number callback, to return control to the caller
+ * of read_stream_next_buffer() without looking further ahead.  Its return
+ * value should be returned by the callback.  This is equivalent to pausing and
+ * resuming automatically at the next call to read_stream_next_buffer().
+ */
+BlockNumber
+read_stream_yield(ReadStream *stream)
+{
+	read_stream_pause(stream);
+	stream->yielded = true;
+	return InvalidBlockNumber;
+}
+
 /*
  * Reset a read stream by releasing any queued up buffers, allowing the stream
  * to be used again for different blocks.  This can be used to clear an
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..8ac53d2902d 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,9 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
 												   ReadStreamBlockNumberCB callback,
 												   void *callback_private_data,
 												   size_t per_buffer_data_size);
+extern BlockNumber read_stream_pause(ReadStream *stream);
+extern void read_stream_resume(ReadStream *stream);
+extern BlockNumber read_stream_yield(ReadStream *stream);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
-- 
2.51.2

