*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 71,76 ****
--- 71,83 ----
  #include "utils/syscache.h"
  #include "utils/tqual.h"
  
+ /*
+  * Checking the visibility map requires pinning the appropriate map page.
+  * Ordinarily, that's not much of a problem, but for many very small tables,
+  * it can result in a lot of extra pins.  Reading the visibility map during a
+  * scan matters little for such small tables, so we simply skip it for
+  * relations of SCAN_VM_THRESHOLD heap blocks or fewer.
+  */
+ #define SCAN_VM_THRESHOLD 32
  
  /* GUC variable */
  bool		synchronize_seqscans = true;
***************
*** 269,274 **** initscan(HeapScanDesc scan, ScanKey key, bool is_rescan)
--- 276,282 ----
  	scan->rs_ctup.t_data = NULL;
  	ItemPointerSetInvalid(&scan->rs_ctup.t_self);
  	scan->rs_cbuf = InvalidBuffer;
+ 	scan->rs_cvmbuf = InvalidBuffer;
  	scan->rs_cblock = InvalidBlockNumber;
  
  	/* we don't have a marked position... */
***************
*** 373,379 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
  	 * full page write. Until we can prove that beyond doubt, let's check each
  	 * tuple for visibility the hard way.
  	 */
! 	all_visible = PageIsAllVisible(dp) && !snapshot->takenDuringRecovery;
  
  	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
  		 lineoff <= lines;
--- 381,391 ----
  	 * full page write. Until we can prove that beyond doubt, let's check each
  	 * tuple for visibility the hard way.
  	 */
! 	all_visible = !snapshot->takenDuringRecovery &&
! 		scan->rs_nblocks > SCAN_VM_THRESHOLD &&
! 		visibilitymap_test(scan->rs_rd,
! 						   BufferGetBlockNumber(buffer),
! 						   &scan->rs_cvmbuf);
  
  	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
  		 lineoff <= lines;
***************
*** 665,671 **** heapgettup(HeapScanDesc scan,
--- 677,686 ----
  		{
  			if (BufferIsValid(scan->rs_cbuf))
  				ReleaseBuffer(scan->rs_cbuf);
+ 			if (BufferIsValid(scan->rs_cvmbuf))
+ 				ReleaseBuffer(scan->rs_cvmbuf);
  			scan->rs_cbuf = InvalidBuffer;
+ 			scan->rs_cvmbuf = InvalidBuffer;
  			scan->rs_cblock = InvalidBlockNumber;
  			tuple->t_data = NULL;
  			scan->rs_inited = false;
***************
*** 926,932 **** heapgettup_pagemode(HeapScanDesc scan,
--- 941,950 ----
  		{
  			if (BufferIsValid(scan->rs_cbuf))
  				ReleaseBuffer(scan->rs_cbuf);
+ 			if (BufferIsValid(scan->rs_cvmbuf))
+ 				ReleaseBuffer(scan->rs_cvmbuf);
  			scan->rs_cbuf = InvalidBuffer;
+ 			scan->rs_cvmbuf = InvalidBuffer;
  			scan->rs_cblock = InvalidBlockNumber;
  			tuple->t_data = NULL;
  			scan->rs_inited = false;
***************
*** 1382,1387 **** heap_rescan(HeapScanDesc scan,
--- 1400,1407 ----
  	 */
  	if (BufferIsValid(scan->rs_cbuf))
  		ReleaseBuffer(scan->rs_cbuf);
+ 	if (BufferIsValid(scan->rs_cvmbuf))
+ 		ReleaseBuffer(scan->rs_cvmbuf);
  
  	/*
  	 * reinitialize scan descriptor
***************
*** 1406,1411 **** heap_endscan(HeapScanDesc scan)
--- 1426,1433 ----
  	 */
  	if (BufferIsValid(scan->rs_cbuf))
  		ReleaseBuffer(scan->rs_cbuf);
+ 	if (BufferIsValid(scan->rs_cvmbuf))
+ 		ReleaseBuffer(scan->rs_cvmbuf);
  
  	/*
  	 * decrement relation reference count and free scan descriptor storage
***************
*** 2012,2023 **** FreeBulkInsertState(BulkInsertState bistate)
   */
  Oid
  heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			int options, BulkInsertState bistate)
  {
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
  	Buffer		buffer;
- 	Buffer		vmbuffer = InvalidBuffer;
  	bool		all_visible_cleared = false;
  
  	/*
--- 2034,2044 ----
   */
  Oid
  heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			int options, BulkInsertState bistate, Buffer *vmbuffer)
  {
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
  	Buffer		buffer;
  	bool		all_visible_cleared = false;
  
  	/*
***************
*** 2047,2066 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
  	 */
  	buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
  									   InvalidBuffer, options, bistate,
! 									   &vmbuffer, NULL);
  
  	/* NO EREPORT(ERROR) from here till changes are logged */
  	START_CRIT_SECTION();
  
  	RelationPutHeapTuple(relation, buffer, heaptup);
  
! 	if (PageIsAllVisible(BufferGetPage(buffer)))
  	{
  		all_visible_cleared = true;
- 		PageClearAllVisible(BufferGetPage(buffer));
  		visibilitymap_clear(relation,
  							ItemPointerGetBlockNumber(&(heaptup->t_self)),
! 							vmbuffer);
  	}
  
  	/*
--- 2068,2087 ----
  	 */
  	buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
  									   InvalidBuffer, options, bistate,
! 									   vmbuffer, NULL);
  
  	/* NO EREPORT(ERROR) from here till changes are logged */
  	START_CRIT_SECTION();
  
  	RelationPutHeapTuple(relation, buffer, heaptup);
  
! 	if (visibilitymap_test(relation, BufferGetBlockNumber(buffer),
! 						   vmbuffer))
  	{
  		all_visible_cleared = true;
  		visibilitymap_clear(relation,
  							ItemPointerGetBlockNumber(&(heaptup->t_self)),
! 							*vmbuffer);
  	}
  
  	/*
***************
*** 2136,2143 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
  	END_CRIT_SECTION();
  
  	UnlockReleaseBuffer(buffer);
- 	if (vmbuffer != InvalidBuffer)
- 		ReleaseBuffer(vmbuffer);
  
  	/*
  	 * If tuple is cachable, mark it for invalidation from the caches in case
--- 2157,2162 ----
***************
*** 2241,2247 **** heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
   */
  void
  heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
! 				  CommandId cid, int options, BulkInsertState bistate)
  {
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple  *heaptuples;
--- 2260,2266 ----
   */
  void
  heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
! 				  CommandId cid, int options, BulkInsertState bistate, Buffer *vmbuffer)
  {
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple  *heaptuples;
***************
*** 2286,2292 **** heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
  	while (ndone < ntuples)
  	{
  		Buffer		buffer;
- 		Buffer		vmbuffer = InvalidBuffer;
  		bool		all_visible_cleared = false;
  		int			nthispage;
  
--- 2305,2310 ----
***************
*** 2296,2302 **** heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
  		 */
  		buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len,
  										   InvalidBuffer, options, bistate,
! 										   &vmbuffer, NULL);
  		page = BufferGetPage(buffer);
  
  		/* NO EREPORT(ERROR) from here till changes are logged */
--- 2314,2320 ----
  		 */
  		buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len,
  										   InvalidBuffer, options, bistate,
! 										   vmbuffer, NULL);
  		page = BufferGetPage(buffer);
  
  		/* NO EREPORT(ERROR) from here till changes are logged */
***************
*** 2317,2329 **** heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
  			RelationPutHeapTuple(relation, buffer, heaptup);
  		}
  
! 		if (PageIsAllVisible(page))
  		{
  			all_visible_cleared = true;
- 			PageClearAllVisible(page);
  			visibilitymap_clear(relation,
  								BufferGetBlockNumber(buffer),
! 								vmbuffer);
  		}
  
  		/*
--- 2335,2347 ----
  			RelationPutHeapTuple(relation, buffer, heaptup);
  		}
  
! 		if (visibilitymap_test(relation, BufferGetBlockNumber(buffer),
! 							   vmbuffer))
  		{
  			all_visible_cleared = true;
  			visibilitymap_clear(relation,
  								BufferGetBlockNumber(buffer),
! 								*vmbuffer);
  		}
  
  		/*
***************
*** 2432,2439 **** heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
  		END_CRIT_SECTION();
  
  		UnlockReleaseBuffer(buffer);
- 		if (vmbuffer != InvalidBuffer)
- 			ReleaseBuffer(vmbuffer);
  
  		ndone += nthispage;
  	}
--- 2450,2455 ----
***************
*** 2473,2479 **** heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
  Oid
  simple_heap_insert(Relation relation, HeapTuple tup)
  {
! 	return heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL);
  }
  
  /*
--- 2489,2503 ----
  Oid
  simple_heap_insert(Relation relation, HeapTuple tup)
  {
! 	Buffer		vmbuffer = InvalidBuffer;
! 	Oid			oid;
! 
! 	oid = heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL, &vmbuffer);
! 
! 	if (BufferIsValid(vmbuffer))
! 		ReleaseBuffer(vmbuffer);
! 
! 	return oid;
  }
  
  /*
***************
*** 2524,2530 **** compute_infobits(uint16 infomask, uint16 infomask2)
  HTSU_Result
  heap_delete(Relation relation, ItemPointer tid,
  			CommandId cid, Snapshot crosscheck, bool wait,
! 			HeapUpdateFailureData *hufd)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
--- 2548,2554 ----
  HTSU_Result
  heap_delete(Relation relation, ItemPointer tid,
  			CommandId cid, Snapshot crosscheck, bool wait,
! 			Buffer *vmbuffer, HeapUpdateFailureData *hufd)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
***************
*** 2533,2539 **** heap_delete(Relation relation, ItemPointer tid,
  	Page		page;
  	BlockNumber block;
  	Buffer		buffer;
- 	Buffer		vmbuffer = InvalidBuffer;
  	TransactionId new_xmax;
  	uint16		new_infomask,
  				new_infomask2;
--- 2557,2562 ----
***************
*** 2548,2576 **** heap_delete(Relation relation, ItemPointer tid,
  	page = BufferGetPage(buffer);
  
  	/*
! 	 * Before locking the buffer, pin the visibility map page if it appears to
! 	 * be necessary.  Since we haven't got the lock yet, someone else might be
! 	 * in the middle of changing this, so we'll need to recheck after we have
! 	 * the lock.
  	 */
! 	if (PageIsAllVisible(page))
! 		visibilitymap_pin(relation, block, &vmbuffer);
  
  	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  
- 	/*
- 	 * If we didn't pin the visibility map page and the page has become all
- 	 * visible while we were busy locking the buffer, we'll have to unlock and
- 	 * re-lock, to avoid holding the buffer lock across an I/O.  That's a bit
- 	 * unfortunate, but hopefully shouldn't happen often.
- 	 */
- 	if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
- 	{
- 		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- 		visibilitymap_pin(relation, block, &vmbuffer);
- 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- 	}
- 
  	lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
  	Assert(ItemIdIsNormal(lp));
  
--- 2571,2585 ----
  	page = BufferGetPage(buffer);
  
  	/*
! 	 * Pin the visibility map page now to avoid I/O while holding the buffer
! 	 * lock.  Since we haven't got the lock yet, someone else might be in the
! 	 * middle of changing this, so we'll need to recheck after we have the
! 	 * lock.
  	 */
! 	visibilitymap_pin(relation, block, vmbuffer);
  
  	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  
  	lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
  	Assert(ItemIdIsNormal(lp));
  
***************
*** 2698,2705 **** l1:
  		UnlockReleaseBuffer(buffer);
  		if (have_tuple_lock)
  			UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);
