*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 4139,4145 **** heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record)
  	xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
  
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid);
  
  	/*
  	 * Actual operation is a no-op. Record type exists to provide a means
--- 4139,4145 ----
  	xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
  
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
  
  	/*
  	 * Actual operation is a no-op. Record type exists to provide a means
***************
*** 4171,4177 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
  	 * no queries running for which the removed tuples are still visible.
  	 */
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid);
  
  	RestoreBkpBlocks(lsn, record, true);
  
--- 4171,4177 ----
  	 * no queries running for which the removed tuples are still visible.
  	 */
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
  
  	RestoreBkpBlocks(lsn, record, true);
  
***************
*** 4241,4247 **** heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
  	 * consider the frozen xids as running.
  	 */
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(cutoff_xid);
  
  	RestoreBkpBlocks(lsn, record, false);
  
--- 4241,4247 ----
  	 * consider the frozen xids as running.
  	 */
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
  
  	RestoreBkpBlocks(lsn, record, false);
  
*** a/src/backend/access/nbtree/nbtxlog.c
--- b/src/backend/access/nbtree/nbtxlog.c
***************
*** 833,839 **** btree_redo(XLogRecPtr lsn, XLogRecord *record)
  		 * here is worth some thought and possibly some effort to
  		 * improve.
  		 */
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid);
  	}
  
  	/*
--- 833,839 ----
  		 * here is worth some thought and possibly some effort to
  		 * improve.
  		 */
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
  	}
  
  	/*
*** a/src/backend/commands/indexcmds.c
--- b/src/backend/commands/indexcmds.c
***************
*** 540,546 **** DefineIndex(RangeVar *heapRelation,
  	 * check for that.	Also, prepared xacts are not reported, which is fine
  	 * since they certainly aren't going to do anything more.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
--- 540,546 ----
  	 * check for that.	Also, prepared xacts are not reported, which is fine
  	 * since they certainly aren't going to do anything more.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock, InvalidTransactionId);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
***************
*** 627,633 **** DefineIndex(RangeVar *heapRelation,
  	 * We once again wait until no transaction can have the table open with
  	 * the index marked as read-only for updates.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
--- 627,633 ----
  	 * We once again wait until no transaction can have the table open with
  	 * the index marked as read-only for updates.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock, InvalidTransactionId);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 66,71 **** typedef struct ProcArrayStruct
--- 66,72 ----
  
  	int			numKnownAssignedXids;	/* current number of known assigned xids */
  	int			maxKnownAssignedXids;	/* allocated size of known assigned xids */
+ 
  	/*
  	 * Highest subxid that overflowed KnownAssignedXids array. Similar to
  	 * overflowing cached subxids in PGPROC entries.
***************
*** 73,78 **** typedef struct ProcArrayStruct
--- 74,86 ----
  	TransactionId	lastOverflowedXid;
  
  	/*
+ 	 * During Hot Standby we need to store a visibility limit so we can defer recovery
+ 	 * conflict processing. The cached value is accessed during lock processing,
+ 	 * but stored on the procarray header. See lock.c for further details.
+ 	 */
+ 	TransactionId	latestRemovedXidCache[NUM_LOCK_PARTITIONS];
+ 
+ 	/*
  	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
  	 * actually it is maxProcs entries long.
  	 */
***************
*** 220,225 **** CreateSharedProcArray(void)
--- 228,236 ----
  											  HASH_ELEM | HASH_FUNCTION);
  		if (!KnownAssignedXidsHash)
  			elog(FATAL, "could not initialize known assigned xids hash table");
+ 
+ 		/* Initialize the latestRemovedXidCache */
+ 		MemSet(procArray->latestRemovedXidCache, 0, NUM_LOCK_PARTITIONS * sizeof(TransactionId));
  	}
  }
  
***************
*** 1216,1221 **** GetSnapshotData(Snapshot snapshot)
--- 1227,1235 ----
  		RecentGlobalXmin = FirstNormalTransactionId;
  	RecentXmin = xmin;
  
+ 	/* Reset recovery conflict processing limit after deriving new snapshot */
+ 	MyProc->limitXmin = InvalidTransactionId;
+ 
  	snapshot->xmin = xmin;
  	snapshot->xmax = xmax;
  	snapshot->xcnt = count;
