From f89b04963a80f66d9f9c4b8153a8afbff33b026e Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 22 Jul 2023 17:31:54 +1200
Subject: [PATCH v1 03/14] Provide vectored variant of ReadBuffer().

PrepareReadBuffer() followed by CompleteReadBuffers() or ZeroBuffer() is
equivalent to ReadBuffer(), except that the physical read for multiple
adjacent blocks can be merged into one vectored system call.

ReadBuffer() is now implemented in terms of these functions.

Author: Thomas Munro <thomas.munro@gmail.com>
---
 src/backend/storage/buffer/bufmgr.c   | 541 +++++++++++++++++---------
 src/backend/storage/buffer/localbuf.c |   4 +-
 src/include/storage/buf_internals.h   |   3 +-
 src/include/storage/bufmgr.h          |  20 +
 4 files changed, 383 insertions(+), 185 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 3bd82dbfca..a009b9da53 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -447,7 +447,7 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
 )
 
 
-static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
+static Buffer ReadBuffer_common(BufferManagerRelation bmr,
 								ForkNumber forkNum, BlockNumber blockNum,
 								ReadBufferMode mode, BufferAccessStrategy strategy,
 								bool *hit);
@@ -485,7 +485,8 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
 							   ForkNumber forkNum,
 							   BlockNumber blockNum,
 							   BufferAccessStrategy strategy,
-							   bool *foundPtr, IOContext io_context);
+							   bool *foundPtr, bool *allocPtr,
+							   IOContext io_context);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
@@ -773,7 +774,7 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 	 * miss.
 	 */
 	pgstat_count_buffer_read(reln);
-	buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence,
+	buf = ReadBuffer_common(BMR_REL(reln),
 							forkNum, blockNum, mode, strategy, &hit);
 	if (hit)
 		pgstat_count_buffer_hit(reln);
@@ -800,8 +801,9 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
 
 	SMgrRelation smgr = smgropen(rlocator, InvalidBackendId);
 
-	return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
-							 RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
+	return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+									  RELPERSISTENCE_UNLOGGED),
+							 forkNum, blockNum,
 							 mode, strategy, &hit);
 }
 
@@ -975,7 +977,7 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
 		bool		hit;
 
 		Assert(extended_by == 0);
-		buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence,
+		buffer = ReadBuffer_common(bmr,
 								   fork, extend_to - 1, mode, strategy,
 								   &hit);
 	}
@@ -989,18 +991,18 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
  * *hit is set to true if the request was satisfied from shared buffer cache.
  */
 static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum,
 				  BlockNumber blockNum, ReadBufferMode mode,
 				  BufferAccessStrategy strategy, bool *hit)
 {
-	BufferDesc *bufHdr;
-	Block		bufBlock;
-	bool		found;
-	IOContext	io_context;
-	IOObject	io_object;
-	bool		isLocalBuf = SmgrIsTemp(smgr);
+	Buffer		buffer;
+	bool		allocated;
 
-	*hit = false;
+	Assert(mode == RBM_ZERO_AND_LOCK ||
+		   mode == RBM_ZERO_AND_CLEANUP_LOCK ||
+		   mode == RBM_NORMAL ||
+		   mode == RBM_NORMAL_NO_LOG ||
+		   mode == RBM_ZERO_ON_ERROR);
 
 	/*
 	 * Backward compatibility path, most code should use ExtendBufferedRel()
@@ -1019,178 +1021,326 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
 			flags |= EB_LOCK_FIRST;
 
-		return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence),
-								 forkNum, strategy, flags);
+		*hit = false;
+
+		return ExtendBufferedRel(bmr, forkNum, strategy, flags);
 	}
 
-	/* Make sure we will have room to remember the buffer pin */
-	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+	buffer = PrepareReadBuffer(bmr,
+							   forkNum,
+							   blockNum,
+							   strategy,
+							   hit,
+							   &allocated);
 
-	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
-									   smgr->smgr_rlocator.locator.spcOid,
-									   smgr->smgr_rlocator.locator.dbOid,
-									   smgr->smgr_rlocator.locator.relNumber,
-									   smgr->smgr_rlocator.backend);
+	/* At this point we do NOT hold any locks. */
+
+	if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK)
+	{
+		/* if we just want zeroes and a lock, we're done */
+		ZeroBuffer(buffer, mode);
+	}
+	else if (!*hit)
+	{
+		/* we might need to perform I/O */
+		CompleteReadBuffers(bmr,
+							&buffer,
+							forkNum,
+							blockNum,
+							1,
+							mode == RBM_ZERO_ON_ERROR,
+							strategy);
+	}
+
+	return buffer;
+}
 
