From 631478435f4aff6e55fecd03ea27aca37e5f232e Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 27 Jul 2023 10:58:26 +1200
Subject: [PATCH v1 13/14] WIP: Provide vectored variant of FlushBuffer().

FlushBuffers() is just like FlushBuffer() except that it tries to write
out multiple consecutive disk blocks at a time with smgrwritev().  The
traditional FlushBuffer() call is now implemented in terms of it.

Author: Thomas Munro <thomas.munro@gmail.com>
---
 src/backend/storage/buffer/bufmgr.c | 235 ++++++++++++++++++++--------
 src/include/storage/buf_internals.h |  10 ++
 2 files changed, 181 insertions(+), 64 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 04dbede971..61eceb4b99 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -53,6 +53,7 @@
 #include "storage/smgr.h"
 #include "storage/standby.h"
 #include "utils/memdebug.h"
+#include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/rel.h"
 #include "utils/resowner_private.h"
@@ -478,6 +479,11 @@ static void WaitIO(BufferDesc *buf);
 static bool StartBufferIO(BufferDesc *buf, bool forInput);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
 							  uint32 set_flag_bits);
+struct shared_buffer_write_error_info
+{
+	BufferDesc *buf;
+	int			nblocks;
+};
 static void shared_buffer_write_error_callback(void *arg);
 static void local_buffer_write_error_callback(void *arg);
 static BufferDesc *BufferAlloc(SMgrRelation smgr,
@@ -488,6 +494,8 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
 							   bool *foundPtr, bool *allocPtr,
 							   IOContext io_context);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
+static int	FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
+						 IOObject io_object, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
@@ -3463,8 +3471,12 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
 }
 
 /*
- * FlushBuffer
- *		Physically write out a shared buffer.
+ * FlushBuffers
+ *		Physically write out shared buffers.
+ *
+ * The buffers do not have to be consecutive in memory but must refer to
+ * consecutive blocks of the same relation fork in increasing order of block
+ * number.
  *
  * NOTE: this actually just passes the buffer contents to the kernel; the
  * real write to disk won't happen until the kernel feels like it.  This
@@ -3472,66 +3484,148 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
  * However, we will need to force the changes to disk via fsync before
  * we can checkpoint WAL.
  *
- * The caller must hold a pin on the buffer and have share-locked the
+ * The caller must hold a pin on the buffers and have share-locked the
  * buffer contents.  (Note: a share-lock does not prevent updates of
  * hint bits in the buffer, so the page could change while the write
  * is in progress, but we assume that that will not invalidate the data
  * written.)
  *
  * If the caller has an smgr reference for the buffer's relation, pass it
- * as the second parameter.  If not, pass NULL.
+ * as the third parameter.  If not, pass NULL.
  */
