From 2f2370859b959bf48424966db30e3ba25a1ac0f0 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 26 Mar 2025 14:41:48 -0400
Subject: [PATCH v2.13 08/28] squash-later: bufmgr: Implement AIO read support

---
 src/backend/storage/buffer/bufmgr.c | 200 ++++++++++++++++++++++------
 1 file changed, 156 insertions(+), 44 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 3cf8b0f98d2..0b96e256625 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -6327,20 +6327,75 @@ buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
 	}
 }
 
+/*
+ * Helper to encode errors for buffer_readv_complete(). Sets result->id and
+ * result->status in addition to result->error_data, which is encoded as:
+ *
+ * - bit 0 indicates whether page was zeroed (1) or not (0)
+ * - next 8 bits indicate the offset of the first page that failed
+ *   verification in a larger IO
+ * - next 8 bits indicate the number of pages that failed verification
+ */
+static inline void
+buffer_readv_encode_error(PgAioResult *result, bool is_temp,
+						  bool was_zeroed, uint8 first_invalid_off,
+						  uint8 count_invalid)
+{
+
+	uint8		shift = 0;		/* next unused bit in error_data */
+
+	StaticAssertStmt(PG_IOV_MAX <= 1 << 8,
+					 "PG_IOV_MAX is bigger than reserved space for error data");
+
+	result->error_data = 0;
+
+	result->error_data |= was_zeroed << shift;	/* bit 0 */
+	shift += 1;
+
+	result->error_data |= first_invalid_off << shift;	/* bits 1..8 */
+	shift += 8;
+
+	result->error_data |= count_invalid << shift;	/* bits 9..16 */
+	shift += 8;					/* shift == 17: first unused bit */
+
+	result->id = is_temp ? PGAIO_HCB_LOCAL_BUFFER_READV :
+		PGAIO_HCB_SHARED_BUFFER_READV;
+	result->status = was_zeroed ? PGAIO_RS_WARNING : PGAIO_RS_ERROR;
+}
+
+/*
+ * Decode result.error_data as encoded by buffer_readv_encode_error().
+ */
+static inline void
+buffer_readv_decode_error(PgAioResult result,
+						  bool *was_zeroed, uint8 *first_invalid_off, uint8 *count_invalid)
+{
+	uint32		rem_error = result.error_data;	/* bits left to decode */
+
+	*was_zeroed = rem_error & 1;	/* bit 0 */
+	rem_error >>= 1;
+
+	*first_invalid_off = rem_error & 0xff;	/* bits 1..8 */
+	rem_error >>= 8;
+
+	*count_invalid = rem_error & 0xff;	/* bits 9..16 */
+	rem_error >>= 8;			/* remaining bits are not read */
+}
+
 /*
  * Helper for AIO readv completion callbacks, supporting both shared and temp
  * buffers. Gets called once for each buffer in a multi-page read.
  */
