From 8e2308b6b9047c42d3906943dec3fd11e975ef72 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 1 Mar 2023 10:53:41 -0500
Subject: [PATCH v1] Return block_ref details as an array

In pg_walinspect, the block_ref column was a string, making it difficult
to access individual members without regular expressions. Now it is an
int array of int arrays for each block with the relevant information.

Note that in the case of a compressed block image, the compression type
is not listed as it is a string.
---
 contrib/pg_walinspect/pg_walinspect--1.0.sql |   6 +-
 contrib/pg_walinspect/pg_walinspect.c        | 135 +++++++++++++++----
 2 files changed, 114 insertions(+), 27 deletions(-)

diff --git a/contrib/pg_walinspect/pg_walinspect--1.0.sql b/contrib/pg_walinspect/pg_walinspect--1.0.sql
index 08b3dd5556..eb8ff82dd8 100644
--- a/contrib/pg_walinspect/pg_walinspect--1.0.sql
+++ b/contrib/pg_walinspect/pg_walinspect--1.0.sql
@@ -17,7 +17,7 @@ CREATE FUNCTION pg_get_wal_record_info(IN in_lsn pg_lsn,
     OUT main_data_length int4,
     OUT fpi_length int4,
     OUT description text,
-    OUT block_ref text
+    OUT block_ref int4[][]
 )
 AS 'MODULE_PATHNAME', 'pg_get_wal_record_info'
 LANGUAGE C STRICT PARALLEL SAFE;
@@ -40,7 +40,7 @@ CREATE FUNCTION pg_get_wal_records_info(IN start_lsn pg_lsn,
     OUT main_data_length int4,
     OUT fpi_length int4,
     OUT description text,
-    OUT block_ref text
+    OUT block_ref int4[][]
 )
 RETURNS SETOF record
 AS 'MODULE_PATHNAME', 'pg_get_wal_records_info'
@@ -63,7 +63,7 @@ CREATE FUNCTION pg_get_wal_records_info_till_end_of_wal(IN start_lsn pg_lsn,
     OUT main_data_length int4,
     OUT fpi_length int4,
     OUT description text,
-    OUT block_ref text
+    OUT block_ref int4[][]
 )
 RETURNS SETOF record
 AS 'MODULE_PATHNAME', 'pg_get_wal_records_info_till_end_of_wal'
diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index b7b0a805ee..446449bbe1 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -22,6 +22,7 @@
 #include "miscadmin.h"
 #include "utils/builtins.h"
 #include "utils/pg_lsn.h"
+#include "utils/array.h"
 
 /*
  * NOTE: For any code change or issue fix here, it is highly recommended to
@@ -57,6 +58,8 @@ static void FillXLogStatsRow(const char *name, uint64 n, uint64 total_count,
 static void GetWalStats(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
 						XLogRecPtr end_lsn, bool stats_per_record);
 static void GetWALFPIInfo(FunctionCallInfo fcinfo, XLogReaderState *record);
+static Datum
+walinspect_get_blkrefs(XLogReaderState *record, uint32 *fpi_data_size);
 
 /*
  * Check if the given LSN is in future. Also, return the LSN up to which the
@@ -174,6 +177,95 @@ ReadNextXLogRecord(XLogReaderState *xlogreader)
 	return record;
 }
 
+/* number of block ref items to display for every block (cols for our array) */
+#define WAL_BLOCK_INFO_TYPES 9
+
+static Datum
+walinspect_get_blkrefs(XLogReaderState *record, uint32 *fpi_data_size)
+{
+	DecodedBkpBlock *block;
+	ArrayType *aout;
+	Datum *cur_result;
+	Datum *result_datums;
+	bool *result_nulls;
+	int dims[2];
+	int lbs[2] = {0,0};
+	int nblocks = 0; /* nblocks is the number of rows of blocks */
+
+	int max_block_id  = XLogRecMaxBlockId(record);
+
+	/* determine how many blocks we have */
+	for (int block_id = 0; block_id <= max_block_id; block_id++)
+	{
+		if (XLogRecHasBlockRef(record, block_id))
+			nblocks++;
+	}
+
+	dims[0] = nblocks;
+	dims[1] = WAL_BLOCK_INFO_TYPES;
+
+	result_datums = palloc0(sizeof(Datum) * WAL_BLOCK_INFO_TYPES * nblocks);
+	result_nulls = palloc0(sizeof(bool) * WAL_BLOCK_INFO_TYPES * nblocks);
+
+	cur_result = result_datums;
+
+	for (int block_id = 0; block_id <= max_block_id; block_id++)
+	{
+		if (!XLogRecHasBlockRef(record, block_id))
+			continue;
+
+		block = XLogRecGetBlock(record, block_id);
+
+		*cur_result++ = Int32GetDatum(block_id);
+		*cur_result++ = Int32GetDatum(block->rlocator.spcOid);
+		*cur_result++ = Int32GetDatum(block->rlocator.dbOid);
+		*cur_result++ = Int32GetDatum(block->rlocator.relNumber);
+		*cur_result++ = Int32GetDatum(block->forknum);
+		*cur_result++ = Int32GetDatum(block->blkno);
+
+		if (!block->has_image)
+		{
+			cur_result += 3;
+			continue;
+		}
+
+		(*fpi_data_size) += block->bimg_len;
+
+		*cur_result++ = Int32GetDatum(block->hole_offset);
+		*cur_result++ = Int32GetDatum(block->hole_length);
+
+		if (!BKPIMAGE_COMPRESSED(block->bimg_info))
+		{
+			cur_result++;
+			continue;
+		}
+
+		*cur_result++ = Int32GetDatum(BLCKSZ - block->hole_length - block->bimg_len);
+	}
+
+	aout = construct_md_array(result_datums, result_nulls, 2, dims, lbs,
+			INT8OID, sizeof(int64), FLOAT8PASSBYVAL, TYPALIGN_DOUBLE);
+
+	PG_RETURN_POINTER(aout);
+}
+
+typedef enum wal_record_cols
+{
+	WAL_RECORD_START_LSN,
+	WAL_RECORD_END_LSN,
+	WAL_RECORD_PREV_LSN,
+	WAL_RECORD_XID,
+	WAL_RECORD_RMGR,
+	WAL_RECORD_TYPE,
+	WAL_RECORD_LENGTH,
+	WAL_RECORD_MAIN_DATA_LENGTH,
+	WAL_RECORD_FPI_LENGTH,
+	WAL_RECORD_DESC,
+	WAL_RECORD_BLOCK_REF
+} wal_record_cols;
+
+#define WAL_RECORD_NUM_COLS (WAL_RECORD_BLOCK_REF + 1)
+
 /*
  * Get a single WAL record info.
  */
