*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 366,371 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
--- 366,372 ----
  	LockBuffer(buffer, BUFFER_LOCK_SHARE);
  
  	dp = (Page) BufferGetPage(buffer);
+ 	TestForOldSnapshot(snapshot, scan->rs_rd, dp);
  	lines = PageGetMaxOffsetNumber(dp);
  	ntup = 0;
  
***************
*** 496,501 **** heapgettup(HeapScanDesc scan,
--- 497,503 ----
  		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
  
  		dp = (Page) BufferGetPage(scan->rs_cbuf);
+ 		TestForOldSnapshot(snapshot, scan->rs_rd, dp);
  		lines = PageGetMaxOffsetNumber(dp);
  		/* page and lineoff now reference the physically next tid */
  
***************
*** 538,543 **** heapgettup(HeapScanDesc scan,
--- 540,546 ----
  		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
  
  		dp = (Page) BufferGetPage(scan->rs_cbuf);
+ 		TestForOldSnapshot(snapshot, scan->rs_rd, dp);
  		lines = PageGetMaxOffsetNumber(dp);
  
  		if (!scan->rs_inited)
***************
*** 696,701 **** heapgettup(HeapScanDesc scan,
--- 699,705 ----
  		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
  
  		dp = (Page) BufferGetPage(scan->rs_cbuf);
+ 		TestForOldSnapshot(snapshot, scan->rs_rd, dp);
  		lines = PageGetMaxOffsetNumber((Page) dp);
  		linesleft = lines;
  		if (backward)
***************
*** 1573,1578 **** heap_fetch(Relation relation,
--- 1577,1583 ----
  	 */
  	LockBuffer(buffer, BUFFER_LOCK_SHARE);
  	page = BufferGetPage(buffer);
+ 	TestForOldSnapshot(snapshot, relation, page);
  
  	/*
  	 * We'd better check for out-of-range offnum in case of VACUUM since the
***************
*** 1902,1907 **** heap_get_latest_tid(Relation relation,
--- 1907,1913 ----
  		buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
  		LockBuffer(buffer, BUFFER_LOCK_SHARE);
  		page = BufferGetPage(buffer);
+ 		TestForOldSnapshot(snapshot, relation, page);
  
  		/*
  		 * Check for bogus item number.  This is not treated as an error
*** a/src/backend/access/heap/pruneheap.c
--- b/src/backend/access/heap/pruneheap.c
***************
*** 92,103 **** heap_page_prune_opt(Relation relation, Buffer buffer)
  	 * need to use the horizon that includes slots, otherwise the data-only
  	 * horizon can be used. Note that the toast relation of user defined
  	 * relations are *not* considered catalog relations.
  	 */
  	if (IsCatalogRelation(relation) ||
  		RelationIsAccessibleInLogicalDecoding(relation))
  		OldestXmin = RecentGlobalXmin;
  	else
! 		OldestXmin = RecentGlobalDataXmin;
  
  	Assert(TransactionIdIsValid(OldestXmin));
  
--- 92,112 ----
  	 * need to use the horizon that includes slots, otherwise the data-only
  	 * horizon can be used. Note that the toast relation of user defined
  	 * relations are *not* considered catalog relations.
+ 	 *
+ 	 * It is OK to apply the old snapshot limit before acquiring the cleanup
+ 	 * lock because the worst that can happen is that we are not quite as
+ 	 * aggressive about the cleanup (by however many transaction IDs are
+ 	 * consumed between this point and acquiring the lock).  This allows us to
+ 	 * save significant overhead in the case where the page is found not to be
+ 	 * prunable.
  	 */
  	if (IsCatalogRelation(relation) ||
  		RelationIsAccessibleInLogicalDecoding(relation))
  		OldestXmin = RecentGlobalXmin;
  	else
! 		OldestXmin =
! 				TransactionIdLimitedForOldSnapshots(RecentGlobalDataXmin,
! 													relation);
  
  	Assert(TransactionIdIsValid(OldestXmin));
  
*** a/src/backend/access/nbtree/nbtinsert.c
--- b/src/backend/access/nbtree/nbtinsert.c
***************
*** 118,124 **** _bt_doinsert(Relation rel, IndexTuple itup,
  
  top:
  	/* find the first page containing this key */
! 	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
  
  	offset = InvalidOffsetNumber;
  
--- 118,124 ----
  
  top:
  	/* find the first page containing this key */
! 	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
  
  	offset = InvalidOffsetNumber;
  
***************
*** 134,140 **** top:
  	 * precise description.
  	 */
  	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
! 						true, stack, BT_WRITE);
  
  	/*
  	 * If we're not allowing duplicates, make sure the key isn't already in
--- 134,140 ----
  	 * precise description.
  	 */
  	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
! 						true, stack, BT_WRITE, NULL);
  
  	/*
  	 * If we're not allowing duplicates, make sure the key isn't already in
***************
*** 1654,1660 **** _bt_insert_parent(Relation rel,
  			elog(DEBUG2, "concurrent ROOT page split");
  			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
  			/* Find the leftmost page at the next level up */
! 			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
  			/* Set up a phony stack entry pointing there */
  			stack = &fakestack;
  			stack->bts_blkno = BufferGetBlockNumber(pbuf);
--- 1654,1661 ----
  			elog(DEBUG2, "concurrent ROOT page split");
  			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
  			/* Find the leftmost page at the next level up */
! 			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
! 									NULL);
  			/* Set up a phony stack entry pointing there */
  			stack = &fakestack;
  			stack->bts_blkno = BufferGetBlockNumber(pbuf);
*** a/src/backend/access/nbtree/nbtpage.c
--- b/src/backend/access/nbtree/nbtpage.c
***************
*** 1254,1260 **** _bt_pagedel(Relation rel, Buffer buf)
  				itup_scankey = _bt_mkscankey(rel, targetkey);
  				/* find the leftmost leaf page containing this key */
  				stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
! 								   false, &lbuf, BT_READ);
  				/* don't need a pin on the page */
  				_bt_relbuf(rel, lbuf);
  
--- 1254,1260 ----
  				itup_scankey = _bt_mkscankey(rel, targetkey);
  				/* find the leftmost leaf page containing this key */
  				stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
! 								   false, &lbuf, BT_READ, NULL);
  				/* don't need a pin on the page */
  				_bt_relbuf(rel, lbuf);
  
