From 69d095e9050f7f747cc2586fc9d2900f3bee7380 Mon Sep 17 00:00:00 2001
From: Noah Misch <noah@leadboat.com>
Date: Tue, 3 Sep 2024 10:46:20 -0700
Subject: [PATCH v6 2/2] Optimize pg_visibility with read streams.

We've measured a 5% performance improvement, and this arranges to benefit
automatically from future optimizations to the read_stream subsystem.

Nazir Bilal Yavuz

Discussion: https://postgr.es/m/CAN55FZ1_Ru3XpMgTwsU67FTH2fs_FrRROmb7x6zs+F44QBEiww@mail.gmail.com
---
 contrib/pg_visibility/pg_visibility.c | 113 +++++++++++++++++++++-----
 1 file changed, 92 insertions(+), 21 deletions(-)

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index 773ba92e454..724122b1bc5 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -21,6 +21,7 @@
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
@@ -41,6 +42,17 @@ typedef struct corrupt_items
 	ItemPointer tids;
 } corrupt_items;
 
+/* for collect_corrupt_items_read_stream_next_block */
+struct collect_corrupt_items_read_stream_private
+{
+	bool		all_frozen;		/* select blocks whose VM all-frozen bit is set */
+	bool		all_visible;	/* select blocks whose VM all-visible bit is set */
+	BlockNumber current_blocknum;	/* next block the callback will examine */
+	BlockNumber last_exclusive; /* stop before this block number */
+	Relation	rel;			/* relation whose visibility map is consulted */
+	Buffer		vmbuffer;		/* VM buffer reused across lookups; the caller releases it */
+};
+
 PG_FUNCTION_INFO_V1(pg_visibility_map);
 PG_FUNCTION_INFO_V1(pg_visibility_map_rel);
 PG_FUNCTION_INFO_V1(pg_visibility);
@@ -478,6 +490,8 @@ collect_visibility_data(Oid relid, bool include_pd)
 	BlockNumber blkno;
 	Buffer		vmbuffer = InvalidBuffer;
 	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream = NULL;
 
 	rel = relation_open(relid, AccessShareLock);
 
@@ -489,6 +503,20 @@ collect_visibility_data(Oid relid, bool include_pd)
 	info->next = 0;
 	info->count = nblocks;
 
+	/* Create a stream if reading main fork. */
+	if (include_pd)
+	{
+		p.current_blocknum = 0;
+		p.last_exclusive = nblocks;
+		stream = read_stream_begin_relation(READ_STREAM_FULL,
+											bstrategy,
+											rel,
+											MAIN_FORKNUM,
+											block_range_read_stream_cb,
+											&p,
+											0);
+	}
+
 	for (blkno = 0; blkno < nblocks; ++blkno)
 	{
 		int32		mapbits;
@@ -513,8 +541,7 @@ collect_visibility_data(Oid relid, bool include_pd)
 			Buffer		buffer;
 			Page		page;
 
-			buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-										bstrategy);
+			buffer = read_stream_next_buffer(stream, NULL);
 			LockBuffer(buffer, BUFFER_LOCK_SHARE);
 
 			page = BufferGetPage(buffer);
@@ -525,6 +552,12 @@ collect_visibility_data(Oid relid, bool include_pd)
 		}
 	}
 
+	if (include_pd)
+	{
+		Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+		read_stream_end(stream);
+	}
+
 	/* Clean up. */
 	if (vmbuffer != InvalidBuffer)
 		ReleaseBuffer(vmbuffer);
@@ -610,6 +643,38 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
 	}
 }
 