+/*
+ * Prepare to read a block.  The buffer is pinned.  If this is a 'hit', then
+ * the returned buffer can be used immediately.  Otherwise, a physical read
+ * should be completed with CompleteReadBuffers().  PrepareReadBuffer()
+ * followed by CompleteReadBuffers() is equivalent to ReadBuffer(), but the
+ * caller has the opportunity to coalesce reads of neighboring blocks into one
+ * CompleteReadBuffers() call.
+ *
+ * *found is set to true for a hit, and false for a miss.
+ *
+ * *allocated is set to true for a miss that allocates a buffer for the first
+ * time.  If there are multiple calls to PrepareReadBuffer() for the same
+ * block before CompleteReadBuffers() or ReadBuffer_common() finishes the
+ * read, then only the first such call will receive *allocated == true, which
+ * the caller might use to issue just one prefetch hint.
+ */
+Buffer
+PrepareReadBuffer(BufferManagerRelation bmr,
+				  ForkNumber forkNum,
+				  BlockNumber blockNum,
+				  BufferAccessStrategy strategy,
+				  bool *found,
+				  bool *allocated)
+{
+	BufferDesc *bufHdr;
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
+
+	Assert(blockNum != P_NEW);
+
+	if (bmr.rel)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+	}
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
 	if (isLocalBuf)
 	{
-		/*
-		 * We do not use a BufferAccessStrategy for I/O of temporary tables.
-		 * However, in some cases, the "strategy" may not be NULL, so we can't
-		 * rely on IOContextForStrategy() to set the right IOContext for us.
-		 * This may happen in cases like CREATE TEMPORARY TABLE AS...
-		 */
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
-		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
-		if (found)
-			pgBufferUsage.local_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.local_blks_read++;
 	}
 	else
 	{
-		/*
-		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
-		 * not currently in memory.
-		 */
 		io_context = IOContextForStrategy(strategy);
 		io_object = IOOBJECT_RELATION;
-		bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
-							 strategy, &found, io_context);
-		if (found)
-			pgBufferUsage.shared_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.shared_blks_read++;
 	}
 
-	/* At this point we do NOT hold any locks. */
+	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
+									   bmr.smgr->smgr_rlocator.locator.spcOid,
+									   bmr.smgr->smgr_rlocator.locator.dbOid,
+									   bmr.smgr->smgr_rlocator.locator.relNumber,
+									   bmr.smgr->smgr_rlocator.backend);
 
-	/* if it was already in the buffer pool, we're done */
-	if (found)
+	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+	if (isLocalBuf)
+	{
+		bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, found, allocated);
+		if (*found)
+			pgBufferUsage.local_blks_hit++;
+		else
+			pgBufferUsage.local_blks_read++;
+	}
+	else
+	{
+		bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum,
+							 strategy, found, allocated, io_context);
+		if (*found)
+			pgBufferUsage.shared_blks_hit++;
+		else
+			pgBufferUsage.shared_blks_read++;
+	}
+	if (bmr.rel)
+	{
+		pgstat_count_buffer_read(bmr.rel);
+		if (*found)
+			pgstat_count_buffer_hit(bmr.rel);
+	}
+	if (*found)
 	{
-		/* Just need to update stats before we exit */
-		*hit = true;
 		VacuumPageHit++;
 		pgstat_count_io_op(io_object, io_context, IOOP_HIT);
-
 		if (VacuumCostActive)
 			VacuumCostBalance += VacuumCostPageHit;
 
 		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-										  smgr->smgr_rlocator.locator.spcOid,
-										  smgr->smgr_rlocator.locator.dbOid,
-										  smgr->smgr_rlocator.locator.relNumber,
-										  smgr->smgr_rlocator.backend,
-										  found);
+										  bmr.smgr->smgr_rlocator.locator.spcOid,
+										  bmr.smgr->smgr_rlocator.locator.dbOid,
+										  bmr.smgr->smgr_rlocator.locator.relNumber,
+										  bmr.smgr->smgr_rlocator.backend,
+										  true);
+	}
 
