diff -ur cvs/src/backend/access/transam/slru.c cvs.build/src/backend/access/transam/slru.c
--- cvs/src/backend/access/transam/slru.c	2009-05-10 19:49:47.000000000 +0200
+++ cvs.build/src/backend/access/transam/slru.c	2009-11-18 10:20:54.000000000 +0100
@@ -58,26 +58,6 @@
 #include "storage/shmem.h"
 #include "miscadmin.h"
 
-
-/*
- * Define segment size.  A page is the same BLCKSZ as is used everywhere
- * else in Postgres.  The segment size can be chosen somewhat arbitrarily;
- * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
- * or 64K transactions for SUBTRANS.
- *
- * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
- * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
- * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
- * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
- * take no explicit notice of that fact in this module, except when comparing
- * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
- *
- * Note: this file currently assumes that segment file names will be four
- * hex digits.	This sets a lower bound on the segment size (64K transactions
- * for 32-bit TransactionIds).
- */
-#define SLRU_PAGES_PER_SEGMENT	32
-
 #define SlruFileName(ctl, path, seg) \
 	snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
 
diff -ur cvs/src/backend/access/transam/xact.c cvs.build/src/backend/access/transam/xact.c
--- cvs/src/backend/access/transam/xact.c	2009-09-06 08:58:59.000000000 +0200
+++ cvs.build/src/backend/access/transam/xact.c	2009-11-18 10:20:54.000000000 +0100
@@ -1604,8 +1604,8 @@
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
-	/* NOTIFY commit must come before lower-level cleanup */
-	AtCommit_Notify();
+	/* Apply pending LISTEN/UNLISTEN actions and queue pending NOTIFYs */
+	AtCommit_NotifyBeforeCommit();
 
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
@@ -1680,6 +1680,11 @@
 
 	AtEOXact_MultiXact();
 
+	/*
+	 * Signal listening backends and clear pending LISTEN/NOTIFY state.
+	 */
+	AtCommit_NotifyAfterCommit();
+
 	ResourceOwnerRelease(TopTransactionResourceOwner,
 						 RESOURCE_RELEASE_LOCKS,
 						 true, true);
diff -ur cvs/src/backend/catalog/Makefile cvs.build/src/backend/catalog/Makefile
--- cvs/src/backend/catalog/Makefile	2009-10-31 14:47:46.000000000 +0100
+++ cvs.build/src/backend/catalog/Makefile	2009-11-18 10:20:54.000000000 +0100
@@ -30,7 +30,7 @@
 	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
 	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
 	pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \
-	pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \
+	pg_rewrite.h pg_trigger.h pg_description.h pg_cast.h \
 	pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
 	pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
 	pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
diff -ur cvs/src/backend/commands/async.c cvs.build/src/backend/commands/async.c
--- cvs/src/backend/commands/async.c	2009-09-06 08:59:06.000000000 +0200
+++ cvs.build/src/backend/commands/async.c	2009-11-19 00:44:41.000000000 +0100
@@ -14,31 +14,54 @@
 
 /*-------------------------------------------------------------------------
  * New Async Notification Model:
- * 1. Multiple backends on same machine.  Multiple backends listening on
- *	  one relation.  (Note: "listening on a relation" is not really the
- *	  right way to think about it, since the notify names need not have
- *	  anything to do with the names of relations actually in the database.
- *	  But this terminology is all over the code and docs, and I don't feel
- *	  like trying to replace it.)
- *
- * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
- *	  ie, each relname/listenerPID pair.  The "notification" field of the
- *	  tuple is zero when no NOTIFY is pending for that listener, or the PID
- *	  of the originating backend when a cross-backend NOTIFY is pending.
- *	  (We skip writing to pg_listener when doing a self-NOTIFY, so the
- *	  notification field should never be equal to the listenerPID field.)
- *
- * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
- *	  relname to a list of outstanding NOTIFY requests.  Actual processing
- *	  happens if and only if we reach transaction commit.  At that time (in
- *	  routine AtCommit_Notify) we scan pg_listener for matching relnames.
- *	  If the listenerPID in a matching tuple is ours, we just send a notify
- *	  message to our own front end.  If it is not ours, and "notification"
- *	  is not already nonzero, we set notification to our own PID and send a
- *	  PROCSIG_NOTIFY_INTERRUPT signal to the receiving process (indicated by
- *	  listenerPID).
- *	  BTW: if the signal operation fails, we presume that the listener backend
- *	  crashed without removing this tuple, and remove the tuple for it.
+ *
+ * 1. Multiple backends on same machine. Multiple backends listening on
+ *	  several channels. (This was previously called a "relation" even though it
+ *	  is just an identifier and has nothing to do with a database relation.)
+ *
+ * 2. There is one central queue in the form of Slru backed file based storage
+ *    (directory pg_notify/), with several pages mapped into shared memory.
+ *
+ *    There is no central storage of which backend listens on which channel,
+ *    every backend has its own list.
+ *
+ *    Every backend that is listening on at least one channel registers by
+ *    entering its Pid into the array of all backends. It then scans all
+ *    incoming notifications and compares the notified channels with its list.
+ *
+ *    In case there is a match it delivers the corresponding notification to
+ *    its frontend.
+ *
+ * 3. The NOTIFY statement (routine Async_Notify) registers the notification
+ *    in a list which will not be processed until transaction end. Every
+ *    notification can additionally send a "payload" which is an extra text
+ *    parameter to convey arbitrary information to the recipient.
+ *
+ *    Duplicate notifications from the same transaction are sent out as one
+ *    notification only. This is done to save work when for example a trigger
+ *    on a 2 million row table fires a notification for each row that has been
+ *    changed. If the application needs to receive every single notification
+ *    that has been sent, it can easily add some unique string into the extra
+ *    payload parameter.
+ *
+ *    At transaction commit, AtCommit_NotifyBeforeCommit() first performs the
+ *    required changes regarding listeners (Listen/Unlisten) and then adds the
+ *    pending notifications to the head of the queue. The head pointer of the
+ *    queue always points to the next free position and a position is just a
+ *    page number and the offset in that page. This is done before marking the
+ *    transaction as committed in clog. If we run into problems writing the
+ *    notifications, we can still call elog(ERROR, ...) and the transaction
+ *    will roll back.
+ *
+ *    Once we have put all of the notifications into the queue, we return to
+ *    CommitTransaction() which will then commit to clog.
+ *
+ *    We are then called a second time (AtCommit_NotifyAfterCommit()) and check
+ *    if we need to signal the backends.
+ *    In SignalBackends() we scan the list of listening backends and send a
+ *    PROCSIG_NOTIFY_INTERRUPT to every backend that has set its Pid (We don't
+ *    know which backend is listening on which channel so we need to send a
+ *    signal to every listening backend).
  *
  * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
  *	  can call inbound-notify processing immediately if this backend is idle
@@ -46,48 +69,48 @@
  *	  block).  Otherwise the handler may only set a flag, which will cause the
  *	  processing to occur just before we next go idle.
  *
- * 5. Inbound-notify processing consists of scanning pg_listener for tuples
- *	  matching our own listenerPID and having nonzero notification fields.
- *	  For each such tuple, we send a message to our frontend and clear the
- *	  notification field.  BTW: this routine has to start/commit its own
- *	  transaction, since by assumption it is only called from outside any
- *	  transaction.
- *
- * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
- * of pending actions.	If we reach transaction commit, the changes are
- * applied to pg_listener just before executing any pending NOTIFYs.  This
- * method is necessary because to avoid race conditions, we must hold lock
- * on pg_listener from when we insert a new listener tuple until we commit.
- * To do that and not create undue hazard of deadlock, we don't want to
- * touch pg_listener until we are otherwise done with the transaction;
- * in particular it'd be uncool to still be taking user-commanded locks
- * while holding the pg_listener lock.
- *
- * Although we grab ExclusiveLock on pg_listener for any operation,
- * the lock is never held very long, so it shouldn't cause too much of
- * a performance problem.  (Previously we used AccessExclusiveLock, but
- * there's no real reason to forbid concurrent reads.)
+ * 5. Inbound-notify processing consists of reading all of the notifications
+ *	  that have arrived since scanning last time. We read every notification
+ *	  until we reach the head pointer's position. Then we check if we were the
+ *	  laziest backend: if our pointer is set to the same position as the global
+ *	  tail pointer is set, then we set it further to the second-laziest
+ *	  backend (We can identify it by inspecting the positions of all other
+ *	  backends' pointers). Whenever we move the tail pointer we also truncate
+ *	  now unused pages (i.e. delete files in pg_notify/ that are no longer
+ *	  used).
+ *	  Note that we really read _any_ available notification in the queue. We
+ *	  also read uncommitted notifications from transactions that could still
+ *	  roll back. We must not deliver the notifications of those transactions
+ *	  but just copy them out of the queue. We save them in the
+ *	  uncommittedNotifications list which we try to deliver every time we
+ *	  check for available notifications.
  *
- * An application that listens on the same relname it notifies will get
+ * An application that listens on the same channel it notifies will get
  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
  * by comparing be_pid in the NOTIFY message to the application's own backend's
- * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
+ * Pid.  (As of FE/BE protocol 2.0, the backend's Pid is provided to the
  * frontend during startup.)  The above design guarantees that notifies from
- * other backends will never be missed by ignoring self-notifies.  Note,
- * however, that we do *not* guarantee that a separate frontend message will
- * be sent for every outside NOTIFY.  Since there is only room for one
- * originating PID in pg_listener, outside notifies occurring at about the
- * same time may be collapsed into a single message bearing the PID of the
- * first outside backend to perform the NOTIFY.
+ * other backends will never be missed by ignoring self-notifies.
  *-------------------------------------------------------------------------
  */
 
