Vectored I/O in bulk_write.c

Started by Thomas Munro · almost 2 years ago · 16 messages

#1 Thomas Munro
thomas.munro@gmail.com
2 attachment(s)

Hi,

I was trying to learn enough about the new bulk_write.c to figure out
what might be going wrong over at [1], and finished up doing this
exercise, which is experiment quality but passes basic tests. It's a
bit like v1-0013 and v1-0014's experimental vectored checkpointing
from [2] (which themselves are not currently proposed; that too was in
the experiment category), but this usage is a lot simpler and might be
worth considering. Presumably both things would eventually finish up
being done by (not yet proposed) streaming writes, but this simple
case could also be done directly.

This way, CREATE INDEX generates 128kB pwritev() calls instead of 8kB
pwrite() calls. (There's a magic 16 in there; we'd probably need to
think harder about that.) It'd be better if bulk_write.c's memory
management were improved: if buffers were mostly contiguous neighbours
instead of separately palloc'd objects, you'd probably often get a
128kB pwrite() instead of a pwritev(), which might be marginally more
efficient.

This made me wonder why smgrwritev() and smgrextendv() shouldn't be
backed by the same implementation, since they are essentially the same
operation. The differences are some assertions, which might as well be
moved up to the smgr.c level since they must surely apply to any
future smgr implementation too, and the segment file creation policy,
which can be controlled with a new argument. I tried that here. An
alternative would be for md.c to have a workhorse function that both
mdextendv() and mdwritev() call, but I'm not sure there's much point
in that.

While thinking about that I realised that an existing write-or-extend
assertion in master is wrong because it doesn't add nblocks.

Hmm, it's a bit weird that we have nblocks as int or BlockNumber in
various places, which I think should probably be fixed.

[1]: /messages/by-id/CA+hUKGK+5DOmLaBp3Z7C4S-Yv6yoROvr1UncjH2S1ZbPT8D+Zg@mail.gmail.com
[2]: /messages/by-id/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6uT5TUm2gkvA@mail.gmail.com

Attachments:

0001-Provide-vectored-variant-of-smgrextend.patch (text/x-patch)
From 4611cb121bbfa787ddbba4bc0e80ac6c732345d0 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:04:21 +1300
Subject: [PATCH 1/2] Provide vectored variant of smgrextend().

Since mdwrite() and mdextend() were basically the same, merge them.
They had different assertions, but those would surely apply to any
implementation of smgr, so move them up to the smgr.c wrapper
level.  The other difference was the policy on segment creation, but
that can be captured by having smgrwritev() and smgrextendv() call a
single mdwritev() function with a new "extend" flag.
---
 src/backend/storage/smgr/md.c   | 91 ++++-----------------------------
 src/backend/storage/smgr/smgr.c | 28 ++++++----
 src/include/storage/md.h        |  3 +-
 src/include/storage/smgr.h      | 12 ++++-
 4 files changed, 40 insertions(+), 94 deletions(-)

diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index bf0f3ca76d..80716f11e1 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -447,74 +447,6 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo)
 	pfree(path);
 }
 
-/*
- * mdextend() -- Add a block to the specified relation.
- *
- * The semantics are nearly the same as mdwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void *buffer, bool skipFsync)
-{
-	off_t		seekpos;
-	int			nbytes;
-	MdfdVec    *v;
-
-	/* If this build supports direct I/O, the buffer must be I/O aligned. */
-	if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
-		Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
-
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum >= mdnblocks(reln, forknum));
-#endif
-
-	/*
-	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
-	 * more --- we mustn't create a block whose number actually is
-	 * InvalidBlockNumber.  (Note that this failure should be unreachable
-	 * because of upstream checks in bufmgr.c.)
-	 */
-	if (blocknum == InvalidBlockNumber)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-				 errmsg("cannot extend file \"%s\" beyond %u blocks",
-						relpath(reln->smgr_rlocator, forknum),
-						InvalidBlockNumber)));
-
-	v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);
-
-	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
-
-	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
-
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
-	{
-		if (nbytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not extend file \"%s\": %m",
-							FilePathName(v->mdfd_vfd)),
-					 errhint("Check free disk space.")));
-		/* short write: complain appropriately */
-		ereport(ERROR,
-				(errcode(ERRCODE_DISK_FULL),
-				 errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
-						FilePathName(v->mdfd_vfd),
-						nbytes, BLCKSZ, blocknum),
-				 errhint("Check free disk space.")));
-	}
-
-	if (!skipFsync && !SmgrIsTemp(reln))
-		register_dirty_segment(reln, forknum, v);
-
-	Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
-}
-
 /*
  * mdzeroextend() -- Add new zeroed out blocks to the specified relation.
  *
@@ -920,19 +852,14 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
  *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use mdextend().
+ * Note that smgrextendv() and smgrwritev() are different operations, but both
+ * are handled here.
  */
 void
 mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void **buffers, BlockNumber nblocks, bool skipFsync)
+		 const void **buffers, BlockNumber nblocks,
+		 bool extend, bool skipFsync)
 {
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum < mdnblocks(reln, forknum));
-#endif
-
 	while (nblocks > 0)
 	{
 		struct iovec iov[PG_IOV_MAX];
@@ -945,6 +872,8 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		size_t		size_this_segment;
 
 		v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
+						 extend ?
+						 EXTENSION_CREATE :
 						 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
 
 		seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
@@ -1638,7 +1567,7 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 		{
 			/*
 			 * Normally we will create new segments only if authorized by the
-			 * caller (i.e., we are doing mdextend()).  But when doing WAL
+			 * caller (i.e., we are doing smgrextend()).  But when doing WAL
 			 * recovery, create segments anyway; this allows cases such as
 			 * replaying WAL data that has a write into a high-numbered
 			 * segment of a relation that was later deleted. We want to go
@@ -1655,9 +1584,9 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 				char	   *zerobuf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE,
 													 MCXT_ALLOC_ZERO);
 
-				mdextend(reln, forknum,
-						 nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
-						 zerobuf, skipFsync);
+				smgrextend(reln, forknum,
+						   nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
+						   zerobuf, skipFsync);
 				pfree(zerobuf);
 			}
 			flags = O_CREAT;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index a5b18328b8..3287fc26d1 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -82,8 +82,6 @@ typedef struct f_smgr
 	bool		(*smgr_exists) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_unlink) (RelFileLocatorBackend rlocator, ForkNumber forknum,
 								bool isRedo);
-	void		(*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
-								BlockNumber blocknum, const void *buffer, bool skipFsync);
 	void		(*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
 									BlockNumber blocknum, int nblocks, bool skipFsync);
 	bool		(*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
@@ -94,7 +92,7 @@ typedef struct f_smgr
 	void		(*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
-								bool skipFsync);
+								bool extend, bool skipFsync);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -114,7 +112,6 @@ static const f_smgr smgrsw[] = {
 		.smgr_create = mdcreate,
 		.smgr_exists = mdexists,
 		.smgr_unlink = mdunlink,
-		.smgr_extend = mdextend,
 		.smgr_zeroextend = mdzeroextend,
 		.smgr_prefetch = mdprefetch,
 		.smgr_readv = mdreadv,
@@ -525,7 +522,7 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
 
 
 /*
- * smgrextend() -- Add a new block to a file.
+ * smgrextendv() -- Add new blocks to a file.
  *
  * The semantics are nearly the same as smgrwrite(): write at the
  * specified position.  However, this is to be used for the case of
@@ -534,11 +531,16 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
  * causes intervening file space to become filled with zeroes.
  */
 void
-smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void *buffer, bool skipFsync)
+smgrextendv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			const void **buffers, BlockNumber nblocks, bool skipFsync)
 {
-	smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
-										 buffer, skipFsync);
+	/* This assert is too expensive to have on normally ... */
+#ifdef CHECK_WRITE_VS_EXTEND
+	Assert(blocknum >= smgrnblocks(reln, forknum));
+#endif
+
+	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
+										 buffers, nblocks, true, skipFsync);
 
 	/*
 	 * Normally we expect this to increase nblocks by one, but if the cached
@@ -633,8 +635,14 @@ void
 smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		   const void **buffers, BlockNumber nblocks, bool skipFsync)
 {
+	/* This assert is too expensive to have on normally ... */
+#ifdef CHECK_WRITE_VS_EXTEND
+	Assert(blocknum + nblocks <= smgrnblocks(reln, forknum));
+#endif
+
 	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
-										 buffers, nblocks, skipFsync);
+										 buffers, nblocks, false,
+										 skipFsync);
 }
 
 /*
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 620f10abde..8746a48a5f 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -36,7 +36,8 @@ extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
-					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+					 const void **buffers, BlockNumber nblocks,
+					 bool extend, bool skipFsync);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index fc5f883ce1..7ffea4332d 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -86,8 +86,9 @@ extern void smgrreleaserellocator(RelFileLocatorBackend rlocator);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
-extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-					   BlockNumber blocknum, const void *buffer, bool skipFsync);
+extern void smgrextendv(SMgrRelation reln, ForkNumber forknum,
+						BlockNumber blocknum,
+						const void **buffers, BlockNumber nblocks, bool skipFsync);
 extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
 						   BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
@@ -124,4 +125,11 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
 }
 
+static inline void
+smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		   const void *buffer, bool skipFsync)
+{
+	smgrextendv(reln, forknum, blocknum, &buffer, 1, skipFsync);
+}
+
 #endif							/* SMGR_H */
-- 
2.43.2

0002-Use-vectored-I-O-for-bulk-writes.patch (text/x-patch)
From 3823d8a25fbf41ea2abf396329ce465cca74727b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:54:56 +1300
Subject: [PATCH 2/2] Use vectored I/O for bulk writes.

Zero-extend using smgrzeroextend().  Extend using smgrextendv().  Write
using smgrwritev().
---
 src/backend/storage/smgr/bulk_write.c | 83 +++++++++++++++++++--------
 1 file changed, 58 insertions(+), 25 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c3..500a14fe30 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrextendv() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -45,8 +45,6 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
-static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
-
 typedef struct PendingWrite
 {
 	BulkWriteBuffer buf;
@@ -225,35 +223,70 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[16];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
 
-		if (blkno >= bulkstate->pages_written)
+		/* Zero-extend any missing space before the first block. */
+		if (blkno > bulkstate->pages_written)
+		{
+			int			nzeroblocks;
+
+			nzeroblocks = blkno - bulkstate->pages_written;
+			smgrzeroextend(bulkstate->smgr, bulkstate->forknum,
+						   bulkstate->pages_written, nzeroblocks, true);
+			bulkstate->pages_written += nzeroblocks;
+		}
+
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we can't
+			 * mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(lengthof(pages),
+							  bulkstate->pages_written - blkno);
+		}
+		else
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = lengthof(pages);
+		}
+
+		/* Find as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrextendv(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true);
+			bulkstate->pages_written += nblocks;
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
-- 
2.43.2

#2 Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#1)
2 attachment(s)
Re: Vectored I/O in bulk_write.c

Slightly better version: adjusts a few obsoleted comments, adjusts the
error message to distinguish write from extend, and fixes a thinko in
smgr_cached_nblocks maintenance.

Attachments:

v2-0001-Provide-vectored-variant-of-smgrextend.patch (text/x-patch)
From c786f979b0c38364775e32b9403b79303507d37b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:04:21 +1300
Subject: [PATCH v2 1/2] Provide vectored variant of smgrextend().

Since mdwrite() and mdextend() were basically the same, merge them.
They had different assertions, but those would surely apply to any
implementation of smgr, so move them up to the smgr.c wrapper
level.  The other difference was the policy on segment creation, but
that can be captured by having smgrwritev() and smgrextendv() call a
single mdwritev() function with a new "extend" flag.
---
 src/backend/storage/smgr/md.c   | 106 +++++---------------------------
 src/backend/storage/smgr/smgr.c |  32 ++++++----
 src/include/storage/md.h        |   3 +-
 src/include/storage/smgr.h      |  12 +++-
 4 files changed, 47 insertions(+), 106 deletions(-)

diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index bf0f3ca76d..9b12584122 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -447,79 +447,10 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo)
 	pfree(path);
 }
 
-/*
- * mdextend() -- Add a block to the specified relation.
- *
- * The semantics are nearly the same as mdwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void *buffer, bool skipFsync)
-{
-	off_t		seekpos;
-	int			nbytes;
-	MdfdVec    *v;
-
-	/* If this build supports direct I/O, the buffer must be I/O aligned. */
-	if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
-		Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
-
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum >= mdnblocks(reln, forknum));
-#endif
-
-	/*
-	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
-	 * more --- we mustn't create a block whose number actually is
-	 * InvalidBlockNumber.  (Note that this failure should be unreachable
-	 * because of upstream checks in bufmgr.c.)
-	 */
-	if (blocknum == InvalidBlockNumber)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-				 errmsg("cannot extend file \"%s\" beyond %u blocks",
-						relpath(reln->smgr_rlocator, forknum),
-						InvalidBlockNumber)));
-
-	v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);
-
-	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
-
-	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
-
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
-	{
-		if (nbytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not extend file \"%s\": %m",
-							FilePathName(v->mdfd_vfd)),
-					 errhint("Check free disk space.")));
-		/* short write: complain appropriately */
-		ereport(ERROR,
-				(errcode(ERRCODE_DISK_FULL),
-				 errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
-						FilePathName(v->mdfd_vfd),
-						nbytes, BLCKSZ, blocknum),
-				 errhint("Check free disk space.")));
-	}
-
-	if (!skipFsync && !SmgrIsTemp(reln))
-		register_dirty_segment(reln, forknum, v);
-
-	Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
-}
-
 /*
  * mdzeroextend() -- Add new zeroed out blocks to the specified relation.
  *
- * Similar to mdextend(), except the relation can be extended by multiple
- * blocks at once and the added blocks will be filled with zeroes.
+ * The added blocks will be filled with zeroes.
  */
 void
 mdzeroextend(SMgrRelation reln, ForkNumber forknum,
@@ -595,13 +526,7 @@ mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 		{
 			int			ret;
 
-			/*
-			 * Even if we don't want to use fallocate, we can still extend a
-			 * bit more efficiently than writing each 8kB block individually.
-			 * pg_pwrite_zeros() (via FileZero()) uses pg_pwritev_with_retry()
-			 * to avoid multiple writes or needing a zeroed buffer for the
-			 * whole length of the extension.
-			 */
+			/* Fall back to writing out zeroes. */
 			ret = FileZero(v->mdfd_vfd,
 						   seekpos, (off_t) BLCKSZ * numblocks,
 						   WAIT_EVENT_DATA_FILE_EXTEND);
@@ -920,19 +845,14 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
  *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use mdextend().
+ * Note that smgrextendv() and smgrwritev() are different operations, but both
+ * are handled here.
  */
 void
 mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void **buffers, BlockNumber nblocks, bool skipFsync)
+		 const void **buffers, BlockNumber nblocks,
+		 bool extend, bool skipFsync)
 {
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum < mdnblocks(reln, forknum));
-#endif
-
 	while (nblocks > 0)
 	{
 		struct iovec iov[PG_IOV_MAX];
@@ -945,6 +865,8 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		size_t		size_this_segment;
 
 		v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
+						 extend ?
+						 EXTENSION_CREATE :
 						 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
 
 		seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
@@ -992,7 +914,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 				ereport(ERROR,
 						(errcode_for_file_access(),
-						 errmsg("could not write blocks %u..%u in file \"%s\": %m",
+						 errmsg(extend ?
+								"could not extend blocks %u..%u in file \"%s\": %m" :
+								"could not write blocks %u..%u in file \"%s\": %m",
 								blocknum,
 								blocknum + nblocks_this_segment - 1,
 								FilePathName(v->mdfd_vfd)),
@@ -1638,7 +1562,7 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 		{
 			/*
 			 * Normally we will create new segments only if authorized by the
-			 * caller (i.e., we are doing mdextend()).  But when doing WAL
+			 * caller (i.e., we are doing smgrextend()).  But when doing WAL
 			 * recovery, create segments anyway; this allows cases such as
 			 * replaying WAL data that has a write into a high-numbered
 			 * segment of a relation that was later deleted. We want to go
@@ -1655,9 +1579,9 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 				char	   *zerobuf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE,
 													 MCXT_ALLOC_ZERO);
 
-				mdextend(reln, forknum,
-						 nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
-						 zerobuf, skipFsync);
+				smgrextend(reln, forknum,
+						   nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
+						   zerobuf, skipFsync);
 				pfree(zerobuf);
 			}
 			flags = O_CREAT;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index a5b18328b8..ad0f02f205 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -82,8 +82,6 @@ typedef struct f_smgr
 	bool		(*smgr_exists) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_unlink) (RelFileLocatorBackend rlocator, ForkNumber forknum,
 								bool isRedo);
-	void		(*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
-								BlockNumber blocknum, const void *buffer, bool skipFsync);
 	void		(*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
 									BlockNumber blocknum, int nblocks, bool skipFsync);
 	bool		(*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
@@ -94,7 +92,7 @@ typedef struct f_smgr
 	void		(*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
-								bool skipFsync);
+								bool extend, bool skipFsync);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -114,7 +112,6 @@ static const f_smgr smgrsw[] = {
 		.smgr_create = mdcreate,
 		.smgr_exists = mdexists,
 		.smgr_unlink = mdunlink,
-		.smgr_extend = mdextend,
 		.smgr_zeroextend = mdzeroextend,
 		.smgr_prefetch = mdprefetch,
 		.smgr_readv = mdreadv,
@@ -525,7 +522,7 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
 
 
 /*
- * smgrextend() -- Add a new block to a file.
+ * smgrextendv() -- Add new blocks to a file.
  *
  * The semantics are nearly the same as smgrwrite(): write at the
  * specified position.  However, this is to be used for the case of
@@ -534,19 +531,24 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
  * causes intervening file space to become filled with zeroes.
  */
 void
-smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void *buffer, bool skipFsync)
+smgrextendv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			const void **buffers, BlockNumber nblocks, bool skipFsync)
 {
-	smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
-										 buffer, skipFsync);
+	/* This assert is too expensive to have on normally ... */
+#ifdef CHECK_WRITE_VS_EXTEND
+	Assert(blocknum >= smgrnblocks(reln, forknum));
+#endif
+
+	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
+										 buffers, nblocks, true, skipFsync);
 
 	/*
-	 * Normally we expect this to increase nblocks by one, but if the cached
+	 * Normally we expect this to increase nblocks by N, but if the cached
 	 * value isn't as expected, just invalidate it so the next call asks the
 	 * kernel.
 	 */
 	if (reln->smgr_cached_nblocks[forknum] == blocknum)
-		reln->smgr_cached_nblocks[forknum] = blocknum + 1;
+		reln->smgr_cached_nblocks[forknum] = blocknum + nblocks;
 	else
 		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
 }
@@ -633,8 +635,14 @@ void
 smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		   const void **buffers, BlockNumber nblocks, bool skipFsync)
 {
+	/* This assert is too expensive to have on normally ... */
+#ifdef CHECK_WRITE_VS_EXTEND
+	Assert(blocknum + nblocks <= smgrnblocks(reln, forknum));
+#endif
+
 	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
-										 buffers, nblocks, skipFsync);
+										 buffers, nblocks, false,
+										 skipFsync);
 }
 
 /*
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 620f10abde..8746a48a5f 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -36,7 +36,8 @@ extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
-					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+					 const void **buffers, BlockNumber nblocks,
+					 bool extend, bool skipFsync);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index fc5f883ce1..7ffea4332d 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -86,8 +86,9 @@ extern void smgrreleaserellocator(RelFileLocatorBackend rlocator);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
-extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-					   BlockNumber blocknum, const void *buffer, bool skipFsync);
+extern void smgrextendv(SMgrRelation reln, ForkNumber forknum,
+						BlockNumber blocknum,
+						const void **buffers, BlockNumber nblocks, bool skipFsync);
 extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
 						   BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
@@ -124,4 +125,11 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
 }
 
+static inline void
+smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		   const void *buffer, bool skipFsync)
+{
+	smgrextendv(reln, forknum, blocknum, &buffer, 1, skipFsync);
+}
+
 #endif							/* SMGR_H */
-- 
2.43.2

v2-0002-Use-vectored-I-O-for-bulk-writes.patch (text/x-patch)
From bf8f807a6c0d34a3447bb8ef6c7cc014be5c3603 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:54:56 +1300
Subject: [PATCH v2 2/2] Use vectored I/O for bulk writes.

Zero-extend using smgrzeroextend().  Extend using smgrextendv().  Write
using smgrwritev().
---
 src/backend/storage/smgr/bulk_write.c | 83 +++++++++++++++++++--------
 1 file changed, 58 insertions(+), 25 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c3..2a566a80f6 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrextendv() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -45,8 +45,6 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
-static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
-
 typedef struct PendingWrite
 {
 	BulkWriteBuffer buf;
@@ -225,35 +223,70 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[16];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
 
-		if (blkno >= bulkstate->pages_written)
+		/* Zero-extend any missing space before the first block. */
+		if (blkno > bulkstate->pages_written)
+		{
+			int			nzeroblocks;
+
+			nzeroblocks = blkno - bulkstate->pages_written;
+			smgrzeroextend(bulkstate->smgr, bulkstate->forknum,
+						   bulkstate->pages_written, nzeroblocks, true);
+			bulkstate->pages_written += nzeroblocks;
+		}
+
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(lengthof(pages),
+							  bulkstate->pages_written - blkno);
+		}
+		else
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = lengthof(pages);
+		}
+
+		/* Find as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrextendv(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true);
+			bulkstate->pages_written += nblocks;
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
-- 
2.43.2

#3Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#2)
3 attachment(s)
Re: Vectored I/O in bulk_write.c

Here also is a first attempt at improving the memory allocation and
memory layout.

I wonder if bulk logging should trigger larger WAL writes in the "Have
to write it ourselves" case in AdvanceXLInsertBuffer(), since writing
8kB of WAL at a time seems like an unnecessarily degraded level of
performance, especially with wal_sync_method=open_datasync. Of course
the real answer is "make sure wal_buffers is high enough for your
workload" (usually indirectly by automatically scaling from
shared_buffers), but this problem jumps out when tracing bulk_write.c
with default settings. We write out the index 128kB at a time, but
the WAL 8kB at a time.

Attachments:

Attachment: v3-0001-Provide-vectored-variant-of-smgrextend.patch (text/x-patch)
From 793caf2db6c00314f5bd8f7146d4797508f2f627 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:04:21 +1300
Subject: [PATCH v3 1/3] Provide vectored variant of smgrextend().

Since mdwrite() and mdextend() were basically the same, merge them.
They had different assertions, but those would surely apply to any
implementation of smgr, so move them up to the smgr.c wrapper
level.  The other difference was the policy on segment creation, but
that can be captured by having smgrwritev() and smgrextendv() call a
single mdwritev() function with a new "extend" flag.

Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/md.c   | 106 +++++---------------------------
 src/backend/storage/smgr/smgr.c |  32 ++++++----
 src/include/storage/md.h        |   3 +-
 src/include/storage/smgr.h      |  12 +++-
 4 files changed, 47 insertions(+), 106 deletions(-)

diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index bf0f3ca76d1..9b125841226 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -447,79 +447,10 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo)
 	pfree(path);
 }
 
-/*
- * mdextend() -- Add a block to the specified relation.
- *
- * The semantics are nearly the same as mdwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void *buffer, bool skipFsync)
-{
-	off_t		seekpos;
-	int			nbytes;
-	MdfdVec    *v;
-
-	/* If this build supports direct I/O, the buffer must be I/O aligned. */
-	if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
-		Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
-
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum >= mdnblocks(reln, forknum));
-#endif
-
-	/*
-	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
-	 * more --- we mustn't create a block whose number actually is
-	 * InvalidBlockNumber.  (Note that this failure should be unreachable
-	 * because of upstream checks in bufmgr.c.)
-	 */
-	if (blocknum == InvalidBlockNumber)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-				 errmsg("cannot extend file \"%s\" beyond %u blocks",
-						relpath(reln->smgr_rlocator, forknum),
-						InvalidBlockNumber)));
-
-	v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);
-
-	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
-
-	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
-
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
-	{
-		if (nbytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not extend file \"%s\": %m",
-							FilePathName(v->mdfd_vfd)),
-					 errhint("Check free disk space.")));
-		/* short write: complain appropriately */
-		ereport(ERROR,
-				(errcode(ERRCODE_DISK_FULL),
-				 errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
-						FilePathName(v->mdfd_vfd),
-						nbytes, BLCKSZ, blocknum),
-				 errhint("Check free disk space.")));
-	}
-
-	if (!skipFsync && !SmgrIsTemp(reln))
-		register_dirty_segment(reln, forknum, v);
-
-	Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
-}
-
 /*
  * mdzeroextend() -- Add new zeroed out blocks to the specified relation.
  *
- * Similar to mdextend(), except the relation can be extended by multiple
- * blocks at once and the added blocks will be filled with zeroes.
+ * The added blocks will be filled with zeroes.
  */
 void
 mdzeroextend(SMgrRelation reln, ForkNumber forknum,
@@ -595,13 +526,7 @@ mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 		{
 			int			ret;
 
-			/*
-			 * Even if we don't want to use fallocate, we can still extend a
-			 * bit more efficiently than writing each 8kB block individually.
-			 * pg_pwrite_zeros() (via FileZero()) uses pg_pwritev_with_retry()
-			 * to avoid multiple writes or needing a zeroed buffer for the
-			 * whole length of the extension.
-			 */
+			/* Fall back to writing out zeroes. */
 			ret = FileZero(v->mdfd_vfd,
 						   seekpos, (off_t) BLCKSZ * numblocks,
 						   WAIT_EVENT_DATA_FILE_EXTEND);
@@ -920,19 +845,14 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
  *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use mdextend().
+ * Note that smgrextendv() and smgrwritev() are different operations, but both
+ * are handled here.
  */
 void
 mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void **buffers, BlockNumber nblocks, bool skipFsync)
+		 const void **buffers, BlockNumber nblocks,
+		 bool extend, bool skipFsync)
 {
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum < mdnblocks(reln, forknum));
-#endif
-
 	while (nblocks > 0)
 	{
 		struct iovec iov[PG_IOV_MAX];
@@ -945,6 +865,8 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		size_t		size_this_segment;
 
 		v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
+						 extend ?
+						 EXTENSION_CREATE :
 						 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
 
 		seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
@@ -992,7 +914,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 				ereport(ERROR,
 						(errcode_for_file_access(),
-						 errmsg("could not write blocks %u..%u in file \"%s\": %m",
+						 errmsg(extend ?
+								"could not extend blocks %u..%u in file \"%s\": %m" :
+								"could not write blocks %u..%u in file \"%s\": %m",
 								blocknum,
 								blocknum + nblocks_this_segment - 1,
 								FilePathName(v->mdfd_vfd)),
@@ -1638,7 +1562,7 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 		{
 			/*
 			 * Normally we will create new segments only if authorized by the
-			 * caller (i.e., we are doing mdextend()).  But when doing WAL
+			 * caller (i.e., we are doing smgrextend()).  But when doing WAL
 			 * recovery, create segments anyway; this allows cases such as
 			 * replaying WAL data that has a write into a high-numbered
 			 * segment of a relation that was later deleted. We want to go
@@ -1655,9 +1579,9 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 				char	   *zerobuf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE,
 													 MCXT_ALLOC_ZERO);
 
-				mdextend(reln, forknum,
-						 nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
-						 zerobuf, skipFsync);
+				smgrextend(reln, forknum,
+						   nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
+						   zerobuf, skipFsync);
 				pfree(zerobuf);
 			}
 			flags = O_CREAT;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index a5b18328b89..ad0f02f205e 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -82,8 +82,6 @@ typedef struct f_smgr
 	bool		(*smgr_exists) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_unlink) (RelFileLocatorBackend rlocator, ForkNumber forknum,
 								bool isRedo);
-	void		(*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
-								BlockNumber blocknum, const void *buffer, bool skipFsync);
 	void		(*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
 									BlockNumber blocknum, int nblocks, bool skipFsync);
 	bool		(*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
@@ -94,7 +92,7 @@ typedef struct f_smgr
 	void		(*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
-								bool skipFsync);
+								bool extend, bool skipFsync);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -114,7 +112,6 @@ static const f_smgr smgrsw[] = {
 		.smgr_create = mdcreate,
 		.smgr_exists = mdexists,
 		.smgr_unlink = mdunlink,
-		.smgr_extend = mdextend,
 		.smgr_zeroextend = mdzeroextend,
 		.smgr_prefetch = mdprefetch,
 		.smgr_readv = mdreadv,
@@ -525,7 +522,7 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
 
 
 /*
- * smgrextend() -- Add a new block to a file.
+ * smgrextendv() -- Add new blocks to a file.
  *
  * The semantics are nearly the same as smgrwrite(): write at the
  * specified position.  However, this is to be used for the case of
@@ -534,19 +531,24 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
  * causes intervening file space to become filled with zeroes.
  */
 void
-smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void *buffer, bool skipFsync)
+smgrextendv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			const void **buffers, BlockNumber nblocks, bool skipFsync)
 {
-	smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
-										 buffer, skipFsync);
+	/* This assert is too expensive to have on normally ... */
+#ifdef CHECK_WRITE_VS_EXTEND
+	Assert(blocknum >= smgrnblocks(reln, forknum));
+#endif
+
+	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
+										 buffers, nblocks, true, skipFsync);
 
 	/*
-	 * Normally we expect this to increase nblocks by one, but if the cached
+	 * Normally we expect this to increase nblocks by N, but if the cached
 	 * value isn't as expected, just invalidate it so the next call asks the
 	 * kernel.
 	 */
 	if (reln->smgr_cached_nblocks[forknum] == blocknum)
-		reln->smgr_cached_nblocks[forknum] = blocknum + 1;
+		reln->smgr_cached_nblocks[forknum] = blocknum + nblocks;
 	else
 		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
 }
@@ -633,8 +635,14 @@ void
 smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		   const void **buffers, BlockNumber nblocks, bool skipFsync)
 {
+	/* This assert is too expensive to have on normally ... */
+#ifdef CHECK_WRITE_VS_EXTEND
+	Assert(blocknum + nblocks <= smgrnblocks(reln, forknum));
+#endif
+
 	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
-										 buffers, nblocks, skipFsync);
+										 buffers, nblocks, false,
+										 skipFsync);
 }
 
 /*
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 620f10abdeb..8746a48a5fb 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -36,7 +36,8 @@ extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
-					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+					 const void **buffers, BlockNumber nblocks,
+					 bool extend, bool skipFsync);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index fc5f883ce14..7ffea4332d2 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -86,8 +86,9 @@ extern void smgrreleaserellocator(RelFileLocatorBackend rlocator);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
-extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-					   BlockNumber blocknum, const void *buffer, bool skipFsync);
+extern void smgrextendv(SMgrRelation reln, ForkNumber forknum,
+						BlockNumber blocknum,
+						const void **buffers, BlockNumber nblocks, bool skipFsync);
 extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
 						   BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
@@ -124,4 +125,11 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
 }
 
+static inline void
+smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		   const void *buffer, bool skipFsync)
+{
+	smgrextendv(reln, forknum, blocknum, &buffer, 1, skipFsync);
+}
+
 #endif							/* SMGR_H */
-- 
2.39.2

Attachment: v3-0002-Use-vectored-I-O-for-bulk-writes.patch (text/x-patch)
From 9f679c914cc81907ba7ae9bbbf6e08fe81c7ec2d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:54:56 +1300
Subject: [PATCH v3 2/3] Use vectored I/O for bulk writes.

Zero-extend using smgrzeroextend().  Extend using smgrextendv().  Write
using smgrwritev().

Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 83 +++++++++++++++++++--------
 1 file changed, 58 insertions(+), 25 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c39..2a566a80f60 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrextendv() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -45,8 +45,6 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
-static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
-
 typedef struct PendingWrite
 {
 	BulkWriteBuffer buf;
@@ -225,35 +223,70 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[16];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
 
-		if (blkno >= bulkstate->pages_written)
+		/* Zero-extend any missing space before the first block. */
+		if (blkno > bulkstate->pages_written)
+		{
+			int			nzeroblocks;
+
+			nzeroblocks = blkno - bulkstate->pages_written;
+			smgrzeroextend(bulkstate->smgr, bulkstate->forknum,
+						   bulkstate->pages_written, nzeroblocks, true);
+			bulkstate->pages_written += nzeroblocks;
+		}
+
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(lengthof(pages),
+							  bulkstate->pages_written - blkno);
+		}
+		else
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = lengthof(pages);
+		}
+
+		/* Find as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrextendv(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true);
+			bulkstate->pages_written += nblocks;
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
-- 
2.39.2

Attachment: v3-0003-Improve-bulk_write.c-memory-management.patch (text/x-patch)
From 6d807a3c85b77d7daf7b8345ab988b6f44fd4c87 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 11 Mar 2024 11:44:41 +1300
Subject: [PATCH v3 3/3] Improve bulk_write.c memory management.

Instead of allocating buffers one at a time with palloc(), allocate an
array full of them up front, and then manage them in a FIFO freelist.
Aside from avoiding allocator overheads, this means that callers who
write sequential blocks will tend to fill up sequential memory, which
hopefully generates more efficient vectored writes.

Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 62 +++++++++++++++++++--------
 1 file changed, 43 insertions(+), 19 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 2a566a80f60..d9c38ffa88b 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -36,6 +36,7 @@
 
 #include "access/xloginsert.h"
 #include "access/xlogrecord.h"
+#include "lib/ilist.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
 #include "storage/bulk_write.h"
@@ -45,9 +46,15 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
+typedef union BufferSlot
+{
+	PGIOAlignedBlock buffer;
+	dlist_node	freelist_node;
+}			BufferSlot;
+
 typedef struct PendingWrite
 {
-	BulkWriteBuffer buf;
+	BufferSlot *slot;
 	BlockNumber blkno;
 	bool		page_std;
 } PendingWrite;
@@ -57,6 +64,10 @@ typedef struct PendingWrite
  */
 struct BulkWriteState
 {
+	/* Comes first so we can align it correctly. */
+	BufferSlot	buffer_slots[MAX_PENDING_WRITES + 2];
+	dlist_head	buffer_slots_freelist;
+
 	/* Information about the target relation we're writing */
 	SMgrRelation smgr;
 	ForkNumber	forknum;
@@ -71,8 +82,6 @@ struct BulkWriteState
 
 	/* The RedoRecPtr at the time that the bulk operation started */
 	XLogRecPtr	start_RedoRecPtr;
-
-	MemoryContext memcxt;
 };
 
 static void smgr_bulk_flush(BulkWriteState *bulkstate);
@@ -98,7 +107,7 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 {
 	BulkWriteState *state;
 
-	state = palloc(sizeof(BulkWriteState));
+	state = palloc_aligned(sizeof(BulkWriteState), PG_IO_ALIGN_SIZE, 0);
 	state->smgr = smgr;
 	state->forknum = forknum;
 	state->use_wal = use_wal;
@@ -108,11 +117,11 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 
 	state->start_RedoRecPtr = GetRedoRecPtr();
 
-	/*
-	 * Remember the memory context.  We will use it to allocate all the
-	 * buffers later.
-	 */
-	state->memcxt = CurrentMemoryContext;
+	/* Set up the free-list of buffers. */
+	dlist_init(&state->buffer_slots_freelist);
+	for (int i = 0; i < lengthof(state->buffer_slots); ++i)
+		dlist_push_tail(&state->buffer_slots_freelist,
+						&state->buffer_slots[i].freelist_node);
 
 	return state;
 }
@@ -206,7 +215,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 		for (int i = 0; i < npending; i++)
 		{
 			blknos[i] = pending_writes[i].blkno;
-			pages[i] = pending_writes[i].buf->data;
+			pages[i] = pending_writes[i].slot->buffer.data;
 
 			/*
 			 * If any of the pages use !page_std, we log them all as such.
@@ -231,7 +240,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 		/* Prepare to write the first block. */
 		blkno = pending_writes[i].blkno;
-		page = pending_writes[i].buf->data;
+		page = pending_writes[i].slot->buffer.data;
 		PageSetChecksumInplace(page, blkno);
 		pages[0] = page;
 		nblocks = 1;
@@ -268,7 +277,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 			   pending_writes[i + 1].blkno == blkno + nblocks &&
 			   nblocks < max_nblocks)
 		{
-			page = pending_writes[++i].buf->data;
+			page = pending_writes[++i].slot->buffer.data;
 			PageSetChecksumInplace(page, pending_writes[i].blkno);
 			pages[nblocks++] = page;
 		}
@@ -285,8 +294,14 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true);
 		}
 
-		for (int j = 0; j < nblocks; ++j)
-			pfree(pending_writes[i - j].buf->data);
+		/*
+		 * Maintain FIFO ordering in the free list, so that users who write
+		 * blocks in sequential order tend to get sequential chunks of buffer
+		 * memory, which may be slightly more efficient for vectored writes.
+		 */
+		for (int j = i - nblocks + 1; j <= i; ++j)
+			dlist_push_tail(&bulkstate->buffer_slots_freelist,
+							&pending_writes[j].slot->freelist_node);
 	}
 
 	bulkstate->npending = 0;
@@ -306,7 +321,7 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
 	PendingWrite *w;
 
 	w = &bulkstate->pending_writes[bulkstate->npending++];
-	w->buf = buf;
+	w->slot = (BufferSlot *) buf;
 	w->blkno = blocknum;
 	w->page_std = page_std;
 
@@ -320,12 +335,21 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
  * There is no function to free the buffer.  When you pass it to
  * smgr_bulk_write(), it takes ownership and frees it when it's no longer
  * needed.
- *
- * This is currently implemented as a simple palloc, but could be implemented
- * using a ring buffer or larger chunks in the future, so don't rely on it.
  */
 BulkWriteBuffer
 smgr_bulk_get_buf(BulkWriteState *bulkstate)
 {
-	return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	BufferSlot *slot;
+
+	if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+	{
+		smgr_bulk_flush(bulkstate);
+		if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+			elog(ERROR, "too many bulk write buffers used but not yet written");
+	}
+
+	slot = dlist_head_element(BufferSlot, freelist_node, &bulkstate->buffer_slots_freelist);
+	dlist_pop_head_node(&bulkstate->buffer_slots_freelist);
+
+	return &slot->buffer;
 }
-- 
2.39.2

#4Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#3)
Re: Vectored I/O in bulk_write.c

One more observation while I'm thinking about bulk_write.c... hmm, it
writes the data out and asks the checkpointer to fsync it, but doesn't
call smgrwriteback(). I assume that means that on Linux the physical
writeback sometimes won't happen until the checkpointer eventually
calls fsync() sequentially, one segment file at a time. I see that
it's difficult to decide how to do that though; unlike checkpoints,
which have rate control/spreading, bulk writes could more easily flood
the I/O subsystem in a burst. I expect most non-Linux systems'
write-behind heuristics to fire up for bulk sequential writes, but on
Linux where most users live, there is no write-behind heuristic AFAIK
(on the most common file systems anyway), so you have to crank that
handle if you want it to wake up and smell the coffee before it hits
internal limits, but then you have to decide how fast to crank it.

This problem will come into closer focus when we start talking about
streaming writes. For the current non-streaming bulk_write.c coding,
I don't have any particular idea of what to do about that, so I'm just
noting the observation here.

Sorry for the sudden wall of text/monologue; this is all a sort of
reaction to bulk_write.c that I should perhaps have sent to the
bulk_write.c thread, triggered by a couple of debugging sessions.

#5Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Thomas Munro (#4)
Re: Vectored I/O in bulk_write.c

(Replying to all your messages in this thread together)

> This made me wonder why smgrwritev() and smgrextendv() shouldn't be
> backed by the same implementation, since they are essentially the same
> operation.

+1 to the general idea of merging the write and extend functions.

> The differences are some assertions which might as well be
> moved up to the smgr.c level as they must surely apply to any future
> smgr implementation too, right?, and the segment file creation policy
> which can be controlled with a new argument. I tried that here.

Currently, smgrwrite/smgrextend just call through to mdwrite/mdextend.
I'd like to keep it that way. Otherwise we need to guess what a
hypothetical smgr implementation might look like.

For example this assertion:

/* This assert is too expensive to have on normally ... */
#ifdef CHECK_WRITE_VS_EXTEND
Assert(blocknum >= mdnblocks(reln, forknum));
#endif

I think that should continue to be md.c's internal affair. For example,
imagine that you had a syscall like write() but which always writes to
the end of the file and also returns the offset that the data was
written to. You would want to assert the returned offset instead of the
above.

So -1 on moving up the assertions to smgr.c.

Let's bite the bullet and merge the smgrwrite and smgrextend functions
at the smgr level too. I propose the following signature:

#define SWF_SKIP_FSYNC 0x01
#define SWF_EXTEND 0x02
#define SWF_ZERO 0x04

void smgrwritev(SMgrRelation reln, ForkNumber forknum,
                BlockNumber blocknum,
                const void **buffer, BlockNumber nblocks,
                int flags);

This would replace smgwrite, smgrextend, and smgrzeroextend. The
mdwritev() function would have the same signature. A single 'flags' arg
looks better in the callers than booleans, because you don't need to
remember what 'true' or 'false' means. The callers would look like this:

/* like old smgrwrite(reln, MAIN_FORKNUM, 123, buf, false) */
smgrwritev(reln, MAIN_FORKNUM, 123, buf, 1, 0);

/* like old smgrwrite(reln, MAIN_FORKNUM, 123, buf, true) */
smgrwritev(reln, MAIN_FORKNUM, 123, buf, 1, SWF_SKIP_FSYNC);

/* like old smgrextend(reln, MAIN_FORKNUM, 123, buf, true) */
smgrwritev(reln, MAIN_FORKNUM, 123, buf, 1,
           SWF_EXTEND | SWF_SKIP_FSYNC);

/* like old smgrzeroextend(reln, MAIN_FORKNUM, 123, 10, true) */
smgrwritev(reln, MAIN_FORKNUM, 123, NULL, 10,
           SWF_EXTEND | SWF_ZERO | SWF_SKIP_FSYNC);

> While thinking about that I realised that an existing write-or-extend
> assertion in master is wrong because it doesn't add nblocks.

> Hmm, it's a bit weird that we have nblocks as int or BlockNumber in
> various places, which I think should probably be fixed.

+1

Here also is a first attempt at improving the memory allocation and
memory layout.
...
+typedef union BufferSlot
+{
+	PGIOAlignedBlock buffer;
+	dlist_node	freelist_node;
+}			BufferSlot;
+

If you allocated the buffers in one large contiguous chunk, you could
often do one large write() instead of a gathered writev() of multiple
blocks. That should be even better, although I don't know how much of a
difference it makes. The above layout also wastes a fair amount of memory,
because 'buffer' is I/O aligned.
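The coalescing idea can be shown with a minimal standalone sketch (this is not the actual bulk_write.c code; BLCKSZ and the helper name are just for illustration): physically adjacent block buffers collapse into one iovec entry, so a fully contiguous run could be issued with plain pwrite() instead of pwritev().

```c
#include <assert.h>
#include <sys/uio.h>

#define BLCKSZ 8192

/*
 * Coalesce physically adjacent block buffers into as few iovecs as
 * possible.  A fully contiguous run collapses to a single entry, which
 * a caller could then issue with pwrite() rather than pwritev().
 */
static int
build_iovec(struct iovec *iov, const void **buffers, int nblocks)
{
	int			iovcnt = 0;

	for (int i = 0; i < nblocks; i++)
	{
		if (iovcnt > 0 &&
			(char *) iov[iovcnt - 1].iov_base + iov[iovcnt - 1].iov_len ==
			(const char *) buffers[i])
			iov[iovcnt - 1].iov_len += BLCKSZ;	/* extend previous run */
		else
		{
			iov[iovcnt].iov_base = (void *) buffers[i];
			iov[iovcnt].iov_len = BLCKSZ;
			iovcnt++;
		}
	}
	return iovcnt;
}
```

With buffers taken in order from one contiguous chunk this yields a single iovec; when the buffer array wraps around, you get a small iovcnt instead, matching the pwrite()/pwritev() alternation seen in the traces below.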

I wonder if bulk logging should trigger larger WAL writes in the "Have
to write it ourselves" case in AdvanceXLInsertBuffer(), since writing
8kB of WAL at a time seems like an unnecessarily degraded level of
performance, especially with wal_sync_method=open_datasync. Of course
the real answer is "make sure wal_buffers is high enough for your
workload" (usually indirectly by automatically scaling from
shared_buffers), but this problem jumps out when tracing bulk_write.c
with default settings. We write out the index 128kB at a time, but
the WAL 8kB at a time.

Makes sense.

On 12/03/2024 23:38, Thomas Munro wrote:

One more observation while I'm thinking about bulk_write.c... hmm, it
writes the data out and asks the checkpointer to fsync it, but doesn't
call smgrwriteback(). I assume that means that on Linux the physical
writeback sometimes won't happen until the checkpointer eventually
calls fsync() sequentially, one segment file at a time. I see that
it's difficult to decide how to do that though; unlike checkpoints,
which have rate control/spreading, bulk writes could more easily flood
the I/O subsystem in a burst. I expect most non-Linux systems'
write-behind heuristics to fire up for bulk sequential writes, but on
Linux where most users live, there is no write-behind heuristic AFAIK
(on the most common file systems anyway), so you have to crank that
handle if you want it to wake up and smell the coffee before it hits
internal limits, but then you have to decide how fast to crank it.

This problem will come into closer focus when we start talking about
streaming writes. For the current non-streaming bulk_write.c coding,
I don't have any particular idea of what to do about that, so I'm just
noting the observation here.

It would be straightforward to call smgrwriteback() from smgr_bulk_flush
every 1 GB of written data, for example. It would be nice to do it in the
background though, and not stall the bulk writing for it. With the AIO
patches, I presume we could easily start an asynchronous writeback and
not wait for it to finish.
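A minimal standalone model of that heuristic (hypothetical state struct and names, not actual PostgreSQL code) would just count dirty bytes and tell the caller when, and over how many blocks, to issue the writeback hint:

```c
#include <stdint.h>

#define BLCKSZ 8192
#define WRITEBACK_THRESHOLD ((uint64_t) 1024 * 1024 * 1024)	/* 1 GB */

/* Hypothetical bookkeeping a bulk-write state could carry. */
typedef struct BulkWritebackState
{
	uint64_t	bytes_since_writeback;	/* dirty data not yet hinted */
	uint64_t	writeback_start_block;	/* where the next hint begins */
} BulkWritebackState;

/*
 * Account for newly written blocks.  Once at least 1 GB has accumulated,
 * return the number of blocks the caller should pass to smgrwriteback()
 * (advancing the window); otherwise return 0.
 */
static uint64_t
maybe_writeback(BulkWritebackState *state, uint64_t blocks_written)
{
	uint64_t	nblocks;

	state->bytes_since_writeback += blocks_written * BLCKSZ;
	if (state->bytes_since_writeback < WRITEBACK_THRESHOLD)
		return 0;

	nblocks = state->bytes_since_writeback / BLCKSZ;
	state->writeback_start_block += nblocks;
	state->bytes_since_writeback = 0;
	return nblocks;
}
```

The interesting design question remains how the returned hint is issued: synchronously it stalls the bulk write, whereas with AIO it could be fired and forgotten.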

Sorry for the sudden wall of text/monologue; this is all a sort of
reaction to bulk_write.c that I should perhaps have sent to the
bulk_write.c thread, triggered by a couple of debugging sessions.

Thanks for looking! This all makes sense. The idea of introducing the
bulk write interface was that now we have a natural place to put all
these heuristics and optimizations in. That seems to be a success,
AFAICS none of the things discussed here will change the bulk_write API,
just the implementation.

--
Heikki Linnakangas
Neon (https://neon.tech)

#6Thomas Munro
thomas.munro@gmail.com
In reply to: Heikki Linnakangas (#5)
Re: Vectored I/O in bulk_write.c

On Wed, Mar 13, 2024 at 9:57 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:

Let's bite the bullet and merge the smgrwrite and smgrextend functions
at the smgr level too. I propose the following signature:

#define SWF_SKIP_FSYNC 0x01
#define SWF_EXTEND 0x02
#define SWF_ZERO 0x04

void smgrwritev(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
const void **buffer, BlockNumber nblocks,
int flags);

This would replace smgrwrite, smgrextend, and smgrzeroextend. The

That sounds pretty good to me.

Here also is a first attempt at improving the memory allocation and
memory layout.
...
+typedef union BufferSlot
+{
+     PGIOAlignedBlock buffer;
+     dlist_node      freelist_node;
+}                    BufferSlot;
+

If you allocated the buffers in one large contiguous chunk, you could
often do one large write() instead of a gathered writev() of multiple
blocks. That should be even better, although I don't know how much of a
difference it makes. The above layout also wastes a fair amount of memory,
because 'buffer' is I/O aligned.

The patch I posted has an array of buffers with the properties you
describe, so you get a pwrite() (no 'v') sometimes, and a pwritev()
with a small iovcnt when it wraps around:

pwrite(...) = 131072 (0x20000)
pwritev(...,3,...) = 131072 (0x20000)
pwrite(...) = 131072 (0x20000)
pwritev(...,3,...) = 131072 (0x20000)
pwrite(...) = 131072 (0x20000)

Hmm, I expected pwrite() alternating with pwritev(iovcnt=2), the
latter for when it wraps around the buffer array, so I'm not sure why it's
3. I guess the btree code isn't writing them strictly monotonically or
something...

I don't believe it wastes any memory on padding (except a few bytes
wasted by palloc_aligned() before BulkWriteState):

(gdb) p &bulkstate->buffer_slots[0]
$4 = (BufferSlot *) 0x15c731cb4000
(gdb) p &bulkstate->buffer_slots[1]
$5 = (BufferSlot *) 0x15c731cb6000
(gdb) p sizeof(bulkstate->buffer_slots[0])
$6 = 8192
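The no-padding property can be checked with a standalone model of the union (stand-in types: PGIOAlignedBlock is approximated by an aligned char array, dlist_node by a pair of pointers, and PG_IO_ALIGN_SIZE is assumed to be 4096): because the buffer member is already a multiple of its alignment, the union's size is exactly BLCKSZ and array elements sit BLCKSZ apart, as the gdb output shows.

```c
#include <stddef.h>

#define BLCKSZ 8192

/* Stand-in for PostgreSQL's dlist_node: two pointers. */
typedef struct dlist_node
{
	struct dlist_node *prev;
	struct dlist_node *next;
} dlist_node;

/*
 * Model of bulk_write.c's BufferSlot.  The buffer models
 * PGIOAlignedBlock: BLCKSZ bytes with I/O alignment (assumed 4096).
 * Since BLCKSZ is a multiple of the alignment, the union carries no
 * padding and sizeof(BufferSlot) == BLCKSZ.
 */
typedef union BufferSlot
{
	_Alignas(4096) char buffer[BLCKSZ];
	dlist_node	freelist_node;
} BufferSlot;
```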

#7Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Thomas Munro (#6)
Re: Vectored I/O in bulk_write.c

On 13/03/2024 12:18, Thomas Munro wrote:

On Wed, Mar 13, 2024 at 9:57 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:

Here also is a first attempt at improving the memory allocation and
memory layout.
...
+typedef union BufferSlot
+{
+     PGIOAlignedBlock buffer;
+     dlist_node      freelist_node;
+}                    BufferSlot;
+

If you allocated the buffers in one large contiguous chunk, you could
often do one large write() instead of a gathered writev() of multiple
blocks. That should be even better, although I don't know much of a
difference it makes. The above layout wastes a fair amount memory too,
because 'buffer' is I/O aligned.

The patch I posted has an array of buffers with the properties you
describe, so you get a pwrite() (no 'v') sometimes, and a pwritev()
with a small iovcnt when it wraps around:

Oh I missed that it was "union BufferSlot". I thought it was a struct.
Never mind then.

--
Heikki Linnakangas
Neon (https://neon.tech)

#8Thomas Munro
thomas.munro@gmail.com
In reply to: Heikki Linnakangas (#7)
3 attachment(s)
Re: Vectored I/O in bulk_write.c

Alright, here is a first attempt at merging all three interfaces as
you suggested. I like it! I especially like the way it removes lots
of duplication.

I don't understand your argument about the location of the
write-vs-extend assertions. It seems to me that these are assertions
about what the *public* smgrnblocks() function returns. In other
words, we assert that the caller is aware of the current relation size
(and has some kind of interlocking scheme for that to be possible),
according to the smgr implementation's public interface. That's not
an assertion about internal details of the smgr implementation, it's
part of the "contract" for the API.

Attachments:

v4-0001-Merge-smgrzeroextend-and-smgrextend-with-smgrwrit.patchtext/x-patch; charset=US-ASCII; name=v4-0001-Merge-smgrzeroextend-and-smgrextend-with-smgrwrit.patchDownload
From 0a57274e29369e61712941e379c24f7db1dec068 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:04:21 +1300
Subject: [PATCH v4 1/3] Merge smgrzeroextend() and smgrextend() with
 smgrwritev().

Since mdwrite() and mdextend() were basically the same and both need
vectored variants, merge them into a single interface.  We still want
to be able to assert that callers know the difference between
overwriting and extending and activate slightly different behavior
during recovery, so use flags to control that.

Likewise for the zero-extending variant, which has much in common at
the interface level, except it doesn't deal in buffers.

The traditional single-block smgrwrite() and smgrextend() functions with
skipFsync boolean argument are translated to smgrwritev() by inlinable
wrapper functions, for low-overhead backwards-compatibility.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c   |   7 +-
 src/backend/storage/buffer/localbuf.c |   3 +-
 src/backend/storage/smgr/md.c         | 119 +++++-------------------
 src/backend/storage/smgr/smgr.c       | 127 +++++++++-----------------
 src/include/storage/md.h              |   7 +-
 src/include/storage/smgr.h            |  22 +++--
 6 files changed, 91 insertions(+), 194 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f0f8d4259c..52bbdff336 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -2064,7 +2064,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
 	/*
-	 * Note: if smgrzeroextend fails, we will end up with buffers that are
+	 * Note: if smgrwritev fails, we will end up with buffers that are
 	 * allocated but not marked BM_VALID.  The next relation extension will
 	 * still select the same block number (because the relation didn't get any
 	 * longer on disk) and so future attempts to extend the relation will find
@@ -2073,7 +2073,8 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 	 *
 	 * We don't need to set checksum for all-zero pages.
 	 */
-	smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);
+	smgrwritev(bmr.smgr, fork, first_block, NULL, extend_by,
+			   SMGR_WRITE_EXTEND | SMGR_WRITE_ZERO);
 
 	/*
 	 * Release the file-extension lock; it's now OK for someone else to extend
@@ -3720,7 +3721,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 	 *
 	 * In recovery, we cache the value returned by the first lseek(SEEK_END)
 	 * and the future writes keeps the cached value up-to-date. See
-	 * smgrextend. It is possible that the value of the first lseek is smaller
+	 * smgrwritev(). It is possible that the value of the first lseek is smaller
 	 * than the actual number of existing blocks in the file due to buggy
 	 * Linux kernels that might not have accounted for the recent write. But
 	 * that should be fine because there must not be any buffers after that
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index fcfac335a5..5b2b0fe9f4 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -416,7 +416,8 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
 	/* actually extend relation */
-	smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);
+	smgrwritev(bmr.smgr, fork, first_block, NULL, extend_by,
+			   SMGR_WRITE_EXTEND | SMGR_WRITE_ZERO);
 
 	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
 							io_start, extend_by);
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index bf0f3ca76d..0d560393e5 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -447,83 +447,14 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo)
 	pfree(path);
 }
 
-/*
- * mdextend() -- Add a block to the specified relation.
- *
- * The semantics are nearly the same as mdwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void *buffer, bool skipFsync)
-{
-	off_t		seekpos;
-	int			nbytes;
-	MdfdVec    *v;
-
-	/* If this build supports direct I/O, the buffer must be I/O aligned. */
-	if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
-		Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
-
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum >= mdnblocks(reln, forknum));
-#endif
-
-	/*
-	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
-	 * more --- we mustn't create a block whose number actually is
-	 * InvalidBlockNumber.  (Note that this failure should be unreachable
-	 * because of upstream checks in bufmgr.c.)
-	 */
-	if (blocknum == InvalidBlockNumber)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-				 errmsg("cannot extend file \"%s\" beyond %u blocks",
-						relpath(reln->smgr_rlocator, forknum),
-						InvalidBlockNumber)));
-
-	v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);
-
-	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
-
-	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
-
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
-	{
-		if (nbytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not extend file \"%s\": %m",
-							FilePathName(v->mdfd_vfd)),
-					 errhint("Check free disk space.")));
-		/* short write: complain appropriately */
-		ereport(ERROR,
-				(errcode(ERRCODE_DISK_FULL),
-				 errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
-						FilePathName(v->mdfd_vfd),
-						nbytes, BLCKSZ, blocknum),
-				 errhint("Check free disk space.")));
-	}
-
-	if (!skipFsync && !SmgrIsTemp(reln))
-		register_dirty_segment(reln, forknum, v);
-
-	Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
-}
-
 /*
  * mdzeroextend() -- Add new zeroed out blocks to the specified relation.
  *
- * Similar to mdextend(), except the relation can be extended by multiple
- * blocks at once and the added blocks will be filled with zeroes.
+ * The added blocks will be filled with zeroes.
  */
-void
+static void
 mdzeroextend(SMgrRelation reln, ForkNumber forknum,
-			 BlockNumber blocknum, int nblocks, bool skipFsync)
+			 BlockNumber blocknum, int nblocks, int flags)
 {
 	MdfdVec    *v;
 	BlockNumber curblocknum = blocknum;
@@ -559,7 +490,8 @@ mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 		else
 			numblocks = remblocks;
 
-		v = _mdfd_getseg(reln, forknum, curblocknum, skipFsync, EXTENSION_CREATE);
+		v = _mdfd_getseg(reln, forknum, curblocknum,
+						 flags & SMGR_WRITE_SKIP_FSYNC, EXTENSION_CREATE);
 
 		Assert(segstartblock < RELSEG_SIZE);
 		Assert(segstartblock + numblocks <= RELSEG_SIZE);
@@ -595,13 +527,7 @@ mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 		{
 			int			ret;
 
-			/*
-			 * Even if we don't want to use fallocate, we can still extend a
-			 * bit more efficiently than writing each 8kB block individually.
-			 * pg_pwrite_zeros() (via FileZero()) uses pg_pwritev_with_retry()
-			 * to avoid multiple writes or needing a zeroed buffer for the
-			 * whole length of the extension.
-			 */
+			/* Fall back to writing out zeroes. */
 			ret = FileZero(v->mdfd_vfd,
 						   seekpos, (off_t) BLCKSZ * numblocks,
 						   WAIT_EVENT_DATA_FILE_EXTEND);
@@ -613,7 +539,7 @@ mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 						errhint("Check free disk space."));
 		}
 
-		if (!skipFsync && !SmgrIsTemp(reln))
+		if ((flags & SMGR_WRITE_SKIP_FSYNC) == 0 && !SmgrIsTemp(reln))
 			register_dirty_segment(reln, forknum, v);
 
 		Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
@@ -919,19 +845,14 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
- *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use mdextend().
  */
 void
 mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void **buffers, BlockNumber nblocks, bool skipFsync)
+		 const void **buffers, BlockNumber nblocks,
+		 int flags)
 {
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum < mdnblocks(reln, forknum));
-#endif
+	if (flags & SMGR_WRITE_ZERO)
+		return mdzeroextend(reln, forknum, blocknum, nblocks, flags);
 
 	while (nblocks > 0)
 	{
@@ -944,7 +865,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		size_t		transferred_this_segment;
 		size_t		size_this_segment;
 
-		v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
+		v = _mdfd_getseg(reln, forknum, blocknum, flags & SMGR_WRITE_SKIP_FSYNC,
+						 (flags & SMGR_WRITE_EXTEND) ?
+						 EXTENSION_CREATE :
 						 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
 
 		seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
@@ -992,7 +915,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 				ereport(ERROR,
 						(errcode_for_file_access(),
-						 errmsg("could not write blocks %u..%u in file \"%s\": %m",
+						 errmsg((flags & SMGR_WRITE_EXTEND) ?
+								"could not extend blocks %u..%u in file \"%s\": %m" :
+								"could not write blocks %u..%u in file \"%s\": %m",
 								blocknum,
 								blocknum + nblocks_this_segment - 1,
 								FilePathName(v->mdfd_vfd)),
@@ -1010,7 +935,7 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 			iovcnt = compute_remaining_iovec(iov, iov, iovcnt, nbytes);
 		}
 
-		if (!skipFsync && !SmgrIsTemp(reln))
+		if ((flags & SMGR_WRITE_SKIP_FSYNC) == 0 && !SmgrIsTemp(reln))
 			register_dirty_segment(reln, forknum, v);
 
 		nblocks -= nblocks_this_segment;
@@ -1638,7 +1563,7 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 		{
 			/*
 			 * Normally we will create new segments only if authorized by the
-			 * caller (i.e., we are doing mdextend()).  But when doing WAL
+			 * caller (i.e., we are doing smgrextend()).  But when doing WAL
 			 * recovery, create segments anyway; this allows cases such as
 			 * replaying WAL data that has a write into a high-numbered
 			 * segment of a relation that was later deleted. We want to go
@@ -1655,9 +1580,9 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 				char	   *zerobuf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE,
 													 MCXT_ALLOC_ZERO);
 
-				mdextend(reln, forknum,
-						 nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
-						 zerobuf, skipFsync);
+				smgrextend(reln, forknum,
+						   nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
+						   zerobuf, skipFsync);
 				pfree(zerobuf);
 			}
 			flags = O_CREAT;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 62226d5dca..6ead87a795 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -82,10 +82,6 @@ typedef struct f_smgr
 	bool		(*smgr_exists) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_unlink) (RelFileLocatorBackend rlocator, ForkNumber forknum,
 								bool isRedo);
-	void		(*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
-								BlockNumber blocknum, const void *buffer, bool skipFsync);
-	void		(*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
-									BlockNumber blocknum, int nblocks, bool skipFsync);
 	bool		(*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
 								  BlockNumber blocknum, int nblocks);
 	void		(*smgr_readv) (SMgrRelation reln, ForkNumber forknum,
@@ -94,7 +90,7 @@ typedef struct f_smgr
 	void		(*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
-								bool skipFsync);
+								int flags);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -114,8 +110,6 @@ static const f_smgr smgrsw[] = {
 		.smgr_create = mdcreate,
 		.smgr_exists = mdexists,
 		.smgr_unlink = mdunlink,
-		.smgr_extend = mdextend,
-		.smgr_zeroextend = mdzeroextend,
 		.smgr_prefetch = mdprefetch,
 		.smgr_readv = mdreadv,
 		.smgr_writev = mdwritev,
@@ -521,59 +515,6 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
 	pfree(rlocators);
 }
 
-
-/*
- * smgrextend() -- Add a new block to a file.
- *
- * The semantics are nearly the same as smgrwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void *buffer, bool skipFsync)
-{
-	smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
-										 buffer, skipFsync);
-
-	/*
-	 * Normally we expect this to increase nblocks by one, but if the cached
-	 * value isn't as expected, just invalidate it so the next call asks the
-	 * kernel.
-	 */
-	if (reln->smgr_cached_nblocks[forknum] == blocknum)
-		reln->smgr_cached_nblocks[forknum] = blocknum + 1;
-	else
-		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
-}
-
-/*
- * smgrzeroextend() -- Add new zeroed out blocks to a file.
- *
- * Similar to smgrextend(), except the relation can be extended by
- * multiple blocks at once and the added blocks will be filled with
- * zeroes.
- */
-void
-smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-			   int nblocks, bool skipFsync)
-{
-	smgrsw[reln->smgr_which].smgr_zeroextend(reln, forknum, blocknum,
-											 nblocks, skipFsync);
-
-	/*
-	 * Normally we expect this to increase the fork size by nblocks, but if
-	 * the cached value isn't as expected, just invalidate it so the next call
-	 * asks the kernel.
-	 */
-	if (reln->smgr_cached_nblocks[forknum] == blocknum)
-		reln->smgr_cached_nblocks[forknum] = blocknum + nblocks;
-	else
-		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
-}
-
 /*
  * smgrprefetch() -- Initiate asynchronous read of the specified block of a relation.
  *
@@ -607,9 +548,9 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 /*
  * smgrwritev() -- Write the supplied buffers out.
  *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use smgrextend().
+ * By default this is to be used only for updating already-existing blocks of
+ * a relation (ie, those before the current EOF).  To extend a relation,
+ * specify SMGR_WRITE_EXTEND, optionally with SMGR_WRITE_ZERO.
  *
  * This is not a synchronous write -- the block is not necessarily
  * on disk at return, only dumped out to the kernel.  However,
@@ -629,10 +570,33 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  */
 void
 smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void **buffers, BlockNumber nblocks, bool skipFsync)
+		   const void **buffers, BlockNumber nblocks, int flags)
 {
+	if (flags & SMGR_WRITE_ZERO)
+		Assert(flags & SMGR_WRITE_EXTEND);
+#ifdef CHECK_WRITE_VS_EXTEND
+	/* These assert are too expensive to have on normally ... */
+	if (flags & SMGR_WRITE_EXTEND)
+		Assert(blocknum >= smgrnblocks(reln, forknum));
+	else
+		Assert(blocknum + nblocks <= smgrnblocks(reln, forknum));
+#endif
+
 	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
-										 buffers, nblocks, skipFsync);
+										 buffers, nblocks, flags);
+
+	if (flags & SMGR_WRITE_EXTEND)
+	{
+		/*
+		 * Normally we expect this to increase the fork size by nblocks, but
+		 * if the cached value isn't as expected, just invalidate it so the
+		 * next call asks the smgr implementation.
+		 */
+		if (reln->smgr_cached_nblocks[forknum] == blocknum)
+			reln->smgr_cached_nblocks[forknum] = blocknum + nblocks;
+		else
+			reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
+	}
 }
 
 /*
@@ -743,14 +707,14 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
 /*
  * smgrregistersync() -- Request a relation to be sync'd at next checkpoint
  *
- * This can be used after calling smgrwrite() or smgrextend() with skipFsync =
- * true, to register the fsyncs that were skipped earlier.
+ * This can be used after calling smgrwritev() with SMGR_WRITE_SKIP_FSYNC,
+ * to register the fsyncs that were skipped earlier.
  *
  * Note: be mindful that a checkpoint could already have happened between the
- * smgrwrite or smgrextend calls and this!  In that case, the checkpoint
- * already missed fsyncing this relation, and you should use smgrimmedsync
- * instead.  Most callers should use the bulk loading facility in bulk_write.c
- * which handles all that.
+ * smgrwritev calls and this!  In that case, the checkpoint already missed
+ * fsyncing this relation, and you should use smgrimmedsync instead.  Most
+ * callers should use the bulk loading facility in bulk_write.c which handles
+ * all that.
  */
 void
 smgrregistersync(SMgrRelation reln, ForkNumber forknum)
@@ -764,17 +728,16 @@ smgrregistersync(SMgrRelation reln, ForkNumber forknum)
  * Synchronously force all previous writes to the specified relation
  * down to disk.
  *
- * This is useful for building completely new relations (eg, new
- * indexes).  Instead of incrementally WAL-logging the index build
- * steps, we can just write completed index pages to disk with smgrwrite
- * or smgrextend, and then fsync the completed index file before
- * committing the transaction.  (This is sufficient for purposes of
- * crash recovery, since it effectively duplicates forcing a checkpoint
- * for the completed index.  But it is *not* sufficient if one wishes
- * to use the WAL log for PITR or replication purposes: in that case
- * we have to make WAL entries as well.)
- *
- * The preceding writes should specify skipFsync = true to avoid
+ * This is useful for building completely new relations (eg, new indexes).
+ * Instead of incrementally WAL-logging the index build steps, we can just
+ * write completed index pages to disk with smgrwritev, nd then fsync the
+ * completed index file before committing the transaction.  (This is
+ * sufficient for purposes of crash recovery, since it effectively duplicates
+ * forcing a checkpoint for the completed index.  But it is *not* sufficient
+ * if one wishes to use the WAL log for PITR or replication purposes: in that
+ * case we have to make WAL entries as well.)
+ *
+ * The preceding writes should specify SMGR_WRITE_SKIP_FSYNC to avoid
  * duplicative fsyncs.
  *
  * Note that you need to do FlushRelationBuffers() first if there is
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 620f10abde..794e3d7b40 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -26,17 +26,14 @@ extern void mdclose(SMgrRelation reln, ForkNumber forknum);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo);
-extern void mdextend(SMgrRelation reln, ForkNumber forknum,
-					 BlockNumber blocknum, const void *buffer, bool skipFsync);
-extern void mdzeroextend(SMgrRelation reln, ForkNumber forknum,
-						 BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum, int nblocks);
 extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
-					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+					 const void **buffers, BlockNumber nblocks,
+					 int flags);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index fc5f883ce1..8598b89e2e 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -73,6 +73,10 @@ typedef SMgrRelationData *SMgrRelation;
 #define SmgrIsTemp(smgr) \
 	RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator)
 
+#define SMGR_WRITE_SKIP_FSYNC 0x01
+#define SMGR_WRITE_EXTEND 0x02
+#define SMGR_WRITE_ZERO 0x04
+
 extern void smgrinit(void);
 extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend);
 extern bool smgrexists(SMgrRelation reln, ForkNumber forknum);
@@ -86,10 +90,6 @@ extern void smgrreleaserellocator(RelFileLocatorBackend rlocator);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
-extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-					   BlockNumber blocknum, const void *buffer, bool skipFsync);
-extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
-						   BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
 						 BlockNumber blocknum, int nblocks);
 extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
@@ -98,7 +98,7 @@ extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
 extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum,
 					   const void **buffer, BlockNumber nblocks,
-					   bool skipFsync);
+					   int flags);
 extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
 						  BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
@@ -121,7 +121,17 @@ static inline void
 smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		  const void *buffer, bool skipFsync)
 {
-	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
+	smgrwritev(reln, forknum, blocknum, &buffer, 1,
+			   skipFsync ? SMGR_WRITE_SKIP_FSYNC : 0);
+}
+
+static inline void
+smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		   const void *buffer, bool skipFsync)
+{
+	smgrwritev(reln, forknum, blocknum, &buffer, 1,
+			   SMGR_WRITE_EXTEND |
+			   (skipFsync ? SMGR_WRITE_SKIP_FSYNC : 0));
 }
 
 #endif							/* SMGR_H */
-- 
2.43.2

v4-0002-Use-vectored-I-O-for-bulk-writes.patchtext/x-patch; charset=US-ASCII; name=v4-0002-Use-vectored-I-O-for-bulk-writes.patchDownload
From d9446573bade5ead71159f75c39d965c98d9038e Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:54:56 +1300
Subject: [PATCH v4 2/3] Use vectored I/O for bulk writes.

bulk_write.c was originally designed with the goal of being able to
use the new vectored write APIs, but couldn't initially  because the
vectored variant of "smgrextend" didn't exist yet.  Now that
smgrwritev() can also handle extension, we can use it here to get wide
write system calls.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 90 +++++++++++++++++++--------
 1 file changed, 65 insertions(+), 25 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c3..df1c401e88 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrextendv() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -45,8 +45,6 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
-static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
-
 typedef struct PendingWrite
 {
 	BulkWriteBuffer buf;
@@ -225,35 +223,77 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[16];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
+
+		/* Zero-extend any missing space before the first block. */
+		if (blkno > bulkstate->pages_written)
+		{
+			int			nzeroblocks;
+
+			nzeroblocks = blkno - bulkstate->pages_written;
+			smgrwritev(bulkstate->smgr, bulkstate->forknum,
+					   bulkstate->pages_written, NULL, nzeroblocks,
+					   SMGR_WRITE_SKIP_FSYNC |
+					   SMGR_WRITE_EXTEND |
+					   SMGR_WRITE_ZERO);
+			bulkstate->pages_written += nzeroblocks;
+		}
 
-		if (blkno >= bulkstate->pages_written)
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(lengthof(pages),
+							  bulkstate->pages_written - blkno);
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = lengthof(pages);
+		}
+
+		/* Find as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC | SMGR_WRITE_EXTEND);
+			bulkstate->pages_written += nblocks;
+		}
+		else
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
-- 
2.43.2

v4-0003-Improve-bulk_write.c-memory-management.patch
From 5246f51acdcfa815ca71522963e1ad897e438957 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 11 Mar 2024 11:44:41 +1300
Subject: [PATCH v4 3/3] Improve bulk_write.c memory management.

Instead of allocating buffers one at a time with palloc(), allocate an
array full of them up front, and then manage them in a FIFO freelist.
Aside from avoiding allocator overheads, this means that callers who
write sequential blocks will tend to fill up sequential memory, which
hopefully generates more efficient vectored writes.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 62 +++++++++++++++++++--------
 1 file changed, 43 insertions(+), 19 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index df1c401e88..38e45c3178 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -36,6 +36,7 @@
 
 #include "access/xloginsert.h"
 #include "access/xlogrecord.h"
+#include "lib/ilist.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
 #include "storage/bulk_write.h"
@@ -45,9 +46,15 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
+typedef union BufferSlot
+{
+	PGIOAlignedBlock buffer;
+	dlist_node	freelist_node;
+}			BufferSlot;
+
 typedef struct PendingWrite
 {
-	BulkWriteBuffer buf;
+	BufferSlot *slot;
 	BlockNumber blkno;
 	bool		page_std;
 } PendingWrite;
@@ -57,6 +64,10 @@ typedef struct PendingWrite
  */
 struct BulkWriteState
 {
+	/* Comes first so we can align it correctly. */
+	BufferSlot	buffer_slots[MAX_PENDING_WRITES + 2];
+	dlist_head	buffer_slots_freelist;
+
 	/* Information about the target relation we're writing */
 	SMgrRelation smgr;
 	ForkNumber	forknum;
@@ -71,8 +82,6 @@ struct BulkWriteState
 
 	/* The RedoRecPtr at the time that the bulk operation started */
 	XLogRecPtr	start_RedoRecPtr;
-
-	MemoryContext memcxt;
 };
 
 static void smgr_bulk_flush(BulkWriteState *bulkstate);
@@ -98,7 +107,7 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 {
 	BulkWriteState *state;
 
-	state = palloc(sizeof(BulkWriteState));
+	state = palloc_aligned(sizeof(BulkWriteState), PG_IO_ALIGN_SIZE, 0);
 	state->smgr = smgr;
 	state->forknum = forknum;
 	state->use_wal = use_wal;
@@ -108,11 +117,11 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 
 	state->start_RedoRecPtr = GetRedoRecPtr();
 
-	/*
-	 * Remember the memory context.  We will use it to allocate all the
-	 * buffers later.
-	 */
-	state->memcxt = CurrentMemoryContext;
+	/* Set up the free-list of buffers. */
+	dlist_init(&state->buffer_slots_freelist);
+	for (int i = 0; i < lengthof(state->buffer_slots); ++i)
+		dlist_push_tail(&state->buffer_slots_freelist,
+						&state->buffer_slots[i].freelist_node);
 
 	return state;
 }
@@ -206,7 +215,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 		for (int i = 0; i < npending; i++)
 		{
 			blknos[i] = pending_writes[i].blkno;
-			pages[i] = pending_writes[i].buf->data;
+			pages[i] = pending_writes[i].slot->buffer.data;
 
 			/*
 			 * If any of the pages use !page_std, we log them all as such.
@@ -231,7 +240,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 		/* Prepare to write the first block. */
 		blkno = pending_writes[i].blkno;
-		page = pending_writes[i].buf->data;
+		page = pending_writes[i].slot->buffer.data;
 		PageSetChecksumInplace(page, blkno);
 		pages[0] = page;
 		nblocks = 1;
@@ -271,7 +280,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 			   pending_writes[i + 1].blkno == blkno + nblocks &&
 			   nblocks < max_nblocks)
 		{
-			page = pending_writes[++i].buf->data;
+			page = pending_writes[++i].slot->buffer.data;
 			PageSetChecksumInplace(page, pending_writes[i].blkno);
 			pages[nblocks++] = page;
 		}
@@ -292,8 +301,14 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 					   SMGR_WRITE_SKIP_FSYNC);
 		}
 
-		for (int j = 0; j < nblocks; ++j)
-			pfree(pending_writes[i - j].buf->data);
+		/*
+		 * Maintain FIFO ordering in the free list, so that users who write
+		 * blocks in sequential order tend to get sequential chunks of buffer
+		 * memory, which may be slightly more efficient for vectored writes.
+		 */
+		for (int j = i - nblocks + 1; j <= i; ++j)
+			dlist_push_tail(&bulkstate->buffer_slots_freelist,
+							&pending_writes[j].slot->freelist_node);
 	}
 
 	bulkstate->npending = 0;
@@ -313,7 +328,7 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
 	PendingWrite *w;
 
 	w = &bulkstate->pending_writes[bulkstate->npending++];
-	w->buf = buf;
+	w->slot = (BufferSlot *) buf;
 	w->blkno = blocknum;
 	w->page_std = page_std;
 
@@ -327,12 +342,21 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
  * There is no function to free the buffer.  When you pass it to
  * smgr_bulk_write(), it takes ownership and frees it when it's no longer
  * needed.
- *
- * This is currently implemented as a simple palloc, but could be implemented
- * using a ring buffer or larger chunks in the future, so don't rely on it.
  */
 BulkWriteBuffer
 smgr_bulk_get_buf(BulkWriteState *bulkstate)
 {
-	return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	BufferSlot *slot;
+
+	if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+	{
+		smgr_bulk_flush(bulkstate);
+		if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+			elog(ERROR, "too many bulk write buffers used but not yet written");
+	}
+
+	slot = dlist_head_element(BufferSlot, freelist_node, &bulkstate->buffer_slots_freelist);
+	dlist_pop_head_node(&bulkstate->buffer_slots_freelist);
+
+	return &slot->buffer;
 }
-- 
2.43.2

#9Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Thomas Munro (#8)
Re: Vectored I/O in bulk_write.c

On 13/03/2024 23:12, Thomas Munro wrote:

Alright, here is a first attempt at merging all three interfaces as
you suggested. I like it! I especially like the way it removes lots
of duplication.

I don't understand your argument about the location of the
write-vs-extent assertions. It seems to me that these are assertions
about what the *public* smgrnblocks() function returns. In other
words, we assert that the caller is aware of the current relation size
(and has some kind of interlocking scheme for that to be possible),
according to the smgr implementation's public interface. That's not
an assertion about internal details of the smgr implementation, it's
part of the "contract" for the API.

I tried to say that an smgr implementation might have better ways to assert
that than calling smgrnblocks(), so it would be better to leave it to
the implementation. But what bothered me most was that smgrwrite() had a
different signature than mdwrite(). I'm happy with the way you have it
in the v4 patch.

--
Heikki Linnakangas
Neon (https://neon.tech)

#10Thomas Munro
thomas.munro@gmail.com
In reply to: Heikki Linnakangas (#9)
3 attachment(s)
Re: Vectored I/O in bulk_write.c

On Thu, Mar 14, 2024 at 10:49 AM Heikki Linnakangas <hlinnaka@iki.fi> wrote:

I tried to say that smgr implementation might have better ways to assert
that than calling smgrnblocks(), so it would be better to leave it to
the implementation. But what bothered me most was that smgrwrite() had a
different signature than mdwrite(). I'm happy with the way you have it
in the v4 patch.

Looking for other things that can be hoisted up to smgr.c level
because they are part of the contract or would surely need to be
duplicated by any implementation: I think the check that you don't
exceed the maximum possible block number could be there too, no?
Here's a version like that.

Does anyone else want to object to doing this for 17? Probably still
needs some cleanup eg comments etc that may be lurking around the
place and another round of testing. As for the overall idea, I'll
leave it a few days and see if others have feedback. My take is that
this is what bulk_write.c was clearly intended to do, it's just that
smgr let it down by not allowing vectored extension yet. It's a
fairly mechanical simplification, generalisation, and net code
reduction to do so by merging, like this.

Attachments:

v5-0001-Use-smgrwritev-for-both-overwriting-and-extending.patch
From 61b351b60d22060e5fc082645cdfc19188ac4841 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:04:21 +1300
Subject: [PATCH v5 1/3] Use smgrwritev() for both overwriting and extending.

Since mdwrite() and mdextend() were basically the same and both need
vectored variants, merge them into a single interface to reduce
duplication.  We still want to be able to assert that callers know the
difference between overwriting and extending and activate slightly
different behavior during recovery, so use a new flags argument to
control that.

Likewise for the zero-extending variant, which has much in common at
the interface level, except it doesn't deal in buffers.

The traditional single-block smgrwrite() and smgrextend() functions with
skipFsync boolean argument are translated to smgrwritev() by inlinable
wrapper functions, for low-overhead backwards-compatibility.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c   |  15 +--
 src/backend/storage/buffer/localbuf.c |   3 +-
 src/backend/storage/smgr/md.c         | 136 ++++--------------------
 src/backend/storage/smgr/smgr.c       | 145 +++++++++++---------------
 src/include/storage/md.h              |   7 +-
 src/include/storage/smgr.h            |  22 ++--
 6 files changed, 110 insertions(+), 218 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f0f8d4259c..6caf35a9d6 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -2064,7 +2064,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
 	/*
-	 * Note: if smgrzeroextend fails, we will end up with buffers that are
+	 * Note: if smgrwritev fails, we will end up with buffers that are
 	 * allocated but not marked BM_VALID.  The next relation extension will
 	 * still select the same block number (because the relation didn't get any
 	 * longer on disk) and so future attempts to extend the relation will find
@@ -2073,7 +2073,8 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 	 *
 	 * We don't need to set checksum for all-zero pages.
 	 */
-	smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);
+	smgrwritev(bmr.smgr, fork, first_block, NULL, extend_by,
+			   SMGR_WRITE_EXTEND | SMGR_WRITE_ZERO);
 
 	/*
 	 * Release the file-extension lock; it's now OK for someone else to extend
@@ -3720,11 +3721,11 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 	 *
 	 * In recovery, we cache the value returned by the first lseek(SEEK_END)
 	 * and the future writes keeps the cached value up-to-date. See
-	 * smgrextend. It is possible that the value of the first lseek is smaller
-	 * than the actual number of existing blocks in the file due to buggy
-	 * Linux kernels that might not have accounted for the recent write. But
-	 * that should be fine because there must not be any buffers after that
-	 * file size.
+	 * smgrwritev(). It is possible that the value of the first lseek is
+	 * smaller than the actual number of existing blocks in the file due to
+	 * buggy Linux kernels that might not have accounted for the recent write.
+	 * But that should be fine because there must not be any buffers after
+	 * that file size.
 	 */
 	for (i = 0; i < nforks; i++)
 	{
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index fcfac335a5..5b2b0fe9f4 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -416,7 +416,8 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
 	/* actually extend relation */
-	smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);
+	smgrwritev(bmr.smgr, fork, first_block, NULL, extend_by,
+			   SMGR_WRITE_EXTEND | SMGR_WRITE_ZERO);
 
 	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
 							io_start, extend_by);
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index bf0f3ca76d..3df0d5fffe 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -447,83 +447,14 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo)
 	pfree(path);
 }
 
-/*
- * mdextend() -- Add a block to the specified relation.
- *
- * The semantics are nearly the same as mdwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void *buffer, bool skipFsync)
-{
-	off_t		seekpos;
-	int			nbytes;
-	MdfdVec    *v;
-
-	/* If this build supports direct I/O, the buffer must be I/O aligned. */
-	if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
-		Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
-
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum >= mdnblocks(reln, forknum));
-#endif
-
-	/*
-	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
-	 * more --- we mustn't create a block whose number actually is
-	 * InvalidBlockNumber.  (Note that this failure should be unreachable
-	 * because of upstream checks in bufmgr.c.)
-	 */
-	if (blocknum == InvalidBlockNumber)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-				 errmsg("cannot extend file \"%s\" beyond %u blocks",
-						relpath(reln->smgr_rlocator, forknum),
-						InvalidBlockNumber)));
-
-	v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);
-
-	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
-
-	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
-
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
-	{
-		if (nbytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not extend file \"%s\": %m",
-							FilePathName(v->mdfd_vfd)),
-					 errhint("Check free disk space.")));
-		/* short write: complain appropriately */
-		ereport(ERROR,
-				(errcode(ERRCODE_DISK_FULL),
-				 errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
-						FilePathName(v->mdfd_vfd),
-						nbytes, BLCKSZ, blocknum),
-				 errhint("Check free disk space.")));
-	}
-
-	if (!skipFsync && !SmgrIsTemp(reln))
-		register_dirty_segment(reln, forknum, v);
-
-	Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
-}
-
 /*
  * mdzeroextend() -- Add new zeroed out blocks to the specified relation.
  *
- * Similar to mdextend(), except the relation can be extended by multiple
- * blocks at once and the added blocks will be filled with zeroes.
+ * The added blocks will be filled with zeroes.
  */
-void
+static void
 mdzeroextend(SMgrRelation reln, ForkNumber forknum,
-			 BlockNumber blocknum, int nblocks, bool skipFsync)
+			 BlockNumber blocknum, int nblocks, int flags)
 {
 	MdfdVec    *v;
 	BlockNumber curblocknum = blocknum;
@@ -531,23 +462,6 @@ mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 
 	Assert(nblocks > 0);
 
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum >= mdnblocks(reln, forknum));
-#endif
-
-	/*
-	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
-	 * more --- we mustn't create a block whose number actually is
-	 * InvalidBlockNumber or larger.
-	 */
-	if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-				 errmsg("cannot extend file \"%s\" beyond %u blocks",
-						relpath(reln->smgr_rlocator, forknum),
-						InvalidBlockNumber)));
-
 	while (remblocks > 0)
 	{
 		BlockNumber segstartblock = curblocknum % ((BlockNumber) RELSEG_SIZE);
@@ -559,7 +473,8 @@ mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 		else
 			numblocks = remblocks;
 
-		v = _mdfd_getseg(reln, forknum, curblocknum, skipFsync, EXTENSION_CREATE);
+		v = _mdfd_getseg(reln, forknum, curblocknum,
+						 flags & SMGR_WRITE_SKIP_FSYNC, EXTENSION_CREATE);
 
 		Assert(segstartblock < RELSEG_SIZE);
 		Assert(segstartblock + numblocks <= RELSEG_SIZE);
@@ -595,13 +510,7 @@ mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 		{
 			int			ret;
 
-			/*
-			 * Even if we don't want to use fallocate, we can still extend a
-			 * bit more efficiently than writing each 8kB block individually.
-			 * pg_pwrite_zeros() (via FileZero()) uses pg_pwritev_with_retry()
-			 * to avoid multiple writes or needing a zeroed buffer for the
-			 * whole length of the extension.
-			 */
+			/* Fall back to writing out zeroes. */
 			ret = FileZero(v->mdfd_vfd,
 						   seekpos, (off_t) BLCKSZ * numblocks,
 						   WAIT_EVENT_DATA_FILE_EXTEND);
@@ -613,7 +522,7 @@ mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 						errhint("Check free disk space."));
 		}
 
-		if (!skipFsync && !SmgrIsTemp(reln))
+		if ((flags & SMGR_WRITE_SKIP_FSYNC) == 0 && !SmgrIsTemp(reln))
 			register_dirty_segment(reln, forknum, v);
 
 		Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
@@ -919,19 +828,14 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
- *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use mdextend().
  */
 void
 mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void **buffers, BlockNumber nblocks, bool skipFsync)
+		 const void **buffers, BlockNumber nblocks,
+		 int flags)
 {
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum < mdnblocks(reln, forknum));
-#endif
+	if (flags & SMGR_WRITE_ZERO)
+		return mdzeroextend(reln, forknum, blocknum, nblocks, flags);
 
 	while (nblocks > 0)
 	{
@@ -944,7 +848,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		size_t		transferred_this_segment;
 		size_t		size_this_segment;
 
-		v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
+		v = _mdfd_getseg(reln, forknum, blocknum, flags & SMGR_WRITE_SKIP_FSYNC,
+						 (flags & SMGR_WRITE_EXTEND) ?
+						 EXTENSION_CREATE :
 						 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
 
 		seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
@@ -992,7 +898,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 				ereport(ERROR,
 						(errcode_for_file_access(),
-						 errmsg("could not write blocks %u..%u in file \"%s\": %m",
+						 errmsg((flags & SMGR_WRITE_EXTEND) ?
+								"could not extend blocks %u..%u in file \"%s\": %m" :
+								"could not write blocks %u..%u in file \"%s\": %m",
 								blocknum,
 								blocknum + nblocks_this_segment - 1,
 								FilePathName(v->mdfd_vfd)),
@@ -1010,7 +918,7 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 			iovcnt = compute_remaining_iovec(iov, iov, iovcnt, nbytes);
 		}
 
-		if (!skipFsync && !SmgrIsTemp(reln))
+		if ((flags & SMGR_WRITE_SKIP_FSYNC) == 0 && !SmgrIsTemp(reln))
 			register_dirty_segment(reln, forknum, v);
 
 		nblocks -= nblocks_this_segment;
@@ -1638,7 +1546,7 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 		{
 			/*
 			 * Normally we will create new segments only if authorized by the
-			 * caller (i.e., we are doing mdextend()).  But when doing WAL
+			 * caller (i.e., we are doing smgrextend()).  But when doing WAL
 			 * recovery, create segments anyway; this allows cases such as
 			 * replaying WAL data that has a write into a high-numbered
 			 * segment of a relation that was later deleted. We want to go
@@ -1655,9 +1563,9 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 				char	   *zerobuf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE,
 													 MCXT_ALLOC_ZERO);
 
-				mdextend(reln, forknum,
-						 nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
-						 zerobuf, skipFsync);
+				smgrextend(reln, forknum,
+						   nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
+						   zerobuf, skipFsync);
 				pfree(zerobuf);
 			}
 			flags = O_CREAT;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 62226d5dca..f5299a07bf 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -82,10 +82,6 @@ typedef struct f_smgr
 	bool		(*smgr_exists) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_unlink) (RelFileLocatorBackend rlocator, ForkNumber forknum,
 								bool isRedo);
-	void		(*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
-								BlockNumber blocknum, const void *buffer, bool skipFsync);
-	void		(*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
-									BlockNumber blocknum, int nblocks, bool skipFsync);
 	bool		(*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
 								  BlockNumber blocknum, int nblocks);
 	void		(*smgr_readv) (SMgrRelation reln, ForkNumber forknum,
@@ -94,7 +90,7 @@ typedef struct f_smgr
 	void		(*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
-								bool skipFsync);
+								int flags);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -114,8 +110,6 @@ static const f_smgr smgrsw[] = {
 		.smgr_create = mdcreate,
 		.smgr_exists = mdexists,
 		.smgr_unlink = mdunlink,
-		.smgr_extend = mdextend,
-		.smgr_zeroextend = mdzeroextend,
 		.smgr_prefetch = mdprefetch,
 		.smgr_readv = mdreadv,
 		.smgr_writev = mdwritev,
@@ -521,59 +515,6 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
 	pfree(rlocators);
 }
 
-
-/*
- * smgrextend() -- Add a new block to a file.
- *
- * The semantics are nearly the same as smgrwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void *buffer, bool skipFsync)
-{
-	smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
-										 buffer, skipFsync);
-
-	/*
-	 * Normally we expect this to increase nblocks by one, but if the cached
-	 * value isn't as expected, just invalidate it so the next call asks the
-	 * kernel.
-	 */
-	if (reln->smgr_cached_nblocks[forknum] == blocknum)
-		reln->smgr_cached_nblocks[forknum] = blocknum + 1;
-	else
-		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
-}
-
-/*
- * smgrzeroextend() -- Add new zeroed out blocks to a file.
- *
- * Similar to smgrextend(), except the relation can be extended by
- * multiple blocks at once and the added blocks will be filled with
- * zeroes.
- */
-void
-smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-			   int nblocks, bool skipFsync)
-{
-	smgrsw[reln->smgr_which].smgr_zeroextend(reln, forknum, blocknum,
-											 nblocks, skipFsync);
-
-	/*
-	 * Normally we expect this to increase the fork size by nblocks, but if
-	 * the cached value isn't as expected, just invalidate it so the next call
-	 * asks the kernel.
-	 */
-	if (reln->smgr_cached_nblocks[forknum] == blocknum)
-		reln->smgr_cached_nblocks[forknum] = blocknum + nblocks;
-	else
-		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
-}
-
 /*
  * smgrprefetch() -- Initiate asynchronous read of the specified block of a relation.
  *
@@ -607,9 +548,9 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 /*
  * smgrwritev() -- Write the supplied buffers out.
  *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use smgrextend().
+ * By default this is to be used only for updating already-existing blocks of
+ * a relation (ie, those before the current EOF).  To extend a relation,
+ * specify SMGR_WRITE_EXTEND, optionally with SMGR_WRITE_ZERO.
  *
  * This is not a synchronous write -- the block is not necessarily
  * on disk at return, only dumped out to the kernel.  However,
@@ -623,16 +564,51 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  * checkpoint happened; that relies on the fact that no other backend can be
  * concurrently modifying the page.
  *
- * skipFsync indicates that the caller will make other provisions to
- * fsync the relation, so we needn't bother.  Temporary relations also
- * do not require fsync.
+ * SMGR_WRITE_SKIP_FSYNC indicates that the caller will make other provisions
+ * to fsync the relation, so we needn't bother.  Temporary relations also do
+ * not require fsync.
  */
 void
 smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void **buffers, BlockNumber nblocks, bool skipFsync)
+		   const void **buffers, BlockNumber nblocks, int flags)
 {
+	if (flags & SMGR_WRITE_ZERO)
+		Assert(flags & SMGR_WRITE_EXTEND);
+#ifdef CHECK_WRITE_VS_EXTEND
+	/* These asserts are too expensive to have on normally ... */
+	if (flags & SMGR_WRITE_EXTEND)
+		Assert(blocknum >= smgrnblocks(reln, forknum));
+	else
+		Assert(blocknum + nblocks <= smgrnblocks(reln, forknum));
+#endif
+
+	/*
+	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
+	 * more --- we mustn't create a block whose number actually is
+	 * InvalidBlockNumber or larger.
+	 */
+	if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("cannot extend file \"%s\" beyond %u blocks",
+						relpath(reln->smgr_rlocator, forknum),
+						InvalidBlockNumber)));
+
 	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
-										 buffers, nblocks, skipFsync);
+										 buffers, nblocks, flags);
+
+	if (flags & SMGR_WRITE_EXTEND)
+	{
+		/*
+		 * Normally we expect this to increase the fork size by nblocks, but
+		 * if the cached value isn't as expected, just invalidate it so the
+		 * next call asks the smgr implementation.
+		 */
+		if (reln->smgr_cached_nblocks[forknum] == blocknum)
+			reln->smgr_cached_nblocks[forknum] = blocknum + nblocks;
+		else
+			reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
+	}
 }
 
 /*
@@ -743,14 +719,14 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
 /*
  * smgrregistersync() -- Request a relation to be sync'd at next checkpoint
  *
- * This can be used after calling smgrwrite() or smgrextend() with skipFsync =
- * true, to register the fsyncs that were skipped earlier.
+ * This can be used after calling smgrwritev() with SMGR_WRITE_SKIP_FSYNC,
+ * to register the fsyncs that were skipped earlier.
  *
  * Note: be mindful that a checkpoint could already have happened between the
- * smgrwrite or smgrextend calls and this!  In that case, the checkpoint
- * already missed fsyncing this relation, and you should use smgrimmedsync
- * instead.  Most callers should use the bulk loading facility in bulk_write.c
- * which handles all that.
+ * smgrwritev calls and this!  In that case, the checkpoint already missed
+ * fsyncing this relation, and you should use smgrimmedsync instead.  Most
+ * callers should use the bulk loading facility in bulk_write.c which handles
+ * all that.
  */
 void
 smgrregistersync(SMgrRelation reln, ForkNumber forknum)
@@ -764,17 +740,16 @@ smgrregistersync(SMgrRelation reln, ForkNumber forknum)
  * Synchronously force all previous writes to the specified relation
  * down to disk.
  *
- * This is useful for building completely new relations (eg, new
- * indexes).  Instead of incrementally WAL-logging the index build
- * steps, we can just write completed index pages to disk with smgrwrite
- * or smgrextend, and then fsync the completed index file before
- * committing the transaction.  (This is sufficient for purposes of
- * crash recovery, since it effectively duplicates forcing a checkpoint
- * for the completed index.  But it is *not* sufficient if one wishes
- * to use the WAL log for PITR or replication purposes: in that case
- * we have to make WAL entries as well.)
- *
- * The preceding writes should specify skipFsync = true to avoid
+ * This is useful for building completely new relations (eg, new indexes).
+ * Instead of incrementally WAL-logging the index build steps, we can just
+ * write completed index pages to disk with smgrwritev, and then fsync the
+ * completed index file before committing the transaction.  (This is
+ * sufficient for purposes of crash recovery, since it effectively duplicates
+ * forcing a checkpoint for the completed index.  But it is *not* sufficient
+ * if one wishes to use the WAL log for PITR or replication purposes: in that
+ * case we have to make WAL entries as well.)
+ *
+ * The preceding writes should specify SMGR_WRITE_SKIP_FSYNC to avoid
  * duplicative fsyncs.
  *
  * Note that you need to do FlushRelationBuffers() first if there is
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 620f10abde..794e3d7b40 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -26,17 +26,14 @@ extern void mdclose(SMgrRelation reln, ForkNumber forknum);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo);
-extern void mdextend(SMgrRelation reln, ForkNumber forknum,
-					 BlockNumber blocknum, const void *buffer, bool skipFsync);
-extern void mdzeroextend(SMgrRelation reln, ForkNumber forknum,
-						 BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum, int nblocks);
 extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
-					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+					 const void **buffers, BlockNumber nblocks,
+					 int flags);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index fc5f883ce1..8598b89e2e 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -73,6 +73,10 @@ typedef SMgrRelationData *SMgrRelation;
 #define SmgrIsTemp(smgr) \
 	RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator)
 
+#define SMGR_WRITE_SKIP_FSYNC 0x01
+#define SMGR_WRITE_EXTEND 0x02
+#define SMGR_WRITE_ZERO 0x04
+
 extern void smgrinit(void);
 extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend);
 extern bool smgrexists(SMgrRelation reln, ForkNumber forknum);
@@ -86,10 +90,6 @@ extern void smgrreleaserellocator(RelFileLocatorBackend rlocator);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
-extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-					   BlockNumber blocknum, const void *buffer, bool skipFsync);
-extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
-						   BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
 						 BlockNumber blocknum, int nblocks);
 extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
@@ -98,7 +98,7 @@ extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
 extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum,
 					   const void **buffer, BlockNumber nblocks,
-					   bool skipFsync);
+					   int flags);
 extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
 						  BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
@@ -121,7 +121,17 @@ static inline void
 smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		  const void *buffer, bool skipFsync)
 {
-	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
+	smgrwritev(reln, forknum, blocknum, &buffer, 1,
+			   skipFsync ? SMGR_WRITE_SKIP_FSYNC : 0);
+}
+
+static inline void
+smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		   const void *buffer, bool skipFsync)
+{
+	smgrwritev(reln, forknum, blocknum, &buffer, 1,
+			   SMGR_WRITE_EXTEND |
+			   (skipFsync ? SMGR_WRITE_SKIP_FSYNC : 0));
 }
 
 #endif							/* SMGR_H */
-- 
2.44.0

v5-0002-Use-vectored-I-O-for-bulk-writes.patch
From 4d8f8a05156008b12fc3038c9e16b3574192a5af Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:54:56 +1300
Subject: [PATCH v5 2/3] Use vectored I/O for bulk writes.

bulk_write.c was originally designed with the goal of being able to
use the new vectored write APIs, but couldn't initially because the
vectored variant of "smgrextend" didn't exist yet.  Now that
smgrwritev() can also handle extension, we can use it here to get wide
write system calls.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 90 +++++++++++++++++++--------
 1 file changed, 65 insertions(+), 25 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c3..8e46249a98 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrwritev() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -45,8 +45,6 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
-static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
-
 typedef struct PendingWrite
 {
 	BulkWriteBuffer buf;
@@ -225,35 +223,77 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[16];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
+
+		/* Zero-extend any missing space before the first block. */
+		if (blkno > bulkstate->pages_written)
+		{
+			int			nzeroblocks;
+
+			nzeroblocks = blkno - bulkstate->pages_written;
+			smgrwritev(bulkstate->smgr, bulkstate->forknum,
+					   bulkstate->pages_written, NULL, nzeroblocks,
+					   SMGR_WRITE_SKIP_FSYNC |
+					   SMGR_WRITE_EXTEND |
+					   SMGR_WRITE_ZERO);
+			bulkstate->pages_written += nzeroblocks;
+		}
 
-		if (blkno >= bulkstate->pages_written)
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(lengthof(pages),
+							  bulkstate->pages_written - blkno);
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = lengthof(pages);
+		}
+
+		/* Find as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC | SMGR_WRITE_EXTEND);
+			bulkstate->pages_written += nblocks;
+		}
+		else
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
-- 
2.44.0

v5-0003-Improve-bulk_write.c-memory-management.patch
From 27991a6c1ec1919a97f414c5b663fa8c6ec20a32 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 11 Mar 2024 11:44:41 +1300
Subject: [PATCH v5 3/3] Improve bulk_write.c memory management.

Instead of allocating buffers one at a time with palloc(), allocate an
array full of them up front, and then manage them in a FIFO freelist.
Aside from avoiding allocator overheads, this means that callers who
write sequential blocks will tend to fill up sequential memory, which
hopefully generates more efficient vectored writes.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 62 +++++++++++++++++++--------
 1 file changed, 43 insertions(+), 19 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 8e46249a98..fdb8fd36b5 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -36,6 +36,7 @@
 
 #include "access/xloginsert.h"
 #include "access/xlogrecord.h"
+#include "lib/ilist.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
 #include "storage/bulk_write.h"
@@ -45,9 +46,15 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
+typedef union BufferSlot
+{
+	PGIOAlignedBlock buffer;
+	dlist_node	freelist_node;
+}			BufferSlot;
+
 typedef struct PendingWrite
 {
-	BulkWriteBuffer buf;
+	BufferSlot *slot;
 	BlockNumber blkno;
 	bool		page_std;
 } PendingWrite;
@@ -57,6 +64,10 @@ typedef struct PendingWrite
  */
 struct BulkWriteState
 {
+	/* Comes first so we can align it correctly. */
+	BufferSlot	buffer_slots[MAX_PENDING_WRITES + 2];
+	dlist_head	buffer_slots_freelist;
+
 	/* Information about the target relation we're writing */
 	SMgrRelation smgr;
 	ForkNumber	forknum;
@@ -71,8 +82,6 @@ struct BulkWriteState
 
 	/* The RedoRecPtr at the time that the bulk operation started */
 	XLogRecPtr	start_RedoRecPtr;
-
-	MemoryContext memcxt;
 };
 
 static void smgr_bulk_flush(BulkWriteState *bulkstate);
@@ -98,7 +107,7 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 {
 	BulkWriteState *state;
 
-	state = palloc(sizeof(BulkWriteState));
+	state = palloc_aligned(sizeof(BulkWriteState), PG_IO_ALIGN_SIZE, 0);
 	state->smgr = smgr;
 	state->forknum = forknum;
 	state->use_wal = use_wal;
@@ -108,11 +117,11 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 
 	state->start_RedoRecPtr = GetRedoRecPtr();
 
-	/*
-	 * Remember the memory context.  We will use it to allocate all the
-	 * buffers later.
-	 */
-	state->memcxt = CurrentMemoryContext;
+	/* Set up the free-list of buffers. */
+	dlist_init(&state->buffer_slots_freelist);
+	for (int i = 0; i < lengthof(state->buffer_slots); ++i)
+		dlist_push_tail(&state->buffer_slots_freelist,
+						&state->buffer_slots[i].freelist_node);
 
 	return state;
 }
@@ -206,7 +215,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 		for (int i = 0; i < npending; i++)
 		{
 			blknos[i] = pending_writes[i].blkno;
-			pages[i] = pending_writes[i].buf->data;
+			pages[i] = pending_writes[i].slot->buffer.data;
 
 			/*
 			 * If any of the pages use !page_std, we log them all as such.
@@ -231,7 +240,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 		/* Prepare to write the first block. */
 		blkno = pending_writes[i].blkno;
-		page = pending_writes[i].buf->data;
+		page = pending_writes[i].slot->buffer.data;
 		PageSetChecksumInplace(page, blkno);
 		pages[0] = page;
 		nblocks = 1;
@@ -271,7 +280,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 			   pending_writes[i + 1].blkno == blkno + nblocks &&
 			   nblocks < max_nblocks)
 		{
-			page = pending_writes[++i].buf->data;
+			page = pending_writes[++i].slot->buffer.data;
 			PageSetChecksumInplace(page, pending_writes[i].blkno);
 			pages[nblocks++] = page;
 		}
@@ -292,8 +301,14 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 					   SMGR_WRITE_SKIP_FSYNC);
 		}
 
-		for (int j = 0; j < nblocks; ++j)
-			pfree(pending_writes[i - j].buf->data);
+		/*
+		 * Maintain FIFO ordering in the free list, so that users who write
+		 * blocks in sequential order tend to get sequential chunks of buffer
+		 * memory, which may be slightly more efficient for vectored writes.
+		 */
+		for (int j = i - nblocks + 1; j <= i; ++j)
+			dlist_push_tail(&bulkstate->buffer_slots_freelist,
+							&pending_writes[j].slot->freelist_node);
 	}
 
 	bulkstate->npending = 0;
@@ -313,7 +328,7 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
 	PendingWrite *w;
 
 	w = &bulkstate->pending_writes[bulkstate->npending++];
-	w->buf = buf;
+	w->slot = (BufferSlot *) buf;
 	w->blkno = blocknum;
 	w->page_std = page_std;
 
@@ -327,12 +342,21 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
  * There is no function to free the buffer.  When you pass it to
  * smgr_bulk_write(), it takes ownership and frees it when it's no longer
  * needed.
- *
- * This is currently implemented as a simple palloc, but could be implemented
- * using a ring buffer or larger chunks in the future, so don't rely on it.
  */
 BulkWriteBuffer
 smgr_bulk_get_buf(BulkWriteState *bulkstate)
 {
-	return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	BufferSlot *slot;
+
+	if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+	{
+		smgr_bulk_flush(bulkstate);
+		if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+			elog(ERROR, "too many bulk write buffers used but not yet written");
+	}
+
+	slot = dlist_head_element(BufferSlot, freelist_node, &bulkstate->buffer_slots_freelist);
+	dlist_pop_head_node(&bulkstate->buffer_slots_freelist);
+
+	return &slot->buffer;
 }
-- 
2.44.0

#11Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#10)
Re: Vectored I/O in bulk_write.c

I canvassed Andres off-list since smgrzeroextend() is his invention,
and he wondered if it was a good idea to blur the distinction between
the different zero-extension strategies like that. Good question. My
take is that it's fine:

mdzeroextend() already uses fallocate() only for nblocks > 8, but
otherwise writes out zeroes, because that was seen to interact better
with file system heuristics on common systems. That preserves current
behaviour for callers of plain-old smgrextend(), which becomes a
wrapper for smgrwrite(..., 1, _ZERO | _EXTEND). If some hypothetical
future caller wants to be able to call smgrwritev(..., NULL, 9 blocks,
_ZERO | _EXTEND) directly without triggering the fallocate() strategy
for some well-researched reason, we could add a new flag to say so, i.e.
adjust that gating.

In other words, we have already blurred the semantics. To me, it
seems nicer to have a single high level interface for the same logical
operation, and flags to select strategies more explicitly if that is
eventually necessary.

#12Andres Freund
andres@anarazel.de
In reply to: Thomas Munro (#11)
Re: Vectored I/O in bulk_write.c

Hi,

On 2024-03-16 12:27:15 +1300, Thomas Munro wrote:

I canvassed Andres off-list since smgrzeroextend() is his invention,
and he wondered if it was a good idea to blur the distinction between
the different zero-extension strategies like that. Good question. My
take is that it's fine:

mdzeroextend() already uses fallocate() only for nblocks > 8, but
otherwise writes out zeroes, because that was seen to interact better
with file system heuristics on common systems. That preserves current
behaviour for callers of plain-old sgmrextend(), which becomes a
wrapper for smgrwrite(..., 1, _ZERO | _EXTEND). If some hypothetical
future caller wants to be able to call smgrwritev(..., NULL, 9 blocks,
_ZERO | _EXTEND) directly without triggering the fallocate() strategy
for some well-researched reason, we could add a new flag to say so, i.e.
adjust that gating.

In other words, we have already blurred the semantics. To me, it
seems nicer to have a single high level interface for the same logical
operation, and flags to select strategies more explicitly if that is
eventually necessary.

I don't think zeroextend on the one hand and a normal write or extend on the
other are really the same operation. In the former case the content is
hard-coded; in the latter it's caller-provided. Sure, we can deal with that
by special-casing NULL content - but at that point, what's the benefit of
combining the two operations? I'm not dead-set against this, just not really
convinced that it's a good idea to combine the operations.

Greetings,

Andres Freund

#13Thomas Munro
thomas.munro@gmail.com
In reply to: Andres Freund (#12)
3 attachment(s)
Re: Vectored I/O in bulk_write.c

On Sun, Mar 17, 2024 at 8:10 AM Andres Freund <andres@anarazel.de> wrote:

I don't think zeroextend on the one hand and a normal write or extend on the
other are really the same operation. In the former case the content is
hard-coded; in the latter it's caller-provided. Sure, we can deal with that
by special-casing NULL content - but at that point, what's the benefit of
combining the two operations? I'm not dead-set against this, just not really
convinced that it's a good idea to combine the operations.

I liked some things about that, but I'm happy to drop that part.
Here's a version that leaves smgrzeroextend() alone, and I also gave
up on moving errors and assertions up into the smgr layer for now to
minimise the change. So to summarise, this gives us smgrwritev(...,
flags), where flags can include _SKIP_FSYNC and _EXTEND. This way we
don't have to invent smgrextendv(). The old single block smgrextend()
still exists as a static inline wrapper.

Attachments:

v6-0001-Use-smgrwritev-for-both-overwriting-and-extending.patch
From 50abfee2334abcee69154f49ea5656eca150b8aa Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:04:21 +1300
Subject: [PATCH v6 1/3] Use smgrwritev() for both overwriting and extending.

Since mdwrite() and mdextend() were basically the same and both need
vectored variants, merge them into a single interface to reduce
duplication.  We still want to be able to assert that callers know the
difference between overwriting and extending and activate slightly
different behavior during recovery, so use a new flags argument to
control that.

The traditional single-block smgrwrite() and smgrextend() functions with
skipFsync boolean argument are translated to smgrwritev() with flags by
inlinable wrapper functions, for low-overhead backwards-compatibility.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c |   2 +-
 src/backend/storage/smgr/md.c       | 120 ++++++++--------------------
 src/backend/storage/smgr/smgr.c     | 100 ++++++++++-------------
 src/include/storage/md.h            |   4 +-
 src/include/storage/smgr.h          |  19 ++++-
 5 files changed, 90 insertions(+), 155 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f0f8d4259c..aa7331778e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3720,7 +3720,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 	 *
 	 * In recovery, we cache the value returned by the first lseek(SEEK_END)
 	 * and the future writes keeps the cached value up-to-date. See
-	 * smgrextend. It is possible that the value of the first lseek is smaller
+	 * smgrwritev. It is possible that the value of the first lseek is smaller
 	 * than the actual number of existing blocks in the file due to buggy
 	 * Linux kernels that might not have accounted for the recent write. But
 	 * that should be fine because there must not be any buffers after that
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index bf0f3ca76d..73d077ca3e 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -447,79 +447,10 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo)
 	pfree(path);
 }
 
-/*
- * mdextend() -- Add a block to the specified relation.
- *
- * The semantics are nearly the same as mdwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void *buffer, bool skipFsync)
-{
-	off_t		seekpos;
-	int			nbytes;
-	MdfdVec    *v;
-
-	/* If this build supports direct I/O, the buffer must be I/O aligned. */
-	if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
-		Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
-
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum >= mdnblocks(reln, forknum));
-#endif
-
-	/*
-	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
-	 * more --- we mustn't create a block whose number actually is
-	 * InvalidBlockNumber.  (Note that this failure should be unreachable
-	 * because of upstream checks in bufmgr.c.)
-	 */
-	if (blocknum == InvalidBlockNumber)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-				 errmsg("cannot extend file \"%s\" beyond %u blocks",
-						relpath(reln->smgr_rlocator, forknum),
-						InvalidBlockNumber)));
-
-	v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);
-
-	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
-
-	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
-
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
-	{
-		if (nbytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not extend file \"%s\": %m",
-							FilePathName(v->mdfd_vfd)),
-					 errhint("Check free disk space.")));
-		/* short write: complain appropriately */
-		ereport(ERROR,
-				(errcode(ERRCODE_DISK_FULL),
-				 errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
-						FilePathName(v->mdfd_vfd),
-						nbytes, BLCKSZ, blocknum),
-				 errhint("Check free disk space.")));
-	}
-
-	if (!skipFsync && !SmgrIsTemp(reln))
-		register_dirty_segment(reln, forknum, v);
-
-	Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
-}
-
 /*
  * mdzeroextend() -- Add new zeroed out blocks to the specified relation.
  *
- * Similar to mdextend(), except the relation can be extended by multiple
- * blocks at once and the added blocks will be filled with zeroes.
+ * The added blocks will be filled with zeroes.
  */
 void
 mdzeroextend(SMgrRelation reln, ForkNumber forknum,
@@ -919,20 +850,31 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
- *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use mdextend().
  */
 void
 mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void **buffers, BlockNumber nblocks, bool skipFsync)
+		 const void **buffers, BlockNumber nblocks, int flags)
 {
 	/* This assert is too expensive to have on normally ... */
 #ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum < mdnblocks(reln, forknum));
+	if (flags & SMGR_WRITE_EXTEND)
+		Assert(blocknum >= mdnblocks(reln, forknum));
+	else
+		Assert(blocknum + nblocks <= mdnblocks(reln, forknum));
 #endif
 
+	/*
+	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
+	 * more --- we mustn't create a block whose number actually is
+	 * InvalidBlockNumber or larger.
+	 */
+	if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("cannot extend file \"%s\" beyond %u blocks",
+						relpath(reln->smgr_rlocator, forknum),
+						InvalidBlockNumber)));
+
 	while (nblocks > 0)
 	{
 		struct iovec iov[PG_IOV_MAX];
@@ -944,7 +886,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		size_t		transferred_this_segment;
 		size_t		size_this_segment;
 
-		v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
+		v = _mdfd_getseg(reln, forknum, blocknum, flags & SMGR_WRITE_SKIP_FSYNC,
+						 (flags & SMGR_WRITE_EXTEND) ?
+						 EXTENSION_CREATE :
 						 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
 
 		seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
@@ -992,7 +936,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 				ereport(ERROR,
 						(errcode_for_file_access(),
-						 errmsg("could not write blocks %u..%u in file \"%s\": %m",
+						 errmsg((flags & SMGR_WRITE_EXTEND) ?
+								"could not extend blocks %u..%u in file \"%s\": %m" :
+								"could not write blocks %u..%u in file \"%s\": %m",
 								blocknum,
 								blocknum + nblocks_this_segment - 1,
 								FilePathName(v->mdfd_vfd)),
@@ -1010,7 +956,7 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 			iovcnt = compute_remaining_iovec(iov, iov, iovcnt, nbytes);
 		}
 
-		if (!skipFsync && !SmgrIsTemp(reln))
+		if ((flags & SMGR_WRITE_SKIP_FSYNC) == 0 && !SmgrIsTemp(reln))
 			register_dirty_segment(reln, forknum, v);
 
 		nblocks -= nblocks_this_segment;
@@ -1638,11 +1584,11 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 		{
 			/*
 			 * Normally we will create new segments only if authorized by the
-			 * caller (i.e., we are doing mdextend()).  But when doing WAL
-			 * recovery, create segments anyway; this allows cases such as
-			 * replaying WAL data that has a write into a high-numbered
-			 * segment of a relation that was later deleted. We want to go
-			 * ahead and create the segments so we can finish out the replay.
+			 * caller (i.e., we are extending).  But when doing WAL recovery,
+			 * create segments anyway; this allows cases such as replaying WAL
+			 * data that has a write into a high-numbered segment of a
+			 * relation that was later deleted. We want to go ahead and create
+			 * the segments so we can finish out the replay.
 			 *
 			 * We have to maintain the invariant that segments before the last
 			 * active segment are of size RELSEG_SIZE; therefore, if
@@ -1655,9 +1601,9 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 				char	   *zerobuf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE,
 													 MCXT_ALLOC_ZERO);
 
-				mdextend(reln, forknum,
-						 nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
-						 zerobuf, skipFsync);
+				smgrextend(reln, forknum,
+						   nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
+						   zerobuf, skipFsync);
 				pfree(zerobuf);
 			}
 			flags = O_CREAT;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 62226d5dca..92628a0339 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -82,8 +82,6 @@ typedef struct f_smgr
 	bool		(*smgr_exists) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_unlink) (RelFileLocatorBackend rlocator, ForkNumber forknum,
 								bool isRedo);
-	void		(*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
-								BlockNumber blocknum, const void *buffer, bool skipFsync);
 	void		(*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
 									BlockNumber blocknum, int nblocks, bool skipFsync);
 	bool		(*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
@@ -94,7 +92,7 @@ typedef struct f_smgr
 	void		(*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
-								bool skipFsync);
+								int flags);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -114,7 +112,6 @@ static const f_smgr smgrsw[] = {
 		.smgr_create = mdcreate,
 		.smgr_exists = mdexists,
 		.smgr_unlink = mdunlink,
-		.smgr_extend = mdextend,
 		.smgr_zeroextend = mdzeroextend,
 		.smgr_prefetch = mdprefetch,
 		.smgr_readv = mdreadv,
@@ -521,40 +518,11 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
 	pfree(rlocators);
 }
 
-
-/*
- * smgrextend() -- Add a new block to a file.
- *
- * The semantics are nearly the same as smgrwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void *buffer, bool skipFsync)
-{
-	smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
-										 buffer, skipFsync);
-
-	/*
-	 * Normally we expect this to increase nblocks by one, but if the cached
-	 * value isn't as expected, just invalidate it so the next call asks the
-	 * kernel.
-	 */
-	if (reln->smgr_cached_nblocks[forknum] == blocknum)
-		reln->smgr_cached_nblocks[forknum] = blocknum + 1;
-	else
-		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
-}
-
 /*
  * smgrzeroextend() -- Add new zeroed out blocks to a file.
  *
- * Similar to smgrextend(), except the relation can be extended by
- * multiple blocks at once and the added blocks will be filled with
- * zeroes.
+ * Similar to writing with SMGR_WRITE_EXTEND, except the blocks will be filled
+ * with zeroes.
  */
 void
 smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
@@ -607,9 +575,9 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 /*
  * smgrwritev() -- Write the supplied buffers out.
  *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use smgrextend().
+ * By default this is to be used only for updating already-existing blocks of
+ * a relation (ie, those before the current EOF).  To extend a relation,
+ * specify SMGR_WRITE_EXTEND in flags.
  *
  * This is not a synchronous write -- the block is not necessarily
  * on disk at return, only dumped out to the kernel.  However,
@@ -623,16 +591,29 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  * checkpoint happened; that relies on the fact that no other backend can be
  * concurrently modifying the page.
  *
- * skipFsync indicates that the caller will make other provisions to
- * fsync the relation, so we needn't bother.  Temporary relations also
- * do not require fsync.
+ * SMGR_WRITE_SKIP_FSYNC indicates that the caller will make other provisions
+ * to fsync the relation, so we needn't bother.  Temporary relations also do
+ * not require fsync.
  */
 void
 smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void **buffers, BlockNumber nblocks, bool skipFsync)
+		   const void **buffers, BlockNumber nblocks, int flags)
 {
 	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
-										 buffers, nblocks, skipFsync);
+										 buffers, nblocks, flags);
+
+	if (flags & SMGR_WRITE_EXTEND)
+	{
+		/*
+		 * Normally we expect this to increase the fork size by nblocks, but
+		 * if the cached value isn't as expected, just invalidate it so the
+		 * next call asks the smgr implementation.
+		 */
+		if (reln->smgr_cached_nblocks[forknum] == blocknum)
+			reln->smgr_cached_nblocks[forknum] = blocknum + nblocks;
+		else
+			reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
+	}
 }
 
 /*
@@ -743,14 +724,14 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
 /*
  * smgrregistersync() -- Request a relation to be sync'd at next checkpoint
  *
- * This can be used after calling smgrwrite() or smgrextend() with skipFsync =
- * true, to register the fsyncs that were skipped earlier.
+ * This can be used after calling smgrwritev() with SMGR_WRITE_SKIP_FSYNC,
+ * to register the fsyncs that were skipped earlier.
  *
  * Note: be mindful that a checkpoint could already have happened between the
- * smgrwrite or smgrextend calls and this!  In that case, the checkpoint
- * already missed fsyncing this relation, and you should use smgrimmedsync
- * instead.  Most callers should use the bulk loading facility in bulk_write.c
- * which handles all that.
+ * smgrwritev calls and this!  In that case, the checkpoint already missed
+ * fsyncing this relation, and you should use smgrimmedsync instead.  Most
+ * callers should use the bulk loading facility in bulk_write.c which handles
+ * all that.
  */
 void
 smgrregistersync(SMgrRelation reln, ForkNumber forknum)
@@ -764,17 +745,16 @@ smgrregistersync(SMgrRelation reln, ForkNumber forknum)
  * Synchronously force all previous writes to the specified relation
  * down to disk.
  *
- * This is useful for building completely new relations (eg, new
- * indexes).  Instead of incrementally WAL-logging the index build
- * steps, we can just write completed index pages to disk with smgrwrite
- * or smgrextend, and then fsync the completed index file before
- * committing the transaction.  (This is sufficient for purposes of
- * crash recovery, since it effectively duplicates forcing a checkpoint
- * for the completed index.  But it is *not* sufficient if one wishes
- * to use the WAL log for PITR or replication purposes: in that case
- * we have to make WAL entries as well.)
- *
- * The preceding writes should specify skipFsync = true to avoid
+ * This is useful for building completely new relations (eg, new indexes).
+ * Instead of incrementally WAL-logging the index build steps, we can just
+ * write completed index pages to disk with smgrwritev, and then fsync the
+ * completed index file before committing the transaction.  (This is
+ * sufficient for purposes of crash recovery, since it effectively duplicates
+ * forcing a checkpoint for the completed index.  But it is *not* sufficient
+ * if one wishes to use the WAL log for PITR or replication purposes: in that
+ * case we have to make WAL entries as well.)
+ *
+ * The preceding writes should specify SMGR_WRITE_SKIP_FSYNC to avoid
  * duplicative fsyncs.
  *
  * Note that you need to do FlushRelationBuffers() first if there is
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 620f10abde..5fcd6f47df 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -26,8 +26,6 @@ extern void mdclose(SMgrRelation reln, ForkNumber forknum);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo);
-extern void mdextend(SMgrRelation reln, ForkNumber forknum,
-					 BlockNumber blocknum, const void *buffer, bool skipFsync);
 extern void mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 						 BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
@@ -36,7 +34,7 @@ extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
-					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+					 const void **buffers, BlockNumber nblocks, int flags);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index fc5f883ce1..2b8b72820c 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -73,6 +73,9 @@ typedef SMgrRelationData *SMgrRelation;
 #define SmgrIsTemp(smgr) \
 	RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator)
 
+#define SMGR_WRITE_SKIP_FSYNC 0x01
+#define SMGR_WRITE_EXTEND 0x02
+
 extern void smgrinit(void);
 extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend);
 extern bool smgrexists(SMgrRelation reln, ForkNumber forknum);
@@ -86,8 +89,6 @@ extern void smgrreleaserellocator(RelFileLocatorBackend rlocator);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
-extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-					   BlockNumber blocknum, const void *buffer, bool skipFsync);
 extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
 						   BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
@@ -98,7 +99,7 @@ extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
 extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum,
 					   const void **buffer, BlockNumber nblocks,
-					   bool skipFsync);
+					   int flags);
 extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
 						  BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
@@ -121,7 +122,17 @@ static inline void
 smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		  const void *buffer, bool skipFsync)
 {
-	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
+	smgrwritev(reln, forknum, blocknum, &buffer, 1,
+			   skipFsync ? SMGR_WRITE_SKIP_FSYNC : 0);
+}
+
+static inline void
+smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		   const void *buffer, bool skipFsync)
+{
+	smgrwritev(reln, forknum, blocknum, &buffer, 1,
+			   SMGR_WRITE_EXTEND |
+			   (skipFsync ? SMGR_WRITE_SKIP_FSYNC : 0));
 }
 
 #endif							/* SMGR_H */
-- 
2.39.3 (Apple Git-146)

v6-0002-Use-vectored-I-O-for-bulk-writes.patch
From 26efdeeecaf701b59d18f8c75954e28cfee5c0c8 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 9 Mar 2024 16:54:56 +1300
Subject: [PATCH v6 2/3] Use vectored I/O for bulk writes.

bulk_write.c was originally designed with the goal of being able to
use the new vectored write APIs, but couldn't initially because the
vectored variant of smgrextend() didn't exist yet.  Now that
smgrwritev() can also handle extension, we can use it here to get wide
write system calls.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 96 ++++++++++++++++++++-------
 1 file changed, 73 insertions(+), 23 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c3..848c3054f5 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrwritev() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -45,6 +45,12 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
+/*
+ * How many blocks to send to smgrwritev() at a time.  Arbitrary value for
+ * now.
+ */
+#define MAX_BLOCKS_PER_WRITE ((128 * 1024) / BLCKSZ)
+
 static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
 
 typedef struct PendingWrite
@@ -225,35 +231,79 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[MAX_BLOCKS_PER_WRITE];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
 
-		if (blkno >= bulkstate->pages_written)
+		/*
+		 * If we have to write pages nonsequentially, fill in the space with
+		 * zeroes until we come back and overwrite.  This is not logically
+		 * necessary on standard Unix filesystems (unwritten space will read
+		 * as zeroes anyway), but it should help to avoid fragmentation.  The
+		 * dummy pages aren't WAL-logged though.
+		 */
+		while (blkno > bulkstate->pages_written)
+		{
+			/* don't set checksum for all-zero page */
+			smgrextend(bulkstate->smgr, bulkstate->forknum,
+					   bulkstate->pages_written++,
+					   &zero_buffer,
+					   true);
+		}
+
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(lengthof(pages),
+							  bulkstate->pages_written - blkno);
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = lengthof(pages);
+		}
+
+		/* Find as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC | SMGR_WRITE_EXTEND);
+			bulkstate->pages_written += nblocks;
+		}
+		else
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
-- 
2.39.3 (Apple Git-146)

v6-0003-Improve-bulk_write.c-memory-management.patch
From 08d628b08d2e0eb974b8345b54e44fb20be984f4 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 11 Mar 2024 11:44:41 +1300
Subject: [PATCH v6 3/3] Improve bulk_write.c memory management.

Instead of allocating buffers one at a time with palloc(), allocate an
array full of them up front, and then manage them in a FIFO freelist.
Aside from avoiding allocator overheads, this means that callers who
write sequential blocks will tend to fill up sequential memory, which
hopefully generates more efficient vectored writes.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 62 +++++++++++++++++++--------
 1 file changed, 43 insertions(+), 19 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 848c3054f5..b94d81d43b 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -36,6 +36,7 @@
 
 #include "access/xloginsert.h"
 #include "access/xlogrecord.h"
+#include "lib/ilist.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
 #include "storage/bulk_write.h"
@@ -53,9 +54,15 @@
 
 static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
 
+typedef union BufferSlot
+{
+	PGIOAlignedBlock buffer;
+	dlist_node	freelist_node;
+}			BufferSlot;
+
 typedef struct PendingWrite
 {
-	BulkWriteBuffer buf;
+	BufferSlot *slot;
 	BlockNumber blkno;
 	bool		page_std;
 } PendingWrite;
@@ -65,6 +72,10 @@ typedef struct PendingWrite
  */
 struct BulkWriteState
 {
+	/* Comes first so we can align it correctly. */
+	BufferSlot	buffer_slots[MAX_PENDING_WRITES + 2];
+	dlist_head	buffer_slots_freelist;
+
 	/* Information about the target relation we're writing */
 	SMgrRelation smgr;
 	ForkNumber	forknum;
@@ -79,8 +90,6 @@ struct BulkWriteState
 
 	/* The RedoRecPtr at the time that the bulk operation started */
 	XLogRecPtr	start_RedoRecPtr;
-
-	MemoryContext memcxt;
 };
 
 static void smgr_bulk_flush(BulkWriteState *bulkstate);
@@ -106,7 +115,7 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 {
 	BulkWriteState *state;
 
-	state = palloc(sizeof(BulkWriteState));
+	state = palloc_aligned(sizeof(BulkWriteState), PG_IO_ALIGN_SIZE, 0);
 	state->smgr = smgr;
 	state->forknum = forknum;
 	state->use_wal = use_wal;
@@ -116,11 +125,11 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 
 	state->start_RedoRecPtr = GetRedoRecPtr();
 
-	/*
-	 * Remember the memory context.  We will use it to allocate all the
-	 * buffers later.
-	 */
-	state->memcxt = CurrentMemoryContext;
+	/* Set up the free-list of buffers. */
+	dlist_init(&state->buffer_slots_freelist);
+	for (int i = 0; i < lengthof(state->buffer_slots); ++i)
+		dlist_push_tail(&state->buffer_slots_freelist,
+						&state->buffer_slots[i].freelist_node);
 
 	return state;
 }
@@ -214,7 +223,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 		for (int i = 0; i < npending; i++)
 		{
 			blknos[i] = pending_writes[i].blkno;
-			pages[i] = pending_writes[i].buf->data;
+			pages[i] = pending_writes[i].slot->buffer.data;
 
 			/*
 			 * If any of the pages use !page_std, we log them all as such.
@@ -239,7 +248,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 		/* Prepare to write the first block. */
 		blkno = pending_writes[i].blkno;
-		page = pending_writes[i].buf->data;
+		page = pending_writes[i].slot->buffer.data;
 		PageSetChecksumInplace(page, blkno);
 		pages[0] = page;
 		nblocks = 1;
@@ -281,7 +290,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 			   pending_writes[i + 1].blkno == blkno + nblocks &&
 			   nblocks < max_nblocks)
 		{
-			page = pending_writes[++i].buf->data;
+			page = pending_writes[++i].slot->buffer.data;
 			PageSetChecksumInplace(page, pending_writes[i].blkno);
 			pages[nblocks++] = page;
 		}
@@ -302,8 +311,14 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 					   SMGR_WRITE_SKIP_FSYNC);
 		}
 
-		for (int j = 0; j < nblocks; ++j)
-			pfree(pending_writes[i - j].buf->data);
+		/*
+		 * Maintain FIFO ordering in the free list, so that users who write
+		 * blocks in sequential order tend to get sequential chunks of buffer
+		 * memory, which may be slightly more efficient for vectored writes.
+		 */
+		for (int j = i - nblocks + 1; j <= i; ++j)
+			dlist_push_tail(&bulkstate->buffer_slots_freelist,
+							&pending_writes[j].slot->freelist_node);
 	}
 
 	bulkstate->npending = 0;
@@ -323,7 +338,7 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
 	PendingWrite *w;
 
 	w = &bulkstate->pending_writes[bulkstate->npending++];
-	w->buf = buf;
+	w->slot = (BufferSlot *) buf;
 	w->blkno = blocknum;
 	w->page_std = page_std;
 
@@ -337,12 +352,21 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
  * There is no function to free the buffer.  When you pass it to
  * smgr_bulk_write(), it takes ownership and frees it when it's no longer
  * needed.
- *
- * This is currently implemented as a simple palloc, but could be implemented
- * using a ring buffer or larger chunks in the future, so don't rely on it.
  */
 BulkWriteBuffer
 smgr_bulk_get_buf(BulkWriteState *bulkstate)
 {
-	return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	BufferSlot *slot;
+
+	if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+	{
+		smgr_bulk_flush(bulkstate);
+		if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+			elog(ERROR, "too many bulk write buffers used but not yet written");
+	}
+
+	slot = dlist_head_element(BufferSlot, freelist_node, &bulkstate->buffer_slots_freelist);
+	dlist_pop_head_node(&bulkstate->buffer_slots_freelist);
+
+	return &slot->buffer;
 }
-- 
2.39.3 (Apple Git-146)

#14 Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#13)
1 attachment(s)
Re: Vectored I/O in bulk_write.c

Then I would make the trivial change to respect the new
io_combine_limit GUC that I'm gearing up to commit in another thread.
As attached.

Attachments:

0001-fixup-respect-io_combine_limit-in-bulk_write.c.txt
From 7993cede8939cad9172867ccc690a44ea25d1ad6 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Fri, 29 Mar 2024 00:22:53 +1300
Subject: [PATCH] fixup: respect io_combine_limit in bulk_write.c

---
 src/backend/storage/smgr/bulk_write.c | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 848c3054f5..612a9a23b3 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -45,12 +45,6 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
-/*
- * How many blocks to send to smgrwritev() at a time.  Arbitrary value for
- * now.
- */
-#define MAX_BLOCKS_PER_WRITE ((128 * 1024) / BLCKSZ)
-
 static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
 
 typedef struct PendingWrite
@@ -232,7 +226,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 	for (int i = 0; i < npending; i++)
 	{
 		Page		page;
-		const void *pages[MAX_BLOCKS_PER_WRITE];
+		const void *pages[MAX_IO_COMBINE_LIMIT];
 		BlockNumber blkno;
 		int			nblocks;
 		int			max_nblocks;
@@ -266,14 +260,14 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 			 * We're overwriting.  Clamp at the existing size, because we
 			 * can't mix writing and extending in a single operation.
 			 */
-			max_nblocks = Min(lengthof(pages),
+			max_nblocks = Min(io_combine_limit,
 							  bulkstate->pages_written - blkno);
 		}
 		else
 		{
 			/* We're extending. */
 			Assert(blkno == bulkstate->pages_written);
-			max_nblocks = lengthof(pages);
+			max_nblocks = io_combine_limit;
 		}
 
 		/* Find as many consecutive blocks as we can. */
-- 
2.39.3 (Apple Git-146)

#15 Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#14)
3 attachment(s)
Re: Vectored I/O in bulk_write.c

Here's a rebase. I decided against committing this for v17 in the
end. There's not much wrong with it AFAIK, except perhaps an
unprincipled chopping up of writes with large io_combine_limit due to
simplistic flow control. I liked the idea of having a decent user
of smgrwritev() in the tree, and it probably makes CREATE INDEX a bit
faster, but I'd like to try something more ambitious that
streamifies this and also the "main" writeback paths. I shared some
patches for that, which are counterparts to this, over at [1].

[1]: /messages/by-id/CA+hUKGK1in4FiWtisXZ+Jo-cNSbWjmBcPww3w3DBM+whJTABXA@mail.gmail.com

Attachments:

v7-0001-Use-smgrwritev-for-both-overwriting-and-extending.patch
From 7ee50aae3d4eba0df5bce05c196f411abb0bd9ab Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Mon, 8 Apr 2024 18:19:41 +1200
Subject: [PATCH v7 1/3] Use smgrwritev() for both overwriting and extending.

Since mdwrite() and mdextend() were very similar and both needed
vectored variants, merge them into a single interface.  This reduces
duplication and fills out the set of operations.  We still want to be
able to assert that callers know the difference between overwriting and
extending, and to activate slightly different behavior during recovery,
so add a new "flags" argument to control that.

The goal here is to provide the missing vectored interface without which
the new bulk write facility from commit 8af25652 can't write to the file
system in bulk.  A following commit will connect them together.

Like smgrwrite(), the traditional single-block smgrextend() function
with skipFsync boolean argument is now translated to smgrwritev() by
an inlinable wrapper function.

The smgrzeroextend() interface remains distinct; the idea was floated of
merging that too, but so far without consensus.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c |  10 +--
 src/backend/storage/smgr/md.c       | 120 ++++++++--------------------
 src/backend/storage/smgr/smgr.c     | 100 ++++++++++-------------
 src/include/storage/md.h            |   4 +-
 src/include/storage/smgr.h          |  19 ++++-
 5 files changed, 94 insertions(+), 159 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 44836751b71..5166a839da8 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -4011,11 +4011,11 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 	 *
 	 * In recovery, we cache the value returned by the first lseek(SEEK_END)
 	 * and the future writes keeps the cached value up-to-date. See
-	 * smgrextend. It is possible that the value of the first lseek is smaller
-	 * than the actual number of existing blocks in the file due to buggy
-	 * Linux kernels that might not have accounted for the recent write. But
-	 * that should be fine because there must not be any buffers after that
-	 * file size.
+	 * smgrzeroextend and smgrwritev. It is possible that the value of the
+	 * first lseek is smaller than the actual number of existing blocks in the
+	 * file due to buggy Linux kernels that might not have accounted for the
+	 * recent write. But that should be fine because there must not be any
+	 * buffers after that file size.
 	 */
 	for (i = 0; i < nforks; i++)
 	{
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index bf0f3ca76d1..73d077ca3ea 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -447,79 +447,10 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo)
 	pfree(path);
 }
 
-/*
- * mdextend() -- Add a block to the specified relation.
- *
- * The semantics are nearly the same as mdwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void *buffer, bool skipFsync)
-{
-	off_t		seekpos;
-	int			nbytes;
-	MdfdVec    *v;
-
-	/* If this build supports direct I/O, the buffer must be I/O aligned. */
-	if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
-		Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
-
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum >= mdnblocks(reln, forknum));
-#endif
-
-	/*
-	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
-	 * more --- we mustn't create a block whose number actually is
-	 * InvalidBlockNumber.  (Note that this failure should be unreachable
-	 * because of upstream checks in bufmgr.c.)
-	 */
-	if (blocknum == InvalidBlockNumber)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-				 errmsg("cannot extend file \"%s\" beyond %u blocks",
-						relpath(reln->smgr_rlocator, forknum),
-						InvalidBlockNumber)));
-
-	v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);
-
-	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
-
-	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
-
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
-	{
-		if (nbytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not extend file \"%s\": %m",
-							FilePathName(v->mdfd_vfd)),
-					 errhint("Check free disk space.")));
-		/* short write: complain appropriately */
-		ereport(ERROR,
-				(errcode(ERRCODE_DISK_FULL),
-				 errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
-						FilePathName(v->mdfd_vfd),
-						nbytes, BLCKSZ, blocknum),
-				 errhint("Check free disk space.")));
-	}
-
-	if (!skipFsync && !SmgrIsTemp(reln))
-		register_dirty_segment(reln, forknum, v);
-
-	Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
-}
-
 /*
  * mdzeroextend() -- Add new zeroed out blocks to the specified relation.
  *
- * Similar to mdextend(), except the relation can be extended by multiple
- * blocks at once and the added blocks will be filled with zeroes.
+ * The added blocks will be filled with zeroes.
  */
 void
 mdzeroextend(SMgrRelation reln, ForkNumber forknum,
@@ -919,20 +850,31 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
- *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use mdextend().
  */
 void
 mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void **buffers, BlockNumber nblocks, bool skipFsync)
+		 const void **buffers, BlockNumber nblocks, int flags)
 {
 	/* This assert is too expensive to have on normally ... */
 #ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum < mdnblocks(reln, forknum));
+	if (flags & SMGR_WRITE_EXTEND)
+		Assert(blocknum >= mdnblocks(reln, forknum));
+	else
+		Assert(blocknum + nblocks <= mdnblocks(reln, forknum));
 #endif
 
+	/*
+	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
+	 * more --- we mustn't create a block whose number actually is
+	 * InvalidBlockNumber or larger.
+	 */
+	if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("cannot extend file \"%s\" beyond %u blocks",
+						relpath(reln->smgr_rlocator, forknum),
+						InvalidBlockNumber)));
+
 	while (nblocks > 0)
 	{
 		struct iovec iov[PG_IOV_MAX];
@@ -944,7 +886,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		size_t		transferred_this_segment;
 		size_t		size_this_segment;
 
-		v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
+		v = _mdfd_getseg(reln, forknum, blocknum, flags & SMGR_WRITE_SKIP_FSYNC,
+						 (flags & SMGR_WRITE_EXTEND) ?
+						 EXTENSION_CREATE :
 						 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
 
 		seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
@@ -992,7 +936,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 				ereport(ERROR,
 						(errcode_for_file_access(),
-						 errmsg("could not write blocks %u..%u in file \"%s\": %m",
+						 errmsg((flags & SMGR_WRITE_EXTEND) ?
+								"could not extend blocks %u..%u in file \"%s\": %m" :
+								"could not write blocks %u..%u in file \"%s\": %m",
 								blocknum,
 								blocknum + nblocks_this_segment - 1,
 								FilePathName(v->mdfd_vfd)),
@@ -1010,7 +956,7 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 			iovcnt = compute_remaining_iovec(iov, iov, iovcnt, nbytes);
 		}
 
-		if (!skipFsync && !SmgrIsTemp(reln))
+		if ((flags & SMGR_WRITE_SKIP_FSYNC) == 0 && !SmgrIsTemp(reln))
 			register_dirty_segment(reln, forknum, v);
 
 		nblocks -= nblocks_this_segment;
@@ -1638,11 +1584,11 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 		{
 			/*
 			 * Normally we will create new segments only if authorized by the
-			 * caller (i.e., we are doing mdextend()).  But when doing WAL
-			 * recovery, create segments anyway; this allows cases such as
-			 * replaying WAL data that has a write into a high-numbered
-			 * segment of a relation that was later deleted. We want to go
-			 * ahead and create the segments so we can finish out the replay.
+			 * caller (i.e., we are extending).  But when doing WAL recovery,
+			 * create segments anyway; this allows cases such as replaying WAL
+			 * data that has a write into a high-numbered segment of a
+			 * relation that was later deleted. We want to go ahead and create
+			 * the segments so we can finish out the replay.
 			 *
 			 * We have to maintain the invariant that segments before the last
 			 * active segment are of size RELSEG_SIZE; therefore, if
@@ -1655,9 +1601,9 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 				char	   *zerobuf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE,
 													 MCXT_ALLOC_ZERO);
 
-				mdextend(reln, forknum,
-						 nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
-						 zerobuf, skipFsync);
+				smgrextend(reln, forknum,
+						   nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
+						   zerobuf, skipFsync);
 				pfree(zerobuf);
 			}
 			flags = O_CREAT;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 62226d5dca7..92628a0339e 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -82,8 +82,6 @@ typedef struct f_smgr
 	bool		(*smgr_exists) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_unlink) (RelFileLocatorBackend rlocator, ForkNumber forknum,
 								bool isRedo);
-	void		(*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
-								BlockNumber blocknum, const void *buffer, bool skipFsync);
 	void		(*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
 									BlockNumber blocknum, int nblocks, bool skipFsync);
 	bool		(*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
@@ -94,7 +92,7 @@ typedef struct f_smgr
 	void		(*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
-								bool skipFsync);
+								int flags);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -114,7 +112,6 @@ static const f_smgr smgrsw[] = {
 		.smgr_create = mdcreate,
 		.smgr_exists = mdexists,
 		.smgr_unlink = mdunlink,
-		.smgr_extend = mdextend,
 		.smgr_zeroextend = mdzeroextend,
 		.smgr_prefetch = mdprefetch,
 		.smgr_readv = mdreadv,
@@ -521,40 +518,11 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
 	pfree(rlocators);
 }
 
-
-/*
- * smgrextend() -- Add a new block to a file.
- *
- * The semantics are nearly the same as smgrwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void *buffer, bool skipFsync)
-{
-	smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
-										 buffer, skipFsync);
-
-	/*
-	 * Normally we expect this to increase nblocks by one, but if the cached
-	 * value isn't as expected, just invalidate it so the next call asks the
-	 * kernel.
-	 */
-	if (reln->smgr_cached_nblocks[forknum] == blocknum)
-		reln->smgr_cached_nblocks[forknum] = blocknum + 1;
-	else
-		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
-}
-
 /*
  * smgrzeroextend() -- Add new zeroed out blocks to a file.
  *
- * Similar to smgrextend(), except the relation can be extended by
- * multiple blocks at once and the added blocks will be filled with
- * zeroes.
+ * Similar to writing with SMGR_WRITE_EXTEND, except the blocks will be filled
+ * with zeroes.
  */
 void
 smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
@@ -607,9 +575,9 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 /*
  * smgrwritev() -- Write the supplied buffers out.
  *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use smgrextend().
+ * By default this is to be used only for updating already-existing blocks of
+ * a relation (ie, those before the current EOF).  To extend a relation,
+ * specify SMGR_WRITE_EXTEND in flags.
  *
  * This is not a synchronous write -- the block is not necessarily
  * on disk at return, only dumped out to the kernel.  However,
@@ -623,16 +591,29 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  * checkpoint happened; that relies on the fact that no other backend can be
  * concurrently modifying the page.
  *
- * skipFsync indicates that the caller will make other provisions to
- * fsync the relation, so we needn't bother.  Temporary relations also
- * do not require fsync.
+ * SMGR_WRITE_SKIP_FSYNC indicates that the caller will make other provisions
+ * to fsync the relation, so we needn't bother.  Temporary relations also do
+ * not require fsync.
  */
 void
 smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void **buffers, BlockNumber nblocks, bool skipFsync)
+		   const void **buffers, BlockNumber nblocks, int flags)
 {
 	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
-										 buffers, nblocks, skipFsync);
+										 buffers, nblocks, flags);
+
+	if (flags & SMGR_WRITE_EXTEND)
+	{
+		/*
+		 * Normally we expect this to increase the fork size by nblocks, but
+		 * if the cached value isn't as expected, just invalidate it so the
+		 * next call asks the smgr implementation.
+		 */
+		if (reln->smgr_cached_nblocks[forknum] == blocknum)
+			reln->smgr_cached_nblocks[forknum] = blocknum + nblocks;
+		else
+			reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
+	}
 }
 
 /*
@@ -743,14 +724,14 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
 /*
  * smgrregistersync() -- Request a relation to be sync'd at next checkpoint
  *
- * This can be used after calling smgrwrite() or smgrextend() with skipFsync =
- * true, to register the fsyncs that were skipped earlier.
+ * This can be used after calling smgrwritev() with SMGR_WRITE_SKIP_FSYNC,
+ * to register the fsyncs that were skipped earlier.
  *
  * Note: be mindful that a checkpoint could already have happened between the
- * smgrwrite or smgrextend calls and this!  In that case, the checkpoint
- * already missed fsyncing this relation, and you should use smgrimmedsync
- * instead.  Most callers should use the bulk loading facility in bulk_write.c
- * which handles all that.
+ * smgrwritev calls and this!  In that case, the checkpoint already missed
+ * fsyncing this relation, and you should use smgrimmedsync instead.  Most
+ * callers should use the bulk loading facility in bulk_write.c which handles
+ * all that.
  */
 void
 smgrregistersync(SMgrRelation reln, ForkNumber forknum)
@@ -764,17 +745,16 @@ smgrregistersync(SMgrRelation reln, ForkNumber forknum)
  * Synchronously force all previous writes to the specified relation
  * down to disk.
  *
- * This is useful for building completely new relations (eg, new
- * indexes).  Instead of incrementally WAL-logging the index build
- * steps, we can just write completed index pages to disk with smgrwrite
- * or smgrextend, and then fsync the completed index file before
- * committing the transaction.  (This is sufficient for purposes of
- * crash recovery, since it effectively duplicates forcing a checkpoint
- * for the completed index.  But it is *not* sufficient if one wishes
- * to use the WAL log for PITR or replication purposes: in that case
- * we have to make WAL entries as well.)
- *
- * The preceding writes should specify skipFsync = true to avoid
+ * This is useful for building completely new relations (eg, new indexes).
+ * Instead of incrementally WAL-logging the index build steps, we can just
+ * write completed index pages to disk with smgrwritev, and then fsync the
+ * completed index file before committing the transaction.  (This is
+ * sufficient for purposes of crash recovery, since it effectively duplicates
+ * forcing a checkpoint for the completed index.  But it is *not* sufficient
+ * if one wishes to use the WAL log for PITR or replication purposes: in that
+ * case we have to make WAL entries as well.)
+ *
+ * The preceding writes should specify SMGR_WRITE_SKIP_FSYNC to avoid
  * duplicative fsyncs.
  *
  * Note that you need to do FlushRelationBuffers() first if there is
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 620f10abdeb..5fcd6f47dfe 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -26,8 +26,6 @@ extern void mdclose(SMgrRelation reln, ForkNumber forknum);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo);
-extern void mdextend(SMgrRelation reln, ForkNumber forknum,
-					 BlockNumber blocknum, const void *buffer, bool skipFsync);
 extern void mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 						 BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
@@ -36,7 +34,7 @@ extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
-					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+					 const void **buffers, BlockNumber nblocks, int flags);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index fc5f883ce14..2b8b72820cd 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -73,6 +73,9 @@ typedef SMgrRelationData *SMgrRelation;
 #define SmgrIsTemp(smgr) \
 	RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator)
 
+#define SMGR_WRITE_SKIP_FSYNC 0x01
+#define SMGR_WRITE_EXTEND 0x02
+
 extern void smgrinit(void);
 extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend);
 extern bool smgrexists(SMgrRelation reln, ForkNumber forknum);
@@ -86,8 +89,6 @@ extern void smgrreleaserellocator(RelFileLocatorBackend rlocator);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
-extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-					   BlockNumber blocknum, const void *buffer, bool skipFsync);
 extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
 						   BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
@@ -98,7 +99,7 @@ extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
 extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum,
 					   const void **buffer, BlockNumber nblocks,
-					   bool skipFsync);
+					   int flags);
 extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
 						  BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
@@ -121,7 +122,17 @@ static inline void
 smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		  const void *buffer, bool skipFsync)
 {
-	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
+	smgrwritev(reln, forknum, blocknum, &buffer, 1,
+			   skipFsync ? SMGR_WRITE_SKIP_FSYNC : 0);
+}
+
+static inline void
+smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		   const void *buffer, bool skipFsync)
+{
+	smgrwritev(reln, forknum, blocknum, &buffer, 1,
+			   SMGR_WRITE_EXTEND |
+			   (skipFsync ? SMGR_WRITE_SKIP_FSYNC : 0));
 }
 
 #endif							/* SMGR_H */
-- 
2.44.0
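For readers following along, the net effect of patch 0001 on the interface can be sketched in isolation: the two flag bits replace the old skipFsync boolean plus the separate extend entry point, the cached-size update from smgrextend() generalizes to nblocks, and the new overflow guard does its comparison in 64 bits. A minimal standalone model of those two rules (hypothetical helper names, not the real smgr code):

```c
#include <assert.h>
#include <stdint.h>

#define SMGR_WRITE_SKIP_FSYNC 0x01
#define SMGR_WRITE_EXTEND     0x02

typedef uint32_t BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

/*
 * Model of the cached-nblocks rule in smgrwritev(): an extending write
 * bumps the cached fork size only if the cache matched the write's
 * starting block; otherwise the cache is invalidated so the next caller
 * asks the smgr implementation.
 */
static BlockNumber
update_cached_nblocks(BlockNumber cached, BlockNumber blocknum,
					  BlockNumber nblocks, int flags)
{
	if ((flags & SMGR_WRITE_EXTEND) == 0)
		return cached;			/* plain overwrite: cache unaffected */
	if (cached == blocknum)
		return blocknum + nblocks;
	return InvalidBlockNumber;
}

/*
 * Model of the guard added to mdwritev(): widen to 64 bits so that
 * blocknum + nblocks cannot wrap before the comparison.
 */
static int
would_overflow(BlockNumber blocknum, BlockNumber nblocks)
{
	return (uint64_t) blocknum + nblocks >= (uint64_t) InvalidBlockNumber;
}
```

This is only a sketch of the two invariants; the real functions also dispatch through smgrsw[] and report errors with ereport().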

Attachment: v7-0002-Use-vectored-I-O-in-CREATE-INDEX.patch (text/x-patch)
From df9e2356d61c47f4a943d0a21ccc64faf6616cc0 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Mon, 8 Apr 2024 18:58:57 +1200
Subject: [PATCH v7 2/3] Use vectored I/O in CREATE INDEX.

Commit 8af25652's bulk_write.c was originally designed with the goal of
being able to use vectored writes (along with other batching
opportunities), but it couldn't do that at the time because the vectored
variant of smgrextend() didn't exist yet.  Now it does, so we can
connect the dots.

Concretely, CREATE INDEX commands for several index types now respect
the io_combine_limit setting, when bypassing the buffer pool and writing
new indexes directly to the storage manager.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
Discussion: https://postgr.es/m/30e8f366-58b3-b239-c521-422122dd5150%40iki.fi
---
 src/backend/storage/smgr/bulk_write.c | 93 ++++++++++++++++++++-------
 1 file changed, 69 insertions(+), 24 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c39..45d66245860 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrwritev() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -225,35 +225,79 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[MAX_IO_COMBINE_LIMIT];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
 
-		if (blkno >= bulkstate->pages_written)
+		/*
+		 * If we have to write pages nonsequentially, fill in the space with
+		 * zeroes until we come back and overwrite.  This is not logically
+		 * necessary on standard Unix filesystems (unwritten space will read
+		 * as zeroes anyway), but it should help to avoid fragmentation.  The
+		 * dummy pages aren't WAL-logged though.
+		 */
+		while (blkno > bulkstate->pages_written)
+		{
+			/* don't set checksum for all-zero page */
+			smgrextend(bulkstate->smgr, bulkstate->forknum,
+					   bulkstate->pages_written++,
+					   &zero_buffer,
+					   true);
+		}
+
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(io_combine_limit,
+							  bulkstate->pages_written - blkno);
+		}
+		else
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = io_combine_limit;
+		}
+
+		/* Collect as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC | SMGR_WRITE_EXTEND);
+			bulkstate->pages_written += nblocks;
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
@@ -277,7 +321,8 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
 	w->blkno = blocknum;
 	w->page_std = page_std;
 
-	if (bulkstate->npending == MAX_PENDING_WRITES)
+	if (bulkstate->npending == MAX_PENDING_WRITES ||
+		bulkstate->npending == io_combine_limit)
 		smgr_bulk_flush(bulkstate);
 }
 
-- 
2.44.0
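The batching loop that patch 0002 adds to smgr_bulk_flush() can be illustrated on its own: given the sorted pending-write block numbers, consecutive runs are collected up to io_combine_limit, and each run becomes one vectored write. A toy model of just the run-collection (function name hypothetical):

```c
#include <assert.h>

/*
 * Toy version of the run collection in smgr_bulk_flush(): scan a sorted
 * array of block numbers and count how many vectored writes are needed
 * when each write may cover at most max_per_write consecutive blocks.
 */
static int
count_vectored_writes(const unsigned *blknos, int npending, int max_per_write)
{
	int			writes = 0;

	for (int i = 0; i < npending;)
	{
		int			nblocks = 1;

		/* Collect as many consecutive blocks as we can. */
		while (i + nblocks < npending &&
			   blknos[i + nblocks] == blknos[i] + (unsigned) nblocks &&
			   nblocks < max_per_write)
			nblocks++;

		writes++;
		i += nblocks;
	}
	return writes;
}
```

With io_combine_limit at 16 and 8kB blocks, a fully sequential batch of 16 pending pages collapses to a single 128kB pwritev() call, which is the behavior described for CREATE INDEX in the cover message.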

Attachment: v7-0003-Use-contiguous-memory-for-bulk_write.c.patch (text/x-patch)
From 642e563a1aa8a4c27be2792dde558730edc79900 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Mon, 8 Apr 2024 19:59:45 +1200
Subject: [PATCH v7 3/3] Use contiguous memory for bulk_write.c.

Instead of allocating buffers one at a time with palloc(), allocate an
array full of them up front, and then manage them in a FIFO freelist.
Aside from avoiding repeated allocator overheads, this means that
callers that tend to write sequential blocks will tend to fill up
sequential memory, which hopefully generates more efficient vectored
writes or simple non-vectored writes.  This implements an idea described
in the comments of commit 8af25652.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 66 +++++++++++++++++++--------
 src/tools/pgindent/typedefs.list      |  1 +
 2 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 45d66245860..6d16619f6be 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -36,6 +36,7 @@
 
 #include "access/xloginsert.h"
 #include "access/xlogrecord.h"
+#include "lib/ilist.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
 #include "storage/bulk_write.h"
@@ -47,9 +48,15 @@
 
 static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
 
+typedef union BufferSlot
+{
+	PGIOAlignedBlock buffer;
+	dlist_node	freelist_node;
+} BufferSlot;
+
 typedef struct PendingWrite
 {
-	BulkWriteBuffer buf;
+	BufferSlot *slot;
 	BlockNumber blkno;
 	bool		page_std;
 } PendingWrite;
@@ -59,6 +66,14 @@ typedef struct PendingWrite
  */
 struct BulkWriteState
 {
+	/*
+	 * This must come first so that it is correctly aligned for I/O.  We have
+	 * one extra slot for the write that has been allocated but not yet
+	 * submitted to smgr_bulk_write().
+	 */
+	BufferSlot	buffer_slots[MAX_PENDING_WRITES + 1];
+	dlist_head	buffer_slots_freelist;
+
 	/* Information about the target relation we're writing */
 	SMgrRelation smgr;
 	ForkNumber	forknum;
@@ -73,8 +88,6 @@ struct BulkWriteState
 
 	/* The RedoRecPtr at the time that the bulk operation started */
 	XLogRecPtr	start_RedoRecPtr;
-
-	MemoryContext memcxt;
 };
 
 static void smgr_bulk_flush(BulkWriteState *bulkstate);
@@ -100,7 +113,7 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 {
 	BulkWriteState *state;
 
-	state = palloc(sizeof(BulkWriteState));
+	state = palloc_aligned(sizeof(BulkWriteState), PG_IO_ALIGN_SIZE, 0);
 	state->smgr = smgr;
 	state->forknum = forknum;
 	state->use_wal = use_wal;
@@ -110,11 +123,11 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 
 	state->start_RedoRecPtr = GetRedoRecPtr();
 
-	/*
-	 * Remember the memory context.  We will use it to allocate all the
-	 * buffers later.
-	 */
-	state->memcxt = CurrentMemoryContext;
+	/* Set up the free-list of buffers. */
+	dlist_init(&state->buffer_slots_freelist);
+	for (int i = 0; i < lengthof(state->buffer_slots); ++i)
+		dlist_push_tail(&state->buffer_slots_freelist,
+						&state->buffer_slots[i].freelist_node);
 
 	return state;
 }
@@ -208,7 +221,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 		for (int i = 0; i < npending; i++)
 		{
 			blknos[i] = pending_writes[i].blkno;
-			pages[i] = pending_writes[i].buf->data;
+			pages[i] = pending_writes[i].slot->buffer.data;
 
 			/*
 			 * If any of the pages use !page_std, we log them all as such.
@@ -233,7 +246,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 		/* Prepare to write the first block. */
 		blkno = pending_writes[i].blkno;
-		page = pending_writes[i].buf->data;
+		page = pending_writes[i].slot->buffer.data;
 		PageSetChecksumInplace(page, blkno);
 		pages[0] = page;
 		nblocks = 1;
@@ -275,7 +288,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 			   pending_writes[i + 1].blkno == blkno + nblocks &&
 			   nblocks < max_nblocks)
 		{
-			page = pending_writes[++i].buf->data;
+			page = pending_writes[++i].slot->buffer.data;
 			PageSetChecksumInplace(page, pending_writes[i].blkno);
 			pages[nblocks++] = page;
 		}
@@ -296,8 +309,14 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 					   SMGR_WRITE_SKIP_FSYNC);
 		}
 
-		for (int j = 0; j < nblocks; ++j)
-			pfree(pending_writes[i - j].buf->data);
+		/*
+		 * Maintain FIFO ordering in the free list, so that users who write
+		 * blocks in sequential order tend to get sequential chunks of buffer
+		 * memory, which may be slightly more efficient for vectored writes.
+		 */
+		for (int j = i - nblocks + 1; j <= i; ++j)
+			dlist_push_tail(&bulkstate->buffer_slots_freelist,
+							&pending_writes[j].slot->freelist_node);
 	}
 
 	bulkstate->npending = 0;
@@ -317,7 +336,7 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
 	PendingWrite *w;
 
 	w = &bulkstate->pending_writes[bulkstate->npending++];
-	w->buf = buf;
+	w->slot = (BufferSlot *) buf;
 	w->blkno = blocknum;
 	w->page_std = page_std;
 
@@ -332,12 +351,21 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
  * There is no function to free the buffer.  When you pass it to
  * smgr_bulk_write(), it takes ownership and frees it when it's no longer
  * needed.
- *
- * This is currently implemented as a simple palloc, but could be implemented
- * using a ring buffer or larger chunks in the future, so don't rely on it.
  */
 BulkWriteBuffer
 smgr_bulk_get_buf(BulkWriteState *bulkstate)
 {
-	return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	BufferSlot *slot;
+
+	if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+	{
+		smgr_bulk_flush(bulkstate);
+		if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+			elog(ERROR, "too many bulk write buffers used but not yet written");
+	}
+
+	slot = dlist_head_element(BufferSlot, freelist_node, &bulkstate->buffer_slots_freelist);
+	dlist_pop_head_node(&bulkstate->buffer_slots_freelist);
+
+	return &slot->buffer;
 }
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 704e61dcaa2..05669916c78 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -328,6 +328,7 @@ BufferDescPadded
 BufferHeapTupleTableSlot
 BufferLookupEnt
 BufferManagerRelation
+BufferSlot
 BufferStrategyControl
 BufferTag
 BufferUsage
-- 
2.44.0
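The freelist idea in patch 0003 can be modeled separately from smgr: slots live in one contiguous array, and free slot indices are handed out and returned in FIFO order, so a caller that acquires buffers for sequential blocks tends to receive adjacent memory. A toy sketch under those assumptions (sizes and names hypothetical):

```c
#include <assert.h>
#include <stddef.h>

#define NSLOTS 8
#define SLOTSZ 8192

/*
 * Toy model of bulk_write.c's FIFO freelist: because the slots are one
 * array and indices cycle in FIFO order, consecutive freelist_get()
 * calls on a fresh list return adjacent SLOTSZ-sized regions.
 */
typedef struct FreeList
{
	char		slots[NSLOTS][SLOTSZ];
	int			queue[NSLOTS];	/* indices of free slots, FIFO order */
	int			head;
	int			count;
} FreeList;

static void
freelist_init(FreeList *fl)
{
	for (int i = 0; i < NSLOTS; i++)
		fl->queue[i] = i;
	fl->head = 0;
	fl->count = NSLOTS;
}

static char *
freelist_get(FreeList *fl)
{
	int			idx;

	if (fl->count == 0)
		return NULL;			/* caller must flush pending writes first */
	idx = fl->queue[fl->head];
	fl->head = (fl->head + 1) % NSLOTS;
	fl->count--;
	return fl->slots[idx];
}

/* Two sequential gets on a fresh list yield adjacent memory. */
static int
adjacent_after_fifo(void)
{
	FreeList	fl;
	char	   *a,
			   *b;

	freelist_init(&fl);
	a = freelist_get(&fl);
	b = freelist_get(&fl);
	return b - a == SLOTSZ;
}

/* After all slots are taken, the list reports exhaustion. */
static int
exhausts_to_null(void)
{
	FreeList	fl;

	freelist_init(&fl);
	for (int i = 0; i < NSLOTS; i++)
		(void) freelist_get(&fl);
	return freelist_get(&fl) == NULL;
}
```

The real code uses a dlist of nodes embedded in a union with the buffer itself, and on exhaustion it calls smgr_bulk_flush() rather than failing, but the adjacency property this sketch demonstrates is the point of the FIFO ordering.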

#16 Noah Misch
noah@leadboat.com
In reply to: Thomas Munro (#15)
Re: Vectored I/O in bulk_write.c

On Tue, Apr 09, 2024 at 04:51:52PM +1200, Thomas Munro wrote:

> Here's a rebase. I decided against committing this for v17 in the
> end. There's not much wrong with it AFAIK, except perhaps an
> unprincipled chopping up of writes with large io_combine_limit due to
> simplistic flow control, and I liked the idea of having a decent user
> of smgrwritev() in the tree, and it probably makes CREATE INDEX a bit
> faster, but... I'd like to try something more ambitious that
> streamifies this and also the "main" writeback paths.

I see this in the CF as Needs Review since 2024-03-10, but this 2024-04-09
message sounds like you were abandoning it. Are you still commissioning a
review of this thread's patches, or is the CF entry obsolete?