From d8ea06b87cbbb5c68a0fb80e7779981f153ac6e3 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 27 Jul 2023 15:53:13 +1200
Subject: [PATCH v1 14/14] WIP: Use vector writes in checkpointer.

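Experimental hack to teach the checkpointer to write out runs of
consecutive blocks with a single FlushBuffers() call.  This would
eventually be refactored to use "streaming write", but the basic idea
is shown here.  Writeback requests now carry a block count so that a
whole run can be queued at once.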

Author: Thomas Munro <thomas.munro@gmail.com>
---
 src/backend/storage/buffer/bufmgr.c | 167 ++++++++++++++++++++++++----
 src/include/storage/buf_internals.h |   4 +-
 2 files changed, 148 insertions(+), 23 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 61eceb4b99..7de95b04bb 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -495,7 +495,8 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
 							   IOContext io_context);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static int	FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
-						 IOObject io_object, IOContext io_context);
+						 IOObject io_object, IOContext io_context,
+						 int *first_written_index, int *written);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
@@ -1824,7 +1825,7 @@ again:
 		LWLockRelease(content_lock);
 
 		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-									  &buf_hdr->tag);
+									  &buf_hdr->tag, 1);
 	}
 
 
@@ -2608,6 +2609,72 @@ UnpinBuffer(BufferDesc *buf)
 #define ST_DEFINE
 #include <lib/sort_template.h>
 
+/*
+ * Flush nbuffers already-pinned buffers that hold consecutive blocks of a
+ * relation fork, queueing writeback requests in wb_context.  All are unpinned
+ * on return.  Returns the number written out, which is less than nbuffers if
+ * FlushBuffers() skipped some (e.g. another backend already wrote them out).
+ */
+static int
+SyncBuffers(BufferDesc **bufs, int nbuffers,
+			WritebackContext *wb_context)
+{
+	int			total_written = 0;
+
+	while (nbuffers > 0)
+	{
+		int			nlocked;
+
+		/* Wait for the first remaining buffer's content lock (share mode). */
+		LWLockAcquire(BufferDescriptorGetContentLock(bufs[0]), LW_SHARED);
+		nlocked = 1;
+
+		/* Lock as many more as we can without waiting, to avoid deadlocks. */
+		while (nlocked < nbuffers &&
+			   LWLockConditionalAcquire(BufferDescriptorGetContentLock(bufs[nlocked]),
+										LW_SHARED))
+			nlocked++;
+		/* NOTE(review): terminates only if FlushBuffers() returns > 0. */
+		while (nlocked > 0)
+		{
+			int			flushed;
+			int			written;
+			int			first_written_index;
+
+			/*
+			 * Flush with a single write where possible.  "flushed" counts the
+			 * leading buffers dealt with (written or already flushed);
+			 * "written" of them, starting at first_written_index, were written.
+			 */
+			flushed = FlushBuffers(bufs, nlocked, NULL, IOOBJECT_RELATION,
+								   IOCONTEXT_NORMAL,
+								   &first_written_index, &written);
+			total_written += written;
+
+			/* Unlock in reverse order (currently more efficient). */
+			for (int i = flushed - 1; i >= 0; --i)
+				LWLockRelease(BufferDescriptorGetContentLock(bufs[i]));
+
+			/* Queue one writeback request covering the written run. */
+			if (written > 0)
+				ScheduleBufferTagForWriteback(wb_context,
+											  IOCONTEXT_NORMAL,
+											  &bufs[first_written_index]->tag,
+											  written);
+
+			/* Drop the caller's pins on the buffers just handled. */
+			for (int i = 0; i < flushed; ++i)
+				UnpinBuffer(bufs[i]);
+			/* Step past them; any still locked get the next iteration. */
+			bufs += flushed;
+			nlocked -= flushed;
+			nbuffers -= flushed;
+		}
+	}
+
+	return total_written;
+}
+
 /*
  * BufferSync -- Write out all dirty buffers in the pool.
  *
@@ -2633,9 +2700,8 @@ BufferSync(int flags)
 	int			i;
 	int			mask = BM_DIRTY;
 	WritebackContext wb_context;
-
-	/* Make sure we can handle the pin inside SyncOneBuffer */
-	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+	BufferDesc *range_buffers[MAX_BUFFERS_PER_TRANSFER];
+	int			range_nblocks = 0;
 
 	/*
 	 * Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -2832,11 +2898,40 @@ BufferSync(int flags)
 		 */
 		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
 		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+			ReservePrivateRefCountEntry();
+
+			buf_state = LockBufHdr(bufHdr);
+			if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
+			{
+				/* It's clean, so nothing to do */
+				UnlockBufHdr(bufHdr, buf_state);
+			}
+			else
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buf_written_checkpoints++;
-				num_written++;
+				PinBuffer_Locked(bufHdr);
+
+				/*
+				 * Check if we need to sync the range of buffers we've
+				 * collected so far, because we've collected enough already or
+				 * because our newly pinned buffer is not consecutive with the
+				 * last one.
+				 */
+				if (range_nblocks == lengthof(range_buffers) ||
+					(range_nblocks > 0 &&
+					 !BufferTagsConsecutive(&range_buffers[range_nblocks - 1]->tag,
+											&bufHdr->tag)))
+				{
+					int			written;
+
+					written = SyncBuffers(range_buffers, range_nblocks, &wb_context);
+					range_nblocks = 0;
+					num_written += written;
+					PendingCheckpointerStats.buf_written_checkpoints += written;
+				}
+
+				/* Collect this pinned buffer in our range. */
+				range_buffers[range_nblocks++] = bufHdr;
 			}
 		}
 
@@ -2867,6 +2962,16 @@ BufferSync(int flags)
 		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
 	}
 
+	/* Sync our final range. */
+	if (range_nblocks > 0)
+	{
+		int			written;
+
+		written = SyncBuffers(range_buffers, range_nblocks, &wb_context);
+		num_written += written;
+		PendingCheckpointerStats.buf_written_checkpoints += written;
+	}
+
 	/*
 	 * Issue all pending flushes. Only checkpointer calls BufferSync(), so
 	 * IOContext will always be IOCONTEXT_NORMAL.
@@ -3245,6 +3350,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	 * buffer is clean by the time we've locked it.)
 	 */
 	PinBuffer_Locked(bufHdr);
+
 	LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
 
 	FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
@@ -3259,7 +3365,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	 * SyncOneBuffer() is only called by checkpointer and bgwriter, so
 	 * IOContext will always be IOCONTEXT_NORMAL.
 	 */
-	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
+	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag, 1);
 
 	return result | BUF_WRITTEN;
 }
@@ -3495,7 +3601,8 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
  */
 static int
 FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
-			 IOObject io_object, IOContext io_context)
+			 IOObject io_object, IOContext io_context,
+			 int *first_written_index, int *written)
 {
 	XLogRecPtr	max_recptr;
 	struct shared_buffer_write_error_info errinfo;
@@ -3578,7 +3685,13 @@ FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
 
 	/* If we can't write even one buffer, then we're done. */
 	if (first_start_io_index < 0)
+	{
+		if (first_written_index)
+			*first_written_index = 0;
+		if (written)
+			*written = 0;
 		return nbuffers;
+	}
 
 	/* Setup error traceback support for ereport() */
 	errinfo.buf = bufs[first_start_io_index];
@@ -3696,6 +3809,12 @@ FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 
+	/* Report the range of buffers that we actually wrote, if any. */
+	if (first_written_index)
+		*first_written_index = first_start_io_index;
+	if (written)
+		*written = nbuffers - first_start_io_index;
+
 	return nbuffers;
 }
 
@@ -3709,7 +3828,7 @@ static void
 FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 			IOContext io_context)
 {
-	FlushBuffers(&buf, 1, reln, io_object, io_context);
+	FlushBuffers(&buf, 1, reln, io_object, io_context, NULL, NULL);
 }
 
 /*
@@ -5734,11 +5853,11 @@ WritebackContextInit(WritebackContext *context, int *max_pending)
 }
 
 /*
- * Add buffer to list of pending writeback requests.
+ * Add buffer tag range to list of pending writeback requests.
  */
 void
 ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context,
-							  BufferTag *tag)
+							  BufferTag *tag, int nblocks)
 {
 	PendingWriteback *pending;
 
@@ -5756,6 +5875,7 @@ ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context
 		pending = &wb_context->pending_writebacks[wb_context->nr_pending++];
 
 		pending->tag = *tag;
+		pending->nblocks = nblocks;
 	}
 
 	/*
@@ -5812,10 +5932,11 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 		int			ahead;
 		BufferTag	tag;
 		RelFileLocator currlocator;
-		Size		nblocks = 1;
+		Size		nblocks;
 
 		cur = &wb_context->pending_writebacks[i];
 		tag = cur->tag;
+		nblocks = cur->nblocks;
 		currlocator = BufTagGetRelFileLocator(&tag);
 
 		/*
@@ -5824,6 +5945,8 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 		 */
 		for (ahead = 0; i + ahead + 1 < wb_context->nr_pending; ahead++)
 		{
+			BlockNumber this_end;
+			BlockNumber next_end;
 
 			next = &wb_context->pending_writebacks[i + ahead + 1];
 
@@ -5833,15 +5956,15 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 				BufTagGetForkNum(&cur->tag) != BufTagGetForkNum(&next->tag))
 				break;
 
-			/* ok, block queued twice, skip */
-			if (cur->tag.blockNum == next->tag.blockNum)
-				continue;
-
-			/* only merge consecutive writes */
-			if (cur->tag.blockNum + 1 != next->tag.blockNum)
+			/* only merge consecutive or overlapping writes */
+			if (next->tag.blockNum > tag.blockNum + nblocks)
 				break;
 
-			nblocks++;
+			/* find the nblocks value that covers the end of both */
+			this_end = tag.blockNum + nblocks;
+			next_end = next->tag.blockNum + next->nblocks;
+			nblocks = Max(this_end, next_end) - tag.blockNum;
+
 			cur = next;
 		}
 
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 6a493d2b96..0b9a3e3fa8 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -300,6 +300,7 @@ typedef struct PendingWriteback
 {
 	/* could store different types of pending flushes here */
 	BufferTag	tag;
+	int			nblocks;
 } PendingWriteback;
 
 /* struct forward declared in bufmgr.h */
@@ -400,7 +401,8 @@ extern PGDLLIMPORT CkptSortItem *CkptBufferIds;
 extern void WritebackContextInit(WritebackContext *context, int *max_pending);
 extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context);
 extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
-										  IOContext io_context, BufferTag *tag);
+										  IOContext io_context, BufferTag *tag,
+										  int nblocks);
 
 /* freelist.c */
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
-- 
2.39.2

