From 4460435cd2170123f2f316fbaed2b1bd8578393e Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 9 Mar 2025 18:44:36 -0400
Subject: [PATCH v2.8 20/38] localbuf: Introduce StartLocalBufferIO()

To initiate IO on a shared buffer we have StartBufferIO(). For temporary table
buffers no similar function exists - likely because the code for that
currently is very simple due to the lack of concurrency.

However, the upcoming AIO support will make it possible to re-encounter a
local buffer while that buffer is already the target of IO. In that case we
need to wait for the in-progress IO to complete. This commit makes it easier
to add the necessary code by introducing StartLocalBufferIO().

Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/CAAKRu_b9anbWzEs5AAF9WCvcEVmgz-1AkHSQ-CLLy-p7WHzvFw@mail.gmail.com
---
 src/include/storage/buf_internals.h   |  1 +
 src/backend/storage/buffer/bufmgr.c   |  8 ++----
 src/backend/storage/buffer/localbuf.c | 36 +++++++++++++++++++++++++++
 3 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 90bc7e0db7b..9327f60c44c 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -473,6 +473,7 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
 extern void MarkLocalBufferDirty(Buffer buffer);
 extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
 								   uint32 set_flag_bits);
+extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput);
 extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
 extern void DropRelationLocalBuffers(RelFileLocator rlocator,
 									 ForkNumber forkNum,
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 4a7cf3f3cc4..95911b51d58 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1038,7 +1038,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 	{
 		/* Simple case for non-shared buffers. */
 		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-		need_to_zero = (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
+		need_to_zero = StartLocalBufferIO(bufHdr, true);
 	}
 	else
 	{
@@ -1450,11 +1450,7 @@ static inline bool
 WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
 {
 	if (BufferIsLocal(buffer))
-	{
-		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-
-		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
-	}
+		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), true);
 	else
 		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
 }
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 8efde05c0a5..f172a5c7820 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -183,6 +183,13 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
 	instr_time	io_start;
 	Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);
 
+	/*
+	 * Try to start an I/O operation.  StartLocalBufferIO only returns false
+	 * here if the buffer isn't dirty; that is unexpected, so raise an error.
+	 */
+	if (!StartLocalBufferIO(bufHdr, false))
+		elog(ERROR, "failed to start write IO on local buffer");
+
 	/* Find smgr relation for buffer */
 	if (reln == NULL)
 		reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
@@ -406,11 +413,17 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 			PinLocalBuffer(existing_hdr, false);
 			buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
 
+			/*
+			 * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
+			 */
 			buf_state = pg_atomic_read_u32(&existing_hdr->state);
 			Assert(buf_state & BM_TAG_VALID);
 			Assert(!(buf_state & BM_DIRTY));
 			buf_state &= ~BM_VALID;
 			pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
+
+			/* no need to loop for local buffers */
+			StartLocalBufferIO(existing_hdr, true);
 		}
 		else
 		{
@@ -425,6 +438,8 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 			pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
 
 			hresult->id = victim_buf_id;
+
+			StartLocalBufferIO(victim_buf_hdr, true);
 		}
 	}
 
@@ -489,6 +504,27 @@ MarkLocalBufferDirty(Buffer buffer)
 	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 }
 
+/*
+ * Like StartBufferIO, but for local buffers; false means no IO is needed.
+ */
+bool
+StartLocalBufferIO(BufferDesc *bufHdr, bool forInput)
+{
+	uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
+	{
+		/* already valid (input) or not dirty (output), nothing to do */
+		return false;
+	}
+
+	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */
+
+	/* local buffers don't track IO using resowners */
+
+	return true;
+}
+
 /*
  * Like TerminateBufferIO, but for local buffers
  */
-- 
2.48.1.76.g4e746b1a31.dirty