- 		if (vmbuffer != InvalidBuffer)
- 			ReleaseBuffer(vmbuffer);
  		return result;
  	}
  
--- 2707,2712 ----
***************
*** 2723,2734 **** l1:
  	 */
  	PageSetPrunable(page, xid);
  
! 	if (PageIsAllVisible(page))
  	{
  		all_visible_cleared = true;
- 		PageClearAllVisible(page);
  		visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
! 							vmbuffer);
  	}
  
  	/*
--- 2730,2741 ----
  	 */
  	PageSetPrunable(page, xid);
  
! 	if (visibilitymap_test(relation, BufferGetBlockNumber(buffer),
! 						   vmbuffer))
  	{
  		all_visible_cleared = true;
  		visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
! 							*vmbuffer);
  	}
  
  	/*
***************
*** 2792,2800 **** l1:
  
  	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  
- 	if (vmbuffer != InvalidBuffer)
- 		ReleaseBuffer(vmbuffer);
- 
  	/*
  	 * If the tuple has toasted out-of-line attributes, we need to delete
  	 * those items too.  We have to do this before releasing the buffer
--- 2799,2804 ----
***************
*** 2844,2853 **** simple_heap_delete(Relation relation, ItemPointer tid)
--- 2848,2859 ----
  {
  	HTSU_Result result;
  	HeapUpdateFailureData hufd;
+ 	Buffer		vmbuffer = InvalidBuffer;
  
  	result = heap_delete(relation, tid,
  						 GetCurrentCommandId(true), InvalidSnapshot,
  						 true /* wait for commit */ ,
+ 						 &vmbuffer,
  						 &hufd);
  	switch (result)
  	{
***************
*** 2868,2873 **** simple_heap_delete(Relation relation, ItemPointer tid)
--- 2874,2882 ----
  			elog(ERROR, "unrecognized heap_delete status: %u", result);
  			break;
  	}
+ 
+ 	if (BufferIsValid(vmbuffer))
+ 		ReleaseBuffer(vmbuffer);
  }
  
  /*
***************
*** 2906,2912 **** simple_heap_delete(Relation relation, ItemPointer tid)
  HTSU_Result
  heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
  			CommandId cid, Snapshot crosscheck, bool wait,
! 			HeapUpdateFailureData *hufd, LockTupleMode *lockmode)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
--- 2915,2921 ----
  HTSU_Result
  heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
  			CommandId cid, Snapshot crosscheck, bool wait,
! 			Buffer *vmbuffer, HeapUpdateFailureData *hufd, LockTupleMode *lockmode)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
***************
*** 2920,2926 **** heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
  	MultiXactStatus mxact_status;
  	Buffer		buffer,
  				newbuf,
- 				vmbuffer = InvalidBuffer,
  				vmbuffer_new = InvalidBuffer;
  	bool		need_toast,
  				already_marked;
--- 2929,2934 ----
***************
*** 2965,2977 **** heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
  	page = BufferGetPage(buffer);
  
  	/*
! 	 * Before locking the buffer, pin the visibility map page if it appears to
! 	 * be necessary.  Since we haven't got the lock yet, someone else might be
! 	 * in the middle of changing this, so we'll need to recheck after we have
! 	 * the lock.
  	 */
! 	if (PageIsAllVisible(page))
! 		visibilitymap_pin(relation, block, &vmbuffer);
  
  	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  
--- 2973,2984 ----
  	page = BufferGetPage(buffer);
  
  	/*
! 	 * Pin the visibility map page now to avoid I/O while holding the buffer
! 	 * lock.  Since we haven't got the lock yet, someone else might be in the
! 	 * middle of changing this, so we'll need to recheck after we have the
! 	 * lock.
  	 */
! 	visibilitymap_pin(relation, block, vmbuffer);
  
  	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  
***************
*** 3237,3267 **** l2:
  		UnlockReleaseBuffer(buffer);
  		if (have_tuple_lock)
  			UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
- 		if (vmbuffer != InvalidBuffer)
- 			ReleaseBuffer(vmbuffer);
  		bms_free(hot_attrs);
  		bms_free(key_attrs);
  		return result;
  	}
  
  	/*
- 	 * If we didn't pin the visibility map page and the page has become all
- 	 * visible while we were busy locking the buffer, or during some
- 	 * subsequent window during which we had it unlocked, we'll have to unlock
- 	 * and re-lock, to avoid holding the buffer lock across an I/O.  That's a
- 	 * bit unfortunate, especially since we'll now have to recheck whether the
- 	 * tuple has been locked or updated under us, but hopefully it won't
- 	 * happen very often.
- 	 */
- 	if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
- 	{
- 		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- 		visibilitymap_pin(relation, block, &vmbuffer);
- 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- 		goto l2;
- 	}
- 
- 	/*
  	 * We're about to do the actual update -- check for conflict first, to
  	 * avoid possibly having to roll back work we've just done.
  	 */
--- 3244,3255 ----
***************
*** 3419,3425 **** l2:
  			/* Assume there's no chance to put heaptup on same page. */
  			newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
  											   buffer, 0, NULL,
! 											   &vmbuffer_new, &vmbuffer);
  		}
  		else
  		{
--- 3407,3413 ----
  			/* Assume there's no chance to put heaptup on same page. */
  			newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
  											   buffer, 0, NULL,
! 											   &vmbuffer_new, vmbuffer);
  		}
  		else
  		{
***************
*** 3437,3443 **** l2:
  				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  				newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
  												   buffer, 0, NULL,
! 												   &vmbuffer_new, &vmbuffer);
  			}
  			else
  			{
--- 3425,3431 ----
  				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  				newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
  												   buffer, 0, NULL,
! 												   &vmbuffer_new, vmbuffer);
  			}
  			else
  			{
***************
*** 3538,3555 **** l2:
  	/* record address of new tuple in t_ctid of old one */
  	oldtup.t_data->t_ctid = heaptup->t_self;
  
! 	/* clear PD_ALL_VISIBLE flags */
! 	if (PageIsAllVisible(BufferGetPage(buffer)))
  	{
  		all_visible_cleared = true;
- 		PageClearAllVisible(BufferGetPage(buffer));
  		visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
! 							vmbuffer);
  	}
! 	if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
  	{
  		all_visible_cleared_new = true;
- 		PageClearAllVisible(BufferGetPage(newbuf));
  		visibilitymap_clear(relation, BufferGetBlockNumber(newbuf),
  							vmbuffer_new);
  	}
--- 3526,3544 ----
  	/* record address of new tuple in t_ctid of old one */
  	oldtup.t_data->t_ctid = heaptup->t_self;
  