+/* XXX 
+ *
+ * TODO:
+ *  - guc parameter max_notifies_per_txn ??
+ *  - adapt comments
+ *  - test 2PC
+ *  - write error test
+ */
+
 #include "postgres.h"
 
 #include <unistd.h>
 #include <signal.h>
 
 #include "access/heapam.h"
+#include "access/slru.h"
+#include "access/transam.h"
 #include "access/twophase_rmgr.h"
 #include "access/xact.h"
 #include "catalog/pg_listener.h"
@@ -108,8 +131,8 @@
 
 /*
  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
- * all actions requested in the current transaction.  As explained above,
- * we don't actually modify pg_listener until we reach transaction commit.
+ * all actions requested in the current transaction. As explained above,
+ * we don't actually send notifications until we reach transaction commit.
  *
  * The list is kept in CurTransactionContext.  In subtransactions, each
  * subtransaction has its own list in its own CurTransactionContext, but
@@ -123,6 +146,12 @@
 	LISTEN_UNLISTEN_ALL
 } ListenActionKind;
 
+typedef enum
+{
+	READ_ALL_TO_UNCOMMITTED,
+	READ_ALL_SEND_COMMITTED
+} QueueProcessType;		/* argument of asyncQueueReadAllNotifications() */
+
 typedef struct
 {
 	ListenActionKind action;
@@ -133,8 +162,12 @@
 
 static List *upperPendingActions = NIL; /* list of upper-xact lists */
 
+static List *uncommittedNotifications = NIL;	/* read, but xact not yet committed */
+
+static bool needSignalBackends = false;	/* set when this xact queued notifies */
+
 /*
- * State for outbound notifies consists of a list of all relnames NOTIFYed
+ * State for outbound notifies consists of a list of all channels NOTIFYed
  * in the current transaction.	We do not actually perform a NOTIFY until
  * and unless the transaction commits.	pendingNotifies is NIL if no
  * NOTIFYs have been done in the current transaction.
@@ -149,12 +182,123 @@
  * condition name, it will get a self-notify at commit.  This is a bit odd
  * but is consistent with our historical behavior.
  */
-static List *pendingNotifies = NIL;		/* list of C strings */
 
+typedef struct Notification
+{
+	char		   *channel;
+	char		   *payload;
+	TransactionId	xid;
+	union {
+		/* Only one is used, depending on whether we send a notification */
+		/* or receive one; NOTE(review): anonymous unions are not C89. */
+		int32		dstPid;
+		int32		srcPid;
+	};
+} Notification;
+
+typedef struct AsyncQueueEntry
+{
+	/*
+	 * The struct has the maximal length, but usually only the first
+	 * AsyncQueueEntryEmptySize + strlen(payload) bytes ("length") are used.
+	 */
+	Size			length;
+	Oid				dboid;
+	TransactionId	xid;
+	int32			srcPid;
+	char			channel[NAMEDATALEN];
+	char			payload[NOTIFY_PAYLOAD_MAX_LENGTH];
+} AsyncQueueEntry;
+#define AsyncQueueEntryEmptySize \
+	 (sizeof(AsyncQueueEntry) - NOTIFY_PAYLOAD_MAX_LENGTH + 1)	/* +1: payload's NUL */
+
+#define	InvalidPid (-1)
+#define QUEUE_POS_PAGE(x) ((x).page)
+#define QUEUE_POS_OFFSET(x) ((x).offset)
+#define QUEUE_POS_EQUAL(x,y) \
+	 ((x).page == (y).page && (x).offset == (y).offset)
+#define SET_QUEUE_POS(x,y,z) \
+	do { \
+		(x).page = (y); \
+		(x).offset = (z); \
+	} while (0)
+/* logically earlier of positions x and y (z = HEAD); args evaluated repeatedly */
+#define QUEUE_POS_MIN(x,y,z) \
+	(asyncQueuePagePrecedesLogically((x).page, (y).page, (z).page) ? (x) : \
+	 asyncQueuePagePrecedesLogically((y).page, (x).page, (z).page) ? (y) : \
+	 (x).offset < (y).offset ? (x) : \
+	 (y))
+#define QUEUE_BACKEND_POS(i) asyncQueueControl->backend[(i)].pos
+#define QUEUE_BACKEND_PID(i) asyncQueueControl->backend[(i)].pid
+#define QUEUE_HEAD asyncQueueControl->head
+#define QUEUE_TAIL asyncQueueControl->tail
+
+typedef struct QueuePosition
+{
+	int				page;		/* SLRU page number */
+	int				offset;		/* offset within that page */
+} QueuePosition;
+
+typedef struct QueueBackendStatus
+{
+	int32			pid;		/* InvalidPid if not listening */
+	QueuePosition	pos;		/* this backend's read position */
+} QueueBackendStatus;
+
+/*
+ * The AsyncQueueControl structure is protected by the AsyncQueueLock.
+ *
+ * In SHARED mode, backends will only inspect their own entries as well as
+ * head and tail pointers. Consequently we can allow a backend to update its
+ * own record while holding only a shared lock (since no other backend will
+ * inspect it).
+ *
+ * In EXCLUSIVE mode, backends can inspect the entries of other backends and
+ * also change head and tail pointers.
+ *
+ * In order to avoid deadlocks, whenever we need both locks, we always first
+ * get AsyncQueueLock and then AsyncCtlLock.
+ */
+typedef struct AsyncQueueControl
+{
+	QueuePosition		head;		/* head points to the next free location */
+	QueuePosition 		tail;		/* the global tail is equivalent to the
+									   tail of the "slowest" backend */
+	TimestampTz			lastQueueFullWarn;	/* when the queue is full we only
+											   want to log that once in a
+											   while */
+	QueueBackendStatus	backend[1];	/* actually this one has as many entries as
+									 * connections are allowed (MaxBackends) */
+	/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
+} AsyncQueueControl;
+
+static AsyncQueueControl   *asyncQueueControl;
+static SlruCtlData			AsyncCtlData;
+
+#define AsyncCtl					(&AsyncCtlData)
+#define QUEUE_PAGESIZE				BLCKSZ
+#define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
+
+/*
+ * slru.c currently assumes that all filenames are four characters of hex
+ * digits. That means that we can use segments 0000 through FFFF.
+ * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
+ * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0xFFFF.
+ *
+ * It's of course easy to enhance slru.c but those pages give us so much
+ * space already that it doesn't seem worth the trouble...
+ *
+ * It's a legal test case to define QUEUE_MAX_PAGE to a very small multiple of
+ * SLRU_PAGES_PER_SEGMENT to test queue full behaviour.
+ */
+#define QUEUE_MAX_PAGE			(SLRU_PAGES_PER_SEGMENT * 0xFFFF)
+
+static List *pendingNotifies = NIL;				/* list of Notifications */
 static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */
+static List *listenChannels = NIL;	/* list of channels we are listening to */
 
 /*
- * State for inbound notifies consists of two flags: one saying whether
+ * State for inbound notifications consists of two flags: one saying whether
  * the signal handler is currently allowed to call ProcessIncomingNotify
  * directly, and one saying whether the signal has occurred but the handler
  * was not allowed to call ProcessIncomingNotify at the time.
@@ -171,37 +315,148 @@
 
 bool		Trace_notify = false;
 
-
 static void queue_listen(ListenActionKind action, const char *condname);
 static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_Listen(Relation lRel, const char *relname);
-static void Exec_Unlisten(Relation lRel, const char *relname);
-static void Exec_UnlistenAll(Relation lRel);
-static void Send_Notify(Relation lRel);
+static bool IsListeningOn(const char *channel);
+static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
+static void Exec_Listen(const char *channel);
+static void Exec_Unlisten(const char *channel);
+static void Exec_UnlistenAll(void);
+static void SignalBackends(void);
+static void Send_Notify(void);
+static bool asyncQueuePagePrecedesPhysically(int p, int q);
+static bool asyncQueuePagePrecedesLogically(int p, int q, int head);
+static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
+static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
+static void asyncQueueEntryToNotification(AsyncQueueEntry *qe, Notification *n);
+static List *asyncQueueAddEntries(List *notifications);
+static bool asyncQueueGetEntriesByPage(QueuePosition *current,
+									   QueuePosition stop,
+									   List **committed,
+									   MemoryContext committedContext,
+									   List **uncommitted,
+									   MemoryContext uncommittedContext);
+static void asyncQueueReadAllNotifications(QueueProcessType type);
+static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(void);
-static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
-static bool AsyncExistsPendingNotify(const char *relname);
+static void NotifyMyFrontEnd(const char *channel,
+							 const char *payload,
+							 int32 dstPid);
+static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
 static void ClearPendingActionsAndNotifies(void);
 
+/*
+ * We will work on the page range of 0..(SLRU_PAGES_PER_SEGMENT * 0xFFFF).
+ * asyncQueuePagePrecedesPhysically just checks numerically without any magic if
+ * one page precedes another one.
+ *
+ * On the other hand, when asyncQueuePagePrecedesLogically does that check, it
+ * takes the current head page number into account. Now if we have wrapped
+ * around, it can happen that p precedes q, even though p > q (if the head page
+ * is in between the two).
+ */ 
+static bool
+asyncQueuePagePrecedesPhysically(int p, int q)
+{
+	return (p < q);
+}
+
+static bool
+asyncQueuePagePrecedesLogically(int p, int q, int head)
+{
+	bool		pAfterHead = (p > head);
+	bool		qAfterHead = (q > head);
+
+	/*
+	 * Pages after head count as older than pages at or before head.  When
+	 * both pages lie on the same side of head, numeric order decides.
+	 */
+	if (pAfterHead == qAfterHead)
+		return p < q;
+
+	/*
+	 * Exactly one of the two pages lies after head, and that one is older,
+	 * so p precedes q exactly when p is the page after head.
+	 */
+	Assert(pAfterHead != qAfterHead);
+	return pAfterHead;
+}
+
+void
+AsyncShmemInit(void)
+{
+	bool	found;
+	int		slotno;
+	Size	size;
+
+	/* sizeof(AsyncQueueControl) already holds one QueueBackendStatus */
+	size = mul_size(MaxBackends-1, sizeof(QueueBackendStatus));
+	size = add_size(size, sizeof(AsyncQueueControl));
+
+	asyncQueueControl = (AsyncQueueControl *)
+		ShmemInitStruct("Async Queue Control", size, &found);
+	if (!asyncQueueControl)
+		elog(ERROR, "out of memory");
+
+	/* AsyncCtlData is process-local, so set it up on every call */
+	AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically;
+	SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0,
+				  AsyncCtlLock, "pg_notify");
+	AsyncCtl->do_fsync = false;
+
+	/*
+	 * Reset the queue only if we just created it; it may be in use otherwise.
+	 */
+	if (!found)
+	{
+		int		i;
+		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
+		SET_QUEUE_POS(QUEUE_TAIL, QUEUE_MAX_PAGE, 0);
+		for (i = 0; i < MaxBackends; i++)
+		{
+			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			QUEUE_BACKEND_PID(i) = InvalidPid;
+		}
+		asyncQueueControl->lastQueueFullWarn = GetCurrentTimestamp();
+
+		/* Delete stale segments first so the head page written below stays */
+		SlruScanDirectory(AsyncCtl, QUEUE_MAX_PAGE, true);
+
+		LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+		LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
+		slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+		AsyncCtl->shared->page_dirty[slotno] = true;
+		SimpleLruWritePage(AsyncCtl, slotno, NULL);
+		LWLockRelease(AsyncCtlLock);
+		LWLockRelease(AsyncQueueLock);
+	}
+}
+
 
 /*
  * Async_Notify
  *
  *		This is executed by the SQL notify command.
  *
- *		Adds the relation to the list of pending notifies.
+ *		Adds the channel to the list of pending notifies.
  *		Actual notification happens during transaction commit.
  *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  */
 void