*** a/src/backend/access/nbtree/nbtsearch.c
--- b/src/backend/access/nbtree/nbtsearch.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "storage/predicate.h"
  #include "utils/lsyscache.h"
  #include "utils/rel.h"
+ #include "utils/snapmgr.h"
  
  
  static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
***************
*** 29,35 **** static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
  static void _bt_saveitem(BTScanOpaque so, int itemIndex,
  			 OffsetNumber offnum, IndexTuple itup);
  static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
! static Buffer _bt_walk_left(Relation rel, Buffer buf);
  static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
  
  
--- 30,36 ----
  static void _bt_saveitem(BTScanOpaque so, int itemIndex,
  			 OffsetNumber offnum, IndexTuple itup);
  static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
! static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot);
  static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
  
  
***************
*** 48,53 **** static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
--- 49,58 ----
   * address of the leaf-page buffer, which is read-locked and pinned.
   * No locks are held on the parent pages, however!
   *
+  * If the snapshot parameter is not NULL, "old snapshot" checking will take
+  * place during the descent through the tree.  This is not needed when
+  * positioning for an insert or delete, so NULL is used for those cases.
+  *
   * NOTE that the returned buffer is read-locked regardless of the access
   * parameter.  However, access = BT_WRITE will allow an empty root page
   * to be created and returned.  When access = BT_READ, an empty index
***************
*** 56,62 **** static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
   */
  BTStack
  _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
! 		   Buffer *bufP, int access)
  {
  	BTStack		stack_in = NULL;
  
--- 61,67 ----
   */
  BTStack
  _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
! 		   Buffer *bufP, int access, Snapshot snapshot)
  {
  	BTStack		stack_in = NULL;
  
***************
*** 65,71 **** _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
--- 70,79 ----
  
  	/* If index is empty and access = BT_READ, no root page is created. */
  	if (!BufferIsValid(*bufP))
+ 	{
+ 		/* FIXME: old snapshot checking special case here? */
  		return (BTStack) NULL;
+ 	}
  
  	/* Loop iterates once per level descended in the tree */
  	for (;;)
***************
*** 93,99 **** _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
  		 */
  		*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
  							  (access == BT_WRITE), stack_in,
! 							  BT_READ);
  
  		/* if this is a leaf page, we're done */
  		page = BufferGetPage(*bufP);