-static void
-FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
-			IOContext io_context)
+static int
+FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
+			 IOObject io_object, IOContext io_context)
 {
-	XLogRecPtr	recptr;
+	XLogRecPtr	max_recptr;
+	struct shared_buffer_write_error_info errinfo;
 	ErrorContextCallback errcallback;
 	instr_time	io_start;
-	Block		bufBlock;
-	char	   *bufToWrite;
-	uint32		buf_state;
+	int			first_start_io_index;
+	void	   *bufBlocks[MAX_BUFFERS_PER_TRANSFER];
+	bool		need_checksums = DataChecksumsEnabled();
+	static PGIOAlignedBlock *copies;
+
+	Assert(nbuffers > 0);
+	nbuffers = Min(nbuffers, lengthof(bufBlocks));
 
 	/*
-	 * Try to start an I/O operation.  If StartBufferIO returns false, then
-	 * someone else flushed the buffer before we could, so we need not do
-	 * anything.
+	 * Update page checksums if desired.  Since we have only shared lock on
+	 * the buffer, other processes might be updating hint bits in it, so we
+	 * must copy the page to private storage if we do checksumming.
+	 *
+	 * XXX:TODO kill static local memory, or better yet, kill unlocked hint
+	 * bit modifications so we don't need this
 	 */
-	if (!StartBufferIO(buf, false))
-		return;
+	if (need_checksums)
+	{
+		if (!copies)
+			copies = MemoryContextAllocAligned(TopMemoryContext,
+											   BLCKSZ * lengthof(bufBlocks),
+											   PG_IO_ALIGN_SIZE,
+											   0);
+		for (int i = 0; i < nbuffers; ++i)
+		{
+			memcpy(&copies[i], BufHdrGetBlock(bufs[i]), BLCKSZ);
+			PageSetChecksumInplace((Page) &copies[i],
+								   bufs[0]->tag.blockNum + i);
+		}
+	}
+
+	/*
+	 * Try to start an I/O operation on as many buffers as we can.  If
+	 * StartBufferIO returns false, then someone else flushed the buffer
+	 * before we could, so we need not do anything, and we give up on any
+	 * remaining buffers, as they are not consecutive with the blocks we've
+	 * collected.
+	 */
+	first_start_io_index = -1;
+	for (int i = 0; i < nbuffers; ++i)
+	{
+		/* Must be consecutive blocks. */
+		if (i > 0)
+			Assert(BufferTagsConsecutive(&bufs[i - 1]->tag, &bufs[i]->tag));
+
+		if (!StartBufferIO(bufs[i], false))
+		{
+			if (first_start_io_index >= 0)
+			{
+				/*
+				 * We can't go any further because later blocks after this gap
+				 * would not be consecutive.  Cap nbuffers here.
+				 */
+				nbuffers = i;
+				break;
+			}
+			else
+			{
+				/*
+				 * Still searching for the first block we can win the right to
+				 * start I/O on, so it doesn't matter that we couldn't get
+				 * this one.
+				 */
+			}
+		}
+		else
+		{
+			/* Keep track of the first block we started I/O on. */
+			if (first_start_io_index < 0)
+				first_start_io_index = i;
+		}
+		/* Collect the source buffers (copies or shared buffers). */
+		bufBlocks[i] = need_checksums ? &copies[i] : BufHdrGetBlock(bufs[i]);
+	}
+
+	/* If we can't write even one buffer, then we're done. */
+	if (first_start_io_index < 0)
+		return nbuffers;
 
 	/* Setup error traceback support for ereport() */
+	errinfo.buf = bufs[first_start_io_index];
+	errinfo.nblocks = nbuffers;
 	errcallback.callback = shared_buffer_write_error_callback;
-	errcallback.arg = (void *) buf;
+	errcallback.arg = &errinfo;
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
 	/* Find smgr relation for buffer */
 	if (reln == NULL)
-		reln = smgropen(BufTagGetRelFileLocator(&buf->tag), InvalidBackendId);
+		reln = smgropen(BufTagGetRelFileLocator(&bufs[first_start_io_index]->tag),
+						InvalidBackendId);
 
-	TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&buf->tag),
-										buf->tag.blockNum,
+	TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+										bufs[first_start_io_index]->tag.blockNum,
 										reln->smgr_rlocator.locator.spcOid,
 										reln->smgr_rlocator.locator.dbOid,
 										reln->smgr_rlocator.locator.relNumber);
 
-	buf_state = LockBufHdr(buf);
+	/* Find the highest LSN across all the I/O-started buffers. */
+	max_recptr = 0;
+	for (int i = first_start_io_index; i < nbuffers; ++i)
+	{
+		XLogRecPtr	page_recptr;
+		uint32		buf_state;
 
-	/*
-	 * Run PageGetLSN while holding header lock, since we don't have the
-	 * buffer locked exclusively in all cases.
-	 */
-	recptr = BufferGetLSN(buf);
+		buf_state = LockBufHdr(bufs[i]);
 
-	/* To check if block content changes while flushing. - vadim 01/17/97 */
-	buf_state &= ~BM_JUST_DIRTIED;
-	UnlockBufHdr(buf, buf_state);
+		/*
+		 * Run PageGetLSN while holding header lock, since we don't have the
+		 * buffer locked exclusively in all cases.
+		 */
+		page_recptr = BufferGetLSN(bufs[i]);
+
+		/* To check if block content changes while flushing. - vadim 01/17/97 */
+		buf_state &= ~BM_JUST_DIRTIED;
+		UnlockBufHdr(bufs[i], buf_state);
+
+		if (buf_state & BM_PERMANENT)
+			max_recptr = Max(page_recptr, max_recptr);
+	}
 
 	/*
-	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
-	 * rule that log updates must hit disk before any of the data-file changes
-	 * they describe do.
+	 * Force XLOG flush up to buffers' greatest LSN.  This implements the
+	 * basic WAL rule that log updates must hit disk before any of the
+	 * data-file changes they describe do.
 	 *
 	 * However, this rule does not apply to unlogged relations, which will be
 	 * lost after a crash anyway.  Most unlogged relation pages do not bear
@@ -3545,33 +3639,23 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * disastrous system-wide consequences.  To make sure that can't happen,
 	 * skip the flush if the buffer isn't permanent.
 	 */