! 	/* clear VM bits */
! 	if (visibilitymap_test(relation, BufferGetBlockNumber(buffer),
! 						   vmbuffer))
  	{
  		all_visible_cleared = true;
  		visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
! 							*vmbuffer);
  	}
! 	if (newbuf != buffer &&
! 		visibilitymap_test(relation, BufferGetBlockNumber(newbuf),
! 						   &vmbuffer_new))
  	{
  		all_visible_cleared_new = true;
  		visibilitymap_clear(relation, BufferGetBlockNumber(newbuf),
  							vmbuffer_new);
  	}
***************
*** 3595,3602 **** l2:
  	ReleaseBuffer(buffer);
  	if (BufferIsValid(vmbuffer_new))
  		ReleaseBuffer(vmbuffer_new);
- 	if (BufferIsValid(vmbuffer))
- 		ReleaseBuffer(vmbuffer);
  
  	/*
  	 * Release the lmgr tuple lock, if we had it.
--- 3584,3589 ----
***************
*** 3812,3821 **** simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
--- 3799,3810 ----
  	HTSU_Result result;
  	HeapUpdateFailureData hufd;
  	LockTupleMode lockmode;
+ 	Buffer		vmbuffer = InvalidBuffer;
  
  	result = heap_update(relation, otid, tup,
  						 GetCurrentCommandId(true), InvalidSnapshot,
  						 true /* wait for commit */ ,
+ 						 &vmbuffer,
  						 &hufd, &lockmode);
  	switch (result)
  	{
***************
*** 3836,3841 **** simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
--- 3825,3833 ----
  			elog(ERROR, "unrecognized heap_update status: %u", result);
  			break;
  	}
+ 
+ 	if (BufferIsValid(vmbuffer))
+ 		ReleaseBuffer(vmbuffer);
  }
  
  
***************
*** 5522,5528 **** heap_restrpos(HeapScanDesc scan)
--- 5514,5523 ----
  		 */
  		if (BufferIsValid(scan->rs_cbuf))
  			ReleaseBuffer(scan->rs_cbuf);
+ 		if (BufferIsValid(scan->rs_cvmbuf))
+ 			ReleaseBuffer(scan->rs_cvmbuf);
  		scan->rs_cbuf = InvalidBuffer;
+ 		scan->rs_cvmbuf = InvalidBuffer;
  		scan->rs_cblock = InvalidBlockNumber;
  		scan->rs_inited = false;
  	}
***************
*** 5759,5783 **** log_heap_freeze(Relation reln, Buffer buffer,
  /*
   * Perform XLogInsert for a heap-visible operation.  'block' is the block
   * being marked all-visible, and vm_buffer is the buffer containing the
!  * corresponding visibility map block.	Both should have already been modified
   * and dirtied.
-  *
-  * If checksums are enabled, we also add the heap_buffer to the chain to
-  * protect it from being torn.
   */
  XLogRecPtr
! log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
  				 TransactionId cutoff_xid)
  {
  	xl_heap_visible xlrec;
  	XLogRecPtr	recptr;
! 	XLogRecData rdata[3];
! 
! 	Assert(BufferIsValid(heap_buffer));
! 	Assert(BufferIsValid(vm_buffer));
  
  	xlrec.node = rnode;
! 	xlrec.block = BufferGetBlockNumber(heap_buffer);
  	xlrec.cutoff_xid = cutoff_xid;
  
  	rdata[0].data = (char *) &xlrec;
--- 5754,5772 ----
  /*
   * Perform XLogInsert for a heap-visible operation.  'block' is the block
   * being marked all-visible, and vm_buffer is the buffer containing the
!  * corresponding visibility map block.  Both should have already been modified
   * and dirtied.
   */
  XLogRecPtr
! log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer,
  				 TransactionId cutoff_xid)
  {
  	xl_heap_visible xlrec;
  	XLogRecPtr	recptr;
! 	XLogRecData rdata[2];
  
  	xlrec.node = rnode;
! 	xlrec.block = block;
  	xlrec.cutoff_xid = cutoff_xid;
  
  	rdata[0].data = (char *) &xlrec;
***************
*** 5791,5807 **** log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
  	rdata[1].buffer_std = false;
  	rdata[1].next = NULL;
  
- 	if (DataChecksumsEnabled())
- 	{
- 		rdata[1].next = &(rdata[2]);
- 
- 		rdata[2].data = NULL;
- 		rdata[2].len = 0;
- 		rdata[2].buffer = heap_buffer;
- 		rdata[2].buffer_std = true;
- 		rdata[2].next = NULL;
- 	}
- 
  	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE, rdata);
  
  	return recptr;
--- 5780,5785 ----
***************
*** 6151,6164 **** heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
  	UnlockReleaseBuffer(buffer);
  }
  
- /*
-  * Replay XLOG_HEAP2_VISIBLE record.
-  *
-  * The critical integrity requirement here is that we must never end up with
-  * a situation where the visibility map bit is set, and the page-level
-  * PD_ALL_VISIBLE bit is clear.  If that were to occur, then a subsequent
-  * page modification would fail to clear the visibility map bit.
-  */
  static void
  heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
  {
--- 6129,6134 ----
***************
*** 6176,6240 **** heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
  	if (InHotStandby)
  		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node);
  
- 	/*
- 	 * If heap block was backed up, restore it. This can only happen with
- 	 * checksums enabled.
- 	 */
- 	if (record->xl_info & XLR_BKP_BLOCK(1))
- 	{
- 		Assert(DataChecksumsEnabled());
- 		(void) RestoreBackupBlock(lsn, record, 1, false, false);
- 	}
- 	else
- 	{
- 		Buffer		buffer;
- 		Page		page;
- 
- 		/*
- 		 * Read the heap page, if it still exists. If the heap file has been
- 		 * dropped or truncated later in recovery, we don't need to update the
- 		 * page, but we'd better still update the visibility map.
- 		 */
- 		buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM,
- 										xlrec->block, RBM_NORMAL);
- 		if (BufferIsValid(buffer))
- 		{
- 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- 
- 			page = (Page) BufferGetPage(buffer);
- 
- 			/*
- 			 * We don't bump the LSN of the heap page when setting the
- 			 * visibility map bit (unless checksums are enabled, in which case
- 			 * we must), because that would generate an unworkable volume of
- 			 * full-page writes.  This exposes us to torn page hazards, but
- 			 * since we're not inspecting the existing page contents in any
- 			 * way, we don't care.
- 			 *
- 			 * However, all operations that clear the visibility map bit *do*
- 			 * bump the LSN, and those operations will only be replayed if the
- 			 * XLOG LSN follows the page LSN.  Thus, if the page LSN has
- 			 * advanced past our XLOG record's LSN, we mustn't mark the page
- 			 * all-visible, because the subsequent update won't be replayed to
- 			 * clear the flag.
- 			 */
- 			if (lsn > PageGetLSN(page))
- 			{
- 				PageSetAllVisible(page);
- 				MarkBufferDirty(buffer);
- 			}
- 
- 			/* Done with heap page. */
- 			UnlockReleaseBuffer(buffer);
- 		}
- 	}
- 
- 	/*
- 	 * Even if we skipped the heap page update due to the LSN interlock, it's
- 	 * still safe to update the visibility map.  Any WAL record that clears
- 	 * the visibility map bit does so before checking the page LSN, so any
- 	 * bits that need to be cleared will still be cleared.
- 	 */
  	if (record->xl_info & XLR_BKP_BLOCK(0))
  		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  	else
--- 6146,6151 ----
***************
*** 6253,6263 **** heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
  		 * before, and if this bit needs to be cleared, the record responsible
  		 * for doing so should be again replayed, and clear it.  For right
  		 * now, out of an abundance of conservatism, we use the same test here
! 		 * we did for the heap page.  If this results in a dropped bit, no
  		 * real harm is done; and the next VACUUM will fix it.
  		 */
  		if (lsn > PageGetLSN(BufferGetPage(vmbuffer)))
! 			visibilitymap_set(reln, xlrec->block, InvalidBuffer, lsn, vmbuffer,
  							  xlrec->cutoff_xid);
  
  		ReleaseBuffer(vmbuffer);
--- 6164,6174 ----
  		 * before, and if this bit needs to be cleared, the record responsible
  		 * for doing so should be again replayed, and clear it.  For right
  		 * now, out of an abundance of conservatism, we use the same test here
! 		 * that we do for a heap page.  If this results in a dropped bit, no
  		 * real harm is done; and the next VACUUM will fix it.
  		 */
  		if (lsn > PageGetLSN(BufferGetPage(vmbuffer)))
! 			visibilitymap_set(reln, xlrec->block, lsn, vmbuffer,
  							  xlrec->cutoff_xid);
  
  		ReleaseBuffer(vmbuffer);
***************
*** 6394,6402 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
- 	if (xlrec->all_visible_cleared)
- 		PageClearAllVisible(page);
- 
  	/* Make sure there is no forward chain link in t_ctid */
  	htup->t_ctid = xlrec->target.tid;
  	PageSetLSN(page, lsn);
