From bf8f807a6c0d34a3447bb8ef6c7cc014be5c3603 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:54:56 +1300
Subject: [PATCH v2 2/2] Use vectored I/O for bulk writes.

Zero-extend using smgrzeroextend().  Extend using smgrextendv().  Write
using smgrwritev().
---
 src/backend/storage/smgr/bulk_write.c | 83 +++++++++++++++++++--------
 1 file changed, 58 insertions(+), 25 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c3..2a566a80f6 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrextendv() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -45,8 +45,6 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
-static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
-
 typedef struct PendingWrite
 {
 	BulkWriteBuffer buf;
@@ -225,35 +223,70 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[16];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
 
-		if (blkno >= bulkstate->pages_written)
+		/* Zero-extend any missing space before the first block. */
+		if (blkno > bulkstate->pages_written)
+		{
+			int			nzeroblocks;
+
+			nzeroblocks = blkno - bulkstate->pages_written;
+			smgrzeroextend(bulkstate->smgr, bulkstate->forknum,
+						   bulkstate->pages_written, nzeroblocks, true);
+			bulkstate->pages_written += nzeroblocks;
+		}
+
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(lengthof(pages),
+							  bulkstate->pages_written - blkno);
+		}
+		else
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = lengthof(pages);
+		}
+
+		/* Find as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrextendv(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true);
+			bulkstate->pages_written += nblocks;
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
-- 
2.43.2