-		/*
-		 * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
-		 * on return.
-		 */
-		if (!isLocalBuf)
-		{
-			if (mode == RBM_ZERO_AND_LOCK)
-				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
-							  LW_EXCLUSIVE);
-			else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
-				LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
-		}
+	return BufferDescriptorGetBuffer(bufHdr);
+}
+
+static inline bool
+CompleteReadBuffersCanStartIO(Buffer buffer)
+{
+	if (BufferIsLocal(buffer))
+	{
+		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
 
-		return BufferDescriptorGetBuffer(bufHdr);
+		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
 	}
+	else
+		return StartBufferIO(GetBufferDescriptor(buffer - 1), true);
+}
 
-	/*
-	 * if we have gotten to this point, we have allocated a buffer for the
-	 * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
-	 * if it's a shared buffer.
-	 */
-	Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));	/* spinlock not needed */
+/*
+ * Complete a set of reads prepared with PrepareReadBuffer().  The buffers must
+ * cover a cluster of neighboring block numbers.
+ *
+ * Typically this performs one physical vector read covering the block range,
+ * but if some of the buffers have already been read in the meantime by any
+ * backend, zero or multiple reads may be performed.
+ */
+void
+CompleteReadBuffers(BufferManagerRelation bmr,
+					Buffer *buffers,
+					ForkNumber forknum,
+					BlockNumber blocknum,
+					int nblocks,
+					bool zero_on_error,
+					BufferAccessStrategy strategy)
+{
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
 
-	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
+	if (bmr.rel)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+	}
 
-	/*
-	 * Read in the page, unless the caller intends to overwrite it and just
-	 * wants us to allocate a buffer.
-	 */
-	if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
-		MemSet((char *) bufBlock, 0, BLCKSZ);
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
+	if (isLocalBuf)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
 	else
 	{
-		instr_time	io_start = pgstat_prepare_io_time();
+		io_context = IOContextForStrategy(strategy);
+		io_object = IOOBJECT_RELATION;
+	}
+
+	for (int i = 0; i < nblocks; ++i)
+	{
+		int			io_buffers_len;
+		Buffer		io_buffers[MAX_BUFFERS_PER_TRANSFER];
+		void	   *io_pages[MAX_BUFFERS_PER_TRANSFER];
+		instr_time	io_start;
+		BlockNumber io_first_block;
 
-		smgrread(smgr, forkNum, blockNum, bufBlock);
+#ifdef USE_ASSERT_CHECKING
+
+		/*
+		 * We could get all the information from buffer headers, but it can be
+		 * expensive to access buffer header cache lines so we make the caller
+		 * provide all the information we need, and assert that it is
+		 * consistent.
+		 */
+		{
+			RelFileLocator xlocator;
+			ForkNumber	xforknum;
+			BlockNumber xblocknum;
+
+			BufferGetTag(buffers[i], &xlocator, &xforknum, &xblocknum);
+			Assert(RelFileLocatorEquals(bmr.smgr->smgr_rlocator.locator, xlocator));
+			Assert(xforknum == forknum);
+			Assert(xblocknum == blocknum + i);
+		}
+#endif
+
+		/* Skip this block if someone else has already completed it. */
+		if (!CompleteReadBuffersCanStartIO(buffers[i]))
+		{
+			/*
+			 * Report this as a 'hit' for this backend, even though it must
+			 * have started out as a miss in PrepareReadBuffer().
+			 *
+			 * XXX We may want to redesign the tracing information to report
+			 * this, if there is still interest in these static tracing points.
+			 */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  true);
+			continue;
+		}
 