***************
*** 1739,1747 **** GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
  			{
  				VirtualTransactionId vxid;
  
! 				GET_VXID_FROM_PGPROC(vxid, *proc);
! 				if (VirtualTransactionIdIsValid(vxid))
! 					vxids[count++] = vxid;
  			}
  		}
  	}
--- 1753,1774 ----
  			{
  				VirtualTransactionId vxid;
  
! 				/*
! 				 * Avoid repeated re-listing of a process for conflict processing
! 				 * by excluding it once resolved up to a particular latestRemovedXid.
! 				 * We reset proc->limitXmin each time we take a new snapshot because
! 				 * that may need to have conflict processing performed on it, as
! 				 * discussed in header comments for this function.
! 				 */
! 				if (!TransactionIdIsValid(proc->limitXmin) ||
! 					TransactionIdFollows(limitXmin, proc->limitXmin))
! 				{
! 					proc->limitXmin = limitXmin;
! 
! 					GET_VXID_FROM_PGPROC(vxid, *proc);
! 					if (VirtualTransactionIdIsValid(vxid))
! 						vxids[count++] = vxid;
! 				}
  			}
  		}
  	}
***************
*** 2559,2561 **** KnownAssignedXidsDisplay(int trace_level)
--- 2586,2610 ----
  
  	pfree(buf.data);
  }
+ 
+ /*
+  * LatestRemovedXidCache provides a way of deferring recovery conflicts up to
+  * the point where a query requests a lock on a relation. These functions provide
+  * the cache required by recovery conflict processing. (see storage/lmgr/lock.c)
+  * Function prototypes are in standby.h, not procarray.h for convenience.
+  */
+ void
+ ProcArraySetLatestRemovedXidCache(int partition, TransactionId latestRemovedXid)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 
+ 	arrayP->latestRemovedXidCache[partition] = latestRemovedXid;
+ }
+ 
+ TransactionId
+ ProcArrayGetLatestRemovedXidCache(int partition)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 
+ 	return arrayP->latestRemovedXidCache[partition];
+ }
*** a/src/backend/storage/ipc/standby.c
--- b/src/backend/storage/ipc/standby.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "storage/procarray.h"
  #include "storage/sinvaladt.h"
  #include "storage/standby.h"
+ #include "tcop/tcopprot.h"
  #include "utils/ps_status.h"
  
  int		vacuum_defer_cleanup_age;
***************
*** 34,40 **** int		vacuum_defer_cleanup_age;
  static List *RecoveryLockList;
  
  static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason);
  static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
  static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
  static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
--- 35,43 ----
  static List *RecoveryLockList;
  
  static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason,
! 									   TransactionId latestRemovedXid,
! 									   Oid dbOid, Oid relOid);
  static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
  static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
  static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