--- 6305,6310 ----
***************
*** 6499,6507 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
  
  	PageSetLSN(page, lsn);
  
- 	if (xlrec->all_visible_cleared)
- 		PageClearAllVisible(page);
- 
  	MarkBufferDirty(buffer);
  	UnlockReleaseBuffer(buffer);
  
--- 6407,6412 ----
***************
*** 6645,6653 **** heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
  
  	PageSetLSN(page, lsn);
  
- 	if (xlrec->all_visible_cleared)
- 		PageClearAllVisible(page);
- 
  	MarkBufferDirty(buffer);
  	UnlockReleaseBuffer(buffer);
  
--- 6550,6555 ----
***************
*** 6771,6779 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
- 	if (xlrec->all_visible_cleared)
- 		PageClearAllVisible(page);
- 
  	/*
  	 * this test is ugly, but necessary to avoid thinking that insert change
  	 * is already applied
--- 6673,6678 ----
***************
*** 6881,6889 **** newsame:;
  	if (offnum == InvalidOffsetNumber)
  		elog(PANIC, "heap_update_redo: failed to add tuple");
  
- 	if (xlrec->new_all_visible_cleared)
- 		PageClearAllVisible(page);
- 
  	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
  
  	PageSetLSN(page, lsn);
--- 6780,6785 ----
*** a/src/backend/access/heap/hio.c
--- b/src/backend/access/heap/hio.c
***************
*** 98,161 **** ReadBufferBI(Relation relation, BlockNumber targetBlock,
  }
  
  /*
-  * For each heap page which is all-visible, acquire a pin on the appropriate
-  * visibility map page, if we haven't already got one.
-  *
-  * buffer2 may be InvalidBuffer, if only one buffer is involved.  buffer1
-  * must not be InvalidBuffer.  If both buffers are specified, buffer1 must
-  * be less than buffer2.
-  */
- static void
- GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
- 					 BlockNumber block1, BlockNumber block2,
- 					 Buffer *vmbuffer1, Buffer *vmbuffer2)
- {
- 	bool		need_to_pin_buffer1;
- 	bool		need_to_pin_buffer2;
- 
- 	Assert(BufferIsValid(buffer1));
- 	Assert(buffer2 == InvalidBuffer || buffer1 <= buffer2);
- 
- 	while (1)
- 	{
- 		/* Figure out which pins we need but don't have. */
- 		need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
- 			&& !visibilitymap_pin_ok(block1, *vmbuffer1);
- 		need_to_pin_buffer2 = buffer2 != InvalidBuffer
- 			&& PageIsAllVisible(BufferGetPage(buffer2))
- 			&& !visibilitymap_pin_ok(block2, *vmbuffer2);
- 		if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
- 			return;
- 
- 		/* We must unlock both buffers before doing any I/O. */
- 		LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
- 		if (buffer2 != InvalidBuffer && buffer2 != buffer1)
- 			LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
- 
- 		/* Get pins. */
- 		if (need_to_pin_buffer1)
- 			visibilitymap_pin(relation, block1, vmbuffer1);
- 		if (need_to_pin_buffer2)
- 			visibilitymap_pin(relation, block2, vmbuffer2);
- 
- 		/* Relock buffers. */
- 		LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
- 		if (buffer2 != InvalidBuffer && buffer2 != buffer1)
- 			LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
- 
- 		/*
- 		 * If there are two buffers involved and we pinned just one of them,
- 		 * it's possible that the second one became all-visible while we were
- 		 * busy pinning the first one.	If it looks like that's a possible
- 		 * scenario, we'll need to make a second pass through this loop.
- 		 */
- 		if (buffer2 == InvalidBuffer || buffer1 == buffer2
- 			|| (need_to_pin_buffer1 && need_to_pin_buffer2))
- 			break;
- 	}
- }
- 
- /*
   * RelationGetBufferForTuple
   *
   *	Returns pinned and exclusive-locked buffer of a page in given relation
--- 98,103 ----
***************
*** 246,252 **** RelationGetBufferForTuple(Relation relation, Size len,
--- 188,197 ----
  												   HEAP_DEFAULT_FILLFACTOR);
  
  	if (otherBuffer != InvalidBuffer)
+ 	{
  		otherBlock = BufferGetBlockNumber(otherBuffer);
+ 		Assert(visibilitymap_pin_ok(otherBlock, *vmbuffer_other));
+ 	}
  	else
  		otherBlock = InvalidBlockNumber;		/* just to keep compiler quiet */
  