-Async_Notify(const char *relname)
+Async_Notify(const char *channel, const char *payload)
 {
+
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Notify(%s)", relname);
+		elog(DEBUG1, "Async_Notify(%s)", channel);
+
+	/*
+	 * XXX - do we now need a guc parameter max_notifies_per_txn?
+	 */ 
 
 	/* no point in making duplicate entries in the list ... */
-	if (!AsyncExistsPendingNotify(relname))
+	if (!AsyncExistsPendingNotify(channel, payload))
 	{
+		Notification *n;
 		/*
 		 * The name list needs to live until end of transaction, so store it
 		 * in the transaction context.
@@ -210,12 +465,21 @@
 
 		oldcontext = MemoryContextSwitchTo(CurTransactionContext);
 
+		n = (Notification *) palloc(sizeof(Notification));
+		/* will set the xid later... */
+		n->xid = InvalidTransactionId;
+		n->channel = pstrdup(channel);
+		if (payload)
+			n->payload = pstrdup(payload);
+		else
+			n->payload = "";
+		n->dstPid = InvalidPid;
+
 		/*
-		 * Ordering of the list isn't important.  We choose to put new entries
-		 * on the front, as this might make duplicate-elimination a tad faster
-		 * when the same condition is signaled many times in a row.
+		 * We want to preserve the order so we need to append every
+		 * notification. See comments at AsyncExistsPendingNotify().
 		 */
-		pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
+		pendingNotifies = lappend(pendingNotifies, n);
 
 		MemoryContextSwitchTo(oldcontext);
 	}
@@ -259,12 +523,12 @@
  *		This is executed by the SQL listen command.
  */
 void
-Async_Listen(const char *relname)
+Async_Listen(const char *channel)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
+		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
 
-	queue_listen(LISTEN_LISTEN, relname);
+	queue_listen(LISTEN_LISTEN, channel);	/* applied at commit */
 }
 
 /*
@@ -273,16 +537,16 @@
  *		This is executed by the SQL unlisten command.
  */
 void
-Async_Unlisten(const char *relname)
+Async_Unlisten(const char *channel)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
+		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
 
 	/* If we couldn't possibly be listening, no need to queue anything */
 	if (pendingActions == NIL && !unlistenExitRegistered)
 		return;
 
-	queue_listen(LISTEN_UNLISTEN, relname);
+	queue_listen(LISTEN_UNLISTEN, channel);	/* applied at commit */
 }
 
 /*
@@ -306,8 +570,6 @@
 /*
  * Async_UnlistenOnExit
  *
- *		Clean up the pg_listener table at backend exit.
- *
  *		This is executed if we have done any LISTENs in this backend.
  *		It might not be necessary anymore, if the user UNLISTENed everything,
  *		but we don't try to detect that case.
@@ -315,17 +577,8 @@
 static void
 Async_UnlistenOnExit(int code, Datum arg)
 {
-	/*
-	 * We need to start/commit a transaction for the unlisten, but if there is
-	 * already an active transaction we had better abort that one first.
-	 * Otherwise we'd end up committing changes that probably ought to be
-	 * discarded.
-	 */
 	AbortOutOfAnyTransaction();
-	/* Now we can do the unlisten */
-	StartTransactionCommand();
-	Async_UnlistenAll();
-	CommitTransactionCommand();
+	Exec_UnlistenAll();		/* pg_listener is gone; no xact needed */
 }
 
 /*
@@ -348,10 +601,15 @@
 	/* We can deal with pending NOTIFY though */
 	foreach(p, pendingNotifies)
 	{
-		const char *relname = (const char *) lfirst(p);
+		AsyncQueueEntry qe;
+		Notification   *n;
+
+		n = (Notification *) lfirst(p);
+
+		asyncQueueNotificationToEntry(n, &qe);
 
 		RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
-							   relname, strlen(relname) + 1);
+							   &qe, qe.length);
 	}
 
 	/*
@@ -363,26 +621,24 @@
 }
 
 /*
- * AtCommit_Notify
- *
- *		This is called at transaction commit.
+ * AtCommit_NotifyBeforeCommit
  *
- *		If there are pending LISTEN/UNLISTEN actions, insert or delete
- *		tuples in pg_listener accordingly.
+ *		This is called at transaction commit, before actually committing to
+ *		clog.
  *
- *		If there are outbound notify requests in the pendingNotifies list,
- *		scan pg_listener for matching tuples, and either signal the other
- *		backend or send a message to our own frontend.
+ *		If there are pending LISTEN/UNLISTEN actions, update our
+ *		"listenChannels" list.
  *
- *		NOTE: we are still inside the current transaction, therefore can
- *		piggyback on its committing of changes.
+ *		If there are outbound notify requests in the pendingNotifies list, add
+ *		them to the global queue; AtCommit_NotifyAfterCommit signals listeners.
  */
 void
-AtCommit_Notify(void)
+AtCommit_NotifyBeforeCommit(void)
 {
-	Relation	lRel;
 	ListCell   *p;
 
+	needSignalBackends = false;
+
 	if (pendingActions == NIL && pendingNotifies == NIL)
 		return;					/* no relevant statements in this xact */
 
@@ -397,10 +653,7 @@
 	}
 
 	if (Trace_notify)
-		elog(DEBUG1, "AtCommit_Notify");
-
-	/* Acquire ExclusiveLock on pg_listener */
-	lRel = heap_open(ListenerRelationId, ExclusiveLock);
+		elog(DEBUG1, "AtCommit_NotifyBeforeCommit");
 
 	/* Perform any pending listen/unlisten actions */
 	foreach(p, pendingActions)
