diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 723051e..85997f6 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -210,6 +210,10 @@ static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
 static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
 							  const LOCKTAG *locktag, uint32 hashcode);
 static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);
+static bool GroupLockShouldJumpQueue(LockMethod lockMethodTable,
+						 LOCKMODE lockmode,
+						 LOCK *lock,
+						 PROCLOCK *proclock);
 
 /*
  * To make the fast-path lock mechanism work, we must have some way of
@@ -339,7 +343,8 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
 static uint32 proclock_hash(const void *key, Size keysize);
 static void RemoveLocalLock(LOCALLOCK *locallock);
 static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
-				 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
+				 PGPROC *leader, const LOCKTAG *locktag, uint32 hashcode,
+				 LOCKMODE lockmode);
 static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
 static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
 static void FinishStrongLockAcquire(void);
@@ -894,8 +899,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	 * away anytime.  So we have to use SetupLockInTable() to recompute the
 	 * lock and proclock pointers, even if they're already set.
 	 */
-	proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
-								hashcode, lockmode);
+	proclock = SetupLockInTable(lockMethodTable, MyProc, LockGroupLeader,
+								locktag, hashcode, lockmode);
 	if (!proclock)
 	{
 		AbortStrongLockAcquire();
@@ -914,18 +919,27 @@ LockAcquireExtended(const LOCKTAG *locktag,
 
 	/*
 	 * If lock requested conflicts with locks requested by waiters, must join
-	 * wait queue.  Otherwise, check for conflict with already-held locks.
-	 * (That's last because most complex check.)
+	 * wait queue (except for certain cases involving group locking, where
+	 * new lockers must sometimes jump the entire wait queue to avoid
+	 * deadlock).  Otherwise, we can grant ourselves the lock if there are
+	 * no conflicts.
 	 */
 	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
-		status = STATUS_FOUND;
+	{
+		if (proclock->groupLeader != NULL &&
+			GroupLockShouldJumpQueue(lockMethodTable, lockmode, lock,
+									 proclock))
+			status = STATUS_OK;
+		else
+			status = STATUS_FOUND;
+	}
 	else
 		status = LockCheckConflicts(lockMethodTable, lockmode,
 									lock, proclock);
 
 	if (status == STATUS_OK)
 	{
-		/* No conflict with held or previously requested locks */
+		/* We can and should grant ourselves the lock at once */
 		GrantLock(lock, proclock, lockmode);
 		GrantLockLocal(locallock, owner);
 	}
@@ -1053,7 +1067,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
  * held at exit.
  */
 static PROCLOCK *
-SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
+SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, PGPROC *leader,
 				 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
 {
 	LOCK	   *lock;
@@ -1141,6 +1155,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
 	{
 		uint32		partition = LockHashPartition(hashcode);
 
+		proclock->groupLeader = leader;
 		proclock->holdMask = 0;
 		proclock->releaseMask = 0;
 		/* Add proclock to appropriate lists */
@@ -1258,9 +1273,10 @@ RemoveLocalLock(LOCALLOCK *locallock)
  * NOTES:
  *		Here's what makes this complicated: one process's locks don't
  * conflict with one another, no matter what purpose they are held for
- * (eg, session and transaction locks do not conflict).
- * So, we must subtract off our own locks when determining whether the
- * requested new lock conflicts with those already held.
+ * (eg, session and transaction locks do not conflict).  Nor do the locks
+ * of one process in a lock group conflict with those of another process in
+ * the same group.  So, we must subtract off these locks when determining
+ * whether the requested new lock conflicts with those already held.
  */
 int
 LockCheckConflicts(LockMethod lockMethodTable,
@@ -1270,8 +1286,12 @@ LockCheckConflicts(LockMethod lockMethodTable,
 {
 	int			numLockModes = lockMethodTable->numLockModes;
 	LOCKMASK	myLocks;
-	LOCKMASK	otherLocks;
+	int			conflictMask = lockMethodTable->conflictTab[lockmode];
+	int			conflictsRemaining[MAX_LOCKMODES];
+	int			totalConflictsRemaining = 0;
 	int			i;
+	SHM_QUEUE  *procLocks;
+	PROCLOCK   *otherproclock;
 
 	/*
 	 * first check for global conflicts: If no locks conflict with my request,
@@ -1282,44 +1302,215 @@ LockCheckConflicts(LockMethod lockMethodTable,
 	 * type of lock that conflicts with request.   Bitwise compare tells if
 	 * there is a conflict.
 	 */
-	if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
+	if (!(conflictMask & lock->grantMask))
 	{
 		PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
 		return STATUS_OK;
 	}
 
 	/*
-	 * Rats.  Something conflicts.  But it could still be my own lock. We have
-	 * to construct a conflict mask that does not reflect our own locks, but
-	 * only lock types held by other processes.
+	 * Rats.  Something conflicts.  But it could still be my own lock, or
+	 * a lock held by another member of my locking group.  First, figure out
+	 * how many conflicts remain after subtracting out any locks I hold
+	 * myself.
 	 */
 	myLocks = proclock->holdMask;
-	otherLocks = 0;
 	for (i = 1; i <= numLockModes; i++)
 	{
-		int			myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;
+		if ((conflictMask & LOCKBIT_ON(i)) == 0)
+		{
+			conflictsRemaining[i] = 0;
+			continue;
+		}
+		conflictsRemaining[i] = lock->granted[i];
+		if (myLocks & LOCKBIT_ON(i))
+			--conflictsRemaining[i];
+		totalConflictsRemaining += conflictsRemaining[i];
+	}
 
-		if (lock->granted[i] > myHolding)
-			otherLocks |= LOCKBIT_ON(i);
+	/* If no conflicts remain, we get the lock. */
+	if (totalConflictsRemaining == 0)
+	{
+		PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock);
+		return STATUS_OK;
+	}
+
+	/* If we're not using group locking, this is definitely a conflict. */
+	if (proclock->groupLeader == NULL)
+	{
+		PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)", proclock);
+		return STATUS_FOUND;
+	}
+
+	/* Important special case: we're the only member of a lock group. */
+	if (proclock->groupLeader == MyProc && MyProc->lockGroupMembers < 2)
+	{
+		Assert(proclock->tag.myProc == MyProc);
+		Assert(MyProc->lockGroupMembers == 1);
+		PROCLOCK_PRINT("LockCheckConflicts: conflicting (trivial group)",
+					   proclock);
+		return STATUS_FOUND;
 	}
 
 	/*
-	 * now check again for conflicts.  'otherLocks' describes the types of
-	 * locks held by other processes.  If one of these conflicts with the kind
-	 * of lock that I want, there is a conflict and I have to sleep.
+	 * Locks held in conflicting modes by members of our own lock group are
+	 * not real conflicts; we can subtract those out and see if we still have
+	 * a conflict.  This is O(N) in the number of processes holding or awaiting
+	 * locks on this object.  We could improve that by making the shared memory
+	 * state more complex (and larger) but it doesn't seem worth it.
 	 */
-	if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
+	procLocks = &(lock->procLocks);
+	otherproclock = (PROCLOCK *)
+		SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
+	while (otherproclock != NULL)
 	{
-		/* no conflict. OK to get the lock */
-		PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
-		return STATUS_OK;
+		if (proclock != otherproclock &&
+			proclock->groupLeader == otherproclock->groupLeader &&
+			(otherproclock->holdMask & conflictMask) != 0)
+		{
+			int	intersectMask = otherproclock->holdMask & conflictMask;
+
+			for (i = 1; i <= numLockModes; i++)
+			{
+				if ((intersectMask & LOCKBIT_ON(i)) != 0)
+				{
+					if (conflictsRemaining[i] <= 0)
+						elog(PANIC, "proclocks held do not match lock");
+					conflictsRemaining[i]--;
+					totalConflictsRemaining--;
+				}
+			}
+
+			if (totalConflictsRemaining == 0)
+			{
+				PROCLOCK_PRINT("LockCheckConflicts: resolved (group)",
+							   proclock);
+				return STATUS_OK;
+			}
+		}
+		otherproclock = (PROCLOCK *)
+			SHMQueueNext(procLocks, &proclock->lockLink,
+						 offsetof(PROCLOCK, lockLink));
 	}
 
-	PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
+	/* Nope, it's a real conflict. */
+	PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock);
 	return STATUS_FOUND;
 }
 
 /*
+ * GroupLockGrantWithoutWait -- should a group lock be granted without
+ *		waiting, despite the presence of conflicting waiters?
+ *
+ * If some member of our locking group already holds a lock on the object,
+ * then we should skip the wait queue and grant ourselves the lock immediately.
+ * This is because we presume lock group members will eventually wait for
+ * each other; thus, if we didn't do this, such situations would result in
+ * an eventual deadlock.  However, if a conflicting lock is present that is
+ * not held by another member of our lock group, then we can't do this.
+ * In that case we'll have to wait despite the deadlock risk and hope for
+ * the best.
+ */
+static bool
+GroupLockShouldJumpQueue(LockMethod lockMethodTable,
+						 LOCKMODE lockmode,
+						 LOCK *lock,
+						 PROCLOCK *proclock)
+{
+	int			numLockModes = lockMethodTable->numLockModes;
+	LOCKMASK	myLocks;
+	int			conflictMask = lockMethodTable->conflictTab[lockmode];
+	int			conflictsRemaining[MAX_LOCKMODES];
+	int			totalConflictsRemaining = 0;
+	int			i;
+	SHM_QUEUE  *procLocks;
+	PROCLOCK   *otherproclock;
+
+	/*
+	 * If we're the only member of the lock group, then clearly no other
+	 * member holds a lock.  We should NOT jump the queue.
+	 */
+	if (proclock->groupLeader == MyProc && MyProc->lockGroupMembers < 2)
+	{
+		Assert(proclock->tag.myProc == MyProc);
+		Assert(MyProc->lockGroupMembers == 1);
+		PROCLOCK_PRINT("GroupLockShouldJumpQueue: trivial group", proclock);
+		return false;
+	}
+
+	/* Count the number of lock conflicts, excluding my own locks. */
+	myLocks = proclock->holdMask;
+	for (i = 1; i <= numLockModes; i++)
+	{
+		if ((conflictMask & LOCKBIT_ON(i)) == 0)
+		{
+			conflictsRemaining[i] = 0;
+			continue;
+		}
+		conflictsRemaining[i] = lock->granted[i];
+		if (myLocks & LOCKBIT_ON(i))
+			--conflictsRemaining[i];
+		totalConflictsRemaining += conflictsRemaining[i];
+	}
+
+	/*
+	 * Search for locks held by other group members.  Even if there are
+	 * no conflicts, we can't exit early yet, because we don't know whether
+	 * any group member actually holds a lock.
+	 */
+	procLocks = &(lock->procLocks);
+	otherproclock = (PROCLOCK *)
+		SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
+	while (otherproclock != NULL)
+	{
+		if (proclock != otherproclock &&
+			proclock->groupLeader == otherproclock->groupLeader &&
+			otherproclock->holdMask != 0)
+		{
+			int	intersectMask = otherproclock->holdMask & conflictMask;
+
+			/*
+			 * Does the group member hold a lock in 1 or more conflicting
+			 * modes?  If so, reduce the count of remaining conflicts by the
+			 * number of such modes.
+			 */
+			if (intersectMask != 0)
+			{
+				for (i = 1; i <= numLockModes; i++)
+				{
+					if ((intersectMask & LOCKBIT_ON(i)) != 0)
+					{
+						if (conflictsRemaining[i] <= 0)
+							elog(PANIC, "proclocks held do not match lock");
+						conflictsRemaining[i]--;
+						totalConflictsRemaining--;
+					}
+				}
+			}
+
+			/*
+			 * Whether there were any conflicting modes here or not, the fact
+			 * that the lock is held at all makes us eligible to jump the
+			 * queue.  But we can only do that once the absence of conflicts
+			 * is established.
+			 */
+			if (totalConflictsRemaining == 0)
+			{
+				PROCLOCK_PRINT("GroupLockShouldJumpQueue: jump", proclock);
+				return true;
+			}
+		}
+		otherproclock = (PROCLOCK *)
+			SHMQueueNext(procLocks, &proclock->lockLink,
+						 offsetof(PROCLOCK, lockLink));
+	}
+
+	/* Either no group members hold locks, or there are conflicts. */
+	PROCLOCK_PRINT("GroupLockShouldJumpQueue: fallthrough", proclock);
+	return false;
+}
+
+/*
  * GrantLock -- update the lock and proclock data structures to show
  *		the lock request has been granted.
  *
@@ -2534,7 +2725,8 @@ FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag
 
 				if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode))
 					continue;
-				proclock = SetupLockInTable(lockMethodTable, proc, locktag,
+				proclock = SetupLockInTable(lockMethodTable, proc,
+											proc->lockGroupLeader, locktag,
 											hashcode, lockmode);
 				if (!proclock)
 				{
@@ -2590,8 +2782,8 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock)
 		/* Find or create lock object. */
 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
-		proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
-									locallock->hashcode, lockmode);
+		proclock = SetupLockInTable(lockMethodTable, MyProc, LockGroupLeader,
+									locktag, locallock->hashcode, lockmode);
 		if (!proclock)
 		{
 			LWLockRelease(partitionLock);
@@ -3094,6 +3286,9 @@ PostPrepare_Locks(TransactionId xid)
 	PROCLOCKTAG proclocktag;
 	int			partition;
 
+	/* Can't prepare a lock group follower. */
+	Assert(LockGroupLeader == MyProc || LockGroupLeader == NULL);
+
 	/* This is a critical section: any error means big trouble */
 	START_CRIT_SECTION();
 
@@ -3195,6 +3390,19 @@ PostPrepare_Locks(TransactionId xid)
 
 			Assert(proclock->tag.myProc == MyProc);
 
+			/*
+			 * We shouldn't be in a lock group, except for a single-entry
+			 * group for which we're the leader, which is OK.  We need to
+			 * clear the groupLeader pointer in that case, so that the dummy
+			 * PGPROC doesn't end up pointing back to our PGPROC.
+			 */
+			if (proclock->groupLeader != NULL)
+			{
+				Assert(proclock->groupLeader == MyProc);
+				Assert(MyProc->lockGroupMembers == 1);
+				proclock->groupLeader = NULL;
+			}
+
 			lock = proclock->tag.myLock;
 
 			/* Ignore VXID locks */
@@ -3784,6 +3992,7 @@ lock_twophase_recover(TransactionId xid, uint16 info,
 	 */
 	if (!found)
 	{
+		proclock->groupLeader = NULL;
 		proclock->holdMask = 0;
 		proclock->releaseMask = 0;
 		/* Add proclock to appropriate lists */
@@ -4058,7 +4267,8 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
 		proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
-									&tag, hashcode, ExclusiveLock);
+									proc->lockGroupLeader, &tag, hashcode,
+									ExclusiveLock);
 		if (!proclock)
 		{
 			LWLockRelease(partitionLock);
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index ea88a24..1af9851 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -64,6 +64,13 @@ PGPROC	   *MyProc = NULL;
 PGXACT	   *MyPgXact = NULL;
 
 /*
+ * If we're not in a lock group, LockGroupLeader will be NULL.  Otherwise,
+ * it should be the set to the leader of the lock group of which we're a
+ * member.  This will be the same as MyProc iff we're the group leader.
+ */
+PGPROC	   *LockGroupLeader = NULL;
+
+/*
  * This spinlock protects the freelist of recycled PGPROC structures.
  * We cannot use an LWLock because the LWLock manager depends on already
  * having a PGPROC and a wait semaphore!  But these structures are touched
@@ -240,18 +247,21 @@ InitProcGlobal(void)
 			/* PGPROC for normal backend, add to freeProcs list */
 			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
 			ProcGlobal->freeProcs = &procs[i];
+			procs[i].procgloballist = &ProcGlobal->freeProcs;
 		}
 		else if (i < MaxConnections + autovacuum_max_workers + 1)
 		{
 			/* PGPROC for AV launcher/worker, add to autovacFreeProcs list */
 			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
 			ProcGlobal->autovacFreeProcs = &procs[i];
+			procs[i].procgloballist = &ProcGlobal->autovacFreeProcs;
 		}
 		else if (i < MaxBackends)
 		{
 			/* PGPROC for bgworker, add to bgworkerFreeProcs list */
 			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs;
 			ProcGlobal->bgworkerFreeProcs = &procs[i];
+			procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs;
 		}
 
 		/* Initialize myProcLocks[] shared memory queues. */
@@ -279,6 +289,7 @@ InitProcess(void)
 {
 	/* use volatile pointer to prevent code rearrangement */
 	volatile PROC_HDR *procglobal = ProcGlobal;
+	PGPROC * volatile * procgloballist;
 
 	/*
 	 * ProcGlobal should be set up already (if we are a backend, we inherit
@@ -297,9 +308,17 @@ InitProcess(void)
 	 */
 	InitializeLatchSupport();
 
+	/* Decide which list should supply our PGPROC. */
+	if (IsAnyAutoVacuumProcess())
+		procgloballist = &procglobal->autovacFreeProcs;
+	else if (IsBackgroundWorker)
+		procgloballist = &procglobal->bgworkerFreeProcs;
+	else
+		procgloballist = &procglobal->freeProcs;
+
 	/*
-	 * Try to get a proc struct from the free list.  If this fails, we must be
-	 * out of PGPROC structures (not to mention semaphores).
+	 * Try to get a proc struct from the appropriate free list.  If this
+	 * fails, we must be out of PGPROC structures (not to mention semaphores).
 	 *
 	 * While we are holding the ProcStructLock, also copy the current shared
 	 * estimate of spins_per_delay to local storage.
@@ -308,21 +327,11 @@ InitProcess(void)
 
 	set_spins_per_delay(procglobal->spins_per_delay);
 
-	if (IsAnyAutoVacuumProcess())
-		MyProc = procglobal->autovacFreeProcs;
-	else if (IsBackgroundWorker)
-		MyProc = procglobal->bgworkerFreeProcs;
-	else
-		MyProc = procglobal->freeProcs;
+	MyProc = *procgloballist;
 
 	if (MyProc != NULL)
 	{
-		if (IsAnyAutoVacuumProcess())
-			procglobal->autovacFreeProcs = (PGPROC *) MyProc->links.next;
-		else if (IsBackgroundWorker)
-			procglobal->bgworkerFreeProcs = (PGPROC *) MyProc->links.next;
-		else
-			procglobal->freeProcs = (PGPROC *) MyProc->links.next;
+		*procgloballist = (PGPROC *) MyProc->links.next;
 		SpinLockRelease(ProcStructLock);
 	}
 	else
@@ -341,6 +350,12 @@ InitProcess(void)
 	MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno];
 
 	/*
+	 * Cross-check that the PGPROC is of the type we expect; if this were
+	 * not the case, it would get returned to the wrong list.
+	 */
+	Assert(MyProc->procgloballist == procgloballist);
+
+	/*
 	 * Now that we have a PGPROC, mark ourselves as an active postmaster
 	 * child; this is so that the postmaster can detect it if we exit without
 	 * cleaning up.  (XXX autovac launcher currently doesn't participate in
@@ -375,6 +390,7 @@ InitProcess(void)
 	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
+	Assert(MyProc->lockGroupLeader == NULL);
 #ifdef USE_ASSERT_CHECKING
 	{
 		int			i;
@@ -538,6 +554,7 @@ InitAuxiliaryProcess(void)
 	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
+	Assert(MyProc->lockGroupLeader == NULL);
 #ifdef USE_ASSERT_CHECKING
 	{
 		int			i;
@@ -780,16 +797,6 @@ ProcKill(int code, Datum arg)
 	/* Make sure we're out of the sync rep lists */
 	SyncRepCleanupAtProcExit();
 
-#ifdef USE_ASSERT_CHECKING
-	{
-		int			i;
-
-		/* Last process should have released all locks. */
-		for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-			Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
-	}
-#endif
-
 	/*
 	 * Release any LW locks I am holding.  There really shouldn't be any, but
 	 * it's cheap to check again before we cut the knees off the LWLock
@@ -801,6 +808,50 @@ ProcKill(int code, Datum arg)
 	if (MyReplicationSlot != NULL)
 		ReplicationSlotRelease();
 
+	/* If we're a lock group member, detach from the lock group. */
+	if (LockGroupLeader != NULL && LockGroupLeader != MyProc)
+	{
+		int	members;
+
+		LWLockAcquire(LockGroupLeader->backendLock, LW_EXCLUSIVE);
+		members = --LockGroupLeader->lockGroupMembers;
+		LWLockRelease(LockGroupLeader->backendLock);
+
+		LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
+		MyProc->lockGroupLeader = NULL;
+		LWLockRelease(MyProc->backendLock);
+
+		/* If we're the last member of the lock group, detach the PGPROC. */
+		if (members == 0)
+		{
+			PGPROC * volatile * procgloballist;
+
+#ifdef USE_ASSERT_CHECKING
+			{
+				int			i;
+
+				/* Last process should have released all locks. */
+				for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+					Assert(SHMQueueEmpty(&(LockGroupLeader->myProcLocks[i])));
+			}
+#endif
+
+			procgloballist = LockGroupLeader->procgloballist;
+
+			SpinLockAcquire(ProcStructLock);
+
+			/* Return PGPROC structure to appropriate freelist */
+			LockGroupLeader->links.next = (SHM_QUEUE *) *procgloballist;
+			*procgloballist = LockGroupLeader;
+
+			/* Update shared estimate of spins_per_delay */
+			procglobal->spins_per_delay =
+				update_spins_per_delay(procglobal->spins_per_delay);
+
+			SpinLockRelease(ProcStructLock);
+		}
+	}
+
 	/*
 	 * Clear MyProc first; then disown the process latch.  This is so that
 	 * signal handlers won't try to clear the process latch after it's no
@@ -810,29 +861,55 @@ ProcKill(int code, Datum arg)
 	MyProc = NULL;
 	DisownLatch(&proc->procLatch);
 
-	SpinLockAcquire(ProcStructLock);
-
-	/* Return PGPROC structure (and semaphore) to appropriate freelist */
-	if (IsAnyAutoVacuumProcess())
-	{
-		proc->links.next = (SHM_QUEUE *) procglobal->autovacFreeProcs;
-		procglobal->autovacFreeProcs = proc;
-	}
-	else if (IsBackgroundWorker)
+	/*
+	 * If we are a lock group leader, we need to check whether any other
+	 * group members are active.  If not, we can declare ourselves to no longer
+	 * be a lock group leader, allowing our PGPROC to be recycled
+	 * immediately.
+	 */
+	if (LockGroupLeader == proc)
 	{
-		proc->links.next = (SHM_QUEUE *) procglobal->bgworkerFreeProcs;
-		procglobal->bgworkerFreeProcs = proc;
+		int	members;
+
+		LWLockAcquire(proc->backendLock, LW_EXCLUSIVE);
+		members = --proc->lockGroupMembers;
+		LWLockRelease(proc->backendLock);
+
+		LockGroupLeader = NULL;
 	}
-	else
+
+	/*
+	 * If we were never a lock group leader or have managed to give up that
+	 * designation, then we can immediately release our PGPROC.  If not, then
+	 * the last group member will do that on exit.
+	 */
+	if (LockGroupLeader == NULL)
 	{
-		proc->links.next = (SHM_QUEUE *) procglobal->freeProcs;
-		procglobal->freeProcs = proc;
-	}
+		PGPROC * volatile * procgloballist;
 
-	/* Update shared estimate of spins_per_delay */
-	procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay);
+#ifdef USE_ASSERT_CHECKING
+		{
+			int			i;
 
-	SpinLockRelease(ProcStructLock);
+			/* Last process should have released all locks. */
+			for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+				Assert(SHMQueueEmpty(&(proc->myProcLocks[i])));
+		}
+#endif
+
+		procgloballist = proc->procgloballist;
+		SpinLockAcquire(ProcStructLock);
+
+		/* Return PGPROC structure (and semaphore) to appropriate freelist */
+		proc->links.next = (SHM_QUEUE *) *procgloballist;
+		*procgloballist = proc;
+
+		/* Update shared estimate of spins_per_delay */
+		procglobal->spins_per_delay =
+			update_spins_per_delay(procglobal->spins_per_delay);
+
+		SpinLockRelease(ProcStructLock);
+	}
 
 	/*
 	 * This process is no longer present in shared memory in any meaningful
@@ -960,18 +1037,53 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 	bool		early_deadlock = false;
 	bool		allow_autovacuum_cancel = true;
 	int			myWaitStatus;
-	PGPROC	   *proc;
+	PGPROC	   *proc = NULL;
 	int			i;
+	PGPROC	   *groupLeader = LockGroupLeader;
+
+	/*
+	 * Ignore trivial lock groups.
+	 *
+	 * We read MyProc->lockGroupMembers here without a lock.  The read itself
+	 * is atomic; while the value could be changing under us, it can't change
+	 * from a value < 2 to a value >= 2 while any group locks are actually
+	 * present.  Similarly, when iterating over the wait queue, we needn't
+	 * worry that the lock group membership of a process will change under us:
+	 * that's not allowed while a process holds any locks.
+	 */
+	if (MyProc == groupLeader && MyProc->lockGroupMembers >= 2)
+		groupLeader = NULL;
 
 	/*
 	 * Determine where to add myself in the wait queue.
 	 *
-	 * Normally I should go at the end of the queue.  However, if I already
-	 * hold locks that conflict with the request of any previous waiter, put
-	 * myself in the queue just in front of the first such waiter. This is not
-	 * a necessary step, since deadlock detection would move me to before that
-	 * waiter anyway; but it's relatively cheap to detect such a conflict
-	 * immediately, and avoid delaying till deadlock timeout.
+	 * Normally I should go at the end of the queue.  However, if I'm a
+	 * member of a lock group, and some other member of the lock group is
+	 * already waiting for a lock, then add add myself just after the
+	 * existing waiters.  This is necessary for correctness; any code that
+	 * scans the wait queue is entitled to assume that lockers from the same
+	 * group are in consecutive positions in the queue.
+	 */
+	if (groupLeader != NULL)
+	{
+		PGPROC *cproc = (PGPROC *) waitQueue->links.next;
+
+		for (i = 0; i < waitQueue->size; i++)
+		{
+			if (cproc->lockGroupLeader == LockGroupLeader)
+				proc = cproc;
+			else if (proc != NULL)
+				break;
+			cproc = (PGPROC *) cproc->links.next;
+		}
+	}
+
+	/*
+	 * If I already hold locks that conflict with the request of any previous
+	 * waiter, put myself in the queue just in front of the first such waiter.
+	 * This is not a necessary step, since deadlock detection would move me
+	 * to before that waiter anyway; but it's relatively cheap to detect such
+	 * a conflict immediately, and avoid delaying till deadlock timeout.
 	 *
 	 * Special case: if I find I should go in front of some waiter, check to
 	 * see if I conflict with already-held locks or the requests before that
@@ -982,16 +1094,24 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 	 */
 	if (myHeldLocks != 0)
 	{
+		PGPROC *cproc = (PGPROC *) waitQueue->links.next;
 		LOCKMASK	aheadRequests = 0;
 
-		proc = (PGPROC *) waitQueue->links.next;
 		for (i = 0; i < waitQueue->size; i++)
 		{
+			/*
+			 * If we reached our own lock group in the wait queue without
+			 * finding a conflict, we aren't going to find one at all prior
+			 * to the insertion point, so bail out.
+			 */
+			if (groupLeader != NULL && cproc->lockGroupLeader == groupLeader)
+				break;
+
 			/* Must he wait for me? */
-			if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks)
+			if (lockMethodTable->conflictTab[cproc->waitLockMode] & myHeldLocks)
 			{
 				/* Must I wait for him ? */
-				if (lockMethodTable->conflictTab[lockmode] & proc->heldLocks)
+				if (lockMethodTable->conflictTab[lockmode] & cproc->heldLocks)
 				{
 					/*
 					 * Yes, so we have a deadlock.  Easiest way to clean up
@@ -1000,7 +1120,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 					 * a flag to check below, and break out of loop.  Also,
 					 * record deadlock info for later message.
 					 */
-					RememberSimpleDeadLock(MyProc, lockmode, lock, proc);
+					RememberSimpleDeadLock(MyProc, lockmode, lock, cproc);
 					early_deadlock = true;
 					break;
 				}
@@ -1016,22 +1136,19 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 					GrantAwaitedLock();
 					return STATUS_OK;
 				}
-				/* Break out of loop to put myself before him */
+				/* Break out of loop and put myself before him */
+				proc = cproc;
 				break;
 			}
 			/* Nope, so advance to next waiter */
-			aheadRequests |= LOCKBIT_ON(proc->waitLockMode);
-			proc = (PGPROC *) proc->links.next;
+			aheadRequests |= LOCKBIT_ON(cproc->waitLockMode);
+			cproc = (PGPROC *) cproc->links.next;
 		}
-
-		/*
-		 * If we fall out of loop normally, proc points to waitQueue head, so
-		 * we will insert at tail of queue as desired.
-		 */
 	}
