diff -cr cvs.head/src/backend/access/transam/slru.c cvs.build/src/backend/access/transam/slru.c
*** cvs.head/src/backend/access/transam/slru.c	2010-01-05 12:39:22.000000000 +0100
--- cvs.build/src/backend/access/transam/slru.c	2010-01-22 00:42:56.000000000 +0100
***************
*** 58,83 ****
  #include "storage/shmem.h"
  #include "miscadmin.h"
  
- 
- /*
-  * Define segment size.  A page is the same BLCKSZ as is used everywhere
-  * else in Postgres.  The segment size can be chosen somewhat arbitrarily;
-  * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
-  * or 64K transactions for SUBTRANS.
-  *
-  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
-  * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
-  * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
-  * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
-  * take no explicit notice of that fact in this module, except when comparing
-  * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
-  *
-  * Note: this file currently assumes that segment file names will be four
-  * hex digits.	This sets a lower bound on the segment size (64K transactions
-  * for 32-bit TransactionIds).
-  */
- #define SLRU_PAGES_PER_SEGMENT	32
- 
  #define SlruFileName(ctl, path, seg) \
  	snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
  
--- 58,63 ----
diff -cr cvs.head/src/backend/access/transam/xact.c cvs.build/src/backend/access/transam/xact.c
*** cvs.head/src/backend/access/transam/xact.c	2010-01-20 20:08:24.000000000 +0100
--- cvs.build/src/backend/access/transam/xact.c	2010-01-22 00:53:47.000000000 +0100
***************
*** 1728,1735 ****
  	/* close large objects before lower-level cleanup */
  	AtEOXact_LargeObject(true);
  
! 	/* NOTIFY commit must come before lower-level cleanup */
! 	AtCommit_Notify();
  
  	/* Prevent cancel/die interrupt while cleaning up */
  	HOLD_INTERRUPTS();
--- 1728,1735 ----
  	/* close large objects before lower-level cleanup */
  	AtEOXact_LargeObject(true);
  
! 	/* Insert notifications sent by the NOTIFY command into the queue */
! 	AtCommit_NotifyBeforeCommit();
  
  	/* Prevent cancel/die interrupt while cleaning up */
  	HOLD_INTERRUPTS();
***************
*** 1804,1809 ****
--- 1804,1814 ----
  
  	AtEOXact_MultiXact();
  
+ 	/*
+ 	 * Clean up Notify buffers and signal listening backends.
+ 	 */
+ 	AtCommit_NotifyAfterCommit();
+ 
  	ResourceOwnerRelease(TopTransactionResourceOwner,
  						 RESOURCE_RELEASE_LOCKS,
  						 true, true);
diff -cr cvs.head/src/backend/catalog/Makefile cvs.build/src/backend/catalog/Makefile
*** cvs.head/src/backend/catalog/Makefile	2010-01-06 22:30:05.000000000 +0100
--- cvs.build/src/backend/catalog/Makefile	2010-01-22 00:42:56.000000000 +0100
***************
*** 30,36 ****
  	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
  	pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \
! 	pg_statistic.h pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h \
  	pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
  	pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
  	pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
--- 30,36 ----
  	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
  	pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \
! 	pg_statistic.h pg_rewrite.h pg_trigger.h pg_description.h \
  	pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
  	pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
  	pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
diff -cr cvs.head/src/backend/commands/async.c cvs.build/src/backend/commands/async.c
*** cvs.head/src/backend/commands/async.c	2010-01-05 12:39:22.000000000 +0100
--- cvs.build/src/backend/commands/async.c	2010-01-22 00:45:34.000000000 +0100
***************
*** 14,44 ****
  
  /*-------------------------------------------------------------------------
   * New Async Notification Model:
!  * 1. Multiple backends on same machine.  Multiple backends listening on
!  *	  one relation.  (Note: "listening on a relation" is not really the
!  *	  right way to think about it, since the notify names need not have
!  *	  anything to do with the names of relations actually in the database.
!  *	  But this terminology is all over the code and docs, and I don't feel
!  *	  like trying to replace it.)
!  *
!  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
!  *	  ie, each relname/listenerPID pair.  The "notification" field of the
!  *	  tuple is zero when no NOTIFY is pending for that listener, or the PID
!  *	  of the originating backend when a cross-backend NOTIFY is pending.
!  *	  (We skip writing to pg_listener when doing a self-NOTIFY, so the
!  *	  notification field should never be equal to the listenerPID field.)
!  *
!  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
!  *	  relname to a list of outstanding NOTIFY requests.  Actual processing
!  *	  happens if and only if we reach transaction commit.  At that time (in
!  *	  routine AtCommit_Notify) we scan pg_listener for matching relnames.
!  *	  If the listenerPID in a matching tuple is ours, we just send a notify
!  *	  message to our own front end.  If it is not ours, and "notification"
!  *	  is not already nonzero, we set notification to our own PID and send a
!  *	  PROCSIG_NOTIFY_INTERRUPT signal to the receiving process (indicated by
!  *	  listenerPID).
!  *	  BTW: if the signal operation fails, we presume that the listener backend
!  *	  crashed without removing this tuple, and remove the tuple for it.
   *
   * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
   *	  can call inbound-notify processing immediately if this backend is idle
--- 14,68 ----
  
  /*-------------------------------------------------------------------------
   * New Async Notification Model:
!  *
!  * 1. Multiple backends on same machine. Multiple backends listening on
!  *	  several channels. (This was previously called a "relation" even though it
!  *	  is just an identifier and has nothing to do with a database relation.)
!  *
!  * 2. There is one central queue in the form of SLRU-backed, file-based storage
!  *    (directory pg_notify/), with several pages mapped into shared memory.
!  *
!  *    There is no central storage of which backend listens on which channel,
!  *    every backend has its own list.
!  *
!  *    Every backend that is listening on at least one channel registers by
!  *    entering its Pid into the array of all backends. It then scans all
!  *    incoming notifications and compares the notified channels with its list.
!  *
!  *    In case there is a match it delivers the corresponding notification to
!  *    its frontend.
!  *
!  * 3. The NOTIFY statement (routine Async_Notify) stores the notification
!  *    in a list which will not be processed until transaction end. Every
!  *    notification can additionally send a "payload" which is an extra text
!  *    parameter to convey arbitrary information to the recipient.
!  *
!  *    Duplicate notifications from the same transaction are sent out as one
!  *    notification only. This is done to save work when for example a trigger
!  *    on a 2 million row table fires a notification for each row that has been
!  *    changed. If the application needs to receive every single notification
!  *    that has been sent, it can easily add some unique string into the extra
!  *    payload parameter.
!  *
!  *    Once the transaction commits, AtCommit_NotifyBeforeCommit() performs the
!  *    required changes regarding listeners (Listen/Unlisten) and then adds the
!  *    pending notifications to the head of the queue. The head pointer of the
!  *    queue always points to the next free position and a position is just a
!  *    page number and the offset in that page. This is done before marking the
!  *    transaction as committed in clog. If we run into problems writing the
!  *    notifications, we can still call elog(ERROR, ...) and the transaction
!  *    will roll back.
!  *
!  *    Once we have put all of the notifications into the queue, we return to
!  *    CommitTransaction() which will then commit to clog.
!  *
!  *    After clog commit we are called another time
!  *    (AtCommit_NotifyAfterCommit()). Here we check if we need to signal the
!  *    backends. In SignalBackends() we scan the list of listening backends and
!  *    send a PROCSIG_NOTIFY_INTERRUPT to every backend that has set its Pid (we
!  *    don't know which backend is listening on which channel so we need to send
!  *    a signal to every listening backend). We can exclude backends that are
!  *    already up to date.
   *
   * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
   *	  can call inbound-notify processing immediately if this backend is idle
***************
*** 46,97 ****
   *	  block).  Otherwise the handler may only set a flag, which will cause the
   *	  processing to occur just before we next go idle.
   *
!  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
!  *	  matching our own listenerPID and having nonzero notification fields.
!  *	  For each such tuple, we send a message to our frontend and clear the
!  *	  notification field.  BTW: this routine has to start/commit its own
!  *	  transaction, since by assumption it is only called from outside any
!  *	  transaction.
!  *
!  * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
!  * of pending actions.	If we reach transaction commit, the changes are
!  * applied to pg_listener just before executing any pending NOTIFYs.  This
!  * method is necessary because to avoid race conditions, we must hold lock
!  * on pg_listener from when we insert a new listener tuple until we commit.
!  * To do that and not create undue hazard of deadlock, we don't want to
!  * touch pg_listener until we are otherwise done with the transaction;
!  * in particular it'd be uncool to still be taking user-commanded locks
!  * while holding the pg_listener lock.
!  *
!  * Although we grab ExclusiveLock on pg_listener for any operation,
!  * the lock is never held very long, so it shouldn't cause too much of
!  * a performance problem.  (Previously we used AccessExclusiveLock, but
!  * there's no real reason to forbid concurrent reads.)
   *
!  * An application that listens on the same relname it notifies will get
   * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
   * by comparing be_pid in the NOTIFY message to the application's own backend's
!  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
   * frontend during startup.)  The above design guarantees that notifies from
!  * other backends will never be missed by ignoring self-notifies.  Note,
!  * however, that we do *not* guarantee that a separate frontend message will
!  * be sent for every outside NOTIFY.  Since there is only room for one
!  * originating PID in pg_listener, outside notifies occurring at about the
!  * same time may be collapsed into a single message bearing the PID of the
!  * first outside backend to perform the NOTIFY.
   *-------------------------------------------------------------------------
   */
  
  #include "postgres.h"
  
  #include <unistd.h>
  #include <signal.h>
  
  #include "access/heapam.h"
  #include "access/twophase_rmgr.h"
  #include "access/xact.h"
! #include "catalog/pg_listener.h"
  #include "commands/async.h"
  #include "libpq/libpq.h"
  #include "libpq/pqformat.h"
  #include "miscadmin.h"
--- 70,116 ----
   *	  block).  Otherwise the handler may only set a flag, which will cause the
   *	  processing to occur just before we next go idle.
   *
!  *    Inbound-notify processing consists of reading all of the notifications
!  *	  that have arrived since scanning last time. We read every notification
!  *	  until we reach either a notification from an uncommitted transaction or
!  *	  the head pointer's position. Then we check if we were the laziest
!  *	  backend: if our pointer was at the same position as the global tail
!  *	  pointer, we advance the tail to the second-laziest backend's position
!  *	  (which we find by inspecting the positions of all other backends'
!  *	  pointers). Whenever we move the tail pointer we also truncate now unused
!  *	  pages (i.e. delete files in pg_notify/ that are no longer used).
   *
!  * An application that listens on the same channel it notifies will get
   * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
   * by comparing be_pid in the NOTIFY message to the application's own backend's
!  * Pid.  (As of FE/BE protocol 2.0, the backend's Pid is provided to the
   * frontend during startup.)  The above design guarantees that notifies from
!  * other backends will never be missed by ignoring self-notifies.
   *-------------------------------------------------------------------------
   */
  
+ /* XXX 
+  *
+  * TODO:
+  *  - guc parameter max_notifies_per_txn ??
+  *  - 2PC
+  *  - limit to ASCII?
+  *  - check truncation
+  */
+ 
  #include "postgres.h"
  
  #include <unistd.h>
  #include <signal.h>
  
  #include "access/heapam.h"
+ #include "access/slru.h"
+ #include "access/transam.h"
  #include "access/twophase_rmgr.h"
  #include "access/xact.h"
! #include "catalog/pg_type.h"
  #include "commands/async.h"
+ #include "funcapi.h"
  #include "libpq/libpq.h"
  #include "libpq/pqformat.h"
  #include "miscadmin.h"