@@ -410,99 +663,111 @@
 		switch (actrec->action)
 		{
 			case LISTEN_LISTEN:
-				Exec_Listen(lRel, actrec->condname);
+				Exec_Listen(actrec->condname);
 				break;
 			case LISTEN_UNLISTEN:
-				Exec_Unlisten(lRel, actrec->condname);
+				Exec_Unlisten(actrec->condname);
 				break;
 			case LISTEN_UNLISTEN_ALL:
-				Exec_UnlistenAll(lRel);
+				Exec_UnlistenAll();
 				break;
 		}
-
-		/* We must CCI after each action in case of conflicting actions */
-		CommandCounterIncrement();
 	}
 
-	/* Perform any pending notifies */
-	if (pendingNotifies)
-		Send_Notify(lRel);
-
 	/*
-	 * We do NOT release the lock on pg_listener here; we need to hold it
-	 * until end of transaction (which is about to happen, anyway) to ensure
-	 * that notified backends see our tuple updates when they look. Else they
-	 * might disregard the signal, which would make the application programmer
-	 * very unhappy.  Also, this prevents race conditions when we have just
-	 * inserted a listening tuple.
+	 * Perform any pending notifies.
 	 */
-	heap_close(lRel, NoLock);
+	if (pendingNotifies)
+	{
+		needSignalBackends = true;
+		Send_Notify();
+	}
+}
+
+/*
+ * AtCommit_NotifyAfterCommit
+ *
+ *		This is called at transaction commit, after committing to clog.
+ *
+ *		Signal listening backends if we queued notifications, then clean up.
+ */
+void
+AtCommit_NotifyAfterCommit(void)
+{
+	if (needSignalBackends)
+		SignalBackends();
 
 	ClearPendingActionsAndNotifies();
 
 	if (Trace_notify)
-		elog(DEBUG1, "AtCommit_Notify: done");
+		elog(DEBUG1, "AtCommit_NotifyAfterCommit: done");
+}
+
+/*
+ * IsListeningOn --- is this backend listening on the given channel?
+ *
+ * This is called for every notification found in the queue.  It is a plain
+ * linear scan of listenChannels; should that ever become a bottleneck, the
+ * channels could be kept in a sorted array to allow binary search.
+ */
+static bool
+IsListeningOn(const char *channel)
+{
+	ListCell   *cell;
+
+	foreach(cell, listenChannels)
+	{
+		if (strcmp((const char *) lfirst(cell), channel) == 0)
+			return true;		/* found it */
+	}
+
+	return false;
 }
 
+
 /*
  * Exec_Listen --- subroutine for AtCommit_Notify
  *
- *		Register the current backend as listening on the specified relation.
+ *		Register the current backend as listening on the specified channel.
  */
 static void
-Exec_Listen(Relation lRel, const char *relname)
+Exec_Listen(const char *channel)
 {
-	HeapScanDesc scan;
-	HeapTuple	tuple;
-	Datum		values[Natts_pg_listener];
-	bool		nulls[Natts_pg_listener];
-	NameData	condname;
-	bool		alreadyListener = false;
+	MemoryContext oldcontext;
 
 	if (Trace_notify)
-		elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
+		elog(DEBUG1, "Exec_Listen(%s,%d)", channel, MyProcPid);
 
-	/* Detect whether we are already listening on this relname */
-	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
-	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-	{
-		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
-
-		if (listener->listenerpid == MyProcPid &&
-			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
-		{
-			alreadyListener = true;
-			/* No need to scan the rest of the table */
-			break;
-		}
-	}
-	heap_endscan(scan);
-
-	if (alreadyListener)
+	/* Detect whether we are already listening on this channel */
+	if (IsListeningOn(channel))
 		return;
 
 	/*
-	 * OK to insert a new tuple
+	 * OK to insert to the list.
 	 */
-	memset(nulls, false, sizeof(nulls));
-
-	namestrcpy(&condname, relname);
-	values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
-	values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid);
-	values[Anum_pg_listener_notification - 1] = Int32GetDatum(0);		/* no notifies pending */
-
-	tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls);
-
-	simple_heap_insert(lRel, tuple);
-
-#ifdef NOT_USED					/* currently there are no indexes */
-	CatalogUpdateIndexes(lRel, tuple);
-#endif
+	if (listenChannels == NIL)
+	{
+		/*
+		 * This is our first LISTEN, establish our pointer.
+		 */
+		LWLockAcquire(AsyncQueueLock, LW_SHARED);
+		QUEUE_BACKEND_POS(MyBackendId) = QUEUE_HEAD;
+		QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
+		LWLockRelease(AsyncQueueLock);
+		/*
+		 * Actually this is only necessary if we are the first listener
+		 * (The tail pointer needs to be identical with the pointer of at
+		 * least one backend).
+		 */
+		asyncQueueAdvanceTail();
+	}
 
-	heap_freetuple(tuple);
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	listenChannels = lappend(listenChannels, pstrdup(channel));
+	MemoryContextSwitchTo(oldcontext);
 
 	/*
-	 * now that we are listening, make sure we will unlisten before dying.
+	 * Now that we are listening, make sure we will unlisten before dying.
 	 */
 	if (!unlistenExitRegistered)
 	{
@@ -514,38 +779,53 @@
 /*
  * Exec_Unlisten --- subroutine for AtCommit_Notify
  *
- *		Remove the current backend from the list of listening backends
- *		for the specified relation.
+ *		Remove the given channel from our "listenChannels" list.
  */
 static void
-Exec_Unlisten(Relation lRel, const char *relname)
+Exec_Unlisten(const char *channel)
 {
-	HeapScanDesc scan;
-	HeapTuple	tuple;
+	ListCell   *p;
+	ListCell   *prev = NULL;
 
 	if (Trace_notify)
-		elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);
+		elog(DEBUG1, "Exec_Unlisten(%s,%d)", channel, MyProcPid);
 
-	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
-	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	/* Find the channel in our list and remove it */
+	foreach(p, listenChannels)
 	{
-		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
-
-		if (listener->listenerpid == MyProcPid &&
-			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+		char *lchan = (char *) lfirst(p);
+		if (strcmp(lchan, channel) == 0)
 		{
-			/* Found the matching tuple, delete it */
-			simple_heap_delete(lRel, &tuple->t_self);
-
 			/*
-			 * We assume there can be only one match, so no need to scan the
-			 * rest of the table
+			 * The list lives in TopMemoryContext, so free the channel string
+			 * explicitly; list_delete_cell() frees the ListCell itself.
 			 */
-			break;
+			pfree(lchan);
+			listenChannels = list_delete_cell(listenChannels, p, prev);
+			if (listenChannels == NIL)
+			{
+				bool advanceTail = false;
+				/*
+				 * This backend is not listening anymore.
+				 */
+				LWLockAcquire(AsyncQueueLock, LW_SHARED);
+				QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
+
+				/*
+				 * If our position was the queue tail, we may have been holding it back.
+				 */
+				if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL))
+					advanceTail = true;
+				LWLockRelease(AsyncQueueLock);
+
+				if (advanceTail)
+					asyncQueueAdvanceTail();
+			}
+			return;
 		}
+		prev = p;
 	}
-	heap_endscan(scan);
-
+	
 	/*
 	 * We do not complain about unlistening something not being listened;
 	 * should we?
@@ -555,123 +835,300 @@
 /*
  * Exec_UnlistenAll --- subroutine for AtCommit_Notify
  *
- *		Update pg_listener to unlisten all relations for this backend.
+ *		Unlisten on all channels for this backend.
  */
 static void
-Exec_UnlistenAll(Relation lRel)
+Exec_UnlistenAll(void)
 {
-	HeapScanDesc scan;
-	HeapTuple	lTuple;
-	ScanKeyData key[1];
+	bool advanceTail = false;
 
 	if (Trace_notify)
-		elog(DEBUG1, "Exec_UnlistenAll");
+		elog(DEBUG1, "Exec_UnlistenAll(%d)", MyProcPid);
 
-	/* Find and delete all entries with my listenerPID */
-	ScanKeyInit(&key[0],
-				Anum_pg_listener_listenerpid,
-				BTEqualStrategyNumber, F_INT4EQ,
-				Int32GetDatum(MyProcPid));
-	scan = heap_beginscan(lRel, SnapshotNow, 1, key);
+	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+	QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
 
-	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-		simple_heap_delete(lRel, &lTuple->t_self);
+	/*
+	 * Since the list is living in the TopMemoryContext, we free the memory.
+	 */
+	list_free_deep(listenChannels);
+	listenChannels = NIL;
 
-	heap_endscan(scan);
+	/*
+	 * If we have been the last backend, advance the tail pointer.
+	 */
+	if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL))
+		advanceTail = true;
+	LWLockRelease(AsyncQueueLock);
+
+	if (advanceTail)
+		asyncQueueAdvanceTail();
+}
+
+static bool
+asyncQueueIsFull(void)
+{
+	QueuePosition	lookahead = QUEUE_HEAD;
+	Size			remain = QUEUE_PAGESIZE - QUEUE_POS_OFFSET(lookahead) - 1;
+	Size			advance = Min(remain, NOTIFY_PAYLOAD_MAX_LENGTH);
+
+	/*
+	 * Check what happens if we wrote a maximally sized entry. Would we go to a
+	 * new page? If not, then our queue can not be full (because we can still
+	 * fill at least the current page with at least one more entry).
+	 */
+	if (!asyncQueueAdvance(&lookahead, advance))
+		return false;
+
+	/*
+	 * The queue is full if with a switch to a new page we reach the page
+	 * of the tail pointer.
+	 */
+	return QUEUE_POS_PAGE(lookahead) == QUEUE_POS_PAGE(QUEUE_TAIL);
 }
 
 /*
- * Send_Notify --- subroutine for AtCommit_Notify
- *
- *		Scan pg_listener for tuples matching our pending notifies, and
- *		either signal the other backend or send a message to our own frontend.
+ * The function advances the position to the next entry. In case we jump to
+ * a new page the function returns true, else false.
  */