-static pg_attribute_always_inline PgAioResult
-buffer_readv_complete_one(uint8 buf_off, Buffer buffer, uint8 flags,
-						  bool failed, bool is_temp)
+static pg_attribute_always_inline void
+buffer_readv_complete_one(PgAioTargetData *td, uint8 buf_off, Buffer buffer,
+						  uint8 flags, bool failed, bool is_temp,
+						  bool *failed_verification)
 {
 	BufferDesc *buf_hdr = is_temp ?
 		GetLocalBufferDescriptor(-buffer - 1)
 		: GetBufferDescriptor(buffer - 1);
 	BufferTag	tag = buf_hdr->tag;
 	char	   *bufdata = BufferGetBlock(buffer);
-	PgAioResult result = {.status = PGAIO_RS_OK};
 	uint32		set_flag_bits;
 
 	/* check that the buffer is in the expected state for a read */
@@ -6357,37 +6412,45 @@ buffer_readv_complete_one(uint8 buf_off, Buffer buffer, uint8 flags,
 	}
 #endif
 
+	*failed_verification = false;
+
 	/* check for garbage data */
 	if (!failed &&
 		!PageIsVerifiedExtended((Page) bufdata, tag.blockNum,
 								PIV_LOG_WARNING | PIV_REPORT_STAT))
 	{
-		RelFileLocator rlocator = BufTagGetRelFileLocator(&tag);
+		PgAioResult result_one;
 
-		if ((flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
+		*failed_verification = true;
+
+		if (flags & READ_BUFFERS_ZERO_ON_ERROR)
 		{
-			ereport(WARNING,
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("invalid page in block %u of relation %s; zeroing out page",
-							tag.blockNum,
-							relpathbackend(rlocator,
-										   is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
-										   tag.forkNum).str)));
 			memset(bufdata, 0, BLCKSZ);
 		}
 		else
 		{
 			/* mark buffer as having failed */
 			failed = true;
-
-			/* encode error for buffer_readv_report */
-			result.status = PGAIO_RS_ERROR;
-			if (is_temp)
-				result.id = PGAIO_HCB_LOCAL_BUFFER_READV;
-			else
-				result.id = PGAIO_HCB_SHARED_BUFFER_READV;
-			result.error_data = buf_off;
 		}
+
+		/*
+		 * Immediately log a message about the invalid page, but only to the
+		 * server log. The reason to log only to the server log is that this
+		 * may be executed in a different backend than the one that originated
+		 * the request. The reason to do so immediately is that the originator
+		 * might not process the query result immediately (because it is busy
+		 * doing another part of query processing) or at all (e.g. if it was
+		 * cancelled or errored out due to another IO also failing). The
+		 * issuer of the IO will emit an ERROR or WARNING when processing the
+		 * IO's results.
+		 *
+		 * To avoid duplicating the code to emit these log messages, we reuse
+		 * buffer_readv_report().
+		 */
+		buffer_readv_encode_error(&result_one, is_temp,
+								  flags & READ_BUFFERS_ZERO_ON_ERROR,
+								  buf_off, 1);
+		pgaio_result_report(result_one, td, LOG_SERVER_ONLY);
 	}
 
 	/* Terminate I/O and set BM_VALID. */
@@ -6412,8 +6475,6 @@ buffer_readv_complete_one(uint8 buf_off, Buffer buffer, uint8 flags,
 									  tag.relNumber,
 									  is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
 									  false);
-
-	return result;
 }
 
 /*
@@ -6428,6 +6489,8 @@ buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
 {
 	PgAioResult result = prior_result;
 	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+	uint8		first_invalid_off = 0;
+	uint8		invalid_count = 0;
 	uint64	   *io_data;
 	uint8		handle_data_len;
 
@@ -6447,35 +6510,46 @@ buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
 	for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++)
 	{
 		Buffer		buf = io_data[buf_off];
-		PgAioResult buf_result;
 		bool		failed;
+		bool		failed_verification = false;
 
 		Assert(BufferIsValid(buf));
 
 		/*
 		 * If the entire I/O failed on a lower-level, each buffer needs to be
-		 * marked as failed. In case of a partial read, some buffers may be
-		 * ok.
+		 * marked as failed. In case of a partial read, the first few buffers
+		 * may be ok.
 		 */
 		failed =
 			prior_result.status == PGAIO_RS_ERROR
 			|| prior_result.result <= buf_off;
 
-		buf_result = buffer_readv_complete_one(buf_off, buf, cb_data, failed,
-											   is_temp);
+		buffer_readv_complete_one(td, buf_off, buf, cb_data, failed, is_temp,
+								  &failed_verification);
 
 		/*
-		 * If there wasn't any prior error and page verification failed in
-		 * some form, set the whole IO's result to the page's result.
+		 * Track information about the number of errors across all pages, as
+		 * there can be multiple pages failing verification as part of one IO.
 		 */
-		if (result.status != PGAIO_RS_ERROR
-			&& buf_result.status != PGAIO_RS_OK)
+		if (failed_verification)
 		{
-			result = buf_result;
-			pgaio_result_report(result, td, LOG);
+			if (invalid_count++ == 0)
+				first_invalid_off = buf_off;
 		}
 	}
 