-	if (buf_state & BM_PERMANENT)
-		XLogFlush(recptr);
+	if (max_recptr > 0)
+		XLogFlush(max_recptr);
 
 	/*
-	 * Now it's safe to write buffer to disk. Note that no one else should
+	 * Now it's safe to write buffers to disk. Note that no one else should
 	 * have been able to write it while we were busy with log flushing because
-	 * only one process at a time can set the BM_IO_IN_PROGRESS bit.
+	 * only one process at a time can set the BM_IO_IN_PROGRESS bits.
 	 */
-	bufBlock = BufHdrGetBlock(buf);
-
-	/*
-	 * Update page checksum if desired.  Since we have only shared lock on the
-	 * buffer, other processes might be updating hint bits in it, so we must
-	 * copy the page to private storage if we do checksumming.
-	 */
-	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
 
 	io_start = pgstat_prepare_io_time();
 
-	/*
-	 * bufToWrite is either the shared buffer or a copy, as appropriate.
-	 */
-	smgrwrite(reln,
-			  BufTagGetForkNum(&buf->tag),
-			  buf->tag.blockNum,
-			  bufToWrite,
-			  false);
+	smgrwritev(reln,
+			   BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+			   bufs[first_start_io_index]->tag.blockNum,
+			   (const void **) &bufBlocks[first_start_io_index],
+			   nbuffers - first_start_io_index,
+			   false);
 
 	/*
 	 * When a strategy is in use, only flushes of dirty buffers already in the
@@ -3594,22 +3678,38 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	pgstat_count_io_op_time(IOOBJECT_RELATION, io_context,
 							IOOP_WRITE, io_start, 1);
 
-	pgBufferUsage.shared_blks_written++;
+	pgBufferUsage.shared_blks_written += nbuffers - first_start_io_index;
 
 	/*
-	 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
+	 * Mark the buffers as clean (unless BM_JUST_DIRTIED has become set) and
 	 * end the BM_IO_IN_PROGRESS state.
 	 */
-	TerminateBufferIO(buf, true, 0);
+	for (int i = first_start_io_index; i < nbuffers; ++i)
+		TerminateBufferIO(bufs[i], true, 0);
 
-	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag),
-									   buf->tag.blockNum,
+	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+									   bufs[0]->tag.blockNum,
 									   reln->smgr_rlocator.locator.spcOid,
 									   reln->smgr_rlocator.locator.dbOid,
 									   reln->smgr_rlocator.locator.relNumber);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	return nbuffers;
+}
+
+/*
+ * FlushBuffer
+ *		Physically write out just one shared buffer.
+ *
+ * Single-block variant of FlushBuffers().
+ */
+static void
+FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
+			IOContext io_context)
+{
+	FlushBuffers(&buf, 1, reln, io_object, io_context);
 }
 
 /*
@@ -5411,16 +5511,23 @@ AbortBufferIO(Buffer buffer)
 static void
 shared_buffer_write_error_callback(void *arg)
 {
-	BufferDesc *bufHdr = (BufferDesc *) arg;
+	struct shared_buffer_write_error_info *info = arg;
 
 	/* Buffer is pinned, so we can read the tag without locking the spinlock */
-	if (bufHdr != NULL)
+	if (info != NULL)
 	{
-		char	   *path = relpathperm(BufTagGetRelFileLocator(&bufHdr->tag),
-									   BufTagGetForkNum(&bufHdr->tag));
-
-		errcontext("writing block %u of relation %s",
-				   bufHdr->tag.blockNum, path);
+		char	   *path = relpathperm(BufTagGetRelFileLocator(&info->buf->tag),
+									   BufTagGetForkNum(&info->buf->tag));
+
+		if (info->nblocks > 1)
+			errcontext("writing blocks %u..%u of relation %s",
+					   info->buf->tag.blockNum,
+					   info->buf->tag.blockNum + info->nblocks - 1,
+					   path);
+		else
+			errcontext("writing block %u of relation %s",
+					   info->buf->tag.blockNum,
+					   path);
 		pfree(path);
 	}
 }
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 427a989aaf..6a493d2b96 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -159,6 +159,16 @@ BufferTagsEqual(const BufferTag *tag1, const BufferTag *tag2)
 		(tag1->forkNum == tag2->forkNum);
 }
 
+static inline bool
+BufferTagsConsecutive(const BufferTag *tag1, const BufferTag *tag2)
+{
+	return (tag1->spcOid == tag2->spcOid) &&
+		(tag1->dbOid == tag2->dbOid) &&
+		(tag1->relNumber == tag2->relNumber) &&
+		(tag1->forkNum == tag2->forkNum) &&
+		(tag1->blockNum + 1 == tag2->blockNum);
+}
+
 static inline bool
 BufTagMatchesRelFileLocator(const BufferTag *tag,
 							const RelFileLocator *rlocator)
-- 
2.39.2