+static bool
+asyncQueueAdvance(QueuePosition *position, int entryLength)
+{
+	int		pageno = QUEUE_POS_PAGE(*position);
+	int		offset = QUEUE_POS_OFFSET(*position);
+	bool	pageJump = false;
+
+	/*
+	 * Move to the next writing position: First jump over what we have just
+	 * written or read.
+	 */
+	offset += entryLength;
+	Assert(offset < QUEUE_PAGESIZE);
+
+	/*
+	 * In a second step check if another entry can be written to the page. If
+	 * it does, stay here, we have reached the next position. If not, then we
+	 * need to move on to the next page.
+	 */
+	if (offset + AsyncQueueEntryEmptySize >= QUEUE_PAGESIZE)
+	{
+		pageno++;
+		if (pageno > QUEUE_MAX_PAGE)
+			/* wrap around */
+			pageno = 0;
+		offset = 0;
+		pageJump = true;
+	}
+
+	SET_QUEUE_POS(*position, pageno, offset);
+	return pageJump;
+}
+
+static void
+asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
+{
+	Assert(n->channel);
+	Assert(n->payload);
+	Assert(strlen(n->payload) < NOTIFY_PAYLOAD_MAX_LENGTH);
+
+	/* The terminator is already included in AsyncQueueEntryEmptySize */
+	qe->length = AsyncQueueEntryEmptySize + strlen(n->payload);
+	qe->srcPid = MyProcPid;
+	qe->dboid = MyDatabaseId;
+	qe->xid = GetCurrentTransactionId();
+	strcpy(qe->channel, n->channel);
+	strcpy(qe->payload, n->payload);
+}
+
 static void
-Send_Notify(Relation lRel)
+asyncQueueEntryToNotification(AsyncQueueEntry *qe, Notification *n)
+{
+	n->channel = pstrdup(qe->channel);
+	n->payload = pstrdup(qe->payload);
+	n->srcPid = qe->srcPid;
+	n->xid = qe->xid;
+}
+
+static List *
+asyncQueueAddEntries(List *notifications)
 {
-	TupleDesc	tdesc = RelationGetDescr(lRel);
-	HeapScanDesc scan;
-	HeapTuple	lTuple,
-				rTuple;
-	Datum		value[Natts_pg_listener];
-	bool		repl[Natts_pg_listener],
-				nulls[Natts_pg_listener];
-
-	/* preset data to update notify column to MyProcPid */
-	memset(nulls, false, sizeof(nulls));
-	memset(repl, false, sizeof(repl));
-	repl[Anum_pg_listener_notification - 1] = true;
-	memset(value, 0, sizeof(value));
-	value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid);
-
-	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
-
-	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-	{
-		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
-		char	   *relname = NameStr(listener->relname);
-		int32		listenerPID = listener->listenerpid;
+	int				pageno;
+	int				offset;
+	int				slotno;
+	AsyncQueueEntry	qe;
 
-		if (!AsyncExistsPendingNotify(relname))
-			continue;
+	/*
+	 * Note that we are holding exclusive AsyncQueueLock already.
+	 */
+	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
+	pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
+	slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
+	AsyncCtl->shared->page_dirty[slotno] = true;
 
-		if (listenerPID == MyProcPid)
+	do
+	{
+		Notification   *n;
+
+		if (asyncQueueIsFull())
 		{
-			/*
-			 * Self-notify: no need to bother with table update. Indeed, we
-			 * *must not* clear the notification field in this path, or we
-			 * could lose an outside notify, which'd be bad for applications
-			 * that ignore self-notify messages.
-			 */
-			if (Trace_notify)
-				elog(DEBUG1, "AtCommit_Notify: notifying self");
+			/* document that we will not go into the if command further down */
+			Assert(QUEUE_POS_OFFSET(QUEUE_HEAD) != 0);
+			break;
+		}
+
+		n = (Notification *) linitial(notifications);
+
+		asyncQueueNotificationToEntry(n, &qe);
 
-			NotifyMyFrontEnd(relname, listenerPID);
+		offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
+		/*
+		 * Check whether or not the entry still fits on the current page.
+		 */
+		if (offset + qe.length < QUEUE_PAGESIZE)
+		{
+			notifications = list_delete_first(notifications);
 		}
 		else
 		{
-			if (Trace_notify)
-				elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
-					 listenerPID);
-
 			/*
-			 * If someone has already notified this listener, we don't bother
-			 * modifying the table, but we do still send a NOTIFY_INTERRUPT
-			 * signal, just in case that backend missed the earlier signal for
-			 * some reason.  It's OK to send the signal first, because the
-			 * other guy can't read pg_listener until we unlock it.
-			 *
-			 * Note: we don't have the other guy's BackendId available, so
-			 * this will incur a search of the ProcSignal table.  That's
-			 * probably not worth worrying about.
+			 * Write a dummy entry to fill up the page. Actually readers will
+			 * only check dboid and since it won't match any reader's database
+			 * oid, they will ignore this entry and move on.
 			 */
-			if (SendProcSignal(listenerPID, PROCSIG_NOTIFY_INTERRUPT,
-							   InvalidBackendId) < 0)
-			{
-				/*
-				 * Get rid of pg_listener entry if it refers to a PID that no
-				 * longer exists.  Presumably, that backend crashed without
-				 * deleting its pg_listener entries. This code used to only
-				 * delete the entry if errno==ESRCH, but as far as I can see
-				 * we should just do it for any failure (certainly at least
-				 * for EPERM too...)
-				 */
-				simple_heap_delete(lRel, &lTuple->t_self);
-			}
-			else if (listener->notification == 0)
-			{
-				/* Rewrite the tuple with my PID in notification column */
-				rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
-				simple_heap_update(lRel, &lTuple->t_self, rTuple);
-
-#ifdef NOT_USED					/* currently there are no indexes */
-				CatalogUpdateIndexes(lRel, rTuple);
-#endif
-			}
+			qe.length = QUEUE_PAGESIZE - offset - 1;
+			qe.dboid = InvalidOid;
+			qe.channel[0] = '\0';
+			qe.payload[0] = '\0';
+			qe.xid = InvalidTransactionId;
 		}
+		memcpy((char*) AsyncCtl->shared->page_buffer[slotno] + offset,
+			   &qe, qe.length);
+
+	} while (!asyncQueueAdvance(&(QUEUE_HEAD), qe.length)
+			 && notifications != NIL);
+
+	if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
+	{
+		/*
+		 * If the next entry needs to go to a new page, prepare that page
+		 * already.
+		 */
+		slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+		AsyncCtl->shared->page_dirty[slotno] = true;
 	}
+	LWLockRelease(AsyncCtlLock);
+
+	return notifications;
+}
+
+static void
+asyncQueueFullWarning(void)
+{
+	/* Caller must hold exclusive AsyncQueueLock. */
+	TimestampTz		t = GetCurrentTimestamp();
+	QueuePosition	min = QUEUE_HEAD;
+	int32			minPid = InvalidPid;
+	int				i;
+
+	for (i = 0; i < MaxBackends; i++)
+	{
+		if (QUEUE_BACKEND_PID(i) == InvalidPid)
+			continue;
+		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD);
+		if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
+			minPid = QUEUE_BACKEND_PID(i);
+	}
 