***************
*** 157,165 **** WaitExceedsMaxStandbyDelay(void)
   */
  static void
  ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason)
  {
  	char		waitactivitymsg[100];
  
  	while (VirtualTransactionIdIsValid(*waitlist))
  	{
--- 160,171 ----
   */
  static void
  ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason,
! 									   TransactionId latestRemovedXid,
! 									   Oid dbOid, Oid relOid)
  {
  	char		waitactivitymsg[100];
+ 	VirtualTransactionId *lockconflicts = NULL;
  
  	while (VirtualTransactionIdIsValid(*waitlist))
  	{
***************
*** 203,221 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  			if (WaitExceedsMaxStandbyDelay())
  			{
  				pid_t pid;
  
  				/*
! 				 * Now find out who to throw out of the balloon.
  				 */
! 				Assert(VirtualTransactionIdIsValid(*waitlist));
! 				pid = CancelVirtualTransaction(*waitlist, reason);
! 
! 				/*
! 				 * Wait awhile for it to die so that we avoid flooding an
! 				 * unresponsive backend when system is heavily loaded.
! 				 */
! 				if (pid != 0)
! 					pg_usleep(5000);
  			}
  		}
  
--- 209,279 ----
  			if (WaitExceedsMaxStandbyDelay())
  			{
  				pid_t pid;
+ 				bool cancel = true;
  
  				/*
! 				 * If it is a snapshot conflict decide whether we should resolve
! 				 * immediately or defer the resolution until later. This must be
! 				 * designed carefully so that it is a deferral in all cases, in
! 				 * particular we must not set the latestRemovedXid until when the
! 				 * standby delay has reached its maximum, which is now!
! 				 *
! 				 * If we had a snapshot conflict then we also check for lock conflicts.
! 				 * Lock conflicts disregard whether backends are holding or waiting for
! 				 * the lock, so we get everybody that has declared an intention on the
! 				 * lock. We are careful to acquire the lock partition lock in exclusive
! 				 * mode, so that no concurrent lock requestors slip by while we do this.
! 				 * While holding the lock partition lock we also record the latestRemovedXid
! 				 * so that anybody arriving after we release the lock will cancel
! 				 * themselves using the same errors they would have got here.
! 				 *
! 				 * If a backend has a snapshot conflict but no lock conflict on the
! 				 * relation then we do not need to cancel it. So we need to merge the
! 				 * two lists to determine who can be spared, for now.
! 				 *
! 				 * XXX O(N^2) at present, but neither list is long in the common case.
  				 */
! 				if (reason == PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)
! 				{
! 					LOCKTAG			locktag;
! 					VirtualTransactionId *conflicts;
! 
! 					SET_LOCKTAG_RELATION(locktag, dbOid, relOid);
! 
! 					if (!lockconflicts)
! 						lockconflicts = GetLockConflicts(&locktag, AccessExclusiveLock,
! 															latestRemovedXid);
! 					conflicts = lockconflicts;
! 					cancel = false;
! 					while (VirtualTransactionIdIsValid(*conflicts))
! 					{
! 						if (waitlist->backendId == conflicts->backendId &&
! 							waitlist->localTransactionId == conflicts->localTransactionId)
! 						{
! 							cancel = true;
! 							break;
! 						}
! 						conflicts++;
! 					}
! 				}
! 
! 				if (cancel)
! 				{
! 					/*
! 					 * Now find out who to throw out of the balloon.
! 					 */
! 					Assert(VirtualTransactionIdIsValid(*waitlist));
! 					pid = CancelVirtualTransaction(*waitlist, reason);
! 
! 					/*
! 					 * Wait awhile for it to die so that we avoid flooding an
! 					 * unresponsive backend when system is heavily loaded.
! 					 */
! 					if (pid != 0)
! 						pg_usleep(5000);
! 				}
! 				else
! 					break;
  			}
  		}
  
***************
*** 232,238 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  }
  
  void
! ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid)
  {
  	VirtualTransactionId *backends;
  
--- 290,296 ----
  }
  
  void
! ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
  {
  	VirtualTransactionId *backends;
  
***************
*** 240,246 **** ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid)
  										 InvalidOid);
  
  	ResolveRecoveryConflictWithVirtualXIDs(backends,
! 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
  }
  
  void
--- 298,318 ----
  										 InvalidOid);
  
  	ResolveRecoveryConflictWithVirtualXIDs(backends,
! 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
! 										   latestRemovedXid,
! 										   node.dbNode, node.relNode);
! }
! 
! /*
!  * ResolveRecoveryConflictWithRemovedTransactionId is called to resolve
!  * a deferred conflict.
!  *
!  * Treat as a self-interrupt, but don't bother actually sending the signal.
!  */
! void
! ResolveRecoveryConflictWithRemovedTransactionId(void)
! {
! 	RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
  }
  
  void
***************
*** 270,276 **** ResolveRecoveryConflictWithTablespace(Oid tsid)
  	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
  												InvalidOid);
  	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
! 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
  }
  
  void
--- 342,350 ----
  	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
  												InvalidOid);
  	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
! 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
! 										   InvalidTransactionId,
! 										   0, 0);
  }
  
  void
