commit 4cbbbd242b7ee6333e1d1a09794aa1cd9d39035f
Author: Abhijit Menon-Sen <ams@2ndQuadrant.com>
Date:   Tue Jun 23 13:38:01 2015 +0530

    Introduce XLogLockBlockRangeForCleanup()
    
    When replaying B-Tree vacuum records, we need to confirm that a range of
    blocks is unpinned, but there was no way to ask the buffer manager if a
    block was pinned without having it read in uncached pages.
    
    This patch exposes a new interface that returns pages if they're in the
    cache, or InvalidBuffer, and uses that in a function that locks a range
    of blocks without reading them in from disk.

diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 2f85867..e49a9ba 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -416,33 +416,10 @@ btree_xlog_vacuum(XLogReaderState *record)
 	{
 		RelFileNode thisrnode;
 		BlockNumber thisblkno;
-		BlockNumber blkno;
 
 		XLogRecGetBlockTag(record, 0, &thisrnode, NULL, &thisblkno);
-
-		for (blkno = xlrec->lastBlockVacuumed + 1; blkno < thisblkno; blkno++)
-		{
-			/*
-			 * We use RBM_NORMAL_NO_LOG mode because it's not an error
-			 * condition to see all-zero pages.  The original btvacuumpage
-			 * scan would have skipped over all-zero pages, noting them in FSM
-			 * but not bothering to initialize them just yet; so we mustn't
-			 * throw an error here.  (We could skip acquiring the cleanup lock
-			 * if PageIsNew, but it's probably not worth the cycles to test.)
-			 *
-			 * XXX we don't actually need to read the block, we just need to
-			 * confirm it is unpinned. If we had a special call into the
-			 * buffer manager we could optimise this so that if the block is
-			 * not in shared_buffers we confirm it as unpinned.
-			 */
-			buffer = XLogReadBufferExtended(thisrnode, MAIN_FORKNUM, blkno,
-											RBM_NORMAL_NO_LOG);
-			if (BufferIsValid(buffer))
-			{
-				LockBufferForCleanup(buffer);
-				UnlockReleaseBuffer(buffer);
-			}
-		}
+		XLogLockBlockRangeForCleanup(thisrnode, MAIN_FORKNUM,
+									 xlrec->lastBlockVacuumed + 1, thisblkno);
 	}
 
 	/*
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index fa98b82..4023c7e 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -401,10 +401,6 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
  * In RBM_ZERO_* modes, if the page doesn't exist, the relation is extended
  * with all-zeroes pages up to the given block number.
  *
- * In RBM_NORMAL_NO_LOG mode, we return InvalidBuffer if the page doesn't
- * exist, and we don't check for all-zeroes.  Thus, no log entry is made
- * to imply that the page should be dropped or truncated later.
- *
  * NB: A redo function should normally not call this directly. To get a page
  * to modify, use XLogReplayBuffer instead. It is important that all pages
  * modified by a WAL record are registered in the WAL records, or they will be
@@ -449,8 +445,6 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 			log_invalid_page(rnode, forknum, blkno, false);
 			return InvalidBuffer;
 		}
-		if (mode == RBM_NORMAL_NO_LOG)
-			return InvalidBuffer;
 		/* OK to extend the file */
 		/* we do this in recovery only - no rel-extension lock needed */
 		Assert(InRecovery);
@@ -500,6 +494,62 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 }
 
 /*
+ * XLogLockBlockRangeForCleanup emulates VACUUM's nbtree locking in Hot Standby
+ * by cleanup-locking each cached block in [startBlkno, uptoBlkno) in turn.
+ */
+void
+XLogLockBlockRangeForCleanup(RelFileNode rnode, ForkNumber forkNum,
+							 BlockNumber startBlkno, BlockNumber uptoBlkno)
+{
+	BlockNumber blkno;
+	BlockNumber lastblock;		/* result of smgrnblocks() for the fork */
+	BlockNumber endingBlkno;	/* loop bound: min(uptoBlkno, lastblock) */
+	Buffer      buffer;
+	SMgrRelation smgr;
+
+	Assert(startBlkno != P_NEW);
+	Assert(uptoBlkno != P_NEW);
+
+	/* Open the relation at smgr level */
+	smgr = smgropen(rnode, InvalidBackendId);
+
+	/*
+	 * Create the target file if it doesn't already exist.  This lets us cope
+	 * if the replay sequence refers to a relation that is later deleted,
+	 * presumably so that smgrnblocks() below cannot hit a missing file.
+	 *
+	 * NOTE(review): this function only takes locks and writes no data, so
+	 * creating the file may be unnecessary -- confirm before relying on it.
+	 */
+	smgrcreate(smgr, forkNum, true);
+
+	lastblock = smgrnblocks(smgr, forkNum);
+
+	endingBlkno = uptoBlkno;
+	if (lastblock < endingBlkno)
+		endingBlkno = lastblock;
+
+	for (blkno = startBlkno; blkno < endingBlkno; blkno++)
+	{
+		/*
+		 * All we need to do here is prove that each block could be locked
+		 * with a cleanup lock, i.e. that nobody holds a pin on it.  The
+		 * page contents are never examined, so all-zero pages are fine.
+		 *
+		 * GetBufferWithoutRelcache returns InvalidBuffer for a block it
+		 * cannot pin from shared buffers; such blocks are skipped, since
+		 * there is no need to read one in just to confirm it is unpinned.
+		 */
+		buffer = GetBufferWithoutRelcache(rnode, forkNum, blkno);
+		if (BufferIsValid(buffer))
+		{
+			LockBufferForCleanup(buffer);
+			UnlockReleaseBuffer(buffer);
+		}
+	}
+}
+
+/*
  * Struct actually returned by XLogFakeRelcacheEntry, though the declared
  * return type is Relation.
  */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index cc973b5..24c409d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -529,8 +529,6 @@ ReadBuffer(Relation reln, BlockNumber blockNum)
  * RBM_ZERO_AND_CLEANUP_LOCK is the same as RBM_ZERO_AND_LOCK, but acquires
  * a cleanup-strength lock on the page.
  *