-		pgstat_count_io_op_time(io_object, io_context,
-								IOOP_READ, io_start, 1);
+		/* We found a buffer that we have to read in. */
+		io_buffers[0] = buffers[i];
+		io_pages[0] = BufferGetBlock(buffers[i]);
+		io_first_block = blocknum + i;
+		io_buffers_len = 1;
 
-		/* check for garbage data */
-		if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
-									PIV_LOG_WARNING | PIV_REPORT_STAT))
+		/*
+		 * How many neighboring-on-disk blocks can we scatter-read into
+		 * other buffers at the same time?
+		 */
+		while ((i + 1) < nblocks &&
+			   CompleteReadBuffersCanStartIO(buffers[i + 1]))
 		{
-			if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
+			/* Must be consecutive block numbers. */
+			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
+				   BufferGetBlockNumber(buffers[i]) + 1);
+
+			io_buffers[io_buffers_len] = buffers[++i];
+			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
+		}
+
+		io_start = pgstat_prepare_io_time();
+		smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len);
+		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, 1);
+
+		/* Verify each block we read, and terminate the I/O. */
+		for (int j = 0; j < io_buffers_len; ++j)
+		{
+			BufferDesc *bufHdr;
+			Block		bufBlock;
+
+			if (isLocalBuf)
 			{
-				ereport(WARNING,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s; zeroing out page",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
-				MemSet((char *) bufBlock, 0, BLCKSZ);
+				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
+				bufBlock = LocalBufHdrGetBlock(bufHdr);
 			}
 			else
-				ereport(ERROR,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
-		}
-	}
-
-	/*
-	 * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer
-	 * content lock before marking the page as valid, to make sure that no
-	 * other backend sees the zeroed page before the caller has had a chance
-	 * to initialize it.
-	 *
-	 * Since no-one else can be looking at the page contents yet, there is no
-	 * difference between an exclusive lock and a cleanup-strength lock. (Note
-	 * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
-	 * they assert that the buffer is already valid.)
-	 */
-	if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
-		!isLocalBuf)
-	{
-		LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
-	}
+			{
+				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
+				bufBlock = BufHdrGetBlock(bufHdr);
+			}
 
-	if (isLocalBuf)
-	{
-		/* Only need to adjust flags */
-		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+			/* check for garbage data */
+			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
+										PIV_LOG_WARNING | PIV_REPORT_STAT))
+			{
+				if (zero_on_error || zero_damaged_pages)
+				{
+					ereport(WARNING,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s; zeroing out page",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+					memset(bufBlock, 0, BLCKSZ);
+				}
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+			}
 
-		buf_state |= BM_VALID;
-		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-	}
-	else
-	{
-		/* Set BM_VALID, terminate IO, and wake up any waiters */
-		TerminateBufferIO(bufHdr, false, BM_VALID);
-	}
+			/* Terminate I/O and set BM_VALID. */
+			if (isLocalBuf)
+			{
+				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
 
-	VacuumPageMiss++;
-	if (VacuumCostActive)
-		VacuumCostBalance += VacuumCostPageMiss;
+				buf_state |= BM_VALID;
+				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+			}
+			else
+			{
+				/* Set BM_VALID, terminate IO, and wake up any waiters */
+				TerminateBufferIO(bufHdr, false, BM_VALID);
+			}
 
-	TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-									  smgr->smgr_rlocator.locator.spcOid,
-									  smgr->smgr_rlocator.locator.dbOid,
-									  smgr->smgr_rlocator.locator.relNumber,
-									  smgr->smgr_rlocator.backend,
-									  found);
+			/* Report I/Os as completing individually. */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  false);
+		}
 