***************
*** 108,115 ****
  
  /*
   * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
!  * all actions requested in the current transaction.  As explained above,
!  * we don't actually modify pg_listener until we reach transaction commit.
   *
   * The list is kept in CurTransactionContext.  In subtransactions, each
   * subtransaction has its own list in its own CurTransactionContext, but
--- 127,134 ----
  
  /*
   * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
!  * all actions requested in the current transaction. As explained above,
!  * we don't actually update listenChannels until we reach transaction commit.
   *
   * The list is kept in CurTransactionContext.  In subtransactions, each
   * subtransaction has its own list in its own CurTransactionContext, but
***************
*** 126,132 ****
  typedef struct
  {
  	ListenActionKind action;
! 	char		condname[1];	/* actually, as long as needed */
  } ListenAction;
  
  static List *pendingActions = NIL;		/* list of ListenAction */
--- 145,151 ----
  typedef struct
  {
  	ListenActionKind action;
! 	char		channel[1];	/* actually, as long as needed */
  } ListenAction;
  
  static List *pendingActions = NIL;		/* list of ListenAction */
***************
*** 134,140 ****
  static List *upperPendingActions = NIL; /* list of upper-xact lists */
  
  /*
!  * State for outbound notifies consists of a list of all relnames NOTIFYed
   * in the current transaction.	We do not actually perform a NOTIFY until
   * and unless the transaction commits.	pendingNotifies is NIL if no
   * NOTIFYs have been done in the current transaction.
--- 153,159 ----
  static List *upperPendingActions = NIL; /* list of upper-xact lists */
  
  /*
!  * State for outbound notifies consists of a list of all channels NOTIFYed
   * in the current transaction.	We do not actually perform a NOTIFY until
   * and unless the transaction commits.	pendingNotifies is NIL if no
   * NOTIFYs have been done in the current transaction.
***************
*** 149,160 ****
   * condition name, it will get a self-notify at commit.  This is a bit odd
   * but is consistent with our historical behavior.
   */
- static List *pendingNotifies = NIL;		/* list of C strings */
  
  static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */
  
  /*
!  * State for inbound notifies consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessIncomingNotify
   * directly, and one saying whether the signal has occurred but the handler
   * was not allowed to call ProcessIncomingNotify at the time.
--- 168,288 ----
   * condition name, it will get a self-notify at commit.  This is a bit odd
   * but is consistent with our historical behavior.
   */
  
+ typedef struct QueuePosition
+ {
+ 	int				page;
+ 	int				offset;
+ } QueuePosition;
+ 
+ typedef struct Notification
+ {
+ 	char		   *channel;
+ 	char		   *payload;
+ 	TransactionId	xid;
+ 	int32			srcPid;
+ } Notification;
+ 
+ typedef struct AsyncQueueEntry
+ {
+ 	/*
+ 	 * this record has the maximal length, but usually we limit it to
+ 	 * AsyncQueueEntryEmptySize + strlen(payload).
+ 	 */
+ 	Size			length;
+ 	Oid				dboid;
+ 	TransactionId	xid;
+ 	int32			srcPid;
+ 	char			channel[NAMEDATALEN];
+ 	char			payload[NOTIFY_PAYLOAD_MAX_LENGTH];
+ } AsyncQueueEntry;
+ #define AsyncQueueEntryEmptySize \
+ 	 (sizeof(AsyncQueueEntry) - NOTIFY_PAYLOAD_MAX_LENGTH + 1)
+ 
+ #define	InvalidPid				(-1)
+ #define QUEUE_POS_PAGE(x)		((x).page)
+ #define QUEUE_POS_OFFSET(x)		((x).offset)
+ #define QUEUE_POS_EQUAL(x,y) \
+ 	 ((x).page == (y).page ? (x).offset == (y).offset : false)
+ #define SET_QUEUE_POS(x,y,z) \
+ 	do { \
+ 		(x).page = (y); \
+ 		(x).offset = (z); \
+ 	} while (0)
+ /* of positions x and y, yield the one that logically comes first (z = HEAD) */
+ #define QUEUE_POS_MIN(x,y,z) \
+ 	(asyncQueuePagePrecedesLogically((x).page, (y).page, (z).page) ? (x) : \
+ 	 asyncQueuePagePrecedesLogically((y).page, (x).page, (z).page) ? (y) : \
+ 	 (x).offset < (y).offset ? (x) : \
+ 	 (y))
+ #define QUEUE_BACKEND_POS(i)		asyncQueueControl->backend[(i)].pos
+ #define QUEUE_BACKEND_PID(i)		asyncQueueControl->backend[(i)].pid
+ #define QUEUE_HEAD					asyncQueueControl->head
+ #define QUEUE_TAIL					asyncQueueControl->tail
+ 
+ typedef struct QueueBackendStatus
+ {
+ 	int32			pid;
+ 	QueuePosition	pos;
+ } QueueBackendStatus;
+ 
+ /*
+  * The AsyncQueueControl structure is protected by the AsyncQueueLock.
+  *
+  * In SHARED mode, backends will only inspect their own entries as well as
+  * head and tail pointers. Consequently we can allow a backend to update its
+  * own record while holding only a shared lock (since no other backend will
+  * inspect it).
+  *
+  * In EXCLUSIVE mode, backends can inspect the entries of other backends and
+  * also change head and tail pointers.
+  *
+  * In order to avoid deadlocks, whenever we need both locks, we always first
+  * get AsyncQueueLock and then AsyncCtlLock.
+  */
+ typedef struct AsyncQueueControl
+ {
+ 	QueuePosition		head;		/* head points to the next free location */
+ 	QueuePosition 		tail;		/* the global tail is equivalent to the
+ 									   tail of the "slowest" backend */
+ 	TimestampTz			lastQueueFillWarn;	/* when the queue is full we only
+ 											   want to log that once in a
+ 											   while */
+ 	QueueBackendStatus	backend[1];	/* actually this one has as many entries as
+ 									 * connections are allowed (MaxBackends) */
+ 	/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
+ } AsyncQueueControl;
+ 
+ static AsyncQueueControl   *asyncQueueControl;
+ static SlruCtlData			AsyncCtlData;
+ 
+ #define AsyncCtl					(&AsyncCtlData)
+ #define QUEUE_PAGESIZE				BLCKSZ
+ #define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
+ 
+ /*
+  * slru.c currently assumes that all filenames are four characters of hex
+  * digits. That means that we can use segments 0000 through FFFF.
+  * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
+  * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0xFFFF.
+  *
+  * It's of course easy to enhance slru.c but those pages give us so much
+  * space already that it doesn't seem worth the trouble...
+  *
+  * It's an interesting test case to define QUEUE_MAX_PAGE to a very small
+  * multiple of SLRU_PAGES_PER_SEGMENT to test queue full behaviour.
+  */
+ #define QUEUE_MAX_PAGE			(SLRU_PAGES_PER_SEGMENT * 0xFFFF)
+ 
+ static List *pendingNotifies = NIL;				/* list of Notifications */
  static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */
+ static List *listenChannels = NIL;	/* list of channels we are listening to */
+ 
+ /* has this backend sent notifications in the current transaction ? */
+ static bool backendSendsNotifications = false;
  
  /*
!  * State for inbound notifications consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessIncomingNotify
   * directly, and one saying whether the signal has occurred but the handler
   * was not allowed to call ProcessIncomingNotify at the time.
***************
*** 171,224 ****
  
  bool		Trace_notify = false;
  
! 
! static void queue_listen(ListenActionKind action, const char *condname);
  static void Async_UnlistenOnExit(int code, Datum arg);
! static void Exec_Listen(Relation lRel, const char *relname);
! static void Exec_Unlisten(Relation lRel, const char *relname);
! static void Exec_UnlistenAll(Relation lRel);
! static void Send_Notify(Relation lRel);
  static void ProcessIncomingNotify(void);
! static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
! static bool AsyncExistsPendingNotify(const char *relname);
  static void ClearPendingActionsAndNotifies(void);
  
  
  /*
   * Async_Notify
   *
   *		This is executed by the SQL notify command.
   *
!  *		Adds the relation to the list of pending notifies.
   *		Actual notification happens during transaction commit.
   *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   */
  void
! Async_Notify(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Notify(%s)", relname);
  
  	/* no point in making duplicate entries in the list ... */
! 	if (!AsyncExistsPendingNotify(relname))
! 	{
! 		/*
! 		 * The name list needs to live until end of transaction, so store it
! 		 * in the transaction context.
! 		 */
! 		MemoryContext oldcontext;
  
! 		oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
! 		/*
! 		 * Ordering of the list isn't important.  We choose to put new entries
! 		 * on the front, as this might make duplicate-elimination a tad faster
! 		 * when the same condition is signaled many times in a row.
! 		 */
! 		pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
  
! 		MemoryContextSwitchTo(oldcontext);
! 	}
  }
  
  /*
--- 299,465 ----
  
  bool		Trace_notify = false;
  
! static void queue_listen(ListenActionKind action, const char *channel);
  static void Async_UnlistenOnExit(int code, Datum arg);
! static bool IsListeningOn(const char *channel);
! static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
! static void Exec_Listen(const char *channel);
! static void Exec_Unlisten(const char *channel);
! static void Exec_UnlistenAll(void);
! static void SignalBackends(void);
! static void Send_Notify(void);
! static bool asyncQueuePagePrecedesPhysically(int p, int q);
! static bool asyncQueuePagePrecedesLogically(int p, int q, int head);
! static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
! static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
! static void asyncQueueEntryToNotification(AsyncQueueEntry *qe, Notification *n);
! static List *asyncQueueAddEntries(List *notifications);
! static bool asyncQueueGetEntriesByPage(QueuePosition *current,
! 									   QueuePosition stop,
! 									   List **notifications);
! static void asyncQueueReadAllNotifications(void);
! static void asyncQueueAdvanceTail(void);
! static void asyncQueueUnregister(void);
  static void ProcessIncomingNotify(void);
! static void NotifyMyFrontEnd(const char *channel,
! 							 const char *payload,
! 							 int32 srcPid);
! static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
  static void ClearPendingActionsAndNotifies(void);
  
+ /*
+  * We will work on the page range of 0..(SLRU_PAGES_PER_SEGMENT * 0xFFFF).
+  * asyncQueuePagePrecedesPhysically just checks numerically without any magic
+  * if one page precedes another one.
+  *
+  * On the other hand, when asyncQueuePagePrecedesLogically does that check, it
+  * takes the current head page number into account. If we have wrapped
+  * around, it can happen that p precedes q, even though p > q (if the head page
+  * is in between the two).
+  */ 
+ static bool
+ asyncQueuePagePrecedesPhysically(int p, int q)
+ {
+ 	return p < q;
+ }
+ 
+ static bool
+ asyncQueuePagePrecedesLogically(int p, int q, int head)
+ {
+ 	if (p <= head && q <= head)
+ 		return p < q;
+ 	if (p > head && q > head)
+ 		return p < q;
+ 	if (p <= head)
+ 	{
+ 		Assert(q > head);
+ 		/* q is older */
+ 		return false;
+ 	}
+ 	else
+ 	{
+ 		Assert(p > head && q <= head);
+ 		/* p is older */
+ 		return true;
+ 	}
+ }
+ 
+ void
+ AsyncShmemInit(void)
+ {
+ 	bool	found;
+ 	int		slotno;
+ 	Size	size;
+ 
+ 	/*
+ 	 * Remember that sizeof(AsyncQueueControl) already contains one member of
+ 	 * QueueBackendStatus, so we only need to add the status space requirement
+ 	 * for MaxBackends-1 backends.
+ 	 */
+ 	size = mul_size(MaxBackends-1, sizeof(QueueBackendStatus));
+ 	size = add_size(size, sizeof(AsyncQueueControl));
+ 
+ 	asyncQueueControl = (AsyncQueueControl *)
+ 		ShmemInitStruct("Async Queue Control", size, &found);
+ 
+ 	if (!asyncQueueControl)
+ 		elog(ERROR, "out of memory");
+ 
+ 	if (!found)
+ 	{
+ 		int		i;
+ 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
+ 		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+ 		for (i = 0; i < MaxBackends; i++)
+ 		{
+ 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ 			QUEUE_BACKEND_PID(i) = InvalidPid;
+ 		}
+ 	}
+ 
+ 	AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically;
+ 	SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0,
+ 				  AsyncCtlLock, "pg_notify");
+ 	AsyncCtl->do_fsync = false;
+ 	asyncQueueControl->lastQueueFillWarn = GetCurrentTimestamp();
+ 
+ 	if (!found)
+ 	{
+ 		LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
+ 		slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+ 		SimpleLruWritePage(AsyncCtl, slotno, NULL);
+ 		LWLockRelease(AsyncCtlLock);
+ 
+ 		SimpleLruTruncate(AsyncCtl, 0);
+ 	}
+ }
  
  /*
   * Async_Notify
   *
   *		This is executed by the SQL notify command.
   *
!  *		Adds the channel to the list of pending notifies.
   *		Actual notification happens during transaction commit.
   *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   */
  void