***************
*** 298,303 **** RelationGetBufferForTuple(Relation relation, Size len,
--- 243,250 ----
  
  	while (targetBlock != InvalidBlockNumber)
  	{
+ 		visibilitymap_pin(relation, targetBlock, vmbuffer);
+ 
  		/*
  		 * Read and exclusive-lock the target block, as well as the other
  		 * block if one was given, taking suitable care with lock ordering and
***************
*** 306,338 **** RelationGetBufferForTuple(Relation relation, Size len,
  		 * If the page-level all-visible flag is set, caller will need to
  		 * clear both that and the corresponding visibility map bit.  However,
  		 * by the time we return, we'll have x-locked the buffer, and we don't
! 		 * want to do any I/O while in that state.	So we check the bit here
! 		 * before taking the lock, and pin the page if it appears necessary.
! 		 * Checking without the lock creates a risk of getting the wrong
! 		 * answer, so we'll have to recheck after acquiring the lock.
  		 */
  		if (otherBuffer == InvalidBuffer)
  		{
  			/* easy case */
  			buffer = ReadBufferBI(relation, targetBlock, bistate);
- 			if (PageIsAllVisible(BufferGetPage(buffer)))
- 				visibilitymap_pin(relation, targetBlock, vmbuffer);
  			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  		}
  		else if (otherBlock == targetBlock)
  		{
  			/* also easy case */
  			buffer = otherBuffer;
- 			if (PageIsAllVisible(BufferGetPage(buffer)))
- 				visibilitymap_pin(relation, targetBlock, vmbuffer);
  			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  		}
  		else if (otherBlock < targetBlock)
  		{
  			/* lock other buffer first */
  			buffer = ReadBuffer(relation, targetBlock);
- 			if (PageIsAllVisible(BufferGetPage(buffer)))
- 				visibilitymap_pin(relation, targetBlock, vmbuffer);
  			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
  			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  		}
--- 253,276 ----
  		 * If the page-level all-visible flag is set, caller will need to
  		 * clear both that and the corresponding visibility map bit.  However,
  		 * by the time we return, we'll have x-locked the buffer, and we don't
! 		 * want to do any I/O while in that state.	So pin the VM buffer first.
  		 */
  		if (otherBuffer == InvalidBuffer)
  		{
  			/* easy case */
  			buffer = ReadBufferBI(relation, targetBlock, bistate);
  			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  		}
  		else if (otherBlock == targetBlock)
  		{
  			/* also easy case */
  			buffer = otherBuffer;
  			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  		}
  		else if (otherBlock < targetBlock)
  		{
  			/* lock other buffer first */
  			buffer = ReadBuffer(relation, targetBlock);
  			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
  			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  		}
***************
*** 340,380 **** RelationGetBufferForTuple(Relation relation, Size len,
  		{
  			/* lock target buffer first */
  			buffer = ReadBuffer(relation, targetBlock);
- 			if (PageIsAllVisible(BufferGetPage(buffer)))
- 				visibilitymap_pin(relation, targetBlock, vmbuffer);
  			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
  		}
  
- 		/*
- 		 * We now have the target page (and the other buffer, if any) pinned
- 		 * and locked.	However, since our initial PageIsAllVisible checks
- 		 * were performed before acquiring the lock, the results might now be
- 		 * out of date, either for the selected victim buffer, or for the
- 		 * other buffer passed by the caller.  In that case, we'll need to
- 		 * give up our locks, go get the pin(s) we failed to get earlier, and
- 		 * re-lock.  That's pretty painful, but hopefully shouldn't happen
- 		 * often.
- 		 *
- 		 * Note that there's a small possibility that we didn't pin the page
- 		 * above but still have the correct page pinned anyway, either because
- 		 * we've already made a previous pass through this loop, or because
- 		 * caller passed us the right page anyway.
- 		 *
- 		 * Note also that it's possible that by the time we get the pin and
- 		 * retake the buffer locks, the visibility map bit will have been
- 		 * cleared by some other backend anyway.  In that case, we'll have
- 		 * done a bit of extra work for no gain, but there's no real harm
- 		 * done.
- 		 */
- 		if (otherBuffer == InvalidBuffer || buffer <= otherBuffer)
- 			GetVisibilityMapPins(relation, buffer, otherBuffer,
- 								 targetBlock, otherBlock, vmbuffer,
- 								 vmbuffer_other);
- 		else
- 			GetVisibilityMapPins(relation, otherBuffer, buffer,
- 								 otherBlock, targetBlock, vmbuffer_other,
- 								 vmbuffer);
  
  		/*
  		 * Now we can check to see if there's enough free space here. If so,
--- 278,287 ----
*** a/src/backend/access/heap/tuptoaster.c
--- b/src/backend/access/heap/tuptoaster.c
***************
*** 35,40 ****
--- 35,41 ----
  #include "access/tuptoaster.h"
  #include "access/xact.h"
  #include "catalog/catalog.h"
+ #include "storage/bufmgr.h"
  #include "utils/fmgroids.h"
  #include "utils/pg_lzcompress.h"
  #include "utils/rel.h"
***************
*** 1241,1246 **** toast_save_datum(Relation rel, Datum value,
--- 1242,1248 ----
  	Relation	toastidx;
  	HeapTuple	toasttup;
  	TupleDesc	toasttupDesc;
+ 	Buffer		vmbuffer = InvalidBuffer;
  	Datum		t_values[3];
  	bool		t_isnull[3];
  	CommandId	mycid = GetCurrentCommandId(true);
***************
*** 1418,1424 **** toast_save_datum(Relation rel, Datum value,
  		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
  		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
  
! 		heap_insert(toastrel, toasttup, mycid, options, NULL);
  
  		/*
  		 * Create the index entry.	We cheat a little here by not using
--- 1420,1426 ----
  		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
  		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
  
! 		heap_insert(toastrel, toasttup, mycid, options, NULL, &vmbuffer);
  
  		/*
  		 * Create the index entry.	We cheat a little here by not using
***************
*** 1446,1451 **** toast_save_datum(Relation rel, Datum value,
--- 1448,1457 ----
  		data_p += chunk_size;
  	}
  
+ 	/* release visibility map buffer */
+ 	if (BufferIsValid(vmbuffer))
+ 		ReleaseBuffer(vmbuffer);
+ 
  	/*
  	 * Done - close toast relation
  	 */
*** a/src/backend/access/heap/visibilitymap.c
--- b/src/backend/access/heap/visibilitymap.c
***************
*** 31,47 ****
   * must make sure that whenever a bit is cleared, the bit is cleared on WAL
   * replay of the updating operation as well.
   *
!  * When we *set* a visibility map during VACUUM, we must write WAL.  This may
!  * seem counterintuitive, since the bit is basically a hint: if it is clear,
!  * it may still be the case that every tuple on the page is visible to all
!  * transactions; we just don't know that for certain.  The difficulty is that
!  * there are two bits which are typically set together: the PD_ALL_VISIBLE bit
!  * on the page itself, and the visibility map bit.	If a crash occurs after the
!  * visibility map page makes it to disk and before the updated heap page makes
!  * it to disk, redo must set the bit on the heap page.	Otherwise, the next
!  * insert, update, or delete on the heap page will fail to realize that the
!  * visibility map bit must be cleared, possibly causing index-only scans to
!  * return wrong answers.
   *
   * VACUUM will normally skip pages for which the visibility map bit is set;
   * such pages can't contain any dead tuples and therefore don't need vacuuming.
--- 31,39 ----
   * must make sure that whenever a bit is cleared, the bit is cleared on WAL
   * replay of the updating operation as well.
   *
!  * We write WAL for setting a visibility map bit, even though it's a hint, to
!  * make sure that the action that makes a page all visible reaches disk before
!  * the VM bit does.
   *
   * VACUUM will normally skip pages for which the visibility map bit is set;
   * such pages can't contain any dead tuples and therefore don't need vacuuming.
***************
*** 57,71 ****
   * while still holding a lock on the heap page and in the same critical
   * section that logs the page modification. However, we don't want to hold
   * the buffer lock over any I/O that may be required to read in the visibility
!  * map page.  To avoid this, we examine the heap page before locking it;
!  * if the page-level PD_ALL_VISIBLE bit is set, we pin the visibility map
!  * bit.  Then, we lock the buffer.	But this creates a race condition: there
!  * is a possibility that in the time it takes to lock the buffer, the
!  * PD_ALL_VISIBLE bit gets set.  If that happens, we have to unlock the
!  * buffer, pin the visibility map page, and relock the buffer.	This shouldn't
!  * happen often, because only VACUUM currently sets visibility map bits,
!  * and the race will only occur if VACUUM processes a given page at almost
!  * exactly the same time that someone tries to further modify it.
   *
   * To set a bit, you need to hold a lock on the heap page. That prevents
   * the race condition where VACUUM sees that all tuples on the page are
--- 49,57 ----
   * while still holding a lock on the heap page and in the same critical
   * section that logs the page modification. However, we don't want to hold
   * the buffer lock over any I/O that may be required to read in the visibility
!  * map page.  To avoid this, we pin the visibility map page first, then lock
!  * the buffer.  We hold a pin on the VM buffer across multiple calls where
!  * possible.
   *
   * To set a bit, you need to hold a lock on the heap page. That prevents
   * the race condition where VACUUM sees that all tuples on the page are
***************
*** 78,83 ****
--- 64,78 ----
   * But when a bit is cleared, we don't have to do that because it's always
   * safe to clear a bit in the map from correctness point of view.
   *
+  * To test a bit in the visibility map, most callers should have a pin on the
+  * VM buffer, and at least a shared lock on the data buffer. Any process that
+  * clears the VM bit must have an exclusive lock on the data buffer, so that
+  * will serialize access to the appropriate bit. Because lock acquisition and
+  * release are full memory barriers, then there is no danger of seeing the
+  * state of the bit before it was last cleared. Callers that don't have the
+  * data buffer yet, such as an index only scan or a VACUUM that is skipping
+  * pages, must handle the concurrency as appropriate.
+  *
   *-------------------------------------------------------------------------
   */
  #include "postgres.h"
***************
*** 243,249 **** visibilitymap_pin_ok(BlockNumber heapBlk, Buffer buf)
   * any I/O.
   */
  void
! visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
  				  XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid)
  {
  	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
--- 238,244 ----
   * any I/O.
   */
  void
! visibilitymap_set(Relation rel, BlockNumber heapBlk,
  				  XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid)
  {
  	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
***************
*** 257,267 **** visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
  #endif
  
  	Assert(InRecovery || XLogRecPtrIsInvalid(recptr));
- 	Assert(InRecovery || BufferIsValid(heapBuf));
- 
- 	/* Check that we have the right heap page pinned, if present */
- 	if (BufferIsValid(heapBuf) && BufferGetBlockNumber(heapBuf) != heapBlk)
- 		elog(ERROR, "wrong heap buffer passed to visibilitymap_set");
  
  	/* Check that we have the right VM page pinned */
  	if (!BufferIsValid(vmBuf) || BufferGetBlockNumber(vmBuf) != mapBlock)
--- 252,257 ----
***************
*** 278,304 **** visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
  		map[mapByte] |= (1 << mapBit);
  		MarkBufferDirty(vmBuf);
  
  		if (RelationNeedsWAL(rel))
  		{
  			if (XLogRecPtrIsInvalid(recptr))
! 			{
! 				Assert(!InRecovery);
! 				recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
! 										  cutoff_xid);
! 
! 				/*
! 				 * If data checksums are enabled, we need to protect the heap
! 				 * page from being torn.
! 				 */
! 				if (DataChecksumsEnabled())
! 				{
! 					Page		heapPage = BufferGetPage(heapBuf);
! 
! 					/* caller is expected to set PD_ALL_VISIBLE first */
! 					Assert(PageIsAllVisible(heapPage));
! 					PageSetLSN(heapPage, recptr);
! 				}
! 			}
  			PageSetLSN(page, recptr);
  		}
  
--- 268,283 ----
  		map[mapByte] |= (1 << mapBit);
  		MarkBufferDirty(vmBuf);
  
+ 		/*
+ 		 * Although this is a hint, we still need to make sure that the WAL for
+ 		 * the action that made the page all-visible is written before the VM
+ 		 * bit is set.
+ 		 */
  		if (RelationNeedsWAL(rel))
  		{
  			if (XLogRecPtrIsInvalid(recptr))
! 				recptr = log_heap_visible(rel->rd_node, heapBlk,
! 										  vmBuf, cutoff_xid);
  			PageSetLSN(page, recptr);
  		}
  
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 38,43 ****
--- 38,44 ----
  #include "optimizer/planner.h"
  #include "parser/parse_relation.h"
  #include "rewrite/rewriteHandler.h"
+ #include "storage/bufmgr.h"
  #include "storage/fd.h"
  #include "tcop/tcopprot.h"
  #include "utils/acl.h"
***************
*** 292,298 **** static uint64 CopyFrom(CopyState cstate);
  static void CopyFromInsertBatch(CopyState cstate, EState *estate,
  					CommandId mycid, int hi_options,
  					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
! 					BulkInsertState bistate,
  					int nBufferedTuples, HeapTuple *bufferedTuples,
  					int firstBufferedLineNo);
  static bool CopyReadLine(CopyState cstate);
--- 293,299 ----
  static void CopyFromInsertBatch(CopyState cstate, EState *estate,
  					CommandId mycid, int hi_options,
  					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
! 					BulkInsertState bistate, Buffer *vmbuffer,
  					int nBufferedTuples, HeapTuple *bufferedTuples,
  					int firstBufferedLineNo);
  static bool CopyReadLine(CopyState cstate);
***************
*** 2009,2014 **** CopyFrom(CopyState cstate)
--- 2010,2016 ----
  	CommandId	mycid = GetCurrentCommandId(true);
  	int			hi_options = 0; /* start with default heap_insert options */
  	BulkInsertState bistate;
+ 	Buffer 		vmbuffer = InvalidBuffer;
  	uint64		processed = 0;
  	bool		useHeapMultiInsert;
  	int			nBufferedTuples = 0;
***************
*** 2263,2270 **** CopyFrom(CopyState cstate)
  				{
  					CopyFromInsertBatch(cstate, estate, mycid, hi_options,
  										resultRelInfo, myslot, bistate,
! 										nBufferedTuples, bufferedTuples,
! 										firstBufferedLineNo);
  					nBufferedTuples = 0;
  					bufferedTuplesSize = 0;
  				}
--- 2265,2272 ----
  				{
  					CopyFromInsertBatch(cstate, estate, mycid, hi_options,
  										resultRelInfo, myslot, bistate,
! 										&vmbuffer, nBufferedTuples,
! 										bufferedTuples, firstBufferedLineNo);
  					nBufferedTuples = 0;
  					bufferedTuplesSize = 0;
  				}
***************
*** 2274,2280 **** CopyFrom(CopyState cstate)
  				List	   *recheckIndexes = NIL;
  
  				/* OK, store the tuple and create index entries for it */
! 				heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
  
  				if (resultRelInfo->ri_NumIndices > 0)
  					recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
--- 2276,2282 ----
  				List	   *recheckIndexes = NIL;
  
  				/* OK, store the tuple and create index entries for it */
! 				heap_insert(cstate->rel, tuple, mycid, hi_options, bistate, &vmbuffer);
  
  				if (resultRelInfo->ri_NumIndices > 0)
  					recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
***************
*** 2299,2305 **** CopyFrom(CopyState cstate)
  	/* Flush any remaining buffered tuples */
  	if (nBufferedTuples > 0)
  		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
! 							resultRelInfo, myslot, bistate,
  							nBufferedTuples, bufferedTuples,
  							firstBufferedLineNo);
  
--- 2301,2307 ----
  	/* Flush any remaining buffered tuples */
  	if (nBufferedTuples > 0)
  		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
! 							resultRelInfo, myslot, bistate, &vmbuffer,
  							nBufferedTuples, bufferedTuples,
  							firstBufferedLineNo);
  
***************
*** 2308,2313 **** CopyFrom(CopyState cstate)
--- 2310,2318 ----
  
  	FreeBulkInsertState(bistate);
  
+ 	if (BufferIsValid(vmbuffer))
+ 		ReleaseBuffer(vmbuffer);
+ 
  	MemoryContextSwitchTo(oldcontext);
  
  	/* Execute AFTER STATEMENT insertion triggers */
***************
*** 2344,2351 **** static void
  CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
  					int hi_options, ResultRelInfo *resultRelInfo,
  					TupleTableSlot *myslot, BulkInsertState bistate,
! 					int nBufferedTuples, HeapTuple *bufferedTuples,
! 					int firstBufferedLineNo)
  {
  	MemoryContext oldcontext;
  	int			i;
--- 2349,2356 ----
  CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
  					int hi_options, ResultRelInfo *resultRelInfo,
  					TupleTableSlot *myslot, BulkInsertState bistate,
! 					Buffer *vmbuffer, int nBufferedTuples,
! 					HeapTuple *bufferedTuples, int firstBufferedLineNo)
  {
  	MemoryContext oldcontext;
  	int			i;
***************
*** 2368,2374 **** CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
  					  nBufferedTuples,
  					  mycid,
  					  hi_options,
! 					  bistate);
  	MemoryContextSwitchTo(oldcontext);
  
  	/*
--- 2373,2380 ----
  					  nBufferedTuples,
  					  mycid,
  					  hi_options,
! 					  bistate,
! 					  vmbuffer);
  	MemoryContextSwitchTo(oldcontext);
  
  	/*
*** a/src/backend/commands/createas.c
--- b/src/backend/commands/createas.c
***************
*** 35,40 ****
--- 35,41 ----
  #include "commands/view.h"
  #include "parser/parse_clause.h"
  #include "rewrite/rewriteHandler.h"
+ #include "storage/bufmgr.h"
  #include "storage/smgr.h"
  #include "tcop/tcopprot.h"
  #include "utils/builtins.h"
***************
*** 52,57 **** typedef struct
--- 53,59 ----
  	CommandId	output_cid;		/* cmin to insert in output tuples */
  	int			hi_options;		/* heap_insert performance options */
  	BulkInsertState bistate;	/* bulk insert state */
