diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 3b71174..e89ece5 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -196,12 +196,24 @@ typedef struct QueuePosition
 #define QUEUE_POS_EQUAL(x,y) \
 	 ((x).page == (y).page && (x).offset == (y).offset)
 
+/* Returns 1 if x is larger than y, 0 if equal, else -1 */
+#define QUEUE_POS_COMPARE(x,y) \
+	(((x).page > (y).page) ? 1 : \
+	  ((x).page < (y).page) ? -1 : \
+	    ((x).offset > (y).offset ? 1 : ((x).offset == (y).offset ? 0 : -1)))
+
 /* choose logically smaller QueuePosition */
 #define QUEUE_POS_MIN(x,y) \
 	(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
 	 (x).page != (y).page ? (y) : \
 	 (x).offset < (y).offset ? (x) : (y))
 
+/* choose logically larger QueuePosition */
+#define QUEUE_POS_MAX(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
+	 (x).page != (y).page ? (x) : \
+	 (x).offset < (y).offset ? (y) : (x))
+
 /*
  * Struct describing a listening backend's status
  */
@@ -217,12 +229,13 @@ typedef struct QueueBackendStatus
  * The AsyncQueueControl structure is protected by the AsyncQueueLock.
  *
  * When holding the lock in SHARED mode, backends may only inspect their own
- * entries as well as the head and tail pointers. Consequently we can allow a
- * backend to update its own record while holding only SHARED lock (since no
- * other backend will inspect it).
+ * entries as well as the head, tail, and firstUncommitted pointers.
+ * Consequently we can allow a backend to update its own record while holding
+ * only SHARED lock (since no other backend will inspect it).
  *
  * When holding the lock in EXCLUSIVE mode, backends can inspect the entries
- * of other backends and also change the head and tail pointers.
+ * of other backends and also change the head, tail and firstUncommitted
+ * pointers.
  *
  * In order to avoid deadlocks, whenever we need both locks, we always first
  * get AsyncQueueLock and then AsyncCtlLock.
@@ -230,12 +243,23 @@ typedef struct QueueBackendStatus
  * Each backend uses the backend[] array entry with index equal to its
  * BackendId (which can range from 1 to MaxBackends).  We rely on this to make
  * SendProcSignal fast.
+ *
+ * In case a long running transaction is causing the size of the queue to grow
+ * we keep track of the first uncommitted transaction to prevent a
+ * connection from having to process a lot of old notifications when it
+ * issues its first listen.  To facilitate this we allow one process
+ * at a time to advance firstUncommitted by using advancingFirstUncommitted
+ * as a mutex.
  */
 typedef struct AsyncQueueControl
 {
 	QueuePosition head;			/* head points to the next free location */
 	QueuePosition tail;			/* the global tail is equivalent to the tail
 								 * of the "slowest" backend */
+
+	QueuePosition firstUncommitted;
+	int32 advancingFirstUncommitted; /* Backend ID */
+
 	TimestampTz lastQueueFillWarn;		/* time of last queue-full msg */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 	/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -245,6 +269,8 @@ static AsyncQueueControl *asyncQueueControl;
 
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
+#define QUEUE_FIRST_UNCOMMITTED		(asyncQueueControl->firstUncommitted)
+#define QUEUE_ADVANCING_FIRST_UNCOMMITTED	(asyncQueueControl->advancingFirstUncommitted)
 #define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
 
@@ -376,6 +402,7 @@ static void asyncQueueFillWarning(void);
 static bool SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
+							 volatile QueuePosition *firstUncommitted,
 							 QueuePosition stop,
 							 char *page_buffer);
 static void asyncQueueAdvanceTail(void);
@@ -455,6 +482,9 @@ AsyncShmemInit(void)
 
 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
 		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+		SET_QUEUE_POS(QUEUE_FIRST_UNCOMMITTED, 0, 0);
+		QUEUE_ADVANCING_FIRST_UNCOMMITTED = 0;
+
 		asyncQueueControl->lastQueueFillWarn = 0;
 		/* zero'th entry won't be used, but let's initialize it anyway */
 		for (i = 0; i <= MaxBackends; i++)
@@ -935,10 +965,12 @@ Exec_ListenPreCommit(void)
 	 * doesn't hurt.
 	 */
 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
-	QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
+	QUEUE_BACKEND_POS(MyBackendId) = QUEUE_FIRST_UNCOMMITTED;
 	QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
 	LWLockRelease(AsyncQueueLock);
 