! Async_Notify(const char *channel, const char *payload)
  {
+ 	Notification *n;
+ 	MemoryContext oldcontext;
+ 
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Notify(%s)", channel);
  
  	/* no point in making duplicate entries in the list ... */
! 	if (AsyncExistsPendingNotify(channel, payload))
! 		return;
! 
! 	/*
! 	 * The notification needs to live until end of transaction, so allocate
! 	 * it in the transaction context.
! 	 */
! 	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
! 	n = (Notification *) palloc(sizeof(Notification));
! 	n->channel = pstrdup(channel);
! 	if (payload)
! 		n->payload = pstrdup(payload);
! 	else
! 		n->payload = "";
  
! 	/* will set the xid and the srcPid later... */
! 	n->xid = InvalidTransactionId;
! 	n->srcPid = InvalidPid;
  
! 	/*
! 	 * We want to preserve the order so we need to append every
! 	 * notification. See comments at AsyncExistsPendingNotify().
! 	 */
! 	pendingNotifies = lappend(pendingNotifies, n);
! 
! 	MemoryContextSwitchTo(oldcontext);
  }
  
  /*
***************
*** 226,236 ****
   *		Common code for listen, unlisten, unlisten all commands.
   *
   *		Adds the request to the list of pending actions.
!  *		Actual update of pg_listener happens during transaction commit.
!  *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   */
  static void
! queue_listen(ListenActionKind action, const char *condname)
  {
  	MemoryContext oldcontext;
  	ListenAction *actrec;
--- 467,477 ----
   *		Common code for listen, unlisten, unlisten all commands.
   *
   *		Adds the request to the list of pending actions.
!  *		Actual update of the notification queue happens during transaction
!  *		commit.
   */
  static void
! queue_listen(ListenActionKind action, const char *channel)
  {
  	MemoryContext oldcontext;
  	ListenAction *actrec;
***************
*** 244,252 ****
  	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
  	/* space for terminating null is included in sizeof(ListenAction) */
! 	actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname));
  	actrec->action = action;
! 	strcpy(actrec->condname, condname);
  
  	pendingActions = lappend(pendingActions, actrec);
  
--- 485,493 ----
  	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
  	/* space for terminating null is included in sizeof(ListenAction) */
! 	actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel));
  	actrec->action = action;
! 	strcpy(actrec->channel, channel);
  
  	pendingActions = lappend(pendingActions, actrec);
  
***************
*** 259,270 ****
   *		This is executed by the SQL listen command.
   */
  void
! Async_Listen(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
  
! 	queue_listen(LISTEN_LISTEN, relname);
  }
  
  /*
--- 500,511 ----
   *		This is executed by the SQL listen command.
   */
  void
! Async_Listen(const char *channel)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
  
! 	queue_listen(LISTEN_LISTEN, channel);
  }
  
  /*
***************
*** 273,288 ****
   *		This is executed by the SQL unlisten command.
   */
  void
! Async_Unlisten(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
  
  	/* If we couldn't possibly be listening, no need to queue anything */
  	if (pendingActions == NIL && !unlistenExitRegistered)
  		return;
  
! 	queue_listen(LISTEN_UNLISTEN, relname);
  }
  
  /*
--- 514,529 ----
   *		This is executed by the SQL unlisten command.
   */
  void
! Async_Unlisten(const char *channel)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
  
  	/* If we couldn't possibly be listening, no need to queue anything */
  	if (pendingActions == NIL && !unlistenExitRegistered)
  		return;
  
! 	queue_listen(LISTEN_UNLISTEN, channel);
  }
  
  /*
***************
*** 306,313 ****
  /*
   * Async_UnlistenOnExit
   *
-  *		Clean up the pg_listener table at backend exit.
-  *
   *		This is executed if we have done any LISTENs in this backend.
   *		It might not be necessary anymore, if the user UNLISTENed everything,
   *		but we don't try to detect that case.
--- 547,552 ----
***************
*** 315,331 ****
  static void
  Async_UnlistenOnExit(int code, Datum arg)
  {
- 	/*
- 	 * We need to start/commit a transaction for the unlisten, but if there is
- 	 * already an active transaction we had better abort that one first.
- 	 * Otherwise we'd end up committing changes that probably ought to be
- 	 * discarded.
- 	 */
  	AbortOutOfAnyTransaction();
! 	/* Now we can do the unlisten */
! 	StartTransactionCommand();
! 	Async_UnlistenAll();
! 	CommitTransactionCommand();
  }
  
  /*
--- 554,561 ----
  static void
  Async_UnlistenOnExit(int code, Datum arg)
  {
  	AbortOutOfAnyTransaction();
! 	Exec_UnlistenAll();
  }
  
  /*
***************
*** 348,357 ****
  	/* We can deal with pending NOTIFY though */
  	foreach(p, pendingNotifies)
  	{
! 		const char *relname = (const char *) lfirst(p);
  
  		RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
! 							   relname, strlen(relname) + 1);
  	}
  
  	/*
--- 578,592 ----
  	/* We can deal with pending NOTIFY though */
  	foreach(p, pendingNotifies)
  	{
! 		AsyncQueueEntry qe;
! 		Notification   *n;
! 
! 		n = (Notification *) lfirst(p);
! 
! 		asyncQueueNotificationToEntry(n, &qe);
  
  		RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
! 							   &qe, qe.length);
  	}
  
  	/*
***************
*** 363,386 ****
  }
  
  /*
!  * AtCommit_Notify
!  *
!  *		This is called at transaction commit.
   *
!  *		If there are pending LISTEN/UNLISTEN actions, insert or delete
!  *		tuples in pg_listener accordingly.
   *
!  *		If there are outbound notify requests in the pendingNotifies list,
!  *		scan pg_listener for matching tuples, and either signal the other
!  *		backend or send a message to our own frontend.
   *
!  *		NOTE: we are still inside the current transaction, therefore can
!  *		piggyback on its committing of changes.
   */
  void
! AtCommit_Notify(void)
  {
- 	Relation	lRel;
  	ListCell   *p;
  
  	if (pendingActions == NIL && pendingNotifies == NIL)
--- 598,617 ----
  }
  
  /*
!  * AtCommit_NotifyBeforeCommit
   *
!  *		This is called at transaction commit, before actually committing to
!  *		clog.
   *
!  *		If there are pending LISTEN/UNLISTEN actions, update our
!  *		"listenChannels" list.
   *
!  *		If there are outbound notify requests in the pendingNotifies list, add
!  *		them to the global queue and signal any backend that is listening.
   */
  void
! AtCommit_NotifyBeforeCommit(void)
  {
  	ListCell   *p;
  
  	if (pendingActions == NIL && pendingNotifies == NIL)
***************
*** 397,406 ****
  	}
  
  	if (Trace_notify)
! 		elog(DEBUG1, "AtCommit_Notify");
  
! 	/* Acquire ExclusiveLock on pg_listener */
! 	lRel = heap_open(ListenerRelationId, ExclusiveLock);
  
  	/* Perform any pending listen/unlisten actions */
  	foreach(p, pendingActions)
--- 628,636 ----
  	}
  
  	if (Trace_notify)
! 		elog(DEBUG1, "AtCommit_NotifyBeforeCommit");
  
! 	Assert(backendSendsNotifications == false);
  
  	/* Perform any pending listen/unlisten actions */
  	foreach(p, pendingActions)
***************
*** 410,508 ****
  		switch (actrec->action)
  		{
  			case LISTEN_LISTEN:
! 				Exec_Listen(lRel, actrec->condname);
  				break;
  			case LISTEN_UNLISTEN:
! 				Exec_Unlisten(lRel, actrec->condname);
  				break;
  			case LISTEN_UNLISTEN_ALL:
! 				Exec_UnlistenAll(lRel);
  				break;
  		}
- 
- 		/* We must CCI after each action in case of conflicting actions */
- 		CommandCounterIncrement();
  	}
  
- 	/* Perform any pending notifies */
- 	if (pendingNotifies)
- 		Send_Notify(lRel);
- 
  	/*
! 	 * We do NOT release the lock on pg_listener here; we need to hold it
! 	 * until end of transaction (which is about to happen, anyway) to ensure
! 	 * that notified backends see our tuple updates when they look. Else they
! 	 * might disregard the signal, which would make the application programmer
! 	 * very unhappy.  Also, this prevents race conditions when we have just
! 	 * inserted a listening tuple.
  	 */
! 	heap_close(lRel, NoLock);
  
  	ClearPendingActionsAndNotifies();
  
  	if (Trace_notify)
! 		elog(DEBUG1, "AtCommit_Notify: done");
  }
  
  /*
!  * Exec_Listen --- subroutine for AtCommit_Notify
!  *
!  *		Register the current backend as listening on the specified relation.
   */
! static void
! Exec_Listen(Relation lRel, const char *relname)
  {
! 	HeapScanDesc scan;
! 	HeapTuple	tuple;
! 	Datum		values[Natts_pg_listener];
! 	bool		nulls[Natts_pg_listener];
! 	NameData	condname;
! 	bool		alreadyListener = false;
  
! 	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
  
! 	/* Detect whether we are already listening on this relname */
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
  	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
  
! 		if (listener->listenerpid == MyProcPid &&
! 			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
! 		{
! 			alreadyListener = true;
! 			/* No need to scan the rest of the table */
! 			break;
! 		}
  	}
- 	heap_endscan(scan);
  
! 	if (alreadyListener)
! 		return;
  
! 	/*
! 	 * OK to insert a new tuple
! 	 */
! 	memset(nulls, false, sizeof(nulls));
  
! 	namestrcpy(&condname, relname);
! 	values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
! 	values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid);
! 	values[Anum_pg_listener_notification - 1] = Int32GetDatum(0);		/* no notifies pending */
  
! 	tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls);
  
! 	simple_heap_insert(lRel, tuple);
  
! #ifdef NOT_USED					/* currently there are no indexes */
! 	CatalogUpdateIndexes(lRel, tuple);
! #endif
  