+ 	Buffer		vmbuffer;		/* current visibility map buffer */
  } DR_intorel;
  
  static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
***************
*** 397,402 **** intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
--- 399,405 ----
  	myState->hi_options = HEAP_INSERT_SKIP_FSM |
  		(XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
  	myState->bistate = GetBulkInsertState();
+ 	myState->vmbuffer = InvalidBuffer;
  
  	/* Not using WAL requires smgr_targblock be initially invalid */
  	Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
***************
*** 427,433 **** intorel_receive(TupleTableSlot *slot, DestReceiver *self)
  				tuple,
  				myState->output_cid,
  				myState->hi_options,
! 				myState->bistate);
  
  	/* We know this is a newly created relation, so there are no indexes */
  }
--- 430,437 ----
  				tuple,
  				myState->output_cid,
  				myState->hi_options,
! 				myState->bistate,
! 				&myState->vmbuffer);
  
  	/* We know this is a newly created relation, so there are no indexes */
  }
***************
*** 442,447 **** intorel_shutdown(DestReceiver *self)
--- 446,454 ----
  
  	FreeBulkInsertState(myState->bistate);
  
+ 	if (BufferIsValid(myState->vmbuffer))
+ 		ReleaseBuffer(myState->vmbuffer);
+ 
  	/* If we skipped using WAL, must heap_sync before commit */
  	if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
  		heap_sync(myState->rel);
*** a/src/backend/commands/matview.c
--- b/src/backend/commands/matview.c
***************
*** 26,31 ****
--- 26,32 ----
  #include "executor/executor.h"
  #include "miscadmin.h"
  #include "rewrite/rewriteHandler.h"
+ #include "storage/bufmgr.h"
  #include "storage/smgr.h"
  #include "tcop/tcopprot.h"
  #include "utils/rel.h"
***************
*** 42,47 **** typedef struct
--- 43,49 ----
  	CommandId	output_cid;		/* cmin to insert in output tuples */
  	int			hi_options;		/* heap_insert performance options */
  	BulkInsertState bistate;	/* bulk insert state */
+ 	Buffer		vmbuffer;		/* current visibility map buffer */
  } DR_transientrel;
  
  static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
***************
*** 337,343 **** transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
  				tuple,
  				myState->output_cid,
  				myState->hi_options,
! 				myState->bistate);
  
  	/* We know this is a newly created relation, so there are no indexes */
  }
--- 339,346 ----
  				tuple,
  				myState->output_cid,
  				myState->hi_options,
! 				myState->bistate,
! 				&myState->vmbuffer);
  
  	/* We know this is a newly created relation, so there are no indexes */
  }
***************
*** 352,357 **** transientrel_shutdown(DestReceiver *self)
--- 355,363 ----
  
  	FreeBulkInsertState(myState->bistate);
  
+ 	if (BufferIsValid(myState->vmbuffer))
+ 		ReleaseBuffer(myState->vmbuffer);
+ 
  	/* If we skipped using WAL, must heap_sync before commit */
  	if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
  		heap_sync(myState->transientrel);
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 3637,3642 **** ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
--- 3637,3643 ----
  	EState	   *estate;
  	CommandId	mycid;
  	BulkInsertState bistate;
+ 	Buffer		vmbuffer = InvalidBuffer;
  	int			hi_options;
  
  	/*
***************
*** 3885,3891 **** ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
  
  			/* Write the tuple out to the new relation */
  			if (newrel)
! 				heap_insert(newrel, tuple, mycid, hi_options, bistate);
  
  			ResetExprContext(econtext);
  
--- 3886,3892 ----
  
  			/* Write the tuple out to the new relation */
  			if (newrel)
! 				heap_insert(newrel, tuple, mycid, hi_options, bistate, &vmbuffer);
  
  			ResetExprContext(econtext);
  
***************
*** 3905,3910 **** ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
--- 3906,3913 ----
  	if (newrel)
  	{
  		FreeBulkInsertState(bistate);
+ 		if (BufferIsValid(vmbuffer))
+ 			ReleaseBuffer(vmbuffer);
  
  		/* If we skipped writing WAL, then we need to sync the heap. */
  		if (hi_options & HEAP_INSERT_SKIP_WAL)
*** a/src/backend/commands/vacuumlazy.c
--- b/src/backend/commands/vacuumlazy.c
***************
*** 661,671 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  			freespace = PageGetHeapFreeSpace(page);
  
  			/* empty pages are always all-visible */
! 			if (!PageIsAllVisible(page))
  			{
- 				PageSetAllVisible(page);
  				MarkBufferDirty(buf);
! 				visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
  								  vmbuffer, InvalidTransactionId);
  			}
  
--- 661,671 ----
  			freespace = PageGetHeapFreeSpace(page);
  
  			/* empty pages are always all-visible */