-	heap_endscan(scan);
+	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFullWarn,
+								   t, QUEUE_FULL_WARN_INTERVAL))
+	{
+		ereport(WARNING, (errmsg("pg_notify queue is full. Among the slowest backends: %d", minPid)));
+		asyncQueueControl->lastQueueFullWarn = t;
+	}
+}
+
+/*
+ * Send_Notify --- subroutine for AtCommit_Notify
+ *
+ * Add the pending notifications to the queue.  Listening backends are only
+ * signalled from here while we wait for a full queue to drain.
+ *
+ * A full queue is very uncommon and should really not happen, given that we
+ * have so much space available in our SLRU pages. Nevertheless we need to
+ * deal with this possibility. Note that when we get here we are in the process
+ * of committing our transaction: we have not yet committed to clog, but that
+ * would be the next step.
+ */
+static void
+Send_Notify(void)
+{
+	while (pendingNotifies != NIL)
+	{
+		LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+		while (asyncQueueIsFull())
+		{
+			asyncQueueFullWarning();
+			LWLockRelease(AsyncQueueLock);
+
+			SignalBackends();
+
+			asyncQueueReadAllNotifications(READ_ALL_TO_UNCOMMITTED);
+
+			asyncQueueAdvanceTail();
+			pg_usleep(100 * 1000L); /* 100ms */
+			LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+		}
+		Assert(pendingNotifies != NIL);
+		pendingNotifies = asyncQueueAddEntries(pendingNotifies);
+		LWLockRelease(AsyncQueueLock);
+	}
+}
+
+/*
+ * Send signals to all listening backends. It would be easy here to check
+ * for backends that are already up-to-date, i.e.
+ *
+ *   QUEUE_BACKEND_POS(backendId) == QUEUE_HEAD
+ *
+ * but we need to signal them anyway. If we didn't, we would not have the
+ * guarantee that they can deliver their notifications from
+ * uncommittedNotifications.
+ *
+ * Since we know the BackendId and the Pid, the signalling is quite cheap.
+ */
+static void
+SignalBackends(void)
+{
+	ListCell	   *p1, *p2;
+	int				i;
+	int32			pid;
+	List		   *pids = NIL;
+	List		   *ids = NIL;
+
+	/* Collect every LISTENing backend; signal them after releasing the lock. */
+	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+	for (i = 0; i < MaxBackends; i++)
+	{
+		pid = QUEUE_BACKEND_PID(i);
+		if (pid != InvalidPid)
+		{
+			pids = lappend_int(pids, pid);
+			ids = lappend_int(ids, i);
+		}
+	}
+	LWLockRelease(AsyncQueueLock);
+	
+	forboth(p1, pids, p2, ids)
+	{
+		pid = (int32) lfirst_int(p1);
+		i = lfirst_int(p2);
+		/*
+		 * A failure to signal is only reported as a WARNING; presumably the
+		 * target backend exited after we released AsyncQueueLock above.
+		 */
+		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, i) < 0)
+			elog(WARNING, "Error signalling backend %d", pid);
+	}
 }
 
 /*
@@ -940,29 +1397,211 @@
 }
 
 /*
+ * This function will ask for a page with ReadOnly access and once we have the
+ * lock, we read the whole content and pass back two lists of notifications
+ * that the calling function will deliver then. The first list will contain all
+ * notifications from transactions that have already committed and the second
+ * one will contain uncommitted notifications.
+ *
+ * We stop if we have either reached the stop position or go to a new page.
+ *
+ * If we have reached the stop position, return true, else false.
+ */
+static bool
+asyncQueueGetEntriesByPage(QueuePosition *current, QueuePosition stop,
+						   List **committed, MemoryContext committedContext,
+						   List **uncommitted, MemoryContext uncommittedContext)
+{
+	int				slotno;
+	AsyncQueueEntry	qe;
+	Notification   *n;
+
+	if (QUEUE_POS_EQUAL(*current, stop))
+		return true;
+
+	slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, current->page,
+										InvalidTransactionId);
+	do {
+		char *readPtr = (char *) (AsyncCtl->shared->page_buffer[slotno]);
+		readPtr += current->offset;
+
+		if (QUEUE_POS_EQUAL(*current, stop))
+			break;
+
+		memcpy(&qe, readPtr, AsyncQueueEntryEmptySize);
+
+		if (qe.dboid == MyDatabaseId && IsListeningOn(qe.channel))
+		{
+			MemoryContext	oldcontext;
+
+			/* read the whole entry only if we are really interested in it */
+			if (qe.length > AsyncQueueEntryEmptySize)
+				memcpy(&qe, readPtr, qe.length);
+
+			if (TransactionIdDidCommit(qe.xid))
+			{
+				oldcontext = MemoryContextSwitchTo(committedContext);
+				n = (Notification *) palloc(sizeof(Notification));
+				asyncQueueEntryToNotification(&qe, n);
+				*committed = lappend(*committed, n);
+				MemoryContextSwitchTo(oldcontext);
+			}
+			else
+			{
+				if (!TransactionIdDidAbort(qe.xid))
+				{
+					oldcontext = MemoryContextSwitchTo(uncommittedContext);
+					n = (Notification *) palloc(sizeof(Notification));
+					asyncQueueEntryToNotification(&qe, n);
+					*uncommitted= lappend(*uncommitted, n);
+					MemoryContextSwitchTo(oldcontext);
+				}
+			}
+		}
+		/*
+		 * The call to asyncQueueAdvance just jumps over what we have
+		 * just read. If there is no more space for the next record on the
+		 * current page, it will also switch to the beginning of the next page.
+		 */
+	} while(!asyncQueueAdvance(current, qe.length));
+
+	LWLockRelease(AsyncCtlLock);
+
+	if (QUEUE_POS_EQUAL(*current, stop))
+		return true;
+
+	return false;
+}
+
+static void
+asyncQueueReadAllNotifications(QueueProcessType type)
+{
+	QueuePosition	pos;
+	QueuePosition	oldpos;
+	QueuePosition	head;
+	List		   *notifications;
+	ListCell	   *lc;
+	Notification   *n;
+	bool			advanceTail = false;
+	bool			reachedStop;
+
+	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+	pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
+	head = QUEUE_HEAD;
+	LWLockRelease(AsyncQueueLock);
+
+	/* Nothing to do, we have read all notifications already. */
+	if (QUEUE_POS_EQUAL(pos, head))
+		return;
+
+	do
+	{
+		/*
+		 * Our stop position is what we found to be the head's position when
+		 * we entered this function. It might have changed already. But if it
+		 * has, we will receive (or have already received and queued) another
+		 * signal and come here again.
+		 *
+		 * We are not holding AsyncQueueLock here! The queue can only extend
+		 * beyond the head pointer (see above) and we leave our backend's
+		 * pointer where it is so nobody will truncate or rewrite pages under
+		 * us.
+		 */
+		reachedStop = false;
+
+		if (type == READ_ALL_TO_UNCOMMITTED)
+			/*
+			 * If the queue is full, we call this in the writing backend.
+			 * If a backend sends more notifications than the queue can hold
+			 * it also needs to read its own notifications from time to time
+			 * such that it can reuse the space of the queue.
+			 */
+			reachedStop = asyncQueueGetEntriesByPage(&pos, head,
+								   &uncommittedNotifications, TopMemoryContext,
+								   &uncommittedNotifications, TopMemoryContext);
+		else
+		{
+			/* This is called from ProcessIncomingNotify() */
+			Assert(type == READ_ALL_SEND_COMMITTED);
+			notifications = NIL;
+			reachedStop = asyncQueueGetEntriesByPage(&pos, head,
+								   &notifications, CurrentMemoryContext,
+								   &uncommittedNotifications, TopMemoryContext);
+
+			foreach(lc, notifications)
+			{
+				n = (Notification *) lfirst(lc);
+				NotifyMyFrontEnd(n->channel, n->payload, n->srcPid);
+				pfree(n->channel);
+				pfree(n->payload);
+			}
+			list_free_deep(notifications);
+		}
+	} while (!reachedStop);
+
+	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+	QUEUE_BACKEND_POS(MyBackendId) = pos;
+	if (QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL))
+		advanceTail = true;
+	LWLockRelease(AsyncQueueLock);
+
+	if (advanceTail)
+		/* Move forward the tail pointer and try to truncate. */
+		asyncQueueAdvanceTail();
+}
+
+static void
+asyncQueueAdvanceTail(void)
+{
+	QueuePosition	min;
+	int				i;
+	int				tailPage;
+	int				headPage;
+
+	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+	min = QUEUE_HEAD;
+	for (i = 0; i < MaxBackends; i++)
+		if (QUEUE_BACKEND_PID(i) != InvalidPid)
+			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD);
+
+	tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
+	headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
+	QUEUE_TAIL = min;
+	LWLockRelease(AsyncQueueLock);
+
+	/* This is our wraparound check */
+	if (asyncQueuePagePrecedesLogically(tailPage, QUEUE_POS_PAGE(min), headPage)
+		&& asyncQueuePagePrecedesPhysically(tailPage, headPage))
+	{
+		/*
+		 * SimpleLruTruncate() will ask for AsyncCtlLock but will also
+		 * release the lock again.  Only call it once the tail has moved
+		 * into a different segment: only whole segments preceding the new
+		 * tail can be removed, and QUEUE_TAIL was already updated above.
+		 */
+		if (tailPage / SLRU_PAGES_PER_SEGMENT !=
+			QUEUE_POS_PAGE(min) / SLRU_PAGES_PER_SEGMENT)
+			SimpleLruTruncate(AsyncCtl, QUEUE_POS_PAGE(min));
+	}
+}
+
+/*
  * ProcessIncomingNotify
  *
  *		Deal with arriving NOTIFYs from other backends.
  *		This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
  *		signal handler, or the next time control reaches the outer idle loop.
- *		Scan pg_listener for arriving notifies, report them to my front end,
- *		and clear the notification field in pg_listener until next time.
+ *		Scan the queue for arriving notifications and report them to my front
+ *		end.
  *
- *		NOTE: since we are outside any transaction, we must create our own.
+ *		NOTE: we are outside of any transaction here.
  */
 static void
 ProcessIncomingNotify(void)
 {
-	Relation	lRel;
-	TupleDesc	tdesc;
-	ScanKeyData key[1];
-	HeapScanDesc scan;
-	HeapTuple	lTuple,
-				rTuple;
-	Datum		value[Natts_pg_listener];
-	bool		repl[Natts_pg_listener],
-				nulls[Natts_pg_listener];
-	bool		catchup_enabled;
+	bool			catchup_enabled;
+
+	Assert(GetCurrentTransactionIdIfAny() == InvalidTransactionId);
 
 	/* Must prevent catchup interrupt while I am running */
 	catchup_enabled = DisableCatchupInterrupt();
@@ -974,64 +1613,36 @@
 
 	notifyInterruptOccurred = 0;
 
-	StartTransactionCommand();
-
-	lRel = heap_open(ListenerRelationId, ExclusiveLock);
-	tdesc = RelationGetDescr(lRel);
-
-	/* Scan only entries with my listenerPID */
-	ScanKeyInit(&key[0],
-				Anum_pg_listener_listenerpid,
-				BTEqualStrategyNumber, F_INT4EQ,
-				Int32GetDatum(MyProcPid));
-	scan = heap_beginscan(lRel, SnapshotNow, 1, key);
-
-	/* Prepare data for rewriting 0 into notification field */
-	memset(nulls, false, sizeof(nulls));
-	memset(repl, false, sizeof(repl));
-	repl[Anum_pg_listener_notification - 1] = true;
-	memset(value, 0, sizeof(value));
-	value[Anum_pg_listener_notification - 1] = Int32GetDatum(0);
-
-	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-	{
-		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
-		char	   *relname = NameStr(listener->relname);
-		int32		sourcePID = listener->notification;
+	/*
+	 * Work on the uncommitted notifications list until we hit the first
+	 * still-running transaction.
+	 */
+	while (uncommittedNotifications != NIL)
+	{
+		ListCell	   *lc;
+		Notification   *n;
 
-		if (sourcePID != 0)
+		n = (Notification *) linitial(uncommittedNotifications);
+		if (TransactionIdDidCommit(n->xid))
 		{
-			/* Notify the frontend */
-
-			if (Trace_notify)
-				elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
-					 relname, (int) sourcePID);
-
-			NotifyMyFrontEnd(relname, sourcePID);
-
-			/*
-			 * Rewrite the tuple with 0 in notification column.
-			 */
-			rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
-			simple_heap_update(lRel, &lTuple->t_self, rTuple);
-
-#ifdef NOT_USED					/* currently there are no indexes */
-			CatalogUpdateIndexes(lRel, rTuple);
-#endif
+			/* could the client have sent an unlisten already? */
+			if (IsListeningOn(n->channel))
+				NotifyMyFrontEnd(n->channel, n->payload, n->srcPid);
+		}
+		else
+		{
+			if (!TransactionIdDidAbort(n->xid))
+				/* n->xid still running */
+				break;
 		}
+		lc = list_head(uncommittedNotifications);
+		uncommittedNotifications = list_delete_cell(uncommittedNotifications, lc, NULL);
+		pfree(n->channel);
+		pfree(n->payload);
+		pfree(n);
 	}
-	heap_endscan(scan);
-
-	/*
-	 * We do NOT release the lock on pg_listener here; we need to hold it
-	 * until end of transaction (which is about to happen, anyway) to ensure
-	 * that other backends see our tuple updates when they look. Otherwise, a
-	 * transaction started after this one might mistakenly think it doesn't
-	 * need to send this backend a new NOTIFY.
-	 */
-	heap_close(lRel, NoLock);
 
-	CommitTransactionCommand();
+	asyncQueueReadAllNotifications(READ_ALL_SEND_COMMITTED);
 
 	/*
 	 * Must flush the notify messages to ensure frontend gets them promptly.
@@ -1051,20 +1662,17 @@
  * Send NOTIFY message to my front end.
  */
 static void
-NotifyMyFrontEnd(char *relname, int32 listenerPID)
+NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
 {
 	if (whereToSendOutput == DestRemote)
 	{
 		StringInfoData buf;
 
 		pq_beginmessage(&buf, 'A');
-		pq_sendint(&buf, listenerPID, sizeof(int32));
-		pq_sendstring(&buf, relname);
+		pq_sendint(&buf, srcPid, sizeof(int32));
+		pq_sendstring(&buf, channel);
 		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
-		{
-			/* XXX Add parameter string here later */
-			pq_sendstring(&buf, "");
-		}
+			pq_sendstring(&buf, payload);
 		pq_endmessage(&buf);
 
 		/*
@@ -1074,23 +1682,57 @@
 		 */
 	}
 	else
-		elog(INFO, "NOTIFY for %s", relname);
+		elog(INFO, "NOTIFY for %s", channel);
 }
 
-/* Does pendingNotifies include the given relname? */
+/* Does pendingNotifies include the given channel/payload? */
 static bool
-AsyncExistsPendingNotify(const char *relname)
+AsyncExistsPendingNotify(const char *channel, const char *payload)
 {
 	ListCell   *p;
+	Notification *n;
 
-	foreach(p, pendingNotifies)
-	{
-		const char *prelname = (const char *) lfirst(p);
+	if (pendingNotifies == NIL)
+		return false;
 
-		if (strcmp(prelname, relname) == 0)
+	if (payload == NULL)
+		payload = "";
+
+	/*
+	 * We need to append new elements to the end of the list in order to keep
+	 * the order. However, on the other hand we'd like to check the list
+	 * backwards in order to make duplicate-elimination a tad faster when the
+	 * same condition is signaled many times in a row. So as a compromise we
+	 * check the tail element first which we can access directly. If this
+	 * doesn't match, we check the rest of whole list.
+	 */
+
+	n = (Notification *) llast(pendingNotifies);
+	if (strcmp(n->channel, channel) == 0)
+	{
+		Assert(n->payload != NULL);
+		if (strcmp(n->payload, payload) == 0)
 			return true;
 	}
 
+	/*
+	 * Note the difference to foreach(). We stop if p is the last element
+	 * already. So we don't check the last element, we have checked it already.
+ 	 */
+	for(p = list_head(pendingNotifies);
+		p != list_tail(pendingNotifies);
+		p = lnext(p))
+	{
+		n = (Notification *) lfirst(p);
+
+		if (strcmp(n->channel, channel) == 0)
+		{
+			Assert(n->payload != NULL);
+			if (strcmp(n->payload, payload) == 0)
+				return true;
+		}
+	}
+
 	return false;
 }
 
@@ -1124,5 +1766,11 @@
 	 * there is any significant delay before I commit.	OK for now because we
 	 * disallow COMMIT PREPARED inside a transaction block.)
 	 */
-	Async_Notify((char *) recdata);
+	AsyncQueueEntry		*qe = (AsyncQueueEntry *) recdata;
+
+	Assert(qe->dboid == MyDatabaseId);
+	Assert(qe->length == len);
+
+	Async_Notify(qe->channel, qe->payload);
 }