! 	heap_freetuple(tuple);
  
  	/*
! 	 * now that we are listening, make sure we will unlisten before dying.
  	 */
  	if (!unlistenExitRegistered)
  	{
--- 640,806 ----
  		switch (actrec->action)
  		{
  			case LISTEN_LISTEN:
! 				Exec_Listen(actrec->channel);
  				break;
  			case LISTEN_UNLISTEN:
! 				Exec_Unlisten(actrec->channel);
  				break;
  			case LISTEN_UNLISTEN_ALL:
! 				Exec_UnlistenAll();
  				break;
  		}
  	}
  
  	/*
! 	 * Perform any pending notifies.
  	 */
! 	if (pendingNotifies)
! 		Send_Notify();
! }
! 
! /*
!  * AtCommit_NotifyAfterCommit
!  *
!  *		This is called at transaction commit, after committing to clog.
!  *
!  *		Notify the listening backends.
!  */
! void
! AtCommit_NotifyAfterCommit(void)
! {
! 	/* Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
! 	 * return as soon as possible */
! 	if (!pendingActions && !backendSendsNotifications)
! 		return;
! 
! 	if (backendSendsNotifications)
! 		SignalBackends();
  
  	ClearPendingActionsAndNotifies();
  
  	if (Trace_notify)
! 		elog(DEBUG1, "AtCommit_NotifyAfterCommit: done");
  }
  
  /*
!  * This function is executed for every notification found in the queue in order
!  * to check if the current backend is listening on that channel. Not sure if we
!  * should further optimize this, for example convert to a sorted array and
!  * allow binary search on it...
   */
! static bool
! IsListeningOn(const char *channel)
  {
! 	ListCell   *p;
! 	char	   *lchan;
  
! 	foreach(p, listenChannels)
! 	{
! 		lchan = (char *) lfirst(p);
! 		if (strcmp(lchan, channel) == 0)
! 			return true;
! 	}
! 	return false;
! }
! 
! /*
!  * pg_listening --- set-returning function, one row per listenChannels entry
!  */
! Datum
! pg_listening(PG_FUNCTION_ARGS)
! {
! 	FuncCallContext	   *funcctx;
! 	ListCell		  **lcp;
  
! 	/* stuff done only on the first call of the function */
! 	if (SRF_IS_FIRSTCALL())
  	{
! 		MemoryContext	oldcontext;
  
! 		/* create a function context for cross-call persistence */
! 		funcctx = SRF_FIRSTCALL_INIT();
! 
! 		/*
! 		 * switch to memory context appropriate for multiple function calls
! 		 */
! 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
! 
! 		/* cursor holds one ListCell pointer; list_head(NIL) yields NULL */
! 		lcp = (ListCell **) palloc(sizeof(ListCell *));
! 		*lcp = list_head(listenChannels);
! 		funcctx->user_fctx = (void *) lcp;
! 
! 		MemoryContextSwitchTo(oldcontext);
  	}
  
! 	/* stuff done on every call of the function */
! 	funcctx = SRF_PERCALL_SETUP();
! 	lcp = (ListCell **) funcctx->user_fctx;
  
! 	while (*lcp != NULL)
! 	{
! 		char   *channel = (char *) lfirst(*lcp);
! 
! 		*lcp = (*lcp)->next;
! 		SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
! 	}
! 
! 	SRF_RETURN_DONE(funcctx);
! }
! 
! /*
!  * Exec_Listen --- subroutine for AtCommit_NotifyBeforeCommit
!  *
!  *		Register the current backend as listening on the specified channel.
!  */
! static void
! Exec_Listen(const char *channel)
! {
! 	MemoryContext oldcontext;
  
! 	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Listen(%s,%d)", channel, MyProcPid);
  
! 	/* Detect whether we are already listening on this channel */
! 	if (IsListeningOn(channel))
! 		return;
  
! 	/*
! 	 * OK to insert to the list.
! 	 */
! 	if (listenChannels == NIL)
! 	{
! 		/*
! 		 * This is our first LISTEN, establish our pointer.
! 		 * We set our pointer to the global tail pointer, this way we make
! 		 * sure that we get all of the notifications. We might get a few more
! 		 * but that doesn't hurt.
! 		 */
! 		LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 		QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
! 		QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
! 		LWLockRelease(AsyncQueueLock);
  
! 		/*
! 		 * Try to move our pointer forward as far as possible. This will skip
! 		 * over already committed notifications. Still, we could get
! 		 * notifications that have already committed before we started to
! 		 * LISTEN.
! 		 *
! 		 * Note that we are not yet listening on anything, so we won't deliver
! 		 * any notification.
! 		 *
! 		 * This will also advance the global tail pointer if necessary.
! 		 */
! 		asyncQueueReadAllNotifications();
! 	}
  
! 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
! 	listenChannels = lappend(listenChannels, pstrdup(channel));
! 	MemoryContextSwitchTo(oldcontext);
  
  	/*
! 	 * Now that we are listening, make sure we will unlisten before dying.
  	 */
  	if (!unlistenExitRegistered)
  	{
***************
*** 514,550 ****
  /*
   * Exec_Unlisten --- subroutine for AtCommit_Notify
   *
!  *		Remove the current backend from the list of listening backends
!  *		for the specified relation.
   */
  static void
! Exec_Unlisten(Relation lRel, const char *relname)
  {
! 	HeapScanDesc scan;
! 	HeapTuple	tuple;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);
  
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
  	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
! 
! 		if (listener->listenerpid == MyProcPid &&
! 			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
  		{
! 			/* Found the matching tuple, delete it */
! 			simple_heap_delete(lRel, &tuple->t_self);
! 
! 			/*
! 			 * We assume there can be only one match, so no need to scan the
! 			 * rest of the table
! 			 */
  			break;
  		}
  	}
! 	heap_endscan(scan);
  
  	/*
  	 * We do not complain about unlistening something not being listened;
--- 812,843 ----
  /*
   * Exec_Unlisten --- subroutine for AtCommit_Notify
   *
!  *		Remove the specified channel from "listenChannels".
   */
  static void
! Exec_Unlisten(const char *channel)
  {
! 	ListCell *q;
! 	ListCell *prev;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Unlisten(%s,%d)", channel, MyProcPid);
  
! 	prev = NULL;
! 	foreach(q, listenChannels)
  	{
! 		char *lchan = (char *) lfirst(q);
! 		if (strcmp(lchan, channel) == 0)
  		{
! 			pfree(lchan);
! 			listenChannels = list_delete_cell(listenChannels, q, prev);
  			break;
  		}
+ 		prev = q;
  	}
! 
! 	if (listenChannels == NIL)
! 		asyncQueueUnregister();
  
  	/*
  	 * We do not complain about unlistening something not being listened;
***************
*** 555,690 ****
  /*
   * Exec_UnlistenAll --- subroutine for AtCommit_Notify
   *
!  *		Update pg_listener to unlisten all relations for this backend.
   */
  static void
! Exec_UnlistenAll(Relation lRel)
  {
- 	HeapScanDesc scan;
- 	HeapTuple	lTuple;
- 	ScanKeyData key[1];
- 
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_UnlistenAll");
  
! 	/* Find and delete all entries with my listenerPID */
! 	ScanKeyInit(&key[0],
! 				Anum_pg_listener_listenerpid,
! 				BTEqualStrategyNumber, F_INT4EQ,
! 				Int32GetDatum(MyProcPid));
! 	scan = heap_beginscan(lRel, SnapshotNow, 1, key);
  
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 		simple_heap_delete(lRel, &lTuple->t_self);
  
! 	heap_endscan(scan);
  }
  
  /*
!  * Send_Notify --- subroutine for AtCommit_Notify
!  *
!  *		Scan pg_listener for tuples matching our pending notifies, and
!  *		either signal the other backend or send a message to our own frontend.
   */
  static void
! Send_Notify(Relation lRel)
  {
! 	TupleDesc	tdesc = RelationGetDescr(lRel);
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple,
! 				rTuple;
! 	Datum		value[Natts_pg_listener];
! 	bool		repl[Natts_pg_listener],
! 				nulls[Natts_pg_listener];
! 
! 	/* preset data to update notify column to MyProcPid */
! 	memset(nulls, false, sizeof(nulls));
! 	memset(repl, false, sizeof(repl));
! 	repl[Anum_pg_listener_notification - 1] = true;
! 	memset(value, 0, sizeof(value));
! 	value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid);
! 
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
! 		char	   *relname = NameStr(listener->relname);
! 		int32		listenerPID = listener->listenerpid;
  
! 		if (!AsyncExistsPendingNotify(relname))
! 			continue;
  
! 		if (listenerPID == MyProcPid)
  		{
! 			/*
! 			 * Self-notify: no need to bother with table update. Indeed, we
! 			 * *must not* clear the notification field in this path, or we
! 			 * could lose an outside notify, which'd be bad for applications
! 			 * that ignore self-notify messages.
! 			 */
! 			if (Trace_notify)
! 				elog(DEBUG1, "AtCommit_Notify: notifying self");
  
! 			NotifyMyFrontEnd(relname, listenerPID);
  		}
  		else
  		{
- 			if (Trace_notify)
- 				elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
- 					 listenerPID);
- 
  			/*
! 			 * If someone has already notified this listener, we don't bother
! 			 * modifying the table, but we do still send a NOTIFY_INTERRUPT
! 			 * signal, just in case that backend missed the earlier signal for
! 			 * some reason.  It's OK to send the signal first, because the
! 			 * other guy can't read pg_listener until we unlock it.
! 			 *
! 			 * Note: we don't have the other guy's BackendId available, so
! 			 * this will incur a search of the ProcSignal table.  That's
! 			 * probably not worth worrying about.
  			 */
! 			if (SendProcSignal(listenerPID, PROCSIG_NOTIFY_INTERRUPT,
! 							   InvalidBackendId) < 0)
  			{
! 				/*
! 				 * Get rid of pg_listener entry if it refers to a PID that no
! 				 * longer exists.  Presumably, that backend crashed without
! 				 * deleting its pg_listener entries. This code used to only
! 				 * delete the entry if errno==ESRCH, but as far as I can see
! 				 * we should just do it for any failure (certainly at least
! 				 * for EPERM too...)
! 				 */
! 				simple_heap_delete(lRel, &lTuple->t_self);
  			}
! 			else if (listener->notification == 0)
  			{
! 				/* Rewrite the tuple with my PID in notification column */
! 				rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
! 				simple_heap_update(lRel, &lTuple->t_self, rTuple);
! 
! #ifdef NOT_USED					/* currently there are no indexes */
! 				CatalogUpdateIndexes(lRel, rTuple);
! #endif
  			}
  		}
  	}
  
! 	heap_endscan(scan);
  }
  
  /*
   * AtAbort_Notify
   *
!  *		This is called at transaction abort.
   *
!  *		Gets rid of pending actions and outbound notifies that we would have
!  *		executed if the transaction got committed.
   */
  void
  AtAbort_Notify(void)
  {
  	ClearPendingActionsAndNotifies();
  }
  
--- 848,1220 ----
  /*
   * Exec_UnlistenAll --- subroutine for AtCommit_Notify
   *
!  *		Unlisten on all channels for this backend.
   */
  static void
! Exec_UnlistenAll(void)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_UnlistenAll(%d)", MyProcPid);
! 
! 	list_free_deep(listenChannels);
! 	listenChannels = NIL;
  
