From 4d8f8a05156008b12fc3038c9e16b3574192a5af Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:54:56 +1300
Subject: [PATCH v5 2/3] Use vectored I/O for bulk writes.

bulk_write.c was originally designed with the goal of being able to
use the new vectored write APIs, but couldn't initially because the
vectored variant of "smgrextend" didn't exist yet.  Now that
smgrwritev() can also handle extension, we can use it here to get wide
write system calls.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 90 +++++++++++++++++++--------
 1 file changed, 65 insertions(+), 25 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c3..8e46249a98 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrwritev() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -45,8 +45,6 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
-static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
-
 typedef struct PendingWrite
 {
 	BulkWriteBuffer buf;
@@ -225,35 +223,77 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[16];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
+
+		/* Zero-extend any missing space before the first block. */
+		if (blkno > bulkstate->pages_written)
+		{
+			int			nzeroblocks;
+
+			nzeroblocks = blkno - bulkstate->pages_written;
+			smgrwritev(bulkstate->smgr, bulkstate->forknum,
+					   bulkstate->pages_written, NULL, nzeroblocks,
+					   SMGR_WRITE_SKIP_FSYNC |
+					   SMGR_WRITE_EXTEND |
+					   SMGR_WRITE_ZERO);
+			bulkstate->pages_written += nzeroblocks;
+		}
 
-		if (blkno >= bulkstate->pages_written)
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(lengthof(pages),
+							  bulkstate->pages_written - blkno);
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = lengthof(pages);
+		}
+
+		/* Find as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC | SMGR_WRITE_EXTEND);
+			bulkstate->pages_written += nblocks;
+		}
+		else
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
-- 
2.44.0

