From df9e2356d61c47f4a943d0a21ccc64faf6616cc0 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Mon, 8 Apr 2024 18:58:57 +1200
Subject: [PATCH v7 2/3] Use vectored I/O in CREATE INDEX.

Commit 8af25652's bulk_write.c was originally designed with the goal of
being able to use vectored writes (along with other batching
opportunities), but it couldn't do that at the time because the vectored
variant of smgrextend() didn't exist yet.  Now it does, so we can
connect the dots.

Concretely, CREATE INDEX commands for several index types now respect
the io_combine_limit setting, when bypassing the buffer pool and writing
new indexes directly to the storage manager.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
Discussion: https://postgr.es/m/30e8f366-58b3-b239-c521-422122dd5150%40iki.fi
---
 src/backend/storage/smgr/bulk_write.c | 93 ++++++++++++++++++++-------
 1 file changed, 69 insertions(+), 24 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c39..45d66245860 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrwritev() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -225,35 +225,79 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[MAX_IO_COMBINE_LIMIT];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
 
-		if (blkno >= bulkstate->pages_written)
+		/*
+		 * If we have to write pages nonsequentially, fill in the space with
+		 * zeroes until we come back and overwrite.  This is not logically
+		 * necessary on standard Unix filesystems (unwritten space will read
+		 * as zeroes anyway), but it should help to avoid fragmentation.  The
+		 * dummy pages aren't WAL-logged though.
+		 */
+		while (blkno > bulkstate->pages_written)
+		{
+			/* don't set checksum for all-zero page */
+			smgrextend(bulkstate->smgr, bulkstate->forknum,
+					   bulkstate->pages_written++,
+					   &zero_buffer,
+					   true);
+		}
+
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(io_combine_limit,
+							  bulkstate->pages_written - blkno);
+		}
+		else
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = io_combine_limit;
+		}
+
+		/* Collect as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC | SMGR_WRITE_EXTEND);
+			bulkstate->pages_written += nblocks;
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
@@ -277,7 +321,8 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
 	w->blkno = blocknum;
 	w->page_std = page_std;
 
-	if (bulkstate->npending == MAX_PENDING_WRITES)
+	if (bulkstate->npending == MAX_PENDING_WRITES ||
+		bulkstate->npending == io_combine_limit)
 		smgr_bulk_flush(bulkstate);
 }
 
-- 
2.44.0