! 	asyncQueueUnregister();
! }
! 
! static void
! asyncQueueUnregister(void)
! {
! 	bool	  advanceTail = false;
! 
! 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 	QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
! 	/*
! 	 * If we have been the last backend, advance the tail pointer.
! 	 */
! 	if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL))
! 		advanceTail = true;
! 	LWLockRelease(AsyncQueueLock);
! 
! 	if (advanceTail)
! 		asyncQueueAdvanceTail();
! }
! 
! static bool
! asyncQueueIsFull(void)
! {
! 	QueuePosition	lookahead = QUEUE_HEAD;
! 	Size			remain = QUEUE_PAGESIZE - QUEUE_POS_OFFSET(lookahead) - 1;
! 	Size			advance = Min(remain, NOTIFY_PAYLOAD_MAX_LENGTH);
  
! 	/*
! 	 * Check what happens if we wrote a maximally sized entry. Would we go to a
! 	 * new page? If not, then our queue can not be full (because we can still
! 	 * fill at least the current page with at least one more entry).
! 	 */
! 	if (!asyncQueueAdvance(&lookahead, advance))
! 		return false;
  
! 	/*
! 	 * The queue is full if with a switch to a new page we reach the page
! 	 * of the tail pointer.
! 	 */
! 	return QUEUE_POS_PAGE(lookahead) == QUEUE_POS_PAGE(QUEUE_TAIL);
  }
  
  /*
!  * The function advances the position to the next entry. In case we jump to
!  * a new page the function returns true, else false.
   */
+ static bool
+ asyncQueueAdvance(QueuePosition *position, int entryLength)
+ {
+ 	int		pageno = QUEUE_POS_PAGE(*position);
+ 	int		offset = QUEUE_POS_OFFSET(*position);
+ 	bool	pageJump = false;
+ 
+ 	/*
+ 	 * First step past the entry of entryLength bytes that has just been
+ 	 * written or read at the current position.
+ 	 */
+ 	offset += entryLength;
+ 	Assert(offset < QUEUE_PAGESIZE);
+ 
+ 	/*
+ 	 * Then check whether even an entry with an empty payload would still fit
+ 	 * on this page. If it would, stay here: this is the next position. If
+ 	 * not, continue at offset 0 of the following page.
+ 	 */
+ 	if (offset + AsyncQueueEntryEmptySize >= QUEUE_PAGESIZE)
+ 	{
+ 		pageno++;
+ 		if (pageno > QUEUE_MAX_PAGE)
+ 			/* page numbers wrap around to 0 after QUEUE_MAX_PAGE */
+ 			pageno = 0;
+ 		offset = 0;
+ 		pageJump = true;
+ 	}
+ 
+ 	SET_QUEUE_POS(*position, pageno, offset);
+ 	return pageJump;
+ }
+ 
+ static void
+ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
+ {
+ 	Assert(n->channel != NULL);
+ 	Assert(n->payload != NULL);
+ 	Assert(strlen(n->payload) <= NOTIFY_PAYLOAD_MAX_LENGTH);
+ 
+ 	/* Fill *qe from n; EmptySize already counts the payload terminator */
+ 	qe->srcPid = MyProcPid;
+ 	qe->dboid = MyDatabaseId;
+ 	qe->xid = GetCurrentTransactionId();
+ 	strcpy(qe->channel, n->channel);
+ 	strcpy(qe->payload, n->payload);
+ 	qe->length = AsyncQueueEntryEmptySize + strlen(n->payload);
+ }
+ 
  static void
! asyncQueueEntryToNotification(AsyncQueueEntry *qe, Notification *n)
! {
! 	n->channel = pstrdup(qe->channel);
! 	n->payload = pstrdup(qe->payload);
! 	n->srcPid = qe->srcPid;
! 	n->xid = qe->xid;
! }
! 
! /*
!  * Add the notifications to the queue: we go page by page here, i.e. we stop
!  * once we have to go to a new page but we will be called again and then fill
!  * that next page. If an entry does not fit to a page anymore, we write a dummy
!  * entry with an InvalidOid as the database oid in order to fill the page. So
!  * every page is always used up to the last byte which simplifies reading the
!  * page later.
!  *
!  * We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
!  * here in this function.
!  *
!  * We are passed the list of notifications to write and return the
!  * not-yet-written notifications back. Eventually we will return NIL.
!  */
! static List *
! asyncQueueAddEntries(List *notifications)
  {
! 	AsyncQueueEntry	qe;
! 	int				pageno;
! 	int				offset;
! 	int				slotno;
  
! 	/*
! 	 * Note that we are holding exclusive AsyncQueueLock already.
! 	 */
! 	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
! 	pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
! 	slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
! 	AsyncCtl->shared->page_dirty[slotno] = true;
  
! 	do
! 	{
! 		Notification   *n;
! 
! 		if (asyncQueueIsFull())
  		{
! 			/* document that we will not go into the if-block further down */
! 			Assert(QUEUE_POS_OFFSET(QUEUE_HEAD) != 0);
! 			break;
! 		}
  
! 		n = (Notification *) linitial(notifications);
! 
! 		asyncQueueNotificationToEntry(n, &qe);
! 
! 		offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
! 		/*
! 		 * Check whether or not the entry still fits on the current page.
! 		 */
! 		if (offset + qe.length < QUEUE_PAGESIZE)
! 		{
! 			notifications = list_delete_first(notifications);
  		}
  		else
  		{
  			/*
! 			 * Write a dummy entry to fill up the page. Actually readers will
! 			 * only check dboid and since it won't match any reader's database
! 			 * oid, they will ignore this entry and move on.
  			 */
! 			qe.length = QUEUE_PAGESIZE - offset - 1;
! 			qe.dboid = InvalidOid;
! 			qe.channel[0] = '\0';
! 			qe.payload[0] = '\0';
! 			qe.xid = InvalidTransactionId;
! 		}
! 		memcpy((char*) AsyncCtl->shared->page_buffer[slotno] + offset,
! 			   &qe, qe.length);
! 
! 	} while (!asyncQueueAdvance(&(QUEUE_HEAD), qe.length)
! 			 && notifications != NIL);
! 
! 	if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
! 	{
! 		/*
! 		 * we need to go to continue on a new page, stop here but prepare that
! 		 * page already.
! 		 */
! 		slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
! 		AsyncCtl->shared->page_dirty[slotno] = true;
! 	}
! 	LWLockRelease(AsyncCtlLock);
! 
! 	return notifications;
! }
! 
! static void
! asyncQueueFillWarning(void)
! {
! 	/*
! 	 * Caller must hold exclusive AsyncQueueLock.
! 	 */
! 	TimestampTz		t;
! 	double			fillDegree;
! 	int				occupied;
! 	int				tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
! 	int				headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
! 
! 	occupied = headPage - tailPage;
! 
! 	if (occupied == 0)
! 		return;
! 	
! 	if (!asyncQueuePagePrecedesPhysically(tailPage, headPage))
! 		/* head has wrapped around, tail not yet */
! 		occupied += QUEUE_MAX_PAGE;
! 
! 	fillDegree = (float) occupied / (float) QUEUE_MAX_PAGE;
! 
! 	if (fillDegree < 0.5)
! 		return;
! 
! 	t = GetCurrentTimestamp();
! 
! 	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
! 								   t, QUEUE_FULL_WARN_INTERVAL))
! 	{
! 		QueuePosition	min = QUEUE_HEAD;
! 		int32			minPid = InvalidPid;
! 		int				i;
! 
! 		for (i = 0; i < MaxBackends; i++)
! 			if (QUEUE_BACKEND_PID(i) != InvalidPid)
  			{
! 				min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD);
! 				if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
! 					minPid = QUEUE_BACKEND_PID(i);
  			}
! 
! 		if (fillDegree < 0.75)
! 			ereport(WARNING, (errmsg("pg_notify queue is more than 50%% full. "
! 								 "Among the slowest backends: %d", minPid)));
! 		else
! 			ereport(WARNING, (errmsg("pg_notify queue is more than 75%% full. "
! 								 "Among the slowest backends: %d", minPid)));
! 
! 		asyncQueueControl->lastQueueFillWarn = t;
! 	}
! }
! 
! /*
!  * Send_Notify --- subroutine for AtCommit_NotifyBeforeCommit
!  *
!  * Add the pending notifications to the queue.
!  *
!  * A full queue is very uncommon and should really not happen, given that we
!  * have so much space available in the slru pages. Nevertheless we need to
!  * deal with this possibility. Note that when we get here we are in the process
!  * of committing our transaction, we have not yet committed to clog but this
!  * would be the next step. So at this point in time we can still roll the
!  * transaction back.
!  */
! static void
! Send_Notify(void)
! {
! 	backendSendsNotifications = true;
! 
! 	while (pendingNotifies != NIL)
! 	{
! 		LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! 		asyncQueueFillWarning();
! 		if (asyncQueueIsFull())
! 			ereport(ERROR,
! 					(errcode(ERRCODE_TOO_MANY_ENTRIES),
! 					errmsg("Too many notifications in the queue")));
! 		pendingNotifies = asyncQueueAddEntries(pendingNotifies);
! 		LWLockRelease(AsyncQueueLock);
! 	}
! }
! 
! /*
!  * Send signals to all listening backends that are behind the queue head.
!  * Backend slots are scanned under exclusive AsyncQueueLock; a listener whose
!  * position already equals the head is not signaled. This can happen if
!  * concurrent notifying transactions have sent a signal and the signaled
!  * backend has read the other notifications and ours in the same step.
!  *
!  * Signals are sent after releasing the lock; BackendId and pid are both known.
!  */
! static void
! SignalBackends(void)
! {
! 	QueuePosition	pos;
! 	ListCell	   *p1, *p2;
! 	int				i;
! 	int32			pid;
! 	List		   *pids = NIL;
! 	List		   *ids = NIL;
! 	int				count = 0;
! 
! 	/* Count all registered listeners; remember those not yet at the head. */
! 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! 	for (i = 0; i < MaxBackends; i++)
! 	{
! 		pid = QUEUE_BACKEND_PID(i);
! 		if (pid != InvalidPid)
! 		{
! 			count++;
! 			pos = QUEUE_BACKEND_POS(i);
! 			if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
  			{
! 				pids = lappend_int(pids, pid);
! 				ids = lappend_int(ids, i);
  			}
  		}
  	}
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	forboth(p1, pids, p2, ids)
+ 	{
+ 		pid = (int32) lfirst_int(p1);
+ 		i = lfirst_int(p2);
+ 		/*
+ 		 * A failure is only reported as a WARNING. NOTE(review): can a
+ 		 * backend have crashed without the postmaster starting over?
+ 		 */
+ 		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, i) < 0)
+ 			elog(WARNING, "Error signalling backend %d", pid);
+ 	}
  
! 	if (count == 0)
! 	{
! 		/*
! 		 * Nobody is listening, so try to clean up the queue. If a backend
! 		 * has started to listen since we counted, advancing the tail does not
! 		 * hurt: it would skip over our committed notifications anyway.
! 		 */
! 		asyncQueueAdvanceTail();
! 	}
  }
  
  /*
   * AtAbort_Notify
   *
!  *	This is called at transaction abort.
!  *
!  *	Gets rid of pending actions and outbound notifies that we would have
!  *	executed if the transaction got committed.
   *
!  *	Even though we have not committed, we need to signal the listening backends
!  *	because our notifications might block readers from processing the queue.
!  *	Now that the transaction has aborted, they can go on and skip over our
!  *	notifications. They could find notifications past ours that they need to
!  *	deliver.
   */
  void
  AtAbort_Notify(void)
  {
+ 	if (backendSendsNotifications)
+ 		SignalBackends();
+ 
  	ClearPendingActionsAndNotifies();
  }
  