--- 101,107 ----
  		 */
  		*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
  							  (access == BT_WRITE), stack_in,
! 							  BT_READ, snapshot);
  
  		/* if this is a leaf page, we're done */
  		page = BufferGetPage(*bufP);
***************
*** 166,171 **** _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
--- 174,183 ----
   * On entry, we have the buffer pinned and a lock of the type specified by
   * 'access'.  If we move right, we release the buffer and lock and acquire
   * the same on the right sibling.  Return value is the buffer we stop at.
+  *
+  * If the snapshot parameter is not NULL, "old snapshot" checking will take
+  * place during the descent through the tree.  This is not needed when
+  * positioning for an insert or delete, so NULL is used for those cases.
   */
  Buffer
  _bt_moveright(Relation rel,
***************
*** 175,181 **** _bt_moveright(Relation rel,
  			  bool nextkey,
  			  bool forupdate,
  			  BTStack stack,
! 			  int access)
  {
  	Page		page;
  	BTPageOpaque opaque;
--- 187,194 ----
  			  bool nextkey,
  			  bool forupdate,
  			  BTStack stack,
! 			  int access,
! 			  Snapshot snapshot)
  {
  	Page		page;
  	BTPageOpaque opaque;
***************
*** 201,206 **** _bt_moveright(Relation rel,
--- 214,220 ----
  	for (;;)
  	{
  		page = BufferGetPage(buf);
+ 		TestForOldSnapshot(snapshot, rel, page);
  		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  
  		if (P_RIGHTMOST(opaque))
***************
*** 937,943 **** _bt_first(IndexScanDesc scan, ScanDirection dir)
  	 * Use the manufactured insertion scan key to descend the tree and
  	 * position ourselves on the target leaf page.
  	 */
! 	stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ);
  
  	/* don't need to keep the stack around... */
  	_bt_freestack(stack);
--- 951,958 ----
  	 * Use the manufactured insertion scan key to descend the tree and
  	 * position ourselves on the target leaf page.
  	 */
! 	stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ,
! 					   scan->xs_snapshot);
  
  	/* don't need to keep the stack around... */
  	_bt_freestack(stack);
***************
*** 1308,1313 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
--- 1323,1329 ----
  			so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
  			/* check for deleted page */
  			page = BufferGetPage(so->currPos.buf);
+ 			TestForOldSnapshot(scan->xs_snapshot, rel, page);
  			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  			if (!P_IGNORE(opaque))
  			{
***************
*** 1344,1350 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
  			}
  
  			/* Step to next physical page */
! 			so->currPos.buf = _bt_walk_left(rel, so->currPos.buf);
  
  			/* if we're physically at end of index, return failure */
  			if (so->currPos.buf == InvalidBuffer)
--- 1360,1367 ----
  			}
  
  			/* Step to next physical page */
! 			so->currPos.buf = _bt_walk_left(rel, so->currPos.buf,
! 											scan->xs_snapshot);
  
  			/* if we're physically at end of index, return failure */
  			if (so->currPos.buf == InvalidBuffer)
***************
*** 1356,1361 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
--- 1373,1379 ----
  			 * and do it all again.
  			 */
  			page = BufferGetPage(so->currPos.buf);
+ 			TestForOldSnapshot(scan->xs_snapshot, rel, page);
  			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  			if (!P_IGNORE(opaque))
  			{
***************
*** 1386,1392 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
   * again if it's important.
   */
  static Buffer
! _bt_walk_left(Relation rel, Buffer buf)
  {
  	Page		page;
  	BTPageOpaque opaque;
--- 1404,1410 ----
   * again if it's important.
   */
  static Buffer
! _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
  {
  	Page		page;
  	BTPageOpaque opaque;
***************
*** 1416,1421 **** _bt_walk_left(Relation rel, Buffer buf)
--- 1434,1440 ----
  		CHECK_FOR_INTERRUPTS();
  		buf = _bt_getbuf(rel, blkno, BT_READ);
  		page = BufferGetPage(buf);
+ 		TestForOldSnapshot(snapshot, rel, page);
  		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  
  		/*
***************
*** 1442,1453 **** _bt_walk_left(Relation rel, Buffer buf)
--- 1461,1474 ----
  			blkno = opaque->btpo_next;
  			buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
  			page = BufferGetPage(buf);
+ 			TestForOldSnapshot(snapshot, rel, page);
  			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  		}
  
  		/* Return to the original page to see what's up */
  		buf = _bt_relandgetbuf(rel, buf, obknum, BT_READ);
  		page = BufferGetPage(buf);
+ 		TestForOldSnapshot(snapshot, rel, page);
  		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  		if (P_ISDELETED(opaque))
  		{
***************
*** 1465,1470 **** _bt_walk_left(Relation rel, Buffer buf)
--- 1486,1492 ----
  				blkno = opaque->btpo_next;
  				buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
  				page = BufferGetPage(buf);
+ 				TestForOldSnapshot(snapshot, rel, page);
  				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  				if (!P_ISDELETED(opaque))
  					break;
***************
*** 1501,1507 **** _bt_walk_left(Relation rel, Buffer buf)
   * The returned buffer is pinned and read-locked.
   */
  Buffer
! _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
  {
  	Buffer		buf;
  	Page		page;
--- 1523,1530 ----
   * The returned buffer is pinned and read-locked.
   */
  Buffer
! _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
! 				 Snapshot snapshot)
  {
  	Buffer		buf;
  	Page		page;
***************
*** 1524,1529 **** _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
--- 1547,1553 ----
  		return InvalidBuffer;
  
  	page = BufferGetPage(buf);
+ 	TestForOldSnapshot(snapshot, rel, page);
  	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  
  	for (;;)
***************
*** 1543,1548 **** _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
--- 1567,1573 ----
  					 RelationGetRelationName(rel));
  			buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
  			page = BufferGetPage(buf);
+ 			TestForOldSnapshot(snapshot, rel, page);
  			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  		}
  
***************
*** 1595,1601 **** _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
  	 * version of _bt_search().  We don't maintain a stack since we know we
  	 * won't need it.
  	 */
! 	buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir));
  
  	if (!BufferIsValid(buf))
  	{
--- 1620,1626 ----
  	 * version of _bt_search().  We don't maintain a stack since we know we
  	 * won't need it.
  	 */
! 	buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir), scan->xs_snapshot);
  
  	if (!BufferIsValid(buf))
  	{
*** a/src/backend/commands/vacuum.c
--- b/src/backend/commands/vacuum.c
***************
*** 441,447 **** vacuum_set_xid_limits(Relation rel,
  	 * working on a particular table at any time, and that each vacuum is
  	 * always an independent transaction.
  	 */
! 	*oldestXmin = GetOldestXmin(rel, true);
  
  	Assert(TransactionIdIsNormal(*oldestXmin));
  
--- 441,448 ----
  	 * working on a particular table at any time, and that each vacuum is
  	 * always an independent transaction.
  	 */
! 	*oldestXmin =
! 		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
  
  	Assert(TransactionIdIsNormal(*oldestXmin));
  
*** a/src/backend/commands/vacuumlazy.c
--- b/src/backend/commands/vacuumlazy.c
***************
*** 59,64 ****
--- 59,65 ----
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/pg_rusage.h"
+ #include "utils/snapmgr.h"
  #include "utils/timestamp.h"
  #include "utils/tqual.h"
  
***************
*** 267,273 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
  	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
  	if (possibly_freeable > 0 &&
  		(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
! 		 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION))
  		lazy_truncate_heap(onerel, vacrelstats);
  
  	/* Vacuum the Free Space Map */
--- 268,275 ----
  	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
  	if (possibly_freeable > 0 &&
  		(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
! 		 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION) &&
! 		old_snapshot_threshold < 0)
  		lazy_truncate_heap(onerel, vacrelstats);
  
  	/* Vacuum the Free Space Map */
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 1609,1614 **** GetSnapshotData(Snapshot snapshot)
--- 1609,1620 ----
  	snapshot->regd_count = 0;
  	snapshot->copied = false;
  
+ 	/*
+ 	 * Capture the current WAL stream location in case this snapshot becomes
+ 	 * old enough to need to fall back on the special "old snapshot" logic.
+ 	 */
+ 	snapshot->lsn = GetXLogInsertRecPtr();
+ 
  	return snapshot;
  }
  
*** a/src/backend/utils/errcodes.txt
--- b/src/backend/utils/errcodes.txt
***************
*** 410,415 **** Section: Class 58 - System Error (errors external to PostgreSQL itself)
--- 410,419 ----
  58P01    E    ERRCODE_UNDEFINED_FILE                                         undefined_file
  58P02    E    ERRCODE_DUPLICATE_FILE                                         duplicate_file
  
+ Section: Class 72 - Snapshot Failure
+ # (class borrowed from Oracle)
+ 72000    E    ERRCODE_SNAPSHOT_TOO_OLD                                       snapshot_too_old
+ 
  Section: Class F0 - Configuration File Error
  
  # (PostgreSQL-specific error class)
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 2448,2453 **** static struct config_int ConfigureNamesInt[] =
--- 2448,2463 ----
  	},
  
  	{
+ 		{"old_snapshot_threshold", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS,
+ 			gettext_noop("The number of transaction IDs which must be consumed before a snapshot can be considered too old."),
+ 			gettext_noop("A value of -1 disables this feature.")
+ 		},
+ 		&old_snapshot_threshold,
+ 		-1, -1, 2000000000,
+ 		NULL, NULL, NULL
+ 	},
+ 
+ 	{
  		{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
  			gettext_noop("Time between issuing TCP keepalives."),
  			gettext_noop("A value of 0 uses the system default."),
*** a/src/backend/utils/time/snapmgr.c
--- b/src/backend/utils/time/snapmgr.c
***************
*** 54,59 ****
--- 54,60 ----
  #include "storage/sinval.h"
  #include "utils/builtins.h"
  #include "utils/memutils.h"
+ #include "utils/rel.h"
  #include "utils/resowner_private.h"
  #include "utils/snapmgr.h"
  #include "utils/syscache.h"
***************
*** 61,66 ****
--- 62,73 ----
  
  
  /*
+  * GUC parameters
+  */
+ int			old_snapshot_threshold;
+ 
+ 
+ /*
   * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
   * mode, and to the latest one taken in a read-committed transaction.
   * SecondarySnapshot is a snapshot that's always up-to-date as of the current
***************
*** 1065,1070 **** pg_export_snapshot(PG_FUNCTION_ARGS)
--- 1072,1104 ----
  
  
  /*
+  * TransactionIdLimitedForOldSnapshots -- apply old snapshot limit, if any
+  */
+ TransactionId
+ TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+ 									Relation relation)
+ {
+ 	if (TransactionIdIsNormal(recentXmin)
+ 		&& old_snapshot_threshold >= 0
+ 		&& RelationNeedsWAL(relation)
+ 		&& !IsCatalogRelation(relation)
+ 		&& !RelationIsAccessibleInLogicalDecoding(relation))
+ 	{
+ 		TransactionId xlimit;
+ 
+ 		xlimit = ShmemVariableCache->latestCompletedXid;
+ 		Assert(TransactionIdIsNormal(xlimit));
+ 		xlimit -= old_snapshot_threshold;
+ 		TransactionIdAdvance(xlimit);
+ 		if (NormalTransactionIdFollows(xlimit, recentXmin))
+ 			return xlimit;
+ 	}
+ 
+ 	return recentXmin;
+ }
+ 
+ 
+ /*
   * Parsing subroutines for ImportSnapshot: parse a line with the given
   * prefix followed by a value, and advance *s to the next line.  The
   * filename is provided for use in error messages.
*** a/src/include/access/nbtree.h
--- b/src/include/access/nbtree.h
***************
*** 669,685 **** extern int	_bt_pagedel(Relation rel, Buffer buf);
   */
  extern BTStack _bt_search(Relation rel,
  		   int keysz, ScanKey scankey, bool nextkey,
! 		   Buffer *bufP, int access);
  extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
  			  ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
! 			  int access);
  extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
  			ScanKey scankey, bool nextkey);
  extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
  			Page page, OffsetNumber offnum);
  extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
  extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
! extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost);
  
  /*
   * prototypes for functions in nbtutils.c
--- 669,686 ----
   */
  extern BTStack _bt_search(Relation rel,
  		   int keysz, ScanKey scankey, bool nextkey,
! 		   Buffer *bufP, int access, Snapshot snapshot);
  extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
  			  ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
! 			  int access, Snapshot snapshot);
  extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
  			ScanKey scankey, bool nextkey);
  extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
  			Page page, OffsetNumber offnum);
  extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
  extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
! extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
! 							   Snapshot snapshot);
  
  /*
   * prototypes for functions in nbtutils.c
*** a/src/include/utils/rel.h
--- b/src/include/utils/rel.h
***************
*** 15,20 ****
--- 15,21 ----
  #define REL_H
  
  #include "access/tupdesc.h"
+ #include "access/xlog.h"
  #include "catalog/pg_am.h"
  #include "catalog/pg_class.h"
  #include "catalog/pg_index.h"
*** a/src/include/utils/snapmgr.h
--- b/src/include/utils/snapmgr.h
***************
*** 14,21 ****
--- 14,41 ----
  #define SNAPMGR_H
  
  #include "fmgr.h"
+ #include "catalog/catalog.h"
  #include "utils/resowner.h"
  #include "utils/snapshot.h"
+ #include "utils/tqual.h"
+ 
+ 
+ #define TestForOldSnapshot(snapshot, relation, page) \
+ 	do { \
+ 		if (old_snapshot_threshold >= 0 \
+ 		 && ((snapshot) != NULL) \
+ 		 && (snapshot)->satisfies == HeapTupleSatisfiesMVCC \
+ 		 && !XLogRecPtrIsInvalid((snapshot)->lsn) \
+ 		 && PageGetLSN(page) > (snapshot)->lsn \
+ 		 && NormalTransactionIdFollows(TransactionIdLimitedForOldSnapshots((snapshot)->xmin, relation), (snapshot)->xmin)) \
+ 			ereport(ERROR, \
+ 					(errcode(ERRCODE_SNAPSHOT_TOO_OLD), \
+ 					 errmsg("snapshot too old"))); \
+ 	} while (0)
+ 
+ 
+ /* GUC variables */
+ extern int	old_snapshot_threshold;
  
  
  extern bool FirstSnapshotSet;
