*** src/backend/access/heap/Makefile
--- src/backend/access/heap/Makefile
***************
*** 12,17 **** subdir = src/backend/access/heap
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o
  
  include $(top_srcdir)/src/backend/common.mk
--- 12,17 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o visibilitymap.o
  
  include $(top_srcdir)/src/backend/common.mk
*** src/backend/access/heap/heapam.c
--- src/backend/access/heap/heapam.c
***************
*** 47,52 ****
--- 47,53 ----
  #include "access/transam.h"
  #include "access/tuptoaster.h"
  #include "access/valid.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
***************
*** 195,200 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
--- 196,202 ----
  	int			ntup;
  	OffsetNumber lineoff;
  	ItemId		lpp;
+ 	bool		all_visible;
  
  	Assert(page < scan->rs_nblocks);
  
***************
*** 233,252 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
  	lines = PageGetMaxOffsetNumber(dp);
  	ntup = 0;
  
  	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
  		 lineoff <= lines;
  		 lineoff++, lpp++)
  	{
  		if (ItemIdIsNormal(lpp))
  		{
- 			HeapTupleData loctup;
  			bool		valid;
  
! 			loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
! 			loctup.t_len = ItemIdGetLength(lpp);
! 			ItemPointerSet(&(loctup.t_self), page, lineoff);
  
! 			valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
  			if (valid)
  				scan->rs_vistuples[ntup++] = lineoff;
  		}
--- 235,266 ----
  	lines = PageGetMaxOffsetNumber(dp);
  	ntup = 0;
  
+ 	/*
+ 	 * If the all-visible flag indicates that all tuples on the page are
+ 	 * visible to everyone, we can skip the per-tuple visibility tests.
+ 	 */
+ 	all_visible = PageIsAllVisible(dp);
+ 
  	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
  		 lineoff <= lines;
  		 lineoff++, lpp++)
  	{
  		if (ItemIdIsNormal(lpp))
  		{
  			bool		valid;
  
! 			if (all_visible)
! 				valid = true;
! 			else
! 			{
! 				HeapTupleData loctup;
! 
! 				loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
! 				loctup.t_len = ItemIdGetLength(lpp);
! 				ItemPointerSet(&(loctup.t_self), page, lineoff);
  
! 				valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
! 			}
  			if (valid)
  				scan->rs_vistuples[ntup++] = lineoff;
  		}
***************
*** 1860,1865 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1874,1880 ----
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
  	Buffer		buffer;
+ 	bool		all_visible_cleared = false;
  
  	if (relation->rd_rel->relhasoids)
  	{
***************
*** 1920,1925 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1935,1946 ----
  
  	RelationPutHeapTuple(relation, buffer, heaptup);
  
+ 	if (PageIsAllVisible(BufferGetPage(buffer)))
+ 	{
+ 		all_visible_cleared = true;
+ 		PageClearAllVisible(BufferGetPage(buffer));
+ 	}
+ 
  	/*
  	 * XXX Should we set PageSetPrunable on this page ?
  	 *
***************
*** 1943,1948 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1964,1970 ----
  		Page		page = BufferGetPage(buffer);
  		uint8		info = XLOG_HEAP_INSERT;
  
+ 		xlrec.all_visible_cleared = all_visible_cleared;
  		xlrec.target.node = relation->rd_node;
  		xlrec.target.tid = heaptup->t_self;
  		rdata[0].data = (char *) &xlrec;
***************
*** 1994,1999 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 2016,2026 ----
  
  	UnlockReleaseBuffer(buffer);
  
+ 	/* Clear the bit in the visibility map if necessary */
+ 	if (all_visible_cleared)
+ 		visibilitymap_clear(relation, 
+ 							ItemPointerGetBlockNumber(&(heaptup->t_self)));
+ 
  	/*
  	 * If tuple is cachable, mark it for invalidation from the caches in case
  	 * we abort.  Note it is OK to do this after releasing the buffer, because
***************
*** 2070,2075 **** heap_delete(Relation relation, ItemPointer tid,
--- 2097,2103 ----
  	Buffer		buffer;
  	bool		have_tuple_lock = false;
  	bool		iscombo;
+ 	bool		all_visible_cleared = false;
  
  	Assert(ItemPointerIsValid(tid));
  
***************
*** 2216,2221 **** l1:
--- 2244,2255 ----
  	 */
  	PageSetPrunable(page, xid);
  
+ 	if (PageIsAllVisible(page))
+ 	{
+ 		all_visible_cleared = true;
+ 		PageClearAllVisible(page);
+ 	}
+ 
  	/* store transaction information of xact deleting the tuple */
  	tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
  							   HEAP_XMAX_INVALID |
***************
*** 2237,2242 **** l1:
--- 2271,2277 ----
  		XLogRecPtr	recptr;
  		XLogRecData rdata[2];
  
+ 		xlrec.all_visible_cleared = all_visible_cleared;
  		xlrec.target.node = relation->rd_node;
  		xlrec.target.tid = tp.t_self;
  		rdata[0].data = (char *) &xlrec;
***************
*** 2281,2286 **** l1:
--- 2316,2325 ----
  	 */
  	CacheInvalidateHeapTuple(relation, &tp);
  
+ 	/* Clear the bit in the visibility map if necessary */
+ 	if (all_visible_cleared)
+ 		visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+ 
  	/* Now we can release the buffer */
  	ReleaseBuffer(buffer);
  
***************
*** 2388,2393 **** heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
--- 2427,2434 ----
  	bool		have_tuple_lock = false;
  	bool		iscombo;
  	bool		use_hot_update = false;
+ 	bool		all_visible_cleared = false;
+ 	bool		all_visible_cleared_new = false;
  
  	Assert(ItemPointerIsValid(otid));
  
***************
*** 2763,2768 **** l2:
--- 2804,2815 ----
  		MarkBufferDirty(newbuf);
  	MarkBufferDirty(buffer);
  
+ 	/*
+ 	 * Note: we mustn't clear the PD_ALL_VISIBLE flags before writing the
+ 	 * WAL record, because log_heap_update looks at those flags and sets
+ 	 * the corresponding flags in the WAL record.
+ 	 */
+ 
  	/* XLOG stuff */
  	if (!relation->rd_istemp)
  	{
***************
*** 2778,2783 **** l2:
--- 2825,2842 ----
  		PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
  	}
  
+ 	/* Clear PD_ALL_VISIBLE flags */
+ 	if (PageIsAllVisible(BufferGetPage(buffer)))
+ 	{
+ 		all_visible_cleared = true;
+ 		PageClearAllVisible(BufferGetPage(buffer));
+ 	}
+ 	if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
+ 	{
+ 		all_visible_cleared_new = true;
+ 		PageClearAllVisible(BufferGetPage(newbuf));
+ 	}
+ 
  	END_CRIT_SECTION();
  
  	if (newbuf != buffer)
***************
*** 2791,2796 **** l2:
--- 2850,2861 ----
  	 */
  	CacheInvalidateHeapTuple(relation, &oldtup);
  
+ 	/* Clear bits in visibility map */
+ 	if (all_visible_cleared)
+ 		visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+ 	if (all_visible_cleared_new)
+ 		visibilitymap_clear(relation, BufferGetBlockNumber(newbuf));
+ 
  	/* Now we can release the buffer(s) */
  	if (newbuf != buffer)
  		ReleaseBuffer(newbuf);
***************
*** 3412,3417 **** l3:
--- 3477,3487 ----
  	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
  
  	/*
+ 	 * Don't update the visibility map here. Locking a tuple doesn't
+ 	 * change visibility info.
+ 	 */
+ 
+ 	/*
  	 * Now that we have successfully marked the tuple as locked, we can
  	 * release the lmgr tuple lock, if we had it.
  	 */
***************
*** 3916,3922 **** log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
--- 3986,3994 ----
  
  	xlrec.target.node = reln->rd_node;
  	xlrec.target.tid = from;
+ 	xlrec.all_visible_cleared = PageIsAllVisible(BufferGetPage(oldbuf));
  	xlrec.newtid = newtup->t_self;
+ 	xlrec.new_all_visible_cleared = PageIsAllVisible(BufferGetPage(newbuf));
  
  	rdata[0].data = (char *) &xlrec;
  	rdata[0].len = SizeOfHeapUpdate;
***************
*** 4186,4191 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
--- 4258,4274 ----
  	ItemId		lp = NULL;
  	HeapTupleHeader htup;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
***************
*** 4223,4228 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
--- 4306,4314 ----
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	/* Make sure there is no forward chain link in t_ctid */
  	htup->t_ctid = xlrec->target.tid;
  	PageSetLSN(page, lsn);
***************
*** 4249,4254 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
--- 4335,4351 ----
  	Size		freespace;
  	BlockNumber	blkno;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->target.tid));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
***************
*** 4307,4312 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
--- 4404,4413 ----
  
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
+ 
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	MarkBufferDirty(buffer);
  	UnlockReleaseBuffer(buffer);
  
***************
*** 4347,4352 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4448,4464 ----
  	uint32		newlen;
  	Size		freespace;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->target.tid));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  	{
  		if (samepage)
***************
*** 4411,4416 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4523,4531 ----
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	/*
  	 * this test is ugly, but necessary to avoid thinking that insert change
  	 * is already applied
***************
*** 4426,4431 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4541,4557 ----
  
  newt:;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->new_all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->newtid));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_2)
  		return;
  
***************
*** 4504,4509 **** newsame:;
--- 4630,4638 ----
  	if (offnum == InvalidOffsetNumber)
  		elog(PANIC, "heap_update_redo: failed to add tuple");
  
+ 	if (xlrec->new_all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
  
  	PageSetLSN(page, lsn);
*** /dev/null
--- src/backend/access/heap/visibilitymap.c
***************
*** 0 ****
--- 1,390 ----
+ /*-------------------------------------------------------------------------
+  *
+  * visibilitymap.c
+  *	  bitmap for tracking visibility of heap tuples
+  *
+  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  $PostgreSQL$
+  *
+  * NOTES
+  *
+  * The visibility map is a bitmap with one bit per heap page. A set bit means
+  * that all tuples on the page are visible to all transactions, and that the
+  * page therefore doesn't need to be vacuumed.
+  *
+  * The map is conservative in the sense that we make sure that whenever a bit
+  * is set, we know the condition is true, but if a bit is not set, it might
+  * or might not be true.
+  *
+  * There's no explicit WAL logging in the functions in this file. The callers
+  * must make sure that whenever a bit is cleared, the bit is cleared on WAL
+  * replay of the updating operation as well. Setting bits during recovery
+  * isn't necessary for correctness.
+  *
+  * LOCKING
+  *
+  * In heapam.c, whenever a page is modified so that not all tuples on the
+  * page are visible to everyone anymore, the corresponding bit in the
+  * visibility map is cleared. The bit in the visibility map is cleared
+  * after releasing the lock on the heap page, to avoid holding the lock
+  * over possible I/O to read in the visibility map page.
+  *
+  * To set a bit, you need to hold a lock on the heap page. That prevents
+  * the race condition where VACUUM sees that all tuples on the page are
+  * visible to everyone, but another backend modifies the page before VACUUM
+  * sets the bit in the visibility map.
+  *
+  * When a bit is set, we need to update the LSN of the visibility map page to
+  * make sure that the visibility map update doesn't get written to disk before
+  * the WAL record of the changes that made it possible to set the bit is
+  * flushed. But when a bit is cleared, we don't have to do that, because it's
+  * always OK to clear a bit in the map from a correctness point of view.
+  *
+  * TODO
+  *
+  * It would be nice to use the visibility map to skip visibility checks in
+  * index scans.
+  *
+  * Currently, the visibility map is not 100% correct all the time.
+  * During updates, the bit in the visibility map is cleared after releasing
+  * the lock on the heap page. During the window between releasing the lock
+  * and clearing the bit, the bit in the visibility map is still set even
+  * though the new insertion or deletion is not yet visible to other
+  * backends.
+  *
+  * That might actually be OK for the index scans, though. The newly inserted
+  * tuple wouldn't have an index pointer yet, so all tuples reachable from an
+  * index would still be visible to all other backends, and deletions wouldn't
+  * be visible to other backends yet.
+  *
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/visibilitymap.h"
+ #include "storage/bufmgr.h"
+ #include "storage/bufpage.h"
+ #include "storage/lmgr.h"
+ #include "storage/smgr.h"
+ 
+ /*#define TRACE_VISIBILITYMAP */
+ 
+ /* Number of bits allocated for each heap block. */
+ #define BITS_PER_HEAPBLOCK 1
+ 
+ /* Number of heap blocks we can represent in one byte. */
+ #define HEAPBLOCKS_PER_BYTE 8
+ 
+ /* Number of heap blocks we can represent in one visibility map page */
+ #define HEAPBLOCKS_PER_PAGE ((BLCKSZ - SizeOfPageHeaderData) * HEAPBLOCKS_PER_BYTE)
+ 
+ /* Mapping from heap block number to the right bit in the visibility map */
+ #define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
+ #define HEAPBLK_TO_MAPBYTE(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
+ #define HEAPBLK_TO_MAPBIT(x) ((x) % HEAPBLOCKS_PER_BYTE)
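+ 
+ /*
+  * For example, assuming the default BLCKSZ of 8192 (and the usual 24-byte
+  * page header), each map page covers (8192 - 24) * 8 = 65344 heap blocks,
+  * so heap block 100000 maps to map block 1, byte 4332, bit 0.
+  */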
+ 
+ static Buffer vm_readbuf(Relation rel, BlockNumber blkno, bool extend);
+ static void vm_extend(Relation rel, BlockNumber nvmblocks, bool createstorage);
+ 
+ /*
+  * Read a visibility map page.
+  *
+  * If the page doesn't exist, InvalidBuffer is returned, unless 'extend' is
+  * true, in which case the visibility map file is extended first.
+  */
+ static Buffer
+ vm_readbuf(Relation rel, BlockNumber blkno, bool extend)
+ {
+ 	Buffer buf;
+ 
+ 	RelationOpenSmgr(rel);
+ 
+ 	if (rel->rd_vm_nblocks_cache == InvalidBlockNumber ||
+ 		rel->rd_vm_nblocks_cache <= blkno)
+ 	{
+ 		if (!smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+ 			vm_extend(rel, blkno + 1, true);
+ 		else
+ 			rel->rd_vm_nblocks_cache = smgrnblocks(rel->rd_smgr,
+ 												   VISIBILITYMAP_FORKNUM);
+ 	}
+ 
+ 	if (blkno >= rel->rd_vm_nblocks_cache)
+ 	{
+ 		if (extend)
+ 			vm_extend(rel, blkno + 1, false);
+ 		else
+ 			return InvalidBuffer;
+ 	}
+ 
+ 	/*
+ 	 * Use ZERO_ON_ERROR mode, and initialize the page if necessary. XXX The
+ 	 * information is not accurate anyway, so it's better to clear corrupt
+ 	 * pages than error out. Since the visibility map changes are not
+ 	 * WAL-logged, the so-called torn page problem on crash can lead to pages
+ 	 * with corrupt headers, for example.
+ 	 */
+ 	buf = ReadBufferExtended(rel, VISIBILITYMAP_FORKNUM, blkno,
+ 							 RBM_ZERO_ON_ERROR, NULL);
+ 	if (PageIsNew(BufferGetPage(buf)))
+ 		PageInit(BufferGetPage(buf), BLCKSZ, 0);
+ 	return buf;
+ }
+ 
+ /*
+  * Ensure that the visibility map fork is at least n_vmblocks long, extending
+  * it if necessary with empty pages. And by empty, I mean pages of all
+  * zeros, i.e. with all the visibility bits clear. If createstorage is true,
+  * the physical file might need to be created first.
+  */
+ static void
+ vm_extend(Relation rel, BlockNumber n_vmblocks, bool createstorage)
+ {
+ 	BlockNumber n_vmblocks_now;
+ 	Page pg;
+ 
+ 	pg = (Page) palloc(BLCKSZ);
+ 	PageInit(pg, BLCKSZ, 0);
+ 
+ 	/*
+ 	 * We use the relation extension lock to lock out other backends
+ 	 * trying to extend the visibility map at the same time. It also locks out
+ 	 * extension of the main fork, unnecessarily, but extending the
+ 	 * visibility map happens seldom enough that it doesn't seem worthwhile to
+ 	 * have a separate lock tag type for it.
+ 	 *
+ 	 * Note that another backend might have extended or created the
+ 	 * relation before we get the lock.
+ 	 */
+ 	LockRelationForExtension(rel, ExclusiveLock);
+ 
+ 	/* Create the file first if it doesn't exist */
+ 	if (createstorage && !smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+ 	{
+ 		smgrcreate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, false);
+ 		n_vmblocks_now = 0;
+ 	}
+ 	else
+ 		n_vmblocks_now = smgrnblocks(rel->rd_smgr, VISIBILITYMAP_FORKNUM);
+ 
+ 	while (n_vmblocks_now < n_vmblocks)
+ 	{
+ 		smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, n_vmblocks_now,
+ 				   (char *) pg, rel->rd_istemp);
+ 		n_vmblocks_now++;
+ 	}
+ 
+ 	UnlockRelationForExtension(rel, ExclusiveLock);
+ 
+ 	pfree(pg);
+ 
+ 	/* update the cache with the up-to-date size */
+ 	rel->rd_vm_nblocks_cache = n_vmblocks_now;
+ }
+ 
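+ /*
+  * Truncate the visibility map to match a heap that has been truncated to
+  * nheapblocks blocks: the bits for all heap blocks >= nheapblocks are
+  * cleared, and map pages that are no longer needed are truncated away.
+  */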
+ void
+ visibilitymap_truncate(Relation rel, BlockNumber nheapblocks)
+ {
+ 	BlockNumber truncBlock = HEAPBLK_TO_MAPBLOCK(nheapblocks);
+ 	uint32		truncByte  = HEAPBLK_TO_MAPBYTE(nheapblocks);
+ 	uint8		truncBit   = HEAPBLK_TO_MAPBIT(nheapblocks);
+ 	BlockNumber newnblocks;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(DEBUG1, "vm_truncate %s %d", RelationGetRelationName(rel), nheapblocks);
+ #endif
+ 
+ 	/*
+ 	 * If no visibility map has been created yet for this relation, there's
+ 	 * nothing to truncate.
+ 	 */
+ 	if (!smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+ 		return;
+ 
+ 	/* Truncate away pages that are no longer needed */
+ 	if (truncByte == 0 && truncBit == 0)
+ 		newnblocks = truncBlock;
+ 	else
+ 	{
+ 		Buffer mapBuffer;
+ 		Page page;
+ 		char *mappage;
+ 		int len;
+ 
+ 		newnblocks = truncBlock + 1;
+ 
+ 		/*
+ 		 * Clear all bits in the last remaining map page that represent the
+ 		 * truncated heap blocks. This is not only tidy, but also necessary
+ 		 * because we don't clear the bits on extension.
+ 		 */
+ 		mapBuffer = vm_readbuf(rel, truncBlock, false);
+ 		if (BufferIsValid(mapBuffer))
+ 		{
+ 			page = BufferGetPage(mapBuffer);
+ 			mappage = PageGetContents(page);
+ 
+ 			LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 			/*
+ 			 * Clear out the unwanted bytes.
+ 			 */
+ 			len = HEAPBLOCKS_PER_PAGE/HEAPBLOCKS_PER_BYTE - (truncByte + 1);
+ 			MemSet(&mappage[truncByte + 1], 0, len);
+ 
+ 			/*
+ 			 * Mask out the unwanted bits of the last remaining byte
+ 			 *
+ 			 * ((1 << 0) - 1) = 00000000
+ 			 * ((1 << 1) - 1) = 00000001
+ 			 * ...
+ 			 * ((1 << 6) - 1) = 00111111
+ 			 * ((1 << 7) - 1) = 01111111
+ 			 */
+ 			mappage[truncByte] &= (1 << truncBit) - 1;
+ 
+ 			/*
+ 			 * This needs to be WAL-logged. Although the now-unused bits shouldn't
+ 			 * be accessed anymore, they had better be zero if we extend again.
+ 			 */
+ 
+ 			MarkBufferDirty(mapBuffer);
+ 			UnlockReleaseBuffer(mapBuffer);
+ 		}
+ 	}
+ 
+ 	if (smgrnblocks(rel->rd_smgr, VISIBILITYMAP_FORKNUM) > newnblocks)
+ 		smgrtruncate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, newnblocks,
+ 					 rel->rd_istemp);
+ }
+ 
+ /*
+  * Marks that all tuples on a heap page are visible to all.
+  *
+  * recptr is the LSN of the heap page. The LSN of the visibility map
+  * page is advanced to that, to make sure that the visibility map doesn't
+  * get flushed to disk before update to the heap page that made all tuples
+  * visible.
+  *
+  * *buf is a buffer previously returned by visibilitymap_test(). This is
+  * an opportunistic function; if *buf doesn't contain the bit for heapBlk,
+  * we do nothing. We don't want to do any I/O here, because the caller is
+  * holding a cleanup lock on the heap page.
+  */
+ void
+ visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
+ 				  Buffer *buf)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 	uint32		mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ 	uint8		mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ 	Page		page;
+ 	char	   *mappage;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(DEBUG1, "vm_set %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+ 
+ 	if (!BufferIsValid(*buf) || BufferGetBlockNumber(*buf) != mapBlock)
+ 		return;
+ 
+ 	page = BufferGetPage(*buf);
+ 	mappage = PageGetContents(page);
+ 	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 	if (!(mappage[mapByte] & (1 << mapBit)))
+ 	{
+ 		mappage[mapByte] |= (1 << mapBit);
+ 
+ 		if (XLByteLT(PageGetLSN(page), recptr))
+ 			PageSetLSN(page, recptr);
+ 		PageSetTLI(page, ThisTimeLineID);
+ 		MarkBufferDirty(*buf);
+ 	}
+ 
+ 	LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
+ }
+ 
+ /*
+  * Are all tuples on heap page visible to all?
+  *
+  * The page containing the bit for the heap block is (kept) pinned,
+  * and *buf is set to that buffer. If *buf is valid on entry, it should
+  * be a buffer previously returned by this function, for the same relation,
+  * and unless the new heap block is on the same page, it is released. On the
+  * first call, InvalidBuffer should be passed, and when the caller doesn't
+  * want to test any more pages, it should release *buf if it's valid.
+  */
+ bool
+ visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *buf)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 	uint32		mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ 	uint8		mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ 	bool		val;
+ 	char	   *mappage;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(DEBUG1, "vm_test %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+ 
+ 	if (BufferIsValid(*buf))
+ 	{
+ 		if (BufferGetBlockNumber(*buf) != mapBlock)
+ 		{
+ 			ReleaseBuffer(*buf);
+ 			*buf = InvalidBuffer;
+ 		}
+ 	}
+ 
+ 	if (!BufferIsValid(*buf))
+ 	{
+ 		*buf = vm_readbuf(rel, mapBlock, true);
+ 		if (!BufferIsValid(*buf))
+ 			return false;
+ 	}
+ 
+ 	mappage = PageGetContents(BufferGetPage(*buf));
+ 
+ 	/*
+ 	 * We don't need to lock the page, as we're only looking at a single bit.
+ 	 */
+ 	val = (mappage[mapByte] & (1 << mapBit)) ? true : false;
+ 
+ 	return val;
+ }
+ 
+ /*
+  * Mark that not all tuples are visible to all.
+  */
+ void
+ visibilitymap_clear(Relation rel, BlockNumber heapBlk)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 	int			mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ 	int			mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ 	uint8		mask = 1 << mapBit;
+ 	Buffer		mapBuffer;
+ 	char	   *mappage;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(DEBUG1, "vm_clear %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+ 
+ 	mapBuffer = vm_readbuf(rel, mapBlock, false);
+ 	if (!BufferIsValid(mapBuffer))
+ 		return; /* nothing to do */
+ 
+ 	LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+ 	mappage = PageGetContents(BufferGetPage(mapBuffer));
+ 
+ 	if (mappage[mapByte] & mask)
+ 	{
+ 		mappage[mapByte] &= ~mask;
+ 
+ 		MarkBufferDirty(mapBuffer);
+ 	}
+ 
+ 	UnlockReleaseBuffer(mapBuffer);
+ }
*** src/backend/access/transam/xlogutils.c
--- src/backend/access/transam/xlogutils.c
***************
*** 377,382 **** CreateFakeRelcacheEntry(RelFileNode rnode)
--- 377,383 ----
  
  	rel->rd_targblock = InvalidBlockNumber;
  	rel->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 	rel->rd_vm_nblocks_cache = InvalidBlockNumber;
  	rel->rd_smgr = NULL;
  
  	return rel;
*** src/backend/catalog/catalog.c
--- src/backend/catalog/catalog.c
***************
*** 54,60 ****
   */
  const char *forkNames[] = {
  	"main", /* MAIN_FORKNUM */
! 	"fsm"   /* FSM_FORKNUM */
  };
  
  /*
--- 54,61 ----
   */
  const char *forkNames[] = {
  	"main", /* MAIN_FORKNUM */
! 	"fsm",   /* FSM_FORKNUM */
! 	"vm"   /* VISIBILITYMAP_FORKNUM */
  };
  
  /*
*** src/backend/catalog/heap.c
--- src/backend/catalog/heap.c
***************
*** 33,38 ****
--- 33,39 ----
  #include "access/heapam.h"
  #include "access/sysattr.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "catalog/catalog.h"
  #include "catalog/dependency.h"
*** src/backend/catalog/storage.c
--- src/backend/catalog/storage.c
***************
*** 19,24 ****
--- 19,25 ----
  
  #include "postgres.h"
  
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
***************
*** 175,180 **** void
--- 176,182 ----
  RelationTruncate(Relation rel, BlockNumber nblocks)
  {
  	bool fsm;
+ 	bool vm;
  
  	/* Open it at the smgr level if not already done */
  	RelationOpenSmgr(rel);
***************
*** 187,192 **** RelationTruncate(Relation rel, BlockNumber nblocks)
--- 189,199 ----
  	if (fsm)
  		FreeSpaceMapTruncateRel(rel, nblocks);
  
+ 	/* Truncate the visibility map too if it exists. */
+ 	vm = smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM);
+ 	if (vm)
+ 		visibilitymap_truncate(rel, nblocks);
+ 
  	/*
  	 * We WAL-log the truncation before actually truncating, which
  	 * means trouble if the truncation fails. If we then crash, the WAL
***************
*** 222,228 **** RelationTruncate(Relation rel, BlockNumber nblocks)
  		 * left with a truncated heap, but the FSM would still contain
  		 * entries for the non-existent heap pages.
  		 */
! 		if (fsm)
  			XLogFlush(lsn);
  	}
  
--- 229,235 ----
  		 * left with a truncated heap, but the FSM would still contain
  		 * entries for the non-existent heap pages.
  		 */
! 		if (fsm || vm)
  			XLogFlush(lsn);
  	}
  
*** src/backend/commands/vacuum.c
--- src/backend/commands/vacuum.c
***************
*** 26,31 ****
--- 26,32 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlog.h"
  #include "catalog/namespace.h"
***************
*** 2902,2907 **** move_chain_tuple(Relation rel,
--- 2903,2914 ----
  	Size		tuple_len = old_tup->t_len;
  
  	/*
+ 	 * Clear the bits in the visibility map.
+ 	 */
+ 	visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
+ 	visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
+ 
+ 	/*
  	 * make a modifiable copy of the source tuple.
  	 */
  	heap_copytuple_with_tuple(old_tup, &newtup);
***************
*** 3005,3010 **** move_chain_tuple(Relation rel,
--- 3012,3021 ----
  
  	END_CRIT_SECTION();
  
+ 	PageClearAllVisible(BufferGetPage(old_buf));
+ 	if (dst_buf != old_buf)
+ 		PageClearAllVisible(BufferGetPage(dst_buf));
+ 
  	LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
  	if (dst_buf != old_buf)
  		LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
***************
*** 3107,3112 **** move_plain_tuple(Relation rel,
--- 3118,3140 ----
  
  	END_CRIT_SECTION();
  
+ 	/*
+ 	 * Clear the PD_ALL_VISIBLE flags on the pages, and the bits in the
+ 	 * visibility map. Normally we'd release the locks on the heap pages
+ 	 * before updating the visibility map, but it doesn't really matter here
+ 	 * because we're holding an AccessExclusiveLock on the relation anyway.
+ 	 */
+ 	if (PageIsAllVisible(dst_page))
+ 	{
+ 		PageClearAllVisible(dst_page);
+ 		visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
+ 	}
+ 	if (PageIsAllVisible(old_page))
+ 	{
+ 		PageClearAllVisible(old_page);
+ 		visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
+ 	}
+ 
  	dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
  	LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
  	LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
*** src/backend/commands/vacuumlazy.c
--- src/backend/commands/vacuumlazy.c
***************
*** 40,45 ****
--- 40,46 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "catalog/storage.h"
  #include "commands/dbcommands.h"
  #include "commands/vacuum.h"
***************
*** 88,93 **** typedef struct LVRelStats
--- 89,95 ----
  	int			max_dead_tuples;	/* # slots allocated in array */
  	ItemPointer dead_tuples;	/* array of ItemPointerData */
  	int			num_index_scans;
+ 	bool		scanned_all;	/* have we scanned all pages (this far) in the rel? */
  } LVRelStats;
  
  
***************
*** 102,108 **** static BufferAccessStrategy vac_strategy;
  
  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
  				  IndexBulkDeleteResult **stats,
--- 104,110 ----
  
  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes, bool scan_all);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
  				  IndexBulkDeleteResult **stats,
***************
*** 141,146 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
--- 143,149 ----
  	BlockNumber possibly_freeable;
  	PGRUsage	ru0;
  	TimestampTz starttime = 0;
+ 	bool		scan_all;
  
  	pg_rusage_init(&ru0);
  
***************
*** 166,173 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
  	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
  	vacrelstats->hasindex = (nindexes > 0);
  
  	/* Do the vacuuming */
! 	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes);
  
  	/* Done with indexes */
  	vac_close_indexes(nindexes, Irel, NoLock);
--- 169,185 ----
  	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
  	vacrelstats->hasindex = (nindexes > 0);
  
+ 	/*
+ 	 * Should we use the visibility map or scan all pages?  If a specific
+ 	 * freeze_min_age was given, we must visit all pages, because relfrozenxid
+ 	 * can only be advanced when the whole relation has been scanned (see the
+ 	 * vac_update_relstats() call below).
+ 	 */
+ 	if (vacstmt->freeze_min_age != -1)
+ 		scan_all = true;
+ 	else
+ 		scan_all = false;
+  
+ 	/* initialize this variable */
+ 	vacrelstats->scanned_all = true;
+  
  	/* Do the vacuuming */
! 	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, scan_all);
  
  	/* Done with indexes */
  	vac_close_indexes(nindexes, Irel, NoLock);
***************
*** 189,195 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
  	/* Update statistics in pg_class */
  	vac_update_relstats(onerel,
  						vacrelstats->rel_pages, vacrelstats->rel_tuples,
! 						vacrelstats->hasindex, FreezeLimit);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
--- 201,208 ----
  	/* Update statistics in pg_class */
  	vac_update_relstats(onerel,
  						vacrelstats->rel_pages, vacrelstats->rel_tuples,
! 						vacrelstats->hasindex,
! 						vacrelstats->scanned_all ? FreezeLimit : InvalidTransactionId);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
***************
*** 230,236 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes)
  {
  	BlockNumber nblocks,
  				blkno;
--- 243,249 ----
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes, bool scan_all)
  {
  	BlockNumber nblocks,
  				blkno;
***************
*** 245,250 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 258,264 ----
  	IndexBulkDeleteResult **indstats;
  	int			i;
  	PGRUsage	ru0;
+ 	Buffer		vmbuffer = InvalidBuffer;
  
  	pg_rusage_init(&ru0);
  
***************
*** 278,283 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 292,315 ----
  		OffsetNumber frozen[MaxOffsetNumber];
  		int			nfrozen;
  		Size		freespace;
+ 		bool		all_visible_according_to_vm;
+ 		bool		all_visible;
+ 
+ 		/*
+ 		 * If all tuples on the page are visible to everyone, there's no
+ 		 * need to visit the page.
+ 		 *
+ 		 * Note that we test the visibility map even if we're scanning all
+ 		 * pages, to pin the visibility map page. We might set the bit there,
+ 		 * and we don't want to do the I/O while we're holding the heap page
+ 		 * locked.
+ 		 */
+ 		all_visible_according_to_vm = visibilitymap_test(onerel, blkno, &vmbuffer);
+ 		if (!scan_all && all_visible_according_to_vm)
+ 		{
+ 			vacrelstats->scanned_all = false;
+ 			continue;
+ 		}
  
  		vacuum_delay_point();
  
***************
*** 354,359 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 386,398 ----
  		{
  			empty_pages++;
  			freespace = PageGetHeapFreeSpace(page);
+ 
+ 			PageSetAllVisible(page);
+ 			/* Update the visibility map */
+ 			if (!all_visible_according_to_vm)
+ 				visibilitymap_set(onerel, blkno, PageGetLSN(page),
+ 								  &vmbuffer);
+ 
  			UnlockReleaseBuffer(buf);
  			RecordPageWithFreeSpace(onerel, blkno, freespace);
  			continue;
***************
*** 371,376 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 410,416 ----
  		 * Now scan the page to collect vacuumable items and check for tuples
  		 * requiring freezing.
  		 */
+ 		all_visible = true;
  		nfrozen = 0;
  		hastup = false;
  		prev_dead_count = vacrelstats->num_dead_tuples;
***************
*** 408,413 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 448,454 ----
  			if (ItemIdIsDead(itemid))
  			{
  				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+ 				all_visible = false;
  				continue;
  			}
  
***************
*** 442,447 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 483,489 ----
  						nkeep += 1;
  					else
  						tupgone = true; /* we can delete the tuple */
+ 					all_visible = false;
  					break;
  				case HEAPTUPLE_LIVE:
  					/* Tuple is good --- but let's do some validity checks */
***************
*** 449,454 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 491,525 ----
  						!OidIsValid(HeapTupleGetOid(&tuple)))
  						elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
  							 relname, blkno, offnum);
+ 
+ 					/*
+ 					 * Definitely visible to all? Note that SetHintBits handles
+ 					 * async commit correctly
+ 					 */
+ 					if (all_visible)
+ 					{
+ 						/*
+ 						 * Is it visible to all transactions? It's important
+ 						 * that we look at the hint bit here. Only if a hint
+ 						 * bit is set, we can be sure that the tuple is indeed
+ 						 * live, even if asynchronous_commit is true and we
+ 						 * crash later
+ 						 */
+ 						if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+ 						{
+ 							all_visible = false;
+ 							break;
+ 						}
+ 						/*
+ 						 * The inserter definitely committed. But is it
+ 						 * old enough that everyone sees it as committed?
+ 						 */
+ 						if (!TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data), OldestXmin))
+ 						{
+ 							all_visible = false;
+ 							break;
+ 						}
+ 					}
  					break;
  				case HEAPTUPLE_RECENTLY_DEAD:
  
***************
*** 457,468 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 528,542 ----
  					 * from relation.
  					 */
  					nkeep += 1;
+ 					all_visible = false;
  					break;
  				case HEAPTUPLE_INSERT_IN_PROGRESS:
  					/* This is an expected case during concurrent vacuum */
+ 					all_visible = false;
  					break;
  				case HEAPTUPLE_DELETE_IN_PROGRESS:
  					/* This is an expected case during concurrent vacuum */
+ 					all_visible = false;
  					break;
  				default:
  					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
***************
*** 525,530 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 599,621 ----
  
  		freespace = PageGetHeapFreeSpace(page);
  
+ 		/* Update the all-visible flag on the page */
+ 		if (!PageIsAllVisible(page) && all_visible)
+ 		{
+ 			SetBufferCommitInfoNeedsSave(buf);
+ 			PageSetAllVisible(page);
+ 		}
+ 		else if (PageIsAllVisible(page) && !all_visible)
+ 		{
+ 			elog(WARNING, "all-visible flag was incorrectly set");
+ 			SetBufferCommitInfoNeedsSave(buf);
+ 			PageClearAllVisible(page);
+ 		}
+ 
+ 		/* Update the visibility map */
+ 		if (!all_visible_according_to_vm && all_visible)
+ 			visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
+ 
  		/* Remember the location of the last page with nonremovable tuples */
  		if (hastup)
  			vacrelstats->nonempty_pages = blkno + 1;
***************
*** 560,565 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 651,663 ----
  		vacrelstats->num_index_scans++;
  	}
  
+ 	/* Release the pin on the visibility map page */
+ 	if (BufferIsValid(vmbuffer))
+ 	{
+ 		ReleaseBuffer(vmbuffer);
+ 		vmbuffer = InvalidBuffer;
+ 	}
+ 
  	/* Do post-vacuum cleanup and statistics update for each index */
  	for (i = 0; i < nindexes; i++)
  		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
***************
*** 623,628 **** lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
--- 721,735 ----
  		LockBufferForCleanup(buf);
  		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
  
+ 		/*
+ 		 * Before we let the page go, prune it. The primary reason is to
+ 		 * update the visibility map in the common special case that we just
+ 		 * vacuumed away the last tuple on the page that wasn't visible to
+ 		 * everyone.
+ 		 */
+ 		vacrelstats->tuples_deleted +=
+ 			heap_page_prune(onerel, buf, OldestXmin, false, false);
+ 
  		/* Now that we've compacted the page, record its available space */
  		page = BufferGetPage(buf);
  		freespace = PageGetHeapFreeSpace(page);
*** src/backend/storage/freespace/freespace.c
--- src/backend/storage/freespace/freespace.c
***************
*** 555,562 **** fsm_extend(Relation rel, BlockNumber n_fsmblocks, bool createstorage)
  	 * FSM happens seldom enough that it doesn't seem worthwhile to
  	 * have a separate lock tag type for it.
  	 *
! 	 * Note that another backend might have extended the relation
! 	 * before we get the lock.
  	 */
  	LockRelationForExtension(rel, ExclusiveLock);
  
--- 555,562 ----
  	 * FSM happens seldom enough that it doesn't seem worthwhile to
  	 * have a separate lock tag type for it.
  	 *
! 	 * Note that another backend might have extended or created the
! 	 * relation before we get the lock.
  	 */
  	LockRelationForExtension(rel, ExclusiveLock);
  
*** src/backend/storage/smgr/smgr.c
--- src/backend/storage/smgr/smgr.c
***************
*** 21,26 ****
--- 21,27 ----
  #include "catalog/catalog.h"
  #include "commands/tablespace.h"
  #include "storage/bufmgr.h"
+ #include "storage/freespace.h"
  #include "storage/ipc.h"
  #include "storage/smgr.h"
  #include "utils/hsearch.h"
*** src/backend/utils/cache/relcache.c
--- src/backend/utils/cache/relcache.c
***************
*** 305,310 **** AllocateRelationDesc(Relation relation, Form_pg_class relp)
--- 305,311 ----
  	MemSet(relation, 0, sizeof(RelationData));
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 	relation->rd_vm_nblocks_cache = InvalidBlockNumber;
  
  	/* make sure relation is marked as having no open file yet */
  	relation->rd_smgr = NULL;
***************
*** 1377,1382 **** formrdesc(const char *relationName, Oid relationReltype,
--- 1378,1384 ----
  	relation = (Relation) palloc0(sizeof(RelationData));
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 	relation->rd_vm_nblocks_cache = InvalidBlockNumber;
  
  	/* make sure relation is marked as having no open file yet */
  	relation->rd_smgr = NULL;
***************
*** 1665,1673 **** RelationReloadIndexInfo(Relation relation)
  	heap_freetuple(pg_class_tuple);
  	/* We must recalculate physical address in case it changed */
  	RelationInitPhysicalAddr(relation);
! 	/* Must reset targblock and fsm_nblocks_cache in case rel was truncated */
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks_cache = InvalidBlockNumber;
  	/* Must free any AM cached data, too */
  	if (relation->rd_amcache)
  		pfree(relation->rd_amcache);
--- 1667,1676 ----
  	heap_freetuple(pg_class_tuple);
  	/* We must recalculate physical address in case it changed */
  	RelationInitPhysicalAddr(relation);
! 	/* Must reset targblock and the nblocks caches in case rel was truncated */
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 	relation->rd_vm_nblocks_cache = InvalidBlockNumber;
  	/* Must free any AM cached data, too */
  	if (relation->rd_amcache)
  		pfree(relation->rd_amcache);
***************
*** 1751,1756 **** RelationClearRelation(Relation relation, bool rebuild)
--- 1754,1760 ----
  	{
  		relation->rd_targblock = InvalidBlockNumber;
  		relation->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 		relation->rd_vm_nblocks_cache = InvalidBlockNumber;
  		if (relation->rd_rel->relkind == RELKIND_INDEX)
  		{
  			relation->rd_isvalid = false;		/* needs to be revalidated */
***************
*** 2346,2351 **** RelationBuildLocalRelation(const char *relname,
--- 2350,2356 ----
  
  	rel->rd_targblock = InvalidBlockNumber;
  	rel->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 	rel->rd_vm_nblocks_cache = InvalidBlockNumber;
  
  	/* make sure relation is marked as having no open file yet */
  	rel->rd_smgr = NULL;
***************
*** 3603,3608 **** load_relcache_init_file(void)
--- 3608,3614 ----
  		rel->rd_smgr = NULL;
  		rel->rd_targblock = InvalidBlockNumber;
  		rel->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 		rel->rd_vm_nblocks_cache = InvalidBlockNumber;
  		if (rel->rd_isnailed)
  			rel->rd_refcnt = 1;
  		else
*** src/include/access/heapam.h
--- src/include/access/heapam.h
***************
*** 153,158 **** extern void heap_page_prune_execute(Buffer buffer,
--- 153,159 ----
  						OffsetNumber *nowunused, int nunused,
  						bool redirect_move);
  extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
+ extern void heap_page_update_all_visible(Buffer buffer);
  
  /* in heap/syncscan.c */
  extern void ss_report_location(Relation rel, BlockNumber location);
*** src/include/access/htup.h
--- src/include/access/htup.h
***************
*** 601,606 **** typedef struct xl_heaptid
--- 601,607 ----
  typedef struct xl_heap_delete
  {
  	xl_heaptid	target;			/* deleted tuple id */
+ 	bool all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
  } xl_heap_delete;
  
  #define SizeOfHeapDelete	(offsetof(xl_heap_delete, all_visible_cleared) + sizeof(bool))
***************
*** 626,641 **** typedef struct xl_heap_header
  typedef struct xl_heap_insert
  {
  	xl_heaptid	target;			/* inserted tuple id */
  	/* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_insert;
  
! #define SizeOfHeapInsert	(offsetof(xl_heap_insert, target) + SizeOfHeapTid)
  
  /* This is what we need to know about update|move|hot_update */
  typedef struct xl_heap_update
  {
  	xl_heaptid	target;			/* deleted tuple id */
  	ItemPointerData newtid;		/* new inserted tuple id */
  	/* NEW TUPLE xl_heap_header (PLUS xmax & xmin IF MOVE OP) */
  	/* and TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_update;
--- 627,645 ----
  typedef struct xl_heap_insert
  {
  	xl_heaptid	target;			/* inserted tuple id */
+ 	bool all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
  	/* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_insert;
  
! #define SizeOfHeapInsert	(offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
  
  /* This is what we need to know about update|move|hot_update */
  typedef struct xl_heap_update
  {
  	xl_heaptid	target;			/* deleted tuple id */
  	ItemPointerData newtid;		/* new inserted tuple id */
+ 	bool all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
+ 	bool new_all_visible_cleared; /* same for the page of newtid */
  	/* NEW TUPLE xl_heap_header (PLUS xmax & xmin IF MOVE OP) */
  	/* and TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_update;
*** /dev/null
--- src/include/access/visibilitymap.h
***************
*** 0 ****
--- 1,28 ----
+ /*-------------------------------------------------------------------------
+  *
+  * visibilitymap.h
+  *      visibility map interface
+  *
+  *
+  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef VISIBILITYMAP_H
+ #define VISIBILITYMAP_H
+ 
+ #include "utils/rel.h"
+ #include "storage/buf.h"
+ #include "storage/itemptr.h"
+ #include "access/xlogdefs.h"
+ 
+ extern void visibilitymap_set(Relation rel, BlockNumber heapBlk,
+ 							  XLogRecPtr recptr, Buffer *vmbuf);
+ extern void visibilitymap_clear(Relation rel, BlockNumber heapBlk);
+ extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
+ extern void visibilitymap_truncate(Relation rel, BlockNumber heapblk);
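+ 
+ /*
+  * Typical usage of the test/set interface (a sketch, mirroring the way
+  * vacuumlazy.c uses it; the caller keeps one map page pinned across calls):
+  *
+  *		Buffer	vmbuffer = InvalidBuffer;
+  *
+  *		for (blkno = 0; blkno < nblocks; blkno++)
+  *		{
+  *			if (visibilitymap_test(rel, blkno, &vmbuffer))
+  *				continue;		-- all tuples visible, skip the page
+  *			... process the page; if it turns out to be all-visible, call
+  *			... visibilitymap_set(rel, blkno, PageGetLSN(page), &vmbuffer)
+  *			... while still holding the lock on the heap page
+  *		}
+  *		if (BufferIsValid(vmbuffer))
+  *			ReleaseBuffer(vmbuffer);
+  */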
+ 
+ #endif   /* VISIBILITYMAP_H */
*** src/include/storage/bufpage.h
--- src/include/storage/bufpage.h
***************
*** 152,159 **** typedef PageHeaderData *PageHeader;
  #define PD_HAS_FREE_LINES	0x0001		/* are there any unused line pointers? */
  #define PD_PAGE_FULL		0x0002		/* not enough free space for new
  										 * tuple? */
  
! #define PD_VALID_FLAG_BITS	0x0003		/* OR of all valid pd_flags bits */
  
  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
--- 152,161 ----
  #define PD_HAS_FREE_LINES	0x0001		/* are there any unused line pointers? */
  #define PD_PAGE_FULL		0x0002		/* not enough free space for new
  										 * tuple? */
+ #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
+ 										 * everyone */
  
! #define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
  
  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
***************
*** 336,341 **** typedef PageHeaderData *PageHeader;
--- 338,350 ----
  #define PageClearFull(page) \
  	(((PageHeader) (page))->pd_flags &= ~PD_PAGE_FULL)
  
+ #define PageIsAllVisible(page) \
+ 	(((PageHeader) (page))->pd_flags & PD_ALL_VISIBLE)
+ #define PageSetAllVisible(page) \
+ 	(((PageHeader) (page))->pd_flags |= PD_ALL_VISIBLE)
+ #define PageClearAllVisible(page) \
+ 	(((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
+ 
  #define PageIsPrunable(page, oldestxmin) \
  ( \
  	AssertMacro(TransactionIdIsNormal(oldestxmin)), \
*** src/include/storage/relfilenode.h
--- src/include/storage/relfilenode.h
***************
*** 24,37 **** typedef enum ForkNumber
  {
  	InvalidForkNumber = -1,
  	MAIN_FORKNUM = 0,
! 	FSM_FORKNUM
  	/*
  	 * NOTE: if you add a new fork, change MAX_FORKNUM below and update the
  	 * forkNames array in catalog.c
  	 */
  } ForkNumber;
  
! #define MAX_FORKNUM		FSM_FORKNUM
  
  /*
   * RelFileNode must provide all that we need to know to physically access
--- 24,38 ----
  {
  	InvalidForkNumber = -1,
  	MAIN_FORKNUM = 0,
! 	FSM_FORKNUM,
! 	VISIBILITYMAP_FORKNUM
  	/*
  	 * NOTE: if you add a new fork, change MAX_FORKNUM below and update the
  	 * forkNames array in catalog.c
  	 */
  } ForkNumber;
  
! #define MAX_FORKNUM		VISIBILITYMAP_FORKNUM
  
  /*
   * RelFileNode must provide all that we need to know to physically access
*** src/include/utils/rel.h
--- src/include/utils/rel.h
***************
*** 195,202 **** typedef struct RelationData
  	List	   *rd_indpred;		/* index predicate tree, if any */
  	void	   *rd_amcache;		/* available for use by index AM */
  
! 	/* Cached last-seen size of the FSM */
  	BlockNumber	rd_fsm_nblocks_cache;
  
  	/* use "struct" here to avoid needing to include pgstat.h: */
  	struct PgStat_TableStatus *pgstat_info;		/* statistics collection area */
--- 195,203 ----
  	List	   *rd_indpred;		/* index predicate tree, if any */
  	void	   *rd_amcache;		/* available for use by index AM */
  
! 	/* Cached last-seen size of the FSM and visibility map */
  	BlockNumber	rd_fsm_nblocks_cache;
+ 	BlockNumber	rd_vm_nblocks_cache;
  
  	/* use "struct" here to avoid needing to include pgstat.h: */
  	struct PgStat_TableStatus *pgstat_info;		/* statistics collection area */