***************
*** 940,968 ****
  }
  
  /*
   * ProcessIncomingNotify
   *
   *		Deal with arriving NOTIFYs from other backends.
   *		This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
   *		signal handler, or the next time control reaches the outer idle loop.
!  *		Scan pg_listener for arriving notifies, report them to my front end,
!  *		and clear the notification field in pg_listener until next time.
   *
!  *		NOTE: since we are outside any transaction, we must create our own.
   */
  static void
  ProcessIncomingNotify(void)
  {
! 	Relation	lRel;
! 	TupleDesc	tdesc;
! 	ScanKeyData key[1];
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple,
! 				rTuple;
! 	Datum		value[Natts_pg_listener];
! 	bool		repl[Natts_pg_listener],
! 				nulls[Natts_pg_listener];
! 	bool		catchup_enabled;
  
  	/* Must prevent catchup interrupt while I am running */
  	catchup_enabled = DisableCatchupInterrupt();
--- 1470,1708 ----
  }
  
  /*
+  * This function will ask for a page with ReadOnly access and once we have the
+  * lock, we read the whole content and pass back the list of notifications
+  * that the calling function will deliver then. The list will contain all
+  * notifications from transactions that have already committed.
+  *
+  * We stop if we have either reached the stop position or go to a new page.
+  *
+  * The function returns true once we have reached the end or a notification of
+  * a transaction that is still running and false if we have finished with
+  * the page. In other words: once it returns true there is no point in calling
+  * it again.
+  */
+ static bool
+ asyncQueueGetEntriesByPage(QueuePosition *current,
+ 						   QueuePosition stop,
+ 						   List **notifications)
+ {
+ 	AsyncQueueEntry	qe;
+ 	Notification   *n;
+ 	int				slotno;
+ 	bool			reachedStop = false;
+ 
+ 	if (QUEUE_POS_EQUAL(*current, stop))
+ 		return true;
+ 
+ 	slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, current->page,
+ 										InvalidTransactionId);
+ 	do {
+ 		char *readPtr = (char *) (AsyncCtl->shared->page_buffer[slotno]);
+ 
+ 		if (QUEUE_POS_EQUAL(*current, stop))
+ 		{
+ 			reachedStop = true;
+ 			break;
+ 		}
+ 
+ 		readPtr += current->offset;
+ 		/* at first we only read the header of the notification */
+ 		memcpy(&qe, readPtr, AsyncQueueEntryEmptySize);
+ 
+ 		if (qe.dboid == MyDatabaseId)
+ 		{
+ 			if (TransactionIdDidCommit(qe.xid))
+ 			{
+ 				if (IsListeningOn(qe.channel))
+ 				{
+ 					if (qe.length > AsyncQueueEntryEmptySize)
+ 					{
+ 						/* now we know that we are interested in the
+ 						 * notification and read it completely. */
+ 						memcpy(&qe, readPtr, qe.length);
+ 					}
+ 					n = (Notification *) palloc(sizeof(Notification));
+ 					asyncQueueEntryToNotification(&qe, n);
+ 					*notifications = lappend(*notifications, n);
+ 				}
+ 			}
+ 			else
+ 			{
+ 				if (!TransactionIdDidAbort(qe.xid))
+ 				{
+ 					/*
+ 					 * The transaction has neither committed nor aborted so
+ 					 * far.
+ 					 */
+ 					reachedStop = true;
+ 					break;
+ 				}
+ 				/*
+ 				 * Here we know that the transaction has aborted, we just
+ 				 * ignore its notifications.
+ 				 */
+ 			}
+ 		}
+ 		/*
+ 		 * The call to asyncQueueAdvance just jumps over what we have
+ 		 * just read. If there is no more space for the next record on the
+ 		 * current page, it will also switch to the beginning of the next page.
+ 		 */
+ 	} while(!asyncQueueAdvance(current, qe.length));
+ 
+ 	/*
+ 	 * Release the lock that we implicitly got from
+ 	 * SimpleLruReadPage_ReadOnly().
+ 	 */
+ 	LWLockRelease(AsyncCtlLock);
+ 
+ 	if (QUEUE_POS_EQUAL(*current, stop))
+ 		reachedStop = true;
+ 
+ 	return reachedStop;
+ }
+ 
+ 
+ static void
+ asyncQueueReadAllNotifications(void)
+ {
+ 	QueuePosition	pos;
+ 	QueuePosition	oldpos;
+ 	QueuePosition	head;
+ 	List		   *notifications;
+ 	ListCell	   *lc;
+ 	Notification   *n;
+ 	bool			advanceTail = false;
+ 	bool			reachedStop;
+ 
+ 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ 	pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
+ 	head = QUEUE_HEAD;
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	if (QUEUE_POS_EQUAL(pos, head))
+ 	{
+ 		/* Nothing to do, we have read all notifications already. */
+ 		return;
+ 	}
+ 
+ 	do 
+ 	{
+ 		/*
+ 		 * Our stop position is what we found to be the head's position when
+ 		 * we entered this function. It might have changed already. But if it
+ 		 * has, we will receive (or have already received and queued) another
+ 		 * signal and come here again.
+ 		 *
+ 		 * We are not holding AsyncQueueLock here! The queue can only extend
+ 		 * beyond the head pointer (see above) and we leave our backend's
+ 		 * pointer where it is so nobody will truncate or rewrite pages under
+ 		 * us. Especially we don't want to hold a lock while sending the
+ 		 * notifications to the frontend.
+ 		 */
+ 		reachedStop = false;
+ 
+ 		notifications = NIL;
+ 		reachedStop = asyncQueueGetEntriesByPage(&pos, head, &notifications);
+ 
+ 		/*
+ 		 * Note that we deliver everything that we see in the queue and that
+ 		 * matches our _current_ listening state.
+ 		 * Especially we do not take into account different commit times.
+ 		 *
+ 		 * See the following example:
+ 		 *
+ 		 * Backend 1:                    Backend 2:
+ 		 *
+ 		 * transaction starts
+ 		 * NOTIFY foo;
+ 		 * commit starts
+ 		 *                               transaction starts
+ 		 *                               LISTEN foo;
+ 		 *                               commit starts
+ 		 * commit to clog
+ 		 *                               commit to clog
+ 		 *
+ 		 * It could happen that backend 2 sees the notification from
+ 		 * backend 1 in the queue and even though the notifying transaction
+ 		 * committed before the listening transaction, we still deliver the
+ 		 * notification.
+ 		 *
+ 		 * The idea is that an additional notification does not do any
+ 		 * harm we just need to make sure that we do not miss a
+ 		 * notification.
+ 		 */
+ 		foreach(lc, notifications)
+ 		{
+ 			n = (Notification *) lfirst(lc);
+ 			NotifyMyFrontEnd(n->channel, n->payload, n->srcPid);
+ 		}
+ 	} while (!reachedStop);
+ 
+ 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ 	QUEUE_BACKEND_POS(MyBackendId) = pos;
+ 	if (QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL))
+ 		advanceTail = true;
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	if (advanceTail)
+ 		/* Move forward the tail pointer and try to truncate. */
+ 		asyncQueueAdvanceTail();
+ }
+ 
+ static void
+ asyncQueueAdvanceTail(void)
+ {
+ 	QueuePosition	min;		/* QUEUE_POS_MIN over head and all backends with a PID */
+ 	int				i;
+ 	int				tailp;		/* page of the tail before it is moved */
+ 	int				headp;		/* page of the head */
+ 
+ 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+ 	min = QUEUE_HEAD;
+ 	for (i = 0; i < MaxBackends; i++)
+ 		if (QUEUE_BACKEND_PID(i) != InvalidPid)	/* skip slots without a PID */
+ 			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD);
+ 
+ 	tailp = QUEUE_POS_PAGE(QUEUE_TAIL);
+ 	headp = QUEUE_POS_PAGE(QUEUE_HEAD);
+ 	QUEUE_TAIL = min;			/* new tail is stored while the lock is held */
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	/* Wraparound check, using the page numbers saved while we held the lock */
+ 	if ((asyncQueuePagePrecedesLogically(tailp, QUEUE_POS_PAGE(min), headp)
+ 			&& asyncQueuePagePrecedesPhysically(tailp, headp))
+ 		|| tailp == QUEUE_POS_PAGE(min))
+ 	{
+ 		/*
+ 		 * SimpleLruTruncate() will ask for AsyncCtlLock but will also
+ 		 * release the lock again.
+ 		 *
+ 		 * XXX this could be optimized to call SimpleLruTruncate() only when
+ 		 * we know that there is something to truncate.
+ 		 */
+ 		SimpleLruTruncate(AsyncCtl, QUEUE_POS_PAGE(min));
+ 	}
+ }
+ 
+ /*
   * ProcessIncomingNotify
   *
   *		Deal with arriving NOTIFYs from other backends.
   *		This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
   *		signal handler, or the next time control reaches the outer idle loop.
!  *		Scan the queue for arriving notifications and report them to my front
!  *		end.
   *
!  *		NOTE: we are outside of any transaction here.
   */
  static void
  ProcessIncomingNotify(void)
  {
! 	bool			catchup_enabled;
! 
! 	Assert(GetCurrentTransactionIdIfAny() == InvalidTransactionId);
  
  	/* Must prevent catchup interrupt while I am running */
  	catchup_enabled = DisableCatchupInterrupt();
***************
*** 974,1037 ****
  
  	notifyInterruptOccurred = 0;
  
! 	StartTransactionCommand();
! 
! 	lRel = heap_open(ListenerRelationId, ExclusiveLock);
! 	tdesc = RelationGetDescr(lRel);
! 
! 	/* Scan only entries with my listenerPID */
! 	ScanKeyInit(&key[0],
! 				Anum_pg_listener_listenerpid,
! 				BTEqualStrategyNumber, F_INT4EQ,
! 				Int32GetDatum(MyProcPid));
! 	scan = heap_beginscan(lRel, SnapshotNow, 1, key);
! 
! 	/* Prepare data for rewriting 0 into notification field */
! 	memset(nulls, false, sizeof(nulls));
! 	memset(repl, false, sizeof(repl));
! 	repl[Anum_pg_listener_notification - 1] = true;
! 	memset(value, 0, sizeof(value));
! 	value[Anum_pg_listener_notification - 1] = Int32GetDatum(0);
! 
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
! 		char	   *relname = NameStr(listener->relname);
! 		int32		sourcePID = listener->notification;
! 
! 		if (sourcePID != 0)
! 		{
! 			/* Notify the frontend */
! 
! 			if (Trace_notify)
! 				elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
! 					 relname, (int) sourcePID);
! 
! 			NotifyMyFrontEnd(relname, sourcePID);
! 
! 			/*
! 			 * Rewrite the tuple with 0 in notification column.
! 			 */
! 			rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
! 			simple_heap_update(lRel, &lTuple->t_self, rTuple);
! 
! #ifdef NOT_USED					/* currently there are no indexes */
! 			CatalogUpdateIndexes(lRel, rTuple);
! #endif
! 		}
! 	}
! 	heap_endscan(scan);
! 
! 	/*
! 	 * We do NOT release the lock on pg_listener here; we need to hold it
! 	 * until end of transaction (which is about to happen, anyway) to ensure
! 	 * that other backends see our tuple updates when they look. Otherwise, a
! 	 * transaction started after this one might mistakenly think it doesn't
! 	 * need to send this backend a new NOTIFY.
! 	 */
! 	heap_close(lRel, NoLock);
! 
! 	CommitTransactionCommand();
  
  	/*
  	 * Must flush the notify messages to ensure frontend gets them promptly.
--- 1714,1720 ----
  
  	notifyInterruptOccurred = 0;
  
! 	asyncQueueReadAllNotifications();
  
  	/*
  	 * Must flush the notify messages to ensure frontend gets them promptly.
***************
*** 1051,1070 ****
   * Send NOTIFY message to my front end.
   */
  static void
