From f7ad1fbd6a37434b67cb50916a5c28255d3a14eb Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 23 Jul 2024 10:01:23 -0700
Subject: [PATCH v2.0 15/17] bufmgr: use AIO in checkpointer, bgwriter

This is far from ready - just included to be able to exercise AIO writes and
get some preliminary numbers.  In all likelihood this will instead be based
on top of work by Thomas Munro, rather than on the preceding commit.
---
 src/include/postmaster/bgwriter.h     |   3 +-
 src/include/storage/buf_internals.h   |   1 +
 src/include/storage/bufmgr.h          |   3 +-
 src/include/storage/bufpage.h         |   1 +
 src/backend/postmaster/bgwriter.c     |  25 +-
 src/backend/postmaster/checkpointer.c |  12 +-
 src/backend/storage/buffer/bufmgr.c   | 588 +++++++++++++++++++++++---
 src/backend/storage/page/bufpage.c    |  10 +
 src/tools/pgindent/typedefs.list      |   1 +
 9 files changed, 580 insertions(+), 64 deletions(-)

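Reviewer notes (not part of the commit message): the sketch below condenses
how a flushing process is expected to drive the new batching API, based on
the BgBufferSync() and BufferSync() changes in this patch.  It is
illustrative only - pacing, the skip_recently_used handling, and error
paths are omitted, and the queue depth of 128 is arbitrary.

	IOQueue    *ioq = io_queue_create(128, 0);
	WritebackContext wb_context;
	BuffersToWrite to_write;

	WritebackContextInit(&wb_context, &bgwriter_flush_after);
	BuffersToWriteInit(&to_write, ioq, &wb_context);

	for (Buffer buf = 1; buf <= NBuffers; buf++)
	{
		/* try to add the buffer to the pending combined write */
		int			res = PrepareToWriteBuffer(&to_write, buf, true,
											   ioq, &wb_context);

		/* not mergeable with the pending write: issue it, then retry */
		if (res & BUF_CANT_MERGE)
		{
			WriteBuffers(&to_write, ioq, &wb_context);
			res = PrepareToWriteBuffer(&to_write, buf, true,
									   ioq, &wb_context);
		}

		/* issue a batch as soon as it reaches the combine limit */
		if ((res & BUF_WRITTEN) && to_write.nbuffers == io_combine_limit)
			WriteBuffers(&to_write, ioq, &wb_context);
	}

	if (to_write.nbuffers > 0)
		WriteBuffers(&to_write, ioq, &wb_context);

	BuffersToWriteEnd(&to_write);
	io_queue_wait_all(ioq);		/* don't leave IO in flight behind us */
	io_queue_free(ioq);
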
diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h
index 407f26e5302..01a936fbc0a 100644
--- a/src/include/postmaster/bgwriter.h
+++ b/src/include/postmaster/bgwriter.h
@@ -31,7 +31,8 @@ extern void BackgroundWriterMain(char *startup_data, size_t startup_data_len) pg
 extern void CheckpointerMain(char *startup_data, size_t startup_data_len) pg_attribute_noreturn();
 
 extern void RequestCheckpoint(int flags);
-extern void CheckpointWriteDelay(int flags, double progress);
+struct IOQueue;
+extern void CheckpointWriteDelay(struct IOQueue *ioq, int flags, double progress);
 
 extern bool ForwardSyncRequest(const FileTag *ftag, SyncRequestType type);
 
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 5cfa7dbd1f1..9d3123663b3 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -21,6 +21,7 @@
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
+#include "storage/io_queue.h"
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ac6496bb1eb..a65888c8915 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -325,7 +325,8 @@ extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool IsBufferCleanupOK(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
 
-extern bool BgBufferSync(struct WritebackContext *wb_context);
+struct IOQueue;
+extern bool BgBufferSync(struct IOQueue *ioq, struct WritebackContext *wb_context);
 
 extern void LimitAdditionalPins(uint32 *additional_pins);
 extern void LimitAdditionalLocalPins(uint32 *additional_pins);
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 5999e5ca5a5..f5f5adb066d 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -509,5 +509,6 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 									Item newtup, Size newsize);
 extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
 extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
+extern bool PageNeedsChecksumCopy(Page page);
 
 #endif							/* BUFPAGE_H */
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 0f75548759a..71c08da45db 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -38,10 +38,12 @@
 #include "postmaster/auxprocess.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/interrupt.h"
+#include "storage/aio.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
+#include "storage/io_queue.h"
 #include "storage/lwlock.h"
 #include "storage/proc.h"
 #include "storage/procsignal.h"
@@ -89,6 +91,7 @@ BackgroundWriterMain(char *startup_data, size_t startup_data_len)
 	sigjmp_buf	local_sigjmp_buf;
 	MemoryContext bgwriter_context;
 	bool		prev_hibernate;
+	IOQueue    *ioq;
 	WritebackContext wb_context;
 
 	Assert(startup_data_len == 0);
@@ -130,6 +133,7 @@ BackgroundWriterMain(char *startup_data, size_t startup_data_len)
 											 ALLOCSET_DEFAULT_SIZES);
 	MemoryContextSwitchTo(bgwriter_context);
 
+	ioq = io_queue_create(128, 0);
 	WritebackContextInit(&wb_context, &bgwriter_flush_after);
 
 	/*
@@ -167,6 +171,7 @@ BackgroundWriterMain(char *startup_data, size_t startup_data_len)
 		 * about in bgwriter, but we do have LWLocks, buffers, and temp files.
 		 */
 		LWLockReleaseAll();
+		pgaio_at_error();
 		ConditionVariableCancelSleep();
 		UnlockBuffers();
 		ReleaseAuxProcessResources(false);
@@ -226,12 +231,27 @@ BackgroundWriterMain(char *startup_data, size_t startup_data_len)
 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
 
+		/*
+		 * XXX: Before exiting, wait for all IO to finish. That's only
+		 * important to avoid spurious PrintBufferLeakWarning() /
+		 * PrintAioIPLeakWarning() calls, triggered by
+		 * ReleaseAuxProcessResources() being called with isCommit=true.
+		 *
+		 * FIXME: this is theoretically racy, but I didn't want to duplicate
+		 * the remaining body of HandleMainLoopInterrupts() here.
+		 */
+		if (ShutdownRequestPending)
+		{
+			io_queue_wait_all(ioq);
+			io_queue_free(ioq);
+		}
+
 		HandleMainLoopInterrupts();
 
 		/*
 		 * Do one cycle of dirty-buffer writing.
 		 */
-		can_hibernate = BgBufferSync(&wb_context);
+		can_hibernate = BgBufferSync(ioq, &wb_context);
 
 		/* Report pending statistics to the cumulative stats system */
 		pgstat_report_bgwriter();
@@ -248,6 +268,9 @@ BackgroundWriterMain(char *startup_data, size_t startup_data_len)
 			smgrdestroyall();
 		}
 
+		/* finish IO before sleeping, to avoid blocking other backends */
+		io_queue_wait_all(ioq);
+
 		/*
 		 * Log a new xl_running_xacts every now and then so replication can
 		 * get into a consistent state faster (think of suboverflowed
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 199f008bcda..0350a71cab4 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -46,9 +46,11 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/interrupt.h"
 #include "replication/syncrep.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
+#include "storage/io_queue.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
 #include "storage/proc.h"
@@ -266,6 +268,7 @@ CheckpointerMain(char *startup_data, size_t startup_data_len)
 		 * files.
 		 */
 		LWLockReleaseAll();
+		pgaio_at_error();
 		ConditionVariableCancelSleep();
 		pgstat_report_wait_end();
 		UnlockBuffers();
@@ -708,7 +711,7 @@ ImmediateCheckpointRequested(void)
  * fraction between 0.0 meaning none, and 1.0 meaning all done.
  */
 void
-CheckpointWriteDelay(int flags, double progress)
+CheckpointWriteDelay(IOQueue *ioq, int flags, double progress)
 {
 	static int	absorb_counter = WRITES_PER_ABSORB;
 
@@ -741,6 +744,13 @@ CheckpointWriteDelay(int flags, double progress)
 		/* Report interim statistics to the cumulative stats system */
 		pgstat_report_checkpointer();
 
+		/*
+		 * Ensure all pending IO is submitted before sleeping, to avoid
+		 * unnecessary delays for other processes.
+		 */
+		io_queue_wait_all(ioq);
+
+
 		/*
 		 * This sleep used to be connected to bgwriter_delay, typically 200ms.
 		 * That resulted in more frequent wakeups if not much work to do.
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 59f4b22457d..e62f2de2034 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -52,6 +52,7 @@
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/io_queue.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -77,6 +78,7 @@
 /* Bits in SyncOneBuffer's return value */
 #define BUF_WRITTEN				0x01
 #define BUF_REUSABLE			0x02
+#define BUF_CANT_MERGE			0x04
 
 #define RELS_BSEARCH_THRESHOLD		20
 
@@ -538,8 +540,6 @@ static void UnpinBuffer(BufferDesc *buf);
 static void UnpinBufferNoOwner(BufferDesc *buf);
 static void BufferSync(int flags);
 static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
-static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
-						  WritebackContext *wb_context);
 static void WaitIO(BufferDesc *buf);
 static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
@@ -557,6 +557,7 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
+
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -2981,6 +2982,56 @@ UnpinBufferNoOwner(BufferDesc *buf)
 	}
 }
 
+typedef struct BuffersToWrite
+{
+	int			nbuffers;		/* buffers collected for the pending write */
+	BufferTag	start_at_tag;	/* tag of the first buffer in the batch */
+	uint32		max_combine;	/* merge limit reported by smgr for the run */
+
+	XLogRecPtr	max_lsn;		/* highest LSN among permanent buffers */
+
+	PgAioHandle *ioh;			/* handle used for the next combined write */
+	PgAioHandleRef ior;			/* reference tracking that handle */
+
+	uint64		total_writes;	/* writes issued by this caller so far */
+
+	Buffer		buffers[IOV_MAX];
+	PgAioBounceBuffer *bounce_buffers[IOV_MAX];
+	const void *data_ptrs[IOV_MAX];
+} BuffersToWrite;
+
+static int	PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf,
+								 bool skip_recently_used,
+								 IOQueue *ioq, WritebackContext *wb_context);
+
+static void WriteBuffers(BuffersToWrite *to_write,
+						 IOQueue *ioq, WritebackContext *wb_context);
+
+static void
+BuffersToWriteInit(BuffersToWrite *to_write,
+				   IOQueue *ioq, WritebackContext *wb_context)
+{
+	to_write->total_writes = 0;
+	to_write->nbuffers = 0;
+	to_write->ioh = NULL;
+	pgaio_io_ref_clear(&to_write->ior);
+	to_write->max_lsn = InvalidXLogRecPtr;
+}
+
+static void
+BuffersToWriteEnd(BuffersToWrite *to_write)
+{
+	if (to_write->ioh != NULL)
+	{
+		pgaio_io_release(to_write->ioh);
+		to_write->ioh = NULL;
+	}
+
+	if (to_write->total_writes > 0)
+		pgaio_submit_staged();
+}
+
+
 #define ST_SORT sort_checkpoint_bufferids
 #define ST_ELEMENT_TYPE CkptSortItem
 #define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b)
@@ -3012,7 +3063,10 @@ BufferSync(int flags)
 	binaryheap *ts_heap;
 	int			i;
 	int			mask = BM_DIRTY;
+	IOQueue    *ioq;
 	WritebackContext wb_context;
+	BuffersToWrite to_write;
+	int			max_combine;
 
 	/*
 	 * Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3074,7 +3128,9 @@ BufferSync(int flags)
 	if (num_to_scan == 0)
 		return;					/* nothing to do */
 
+	ioq = io_queue_create(512, 0);
 	WritebackContextInit(&wb_context, &checkpoint_flush_after);
+	max_combine = Min(io_bounce_buffers, io_combine_limit);
 
 	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
 
@@ -3182,48 +3238,89 @@ BufferSync(int flags)
 	 */
 	num_processed = 0;
 	num_written = 0;
+
+	BuffersToWriteInit(&to_write, ioq, &wb_context);
+
 	while (!binaryheap_empty(ts_heap))
 	{
 		BufferDesc *bufHdr = NULL;
 		CkptTsStatus *ts_stat = (CkptTsStatus *)
 			DatumGetPointer(binaryheap_first(ts_heap));
 
-		buf_id = CkptBufferIds[ts_stat->index].buf_id;
-		Assert(buf_id != -1);
-
-		bufHdr = GetBufferDescriptor(buf_id);
-
-		num_processed++;
+		Assert(ts_stat->num_scanned <= ts_stat->num_to_scan);
 
 		/*
-		 * We don't need to acquire the lock here, because we're only looking
-		 * at a single bit. It's possible that someone else writes the buffer
-		 * and clears the flag right after we check, but that doesn't matter
-		 * since SyncOneBuffer will then do nothing.  However, there is a
-		 * further race condition: it's conceivable that between the time we
-		 * examine the bit here and the time SyncOneBuffer acquires the lock,
-		 * someone else not only wrote the buffer but replaced it with another
-		 * page and dirtied it.  In that improbable case, SyncOneBuffer will
-		 * write the buffer though we didn't need to.  It doesn't seem worth
-		 * guarding against this, though.
+		 * Collect a batch of buffers to write out from the current
+		 * tablespace. That causes some imbalance between the tablespaces, but
+		 * that's more than outweighed by the efficiency gain due to batching.
 		 */
-		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
+		while (to_write.nbuffers < max_combine &&
+			   ts_stat->num_scanned < ts_stat->num_to_scan)
 		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			buf_id = CkptBufferIds[ts_stat->index].buf_id;
+			Assert(buf_id != -1);
+
+			bufHdr = GetBufferDescriptor(buf_id);
+
+			num_processed++;
+
+			/*
+			 * We don't need to acquire the lock here, because we're only
+			 * looking at a single bit. It's possible that someone else
+			 * writes the buffer and clears the flag right after we check,
+			 * but that doesn't matter since PrepareToWriteBuffer will then
+			 * do nothing. However, there is a further race condition: it's
+			 * conceivable that between the time we examine the bit here and
+			 * the time PrepareToWriteBuffer acquires the lock, someone else
+			 * not only wrote the buffer but replaced it with another page
+			 * and dirtied it. In that improbable case, PrepareToWriteBuffer
+			 * will write the buffer though we didn't need to.  It doesn't
+			 * seem worth guarding against this, though.
+			 */
+			if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buffers_written++;
-				num_written++;
+				int			result = PrepareToWriteBuffer(&to_write, buf_id + 1, false,
+														  ioq, &wb_context);
+
+				if (result & BUF_CANT_MERGE)
+				{
+					Assert(to_write.nbuffers > 0);
+					WriteBuffers(&to_write, ioq, &wb_context);
+
+					result = PrepareToWriteBuffer(&to_write, buf_id + 1, false,
+												  ioq, &wb_context);
+					Assert(result != BUF_CANT_MERGE);
+				}
+
+				if (result & BUF_WRITTEN)
+				{
+					TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
+					PendingCheckpointerStats.buffers_written++;
+					num_written++;
+				}
+				else
+				{
+					break;
+				}
 			}
+			else
+			{
+				if (to_write.nbuffers > 0)
+					WriteBuffers(&to_write, ioq, &wb_context);
+			}
+
+			/*
+			 * Measure progress independently of actually having to flush
+			 * the buffer - otherwise writing becomes unbalanced.
+			 */
+			ts_stat->progress += ts_stat->progress_slice;
+			ts_stat->num_scanned++;
+			ts_stat->index++;
 		}
 
-		/*
-		 * Measure progress independent of actually having to flush the buffer
-		 * - otherwise writing become unbalanced.
-		 */
-		ts_stat->progress += ts_stat->progress_slice;
-		ts_stat->num_scanned++;
-		ts_stat->index++;
+		if (to_write.nbuffers > 0)
+			WriteBuffers(&to_write, ioq, &wb_context);
+
 
 		/* Have all the buffers from the tablespace been processed? */
 		if (ts_stat->num_scanned == ts_stat->num_to_scan)
@@ -3241,15 +3338,23 @@ BufferSync(int flags)
 		 *
 		 * (This will check for barrier events even if it doesn't sleep.)
 		 */
-		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
+		CheckpointWriteDelay(ioq, flags, (double) num_processed / num_to_scan);
 	}
 
+	Assert(to_write.nbuffers == 0);
+	io_queue_wait_all(ioq);
+
 	/*
 	 * Issue all pending flushes. Only checkpointer calls BufferSync(), so
 	 * IOContext will always be IOCONTEXT_NORMAL.
 	 */
 	IssuePendingWritebacks(&wb_context, IOCONTEXT_NORMAL);
 
+	io_queue_wait_all(ioq);		/* IssuePendingWritebacks might have added
+								 * more */
+	io_queue_free(ioq);
+	BuffersToWriteEnd(&to_write);
+
 	pfree(per_ts_stat);
 	per_ts_stat = NULL;
 	binaryheap_free(ts_heap);
@@ -3275,7 +3380,7 @@ BufferSync(int flags)
  * bgwriter_lru_maxpages to 0.)
  */
 bool
-BgBufferSync(WritebackContext *wb_context)
+BgBufferSync(IOQueue *ioq, WritebackContext *wb_context)
 {
 	/* info obtained from freelist.c */
 	int			strategy_buf_id;
@@ -3318,6 +3423,8 @@ BgBufferSync(WritebackContext *wb_context)
 	long		new_strategy_delta;
 	uint32		new_recent_alloc;
 
+	BuffersToWrite to_write;
+
 	/*
 	 * Find out where the freelist clock sweep currently is, and how many
 	 * buffer allocations have happened since our last call.
@@ -3494,11 +3601,25 @@ BgBufferSync(WritebackContext *wb_context)
 	num_written = 0;
 	reusable_buffers = reusable_buffers_est;
 
+	BuffersToWriteInit(&to_write, ioq, wb_context);
+
 	/* Execute the LRU scan */
 	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
 	{
-		int			sync_state = SyncOneBuffer(next_to_clean, true,
-											   wb_context);
+		int			sync_state;
+
+		sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1,
+										  true, ioq, wb_context);
+		if (sync_state & BUF_CANT_MERGE)
+		{
+			Assert(to_write.nbuffers > 0);
+
+			WriteBuffers(&to_write, ioq, wb_context);
+
+			sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1,
+											  true, ioq, wb_context);
+			Assert(sync_state != BUF_CANT_MERGE);
+		}
 
 		if (++next_to_clean >= NBuffers)
 		{
@@ -3509,6 +3630,13 @@ BgBufferSync(WritebackContext *wb_context)
 
 		if (sync_state & BUF_WRITTEN)
 		{
+			Assert(sync_state & BUF_REUSABLE);
+
+			if (to_write.nbuffers == io_combine_limit)
+			{
+				WriteBuffers(&to_write, ioq, wb_context);
+			}
+
 			reusable_buffers++;
 			if (++num_written >= bgwriter_lru_maxpages)
 			{
@@ -3520,6 +3648,11 @@ BgBufferSync(WritebackContext *wb_context)
 			reusable_buffers++;
 	}
 
+	if (to_write.nbuffers > 0)
+		WriteBuffers(&to_write, ioq, wb_context);
+
+	BuffersToWriteEnd(&to_write);
+
 	PendingBgWriterStats.buf_written_clean += num_written;
 
 #ifdef BGW_DEBUG
@@ -3558,8 +3691,66 @@ BgBufferSync(WritebackContext *wb_context)
 	return (bufs_to_lap == 0 && recent_alloc == 0);
 }
 
+/* true iff both tags refer to the same relation fork */
+static inline bool
+BufferTagsSameRel(const BufferTag *tag1, const BufferTag *tag2)
+{
+	return (tag1->spcOid == tag2->spcOid) &&
+		(tag1->dbOid == tag2->dbOid) &&
+		(tag1->relNumber == tag2->relNumber) &&
+		(tag1->forkNum == tag2->forkNum);
+}
+
+static bool
+CanMergeWrite(BuffersToWrite *to_write, BufferDesc *cur_buf_hdr)
+{
+	BlockNumber cur_block = cur_buf_hdr->tag.blockNum;
+
+	Assert(to_write->nbuffers > 0); /* can't merge with nothing */
+	Assert(to_write->start_at_tag.relNumber != InvalidOid);
+	Assert(to_write->start_at_tag.blockNum != InvalidBlockNumber);
+
+	Assert(to_write->ioh != NULL);
+
+	/*
+	 * First check if the block number is one that we could actually merge;
+	 * that's cheaper than checking that tablespace/db/relnumber/fork match.
+	 */
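+	/*
+	 * For instance, with start_at_tag.blockNum 10 and nbuffers 2 (blocks 10
+	 * and 11 pending), only block 12 could be merged here.
+	 */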
+	if (to_write->start_at_tag.blockNum + to_write->nbuffers != cur_block)
+		return false;
+
+	if (!BufferTagsSameRel(&to_write->start_at_tag, &cur_buf_hdr->tag))
+		return false;
+
+	/*
+	 * Need to check with smgr how large a write we're allowed to make. To
+	 * reduce the overhead of the smgr check, only inquire once, when
+	 * processing the first to-be-merged buffer. That avoids the overhead
+	 * in the common case of writing out buffers that are not mergeable.
+	 */
+	if (to_write->nbuffers == 1)
+	{
+		SMgrRelation smgr;
+
+		smgr = smgropen(BufTagGetRelFileLocator(&to_write->start_at_tag), INVALID_PROC_NUMBER);
+
+		to_write->max_combine = smgrmaxcombine(smgr,
+											   to_write->start_at_tag.forkNum,
+											   to_write->start_at_tag.blockNum);
+	}
+	else
+	{
+		Assert(to_write->max_combine > 0);
+	}
+
+	if (to_write->start_at_tag.blockNum + to_write->max_combine <= cur_block)
+		return false;
+
+	return true;
+}
+
 /*
- * SyncOneBuffer -- process a single buffer during syncing.
+ * PrepareToWriteBuffer -- process a single buffer during syncing.
  *
  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
  * buffers marked recently used, as these are not replacement candidates.
@@ -3568,22 +3759,56 @@ BgBufferSync(WritebackContext *wb_context)
  *	BUF_WRITTEN: we wrote the buffer.
  *	BUF_REUSABLE: buffer is available for replacement, ie, it has
  *		pin count 0 and usage count 0.
+ *	BUF_CANT_MERGE: can't combine this write with prior writes, caller needs
+ *		to issue those first
  *
  * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
  * after locking it, but we don't care all that much.)
  */
 static int
-SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
+PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf,
+					 bool skip_recently_used,
+					 IOQueue *ioq, WritebackContext *wb_context)
 {
-	BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
-	int			result = 0;
+	BufferDesc *cur_buf_hdr = GetBufferDescriptor(buf - 1);
 	uint32		buf_state;
-	BufferTag	tag;
+	int			result = 0;
+	XLogRecPtr	cur_buf_lsn;
+	LWLock	   *content_lock;
+	bool		may_block;
+
+	/*
+	 * Check if this buffer can be written out together with already prepared
+	 * writes. We check before pinning the buffer, so the buffer could be
+	 * written out and replaced between this check and pinning it - we will
+	 * recheck below. The reason for the pre-check is that we don't want to
+	 * pin the buffer just to find out that we can't merge the IO.
+	 */
+	if (to_write->nbuffers != 0)
+	{
+		if (!CanMergeWrite(to_write, cur_buf_hdr))
+		{
+			result |= BUF_CANT_MERGE;
+			return result;
+		}
+	}
+	else
+	{
+		if (to_write->ioh == NULL)
+		{
+			to_write->ioh = io_queue_get_io(ioq);
+			pgaio_io_get_ref(to_write->ioh, &to_write->ior);
+		}
+
+		to_write->start_at_tag = cur_buf_hdr->tag;
+	}
 
 	/* Make sure we can handle the pin */
 	ReservePrivateRefCountEntry();
 	ResourceOwnerEnlarge(CurrentResourceOwner);
 
+	/* XXX: Should also check if we are allowed to pin one more buffer */
+
 	/*
 	 * Check whether buffer needs writing.
 	 *
@@ -3593,7 +3818,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	 * don't worry because our checkpoint.redo points before log record for
 	 * upcoming changes and so we are not required to write such dirty buffer.
 	 */
-	buf_state = LockBufHdr(bufHdr);
+	buf_state = LockBufHdr(cur_buf_hdr);
 
 	if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
 		BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
@@ -3602,40 +3827,282 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	}
 	else if (skip_recently_used)
 	{
+#if 0
+		elog(LOG, "at block %d: skip recent with nbuffers %d",
+			 cur_buf_hdr->tag.blockNum, to_write->nbuffers);
+#endif
 		/* Caller told us not to write recently-used buffers */
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(cur_buf_hdr, buf_state);
 		return result;
 	}
 
 	if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
 	{
 		/* It's clean, so nothing to do */
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(cur_buf_hdr, buf_state);
 		return result;
 	}
 
-	/*
-	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
-	 * buffer is clean by the time we've locked it.)
-	 */
-	PinBuffer_Locked(bufHdr);
-	LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-
-	FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
-
-	LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
-
-	tag = bufHdr->tag;
-
-	UnpinBuffer(bufHdr);
+	/* pin the buffer; from now on its identity can't change */
+	PinBuffer_Locked(cur_buf_hdr);
 
 	/*
-	 * SyncOneBuffer() is only called by checkpointer and bgwriter, so
-	 * IOContext will always be IOCONTEXT_NORMAL.
+	 * If we are merging, check whether the buffer's identity changed while
+	 * we had not yet pinned it.
+	 *
+	 * XXX: It might be worth checking if we still want to write the buffer
+	 * out, e.g. it could have been replaced with a buffer that doesn't have
+	 * BM_CHECKPOINT_NEEDED set.
 	 */
-	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
+	if (to_write->nbuffers != 0)
+	{
+		if (!CanMergeWrite(to_write, cur_buf_hdr))
+		{
+			elog(LOG, "changed identity");
+			UnpinBuffer(cur_buf_hdr);
 
-	return result | BUF_WRITTEN;
+			result |= BUF_CANT_MERGE;
+
+			return result;
+		}
+	}
+
+	/* only block if nobody could be waiting on unsubmitted IO of ours */
+	may_block = to_write->nbuffers == 0 &&
+		!pgaio_have_staged() &&
+		io_queue_is_empty(ioq);
+	content_lock = BufferDescriptorGetContentLock(cur_buf_hdr);
+
+	if (!may_block)
+	{
+		if (LWLockConditionalAcquire(content_lock, LW_SHARED))
+		{
+			/* done */
+		}
+		else if (to_write->nbuffers == 0)
+		{
+			/*
+			 * Need to wait for all prior IO to finish before blocking on the
+			 * lock acquisition, to avoid the risk of a deadlock due to us
+			 * waiting for another backend that is waiting for our unsubmitted
+			 * IO to complete.
+			 */
+			pgaio_submit_staged();
+			io_queue_wait_all(ioq);
+
+			elog(DEBUG2,
+				 "at block %u: can't block, nbuffers = 0",
+				 cur_buf_hdr->tag.blockNum);
+
+			/* everything was drained above, so blocking is safe now */
+			may_block = to_write->nbuffers == 0 &&
+				!pgaio_have_staged() &&
+				io_queue_is_empty(ioq);
+			Assert(may_block);
+
+			LWLockAcquire(content_lock, LW_SHARED);
+		}
+		else
+		{
+			elog(DEBUG2, "at block %u: can't block, nbuffers = %d",
+				 cur_buf_hdr->tag.blockNum,
+				 to_write->nbuffers);
+
+			UnpinBuffer(cur_buf_hdr);
+			result |= BUF_CANT_MERGE;
+			Assert(to_write->nbuffers > 0);
+
+			return result;
+		}
+	}
+	else
+	{
+		LWLockAcquire(content_lock, LW_SHARED);
+	}
+
+	if (!may_block)
+	{
+		if (!StartBufferIO(cur_buf_hdr, false, !may_block))
+		{
+			pgaio_submit_staged();
+			io_queue_wait_all(ioq);
+
+			may_block = io_queue_is_empty(ioq) && to_write->nbuffers == 0 && !pgaio_have_staged();
+
+			if (!StartBufferIO(cur_buf_hdr, false, !may_block))
+			{
+				elog(DEBUG2, "at block %u: non-waitable StartBufferIO returns false, %d",
+					 cur_buf_hdr->tag.blockNum,
+					 may_block);
+
+				/*
+				 * FIXME: can't tell whether this is because the buffer has
+				 * been cleaned
+				 */
+				if (!may_block)
+				{
+					result |= BUF_CANT_MERGE;
+					Assert(to_write->nbuffers > 0);
+				}
+				LWLockRelease(content_lock);
+				UnpinBuffer(cur_buf_hdr);
+
+				return result;
+			}
+		}
+	}
+	else
+	{
+		if (!StartBufferIO(cur_buf_hdr, false, false))
+		{
+			elog(DEBUG2, "waitable StartBufferIO returns false");
+			LWLockRelease(content_lock);
+			UnpinBuffer(cur_buf_hdr);
+
+			/*
+			 * FIXME: Historically we returned BUF_WRITTEN in this case, which
+			 * seems wrong
+			 */
+			return result;
+		}
+	}
+
+	/*
+	 * Run PageGetLSN while holding header lock, since we don't have the
+	 * buffer locked exclusively in all cases.
+	 */
+	buf_state = LockBufHdr(cur_buf_hdr);
+
+	cur_buf_lsn = BufferGetLSN(cur_buf_hdr);
+
+	/* To check if block content changes while flushing. - vadim 01/17/97 */
+	buf_state &= ~BM_JUST_DIRTIED;
+
+	UnlockBufHdr(cur_buf_hdr, buf_state);
+
+	to_write->buffers[to_write->nbuffers] = buf;
+	to_write->nbuffers++;
+
+	if (buf_state & BM_PERMANENT &&
+		(to_write->max_lsn == InvalidXLogRecPtr || to_write->max_lsn < cur_buf_lsn))
+	{
+		to_write->max_lsn = cur_buf_lsn;
+	}
+
+	result |= BUF_WRITTEN;
+
+	return result;
+}
+
+static void
+WriteBuffers(BuffersToWrite *to_write,
+			 IOQueue *ioq, WritebackContext *wb_context)
+{
+	SMgrRelation smgr;
+	Buffer		first_buf;
+	BufferDesc *first_buf_hdr;
+	bool		needs_checksum;
+
+	Assert(to_write->nbuffers > 0 && to_write->nbuffers <= io_combine_limit);
+
+	first_buf = to_write->buffers[0];
+	first_buf_hdr = GetBufferDescriptor(first_buf - 1);
+
+	smgr = smgropen(BufTagGetRelFileLocator(&first_buf_hdr->tag), INVALID_PROC_NUMBER);
+
+	/*
+	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
+	 * rule that log updates must hit disk before any of the data-file changes
+	 * they describe do.
+	 *
+	 * However, this rule does not apply to unlogged relations, which will be
+	 * lost after a crash anyway.  Most unlogged relation pages do not bear
+	 * LSNs since we never emit WAL records for them, and therefore flushing
+	 * up through the buffer LSN would be useless, but harmless.  However,
+	 * GiST indexes use LSNs internally to track page-splits, and therefore
+	 * unlogged GiST pages bear "fake" LSNs generated by
+	 * GetFakeLSNForUnloggedRel.  It is unlikely but possible that the fake
+	 * LSN counter could advance past the WAL insertion point; and if it did
+	 * happen, attempting to flush WAL through that location would fail, with
+	 * disastrous system-wide consequences.  To make sure that can't happen,
+	 * skip the flush if the buffer isn't permanent.
+	 */
+	if (to_write->max_lsn != InvalidXLogRecPtr)
+		XLogFlush(to_write->max_lsn);
+
+	/*
+	 * Now it's safe to write buffer to disk. Note that no one else should
+	 * have been able to write it while we were busy with log flushing because
+	 * only one process at a time can set the BM_IO_IN_PROGRESS bit.
+	 */
+
+	for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++)
+	{
+		Buffer		cur_buf = to_write->buffers[nbuf];
+		BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1);
+		Block		bufBlock;
+		char	   *bufToWrite;
+
+		bufBlock = BufHdrGetBlock(cur_buf_hdr);
+		needs_checksum = PageNeedsChecksumCopy((Page) bufBlock);
+
+		/*
+		 * Update page checksum if desired.  Since we have only shared lock on
+		 * the buffer, other processes might be updating hint bits in it, so
+		 * we must copy the page to a bounce buffer if we do checksumming.
+		 */
+		if (needs_checksum)
+		{
+			PgAioBounceBuffer *bb = pgaio_bounce_buffer_get();
+
+			pgaio_io_assoc_bounce_buffer(to_write->ioh, bb);
+
+			bufToWrite = pgaio_bounce_buffer_buffer(bb);
+			memcpy(bufToWrite, bufBlock, BLCKSZ);
+			PageSetChecksumInplace((Page) bufToWrite, cur_buf_hdr->tag.blockNum);
+		}
+		else
+		{
+			bufToWrite = bufBlock;
+		}
+
+		to_write->data_ptrs[nbuf] = bufToWrite;
+	}
+
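+	/*
+	 * Attach the buffer IDs and the shared-buffer completion callback to
+	 * the IO handle, then start one combined (vectored) write for the
+	 * whole run of blocks.
+	 */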
+	pgaio_io_set_io_data_32(to_write->ioh,
+							(uint32 *) to_write->buffers,
+							to_write->nbuffers);
+	pgaio_io_add_shared_cb(to_write->ioh, ASC_SHARED_BUFFER_WRITE);
+
+	smgrstartwritev(to_write->ioh, smgr,
+					BufTagGetForkNum(&first_buf_hdr->tag),
+					first_buf_hdr->tag.blockNum,
+					to_write->data_ptrs,
+					to_write->nbuffers,
+					false);
+	pgstat_count_io_op_n(IOOBJECT_RELATION, IOCONTEXT_NORMAL,
+						 IOOP_WRITE, to_write->nbuffers);
+
+	/* the write is in flight; release our pins on the buffers */
+	for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++)
+	{
+		Buffer		cur_buf = to_write->buffers[nbuf];
+		BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1);
+
+		UnpinBuffer(cur_buf_hdr);
+	}
+
+	io_queue_track(ioq, &to_write->ior);
+	to_write->total_writes++;
+
+	/* clear state for next write */
+	to_write->nbuffers = 0;
+	to_write->start_at_tag.relNumber = InvalidOid;
+	to_write->start_at_tag.blockNum = InvalidBlockNumber;
+	to_write->max_combine = 0;
+	to_write->max_lsn = InvalidXLogRecPtr;
+	to_write->ioh = NULL;
+	pgaio_io_ref_clear(&to_write->ior);
 }
 
 /*
@@ -4001,6 +4468,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	error_context_stack = errcallback.previous;
 }
 
+
 /*
  * RelationGetNumberOfBlocksInFork
  *		Determines the current number of pages in the specified relation fork.
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index be6f1f62d29..8295e3fb0a0 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -1491,6 +1491,16 @@ PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 	return true;
 }
 
+bool
+PageNeedsChecksumCopy(Page page)
+{
+	if (PageIsNew(page))
+		return false;
+
+	/* a checksum (and thus a copy) is only needed if checksums are enabled */
+	return DataChecksumsEnabled();
+}
+
 
 /*
  * Set checksum for a page in shared buffers.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 6f39abcdf3c..ca6dd0bebf0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -344,6 +344,7 @@ BufferManagerRelation
 BufferStrategyControl
 BufferTag
 BufferUsage
+BuffersToWrite
 BuildAccumulator
 BuiltinScript
 BulkInsertState
-- 
2.45.2.827.g557ae147e6

