From c2ca448331460c813085a0a6ff09d53ef9f14b37 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 12 Feb 2021 14:19:10 -0800
Subject: [PATCH v1 10/14] WIP: Use streaming reads in nbtree vacuum scan.

XXX Cherry-picked from https://github.com/anarazel/postgres/tree/aio and
lightly modified by TM, for demonstration purposes.

Author: Andres Freund <andres@anarazel.de>
---
 src/backend/access/nbtree/nbtree.c | 77 +++++++++++++++++++++++++-----
 src/include/access/nbtree.h        |  3 ++
 2 files changed, 67 insertions(+), 13 deletions(-)

diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 62bc9917f1..5b72144fd4 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -34,6 +34,7 @@
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
+#include "storage/streaming_read.h"
 #include "utils/builtins.h"
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
@@ -81,7 +82,7 @@ typedef struct BTParallelScanDescData *BTParallelScanDesc;
 static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 						 IndexBulkDeleteCallback callback, void *callback_state,
 						 BTCycleId cycleid);
-static void btvacuumpage(BTVacState *vstate, BlockNumber scanblkno);
+static void btvacuumpage(BTVacState *vstate, Buffer buf, BlockNumber scanblkno);
 static BTVacuumPosting btreevacuumposting(BTVacState *vstate,
 										  IndexTuple posting,
 										  OffsetNumber updatedoffset,
@@ -890,6 +891,31 @@ btvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	return stats;
 }
 
+/* Streaming read callback for btvacuumscan(): supply the next block. */
+static bool
+btvacuumscan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
+					   void *io_private,
+					   BufferManagerRelation *bmr, ForkNumber *fork,
+					   BlockNumber *block, ReadBufferMode *mode)
+{
+	BTVacState *vstate = (BTVacState *) pgsr_private;
+
+	/* Every block of this pass has been handed out; end the stream. */
+	if (vstate->nextblock >= vstate->num_pages)
+		return false;
+
+	/*
+	 * Not _bt_getbuf(): its _bt_checkpage() barfs on all-zero pages, which we
+	 * want to recycle.  The access strategy is set when the stream is created.
+	 */
+	*bmr = BMR_REL(vstate->info->index);
+	*fork = MAIN_FORKNUM;
+	*block = vstate->nextblock++;
+	*mode = RBM_NORMAL;
+
+	return true;
+}
+
 /*
  * btvacuumscan --- scan the index for VACUUMing purposes
  *
@@ -983,6 +1009,8 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	scanblkno = BTREE_METAPAGE + 1;
 	for (;;)
 	{
+		PgStreamingRead *pgsr;
+
 		/* Get the current relation length */
 		if (needLock)
 			LockRelationForExtension(rel, ExclusiveLock);
@@ -997,14 +1025,36 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 		/* Quit if we've scanned the whole relation */
 		if (scanblkno >= num_pages)
 			break;
+
+		vstate.nextblock = scanblkno;
+		vstate.num_pages = num_pages;
+
+		pgsr = pg_streaming_read_buffer_alloc(512, 0, (uintptr_t) &vstate,
+											  vstate.info->strategy,
+											  btvacuumscan_pgsr_next);
+
 		/* Iterate over pages, then loop back to recheck length */
 		for (; scanblkno < num_pages; scanblkno++)
 		{
-			btvacuumpage(&vstate, scanblkno);
+			Buffer		buf = pg_streaming_read_buffer_get_next(pgsr, NULL);
+
+			Assert(BufferIsValid(buf));
+			Assert(BufferGetBlockNumber(buf) == scanblkno);
+
+			btvacuumpage(&vstate, buf, scanblkno);
+
 			if (info->report_progress)
 				pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE,
 											 scanblkno);
+
+			/* call vacuum_delay_point while not holding any buffer lock */
+			vacuum_delay_point();
 		}
+
+		if (pg_streaming_read_buffer_get_next(pgsr, NULL) != InvalidBuffer)
+			elog(ERROR, "aio and plain pos out of sync");
+
+		pg_streaming_read_free(pgsr);
 	}
 
 	/* Set statistics num_pages field to final size of index */
@@ -1037,7 +1087,7 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
  * recycled (i.e. before the page split).
  */
 static void
-btvacuumpage(BTVacState *vstate, BlockNumber scanblkno)
+btvacuumpage(BTVacState *vstate, Buffer buf, BlockNumber scanblkno)
 {
 	IndexVacuumInfo *info = vstate->info;
 	IndexBulkDeleteResult *stats = vstate->stats;
@@ -1048,28 +1098,28 @@ btvacuumpage(BTVacState *vstate, BlockNumber scanblkno)
 	bool		attempt_pagedel;
 	BlockNumber blkno,
 				backtrack_to;
-	Buffer		buf;
 	Page		page;
 	BTPageOpaque opaque;
 
 	blkno = scanblkno;
 
+	Assert(BufferIsValid(buf));
 backtrack:
 
 	attempt_pagedel = false;
 	backtrack_to = P_NONE;
 
-	/* call vacuum_delay_point while not holding any buffer lock */
-	vacuum_delay_point();
-
 	/*
-	 * We can't use _bt_getbuf() here because it always applies
-	 * _bt_checkpage(), which will barf on an all-zero page. We want to
-	 * recycle all-zero pages, not fail.  Also, we want to use a nondefault
-	 * buffer access strategy.
+	 * If we're not backtracking we already read the page asynchronously. Not
+	 * using AIO when backtracking isn't too tragic - it's relatively rare,
+	 * and the buffer quite possibly is still in shared buffers.
 	 */
-	buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-							 info->strategy);
+	if (!BufferIsValid(buf))
+		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
+								 info->strategy);
+
+	Assert(BufferGetBlockNumber(buf) == blkno);
+
 	_bt_lockbuf(rel, buf, BT_READ);
 	page = BufferGetPage(buf);
 	opaque = NULL;
@@ -1356,6 +1406,7 @@ backtrack:
 	if (backtrack_to != P_NONE)
 	{
 		blkno = backtrack_to;
+		buf = InvalidBuffer;
 		goto backtrack;
 	}
 }
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 8891fa7973..51009d24bd 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -343,6 +343,9 @@ typedef struct BTVacState
 	int			maxbufsize;		/* max bufsize that respects work_mem */
 	BTPendingFSM *pendingpages; /* One entry per newly deleted page */
 	int			npendingpages;	/* current # valid pendingpages */
+
+	BlockNumber nextblock;		/* next block the read callback hands out */
+	BlockNumber num_pages;		/* read callback stops before this block */
 } BTVacState;
 
 /*
-- 
2.39.2