! NotifyMyFrontEnd(char *relname, int32 listenerPID)
  {
  	if (whereToSendOutput == DestRemote)
  	{
  		StringInfoData buf;
  
  		pq_beginmessage(&buf, 'A');
! 		pq_sendint(&buf, listenerPID, sizeof(int32));
! 		pq_sendstring(&buf, relname);
  		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
! 		{
! 			/* XXX Add parameter string here later */
! 			pq_sendstring(&buf, "");
! 		}
  		pq_endmessage(&buf);
  
  		/*
--- 1734,1750 ----
   * Send NOTIFY message to my front end.
   */
  static void
! NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
  {
  	if (whereToSendOutput == DestRemote)
  	{
  		StringInfoData buf;
  
  		pq_beginmessage(&buf, 'A');
! 		pq_sendint(&buf, srcPid, sizeof(int32));
! 		pq_sendstring(&buf, channel);
  		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
! 			pq_sendstring(&buf, payload);
  		pq_endmessage(&buf);
  
  		/*
***************
*** 1074,1096 ****
  		 */
  	}
  	else
! 		elog(INFO, "NOTIFY for %s", relname);
  }
  
! /* Does pendingNotifies include the given relname? */
  static bool
! AsyncExistsPendingNotify(const char *relname)
  {
  	ListCell   *p;
  
! 	foreach(p, pendingNotifies)
! 	{
! 		const char *prelname = (const char *) lfirst(p);
  
! 		if (strcmp(prelname, relname) == 0)
  			return true;
  	}
  
  	return false;
  }
  
--- 1754,1810 ----
  		 */
  	}
  	else
! 		elog(INFO, "NOTIFY for %s", channel);
  }
  
! /* Does pendingNotifies include the given channel/payload (NULL payload = "")? */
  static bool
! AsyncExistsPendingNotify(const char *channel, const char *payload)
  {
  	ListCell   *p;
+ 	Notification *n;
  
! 	if (pendingNotifies == NIL)
! 		return false;
  
! 	if (payload == NULL)
! 		payload = "";			/* compare a missing payload as the empty string */
! 
! 	/*
! 	 * New elements must be appended to the end of the list to preserve
! 	 * their order. On the other hand, we would like to scan the list
! 	 * backwards so that duplicate elimination is a tad faster when the
! 	 * same condition is signaled many times in a row. As a compromise, we
! 	 * check the tail element first, which we can access directly. If it
! 	 * doesn't match, we check the rest of the list.
! 	 */
! 
! 	n = (Notification *) llast(pendingNotifies);
! 	if (strcmp(n->channel, channel) == 0)
! 	{
! 		Assert(n->payload != NULL);
! 		if (strcmp(n->payload, payload) == 0)
  			return true;
  	}
  
+ 	/*
+ 	 * Note the difference from foreach(): we stop when p reaches the last
+ 	 * element, so the tail, which was already checked above, is skipped.
+  	 */
+ 	for(p = list_head(pendingNotifies);
+ 		p != list_tail(pendingNotifies);
+ 		p = lnext(p))
+ 	{
+ 		n = (Notification *) lfirst(p);
+ 
+ 		if (strcmp(n->channel, channel) == 0)
+ 		{
+ 			Assert(n->payload != NULL);
+ 			if (strcmp(n->payload, payload) == 0)
+ 				return true;
+ 		}
+ 	}
+ 
  	return false;
  }
  
***************
*** 1107,1112 ****
--- 1821,1828 ----
  	 */
  	pendingActions = NIL;
  	pendingNotifies = NIL;
+ 
+ 	backendSendsNotifications = false;
  }
  
  /*
***************
*** 1124,1128 ****
  	 * there is any significant delay before I commit.	OK for now because we
  	 * disallow COMMIT PREPARED inside a transaction block.)
  	 */
! 	Async_Notify((char *) recdata);
  }
--- 1840,1850 ----
  	 * there is any significant delay before I commit.	OK for now because we
  	 * disallow COMMIT PREPARED inside a transaction block.)
  	 */
! 	AsyncQueueEntry		*qe = (AsyncQueueEntry *) recdata;
! 
! 	Assert(qe->dboid == MyDatabaseId);
! 	Assert(qe->length == len);
! 
! 	Async_Notify(qe->channel, qe->payload);
  }
+ 
diff -cr cvs.head/src/backend/nodes/copyfuncs.c cvs.build/src/backend/nodes/copyfuncs.c
*** cvs.head/src/backend/nodes/copyfuncs.c	2010-01-06 22:30:06.000000000 +0100
--- cvs.build/src/backend/nodes/copyfuncs.c	2010-01-22 00:42:56.000000000 +0100
***************
*** 2770,2775 ****
--- 2770,2776 ----
  	NotifyStmt *newnode = makeNode(NotifyStmt);
  
  	COPY_STRING_FIELD(conditionname);
+ 	COPY_STRING_FIELD(payload);
  
  	return newnode;
  }
diff -cr cvs.head/src/backend/nodes/equalfuncs.c cvs.build/src/backend/nodes/equalfuncs.c
*** cvs.head/src/backend/nodes/equalfuncs.c	2010-01-06 22:30:06.000000000 +0100
--- cvs.build/src/backend/nodes/equalfuncs.c	2010-01-22 00:42:56.000000000 +0100
***************
*** 1324,1329 ****
--- 1324,1330 ----
  _equalNotifyStmt(NotifyStmt *a, NotifyStmt *b)
  {
  	COMPARE_STRING_FIELD(conditionname);
+ 	COMPARE_STRING_FIELD(payload);
  
  	return true;
  }
diff -cr cvs.head/src/backend/nodes/outfuncs.c cvs.build/src/backend/nodes/outfuncs.c
*** cvs.head/src/backend/nodes/outfuncs.c	2010-01-06 22:30:06.000000000 +0100
--- cvs.build/src/backend/nodes/outfuncs.c	2010-01-22 00:42:56.000000000 +0100
***************
*** 1817,1822 ****
--- 1817,1823 ----
  	WRITE_NODE_TYPE("NOTIFY");
  
  	WRITE_STRING_FIELD(conditionname);
+ 	WRITE_STRING_FIELD(payload);
  }
  
  static void
diff -cr cvs.head/src/backend/nodes/readfuncs.c cvs.build/src/backend/nodes/readfuncs.c
*** cvs.head/src/backend/nodes/readfuncs.c	2010-01-05 12:39:25.000000000 +0100
--- cvs.build/src/backend/nodes/readfuncs.c	2010-01-22 00:42:56.000000000 +0100
***************
*** 231,236 ****
--- 231,237 ----
  	READ_LOCALS(NotifyStmt);
  
  	READ_STRING_FIELD(conditionname);
+ 	READ_STRING_FIELD(payload);
  
  	READ_DONE();
  }
diff -cr cvs.head/src/backend/parser/gram.y cvs.build/src/backend/parser/gram.y
*** cvs.head/src/backend/parser/gram.y	2010-01-06 22:30:07.000000000 +0100
--- cvs.build/src/backend/parser/gram.y	2010-01-22 00:42:56.000000000 +0100
***************
*** 399,405 ****
  
  %type <ival>	Iconst SignedIconst
  %type <list>	Iconst_list
! %type <str>		Sconst comment_text
  %type <str>		RoleId opt_granted_by opt_boolean ColId_or_Sconst
  %type <list>	var_list
  %type <str>		ColId ColLabel var_name type_function_name param_name
--- 399,405 ----
  
  %type <ival>	Iconst SignedIconst
  %type <list>	Iconst_list
! %type <str>		Sconst comment_text notify_payload
  %type <str>		RoleId opt_granted_by opt_boolean ColId_or_Sconst
  %type <list>	var_list
  %type <str>		ColId ColLabel var_name type_function_name param_name
***************
*** 6074,6083 ****
   *
   *****************************************************************************/
  
! NotifyStmt: NOTIFY ColId
  				{
  					NotifyStmt *n = makeNode(NotifyStmt);
  					n->conditionname = $2;
  					$$ = (Node *)n;
  				}
  		;
--- 6074,6089 ----
   *
   *****************************************************************************/
  
! notify_payload:
! 			Sconst								{ $$ = $1; }
! 			| /*EMPTY*/							{ $$ = NULL; }
! 		;
! 
! NotifyStmt: NOTIFY ColId notify_payload
  				{
  					NotifyStmt *n = makeNode(NotifyStmt);
  					n->conditionname = $2;
+ 					n->payload = $3;
  					$$ = (Node *)n;
  				}
  		;
diff -cr cvs.head/src/backend/storage/ipc/ipci.c cvs.build/src/backend/storage/ipc/ipci.c
*** cvs.head/src/backend/storage/ipc/ipci.c	2010-01-20 20:08:27.000000000 +0100
--- cvs.build/src/backend/storage/ipc/ipci.c	2010-01-22 00:44:47.000000000 +0100
***************
*** 20,25 ****
--- 20,26 ----
  #include "access/nbtree.h"
  #include "access/subtrans.h"
  #include "access/twophase.h"
+ #include "commands/async.h"
  #include "miscadmin.h"
  #include "pgstat.h"
  #include "postmaster/autovacuum.h"
***************
*** 225,230 ****
--- 226,232 ----
  	 */
  	BTreeShmemInit();
  	SyncScanShmemInit();
+ 	AsyncShmemInit();
  
  #ifdef EXEC_BACKEND
  
diff -cr cvs.head/src/backend/storage/lmgr/lwlock.c cvs.build/src/backend/storage/lmgr/lwlock.c
*** cvs.head/src/backend/storage/lmgr/lwlock.c	2010-01-05 12:39:29.000000000 +0100
--- cvs.build/src/backend/storage/lmgr/lwlock.c	2010-01-22 00:42:56.000000000 +0100
***************
*** 24,29 ****
--- 24,30 ----
  #include "access/clog.h"
  #include "access/multixact.h"
  #include "access/subtrans.h"
+ #include "commands/async.h"
  #include "miscadmin.h"
  #include "pg_trace.h"
  #include "storage/ipc.h"
***************
*** 174,179 ****
--- 175,183 ----
  	/* multixact.c needs two SLRU areas */
  	numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
  
+ 	/* async.c needs one per page for the AsyncQueue */
+ 	numLocks += NUM_ASYNC_BUFFERS;
+ 
  	/*
  	 * Add any requested by loadable modules; for backwards-compatibility
  	 * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
diff -cr cvs.head/src/backend/tcop/utility.c cvs.build/src/backend/tcop/utility.c
*** cvs.head/src/backend/tcop/utility.c	2010-01-20 20:08:28.000000000 +0100
--- cvs.build/src/backend/tcop/utility.c	2010-01-22 00:42:56.000000000 +0100
***************
*** 928,936 ****
  		case T_NotifyStmt:
  			{
  				NotifyStmt *stmt = (NotifyStmt *) parsetree;
  				PreventCommandDuringRecovery();
  
! 				Async_Notify(stmt->conditionname);
  			}
  			break;
  
--- 928,943 ----
  		case T_NotifyStmt:
  			{
  				NotifyStmt *stmt = (NotifyStmt *) parsetree;
+ 				/* XXX the new listen/notify version can be enabled
+ 				 * for Hot Standby */
  				PreventCommandDuringRecovery();
  
! 				if (stmt->payload
! 					&& strlen(stmt->payload) > NOTIFY_PAYLOAD_MAX_LENGTH - 1)
! 					ereport(ERROR,
! 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
! 							 errmsg("payload string too long")));
! 				Async_Notify(stmt->conditionname, stmt->payload);
  			}
  			break;
  