-	return BufferDescriptorGetBuffer(bufHdr);
+		VacuumPageMiss += io_buffers_len;
+		if (VacuumCostActive)
+			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+	}
 }
 
 /*
@@ -1204,11 +1354,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  *
  * The returned buffer is pinned and is already marked as holding the
  * desired page.  If it already did have the desired page, *foundPtr is
- * set true.  Otherwise, *foundPtr is set false and the buffer is marked
- * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
- *
- * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
- * we keep it for simplicity in ReadBuffer.
+ * set true.  Otherwise, *foundPtr is set false.  A read should be
+ * performed with CompleteReadBuffers().
  *
  * io_context is passed as an output parameter to avoid calling
  * IOContextForStrategy() when there is a shared buffers hit and no IO
@@ -1220,7 +1367,9 @@ static BufferDesc *
 BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 			BlockNumber blockNum,
 			BufferAccessStrategy strategy,
-			bool *foundPtr, IOContext io_context)
+			bool *foundPtr,
+			bool *allocatedPtr,
+			IOContext io_context)
 {
 	BufferTag	newTag;			/* identity of requested block */
 	uint32		newHash;		/* hash value for newTag */
@@ -1258,24 +1407,16 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		LWLockRelease(newPartitionLock);
 
 		*foundPtr = true;
+		*allocatedPtr = false;
 
 		if (!valid)
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called PrepareReadBuffer() but not yet CompleteReadBuffers().
 			 */
-			if (StartBufferIO(buf, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return buf;
@@ -1307,6 +1448,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		BufferDesc *existing_buf_hdr;
 		bool		valid;
 
+		*allocatedPtr = false;
+
 		/*
 		 * Got a collision. Someone has already done what we were about to do.
 		 * We'll just handle this as if it were found in the buffer pool in
@@ -1341,24 +1484,18 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called PrepareReadBuffer() but not yet CompleteReadBuffers().
 			 */
-			if (StartBufferIO(existing_buf_hdr, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return existing_buf_hdr;
 	}
 
+	/* Report that we had to allocate a buffer. */
+	*allocatedPtr = true;
+
 	/*
 	 * Need to lock the buffer header too in order to change its tag.
 	 */
@@ -1385,15 +1522,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	LWLockRelease(newPartitionLock);
 
 	/*
-	 * Buffer contents are currently invalid.  Try to obtain the right to
-	 * start I/O.  If StartBufferIO returns false, then someone else managed
-	 * to read it before we did, so there's nothing left for BufferAlloc() to
-	 * do.
+	 * Buffer contents are currently invalid.
 	 */
-	if (StartBufferIO(victim_buf_hdr, true))
-		*foundPtr = false;
-	else
-		*foundPtr = true;
+	*foundPtr = false;
 
 	return victim_buf_hdr;
 }
@@ -2293,7 +2424,11 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 	else
 	{
 		/*
-		 * If we previously pinned the buffer, it must surely be valid.
+		 * If we previously pinned the buffer, it is likely to be valid, but
+		 * it may not be if PrepareReadBuffer() was used.  We'll check by
+		 * loading the flags without locking.  This is racy, but it's OK to
+		 * return false spuriously: when ReadBuffer() or CompleteReadBuffers()
+		 * call StartBufferIO(), they'll see that it's now valid.
 		 *
 		 * Note: We deliberately avoid a Valgrind client request here.
 		 * Individual access methods can optionally superimpose buffer page
@@ -2302,7 +2437,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 		 * that the buffer page is legitimately non-accessible here.  We
 		 * cannot meddle with that.
 		 */
-		result = true;
+		result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
 	}
 
 	ref->refcount++;
@@ -4752,6 +4887,46 @@ ConditionalLockBuffer(Buffer buffer)
 									LW_EXCLUSIVE);
 }
 