+	/*
+	 * If the smgr read succeeded [partially] and page verification failed for
+	 * some of the pages, adjust the IO's result state appropriately.
+	 */
+	if (prior_result.status != PGAIO_RS_ERROR && invalid_count > 0)
+	{
+		bool		was_zeroed = cb_data & READ_BUFFERS_ZERO_ON_ERROR;
+
+		buffer_readv_encode_error(&result, is_temp, was_zeroed,
+								  first_invalid_off, invalid_count);
+	}
+
 	return result;
 }
 
@@ -6483,28 +6557,66 @@ buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
  * AIO error reporting callback for aio_shared_buffer_readv_cb and
  * aio_local_buffer_readv_cb.
  *
- * Errors are encoded as follows:
- * - the only error is page verification failing
- * - result.error_data is the offset of the first page that failed
- *   verification in a larger IO
+ * The error is encoded / decoded in buffer_readv_encode_error() /
+ * buffer_readv_decode_error().
  */
 static void
 buffer_readv_report(PgAioResult result, const PgAioTargetData *target_data,
 					int elevel)
 {
+	BlockNumber blocknum = target_data->smgr.blockNum;	/* first block of IO */
+	int			nblocks = target_data->smgr.nblocks;	/* blocks in the IO */
 	ProcNumber	errProc;
+	bool		was_zeroed;		/* invalid page(s) were zeroed */
+	uint8		first_invalid_off;	/* offset from blocknum */
+	uint8		invalid_count;	/* number of invalid pages */
+	RelPathStr	rpath;
+
+	buffer_readv_decode_error(result, &was_zeroed, &first_invalid_off,
+							  &invalid_count);
 
 	if (target_data->smgr.is_temp)
 		errProc = MyProcNumber;
 	else
 		errProc = INVALID_PROC_NUMBER;
 
-	ereport(elevel,
-			errcode(ERRCODE_DATA_CORRUPTED),
-			errmsg("invalid page in block %u of relation %s",
-				   target_data->smgr.blockNum + result.error_data,
-				   relpathbackend(target_data->smgr.rlocator, errProc,
-								  target_data->smgr.forkNum).str));
+	rpath = relpathbackend(target_data->smgr.rlocator, errProc,
+						   target_data->smgr.forkNum);
+
+	if (was_zeroed)
+	{
+		ereport(elevel,
+				errcode(ERRCODE_DATA_CORRUPTED),
+				invalid_count == 1 ?
+				errmsg("invalid page in block %u of relation %s; zeroing out page",
+					   blocknum + first_invalid_off, rpath.str) :
+				errmsg("zeroing out %u invalid pages among blocks %u..%u of relation %s",
+					   invalid_count,
+					   blocknum, blocknum + nblocks - 1, rpath.str),
+				invalid_count > 1 ?
+				errdetail("Block %u held first invalid page.",
+						  blocknum + first_invalid_off) : 0,
+				invalid_count > 1 ?
+				errhint("See server log for the other %u invalid blocks.",
+						invalid_count - 1) : 0);
+	}
+	else
+	{
+		ereport(elevel,
+				errcode(ERRCODE_DATA_CORRUPTED),
+				invalid_count == 1 ?
+				errmsg("invalid page in block %u of relation %s",
+					   blocknum + first_invalid_off, rpath.str) :
+				errmsg("%u invalid pages among blocks %u..%u of relation %s",
+					   invalid_count,
+					   blocknum, blocknum + nblocks - 1, rpath.str),
+				invalid_count > 1 ?
+				errdetail("Block %u held first invalid page.",
+						  blocknum + first_invalid_off) : 0,
+				invalid_count > 1 ?
+				errhint("See server log for the other %u invalid blocks.",
+						invalid_count - 1) : 0);
+	}
 }
 
 static void
-- 
2.48.1.76.g4e746b1a31.dirty

