diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 2e07702..e924c96 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -661,7 +661,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 
 	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
 
-	LockAcquireExtended(&locktag, AccessExclusiveLock, true, false, false);
+	LockAcquireExtended(&locktag, AccessExclusiveLock, true, false,
+						false, NULL);
 }
 
 static void
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 7b2dcb6..dc0a439 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -105,11 +105,12 @@ void
 LockRelationOid(Oid relid, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SetLocktagRelationOid(&tag, relid);
 
-	res = LockAcquire(&tag, lockmode, false, false);
+	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
 
 	/*
 	 * Now that we have the lock, check for invalidation messages, so that we
@@ -120,9 +121,18 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
 	 * relcache entry in an undesirable way.  (In the case where our own xact
 	 * modifies the rel, the relcache update happens via
 	 * CommandCounterIncrement, not here.)
+	 *
+	 * However, in corner cases where code acts on tables (usually catalogs)
+	 * recursively, we might get here while still processing invalidation
+	 * messages in some outer execution of this function or a sibling.  The
+	 * "cleared" status of the lock tells us whether we really are done
+	 * absorbing relevant inval messages.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 }
 
 /*
@@ -138,11 +148,12 @@ bool
 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SetLocktagRelationOid(&tag, relid);
 
-	res = LockAcquire(&tag, lockmode, false, true);
+	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
 
 	if (res == LOCKACQUIRE_NOT_AVAIL)
 		return false;
@@ -151,8 +162,11 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 	 * Now that we have the lock, check for invalidation messages; see notes
 	 * in LockRelationOid.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 
 	return true;
 }
@@ -199,20 +213,24 @@ void
 LockRelation(Relation relation, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SET_LOCKTAG_RELATION(tag,
 						 relation->rd_lockInfo.lockRelId.dbId,
 						 relation->rd_lockInfo.lockRelId.relId);
 
-	res = LockAcquire(&tag, lockmode, false, false);
+	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
 
 	/*
 	 * Now that we have the lock, check for invalidation messages; see notes
 	 * in LockRelationOid.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 }
 
 /*
@@ -226,13 +244,14 @@ bool
 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SET_LOCKTAG_RELATION(tag,
 						 relation->rd_lockInfo.lockRelId.dbId,
 						 relation->rd_lockInfo.lockRelId.relId);
 
-	res = LockAcquire(&tag, lockmode, false, true);
+	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
 
 	if (res == LOCKACQUIRE_NOT_AVAIL)
 		return false;
@@ -241,8 +260,11 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
 	 * Now that we have the lock, check for invalidation messages; see notes
 	 * in LockRelationOid.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 
 	return true;
 }
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index dc3d8d9..58d1f32 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -669,6 +669,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
  *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
  *		LOCKACQUIRE_OK				lock successfully acquired
  *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
+ *		LOCKACQUIRE_ALREADY_CLEAR	incremented count for lock already clear
  *
  * In the normal case where dontWait=false and the caller doesn't need to
  * distinguish a freshly acquired lock from one already taken earlier in
@@ -685,7 +686,8 @@ LockAcquire(const LOCKTAG *locktag,
 			bool sessionLock,
 			bool dontWait)
 {
-	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
+	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
+							   true, NULL);
 }
 
 /*
@@ -696,13 +698,17 @@ LockAcquire(const LOCKTAG *locktag,
  * caller to note that the lock table is full and then begin taking
  * extreme action to reduce the number of other lock holders before
  * retrying the action.
+ *
+ * If locallockp isn't NULL, *locallockp is set to the LOCALLOCK table entry
+ * when the lock is successfully acquired, or to NULL otherwise.
  */
 LockAcquireResult
 LockAcquireExtended(const LOCKTAG *locktag,
 					LOCKMODE lockmode,
 					bool sessionLock,
 					bool dontWait,
-					bool reportMemoryError)
+					bool reportMemoryError,
+					LOCALLOCK **locallockp)
 {
 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
 	LockMethod	lockMethodTable;
@@ -766,9 +772,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
 		locallock->proclock = NULL;
 		locallock->hashcode = LockTagHashCode(&(localtag.lock));
 		locallock->nLocks = 0;
+		locallock->holdsStrongLockCount = false;
+		locallock->lockCleared = false;
 		locallock->numLockOwners = 0;
 		locallock->maxLockOwners = 8;
-		locallock->holdsStrongLockCount = false;
 		locallock->lockOwners = NULL;	/* in case next line fails */
 		locallock->lockOwners = (LOCALLOCKOWNER *)
 			MemoryContextAlloc(TopMemoryContext,
@@ -789,13 +796,22 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	}
 	hashcode = locallock->hashcode;
 
+	if (locallockp)
+		*locallockp = locallock;
+
 	/*
 	 * If we already hold the lock, we can just increase the count locally.
+	 *
+	 * If lockCleared is already set, caller need not worry about absorbing
+	 * sinval messages related to the lock's object.
 	 */
 	if (locallock->nLocks > 0)
 	{
 		GrantLockLocal(locallock, owner);
-		return LOCKACQUIRE_ALREADY_HELD;
+		if (locallock->lockCleared)
+			return LOCKACQUIRE_ALREADY_CLEAR;
+		else
+			return LOCKACQUIRE_ALREADY_HELD;
 	}
 
 	/*
@@ -877,6 +893,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
 										   hashcode))
 		{
 			AbortStrongLockAcquire();
+			if (locallock->nLocks == 0)
+				RemoveLocalLock(locallock);
+			if (locallockp)
+				*locallockp = NULL;
 			if (reportMemoryError)
 				ereport(ERROR,
 						(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -911,6 +931,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	{
 		AbortStrongLockAcquire();
 		LWLockRelease(partitionLock);
+		if (locallock->nLocks == 0)
+			RemoveLocalLock(locallock);
+		if (locallockp)
+			*locallockp = NULL;
 		if (reportMemoryError)
 			ereport(ERROR,
 					(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -976,6 +1000,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
 			LWLockRelease(partitionLock);
 			if (locallock->nLocks == 0)
 				RemoveLocalLock(locallock);
+			if (locallockp)
+				*locallockp = NULL;
 			return LOCKACQUIRE_NOT_AVAIL;
 		}
 
@@ -1646,6 +1672,20 @@ GrantAwaitedLock(void)
 }
 
 /*
+ * MarkLockClear -- mark an acquired lock as "clear"
+ *
+ * This means that we know we have absorbed all sinval messages that other
+ * sessions generated before we acquired this lock, and so we can confidently
+ * assume we know about any catalog changes protected by this lock.
+ */
+void
+MarkLockClear(LOCALLOCK *locallock)
+{
+	Assert(locallock->nLocks > 0);	/* lock must be held at least once */
+	locallock->lockCleared = true;	/* re-acquires now get ALREADY_CLEAR */
+}
+
+/*
  * WaitOnLock -- wait to acquire a lock
  *
  * Caller must have set MyProc->heldLocks to reflect locks already held
@@ -1909,6 +1949,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	if (locallock->nLocks > 0)
 		return true;
 
+	/*
+	 * At this point we can no longer suppose we are clear of invalidation
+	 * messages related to this lock.  Although we'll delete the LOCALLOCK
+	 * object before any intentional return from this routine, it seems worth
+	 * the trouble to explicitly reset lockCleared right now, just in case
+	 * some error prevents us from deleting the LOCALLOCK.
+	 */
+	locallock->lockCleared = false;
+
 	/* Attempt fast release of any lock eligible for the fast path. */
 	if (EligibleForRelationFastPath(locktag, lockmode) &&
 		FastPathLocalUseCount > 0)
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 777da71..ff4df7f 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -408,9 +408,10 @@ typedef struct LOCALLOCK
 	PROCLOCK   *proclock;		/* associated PROCLOCK object, if any */
 	uint32		hashcode;		/* copy of LOCKTAG's hash value */
 	int64		nLocks;			/* total number of times lock is held */
+	bool		holdsStrongLockCount;	/* bumped FastPathStrongRelationLocks */
+	bool		lockCleared;	/* we read all sinval msgs for lock */
 	int			numLockOwners;	/* # of relevant ResourceOwners */
 	int			maxLockOwners;	/* allocated size of array */
-	bool		holdsStrongLockCount;	/* bumped FastPathStrongRelationLocks */
 	LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
 } LOCALLOCK;
 
@@ -472,7 +473,8 @@ typedef enum
 {
 	LOCKACQUIRE_NOT_AVAIL,		/* lock not available, and dontWait=true */
 	LOCKACQUIRE_OK,				/* lock successfully acquired */
-	LOCKACQUIRE_ALREADY_HELD	/* incremented count for lock already held */
+	LOCKACQUIRE_ALREADY_HELD,	/* incremented count for lock already held */
+	LOCKACQUIRE_ALREADY_CLEAR	/* incremented count for lock already clear */
 } LockAcquireResult;
 
 /* Deadlock states identified by DeadLockCheck() */
@@ -528,8 +530,10 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
 					LOCKMODE lockmode,
 					bool sessionLock,
 					bool dontWait,
-					bool report_memory_error);
+					bool reportMemoryError,
+					LOCALLOCK **locallockp);
 extern void AbortStrongLockAcquire(void);
+extern void MarkLockClear(LOCALLOCK *locallock);
 extern bool LockRelease(const LOCKTAG *locktag,
 			LOCKMODE lockmode, bool sessionLock);
 extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