+
diff -ur cvs/src/backend/nodes/copyfuncs.c cvs.build/src/backend/nodes/copyfuncs.c
--- cvs/src/backend/nodes/copyfuncs.c	2009-11-18 10:19:30.000000000 +0100
+++ cvs.build/src/backend/nodes/copyfuncs.c	2009-11-18 10:20:54.000000000 +0100
@@ -2761,6 +2761,7 @@
 	NotifyStmt *newnode = makeNode(NotifyStmt);
 
 	COPY_STRING_FIELD(conditionname);
+	COPY_STRING_FIELD(payload);
 
 	return newnode;
 }
diff -ur cvs/src/backend/nodes/equalfuncs.c cvs.build/src/backend/nodes/equalfuncs.c
--- cvs/src/backend/nodes/equalfuncs.c	2009-11-18 10:19:30.000000000 +0100
+++ cvs.build/src/backend/nodes/equalfuncs.c	2009-11-18 10:20:54.000000000 +0100
@@ -1321,6 +1321,7 @@
 _equalNotifyStmt(NotifyStmt *a, NotifyStmt *b)
 {
 	COMPARE_STRING_FIELD(conditionname);
+	COMPARE_STRING_FIELD(payload);
 
 	return true;
 }
diff -ur cvs/src/backend/nodes/outfuncs.c cvs.build/src/backend/nodes/outfuncs.c
--- cvs/src/backend/nodes/outfuncs.c	2009-11-18 10:19:30.000000000 +0100
+++ cvs.build/src/backend/nodes/outfuncs.c	2009-11-18 10:20:54.000000000 +0100
@@ -1811,6 +1811,7 @@
 	WRITE_NODE_TYPE("NOTIFY");
 
 	WRITE_STRING_FIELD(conditionname);
+	WRITE_STRING_FIELD(payload);
 }
 
 static void