- * RBM_NORMAL_NO_LOG mode is treated the same as RBM_NORMAL here.
- *
  * If strategy is not NULL, a nondefault buffer access strategy is used.
  * See buffer/README for details.
  */
@@ -591,6 +589,62 @@ ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
 							 mode, strategy, &hit);
 }
 
+/*
+ * GetBufferWithoutRelcache returns the Buffer, pinned, iff the block is in
+ * shared_buffers and marked BM_VALID; otherwise it returns InvalidBuffer.
+ *
+ * Special purpose routine only executed during recovery, so uses a cut-down
+ * execution path rather than complicate the ReadBuffer_common code path.
+ */
+Buffer
+GetBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
+						 BlockNumber blockNum)
+{
+	BufferTag	bufTag;			/* identity of requested block */
+	uint32		bufHash;		/* hash value for bufTag */
+	LWLock	   *bufPartitionLock;		/* buffer partition lock for it */
+	int			buf_id;			/* lookup result; negative if not cached */
+	volatile BufferDesc *buf;	/* assigned only when the lookup succeeds */
+	SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
+	bool		valid = false;	/* did we find and pin a valid buffer? */
+
+	Assert(InRecovery);
+
+	/* Make sure we will have room to remember the buffer pin */
+	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+	/* create a tag so we can lookup the buffer */
+	INIT_BUFFERTAG(bufTag, smgr->smgr_rnode.node, forkNum, blockNum);
+
+	/* determine its hash code and partition lock ID */
+	bufHash = BufTableHashCode(&bufTag);
+	bufPartitionLock = BufMappingPartitionLock(bufHash);
+
+	/* see if the block is in the buffer pool already */
+	LWLockAcquire(bufPartitionLock, LW_SHARED);
+	buf_id = BufTableLookup(&bufTag, bufHash);
+	if (buf_id >= 0)
+	{
+		/*
+		 * Found it.  Pin the buffer only if it is BM_VALID, leaving its
+		 * usage count alone; otherwise just drop the header lock again.
+		 */
+		buf = &BufferDescriptors[buf_id];
+
+		LockBufHdr(buf);
+		valid = (buf->flags & BM_VALID) != 0;
+		if (valid)
+			PinBuffer_Locked(buf);
+		else
+			UnlockBufHdr(buf);
+	}
+	LWLockRelease(bufPartitionLock);
+
+	if (valid)
+		return BufferDescriptorGetBuffer(buf);
+
+	return InvalidBuffer;
+}
 
 /*
  * ReadBuffer_common -- common logic for all ReadBuffer variants
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 8cf51c7..b4cd762 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -43,7 +43,9 @@ extern XLogRedoAction XLogReadBufferForRedoExtended(XLogReaderState *record,
 
 extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 					   BlockNumber blkno, ReadBufferMode mode);
-
+extern void XLogLockBlockRangeForCleanup(RelFileNode rnode, ForkNumber forkNum,
+										 BlockNumber startBlkno,
+										 BlockNumber uptoBlkno);
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ec0a254..b0bba41 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -40,9 +40,7 @@ typedef enum
 								 * initialize. Also locks the page. */
 	RBM_ZERO_AND_CLEANUP_LOCK,	/* Like RBM_ZERO_AND_LOCK, but locks the page
 								 * in "cleanup" mode */
-	RBM_ZERO_ON_ERROR,			/* Read, but return an all-zeros page on error */
-	RBM_NORMAL_NO_LOG			/* Don't log page as invalid during WAL
-								 * replay; otherwise same as RBM_NORMAL */
+	RBM_ZERO_ON_ERROR			/* Read, but return an all-zeros page on error */
 } ReadBufferMode;
 
 /* in globals.c ... this duplicates miscadmin.h */
@@ -153,6 +151,8 @@ extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum,
 extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode,
 						  ForkNumber forkNum, BlockNumber blockNum,
 						  ReadBufferMode mode, BufferAccessStrategy strategy);
+extern Buffer GetBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
+									   BlockNumber blockNum);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern void MarkBufferDirty(Buffer buffer);