@@ -183,40 +275,35 @@ GetWALRecordInfo(XLogReaderState *record, Datum *values,
 {
 	const char *id;
 	RmgrData	desc;
-	uint32		fpi_len = 0;
 	StringInfoData rec_desc;
-	StringInfoData rec_blk_ref;
-	uint32		main_data_len;
-	int			i = 0;
+	Datum blkref_array;
+	uint32		fpi_len = 0;
+
+	values[WAL_RECORD_START_LSN] = LSNGetDatum(record->ReadRecPtr);
+	values[WAL_RECORD_END_LSN] = LSNGetDatum(record->EndRecPtr);
+	values[WAL_RECORD_PREV_LSN] = LSNGetDatum(XLogRecGetPrev(record));
+	values[WAL_RECORD_XID] = TransactionIdGetDatum(XLogRecGetXid(record));
 
 	desc = GetRmgr(XLogRecGetRmid(record));
-	id = desc.rm_identify(XLogRecGetInfo(record));
+	values[WAL_RECORD_RMGR] = CStringGetTextDatum(desc.rm_name);
 
+	id = desc.rm_identify(XLogRecGetInfo(record));
 	if (id == NULL)
 		id = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+	values[WAL_RECORD_TYPE] = CStringGetTextDatum(id);
 
-	initStringInfo(&rec_desc);
-	desc.rm_desc(&rec_desc, record);
-
-	/* Block references. */
-	initStringInfo(&rec_blk_ref);
-	XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
+	values[WAL_RECORD_LENGTH] = UInt32GetDatum(XLogRecGetTotalLen(record));
+	values[WAL_RECORD_MAIN_DATA_LENGTH] = UInt32GetDatum(XLogRecGetDataLen(record));
 
-	main_data_len = XLogRecGetDataLen(record);
+	blkref_array = walinspect_get_blkrefs(record, &fpi_len);
 
-	values[i++] = LSNGetDatum(record->ReadRecPtr);
-	values[i++] = LSNGetDatum(record->EndRecPtr);
-	values[i++] = LSNGetDatum(XLogRecGetPrev(record));
-	values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
-	values[i++] = CStringGetTextDatum(desc.rm_name);
-	values[i++] = CStringGetTextDatum(id);
-	values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
-	values[i++] = UInt32GetDatum(main_data_len);
-	values[i++] = UInt32GetDatum(fpi_len);
-	values[i++] = CStringGetTextDatum(rec_desc.data);
-	values[i++] = CStringGetTextDatum(rec_blk_ref.data);
+	values[WAL_RECORD_FPI_LENGTH] = UInt32GetDatum(fpi_len);
+	initStringInfo(&rec_desc);
+	desc.rm_desc(&rec_desc, record);
+	values[WAL_RECORD_DESC] = CStringGetTextDatum(rec_desc.data);
+	values[WAL_RECORD_BLOCK_REF] = blkref_array;
 
-	Assert(i == ncols);
+	Assert(ncols == WAL_RECORD_NUM_COLS);
 }
 
 
-- 
2.37.2