+/*
+ * Zero a buffer, and lock it as RBM_ZERO_AND_LOCK or
+ * RBM_ZERO_AND_CLEANUP_LOCK would.  The buffer must be already pinned.  It
+ * does not have to be valid, but it is valid and locked on return.
+ */
+void
+ZeroBuffer(Buffer buffer, ReadBufferMode mode)
+{
+	BufferDesc *bufHdr;
+	uint32		buf_state;
+
+	Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
+
+	if (BufferIsLocal(buffer))
+		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+	else
+	{
+		bufHdr = GetBufferDescriptor(buffer - 1);
+		if (mode == RBM_ZERO_AND_LOCK)
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		else
+			LockBufferForCleanup(buffer);
+	}
+
+	memset(BufferGetPage(buffer), 0, BLCKSZ);
+
+	if (BufferIsLocal(buffer))
+	{
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state |= BM_VALID;
+		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+	}
+	else
+	{
+		buf_state = LockBufHdr(bufHdr);
+		buf_state |= BM_VALID;
+		UnlockBufHdr(bufHdr, buf_state);
+	}
+}
+
 /*
  * Verify that this backend is pinning the buffer exactly once.
  *
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 1735ec7141..9ec410d6c0 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -115,7 +115,7 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
  */
 BufferDesc *
 LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
-				 bool *foundPtr)
+				 bool *foundPtr, bool *allocPtr)
 {
 	BufferTag	newTag;			/* identity of requested block */
 	LocalBufferLookupEnt *hresult;
@@ -141,6 +141,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 		Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
 
 		*foundPtr = PinLocalBuffer(bufHdr, true);
+		*allocPtr = false;
 	}
 	else
 	{
@@ -167,6 +168,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 
 		*foundPtr = false;
+		*allocPtr = true;
 	}
 
 	return bufHdr;
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index bc79a329a1..427a989aaf 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -422,7 +422,8 @@ extern PrefetchBufferResult PrefetchLocalBuffer(SMgrRelation smgr,
 												ForkNumber forkNum,
 												BlockNumber blockNum);
 extern BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum,
-									BlockNumber blockNum, bool *foundPtr);
+									BlockNumber blockNum, bool *foundPtr,
+									bool *allocPtr);
 extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
 										  ForkNumber fork,
 										  uint32 flags,
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index b379c76e27..5a2f66ed47 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -14,6 +14,8 @@
 #ifndef BUFMGR_H
 #define BUFMGR_H
 
+#include "pgstat.h"
+#include "port/pg_iovec.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -47,6 +49,8 @@ typedef enum
 	RBM_ZERO_AND_CLEANUP_LOCK,	/* Like RBM_ZERO_AND_LOCK, but locks the page
 								 * in "cleanup" mode */
 	RBM_ZERO_ON_ERROR,			/* Read, but return an all-zeros page on error */
+	RBM_WILL_ZERO,				/* Don't read from disk, caller will call
+								 * ZeroBuffer() */
 	RBM_NORMAL_NO_LOG			/* Don't log page as invalid during WAL
 								 * replay; otherwise same as RBM_NORMAL */
 } ReadBufferMode;
@@ -158,6 +162,8 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 #define BUFFER_LOCK_SHARE		1
 #define BUFFER_LOCK_EXCLUSIVE	2
 
+/* Maximum number of buffers for multi-buffer I/O functions. */
+#define MAX_BUFFERS_PER_TRANSFER Min(PG_IOV_MAX, (128 * 1024) / BLCKSZ)
 
 /*
  * prototypes for functions in bufmgr.c
@@ -177,6 +183,19 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
 										ForkNumber forkNum, BlockNumber blockNum,
 										ReadBufferMode mode, BufferAccessStrategy strategy,
 										bool permanent);
+extern Buffer PrepareReadBuffer(BufferManagerRelation bmr,
+								ForkNumber forkNum,
+								BlockNumber blockNum,
+								BufferAccessStrategy strategy,
+								bool *found,
+								bool *allocated);
+extern void CompleteReadBuffers(BufferManagerRelation bmr,
+								Buffer *buffers,
+								ForkNumber forknum,
+								BlockNumber blocknum,
+								int nblocks,
+								bool zero_on_error,
+								BufferAccessStrategy strategy);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern void MarkBufferDirty(Buffer buffer);
@@ -245,6 +264,7 @@ extern void LockBufferForCleanup(Buffer buffer);
 extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool IsBufferCleanupOK(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
+extern void ZeroBuffer(Buffer buffer, ReadBufferMode mode);
 
 extern void AbortBufferIO(Buffer buffer);
 
-- 
2.39.2