-	else
+
+	if (proc == NULL)
 	{
-		/* I hold no locks, so I can't push in front of anyone. */
+		/* No special case applies, so I can't push in front of anyone. */
 		proc = (PGPROC *) &(waitQueue->links);
 	}
 
@@ -1614,6 +1731,71 @@ check_done:
 		LWLockRelease(LockHashPartitionLockByIndex(i));
 }
 
+/*
+ * BecomeLockGroupLeader - designate process as lock group leader
+ *
+ * Once this function has returned, other processes can join the lock group
+ * by calling BecomLockGroupFollower.
+ */
+void
+BecomeLockGroupLeader(void)
+{
+	/* Can't be leader and follower. */
+	Assert(LockGroupLeader == NULL || LockGroupLeader == MyProc);
+
+	/* This can be called more than once; but we must not redo the work. */
+	if (LockGroupLeader == NULL)
+	{
+		LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
+		Assert(MyProc->lockGroupMembers == 0);
+		Assert(MyProc->lockGroupLeader == NULL);
+		MyProc->lockGroupLeader = MyProc;
+		MyProc->lockGroupLeaderIdentifier = MyProcPid;
+		MyProc->lockGroupMembers = 1;
+		LWLockRelease(MyProc->backendLock);
+	}
+}
+
+/*
+ * BecomeLockGroupFollower - designate process as lock group follower
+ *
+ * This is pretty straightforward except for the possibility that the leader
+ * whose group we're trying to join might exit before we manage to do so;
+ * and the PGPROC might get recycled for an unrelated process.  To avoid
+ * that, we require the caller to pass the PID of the intended PGPROC as
+ * an interlock.  Returns true if we successfully join the intended lock
+ * group, and false if not.
+ */
+bool
+BecomeLockGroupFollower(PGPROC *leader, int pid)
+{
+	bool	ok = false;
+
+	/* Can't become a follower if we already in a lock group. */
+	Assert(LockGroupLeader == NULL);
+
+	/* Can't follow ourselves. */
+	Assert(MyProc != leader);
+
+	/* Try to join the group. */
+	LWLockAcquire(leader->backendLock, LW_EXCLUSIVE);
+	if (leader->lockGroupMembers > 0 &&
+		leader->lockGroupLeaderIdentifier == pid)
+	{
+		ok = true;
+		leader->lockGroupMembers++;
+		LockGroupLeader = leader;
+	}
+	LWLockRelease(leader->backendLock);
+
+	/* Advertise our new leader. */
+	LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
+	MyProc->lockGroupLeader = leader;
+	LWLockRelease(MyProc->backendLock);
+
+	return ok;
+}
+
 
 /*
  * ProcWaitForSignal - wait for a signal from another backend.
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 4c49e3c..b15addb 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -362,6 +362,7 @@ typedef struct PROCLOCK
 	PROCLOCKTAG tag;			/* unique identifier of proclock object */
 
 	/* data */