+	elog(DEBUG1, "Listener registered, queue position is: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+
 	/* Now we are listed in the global array, so remember we're listening */
 	amRegisteredListener = true;
 
@@ -1703,7 +1735,9 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition oldpos;
 	QueuePosition head;
+	QueuePosition firstUncommitted;
 	bool		advanceTail;
+	bool		advanceFirstUncommitted;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -1718,6 +1752,18 @@ asyncQueueReadAllNotifications(void)
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
 	pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
 	head = QUEUE_HEAD;
+
+	if (!QUEUE_POS_EQUAL(pos,head) && /* If we aren't bailing out early */
+		(!QUEUE_ADVANCING_FIRST_UNCOMMITTED ||
+		  QUEUE_BACKEND_PID(QUEUE_ADVANCING_FIRST_UNCOMMITTED) == InvalidPid))
+	{
+		advanceFirstUncommitted = true;
+		QUEUE_ADVANCING_FIRST_UNCOMMITTED = MyBackendId;
+		firstUncommitted = QUEUE_FIRST_UNCOMMITTED;
+		elog(DEBUG1, "Attempting to advance first uncommitted from: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+	} else
+		advanceFirstUncommitted = false;
+
 	LWLockRelease(AsyncQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))
@@ -1812,8 +1858,9 @@ asyncQueueReadAllNotifications(void)
 			 * rewrite pages under us. Especially we don't want to hold a lock
 			 * while sending the notifications to the frontend.
 			 */
-			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf);
+			reachedStop = asyncQueueProcessPageEntries(&pos,
+				   advanceFirstUncommitted ? &firstUncommitted : 0,
+				   head, page_buffer.buf);
 		} while (!reachedStop);
 	}
 	PG_CATCH();
@@ -1822,6 +1869,11 @@ asyncQueueReadAllNotifications(void)
 		LWLockAcquire(AsyncQueueLock, LW_SHARED);
 		QUEUE_BACKEND_POS(MyBackendId) = pos;
 		advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
+		if (advanceFirstUncommitted) {
+			QUEUE_FIRST_UNCOMMITTED = QUEUE_POS_MAX(QUEUE_FIRST_UNCOMMITTED, firstUncommitted);
+			QUEUE_ADVANCING_FIRST_UNCOMMITTED = 0;
+			elog(DEBUG1, "Advanced first uncommitted to: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+		}
 		LWLockRelease(AsyncQueueLock);
 
 		/* If we were the laziest backend, try to advance the tail pointer */
@@ -1836,6 +1888,11 @@ asyncQueueReadAllNotifications(void)
 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
 	QUEUE_BACKEND_POS(MyBackendId) = pos;
 	advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
+	if (advanceFirstUncommitted) {
+		QUEUE_FIRST_UNCOMMITTED = QUEUE_POS_MAX(QUEUE_FIRST_UNCOMMITTED, firstUncommitted);
+		QUEUE_ADVANCING_FIRST_UNCOMMITTED = 0;
+		elog(DEBUG1, "Advanced first uncommitted to: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+	}
 	LWLockRelease(AsyncQueueLock);
 
 	/* If we were the laziest backend, try to advance the tail pointer */
@@ -1861,6 +1918,7 @@ asyncQueueReadAllNotifications(void)
  */
 static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
+							 volatile QueuePosition *firstUncommitted,
 							 QueuePosition stop,
 							 char *page_buffer)
 {
@@ -1928,6 +1986,29 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 				 */
 			}
 		}
+		else if(firstUncommitted && QUEUE_POS_COMPARE(*firstUncommitted,thisentry) <= 0)
+		{
+			/*
+			 * If we are trying to advance firstUncommitted we need
+			 * to check for live transactions at every queue position.
+			 */
+			if(qe->dboid != InvalidOid && TransactionIdIsInProgress(qe->xid))
+			{
+				/*
+				 * We hit an uncommitted transaction so there is no possibility
+				 * to further advance firstUncommitted
+				 */
+				firstUncommitted = 0;
+			}
+		}
+
+		/*
+		 * If we got to here with a valid firstUncommitted pointer then
+		 * we know that all transactions up to thisentry are committed.
+		 * If *firstUncommitted matches thisentry then we can advance.
+		 */
+		if (firstUncommitted && QUEUE_POS_EQUAL(*firstUncommitted,thisentry))
+			*firstUncommitted = *current;
 
 		/* Loop back if we're not at end of page */
 	} while (!reachedEndOfPage);
@@ -1960,6 +2041,7 @@ asyncQueueAdvanceTail(void)
 	}
 	oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
 	QUEUE_TAIL = min;
+	QUEUE_FIRST_UNCOMMITTED = QUEUE_POS_MAX(min, QUEUE_FIRST_UNCOMMITTED);
 	LWLockRelease(AsyncQueueLock);
 
 	/*