***************
*** 321,327 **** ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
  	while (!lock_acquired)
  	{
  		if (++num_attempts < 3)
! 			backends = GetLockConflicts(&locktag, AccessExclusiveLock);
  		else
  		{
  			backends = GetConflictingVirtualXIDs(InvalidTransactionId,
--- 395,403 ----
  	while (!lock_acquired)
  	{
  		if (++num_attempts < 3)
! 			backends = GetLockConflicts(&locktag,
! 										AccessExclusiveLock,
! 										InvalidTransactionId);
  		else
  		{
  			backends = GetConflictingVirtualXIDs(InvalidTransactionId,
***************
*** 330,336 **** ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
  		}
  
  		ResolveRecoveryConflictWithVirtualXIDs(backends,
! 											   PROCSIG_RECOVERY_CONFLICT_LOCK);
  
  		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
  											!= LOCKACQUIRE_NOT_AVAIL)
--- 406,414 ----
  		}
  
  		ResolveRecoveryConflictWithVirtualXIDs(backends,
! 											   PROCSIG_RECOVERY_CONFLICT_LOCK,
! 											   InvalidTransactionId,
! 											   0, 0);
  
  		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
  											!= LOCKACQUIRE_NOT_AVAIL)
*** a/src/backend/storage/lmgr/lock.c
--- b/src/backend/storage/lmgr/lock.c
***************
*** 35,40 ****
--- 35,41 ----
  #include "access/transam.h"
  #include "access/twophase.h"
  #include "access/twophase_rmgr.h"
+ #include "access/xact.h"
  #include "miscadmin.h"
  #include "pg_trace.h"
  #include "pgstat.h"
***************
*** 439,444 **** ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
--- 440,486 ----
  	return lockhash;
  }
  
+ /*
+  * SetRecoveryConflictLatestRemovedXid functions manage the latestRemovedXid cache.
+  * During Hot Standby we generate a list of conflicts and then resolve only those
+  * conflicts that have a clear and present lock conflict. We defer the conflict
+  * for other relations to some later time, hopefully never. The deferral mechanism
+  * is that we store the latestRemovedXid that applies to a relation behind the
+  * lock partition that applies for that relation. These routines exist to provide
+  * a level of modularity to allow us to alter the caching mechanism as needed.
+  *
+  * It is possible to defer the recovery conflict right up until the point that
+  * we access a particular data block. Doing that means we would have to cache
+  * latestRemovedXid for each data block, even those that have exited and re-entered
+  * cache. It would also mean we would need to re-check latestRemovedXid each time
+  * we access a data block. Both of those factors make that level of granularity
+  * more trouble than it would likely be worth in practice. So we check the
+  * latestRemovedXid once each time we lock a relation: smaller cache, fewer checks.
+  *
+  * We store the latestRemovedXid on the lock entry, giving easy access to the value
+  * that applies for that relation. If a lock is heavily used or held for a long
+  * duration then it will effectively have its own private cache. When the lock entry
+  * is removed, we lose the cached latestRemovedXid value.
+  *
+  * When a lock entry is created we set the value from the cache. The current cache
+  * is just a single value for each partition, though that could be extended to have
+  * multiple entries for frequently used relations. The current cache is stored on
+  * the header of the ProcArrayStruct as a convenient place to store shmem data
+  * and could be relocated elsewhere if required.
+  */
+ static void
+ LockSetRecoveryConflictLatestRemovedXidForPartition(int partition, TransactionId latestRemovedXid)
+ {
+ 	Assert(InHotStandby);
+ 
+ 	ProcArraySetLatestRemovedXidCache(partition, latestRemovedXid);
+ }
+ 
+ static TransactionId
+ LockGetRecoveryConflictLatestRemovedXidForPartition(int partition)
+ {
+ 	return ProcArrayGetLatestRemovedXidCache(partition);
+ }
  
  /*
   * LockAcquire -- Check for lock conflicts, sleep if conflict found,
***************
*** 632,637 **** LockAcquireExtended(const LOCKTAG *locktag,
--- 674,685 ----
  		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
  		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
  		LOCK_PRINT("LockAcquire: new", lock, lockmode);
+ 
+ 		/*
+ 		 * Start the latestRemovedXid from the partition's cached value
+ 		 */
+ 		if (TransactionStartedDuringRecovery())
+ 			lock->latestRemovedXid = LockGetRecoveryConflictLatestRemovedXidForPartition(partition);
  	}
  	else
  	{
***************
*** 642,647 **** LockAcquireExtended(const LOCKTAG *locktag,
--- 690,726 ----
  	}
  
  	/*
+ 	 * If latestRemovedXid is set, resolve any recovery conflict. We do this
+ 	 * here whether we wait or not. If our snapshot conflicts then we will
+ 	 * be cancelled, so there is no need for us to re-check this after we
+ 	 * wake from a wait for the lock.
+ 	 */
+ 	if (TransactionIdIsValid(lock->latestRemovedXid) &&
+ 		TransactionIdIsValid(MyProc->xmin) &&
+ 		TransactionIdFollowsOrEquals(lock->latestRemovedXid, MyProc->xmin))
+ 	{
+ 		if (lock->nRequested == 0)
+ 		{
+ 			/*
+ 			 * There are no other requestors of this lock, so garbage-collect
+ 			 * the lock object.  We *must* do this to avoid a permanent leak
+ 			 * of shared memory, because there won't be anything to cause
+ 			 * anyone to release the lock object later.
+ 			 */
+ 			Assert(SHMQueueEmpty(&(lock->procLocks)));
+ 			if (!hash_search_with_hash_value(LockMethodLockHash,
+ 											 (void *) &(lock->tag),
+ 											 hashcode,
+ 											 HASH_REMOVE,
+ 											 NULL))
+ 				elog(PANIC, "lock table corrupted");
+ 		}
+ 		LWLockRelease(partitionLock);
+ 		ResolveRecoveryConflictWithRemovedTransactionId();
+ 		/* Not reached */
+ 	}
+ 
+ 	/*
  	 * Create the hash key for the proclock table.
  	 */
  	proclocktag.myLock = lock;
***************
*** 1788,1796 **** LockReassignCurrentOwner(void)
   * uses of the result.
   */
  VirtualTransactionId *
! GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
  {
! 	VirtualTransactionId *vxids;
  	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
  	LockMethod	lockMethodTable;
  	LOCK	   *lock;
--- 1867,1875 ----
   * uses of the result.
   */
  VirtualTransactionId *
! GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, TransactionId latestRemovedXid)
  {
! 	static VirtualTransactionId *vxids;
  	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
  	LockMethod	lockMethodTable;
  	LOCK	   *lock;
***************
*** 1798,1803 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
--- 1877,1883 ----
  	SHM_QUEUE  *procLocks;
  	PROCLOCK   *proclock;
  	uint32		hashcode;
+ 	int			partition;
  	LWLockId	partitionLock;
  	int			count = 0;
  
***************
*** 1812,1827 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
  	 * need enough space for MaxBackends + a terminator, since prepared xacts
  	 * don't count.
  	 */
! 	vxids = (VirtualTransactionId *)
! 		palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));
  
  	/*
  	 * Look up the lock object matching the tag.
  	 */
  	hashcode = LockTagHashCode(locktag);
  	partitionLock = LockHashPartitionLock(hashcode);
  