+	PGPROC	   *groupLeader;	/* group leader, or NULL if no lock group */
 	LOCKMASK	holdMask;		/* bitmask for lock types currently held */
 	LOCKMASK	releaseMask;	/* bitmask for lock types to be released */
 	SHM_QUEUE	lockLink;		/* list link in LOCK's list of proclocks */
@@ -473,7 +474,6 @@ typedef enum
 								 * worker */
 } DeadLockState;
 
-
 /*
  * The lockmgr's shared hash tables are partitioned to reduce contention.
  * To determine which partition a given locktag belongs to, compute the tag's
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c23f4da..6f842f7 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -77,6 +77,7 @@ struct PGPROC
 {
 	/* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */
 	SHM_QUEUE	links;			/* list link if process is in a list */
+	PGPROC	  **procgloballist;	/* procglobal list that owns this PGPROC */
 
 	PGSemaphoreData sem;		/* ONE semaphore to sleep on */
 	int			waitStatus;		/* STATUS_WAITING, STATUS_OK or STATUS_ERROR */
@@ -142,6 +143,11 @@ struct PGPROC
 	bool		fpVXIDLock;		/* are we holding a fast-path VXID lock? */
 	LocalTransactionId fpLocalTransactionId;	/* lxid for fast-path VXID
 												 * lock */
+
+	/* Support for lock groups. */
+	int			lockGroupMembers; 	/* 0 if not a lock group leader */
+	int			lockGroupLeaderIdentifier;	/* MyProcPid, if I'm a leader */
+	PGPROC	   *lockGroupLeader;	/* lock group leader, if I'm a follower */
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
@@ -149,6 +155,7 @@ struct PGPROC
 
 extern PGDLLIMPORT PGPROC *MyProc;
 extern PGDLLIMPORT struct PGXACT *MyPgXact;
+extern PGDLLIMPORT PGPROC *LockGroupLeader;
 
 /*
  * Prior to PostgreSQL 9.2, the fields below were stored as part of the
@@ -253,6 +260,8 @@ extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
 extern void CheckDeadLock(void);
 extern bool IsWaitingForLock(void);
 extern void LockErrorCleanup(void);
+extern void BecomeLockGroupLeader(void);
+extern bool BecomeLockGroupFollower(PGPROC *leader, int pid);
 
 extern void ProcWaitForSignal(void);
 extern void ProcSendSignal(int pid);