+/*
+ * Read stream callback for collect_corrupt_items(): returns the next block
+ * whose visibility map bits call for a check, or InvalidBlockNumber at end.
+ */
+static BlockNumber
+collect_corrupt_items_read_stream_next_block(ReadStream *stream,
+											 void *callback_private_data,
+											 void *per_buffer_data)
+{
+	struct collect_corrupt_items_read_stream_private *p = callback_private_data;
+
+	for (; p->current_blocknum < p->last_exclusive; p->current_blocknum++)
+	{
+		bool		check_frozen = false;
+		bool		check_visible = false;
+
+		/* Make sure we are interruptible; this loop may skip many blocks. */
+		CHECK_FOR_INTERRUPTS();
+
+		if (p->all_frozen && VM_ALL_FROZEN(p->rel, p->current_blocknum, &p->vmbuffer))
+			check_frozen = true;
+		if (p->all_visible && VM_ALL_VISIBLE(p->rel, p->current_blocknum, &p->vmbuffer))
+			check_visible = true;
+		if (!check_visible && !check_frozen)
+			continue;
+
+		return p->current_blocknum++;	/* return skips the loop's increment */
+	}
+
+	return InvalidBlockNumber;
+}
+
 /*
  * Returns a list of items whose visibility map information does not match
  * the status of the tuples on the page.
@@ -628,12 +693,13 @@ static corrupt_items *
 collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 {
 	Relation	rel;
-	BlockNumber nblocks;
 	corrupt_items *items;
-	BlockNumber blkno;
 	Buffer		vmbuffer = InvalidBuffer;
 	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
 	TransactionId OldestXmin = InvalidTransactionId;
+	struct collect_corrupt_items_read_stream_private p;
+	ReadStream *stream;
+	Buffer		buffer;
 
 	rel = relation_open(relid, AccessShareLock);
 
@@ -643,8 +709,6 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	if (all_visible)
 		OldestXmin = GetStrictOldestNonRemovableTransactionId(rel);
 
-	nblocks = RelationGetNumberOfBlocks(rel);
-
 	/*
 	 * Guess an initial array size. We don't expect many corrupted tuples, so
 	 * start with a small array.  This function uses the "next" field to track
@@ -658,34 +722,38 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	items->count = 64;
 	items->tids = palloc(items->count * sizeof(ItemPointerData));
 
+	p.current_blocknum = 0;
+	p.last_exclusive = RelationGetNumberOfBlocks(rel);
+	p.rel = rel;
+	p.vmbuffer = InvalidBuffer;
+	p.all_frozen = all_frozen;
+	p.all_visible = all_visible;
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										bstrategy,
+										rel,
+										MAIN_FORKNUM,
+										collect_corrupt_items_read_stream_next_block,
+										&p,
+										0);
+
 	/* Loop over every block in the relation. */
-	for (blkno = 0; blkno < nblocks; ++blkno)
+	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 	{
-		bool		check_frozen = false;
-		bool		check_visible = false;
-		Buffer		buffer;
+		bool		check_frozen = all_frozen;
+		bool		check_visible = all_visible;
 		Page		page;
 		OffsetNumber offnum,
 					maxoff;
+		BlockNumber blkno;
 
 		/* Make sure we are interruptible. */
 		CHECK_FOR_INTERRUPTS();
 
-		/* Use the visibility map to decide whether to check this page. */
-		if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer))
-			check_frozen = true;
-		if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
-			check_visible = true;
-		if (!check_visible && !check_frozen)
-			continue;
-
-		/* Read and lock the page. */
-		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-									bstrategy);
 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
 
 		page = BufferGetPage(buffer);
 		maxoff = PageGetMaxOffsetNumber(page);
+		blkno = BufferGetBlockNumber(buffer);
 
 		/*
 		 * The visibility map bits might have changed while we were acquiring
@@ -778,10 +846,13 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 
 		UnlockReleaseBuffer(buffer);
 	}
+	read_stream_end(stream);
 
 	/* Clean up. */
 	if (vmbuffer != InvalidBuffer)
 		ReleaseBuffer(vmbuffer);
+	if (p.vmbuffer != InvalidBuffer)
+		ReleaseBuffer(p.vmbuffer);
 	relation_close(rel, AccessShareLock);
 
 	/*
-- 
2.45.2