! 	LWLockAcquire(partitionLock, LW_SHARED);
  
  	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
  												(void *) locktag,
--- 1892,1929 ----
  	 * need enough space for MaxBackends + a terminator, since prepared xacts
  	 * don't count.
  	 */
! 	if (!InHotStandby)
! 		vxids = (VirtualTransactionId *)
! 			palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));
! 	else
! 	{
! 		if (vxids == NULL)
! 		{
! 			vxids = (VirtualTransactionId *)
! 				malloc(sizeof(VirtualTransactionId) * (MaxBackends + 1));
! 			if (vxids == NULL)
! 				ereport(ERROR,
! 					(errcode(ERRCODE_OUT_OF_MEMORY),
! 					 errmsg("out of memory")));
! 
! 		}
! 	}
  
  	/*
  	 * Look up the lock object matching the tag.
  	 */
  	hashcode = LockTagHashCode(locktag);
+ 	partition = LockHashPartition(hashcode);
  	partitionLock = LockHashPartitionLock(hashcode);
  
! 	/*
! 	 * If we are setting latestRemovedXid we need to get exclusive lock
! 	 * to avoid race conditions.
! 	 */
! 	if (TransactionIdIsValid(latestRemovedXid))
! 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
! 	else
! 		LWLockAcquire(partitionLock, LW_SHARED);
  
  	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
  												(void *) locktag,