! 			if (!visibilitymap_test(onerel, BufferGetBlockNumber(buf),
! 									&vmbuffer))
  			{
  				MarkBufferDirty(buf);
! 				visibilitymap_set(onerel, blkno, InvalidXLogRecPtr,
  								  vmbuffer, InvalidTransactionId);
  			}
  
***************
*** 675,680 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 675,690 ----
  		}
  
  		/*
+ 		 * If this page is left over from an upgraded system, it may have a
+ 		 * PD_ALL_VISIBLE bit set (which is deprecated). If so, clear it.
+ 		 */
+ 		if (PageIsAllVisible(page))
+ 		{
+ 			PageClearAllVisible(page);
+ 			MarkBufferDirty(buf);
+ 		}
+ 
+ 		/*
  		 * Prune all HOT-update chains in this page.
  		 *
  		 * We count tuples removed by the pruning step as removed by VACUUM.
***************
*** 896,933 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  		/* mark page all-visible, if appropriate */
  		if (all_visible && !all_visible_according_to_vm)
  		{
! 			/*
! 			 * It should never be the case that the visibility map page is set
! 			 * while the page-level bit is clear, but the reverse is allowed
! 			 * (if checksums are not enabled).	Regardless, set the both bits
! 			 * so that we get back in sync.
! 			 *
! 			 * NB: If the heap page is all-visible but the VM bit is not set,
! 			 * we don't need to dirty the heap page.  However, if checksums
! 			 * are enabled, we do need to make sure that the heap page is
! 			 * dirtied before passing it to visibilitymap_set(), because it
! 			 * may be logged.  Given that this situation should only happen in
! 			 * rare cases after a crash, it is not worth optimizing.
! 			 */
! 			PageSetAllVisible(page);
! 			MarkBufferDirty(buf);
! 			visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
! 							  vmbuffer, visibility_cutoff_xid);
! 		}
! 
! 		/*
! 		 * As of PostgreSQL 9.2, the visibility map bit should never be set if
! 		 * the page-level bit is clear.  However, it's possible that the bit
! 		 * got cleared after we checked it and before we took the buffer
! 		 * content lock, so we must recheck before jumping to the conclusion
! 		 * that something bad has happened.
! 		 */
! 		else if (all_visible_according_to_vm && !PageIsAllVisible(page)
! 				 && visibilitymap_test(onerel, blkno, &vmbuffer))
! 		{
! 			elog(WARNING, "page is not marked all-visible but visibility map bit is set in relation \"%s\" page %u",
! 				 relname, blkno);
! 			visibilitymap_clear(onerel, blkno, vmbuffer);
  		}
  
  		/*
--- 906,913 ----
  		/* mark page all-visible, if appropriate */
  		if (all_visible && !all_visible_according_to_vm)
  		{
! 			visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
! 							  visibility_cutoff_xid);
  		}
  
  		/*
***************
*** 943,949 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  		 * There should never be dead tuples on a page with PD_ALL_VISIBLE
  		 * set, however.
  		 */
! 		else if (PageIsAllVisible(page) && has_dead_tuples)
  		{
  			elog(WARNING, "page containing dead tuples is marked as all-visible in relation \"%s\" page %u",
  				 relname, blkno);
--- 923,929 ----
  		 * There should never be dead tuples on a page with PD_ALL_VISIBLE
  		 * set, however.
  		 */
! 		else if (all_visible_according_to_vm && has_dead_tuples)
  		{
  			elog(WARNING, "page containing dead tuples is marked as all-visible in relation \"%s\" page %u",
  				 relname, blkno);
***************
*** 952,957 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 932,938 ----
  			visibilitymap_clear(onerel, blkno, vmbuffer);
  		}
  
+ 
  		UnlockReleaseBuffer(buf);
  
  		/* Remember the location of the last page with nonremovable tuples */
***************
*** 1153,1160 **** lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
  		heap_page_is_all_visible(buffer, &visibility_cutoff_xid))
  	{
  		Assert(BufferIsValid(*vmbuffer));
! 		PageSetAllVisible(page);
! 		visibilitymap_set(onerel, blkno, buffer, InvalidXLogRecPtr, *vmbuffer,
  						  visibility_cutoff_xid);
  	}
  
--- 1134,1140 ----
  		heap_page_is_all_visible(buffer, &visibility_cutoff_xid))
  	{
  		Assert(BufferIsValid(*vmbuffer));
! 		visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, *vmbuffer,
  						  visibility_cutoff_xid);
  	}
  
*** a/src/backend/executor/nodeIndexonlyscan.c
--- b/src/backend/executor/nodeIndexonlyscan.c
***************
*** 88,102 **** IndexOnlyNext(IndexOnlyScanState *node)
  		 * Note on Memory Ordering Effects: visibilitymap_test does not lock
  		 * the visibility map buffer, and therefore the result we read here
  		 * could be slightly stale.  However, it can't be stale enough to