***************
*** 54,59 **** extern void ImportSnapshot(const char *idstr);
--- 74,81 ----
  extern bool XactHasExportedSnapshots(void);
  extern void DeleteAllExportedSnapshotFiles(void);
  extern bool ThereAreNoPriorRegisteredSnapshots(void);
+ extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+ 														 Relation relation);
  
  extern char *ExportSnapshot(Snapshot snapshot);
  
*** a/src/include/utils/snapshot.h
--- b/src/include/utils/snapshot.h
***************
*** 14,19 ****
--- 14,20 ----
  #define SNAPSHOT_H
  
  #include "access/htup.h"
+ #include "access/xlogdefs.h"
  #include "lib/pairingheap.h"
  #include "storage/buf.h"
  
***************
*** 95,100 **** typedef struct SnapshotData
--- 96,103 ----
  	uint32		regd_count;		/* refcount on RegisteredSnapshots */
  
  	pairingheap_node ph_node;	/* link in the RegisteredSnapshots heap */
+ 
+ 	XLogRecPtr	lsn;			/* position in the WAL stream */
  } SnapshotData;
  
  /*
*** a/src/include/utils/tqual.h
--- b/src/include/utils/tqual.h
***************
*** 15,20 ****
--- 15,21 ----
  #ifndef TQUAL_H
  #define TQUAL_H
  
+ #include "utils/hsearch.h"
  #include "utils/snapshot.h"
  
  