***************
*** 1831,1836 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
--- 1933,1944 ----
  	if (!lock)
  	{
  		/*
+ 		 * Set the latestRemovedXid for the partition.
+ 		 */
+ 		if (TransactionIdIsValid(latestRemovedXid))
+ 			LockSetRecoveryConflictLatestRemovedXidForPartition(partition, latestRemovedXid);
+ 
+ 		/*
  		 * If the lock object doesn't exist, there is nothing holding a lock
  		 * on this lockable object.
  		 */
***************
*** 1839,1844 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
--- 1947,1958 ----
  	}
  
  	/*
+ 	 * Set the latestRemovedXid for the lock
+ 	 */
+ 	if (TransactionIdIsValid(latestRemovedXid))
+ 		lock->latestRemovedXid = latestRemovedXid;
+ 
+ 	/*
  	 * Examine each existing holder (or awaiter) of the lock.
  	 */
  	conflictMask = lockMethodTable->conflictTab[lockmode];
*** a/src/include/storage/lock.h
--- b/src/include/storage/lock.h
***************
*** 312,317 **** typedef struct LOCK
--- 312,323 ----
  	int			nRequested;		/* total of requested[] array */
  	int			granted[MAX_LOCKMODES]; /* counts of granted locks */
  	int			nGranted;		/* total of granted[] array */
+ 
+ 	/*
+ 	 * For Hot Standby, the xid limit for access to this table. Allows deferred
+ 	 * and selective conflict processing based upon the relation id.
+ 	 */
+ 	TransactionId latestRemovedXid;
  } LOCK;
  
  #define LOCK_LOCKMETHOD(lock) ((LOCKMETHODID) (lock).tag.locktag_lockmethodid)
***************
*** 488,494 **** extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
  extern void LockReleaseCurrentOwner(void);
  extern void LockReassignCurrentOwner(void);
  extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
! 				 LOCKMODE lockmode);
  extern void AtPrepare_Locks(void);
  extern void PostPrepare_Locks(TransactionId xid);
  extern int LockCheckConflicts(LockMethod lockMethodTable,
--- 494,500 ----
  extern void LockReleaseCurrentOwner(void);
  extern void LockReassignCurrentOwner(void);
  extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
! 				 LOCKMODE lockmode, TransactionId latestRemovedXid);
  extern void AtPrepare_Locks(void);
  extern void PostPrepare_Locks(TransactionId xid);
  extern int LockCheckConflicts(LockMethod lockMethodTable,
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 102,107 **** struct PGPROC
--- 102,112 ----
  	 */
  	bool		recoveryConflictPending;
  
+ 	TransactionId limitXmin;	/* The latest xid for which conflict processing
+ 								 * has already been performed for this proc.
+ 								 * Updated by snapshot conflicts, reset when
+ 								 * we take a new snapshot. */
+ 
  	/* Info about LWLock the process is currently waiting for, if any. */
  	bool		lwWaiting;		/* true if waiting for an LW lock */
  	bool		lwExclusive;	/* true if waiting for exclusive access */
*** a/src/include/storage/standby.h
--- b/src/include/storage/standby.h
***************
*** 16,34 ****
  
  #include "access/xlog.h"
  #include "storage/lock.h"
  
  extern int	vacuum_defer_cleanup_age;
  
  extern void InitRecoveryTransactionEnvironment(void);
  extern void ShutdownRecoveryTransactionEnvironment(void);
  
! extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid);
  extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
  extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
  
  extern void ResolveRecoveryConflictWithBufferPin(void);
  extern void SendRecoveryConflictWithBufferPin(void);
  
  /*
   * Standby Rmgr (RM_STANDBY_ID)
   *
--- 16,41 ----
  
  #include "access/xlog.h"
  #include "storage/lock.h"
+ #include "storage/relfilenode.h"
  
  extern int	vacuum_defer_cleanup_age;
  
  extern void InitRecoveryTransactionEnvironment(void);
  extern void ShutdownRecoveryTransactionEnvironment(void);
  
! extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
! 												RelFileNode node);
! extern void ResolveRecoveryConflictWithRemovedTransactionId(void);
  extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
  extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
  
  extern void ResolveRecoveryConflictWithBufferPin(void);
  extern void SendRecoveryConflictWithBufferPin(void);
  
+ /* Prototypes here rather than in procarray.h */
+ extern void ProcArraySetLatestRemovedXidCache(int partition, TransactionId latestRemovedXid);
+ extern TransactionId ProcArrayGetLatestRemovedXidCache(int partition);
+ 
  /*
   * Standby Rmgr (RM_STANDBY_ID)
   *