diff -ur cvs/src/backend/nodes/readfuncs.c cvs.build/src/backend/nodes/readfuncs.c
--- cvs/src/backend/nodes/readfuncs.c	2009-10-31 14:47:48.000000000 +0100
+++ cvs.build/src/backend/nodes/readfuncs.c	2009-11-18 10:20:54.000000000 +0100
@@ -231,6 +231,7 @@
 	READ_LOCALS(NotifyStmt);
 
 	READ_STRING_FIELD(conditionname);
+	READ_STRING_FIELD(payload);
 
 	READ_DONE();
 }
diff -ur cvs/src/backend/parser/gram.y cvs.build/src/backend/parser/gram.y
--- cvs/src/backend/parser/gram.y	2009-11-18 10:19:30.000000000 +0100
+++ cvs.build/src/backend/parser/gram.y	2009-11-18 10:20:54.000000000 +0100
@@ -394,7 +394,7 @@
 %type <boolean> opt_varying opt_timezone
 
 %type <ival>	Iconst SignedIconst
-%type <str>		Sconst comment_text
+%type <str>		Sconst comment_text notify_payload
 %type <str>		RoleId opt_granted_by opt_boolean ColId_or_Sconst
 %type <list>	var_list
 %type <str>		ColId ColLabel var_name type_function_name param_name
@@ -5984,10 +5984,16 @@
  *
  *****************************************************************************/
 
-NotifyStmt: NOTIFY ColId
+notify_payload:
+			Sconst								{ $$ = $1; }
+			| /*EMPTY*/							{ $$ = NULL; }
+		;
+
+NotifyStmt: NOTIFY ColId notify_payload
 				{
 					NotifyStmt *n = makeNode(NotifyStmt);
 					n->conditionname = $2;
+					n->payload = $3;
 					$$ = (Node *)n;
 				}
 		;
diff -ur cvs/src/backend/storage/ipc/ipci.c cvs.build/src/backend/storage/ipc/ipci.c
--- cvs/src/backend/storage/ipc/ipci.c	2009-09-06 09:06:21.000000000 +0200
+++ cvs.build/src/backend/storage/ipc/ipci.c	2009-11-18 10:20:54.000000000 +0100
@@ -219,6 +219,7 @@
 	 */
 	BTreeShmemInit();
 	SyncScanShmemInit();
+	AsyncShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff -ur cvs/src/backend/storage/lmgr/lwlock.c cvs.build/src/backend/storage/lmgr/lwlock.c
--- cvs/src/backend/storage/lmgr/lwlock.c	2009-05-10 19:50:21.000000000 +0200
+++ cvs.build/src/backend/storage/lmgr/lwlock.c	2009-11-18 10:22:00.000000000 +0100
@@ -24,6 +24,7 @@
 #include "access/clog.h"
 #include "access/multixact.h"
 #include "access/subtrans.h"
+#include "commands/async.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "storage/ipc.h"
@@ -174,6 +175,9 @@
 	/* multixact.c needs two SLRU areas */
 	numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
 
+	/* async.c needs one per Async SLRU buffer */
+	numLocks += NUM_ASYNC_BUFFERS;
+
 	/*
 	 * Add any requested by loadable modules; for backwards-compatibility
 	 * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
diff -ur cvs/src/backend/tcop/utility.c cvs.build/src/backend/tcop/utility.c
--- cvs/src/backend/tcop/utility.c	2009-11-18 10:19:31.000000000 +0100
+++ cvs.build/src/backend/tcop/utility.c	2009-11-18 10:20:54.000000000 +0100
@@ -875,8 +875,12 @@
 		case T_NotifyStmt:
 			{
 				NotifyStmt *stmt = (NotifyStmt *) parsetree;
-
-				Async_Notify(stmt->conditionname);
+				if (stmt->payload
+					&& strlen(stmt->payload) > NOTIFY_PAYLOAD_MAX_LENGTH - 1)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("payload string too long")));
+				Async_Notify(stmt->conditionname, stmt->payload);
 			}
 			break;
 
diff -ur cvs/src/bin/initdb/initdb.c cvs.build/src/bin/initdb/initdb.c
--- cvs/src/bin/initdb/initdb.c	2009-11-18 10:19:31.000000000 +0100
+++ cvs.build/src/bin/initdb/initdb.c	2009-11-18 10:20:54.000000000 +0100
@@ -2469,6 +2469,7 @@
 		"pg_xlog",
 		"pg_xlog/archive_status",
 		"pg_clog",
+		"pg_notify",
 		"pg_subtrans",
 		"pg_twophase",
 		"pg_multixact/members",
diff -ur cvs/src/bin/psql/common.c cvs.build/src/bin/psql/common.c
--- cvs/src/bin/psql/common.c	2009-05-10 19:50:30.000000000 +0200
+++ cvs.build/src/bin/psql/common.c	2009-11-18 10:20:54.000000000 +0100
@@ -555,8 +555,8 @@
 
 	while ((notify = PQnotifies(pset.db)))
 	{
-		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"),
-				notify->relname, notify->be_pid);
+		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" (%s) received from server process with PID %d.\n"),
+				notify->relname, notify->extra, notify->be_pid);
 		fflush(pset.queryFout);
 		PQfreemem(notify);
 	}
diff -ur cvs/src/include/access/slru.h cvs.build/src/include/access/slru.h
--- cvs/src/include/access/slru.h	2009-05-10 19:50:35.000000000 +0200
+++ cvs.build/src/include/access/slru.h	2009-11-18 10:20:54.000000000 +0100
@@ -16,6 +16,25 @@
 #include "access/xlogdefs.h"
 #include "storage/lwlock.h"
 
+/*
+ * Define segment size.  A page is the same BLCKSZ as is used everywhere
+ * else in Postgres.  The segment size can be chosen somewhat arbitrarily;
+ * we make it 32 pages by default, or 256kB, i.e. 1M transactions for CLOG
+ * or 64K transactions for SUBTRANS.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
+ * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
+ * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  slru.c does not
+ * need to take explicit notice of that fact, except when comparing segment
+ * and page numbers in SimpleLruTruncate (see PagePrecedes()).
+ *
+ * Note: slru.c currently assumes that segment file names will be four hex
+ * digits (see SlruFileName).  This sets a lower bound on the segment size
+ * (64K transactions for 32-bit TransactionIds).
+ */
+#define SLRU_PAGES_PER_SEGMENT	32
+
 
 /*
  * Page status codes.  Note that these do not include the "dirty" bit.
diff -ur cvs/src/include/commands/async.h cvs.build/src/include/commands/async.h
--- cvs/src/include/commands/async.h	2009-09-06 09:08:02.000000000 +0200
+++ cvs.build/src/include/commands/async.h	2009-11-18 10:23:41.000000000 +0100
@@ -13,16 +13,30 @@
 #ifndef ASYNC_H
 #define ASYNC_H
 
+/*
+ * Maximum size in bytes of a NOTIFY payload including its terminating '\0',
+ * so the longest accepted string is one byte shorter.  Equals 8000 with the
+ * default BLCKSZ (8192) and NAMEDATALEN (64).  NOTE(review): scaled with
+ * BLCKSZ on the assumption that an entry must fit on one queue page.
+ */
+#define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
+
+/* Number of page slots (buffers) reserved for the notification queue */
+#define NUM_ASYNC_BUFFERS			4
+
 extern bool Trace_notify;
 
+extern void AsyncShmemInit(void);
+
 /* notify-related SQL statements */
-extern void Async_Notify(const char *relname);
+extern void Async_Notify(const char *relname, const char *payload);
 extern void Async_Listen(const char *relname);
 extern void Async_Unlisten(const char *relname);
 extern void Async_UnlistenAll(void);
 
 /* perform (or cancel) outbound notify processing at transaction commit */
-extern void AtCommit_Notify(void);
+extern void AtCommit_NotifyBeforeCommit(void);
+extern void AtCommit_NotifyAfterCommit(void);
 extern void AtAbort_Notify(void);
 extern void AtSubStart_Notify(void);
 extern void AtSubCommit_Notify(void);
diff -ur cvs/src/include/nodes/parsenodes.h cvs.build/src/include/nodes/parsenodes.h
--- cvs/src/include/nodes/parsenodes.h	2009-11-18 10:19:31.000000000 +0100
+++ cvs.build/src/include/nodes/parsenodes.h	2009-11-18 10:20:54.000000000 +0100
@@ -2059,6 +2059,7 @@
 {
 	NodeTag		type;
 	char	   *conditionname;	/* condition name to notify */
+	char	   *payload;		/* payload string to convey, or NULL */
 } NotifyStmt;
 
 /* ----------------------
diff -ur cvs/src/include/storage/lwlock.h cvs.build/src/include/storage/lwlock.h
--- cvs/src/include/storage/lwlock.h	2009-05-10 19:53:12.000000000 +0200
+++ cvs.build/src/include/storage/lwlock.h	2009-11-18 10:20:54.000000000 +0100
@@ -67,6 +67,8 @@
 	AutovacuumLock,
 	AutovacuumScheduleLock,
 	SyncScanLock,
+	AsyncCtlLock,				/* presumably the async SLRU's control lock */
+	AsyncQueueLock,				/* presumably guards the NOTIFY queue state */
 	/* Individual lock IDs end here */
 	FirstBufMappingLock,
 	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