diff -cr cvs.head/src/bin/initdb/initdb.c cvs.build/src/bin/initdb/initdb.c
*** cvs.head/src/bin/initdb/initdb.c	2010-01-08 10:48:31.000000000 +0100
--- cvs.build/src/bin/initdb/initdb.c	2010-01-22 00:42:56.000000000 +0100
***************
*** 2458,2463 ****
--- 2458,2464 ----
  		"pg_xlog",
  		"pg_xlog/archive_status",
  		"pg_clog",
+ 		"pg_notify",
  		"pg_subtrans",
  		"pg_twophase",
  		"pg_multixact/members",
diff -cr cvs.head/src/bin/psql/common.c cvs.build/src/bin/psql/common.c
*** cvs.head/src/bin/psql/common.c	2010-01-05 12:39:33.000000000 +0100
--- cvs.build/src/bin/psql/common.c	2010-01-22 00:42:56.000000000 +0100
***************
*** 555,562 ****
  
  	while ((notify = PQnotifies(pset.db)))
  	{
! 		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"),
! 				notify->relname, notify->be_pid);
  		fflush(pset.queryFout);
  		PQfreemem(notify);
  	}
--- 555,562 ----
  
  	while ((notify = PQnotifies(pset.db)))
  	{
! 		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" (%s) received from server process with PID %d.\n"),
! 				notify->relname, notify->extra, notify->be_pid);
  		fflush(pset.queryFout);
  		PQfreemem(notify);
  	}
diff -cr cvs.head/src/bin/psql/tab-complete.c cvs.build/src/bin/psql/tab-complete.c
*** cvs.head/src/bin/psql/tab-complete.c	2010-01-05 12:39:33.000000000 +0100
--- cvs.build/src/bin/psql/tab-complete.c	2010-01-22 00:42:56.000000000 +0100
***************
*** 2099,2105 ****
  
  /* UNLISTEN */
  	else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0)
! 		COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s' UNION SELECT '*'");
  
  /* UPDATE */
  	/* If prev. word is UPDATE suggest a list of tables */
--- 2099,2105 ----
  
  /* UNLISTEN */
  	else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0)
! 		COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s' UNION SELECT '*'");
  
  /* UPDATE */
  	/* If prev. word is UPDATE suggest a list of tables */
diff -cr cvs.head/src/include/access/slru.h cvs.build/src/include/access/slru.h
*** cvs.head/src/include/access/slru.h	2010-01-05 12:39:34.000000000 +0100
--- cvs.build/src/include/access/slru.h	2010-01-22 00:42:56.000000000 +0100
***************
*** 16,21 ****
--- 16,40 ----
  #include "access/xlogdefs.h"
  #include "storage/lwlock.h"
  
+ /*
+  * Define segment size.  A page is the same BLCKSZ as is used everywhere
+  * else in Postgres.  The segment size can be chosen somewhat arbitrarily;
+  * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
+  * or 64K transactions for SUBTRANS.
+  *
+  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+  * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
+  * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
+  * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
+  * take no explicit notice of that fact in this module, except when comparing
+  * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
+  *
+  * Note: this file currently assumes that segment file names will be four
+  * hex digits.	This sets a lower bound on the segment size (64K transactions
+  * for 32-bit TransactionIds).
+  */
+ #define SLRU_PAGES_PER_SEGMENT	32
+ 
  
  /*
   * Page status codes.  Note that these do not include the "dirty" bit.
diff -cr cvs.head/src/include/catalog/pg_proc.h cvs.build/src/include/catalog/pg_proc.h
*** cvs.head/src/include/catalog/pg_proc.h	2010-01-20 20:08:29.000000000 +0100
--- cvs.build/src/include/catalog/pg_proc.h	2010-01-22 00:42:56.000000000 +0100
***************
*** 4097,4102 ****
--- 4097,4104 ----
  DESCR("get the prepared statements for this session");
  DATA(insert OID = 2511 (  pg_cursor PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,25,16,16,16,1184}" "{o,o,o,o,o,o}" "{name,statement,is_holdable,is_binary,is_scrollable,creation_time}" _null_ pg_cursor _null_ _null_ _null_ ));
  DESCR("get the open cursors for this session");
+ DATA(insert OID = 2187 (  pg_listening	PGNSP	PGUID 12 1 10 0 f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ pg_listening _null_ _null_ _null_ ));
+ DESCR("get the channels that the current backend listens to");
  DATA(insert OID = 2599 (  pg_timezone_abbrevs	PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,1186,16}" "{o,o,o}" "{abbrev,utc_offset,is_dst}" _null_ pg_timezone_abbrevs _null_ _null_ _null_ ));
  DESCR("get the available time zone abbreviations");
  DATA(insert OID = 2856 (  pg_timezone_names		PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,25,1186,16}" "{o,o,o,o}" "{name,abbrev,utc_offset,is_dst}" _null_ pg_timezone_names _null_ _null_ _null_ ));
diff -cr cvs.head/src/include/commands/async.h cvs.build/src/include/commands/async.h
*** cvs.head/src/include/commands/async.h	2010-01-05 12:39:35.000000000 +0100
--- cvs.build/src/include/commands/async.h	2010-01-22 00:42:56.000000000 +0100
***************
*** 13,28 ****
  #ifndef ASYNC_H
  #define ASYNC_H
  
  extern bool Trace_notify;
  
  /* notify-related SQL statements */
! extern void Async_Notify(const char *relname);
  extern void Async_Listen(const char *relname);
  extern void Async_Unlisten(const char *relname);
  extern void Async_UnlistenAll(void);
  
  /* perform (or cancel) outbound notify processing at transaction commit */
! extern void AtCommit_Notify(void);
  extern void AtAbort_Notify(void);
  extern void AtSubStart_Notify(void);
  extern void AtSubCommit_Notify(void);
--- 13,42 ----
  #ifndef ASYNC_H
  #define ASYNC_H
  
+ /*
+  * Maximum size of a NOTIFY payload, counting the terminating '\0'; the
+  * payload string itself can therefore be at most one byte shorter.
+  */
+ #define NOTIFY_PAYLOAD_MAX_LENGTH	8000
+ 
+ /*
+  * How many page slots do we reserve?
+  */
+ #define NUM_ASYNC_BUFFERS			4
+ 
  extern bool Trace_notify;
  
+ extern void AsyncShmemInit(void);
+ 
  /* notify-related SQL statements */
! extern void Async_Notify(const char *relname, const char *payload);
  extern void Async_Listen(const char *relname);
  extern void Async_Unlisten(const char *relname);
  extern void Async_UnlistenAll(void);
  
  /* perform (or cancel) outbound notify processing at transaction commit */
! extern void AtCommit_NotifyBeforeCommit(void);
! extern void AtCommit_NotifyAfterCommit(void);
  extern void AtAbort_Notify(void);
  extern void AtSubStart_Notify(void);
  extern void AtSubCommit_Notify(void);
***************
*** 43,46 ****
--- 57,62 ----
  extern void notify_twophase_postcommit(TransactionId xid, uint16 info,
  						   void *recdata, uint32 len);
  
+ extern Datum pg_listening(PG_FUNCTION_ARGS);
+ 
  #endif   /* ASYNC_H */
diff -cr cvs.head/src/include/nodes/parsenodes.h cvs.build/src/include/nodes/parsenodes.h
*** cvs.head/src/include/nodes/parsenodes.h	2010-01-20 20:08:30.000000000 +0100
--- cvs.build/src/include/nodes/parsenodes.h	2010-01-22 00:42:56.000000000 +0100
***************
*** 2081,2086 ****
--- 2081,2087 ----
  {
  	NodeTag		type;
  	char	   *conditionname;	/* condition name to notify */
+ 	char	   *payload;		/* the payload string to be conveyed */
  } NotifyStmt;
  
  /* ----------------------
diff -cr cvs.head/src/include/storage/lwlock.h cvs.build/src/include/storage/lwlock.h
*** cvs.head/src/include/storage/lwlock.h	2010-01-05 12:39:36.000000000 +0100
--- cvs.build/src/include/storage/lwlock.h	2010-01-22 00:44:25.000000000 +0100
***************
*** 67,72 ****
--- 67,74 ----
  	AutovacuumLock,
  	AutovacuumScheduleLock,
  	SyncScanLock,
+ 	AsyncCtlLock,
+ 	AsyncQueueLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff -cr cvs.head/src/include/utils/errcodes.h cvs.build/src/include/utils/errcodes.h
*** cvs.head/src/include/utils/errcodes.h	2010-01-05 12:39:36.000000000 +0100
--- cvs.build/src/include/utils/errcodes.h	2010-01-22 00:42:56.000000000 +0100
***************
*** 318,323 ****
--- 318,324 ----
  #define ERRCODE_STATEMENT_TOO_COMPLEX		MAKE_SQLSTATE('5','4', '0','0','1')
  #define ERRCODE_TOO_MANY_COLUMNS			MAKE_SQLSTATE('5','4', '0','1','1')
  #define ERRCODE_TOO_MANY_ARGUMENTS			MAKE_SQLSTATE('5','4', '0','2','3')
+ #define ERRCODE_TOO_MANY_ENTRIES			MAKE_SQLSTATE('5','4', '0','3','1')
  
  /* Class 55 - Object Not In Prerequisite State (class borrowed from DB2) */
  #define ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE	MAKE_SQLSTATE('5','5', '0','0','0')
diff -cr cvs.head/src/test/regress/expected/guc.out cvs.build/src/test/regress/expected/guc.out
*** cvs.head/src/test/regress/expected/guc.out	2009-11-22 06:20:41.000000000 +0100
--- cvs.build/src/test/regress/expected/guc.out	2010-01-22 00:42:56.000000000 +0100
***************
*** 532,540 ****
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT relname FROM pg_listener;
!   relname  
! -----------
   foo_event
  (1 row)
  
--- 532,540 ----
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT pg_listening();
!  pg_listening 
! --------------
   foo_event
  (1 row)
  
***************
*** 571,579 ****
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT relname FROM pg_listener;
!  relname 
! ---------
  (0 rows)
  
  SELECT name FROM pg_prepared_statements;
--- 571,579 ----
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT pg_listening();
!  pg_listening 
! --------------
  (0 rows)
  
  SELECT name FROM pg_prepared_statements;
diff -cr cvs.head/src/test/regress/expected/sanity_check.out cvs.build/src/test/regress/expected/sanity_check.out
*** cvs.head/src/test/regress/expected/sanity_check.out	2010-01-20 20:08:32.000000000 +0100
--- cvs.build/src/test/regress/expected/sanity_check.out	2010-01-22 00:42:56.000000000 +0100
***************
*** 107,113 ****
   pg_language             | t
   pg_largeobject          | t
   pg_largeobject_metadata | t
-  pg_listener             | f
   pg_namespace            | t
   pg_opclass              | t
   pg_operator             | t
--- 107,112 ----
***************
*** 154,160 ****
   timetz_tbl              | f
   tinterval_tbl           | f
   varchar_tbl             | f
! (143 rows)
  
  --
  -- another sanity check: every system catalog that has OIDs should have
--- 153,159 ----
   timetz_tbl              | f
   tinterval_tbl           | f
   varchar_tbl             | f
! (142 rows)
  
  --
  -- another sanity check: every system catalog that has OIDs should have
diff -cr cvs.head/src/test/regress/sql/guc.sql cvs.build/src/test/regress/sql/guc.sql
*** cvs.head/src/test/regress/sql/guc.sql	2009-10-21 22:38:58.000000000 +0200
--- cvs.build/src/test/regress/sql/guc.sql	2010-01-22 00:42:56.000000000 +0100
***************
*** 165,171 ****
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT relname FROM pg_listener;
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
--- 165,171 ----
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT pg_listening();
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
***************
*** 174,180 ****
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT relname FROM pg_listener;
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
--- 174,180 ----
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT pg_listening();
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