! 		 * matter.	It suffices to show that (1) there is a read barrier
! 		 * between the time we read the index TID and the time we test the
! 		 * visibility map; and (2) there is a write barrier between the time
! 		 * some other concurrent process clears the visibility map bit and the
! 		 * time it inserts the index TID.  Since acquiring or releasing a
! 		 * LWLock interposes a full barrier, this is easy to show: (1) is
! 		 * satisfied by the release of the index buffer content lock after
! 		 * reading the TID; and (2) is satisfied by the acquisition of the
! 		 * buffer content lock in order to insert the TID.
  		 */
  		if (!visibilitymap_test(scandesc->heapRelation,
  								ItemPointerGetBlockNumber(tid),
--- 88,118 ----
  		 * Note on Memory Ordering Effects: visibilitymap_test does not lock
  		 * the visibility map buffer, and therefore the result we read here
  		 * could be slightly stale.  However, it can't be stale enough to
! 		 * matter.
! 		 *
! 		 * We need to detect clearing a VM bit due to an insert right away,
! 		 * because the tuple is present in the index page but not visible. The
! 		 * reading of the TID by this scan (using a shared lock on the index
! 		 * buffer) is serialized with the insert of the TID into the index
! 		 * (using an exclusive lock on the index buffer). Because the VM bit is
! 		 * cleared before updating the index, and locking/unlocking of the
! 		 * index page acts as a full memory barrier, we are sure to see the
! 		 * cleared bit if we see a recently-inserted TID.
! 		 *
! 		 * Deletes do not update the index page (only VACUUM will clear out the
! 		 * TID), so the clearing of the VM bit by a delete is not serialized
! 		 * with this test below, and we may see a value that is significantly
! 		 * stale. However, we don't care about the delete right away, because
! 		 * the tuple is still visible until the deleting transaction commits or
! 		 * the statement ends (if it's our transaction). In either case, the
! 		 * lock on the VM buffer will have been released (acting as a write
! 		 * barrier) after clearing the bit. And for us to have a snapshot that
! 		 * includes the deleting transaction (making the tuple invisible), we
! 		 * must have acquired ProcArrayLock after that time, acting as a read
! 		 * barrier.
! 		 *
! 		 * It's worth going through this complexity to avoid needing to lock
! 		 * the VM buffer, which could cause significant contention.
  		 */
  		if (!visibilitymap_test(scandesc->heapRelation,
  								ItemPointerGetBlockNumber(tid),
*** a/src/backend/executor/nodeModifyTable.c
--- b/src/backend/executor/nodeModifyTable.c
***************
*** 164,170 **** static TupleTableSlot *
  ExecInsert(TupleTableSlot *slot,
  		   TupleTableSlot *planSlot,
  		   EState *estate,
! 		   bool canSetTag)
  {
  	HeapTuple	tuple;
  	ResultRelInfo *resultRelInfo;
--- 164,171 ----
  ExecInsert(TupleTableSlot *slot,
  		   TupleTableSlot *planSlot,
  		   EState *estate,
! 		   bool canSetTag,
! 		   Buffer *vmbuffer)
  {
  	HeapTuple	tuple;
  	ResultRelInfo *resultRelInfo;
***************
*** 259,265 **** ExecInsert(TupleTableSlot *slot,
  		 * the t_self field.
  		 */
  		newId = heap_insert(resultRelationDesc, tuple,
! 							estate->es_output_cid, 0, NULL);
  
  		/*
  		 * insert index entries for tuple
--- 260,266 ----
  		 * the t_self field.
  		 */
  		newId = heap_insert(resultRelationDesc, tuple,
! 							estate->es_output_cid, 0, NULL, vmbuffer);
  
  		/*
  		 * insert index entries for tuple
***************
*** 311,317 **** ExecDelete(ItemPointer tupleid,
  		   TupleTableSlot *planSlot,
  		   EPQState *epqstate,
  		   EState *estate,
! 		   bool canSetTag)
  {
  	ResultRelInfo *resultRelInfo;
  	Relation	resultRelationDesc;
--- 312,319 ----
  		   TupleTableSlot *planSlot,
  		   EPQState *epqstate,
  		   EState *estate,
! 		   bool canSetTag,
! 		   Buffer *vmbuffer)
  {
  	ResultRelInfo *resultRelInfo;
  	Relation	resultRelationDesc;
***************
*** 393,398 **** ldelete:;
--- 395,401 ----
  							 estate->es_output_cid,
  							 estate->es_crosscheck_snapshot,
  							 true /* wait for commit */ ,
+ 							 vmbuffer,
  							 &hufd);
  		switch (result)
  		{
***************
*** 567,573 **** ExecUpdate(ItemPointer tupleid,
  		   TupleTableSlot *planSlot,
  		   EPQState *epqstate,
  		   EState *estate,
! 		   bool canSetTag)
  {
  	HeapTuple	tuple;
  	ResultRelInfo *resultRelInfo;
--- 570,577 ----
  		   TupleTableSlot *planSlot,
  		   EPQState *epqstate,
  		   EState *estate,
! 		   bool canSetTag,
! 		   Buffer *vmbuffer)
  {
  	HeapTuple	tuple;
  	ResultRelInfo *resultRelInfo;
***************
*** 675,680 **** lreplace:;
--- 679,685 ----
  							 estate->es_output_cid,
  							 estate->es_crosscheck_snapshot,
  							 true /* wait for commit */ ,
+ 							 vmbuffer,
  							 &hufd, &lockmode);
  		switch (result)
  		{
***************
*** 991,1005 **** ExecModifyTable(ModifyTableState *node)
  		switch (operation)
  		{
  			case CMD_INSERT:
! 				slot = ExecInsert(slot, planSlot, estate, node->canSetTag);
  				break;
  			case CMD_UPDATE:
  				slot = ExecUpdate(tupleid, oldtuple, slot, planSlot,
! 								&node->mt_epqstate, estate, node->canSetTag);
  				break;
  			case CMD_DELETE:
  				slot = ExecDelete(tupleid, oldtuple, planSlot,
! 								&node->mt_epqstate, estate, node->canSetTag);
  				break;
  			default:
  				elog(ERROR, "unknown operation");
--- 996,1013 ----
  		switch (operation)
  		{
  			case CMD_INSERT:
! 				slot = ExecInsert(slot, planSlot, estate, node->canSetTag,
! 								  &node->mt_vmbuffer);
  				break;
  			case CMD_UPDATE:
  				slot = ExecUpdate(tupleid, oldtuple, slot, planSlot,
! 								  &node->mt_epqstate, estate, node->canSetTag,
! 								  &node->mt_vmbuffer);
  				break;
  			case CMD_DELETE:
  				slot = ExecDelete(tupleid, oldtuple, planSlot,
! 								  &node->mt_epqstate, estate, node->canSetTag,
! 								  &node->mt_vmbuffer);
  				break;
  			default:
  				elog(ERROR, "unknown operation");
***************
*** 1060,1065 **** ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
--- 1068,1074 ----
  
  	mtstate->operation = operation;
  	mtstate->canSetTag = node->canSetTag;
+ 	mtstate->mt_vmbuffer = InvalidBuffer;
  	mtstate->mt_done = false;
  
  	mtstate->mt_plans = (PlanState **) palloc0(sizeof(PlanState *) * nplans);
***************
*** 1376,1381 **** ExecEndModifyTable(ModifyTableState *node)
--- 1385,1396 ----
  	EvalPlanQualEnd(&node->mt_epqstate);
  
  	/*
+ 	 * Release visibility map buffer if pinned
+ 	 */
+ 	if (BufferIsValid(node->mt_vmbuffer))
+ 		ReleaseBuffer(node->mt_vmbuffer);
+ 
+ 	/*
  	 * shut down subplans
  	 */
  	for (i = 0; i < node->mt_nplans; i++)
*** a/src/include/access/heapam.h
--- b/src/include/access/heapam.h
***************
*** 131,146 **** extern BulkInsertState GetBulkInsertState(void);
  extern void FreeBulkInsertState(BulkInsertState);
  
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			int options, BulkInsertState bistate);
  extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
! 				  CommandId cid, int options, BulkInsertState bistate);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			CommandId cid, Snapshot crosscheck, bool wait,
! 			HeapUpdateFailureData *hufd);
  extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
  			HeapTuple newtup,
  			CommandId cid, Snapshot crosscheck, bool wait,
! 			HeapUpdateFailureData *hufd, LockTupleMode *lockmode);
  extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
  				CommandId cid, LockTupleMode mode, bool nowait,
  				bool follow_update,
--- 131,147 ----
  extern void FreeBulkInsertState(BulkInsertState);
  
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 					   int options, BulkInsertState bistate, Buffer *vmbuffer);
  extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
! 					   CommandId cid, int options, BulkInsertState bistate,
! 					   Buffer *vmbuffer);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			CommandId cid, Snapshot crosscheck, bool wait,
! 			Buffer *vmbuffer, HeapUpdateFailureData *hufd);
  extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
  			HeapTuple newtup,
  			CommandId cid, Snapshot crosscheck, bool wait,
! 			Buffer *vmbuffer, HeapUpdateFailureData *hufd, LockTupleMode *lockmode);
  extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
  				CommandId cid, LockTupleMode mode, bool nowait,
  				bool follow_update,
*** a/src/include/access/heapam_xlog.h
--- b/src/include/access/heapam_xlog.h
***************
*** 279,286 **** extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
  extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
  				TransactionId cutoff_xid, MultiXactId cutoff_multi,
  				OffsetNumber *offsets, int offcnt);
! extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
! 				 Buffer vm_buffer, TransactionId cutoff_xid);
  extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum,
  			BlockNumber blk, Page page);
  extern XLogRecPtr log_newpage_buffer(Buffer buffer);
--- 279,286 ----
  extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
  				TransactionId cutoff_xid, MultiXactId cutoff_multi,
  				OffsetNumber *offsets, int offcnt);
! extern XLogRecPtr log_heap_visible(RelFileNode rnode, BlockNumber block,
! 								   Buffer vm_buffer, TransactionId cutoff_xid);
  extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum,
  			BlockNumber blk, Page page);
  extern XLogRecPtr log_newpage_buffer(Buffer buffer);
*** a/src/include/access/relscan.h
--- b/src/include/access/relscan.h
***************
*** 44,49 **** typedef struct HeapScanDescData
--- 44,50 ----
  	HeapTupleData rs_ctup;		/* current tuple in scan, if any */
  	BlockNumber rs_cblock;		/* current block # in scan, if any */
  	Buffer		rs_cbuf;		/* current buffer in scan, if any */
+ 	Buffer		rs_cvmbuf;		/* current VM buffer in scan, if any */
  	/* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
  	ItemPointerData rs_mctid;	/* marked scan position, if any */
  
*** a/src/include/access/visibilitymap.h
--- b/src/include/access/visibilitymap.h
***************
*** 24,30 **** extern void visibilitymap_clear(Relation rel, BlockNumber heapBlk,
  extern void visibilitymap_pin(Relation rel, BlockNumber heapBlk,
  				  Buffer *vmbuf);
  extern bool visibilitymap_pin_ok(BlockNumber heapBlk, Buffer vmbuf);
! extern void visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
  				  XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid);
  extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
  extern BlockNumber visibilitymap_count(Relation rel);
--- 24,30 ----
  extern void visibilitymap_pin(Relation rel, BlockNumber heapBlk,
  				  Buffer *vmbuf);
  extern bool visibilitymap_pin_ok(BlockNumber heapBlk, Buffer vmbuf);
! extern void visibilitymap_set(Relation rel, BlockNumber heapBlk,
  				  XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid);
  extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
  extern BlockNumber visibilitymap_count(Relation rel);
*** a/src/include/nodes/execnodes.h
--- b/src/include/nodes/execnodes.h
***************
*** 1073,1078 **** typedef struct ModifyTableState
--- 1073,1079 ----
  	PlanState	ps;				/* its first field is NodeTag */
  	CmdType		operation;		/* INSERT, UPDATE, or DELETE */
  	bool		canSetTag;		/* do we set the command tag/es_processed? */
+ 	Buffer		mt_vmbuffer;		/* current visibility map buffer */
  	bool		mt_done;		/* are we done? */
  	PlanState **mt_plans;		/* subplans (one per target rel) */
  	int			mt_nplans;		/* number of plans in the array */
*** a/src/include/storage/bufpage.h
--- b/src/include/storage/bufpage.h
***************
*** 177,183 **** typedef PageHeaderData *PageHeader;
  #define PD_PAGE_FULL		0x0002		/* not enough free space for new
  										 * tuple? */
  #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
! 										 * everyone */
  
  #define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
  
--- 177,183 ----
  #define PD_PAGE_FULL		0x0002		/* not enough free space for new
  										 * tuple? */
  #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
! 										 * everyone - DEPRECATED */
  
  #define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
  
***************
*** 362,369 **** typedef PageHeaderData *PageHeader;
  
  #define PageIsAllVisible(page) \
  	(((PageHeader) (page))->pd_flags & PD_ALL_VISIBLE)
- #define PageSetAllVisible(page) \
- 	(((PageHeader) (page))->pd_flags |= PD_ALL_VISIBLE)
  #define PageClearAllVisible(page) \
  	(((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
  
--- 362,367 ----
