smgrextendv and vectorizing the bulk_write implementation

Started by Matthias van de Meent about 1 year ago, 2 messages
#1 Matthias van de Meent
boekewurm+postgres@gmail.com
2 attachment(s)

Hi,

While working on the fix for [0] I noticed that bulk_write doesn't use
any of the new vectorized IO features, which seemed like a waste.
After looking into it a bit deeper, I noticed the opportunity for
write vectorization was not very high, as one would expect most
bulk_write IOs to be smgrextend(), which only does page-sized writes.
That's something that can be solved, though, and so I started this
patch.

I've attached two patches to address these two items:

Patch 1/2 reworks smgrextend to smgrextendv, which does mostly the
same stuff as the current smgrextend, but operates on multiple pages.
Patch 2/2 updates bulk_write to make use of smgrwritev,
smgrzeroextend, and the new smgrextendv API, thus reducing the syscall
burden in processes that use bulk extend APIs.
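
To make the caller-facing change concrete, here is a rough sketch
(illustrative only, not taken from the patches; reln, forknum and skipFsync
are the usual smgr arguments, while pages, npages and first_block are
made-up names for a batch of new page buffers):

/* before: one smgrextend() call, and thus one write, per page */
for (int i = 0; i < npages; i++)
    smgrextend(reln, forknum, first_block + i, pages[i], skipFsync);

/* after: one vectorized extend covering the whole batch */
smgrextendv(reln, forknum, first_block,
            (const void **) pages, npages, skipFsync);

The md layer then splits the batch per segment and coalesces adjacent
buffers into iovec entries, so each segment is extended with a single
FileWriteV() call.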

Open question:
In my version of smgrextendv, I reject any failure to extend by the
requested size. This is different from smgrwrite, which tries to write
again when FileWriteV returns a short write. Should smgrextendv do
retries, too?

Kind regards,

Matthias van de Meent
Neon (https://neon.tech)

[0]: /messages/by-id/flat/CACAa4VJ+QY4pY7M0ECq29uGkrOygikYtao1UG9yCDFosxaps9g@mail.gmail.com

Attachments:

v0-0001-Vectorize-smgrextend.patch
From f4014423a7e543792bb12c30029a23411af115ed Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm+postgres@gmail.com>
Date: Fri, 22 Nov 2024 14:54:01 +0100
Subject: [PATCH v0 1/2] Vectorize smgrextend

smgrextend writes blocks, but was one of the last APIs to accept only a single
buffer.  This API change improves that by allowing multi-block extend calls.
---
 src/include/storage/md.h        |  5 +-
 src/include/storage/smgr.h      | 12 ++++-
 src/backend/storage/smgr/md.c   | 92 ++++++++++++++++++++++-----------
 src/backend/storage/smgr/smgr.c | 17 +++---
 4 files changed, 84 insertions(+), 42 deletions(-)

diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index b72293c79a..0e01ca01da 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -26,8 +26,9 @@ extern void mdclose(SMgrRelation reln, ForkNumber forknum);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo);
-extern void mdextend(SMgrRelation reln, ForkNumber forknum,
-					 BlockNumber blocknum, const void *buffer, bool skipFsync);
+extern void mdextendv(SMgrRelation reln, ForkNumber forknum,
+					  BlockNumber blocknum, const void **buffers,
+					  int nblocks, bool skipFsync);
 extern void mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 						 BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 5ab992f5bd..bee0f3eb6b 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -86,8 +86,9 @@ extern void smgrreleaserellocator(RelFileLocatorBackend rlocator);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
-extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-					   BlockNumber blocknum, const void *buffer, bool skipFsync);
+extern void smgrextendv(SMgrRelation reln, ForkNumber forknum,
+						BlockNumber blocknum, const void **buffers,
+						int nblocks, bool skipFsync);
 extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
 						   BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
@@ -126,4 +127,11 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
 }
 
+static inline void
+smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		   const void *buffer, bool skipFsync)
+{
+	smgrextendv(reln, forknum, blocknum, &buffer, 1, skipFsync);
+}
+
 #endif							/* SMGR_H */
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index cc8a80ee96..56151aa65a 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -448,17 +448,17 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo)
 }
 
 /*
- * mdextend() -- Add a block to the specified relation.
+ * mdextendv() -- Add blocks to the specified relation.
  *
- * The semantics are nearly the same as mdwrite(): write at the
+ * The semantics are nearly the same as mdwritev(): write at the
  * specified position.  However, this is to be used for the case of
  * extending a relation (i.e., blocknum is at or beyond the current
  * EOF).  Note that we assume writing a block beyond current EOF
  * causes intervening file space to become filled with zeroes.
  */
 void
-mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void *buffer, bool skipFsync)
+mdextendv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		  const void **buffers, int nblocks, bool skipFsync)
 {
 	off_t		seekpos;
 	int			nbytes;
@@ -466,7 +466,7 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 	/* If this build supports direct I/O, the buffer must be I/O aligned. */
 	if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
-		Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
+		Assert((uintptr_t) *buffers == TYPEALIGN(PG_IO_ALIGN_SIZE, *buffers));
 
 	/* This assert is too expensive to have on normally ... */
 #ifdef CHECK_WRITE_VS_EXTEND
@@ -479,40 +479,72 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	 * InvalidBlockNumber.  (Note that this failure should be unreachable
 	 * because of upstream checks in bufmgr.c.)
 	 */
-	if (blocknum == InvalidBlockNumber)
+	if (blocknum >= InvalidBlockNumber - nblocks)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 				 errmsg("cannot extend file \"%s\" beyond %u blocks",
 						relpath(reln->smgr_rlocator, forknum),
 						InvalidBlockNumber)));
 
-	v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);
 
-	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+	while (nblocks > 0)
+	{
+		int		seg_remaining = RELSEG_SIZE - (blocknum % RELSEG_SIZE);
+		int		write_this_seg = Min(seg_remaining, nblocks);
+		struct iovec ios[PG_IOV_MAX];
+		int		tot_io_len = 0;
+		int		num_ios = 0;
+		char   *last_buf_end = NULL;
 
-	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+		v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);
 
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
-	{
-		if (nbytes < 0)
+		seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+		Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+		for (int i = 0; i < write_this_seg; i++)
+		{
+			if (last_buf_end && last_buf_end == (char *) buffers[i])
+			{
+				ios[num_ios - 1].iov_len += BLCKSZ;
+				last_buf_end += BLCKSZ;
+			}
+			else
+			{
+				ios[num_ios].iov_len = BLCKSZ;
+				ios[num_ios].iov_base = (void *) buffers[i];
+				last_buf_end = ((char *) buffers[i]) + BLCKSZ;
+				num_ios++;
+			}
+			tot_io_len += BLCKSZ;
+		}
+
+		if ((nbytes = (int) FileWriteV(v->mdfd_vfd, ios, num_ios, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != tot_io_len)
+		{
+			if (nbytes < 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+							errmsg("could not extend file \"%s\": %m",
+								   FilePathName(v->mdfd_vfd)),
+							errhint("Check free disk space.")));
+			/* short write: complain appropriately */
 			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not extend file \"%s\": %m",
-							FilePathName(v->mdfd_vfd)),
-					 errhint("Check free disk space.")));
-		/* short write: complain appropriately */
-		ereport(ERROR,
-				(errcode(ERRCODE_DISK_FULL),
-				 errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
-						FilePathName(v->mdfd_vfd),
-						nbytes, BLCKSZ, blocknum),
-				 errhint("Check free disk space.")));
-	}
+					(errcode(ERRCODE_DISK_FULL),
+						errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
+							   FilePathName(v->mdfd_vfd),
+							   nbytes, tot_io_len, blocknum),
+						errhint("Check free disk space.")));
+		}
 
-	if (!skipFsync && !SmgrIsTemp(reln))
-		register_dirty_segment(reln, forknum, v);
+		if (!skipFsync && !SmgrIsTemp(reln))
+			register_dirty_segment(reln, forknum, v);
 
-	Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
+		Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
+
+		blocknum += write_this_seg;
+		buffers += write_this_seg;
+		nblocks -= write_this_seg;
+	}
 }
 
 /*
@@ -1676,9 +1708,9 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 				char	   *zerobuf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE,
 													 MCXT_ALLOC_ZERO);
 
-				mdextend(reln, forknum,
-						 nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
-						 zerobuf, skipFsync);
+				mdextendv(reln, forknum,
+						  nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
+						  (const void **) &zerobuf, 1, skipFsync);
 				pfree(zerobuf);
 			}
 			flags = O_CREAT;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 925728eb6c..7302fb496d 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -82,8 +82,9 @@ typedef struct f_smgr
 	bool		(*smgr_exists) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_unlink) (RelFileLocatorBackend rlocator, ForkNumber forknum,
 								bool isRedo);
-	void		(*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
-								BlockNumber blocknum, const void *buffer, bool skipFsync);
+	void		(*smgr_extendv) (SMgrRelation reln, ForkNumber forknum,
+								 BlockNumber blocknum, const void **buffers,
+								 int nblocks, bool skipFsync);
 	void		(*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
 									BlockNumber blocknum, int nblocks, bool skipFsync);
 	bool		(*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
@@ -116,7 +117,7 @@ static const f_smgr smgrsw[] = {
 		.smgr_create = mdcreate,
 		.smgr_exists = mdexists,
 		.smgr_unlink = mdunlink,
-		.smgr_extend = mdextend,
+		.smgr_extendv = mdextendv,
 		.smgr_zeroextend = mdzeroextend,
 		.smgr_prefetch = mdprefetch,
 		.smgr_maxcombine = mdmaxcombine,
@@ -535,11 +536,11 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
  * causes intervening file space to become filled with zeroes.
  */
 void
-smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void *buffer, bool skipFsync)
+smgrextendv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			const void **buffers, int nblocks, bool skipFsync)
 {
-	smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
-										 buffer, skipFsync);
+	smgrsw[reln->smgr_which].smgr_extendv(reln, forknum, blocknum,
+										  buffers, nblocks, skipFsync);
 
 	/*
 	 * Normally we expect this to increase nblocks by one, but if the cached
@@ -547,7 +548,7 @@ smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	 * kernel.
 	 */
 	if (reln->smgr_cached_nblocks[forknum] == blocknum)
-		reln->smgr_cached_nblocks[forknum] = blocknum + 1;
+		reln->smgr_cached_nblocks[forknum] = blocknum + nblocks;
 	else
 		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
 }
-- 
2.45.2

v0-0002-Modernize-bulk_write-s-internals-with-vectorized-.patch
From 5fc8eaf271ae7019880fbb53b7ba51760b0ef596 Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm+postgres@gmail.com>
Date: Fri, 22 Nov 2024 17:50:15 +0100
Subject: [PATCH v0 2/2] Modernize bulk_write's internals with vectorized
 writes

By merging operations we reduce the number of syscalls and thus context
switches.  We now also make use of zeroextend, rather than our own artisanal
version using smgrextend() with an empty page, further increasing efficiency
in the storage layer.
---
 src/backend/storage/smgr/bulk_write.c | 79 ++++++++++++++++++---------
 1 file changed, 53 insertions(+), 26 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 274051c40d..1c40801e2b 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -46,8 +46,6 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
-static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
-
 typedef struct PendingWrite
 {
 	BulkWriteBuffer buf;
@@ -274,40 +272,69 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 					 npending, blknos, pages, page_std);
 	}
 
-	for (int i = 0; i < npending; i++)
+	while (npending > 0)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
+		const void  *buffers[MAX_PENDING_WRITES];
+		int		nbufs = 0;
+
+		BlockNumber	blkno = pending_writes[nbufs].blkno;
+		buffers[nbufs] = pending_writes[nbufs].buf->data;
 
-		PageSetChecksumInplace(page, blkno);
+		PageSetChecksumInplace(pending_writes[nbufs].buf->data, blkno + nbufs);
+		nbufs++;
 
-		if (blkno >= bulkstate->relsize)
+		for (; nbufs < npending && nbufs < MAX_PENDING_WRITES;)
 		{
+			if (pending_writes[nbufs].blkno != blkno + nbufs)
+				break;
+
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * Never combine past the end of the relation; we need to separate
+			 * smgrwritev() and smgrextendv()
 			 */
-			while (blkno > bulkstate->relsize)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->relsize,
-						   &zero_buffer,
-						   true);
-				bulkstate->relsize++;
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->relsize++;
+			if (blkno + nbufs == bulkstate->relsize)
+				break;
+
+			buffers[nbufs] = pending_writes[nbufs].buf->data;
+			PageSetChecksumInplace(pending_writes[nbufs].buf->data,
+								   blkno + nbufs);
+			nbufs++;
+		}
+		
+		Assert(((int64) blkno) + nbufs <= InvalidBlockNumber);
+
+		if (blkno < bulkstate->relsize)
+		{
+			Assert(blkno + nbufs <= bulkstate->relsize);
+
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   buffers, nbufs, true);
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			Assert(blkno >= bulkstate->relsize);
+
+			if (blkno > bulkstate->relsize)
+				smgrzeroextend(bulkstate->smgr, bulkstate->forknum,
+							   bulkstate->relsize,
+							   (int) (blkno - bulkstate->relsize),
+							   true);
+
+			smgrextendv(bulkstate->smgr, bulkstate->forknum, blkno,
+						buffers, nbufs, true);
+
+			bulkstate->relsize = blkno + nbufs;
+		}
+
+		npending -= nbufs;
+		pending_writes += nbufs;
+
+		for (int i = 0; i < nbufs; i++)
+			pfree((void *) buffers[i]);
 	}
 
+	Assert(npending == 0);
+
 	bulkstate->npending = 0;
 }
 
-- 
2.45.2

#2 Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Matthias van de Meent (#1)
Re: smgrextendv and vectorizing the bulk_write implementation

On 22/11/2024 19:49, Matthias van de Meent wrote:

> Hi,
>
> While working on the fix for [0] I noticed that bulk_write doesn't use
> any of the new vectorized IO features, which seemed like a waste.
> After looking into it a bit deeper, I noticed the opportunity for
> write vectorization was not very high, as one would expect most
> bulk_write IOs to be smgrextend(), which only does page-sized writes.
> That's something that can be solved, though, and so I started this
> patch.

+1

> I've attached two patches to address these two items:
>
> Patch 1/2 reworks smgrextend to smgrextendv, which does mostly the
> same stuff as the current smgrextend, but operates on multiple pages.
> Patch 2/2 updates bulk_write to make use of smgrwritev,
> smgrzeroextend, and the new smgrextendv API, thus reducing the syscall
> burden in processes that use bulk extend APIs.

Seems straightforward.

Thomas wrote patches earlier to do a similar thing; see
/messages/by-id/CA+hUKGLx5bLwezZKAYB2O_qHj=ov10RpgRVY7e8TSJVE74oVjg@mail.gmail.com.
I haven't looked closely at the patches to see what the differences are.

> Open question:
> In my version of smgrextendv, I reject any failure to extend by the
> requested size. This is different from smgrwrite, which tries to write
> again when FileWriteV returns a short write. Should smgrextendv do
> retries, too?

Hmm, a short write seems just as possible in smgrextendv() too; it
should retry.

In principle you could get a short write even with a BLCKSZ write. We've
always just assumed that it won't happen, or if it does it means you ran
out of disk space. I don't know why we ever assumed that, even though it
has worked in practice. But I think we should stop assuming that going
forward, and always retry short writes.
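
For illustration only, here is an untested sketch of what that retry could
look like inside mdextendv()'s per-segment loop, reusing the v0 patch's
local variables (ios, num_ios, tot_io_len, seekpos, nbytes) and stepping
past the transferred bytes with compute_remaining_iovec(), the way the
write path already handles short writes:

int     remaining = tot_io_len;

for (;;)
{
    nbytes = (int) FileWriteV(v->mdfd_vfd, ios, num_ios, seekpos,
                              WAIT_EVENT_DATA_FILE_EXTEND);

    if (nbytes < 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not extend file \"%s\": %m",
                        FilePathName(v->mdfd_vfd)),
                 errhint("Check free disk space.")));

    remaining -= nbytes;
    if (remaining == 0)
        break;

    /* A zero-byte write presumably means we are out of disk space. */
    if (nbytes == 0)
        ereport(ERROR,
                (errcode(ERRCODE_DISK_FULL),
                 errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
                        FilePathName(v->mdfd_vfd),
                        tot_io_len - remaining, tot_io_len, blocknum),
                 errhint("Check free disk space.")));

    /* Step past the part that was written and retry the rest. */
    num_ios = compute_remaining_iovec(ios, ios, num_ios, nbytes);
    seekpos += nbytes;
}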

--
Heikki Linnakangas
Neon (https://neon.tech)