diff -cr cvs.head/src/backend/access/transam/slru.c cvs.build/src/backend/access/transam/slru.c
*** cvs.head/src/backend/access/transam/slru.c	2010-01-05 12:39:22.000000000 +0100
--- cvs.build/src/backend/access/transam/slru.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 58,83 ****
  #include "storage/shmem.h"
  #include "miscadmin.h"
  
- 
- /*
-  * Define segment size.  A page is the same BLCKSZ as is used everywhere
-  * else in Postgres.  The segment size can be chosen somewhat arbitrarily;
-  * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
-  * or 64K transactions for SUBTRANS.
-  *
-  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
-  * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
-  * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
-  * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
-  * take no explicit notice of that fact in this module, except when comparing
-  * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
-  *
-  * Note: this file currently assumes that segment file names will be four
-  * hex digits.	This sets a lower bound on the segment size (64K transactions
-  * for 32-bit TransactionIds).
-  */
- #define SLRU_PAGES_PER_SEGMENT	32
- 
  #define SlruFileName(ctl, path, seg) \
  	snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
  
--- 58,63 ----
diff -cr cvs.head/src/backend/access/transam/xact.c cvs.build/src/backend/access/transam/xact.c
*** cvs.head/src/backend/access/transam/xact.c	2010-01-05 12:39:22.000000000 +0100
--- cvs.build/src/backend/access/transam/xact.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 1729,1737 ****
  	/* close large objects before lower-level cleanup */
  	AtEOXact_LargeObject(true);
  
! 	/* NOTIFY commit must come before lower-level cleanup */
! 	AtCommit_Notify();
  
  	/* Prevent cancel/die interrupt while cleaning up */
  	HOLD_INTERRUPTS();
  
--- 1729,1738 ----
  	/* close large objects before lower-level cleanup */
  	AtEOXact_LargeObject(true);
  
! 	/* Insert notifications sent by the NOTIFY command into the queue */
! 	AtCommit_NotifyBeforeCommit();
  
+ 	Assert(s->state == TRANS_INPROGRESS);
  	/* Prevent cancel/die interrupt while cleaning up */
  	HOLD_INTERRUPTS();
  
***************
*** 1805,1810 ****
--- 1806,1816 ----
  
  	AtEOXact_MultiXact();
  
+ 	/*
+ 	 * Clean up Notify buffers and signal listening backends.
+ 	 */
+ 	AtCommit_NotifyAfterCommit();
+ 
  	ResourceOwnerRelease(TopTransactionResourceOwner,
  						 RESOURCE_RELEASE_LOCKS,
  						 true, true);
diff -cr cvs.head/src/backend/catalog/Makefile cvs.build/src/backend/catalog/Makefile
*** cvs.head/src/backend/catalog/Makefile	2010-01-06 22:30:05.000000000 +0100
--- cvs.build/src/backend/catalog/Makefile	2010-01-08 11:01:53.000000000 +0100
***************
*** 30,36 ****
  	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
  	pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \
! 	pg_statistic.h pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h \
  	pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
  	pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
  	pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
--- 30,36 ----
  	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
  	pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \
! 	pg_statistic.h pg_rewrite.h pg_trigger.h pg_description.h \
  	pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
  	pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
  	pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
diff -cr cvs.head/src/backend/commands/async.c cvs.build/src/backend/commands/async.c
*** cvs.head/src/backend/commands/async.c	2010-01-05 12:39:22.000000000 +0100
--- cvs.build/src/backend/commands/async.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 14,44 ****
  
  /*-------------------------------------------------------------------------
   * New Async Notification Model:
!  * 1. Multiple backends on same machine.  Multiple backends listening on
!  *	  one relation.  (Note: "listening on a relation" is not really the
!  *	  right way to think about it, since the notify names need not have
!  *	  anything to do with the names of relations actually in the database.
!  *	  But this terminology is all over the code and docs, and I don't feel
!  *	  like trying to replace it.)
!  *
!  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
!  *	  ie, each relname/listenerPID pair.  The "notification" field of the
!  *	  tuple is zero when no NOTIFY is pending for that listener, or the PID
!  *	  of the originating backend when a cross-backend NOTIFY is pending.
!  *	  (We skip writing to pg_listener when doing a self-NOTIFY, so the
!  *	  notification field should never be equal to the listenerPID field.)
!  *
!  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
!  *	  relname to a list of outstanding NOTIFY requests.  Actual processing
!  *	  happens if and only if we reach transaction commit.  At that time (in
!  *	  routine AtCommit_Notify) we scan pg_listener for matching relnames.
!  *	  If the listenerPID in a matching tuple is ours, we just send a notify
!  *	  message to our own front end.  If it is not ours, and "notification"
!  *	  is not already nonzero, we set notification to our own PID and send a
!  *	  PROCSIG_NOTIFY_INTERRUPT signal to the receiving process (indicated by
!  *	  listenerPID).
!  *	  BTW: if the signal operation fails, we presume that the listener backend
!  *	  crashed without removing this tuple, and remove the tuple for it.
   *
   * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
   *	  can call inbound-notify processing immediately if this backend is idle
--- 14,79 ----
  
  /*-------------------------------------------------------------------------
   * New Async Notification Model:
!  *
!  * 1. Multiple backends on same machine. Multiple backends listening on
!  *	  several channels. (This was previously called a "relation" even though it
!  *	  is just an identifier and has nothing to do with a database relation.)
!  *
!  * 2. There is one central queue in the form of Slru backed file based storage
!  *    (directory pg_notify/), with several pages mapped into shared memory.
!  *
!  *    There is no central storage of which backend listens on which channel,
!  *    every backend has its own list.
!  *
!  *    Every backend that is listening on at least one channel registers by
!  *    entering its Pid into the array of all backends. It then scans all
!  *    incoming notifications and compares the notified channels with its list.
!  *
!  *    In case there is a match it delivers the corresponding notification to
!  *    its frontend.
!  *
!  * 3. The NOTIFY statement (routine Async_Notify) stores the notification
!  *    in a list which will not be processed until transaction end. Every
!  *    notification can additionally send a "payload" which is an extra text
!  *    parameter to convey arbitrary information to the recipient.
!  *
!  *    Duplicate notifications from the same transaction are sent out as one
!  *    notification only. This is done to save work when for example a trigger
!  *    on a 2 million row table fires a notification for each row that has been
!  *    changed. If the application needs to receive every single notification
!  *    that has been sent, it can easily add some unique string into the extra
!  *    payload parameter.
!  *
!  *    Once the transaction commits, AtCommit_NotifyBeforeCommit() performs the
!  *    required changes regarding listeners (Listen/Unlisten) and then adds the
!  *    pending notifications to the head of the queue. The head pointer of the
!  *    queue always points to the next free position and a position is just a
!  *    page number and the offset in that page. This is done before marking the
!  *    transaction as committed in clog. If we run into problems writing the
!  *    notifications, we can still call elog(ERROR, ...) and the transaction
!  *    will roll back.
!  *
!  *    Once we have put all of the notifications into the queue, we return to
!  *    CommitTransaction() which will then commit to clog.
!  *
!  *    In order to ensure transactional processing there is AsyncCommitOrderLock
!  *    that has to be grabbed exclusively by all transactions that send NOTIFYs
!  *    or do LISTENs or UNLISTENs, but only for the time when those transactions
!  *    commit to clog. For example, one issue here is that a transaction sending
!  *    notifications could store them into the list while another transaction
!  *    that started later does a LISTEN on a channel and commits. Then it has to
!  *    see the notifications of the longer running transaction, because it
!  *    committed earlier (even though it started later).
!  *
!  *    After clog commit we are called another time
!  *    (AtCommit_NotifyAfterCommit()). If we have executed either LISTEN or
!  *    UNLISTEN, we register any running notifying transactions and release the
!  *    lock to be able to solve exactly the problem described above. We then
!  *    check if we need to signal the backends. In SignalBackends() we scan the
!  *    list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT to every
!  *    backend that has set its Pid (we don't know which backend is listening on
!  *    which channel so we need to send a signal to every listening backend). We
!  *    can exclude backends that are already up to date.
   *
   * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
   *	  can call inbound-notify processing immediately if this backend is idle
***************
*** 46,97 ****
   *	  block).  Otherwise the handler may only set a flag, which will cause the
   *	  processing to occur just before we next go idle.
   *
!  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
!  *	  matching our own listenerPID and having nonzero notification fields.
!  *	  For each such tuple, we send a message to our frontend and clear the
!  *	  notification field.  BTW: this routine has to start/commit its own
!  *	  transaction, since by assumption it is only called from outside any
!  *	  transaction.
!  *
!  * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
!  * of pending actions.	If we reach transaction commit, the changes are
!  * applied to pg_listener just before executing any pending NOTIFYs.  This
!  * method is necessary because to avoid race conditions, we must hold lock
!  * on pg_listener from when we insert a new listener tuple until we commit.
!  * To do that and not create undue hazard of deadlock, we don't want to
!  * touch pg_listener until we are otherwise done with the transaction;
!  * in particular it'd be uncool to still be taking user-commanded locks
!  * while holding the pg_listener lock.
!  *
!  * Although we grab ExclusiveLock on pg_listener for any operation,
!  * the lock is never held very long, so it shouldn't cause too much of
!  * a performance problem.  (Previously we used AccessExclusiveLock, but
!  * there's no real reason to forbid concurrent reads.)
   *
!  * An application that listens on the same relname it notifies will get
   * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
   * by comparing be_pid in the NOTIFY message to the application's own backend's
!  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
   * frontend during startup.)  The above design guarantees that notifies from
!  * other backends will never be missed by ignoring self-notifies.  Note,
!  * however, that we do *not* guarantee that a separate frontend message will
!  * be sent for every outside NOTIFY.  Since there is only room for one
!  * originating PID in pg_listener, outside notifies occurring at about the
!  * same time may be collapsed into a single message bearing the PID of the
!  * first outside backend to perform the NOTIFY.
   *-------------------------------------------------------------------------
   */
  
  #include "postgres.h"
  
  #include <unistd.h>
  #include <signal.h>
  
  #include "access/heapam.h"
  #include "access/twophase_rmgr.h"
  #include "access/xact.h"
! #include "catalog/pg_listener.h"
  #include "commands/async.h"
  #include "libpq/libpq.h"
  #include "libpq/pqformat.h"
  #include "miscadmin.h"
--- 81,127 ----
   *	  block).  Otherwise the handler may only set a flag, which will cause the
   *	  processing to occur just before we next go idle.
   *
!  * 5. Inbound-notify processing consists of reading all of the notifications
!  *	  that have arrived since scanning last time. We read every notification
!  *	  until we reach the head pointer's position. Then we check if we were the
!  *	  laziest backend: if our pointer is at the same position as the global
!  *	  tail pointer, we advance the tail to the position of the second-laziest
!  *	  backend (we can identify it by inspecting the positions of all other
!  *	  backends' pointers). Whenever we move the tail pointer we also truncate
!  *	  now unused pages (i.e. delete files in pg_notify/ that are no longer
!  *	  used).
   *
!  * An application that listens on the same channel it notifies will get
   * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
   * by comparing be_pid in the NOTIFY message to the application's own backend's
!  * Pid.  (As of FE/BE protocol 2.0, the backend's Pid is provided to the
   * frontend during startup.)  The above design guarantees that notifies from
!  * other backends will never be missed by ignoring self-notifies.
   *-------------------------------------------------------------------------
   */
  
+ /* XXX 
+  *
+  * TODO:
+  *  - guc parameter max_notifies_per_txn ??
+  *  - adapt comments
+  *  - 2PC
+  *  - limit to ASCII?
+  */
+ 
  #include "postgres.h"
  
  #include <unistd.h>
  #include <signal.h>
  
  #include "access/heapam.h"
+ #include "access/slru.h"
+ #include "access/transam.h"
  #include "access/twophase_rmgr.h"
  #include "access/xact.h"
! #include "catalog/pg_type.h"
  #include "commands/async.h"
+ #include "funcapi.h"
  #include "libpq/libpq.h"
  #include "libpq/pqformat.h"
  #include "miscadmin.h"
***************
*** 108,115 ****
  
  /*
   * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
!  * all actions requested in the current transaction.  As explained above,
!  * we don't actually modify pg_listener until we reach transaction commit.
   *
   * The list is kept in CurTransactionContext.  In subtransactions, each
   * subtransaction has its own list in its own CurTransactionContext, but
--- 138,145 ----
  
  /*
   * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
!  * all actions requested in the current transaction. As explained above,
!  * we don't actually apply these actions until we reach transaction commit.
   *
   * The list is kept in CurTransactionContext.  In subtransactions, each
   * subtransaction has its own list in its own CurTransactionContext, but
***************
*** 134,140 ****
  static List *upperPendingActions = NIL; /* list of upper-xact lists */
  
  /*
!  * State for outbound notifies consists of a list of all relnames NOTIFYed
   * in the current transaction.	We do not actually perform a NOTIFY until
   * and unless the transaction commits.	pendingNotifies is NIL if no
   * NOTIFYs have been done in the current transaction.
--- 164,170 ----
  static List *upperPendingActions = NIL; /* list of upper-xact lists */
  
  /*
!  * State for outbound notifies consists of a list of all channels NOTIFYed
   * in the current transaction.	We do not actually perform a NOTIFY until
   * and unless the transaction commits.	pendingNotifies is NIL if no
   * NOTIFYs have been done in the current transaction.
***************
*** 149,160 ****
   * condition name, it will get a self-notify at commit.  This is a bit odd
   * but is consistent with our historical behavior.
   */
- static List *pendingNotifies = NIL;		/* list of C strings */
  
  static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */
  
  /*
!  * State for inbound notifies consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessIncomingNotify
   * directly, and one saying whether the signal has occurred but the handler
   * was not allowed to call ProcessIncomingNotify at the time.
--- 179,321 ----
   * condition name, it will get a self-notify at commit.  This is a bit odd
   * but is consistent with our historical behavior.
   */
  
+ typedef struct QueuePosition
+ {
+ 	int				page;		/* page number within the queue */
+ 	int				offset;		/* offset in that page */
+ } QueuePosition;
+ 
+ typedef struct Notification
+ {
+ 	char		   *channel;	/* channel name */
+ 	char		   *payload;	/* payload text; Async_Notify stores "" if none */
+ 	TransactionId	xid;		/* Async_Notify leaves this Invalid; set later */
+ 	int32			srcPid;		/* Async_Notify leaves this InvalidPid; set later */
+ 	QueuePosition	position;	/* not set by Async_Notify */
+ } Notification;
+ 
+ typedef struct AsyncQueueEntry
+ {
+ 	/*
+ 	 * The struct is declared with the maximal payload size, but usually only
+ 	 * AsyncQueueEntryEmptySize + strlen(payload) bytes of it are used.
+ 	 */
+ 	Size			length;		/* presumably the used size in bytes -- confirm */
+ 	Oid				dboid;
+ 	TransactionId	xid;
+ 	int32			srcPid;
+ 	char			channel[NAMEDATALEN];
+ 	char			payload[NOTIFY_PAYLOAD_MAX_LENGTH];
+ } AsyncQueueEntry;
+ #define AsyncQueueEntryEmptySize \
+ 	 (sizeof(AsyncQueueEntry) - NOTIFY_PAYLOAD_MAX_LENGTH + 1)	/* struct minus payload[], plus 1 byte */
+ 
+ #define	InvalidPid				(-1)	/* stored in pid fields that hold no process */
+ #define QUEUE_POS_PAGE(x)		((x).page)
+ #define QUEUE_POS_OFFSET(x)		((x).offset)
+ #define QUEUE_POS_EQUAL(x,y) \
+ 	 ((x).page == (y).page ? (x).offset == (y).offset : false)
+ #define SET_QUEUE_POS(x,y,z) \
+ 	do { /* expands to one statement; the caller supplies the ';' */ \
+ 		(x).page = (y); \
+ 		(x).offset = (z); \
+ 	} while (0)
+ /* earlier of positions x and y, z = HEAD; arguments are evaluated repeatedly */
+ #define QUEUE_POS_MIN(x,y,z) \
+ 	(asyncQueuePagePrecedesLogically((x).page, (y).page, (z).page) ? (x) : \
+ 		 asyncQueuePagePrecedesLogically((y).page, (x).page, (z).page) ? (y) : \
+ 			 (x).offset < (y).offset ? (x) : \
+ 			 	(y))
+ #define QUEUE_POS_LT(x,y,z) \
+ 		(QUEUE_POS_EQUAL(x,QUEUE_POS_MIN(x,y,z)) && !QUEUE_POS_EQUAL(x,y))
+ #define QUEUE_POS_LE(x,y,z) \
+ 		(QUEUE_POS_EQUAL(x,QUEUE_POS_MIN(x,y,z)))
+ #define QUEUE_BACKEND_POS(i)		asyncQueueControl->backend[(i)].pos	/* slot i: queue position */
+ #define QUEUE_BACKEND_PID(i)		asyncQueueControl->backend[(i)].pid	/* slot i: pid, or InvalidPid */
+ #define QUEUE_BACKEND_XID(i)		asyncQueueControl->backend[(i)].xid	/* slot i: xid */
+ #define QUEUE_HEAD					asyncQueueControl->head	/* next free position */
+ #define QUEUE_TAIL					asyncQueueControl->tail	/* tail of the slowest backend */
+ 
+ typedef struct QueueBackendStatus
+ {
+ 	int32			pid;	/* InvalidPid unless this backend has registered */
+ 	QueuePosition	pos;	/* this backend's position in the queue */
+ 	TransactionId	xid;  /* this is protected by AsyncCommitOrderLock,
+ 							 no lock required to read the own entry,
+ 							 LW_SHARED is sufficient for writing the own entry,
+ 							 LW_EXCLUSIVE to read everybody else's. */
+ } QueueBackendStatus;
+ 
+ /*
+  * The AsyncQueueControl structure is protected by the AsyncQueueLock.
+  *
+  * In SHARED mode, backends will only inspect their own entries as well as
+  * head and tail pointers. Consequently we can allow a backend to update its
+  * own record while holding only a shared lock (since no other backend will
+  * inspect it).
+  *
+  * In EXCLUSIVE mode, backends can inspect the entries of other backends and
+  * also change head and tail pointers.
+  *
+  * In order to avoid deadlocks, whenever we need both locks, we always first
+  * get AsyncQueueLock and then AsyncCtlLock.
+  */
+ typedef struct AsyncQueueControl
+ {
+ 	QueuePosition		head;		/* head points to the next free location */
+ 	QueuePosition 		tail;		/* the global tail is equivalent to the
+ 									   tail of the "slowest" backend */
+ 	TimestampTz			lastQueueFillWarn;	/* when the queue is full we only
+ 											   want to log that once in a
+ 											   while */
+ 	int					listeningBackends;
+ 	QueueBackendStatus	backend[1];	/* actually this one has as many entries as
+ 									 * connections are allowed (MaxBackends) */
+ 	/* DO NOT ADD FURTHER STRUCT MEMBERS HERE: backend[] extends past the struct */
+ } AsyncQueueControl;
+ 
+ static AsyncQueueControl   *asyncQueueControl;
+ static SlruCtlData			AsyncCtlData;
+ 
+ #define AsyncCtl					(&AsyncCtlData)
+ #define QUEUE_PAGESIZE				BLCKSZ
+ #define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
+ 
+ /*
+  * slru.c currently assumes that all filenames are four characters of hex
+  * digits. That means that we can use segments 0000 through FFFF.
+  * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
+  * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0xFFFF.
+  *
+  * It's of course easy to enhance slru.c but those pages give us so much
+  * space already that it doesn't seem worth the trouble...
+  *
+  * It's a legal test case to define QUEUE_MAX_PAGE to a very small multiple of
+  * SLRU_PAGES_PER_SEGMENT to test queue full behaviour.
+  */
+ #define QUEUE_MAX_PAGE			(SLRU_PAGES_PER_SEGMENT * 0xFFFF)
+ 
+ static List *pendingNotifies = NIL;				/* list of Notifications */
  static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */
+ static List *listenChannels = NIL;	/* list of channels we are listening to */
+ 
+ static bool backendSendsNotifications = false;
+ static bool backendExecutesListen = false;
+ static bool backendExecutesUnlisten = false; 
+ 
+ typedef struct
+ {
+ 	char			   *channel;
+ 	List			   *xids;
+ 	QueuePosition		limitPos;
+ 	ListenActionKind	kind;
+ } ActionPhaseInOut;
+ 
+ static List *ActionPhaseInOutList = NIL;
  
  /*
!  * State for inbound notifications consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessIncomingNotify
   * directly, and one saying whether the signal has occurred but the handler
   * was not allowed to call ProcessIncomingNotify at the time.
***************
*** 171,207 ****
  
  bool		Trace_notify = false;
  
- 
  static void queue_listen(ListenActionKind action, const char *condname);
  static void Async_UnlistenOnExit(int code, Datum arg);
! static void Exec_Listen(Relation lRel, const char *relname);
! static void Exec_Unlisten(Relation lRel, const char *relname);
! static void Exec_UnlistenAll(Relation lRel);
! static void Send_Notify(Relation lRel);
  static void ProcessIncomingNotify(void);
! static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
! static bool AsyncExistsPendingNotify(const char *relname);
  static void ClearPendingActionsAndNotifies(void);
  
  
  /*
   * Async_Notify
   *
   *		This is executed by the SQL notify command.
   *
!  *		Adds the relation to the list of pending notifies.
   *		Actual notification happens during transaction commit.
   *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   */
  void
! Async_Notify(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Notify(%s)", relname);
  
  	/* no point in making duplicate entries in the list ... */
! 	if (!AsyncExistsPendingNotify(relname))
  	{
  		/*
  		 * The name list needs to live until end of transaction, so store it
  		 * in the transaction context.
--- 332,484 ----
  
  bool		Trace_notify = false;
  
  static void queue_listen(ListenActionKind action, const char *condname);
  static void Async_UnlistenOnExit(int code, Datum arg);
! static bool IsListeningOn(const char *channel);
! static bool IsInListenChannels(const char *channel);
! static void asyncQueuePhaseInOut(ListenActionKind kind, const char *channel,
! 								 QueuePosition limitPos);
! static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
! static void Exec_Listen(const char *channel);
! static void Exec_Unlisten(const char *channel);
! static void Exec_UnlistenAll(void);
! static void SignalBackends(void);
! static void Send_Notify(void);
! static bool asyncQueuePagePrecedesPhysically(int p, int q);
! static bool asyncQueuePagePrecedesLogically(int p, int q, int head);
! static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
! static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
! static void asyncQueueEntryToNotification(AsyncQueueEntry *qe, Notification *n,
! 										  QueuePosition pos);
! static List *asyncQueueAddEntries(List *notifications);
! static bool asyncQueueGetEntriesByPage(QueuePosition *current,
! 									   QueuePosition stop,
! 									   List **notifications);
! static void asyncQueueReadAllNotifications(void);
! static void asyncQueueAdvanceTail(void);
  static void ProcessIncomingNotify(void);
! static void NotifyMyFrontEnd(const char *channel,
! 							 const char *payload,
! 							 int32 srcPid);
! static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
  static void ClearPendingActionsAndNotifies(void);
  
+ /*
+  * We will work on the page range of 0..(SLRU_PAGES_PER_SEGMENT * 0xFFFF).
+  * asyncQueuePagePrecedesPhysically is a plain numeric comparison of two page
+  * numbers; AsyncShmemInit installs it as the PagePrecedes callback of AsyncCtl.
+  *
+  * On the other hand, when asyncQueuePagePrecedesLogically does that check, it
+  * takes the current head page number into account. If we have wrapped
+  * around, it can happen that p precedes q, even though p > q (if the head page
+  * is in between the two).
+  */
+ static bool
+ asyncQueuePagePrecedesPhysically(int p, int q)
+ {
+ 	return p < q;
+ }
+ 
+ static bool
+ asyncQueuePagePrecedesLogically(int p, int q, int head)
+ {
+ 	bool	pBeyondHead = (p > head);
+ 	bool	qBeyondHead = (q > head);
+ 
+ 	/*
+ 	 * Both pages on the same side of the head page: no wraparound lies
+ 	 * between them, so the plain numeric order decides.
+ 	 */
+ 	if (pBeyondHead == qBeyondHead)
+ 		return p < q;
+ 
+ 	/*
+ 	 * Otherwise exactly one page is numerically beyond the head; that page
+ 	 * is the older one, so p precedes q only if p is that page.
+ 	 */
+ 	Assert(pBeyondHead ? (q <= head) : (q > head));
+ 	return pBeyondHead;
+ }
+ 
+ void
+ AsyncShmemInit(void)
+ {
+ 	bool	found;
+ 	int		slotno;
+ 	Size	size;
+ 
+ 	/*
+ 	 * Create/attach the shared queue state. backend[] is indexed by
+ 	 * MyBackendId and backend IDs start at 1, so slots 0..MaxBackends must
+ 	 * exist: sizeof(AsyncQueueControl) holds one, we add MaxBackends more.
+ 	 */
+ 	size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
+ 	size = add_size(size, sizeof(AsyncQueueControl));
+ 
+ 	asyncQueueControl = (AsyncQueueControl *)
+ 		ShmemInitStruct("Async Queue Control", size, &found);
+ 
+ 	if (!asyncQueueControl)
+ 		elog(ERROR, "out of memory");
+ 
+ 	if (!found)		/* not found: initialize the shared struct exactly once */
+ 	{
+ 		int		i;
+ 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
+ 		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+ 		for (i = 0; i <= MaxBackends; i++)
+ 		{
+ 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ 			QUEUE_BACKEND_PID(i) = InvalidPid;
+ 			QUEUE_BACKEND_XID(i) = InvalidTransactionId;
+ 		}
+ 		asyncQueueControl->lastQueueFillWarn = GetCurrentTimestamp();
+ 	}
+ 
+ 	AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically;
+ 	SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0,
+ 				  AsyncCtlLock, "pg_notify");
+ 	AsyncCtl->do_fsync = false;
+ 
+ 	if (!found)
+ 	{
+ 		LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+ 		LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
+ 		slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+ 		AsyncCtl->shared->page_dirty[slotno] = true;
+ 		SimpleLruWritePage(AsyncCtl, slotno, NULL);
+ 		LWLockRelease(AsyncCtlLock);
+ 		LWLockRelease(AsyncQueueLock);
+ 
+ 		SimpleLruTruncate(AsyncCtl, 0);
+ 	}
+ }
+ 
  
  /*
   * Async_Notify
   *
   *		This is executed by the SQL notify command.
   *
!  *		Adds the channel to the list of pending notifies.
   *		Actual notification happens during transaction commit.
   *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   */
  void
! Async_Notify(const char *channel, const char *payload)
  {
+ 
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Notify(%s)", channel);
! 
! 	/*
! 	 * XXX - do we now need a guc parameter max_notifies_per_txn?
! 	 */ 
  
  	/* no point in making duplicate entries in the list ... */
! 	if (!AsyncExistsPendingNotify(channel, payload))
  	{
+ 		Notification *n;
  		/*
  		 * The name list needs to live until end of transaction, so store it
  		 * in the transaction context.
***************
*** 210,221 ****
  
  		oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
  		/*
! 		 * Ordering of the list isn't important.  We choose to put new entries
! 		 * on the front, as this might make duplicate-elimination a tad faster
! 		 * when the same condition is signaled many times in a row.
  		 */
! 		pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
  
  		MemoryContextSwitchTo(oldcontext);
  	}
--- 487,509 ----
  
  		oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
+ 		n = (Notification *) palloc(sizeof(Notification));
+ 		n->channel = pstrdup(channel);
+ 		if (payload)
+ 			n->payload = pstrdup(payload);
+ 		else
+ 			n->payload = "";
+ 		/* will set the xid and the srcPid later... */
+ 		n->xid = InvalidTransactionId;
+ 		n->srcPid = InvalidPid;
+ 		/* we don't set n->position here. It is unknown and we won't do anything
+ 		 * with it at this point anyway. */
+ 
  		/*
! 		 * We want to preserve the order so we need to append every
! 		 * notification. See comments at AsyncExistsPendingNotify().
  		 */
! 		pendingNotifies = lappend(pendingNotifies, n);
  
  		MemoryContextSwitchTo(oldcontext);
  	}
***************
*** 259,270 ****
   *		This is executed by the SQL listen command.
   */
  void
! Async_Listen(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
  
! 	queue_listen(LISTEN_LISTEN, relname);
  }
  
  /*
--- 547,558 ----
   *		This is executed by the SQL listen command.
   */
  void
! Async_Listen(const char *channel)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
  
! 	queue_listen(LISTEN_LISTEN, channel);
  }
  
  /*
***************
*** 273,288 ****
   *		This is executed by the SQL unlisten command.
   */
  void
! Async_Unlisten(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
  
  	/* If we couldn't possibly be listening, no need to queue anything */
  	if (pendingActions == NIL && !unlistenExitRegistered)
  		return;
  
! 	queue_listen(LISTEN_UNLISTEN, relname);
  }
  
  /*
--- 561,576 ----
   *		This is executed by the SQL unlisten command.
   */
  void
! Async_Unlisten(const char *channel)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
  
  	/* If we couldn't possibly be listening, no need to queue anything */
  	if (pendingActions == NIL && !unlistenExitRegistered)
  		return;
  
! 	queue_listen(LISTEN_UNLISTEN, channel);
  }
  
  /*
***************
*** 306,313 ****
  /*
   * Async_UnlistenOnExit
   *
-  *		Clean up the pg_listener table at backend exit.
-  *
   *		This is executed if we have done any LISTENs in this backend.
   *		It might not be necessary anymore, if the user UNLISTENed everything,
   *		but we don't try to detect that case.
--- 594,599 ----
***************
*** 315,331 ****
  static void
  Async_UnlistenOnExit(int code, Datum arg)
  {
  	/*
! 	 * We need to start/commit a transaction for the unlisten, but if there is
! 	 * already an active transaction we had better abort that one first.
! 	 * Otherwise we'd end up committing changes that probably ought to be
! 	 * discarded.
  	 */
! 	AbortOutOfAnyTransaction();
! 	/* Now we can do the unlisten */
! 	StartTransactionCommand();
! 	Async_UnlistenAll();
! 	CommitTransactionCommand();
  }
  
  /*
--- 601,625 ----
  static void
  Async_UnlistenOnExit(int code, Datum arg)
  {
+ 	bool	advanceTail = false;
+ 
+ 	AbortOutOfAnyTransaction();
+ 
+ 	LWLockAcquire(AsyncCommitOrderLock, LW_SHARED);
+ 	QUEUE_BACKEND_XID(MyBackendId) = InvalidTransactionId;
+ 	LWLockRelease(AsyncCommitOrderLock);
+ 
+ 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ 	QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
  	/*
! 	 * If we have been the last backend, advance the tail pointer.
  	 */
! 	if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL))
! 		advanceTail = true;
! 	LWLockRelease(AsyncQueueLock);
! 
! 	if (advanceTail)
! 		asyncQueueAdvanceTail();
  }
  
  /*
***************
*** 348,357 ****
  	/* We can deal with pending NOTIFY though */
  	foreach(p, pendingNotifies)
  	{
! 		const char *relname = (const char *) lfirst(p);
  
  		RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
! 							   relname, strlen(relname) + 1);
  	}
  
  	/*
--- 642,656 ----
  	/* We can deal with pending NOTIFY though */
  	foreach(p, pendingNotifies)
  	{
! 		AsyncQueueEntry qe;
! 		Notification   *n;
! 
! 		n = (Notification *) lfirst(p);
! 
! 		asyncQueueNotificationToEntry(n, &qe);
  
  		RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
! 							   &qe, qe.length);
  	}
  
  	/*
***************
*** 363,386 ****
  }
  
  /*
!  * AtCommit_Notify
!  *
!  *		This is called at transaction commit.
   *
!  *		If there are pending LISTEN/UNLISTEN actions, insert or delete
!  *		tuples in pg_listener accordingly.
   *
!  *		If there are outbound notify requests in the pendingNotifies list,
!  *		scan pg_listener for matching tuples, and either signal the other
!  *		backend or send a message to our own frontend.
   *
!  *		NOTE: we are still inside the current transaction, therefore can
!  *		piggyback on its committing of changes.
   */
  void
! AtCommit_Notify(void)
  {
- 	Relation	lRel;
  	ListCell   *p;
  
  	if (pendingActions == NIL && pendingNotifies == NIL)
--- 662,681 ----
  }
  
  /*
!  * AtCommit_NotifyBeforeCommit
   *
!  *		This is called at transaction commit, before actually committing to
!  *		clog.
   *
!  *		If there are pending LISTEN/UNLISTEN actions, update our
!  *		"listenChannels" list.
   *
!  *		If there are outbound notify requests in the pendingNotifies list, add
!  *		them to the global queue; AtCommit_NotifyAfterCommit signals listeners.
   */
  void
! AtCommit_NotifyBeforeCommit(void)
  {
  	ListCell   *p;
  
  	if (pendingActions == NIL && pendingNotifies == NIL)
***************
*** 397,406 ****
  	}
  
  	if (Trace_notify)
! 		elog(DEBUG1, "AtCommit_Notify");
  
! 	/* Acquire ExclusiveLock on pg_listener */
! 	lRel = heap_open(ListenerRelationId, ExclusiveLock);
  
  	/* Perform any pending listen/unlisten actions */
  	foreach(p, pendingActions)
--- 692,702 ----
  	}
  
  	if (Trace_notify)
! 		elog(DEBUG1, "AtCommit_NotifyBeforeCommit");
  
! 	Assert(backendSendsNotifications == false);
! 	Assert(backendExecutesListen == false);
! 	Assert(backendExecutesUnlisten == false);
  
  	/* Perform any pending listen/unlisten actions */
  	foreach(p, pendingActions)
***************
*** 410,508 ****
  		switch (actrec->action)
  		{
  			case LISTEN_LISTEN:
! 				Exec_Listen(lRel, actrec->condname);
  				break;
  			case LISTEN_UNLISTEN:
! 				Exec_Unlisten(lRel, actrec->condname);
  				break;
  			case LISTEN_UNLISTEN_ALL:
! 				Exec_UnlistenAll(lRel);
  				break;
  		}
- 
- 		/* We must CCI after each action in case of conflicting actions */
- 		CommandCounterIncrement();
  	}
  
! 	/* Perform any pending notifies */
  	if (pendingNotifies)
! 		Send_Notify(lRel);
  
  	/*
! 	 * We do NOT release the lock on pg_listener here; we need to hold it
! 	 * until end of transaction (which is about to happen, anyway) to ensure
! 	 * that notified backends see our tuple updates when they look. Else they
! 	 * might disregard the signal, which would make the application programmer
! 	 * very unhappy.  Also, this prevents race conditions when we have just
! 	 * inserted a listening tuple.
  	 */
! 	heap_close(lRel, NoLock);
  
  	ClearPendingActionsAndNotifies();
  
  	if (Trace_notify)
! 		elog(DEBUG1, "AtCommit_Notify: done");
  }
  
  /*
!  * Exec_Listen --- subroutine for AtCommit_Notify
!  *
!  *		Register the current backend as listening on the specified relation.
   */
! static void
! Exec_Listen(Relation lRel, const char *relname)
  {
! 	HeapScanDesc scan;
! 	HeapTuple	tuple;
! 	Datum		values[Natts_pg_listener];
! 	bool		nulls[Natts_pg_listener];
! 	NameData	condname;
! 	bool		alreadyListener = false;
! 
! 	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
  
! 	/* Detect whether we are already listening on this relname */
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
  	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
  
! 		if (listener->listenerpid == MyProcPid &&
! 			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
  		{
! 			alreadyListener = true;
! 			/* No need to scan the rest of the table */
! 			break;
  		}
  	}
! 	heap_endscan(scan);
  
! 	if (alreadyListener)
! 		return;
  
! 	/*
! 	 * OK to insert a new tuple
! 	 */
! 	memset(nulls, false, sizeof(nulls));
  
! 	namestrcpy(&condname, relname);
! 	values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
! 	values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid);
! 	values[Anum_pg_listener_notification - 1] = Int32GetDatum(0);		/* no notifies pending */
  
! 	tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls);
  
! 	simple_heap_insert(lRel, tuple);
  
! #ifdef NOT_USED					/* currently there are no indexes */
! 	CatalogUpdateIndexes(lRel, tuple);
! #endif
  
! 	heap_freetuple(tuple);
  
  	/*
! 	 * now that we are listening, make sure we will unlisten before dying.
  	 */
  	if (!unlistenExitRegistered)
  	{
--- 706,973 ----
  		switch (actrec->action)
  		{
  			case LISTEN_LISTEN:
! 				Exec_Listen(actrec->condname);
  				break;
  			case LISTEN_UNLISTEN:
! 				Exec_Unlisten(actrec->condname);
  				break;
  			case LISTEN_UNLISTEN_ALL:
! 				Exec_UnlistenAll();
  				break;
  		}
  	}
  
! 	/*
! 	 * Perform any pending notifies.
! 	 */
  	if (pendingNotifies)
! 		Send_Notify();
  
  	/*
! 	 * Grab the AsyncCommitOrderLock to ensure we know the commit order.
! 	 * In case we have only sent notifications and have not executed LISTEN
! 	 * or UNLISTEN, a shared lock is sufficient.
  	 */
! 	if (backendExecutesListen || backendExecutesUnlisten)
! 		LWLockAcquire(AsyncCommitOrderLock, LW_EXCLUSIVE);
! 	else if (backendSendsNotifications)
! 		LWLockAcquire(AsyncCommitOrderLock, LW_SHARED);
! }
! 
! /*
!  * AtCommit_NotifyAfterCommit
!  *
!  *		This is called at transaction commit, after committing to clog.
!  *
!  *		Notify the listening backends.
!  */
! void
! AtCommit_NotifyAfterCommit(void)
! {
! 	QueuePosition	head;
! 	ListCell	   *p, *q;
! 
! 	/* Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
! 	 * return as soon as possible */
! 	if (!pendingActions && !backendSendsNotifications)
! 		return;
! 
! 	if (backendExecutesListen || backendExecutesUnlisten)
! 	{
! 		LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 		head = QUEUE_HEAD;
! 		LWLockRelease(AsyncQueueLock);
! 	}
! 
! 	foreach(p, pendingActions)
! 	{
! 		ListenAction *actrec = (ListenAction *) lfirst(p);
! 
! 		switch (actrec->action)
! 		{
! 			case LISTEN_LISTEN:
! 				Assert(backendExecutesListen);
! 				asyncQueuePhaseInOut(LISTEN_LISTEN, actrec->condname, head);
! 				break;
! 			case LISTEN_UNLISTEN:
! 				Assert(backendExecutesUnlisten);
! 				asyncQueuePhaseInOut(LISTEN_UNLISTEN, actrec->condname, head);
! 				break;
! 			case LISTEN_UNLISTEN_ALL:
! 				Assert(backendExecutesUnlisten);
! 				foreach(q, listenChannels)
! 				{
! 					char *lchan = (char *) lfirst(q);
! 					asyncQueuePhaseInOut(LISTEN_UNLISTEN, lchan, head);
! 				}
! 				break;
! 		}
! 	}
! 
! 	if (backendSendsNotifications)
! 		QUEUE_BACKEND_XID(MyBackendId) = InvalidTransactionId;
! 
! 	if (backendSendsNotifications
! 	 		|| backendExecutesListen
! 			|| backendExecutesUnlisten)
! 	{
! 		Assert(LWLockHeldByMe(AsyncCommitOrderLock));
! 		LWLockRelease(AsyncCommitOrderLock);
! 	}
! 
! 	if (backendSendsNotifications)
! 		SignalBackends();
  
  	ClearPendingActionsAndNotifies();
  
  	if (Trace_notify)
! 		elog(DEBUG1, "AtCommit_NotifyAfterCommit: done");
  }
  
  /*
!  * This function is executed for every notification found in the queue in order
!  * to check if the current backend is listening on that channel. Not sure if we
!  * should further optimize this, for example convert to a sorted array and
!  * allow binary search on it...
   */
! static bool
! IsListeningOn(const char *channel)
  {
! 	ListCell   *p, *q;
  
! 	foreach(p, listenChannels)
  	{
! 		ActionPhaseInOut   *act;
! 		char			   *lchan = (char *) lfirst(p);
! 		bool				vote;
  
! 		if (strcmp(lchan, channel) == 0)
  		{
! 			vote = true;	/* in listenChannels: listening by default */
! 			foreach(q, ActionPhaseInOutList)	/* own cursor, p stays intact */
! 			{
! 				act = (ActionPhaseInOut *) lfirst(q);
! 				if (strcmp(act->channel, channel) != 0)
! 					continue;	/* action concerns another channel */
! 				if (act->kind == LISTEN_LISTEN)
! 					vote = true;
! 				if (act->kind == LISTEN_UNLISTEN)
! 					vote = false;	/* the last matching action wins */
! 			}
! 			return vote;
  		}
  	}
! 	return false;	/* channel is not in listenChannels at all */
! }
  
! /*
!  * This is a less strict version of IsListeningOn().
!  *
!  * Think of the following:
!  *
!  * Backend 1:            Backend 2:
!  * LISTEN
!  * COMMIT
!  *                       NOTIFY
!  * UNLISTEN
!  *                       COMMIT
!  * COMMIT
!  *
!  * At the end backend 1 would say IsListeningOn() == false, but
!  * IsInListenChannels() == true.
!  *
!  * IsInListenChannels() is called from ProcessIncomingNotify() to check if
!  * a notification could be of interest.
!  */
! static bool
! IsInListenChannels(const char *channel)
! {
! 	ListCell   *p;
! 	char	   *lchan;
  
! 	foreach(p, listenChannels)
! 	{
! 		lchan = (char *) lfirst(p);
! 		if (strcmp(lchan, channel) == 0)
! 			return true;
! 	}
! 	return false;
! }
! 
! Datum
! pg_listening(PG_FUNCTION_ARGS)
! {
! 	FuncCallContext	   *funcctx;
! 	ListCell		  **lcp;	/* cursor into listenChannels, kept across calls */
! 
! 	/* stuff done only on the first call of the function */
! 	if (SRF_IS_FIRSTCALL())
! 	{
! 		MemoryContext	oldcontext;
! 
! 		/* create a function context for cross-call persistence */
! 		funcctx = SRF_FIRSTCALL_INIT();
! 
! 		/*
! 		 * switch to memory context appropriate for multiple function calls
! 		 */
! 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
  
! 		/* allocate the cursor: one ListCell pointer, start at the list head */
! 		lcp = (ListCell **) palloc(sizeof(ListCell *));
! 		if (listenChannels != NIL)
! 			*lcp = list_head(listenChannels);
! 		else
! 			*lcp = NULL;
! 		funcctx->user_fctx = (void *) lcp;
  
! 		MemoryContextSwitchTo(oldcontext);
! 	}
  
! 	/* stuff done on every call of the function */
! 	funcctx = SRF_PERCALL_SETUP();
! 	lcp = (ListCell **) funcctx->user_fctx;
  
! 	while (*lcp != NULL)
! 	{
! 		char   *channel = (char *) lfirst(*lcp);
  
! 		*lcp = (*lcp)->next;	/* advance before returning this row */
! 
! 		if (IsListeningOn(channel))		/* skip channels being phased out */
! 			SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
! 	}
! 
! 	SRF_RETURN_DONE(funcctx);
! }
! 
! /*
!  * Exec_Listen --- subroutine for AtCommit_NotifyBeforeCommit
!  *
!  *		Register the current backend as listening on the specified channel.
!  */
! static void
! Exec_Listen(const char *channel)
! {
! 	MemoryContext oldcontext;
! 
! 	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Listen(%s,%d)", channel, MyProcPid);
! 
! 	/*
! 	 * This line has to be here and not after the IsInListenChannels() call to
! 	 * match the AtCommit_NotifyAfterCommit() checks.
! 	 */
! 	backendExecutesListen = true;
! 
! 	/* Detect whether we are already listening on this channel */
! 	if (IsInListenChannels(channel))
! 		return;
! 
! 	/*
! 	 * OK to insert to the list.
! 	 */
! 	if (listenChannels == NIL)
! 	{
! 		/*
! 		 * This is our first LISTEN, establish our pointer.
! 		 */
! 		LWLockAcquire(AsyncCommitOrderLock, LW_SHARED);
! 		QUEUE_BACKEND_XID(MyBackendId) = InvalidTransactionId;
! 		LWLockRelease(AsyncCommitOrderLock);
! 
! 		LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 		QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
! 		QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
! 		LWLockRelease(AsyncQueueLock);
! 	}
! 
! 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
! 	listenChannels = lappend(listenChannels, pstrdup(channel));
! 	MemoryContextSwitchTo(oldcontext);
  
  	/*
! 	 * Now that we are listening, make sure we will unlisten before dying.
  	 */
  	if (!unlistenExitRegistered)
  	{
***************
*** 514,550 ****
  /*
   * Exec_Unlisten --- subroutine for AtCommit_Notify
   *
!  *		Remove the current backend from the list of listening backends
!  *		for the specified relation.
   */
  static void
! Exec_Unlisten(Relation lRel, const char *relname)
  {
- 	HeapScanDesc scan;
- 	HeapTuple	tuple;
- 
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);
  
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
! 
! 		if (listener->listenerpid == MyProcPid &&
! 			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
! 		{
! 			/* Found the matching tuple, delete it */
! 			simple_heap_delete(lRel, &tuple->t_self);
! 
! 			/*
! 			 * We assume there can be only one match, so no need to scan the
! 			 * rest of the table
! 			 */
! 			break;
! 		}
! 	}
! 	heap_endscan(scan);
  
  	/*
  	 * We do not complain about unlistening something not being listened;
--- 979,993 ----
  /*
   * Exec_Unlisten --- subroutine for AtCommit_Notify
   *
!  *		Remove a specified channel from "listenChannels".
   */
  static void
! Exec_Unlisten(const char *channel)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Unlisten(%s,%d)", channel, MyProcPid);
  
! 	backendExecutesUnlisten = true;
  
  	/*
  	 * We do not complain about unlistening something not being listened;
***************
*** 555,690 ****
  /*
   * Exec_UnlistenAll --- subroutine for AtCommit_Notify
   *
!  *		Update pg_listener to unlisten all relations for this backend.
   */
  static void
! Exec_UnlistenAll(Relation lRel)
  {
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple;
! 	ScanKeyData key[1];
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_UnlistenAll");
  
! 	/* Find and delete all entries with my listenerPID */
! 	ScanKeyInit(&key[0],
! 				Anum_pg_listener_listenerpid,
! 				BTEqualStrategyNumber, F_INT4EQ,
! 				Int32GetDatum(MyProcPid));
! 	scan = heap_beginscan(lRel, SnapshotNow, 1, key);
  
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 		simple_heap_delete(lRel, &lTuple->t_self);
  
! 	heap_endscan(scan);
  }
  
  /*
!  * Send_Notify --- subroutine for AtCommit_Notify
!  *
!  *		Scan pg_listener for tuples matching our pending notifies, and
!  *		either signal the other backend or send a message to our own frontend.
   */
  static void
! Send_Notify(Relation lRel)
  {
! 	TupleDesc	tdesc = RelationGetDescr(lRel);
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple,
! 				rTuple;
! 	Datum		value[Natts_pg_listener];
! 	bool		repl[Natts_pg_listener],
! 				nulls[Natts_pg_listener];
! 
! 	/* preset data to update notify column to MyProcPid */
! 	memset(nulls, false, sizeof(nulls));
! 	memset(repl, false, sizeof(repl));
! 	repl[Anum_pg_listener_notification - 1] = true;
! 	memset(value, 0, sizeof(value));
! 	value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid);
! 
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
! 		char	   *relname = NameStr(listener->relname);
! 		int32		listenerPID = listener->listenerpid;
  
! 		if (!AsyncExistsPendingNotify(relname))
! 			continue;
  
! 		if (listenerPID == MyProcPid)
  		{
! 			/*
! 			 * Self-notify: no need to bother with table update. Indeed, we
! 			 * *must not* clear the notification field in this path, or we
! 			 * could lose an outside notify, which'd be bad for applications
! 			 * that ignore self-notify messages.
! 			 */
! 			if (Trace_notify)
! 				elog(DEBUG1, "AtCommit_Notify: notifying self");
  
! 			NotifyMyFrontEnd(relname, listenerPID);
  		}
  		else
  		{
- 			if (Trace_notify)
- 				elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
- 					 listenerPID);
- 
  			/*
! 			 * If someone has already notified this listener, we don't bother
! 			 * modifying the table, but we do still send a NOTIFY_INTERRUPT
! 			 * signal, just in case that backend missed the earlier signal for
! 			 * some reason.  It's OK to send the signal first, because the
! 			 * other guy can't read pg_listener until we unlock it.
! 			 *
! 			 * Note: we don't have the other guy's BackendId available, so
! 			 * this will incur a search of the ProcSignal table.  That's
! 			 * probably not worth worrying about.
  			 */
! 			if (SendProcSignal(listenerPID, PROCSIG_NOTIFY_INTERRUPT,
! 							   InvalidBackendId) < 0)
  			{
! 				/*
! 				 * Get rid of pg_listener entry if it refers to a PID that no
! 				 * longer exists.  Presumably, that backend crashed without
! 				 * deleting its pg_listener entries. This code used to only
! 				 * delete the entry if errno==ESRCH, but as far as I can see
! 				 * we should just do it for any failure (certainly at least
! 				 * for EPERM too...)
! 				 */
! 				simple_heap_delete(lRel, &lTuple->t_self);
  			}
! 			else if (listener->notification == 0)
  			{
! 				/* Rewrite the tuple with my PID in notification column */
! 				rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
! 				simple_heap_update(lRel, &lTuple->t_self, rTuple);
! 
! #ifdef NOT_USED					/* currently there are no indexes */
! 				CatalogUpdateIndexes(lRel, rTuple);
! #endif
  			}
  		}
  	}
  
! 	heap_endscan(scan);
  }
  
  /*
   * AtAbort_Notify
   *
!  *		This is called at transaction abort.
   *
!  *		Gets rid of pending actions and outbound notifies that we would have
!  *		executed if the transaction got committed.
   */
  void
  AtAbort_Notify(void)
  {
  	ClearPendingActionsAndNotifies();
  }
  
--- 998,1390 ----
  /*
   * Exec_UnlistenAll --- subroutine for AtCommit_Notify
   *
!  *		Unlisten on all channels for this backend.
   */
  static void
! Exec_UnlistenAll(void)
  {
! 	ListCell   *p;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_UnlistenAll(%d)", MyProcPid);
  
! 	foreach(p, listenChannels)
! 	{
! 		char *lchan = (char *) lfirst(p);
! 		Exec_Unlisten(lchan);
! 	}
! }
  
! static bool
! asyncQueueIsFull(void)
! {
! 	QueuePosition	lookahead = QUEUE_HEAD;
! 	Size			remain = QUEUE_PAGESIZE - QUEUE_POS_OFFSET(lookahead) - 1;
! 	Size			advance = Min(remain, NOTIFY_PAYLOAD_MAX_LENGTH);
  
! 	/*
! 	 * Check what happens if we wrote a maximally sized entry. Would we go to a
! 	 * new page? If not, then our queue can not be full (because we can still
! 	 * fill at least the current page with at least one more entry).
! 	 */
! 	if (!asyncQueueAdvance(&lookahead, advance))
! 		return false;
! 
! 	/*
! 	 * The queue is full if with a switch to a new page we reach the page
! 	 * of the tail pointer.
! 	 */
! 	return QUEUE_POS_PAGE(lookahead) == QUEUE_POS_PAGE(QUEUE_TAIL);
  }
  
  /*
!  * The function advances the position to the next entry. In case we jump to
!  * a new page the function returns true, else false.
   */
+ static bool
+ asyncQueueAdvance(QueuePosition *position, int entryLength)
+ {
+ 	int		pageno = QUEUE_POS_PAGE(*position);
+ 	int		offset = QUEUE_POS_OFFSET(*position);
+ 	bool	pageJump = false;
+ 
+ 	/*
+ 	 * Move to the next writing position: First jump over what we have just
+ 	 * written or read.
+ 	 */
+ 	offset += entryLength;
+ 	Assert(offset < QUEUE_PAGESIZE);
+ 
+ 	/*
+ 	 * In a second step check if another entry can be written to the page. If
+ 	 * it does, stay here, we have reached the next position. If not, then we
+ 	 * need to move on to the next page.
+ 	 */
+ 	if (offset + AsyncQueueEntryEmptySize >= QUEUE_PAGESIZE)
+ 	{
+ 		pageno++;
+ 		if (pageno > QUEUE_MAX_PAGE)
+ 			/* wrap around */
+ 			pageno = 0;
+ 		offset = 0;
+ 		pageJump = true;
+ 	}
+ 
+ 	SET_QUEUE_POS(*position, pageno, offset);
+ 	return pageJump;
+ }
+ 
+ static void
+ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
+ {
+ 		Assert(n->channel != NULL);
+ 		Assert(n->payload != NULL);
+ 		Assert(strlen(n->payload) <= NOTIFY_PAYLOAD_MAX_LENGTH);
+ 
+ 		/* The terminator is already included in AsyncQueueEntryEmptySize */
+ 		qe->length = AsyncQueueEntryEmptySize + strlen(n->payload);
+ 		qe->srcPid = MyProcPid;
+ 		qe->dboid = MyDatabaseId;
+ 		qe->xid = GetCurrentTransactionId();
+ 		strcpy(qe->channel, n->channel);
+ 		strcpy(qe->payload, n->payload);
+ }
+ 
  static void
! asyncQueueEntryToNotification(AsyncQueueEntry *qe, Notification *n,
! 							  QueuePosition pos)
  {
! 	n->channel = pstrdup(qe->channel);
! 	n->payload = pstrdup(qe->payload);
! 	n->srcPid = qe->srcPid;
! 	n->xid = qe->xid;
! 	n->position = pos;
! }
! 
! static List *
! asyncQueueAddEntries(List *notifications)
! {
! 	AsyncQueueEntry	qe;
! 	int				pageno;
! 	int				offset;
! 	int				slotno;
  
! 	/*
! 	 * Note that we are holding exclusive AsyncQueueLock already.
! 	 */
! 	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
! 	pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
! 	slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
! 	AsyncCtl->shared->page_dirty[slotno] = true;
! 
! 	do
! 	{
! 		Notification   *n;
  
! 		if (asyncQueueIsFull())
  		{
! 			/* document that we will not go into the if command further down */
! 			Assert(QUEUE_POS_OFFSET(QUEUE_HEAD) != 0);
! 			break;
! 		}
! 
! 		n = (Notification *) linitial(notifications);
  
! 		asyncQueueNotificationToEntry(n, &qe);
! 
! 		offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
! 		/*
! 		 * Check whether or not the entry still fits on the current page.
! 		 */
! 		if (offset + qe.length < QUEUE_PAGESIZE)
! 		{
! 			notifications = list_delete_first(notifications);
  		}
  		else
  		{
  			/*
! 			 * Write a dummy entry to fill up the page. Actually readers will
! 			 * only check dboid and since it won't match any reader's database
! 			 * oid, they will ignore this entry and move on.
  			 */
! 			qe.length = QUEUE_PAGESIZE - offset - 1;
! 			qe.dboid = InvalidOid;
! 			qe.channel[0] = '\0';
! 			qe.payload[0] = '\0';
! 			qe.xid = InvalidTransactionId;
! 		}
! 		memcpy((char*) AsyncCtl->shared->page_buffer[slotno] + offset,
! 			   &qe, qe.length);
! 
! 	} while (!asyncQueueAdvance(&(QUEUE_HEAD), qe.length)
! 			 && notifications != NIL);
! 
! 	if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
! 	{
! 		/*
! 		 * If the next entry needs to go to a new page, prepare that page
! 		 * already.
! 		 */
! 		slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
! 		AsyncCtl->shared->page_dirty[slotno] = true;
! 	}
! 	LWLockRelease(AsyncCtlLock);
! 
! 	return notifications;
! }
! 
! static void
! asyncQueueFillWarning()
! {
! 	/*
! 	 * Caller must hold exclusive AsyncQueueLock.
! 	 */
! 	TimestampTz		t;
! 	double			fillDegree;
! 	int				occupied;
! 	int				tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
! 	int				headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
! 
! 	occupied = headPage - tailPage;
! 
! 	if (occupied == 0)
! 		return;
! 	
! 	if (!asyncQueuePagePrecedesPhysically(tailPage, headPage))
! 		/* head has wrapped around, tail not yet */
! 		occupied += QUEUE_MAX_PAGE;
! 
! 	fillDegree = (float) occupied / (float) QUEUE_MAX_PAGE;
! 
! 	if (fillDegree < 0.5)
! 		return;
! 
! 	t = GetCurrentTimestamp();
! 
! 	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
! 								   t, QUEUE_FULL_WARN_INTERVAL))
! 	{
! 		QueuePosition	min = QUEUE_HEAD;
! 		int32			minPid = InvalidPid;
! 		int				i;
! 
! 		for (i = 0; i < MaxBackends; i++)
! 			if (QUEUE_BACKEND_PID(i) != InvalidPid)
  			{
! 				min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD);
! 				if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
! 					minPid = QUEUE_BACKEND_PID(i);
  			}
! 
! 		if (fillDegree < 0.75)
! 			ereport(WARNING, (errmsg("pg_notify queue is more than 50%% full. "
! 								 "Among the slowest backends: %d", minPid)));
! 		else
! 			ereport(WARNING, (errmsg("pg_notify queue is more than 75%% full. "
! 								 "Among the slowest backends: %d", minPid)));
! 
! 		asyncQueueControl->lastQueueFillWarn = t;
! 	}
! }
! 
! static List *
! asyncQueueSendingXids(void)
! {
! 	/* Caller must hold exclusive lock on AsyncCommitOrderLock */
! 	int		i;
! 	List   *xidList = NIL;
! 
! 	for (i = 0; i < MaxBackends; i++)
! 		if (QUEUE_BACKEND_XID(i) != InvalidTransactionId)
! 			xidList = lappend_int(xidList, QUEUE_BACKEND_XID(i));
! 
! 	return xidList;
! }
! 
! static void
! asyncQueuePhaseInOut(ListenActionKind kind, const char *channel,
! 					 QueuePosition limitPos)
! {
! 	/* Caller must hold exclusive lock on AsyncCommitOrderLock (for
! 	 * asyncQueueSendingXids()) */
! 	MemoryContext 		oldcontext;
! 	ActionPhaseInOut   *entry;
! 
! 	Assert(kind == LISTEN_LISTEN || kind == LISTEN_UNLISTEN);
! 
! 	if (kind == LISTEN_UNLISTEN && !IsListeningOn(channel))
! 		return;
! 	/* we cannot take the same shortcut for LISTEN_LISTEN and return if we are
! 	 * listening already. The reason is that we add the channel for every new
! 	 * LISTEN into the list of channels in Exec_Listen() and here we cannot
! 	 * tell anymore if there is already another previous LISTEN. */
! 
! 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
! 
! 	entry = (ActionPhaseInOut *) palloc(sizeof(ActionPhaseInOut));
! 	entry->channel = pstrdup(channel);
! 	entry->xids = asyncQueueSendingXids();
! 	entry->kind = kind;
! 	entry->limitPos = limitPos;
! 
! 	ActionPhaseInOutList = lappend(ActionPhaseInOutList, entry);
! 
! 	MemoryContextSwitchTo(oldcontext);
! }
! 
! 
! /*
!  * Send_Notify --- subroutine for AtCommit_NotifyBeforeCommit
!  *
!  * Add the pending notifications to the queue. Listening backends are
!  * signalled later, by SignalBackends() in AtCommit_NotifyAfterCommit().
!  *
!  * A full queue is very uncommon and should really not happen, given that we
!  * have so much space available in our slru pages. Nevertheless we need to
!  * deal with this possibility. Note that when we get here we are in the process
!  * of committing our transaction, we have not yet committed to clog but this
!  * would be the next step. So at this point in time we can still roll the
!  * transaction back.
!  */
! static void
! Send_Notify(void)
! {
! 	Assert(pendingNotifies != NIL);
! 
! 	backendSendsNotifications = true;	/* checked by the commit/abort hooks */
! 
! 	LWLockAcquire(AsyncCommitOrderLock, LW_SHARED);
! 	QUEUE_BACKEND_XID(MyBackendId) = GetCurrentTransactionId();
! 	LWLockRelease(AsyncCommitOrderLock);
! 
! 	while (pendingNotifies != NIL)	/* each pass returns what is still unqueued */
! 	{
! 		LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! 		asyncQueueFillWarning();
! 		if (asyncQueueIsFull())
! 			ereport(ERROR,
! 					(errcode(ERRCODE_TOO_MANY_ENTRIES),
! 					errmsg("Too many notifications in the queue")));
! 		pendingNotifies = asyncQueueAddEntries(pendingNotifies);
! 		LWLockRelease(AsyncQueueLock);
! 	}
! }
! 
! /*
!  * Send signals to all listening backends. Since we have EXCLUSIVE lock anyway
!  * we also check the position of the other backends and in case that it is
!  * already up-to-date we don't signal it.
!  *
!  * Since we know the BackendId and the Pid the signalling is quite cheap.
!  */
! static void
! SignalBackends(void)
! {
! 	QueuePosition	pos;
! 	ListCell	   *p1, *p2;
! 	int				i;
! 	int32			pid;
! 	List		   *pids = NIL;
! 	List		   *ids = NIL;
! 	int				count = 0;
! 
! 	/* Signal everybody who is LISTENing to any channel. */
! 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! 	for (i = 0; i < MaxBackends; i++)
! 	{
! 		pid = QUEUE_BACKEND_PID(i);
! 		if (pid != InvalidPid)
! 		{
! 			count++;
! 			pos = QUEUE_BACKEND_POS(i);
! 			if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
  			{
! 				pids = lappend_int(pids, pid);
! 				ids = lappend_int(ids, i);
  			}
  		}
  	}
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	forboth(p1, pids, p2, ids)
+ 	{
+ 		pid = (int32) lfirst_int(p1);
+ 		i = lfirst_int(p2);
+ 		/*
+ 		 * Should we check for failure? Can it happen that a backend
+ 		 * has crashed without the postmaster starting over?
+ 		 */
+ 		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, i) < 0)
+ 			elog(WARNING, "Error signalling backend %d", pid);
+ 	}
  
! 	if (count == 0)
! 	{
! 		/* No backend is listening at all, signal myself so we can clean up
! 		 * the queue. */
! 		SendProcSignal(MyProcPid, PROCSIG_NOTIFY_INTERRUPT, MyBackendId);
! 	}
  }
  
  /*
   * AtAbort_Notify
   *
!  *	This is called at transaction abort.
   *
!  *	Gets rid of pending actions and outbound notifies that we would have
!  *	executed if the transaction got committed.
!  *
!  *	Even though we have not committed, we need to signal the listening backends
!  *	because our notifications might block readers from processing the queue.
!  *	Now that the transaction has aborted, they can go on and skip our
!  *	notifications.
   */
  void
  AtAbort_Notify(void)
  {
+ 	if (backendSendsNotifications)
+ 		SignalBackends();
+ 
  	ClearPendingActionsAndNotifies();
  }
  
***************
*** 940,968 ****
  }
  
  /*
   * ProcessIncomingNotify
   *
   *		Deal with arriving NOTIFYs from other backends.
   *		This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
   *		signal handler, or the next time control reaches the outer idle loop.
!  *		Scan pg_listener for arriving notifies, report them to my front end,
!  *		and clear the notification field in pg_listener until next time.
   *
!  *		NOTE: since we are outside any transaction, we must create our own.
   */
  static void
  ProcessIncomingNotify(void)
  {
! 	Relation	lRel;
! 	TupleDesc	tdesc;
! 	ScanKeyData key[1];
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple,
! 				rTuple;
! 	Datum		value[Natts_pg_listener];
! 	bool		repl[Natts_pg_listener],
! 				nulls[Natts_pg_listener];
! 	bool		catchup_enabled;
  
  	/* Must prevent catchup interrupt while I am running */
  	catchup_enabled = DisableCatchupInterrupt();
--- 1640,2005 ----
  }
  
  /*
+  * This function will ask for a page with ReadOnly access and once we have the
+  * lock, we read the whole content and pass back the list of notifications
+  * that the calling function will deliver then. The list will contain all
+  * notifications from transactions that have already committed.
+  *
+  * We stop if we have either reached the stop position or go to a new page.
+  *
+  * The function returns true once we have reached the end or a notification of
+  * a transaction that is still running and false if we have just finished with
+  * the page.
+  */
+ static bool
+ asyncQueueGetEntriesByPage(QueuePosition *current,
+ 						   QueuePosition stop,
+ 						   List **notifications)
+ {
+ 	AsyncQueueEntry	qe;
+ 	Notification   *n;
+ 	int				slotno;
+ 	bool			reachedStop = false;
+ 
+ 	if (QUEUE_POS_EQUAL(*current, stop))
+ 		return true;
+ 
+ 	slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, current->page,
+ 										InvalidTransactionId);
+ 	do {
+ 		char *readPtr = (char *) (AsyncCtl->shared->page_buffer[slotno]);
+ 		readPtr += current->offset;
+ 
+ 		if (QUEUE_POS_EQUAL(*current, stop))
+ 		{
+ 			reachedStop = true;
+ 			break;
+ 		}
+ 
+ 		memcpy(&qe, readPtr, AsyncQueueEntryEmptySize);	/* minimal entry first */
+ 
+ 		if (qe.dboid == MyDatabaseId)	/* other databases' entries are skipped */
+ 		{
+ 			if (TransactionIdDidCommit(qe.xid))
+ 			{
+ 				if (IsInListenChannels(qe.channel))
+ 				{
+ 					if (qe.length > AsyncQueueEntryEmptySize)
+ 						memcpy(&qe, readPtr, qe.length);	/* now the full entry */
+ 					n = (Notification *) palloc(sizeof(Notification));
+ 					asyncQueueEntryToNotification(&qe, n, *current);
+ 					*notifications = lappend(*notifications, n);
+ 				}
+ 			}
+ 			else
+ 			{
+ 				if (!TransactionIdDidAbort(qe.xid))
+ 				{
+ 					/*
+ 					 * The transaction has neither committed nor aborted so
+ 					 * far: stop here without advancing past this entry.
+ 					 */
+ 					reachedStop = true;
+ 					break;
+ 				}
+ 			}
+ 		}
+ 		/*
+ 		 * The call to asyncQueueAdvance skips over the entry we have just
+ 		 * read. If the next record does not fit on the current page, it also
+ 		 * switches to the start of the next page; a true result ends the loop.
+ 		 */
+ 	} while(!asyncQueueAdvance(current, qe.length));
+ 
+ 	/* NOTE(review): presumably taken by SimpleLruReadPage_ReadOnly - confirm */
+ 	LWLockRelease(AsyncCtlLock);
+ 
+ 	if (QUEUE_POS_EQUAL(*current, stop))
+ 		reachedStop = true;
+ 
+ 	return reachedStop;
+ }
+ 
+ static bool
+ asyncQueueCheckDelivery(Notification *n, QueuePosition head)
+ {
+ 	ActionPhaseInOut   *act;
+ 	ListCell		   *lc;
+ 	bool				vote = true;	/* deliver if no action matches */
+ 
+ 	foreach(lc, ActionPhaseInOutList)	/* a later match overrides the vote */
+ 	{
+ 		act = (ActionPhaseInOut *) lfirst(lc);
+ 
+ 		if (strcmp(act->channel, n->channel) != 0)
+ 			continue;
+ 
+ 		if (act->kind == LISTEN_LISTEN)
+ 		{
+ 			if (QUEUE_POS_LT(n->position, act->limitPos, head))
+ 			{
+ 				/*
+ 				 * If n->xid is listed in act->xids, it was still running
+ 				 * when LISTEN committed. As n->xid has committed by now, we
+ 				 * need to deliver its notification.
+ 				 *
+ 				 * Otherwise n->xid had already committed at that time, and
+ 				 * we must not deliver notifications from such transactions.
+ 				 */
+ 				if (list_member_int(act->xids, n->xid))
+ 					vote = true;
+ 				else
+ 					vote = false;
+ 			}
+ 			else
+ 				/*
+ 				 * n->xid committed when LISTEN was already fully established
+ 				 */
+ 				vote = true;
+ 		}
+ 		else
+ 		{
+ 			Assert(act->kind == LISTEN_UNLISTEN);
+ 			if (QUEUE_POS_LT(n->position, act->limitPos, head))
+ 			{
+ 				/*
+ 				 * If n->xid is listed in act->xids, it was still running
+ 				 * when UNLISTEN committed. As n->xid has committed by now,
+ 				 * we must not deliver its notification.
+ 				 *
+ 				 * Otherwise n->xid had already committed at that time and we
+ 				 * need to deliver its notification (because we assume that
+ 				 * there has been a LISTEN previously).
+ 				 */
+ 				if (list_member_int(act->xids, n->xid))
+ 					vote = false;
+ 				else
+ 					vote = true;
+ 			}
+ 			else
+ 				/*
+ 				 * n->xid committed when UNLISTEN was already fully
+ 				 * established.
+ 				 */
+ 				vote = false;
+ 		}
+ 	}
+ 
+ 	return vote;
+ }
+ 
+ static void
+ asyncQueueCleanUpPhaseInOut(QueuePosition pos, QueuePosition head)
+ {
+ 	ActionPhaseInOut   *act;
+ 	ListCell		   *p, *q;
+ 	List			   *deletedChannels = NIL;
+ 	/* Retire every action whose limitPos we have reached (limitPos <= pos). */
+ 	while ((p = list_head(ActionPhaseInOutList)))
+ 	{
+ 		act = (ActionPhaseInOut *) lfirst(p);
+ 		if (QUEUE_POS_LE(act->limitPos, pos, head))
+ 		{
+ 			list_free(act->xids);
+ 			if (act->kind == LISTEN_LISTEN)
+ 			{
+ 				pfree(act->channel);
+ 			}
+ 			else
+ 			{
+ 				Assert(act->kind == LISTEN_UNLISTEN);
+ 				/* keep act->channel: it names the channel to look up below */
+ 				deletedChannels = lappend(deletedChannels, act->channel);
+ 			}
+ 			ActionPhaseInOutList =
+ 							list_delete_cell(ActionPhaseInOutList, p, NULL);
+ 			
+ 			if (ActionPhaseInOutList == NIL)
+ 				break;
+ 		}
+ 		else
+ 		{
+ 			/* the entries are ordered by limitPos, if we don't have a hit
+ 			 * now we won't have a further hit later on either */
+ 			break;
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * For every channel that we want to delete from listenChannels, check
+ 	 * whether a remaining action (e.g. a subsequently issued LISTEN) still
+ 	 * refers to it. There clearly is room for improving the performance of
+ 	 * this check but we expect the lists to be really short anyway...
+ 	 */
+ 	foreach(p, deletedChannels)
+ 	{
+ 		char   *candidate = (char *) lfirst(p);
+ 		bool	found = false;
+ 		foreach(q, ActionPhaseInOutList)
+ 		{
+ 			act = (ActionPhaseInOut *) lfirst(q);
+ 			if (strcmp(candidate, act->channel) == 0)
+ 			{
+ 				found = true;
+ 				break;
+ 			}
+ 		}
+ 		if (found == false)
+ 		{
+ 			ListCell *prev = NULL;
+ 			foreach(q, listenChannels)
+ 			{
+ 				char *lchan = (char *) lfirst(q);
+ 				if (strcmp(lchan, candidate) == 0)
+ 				{
+ 					listenChannels = list_delete_cell(listenChannels, q, prev);
+ 					Assert(!IsInListenChannels(lchan));
+ 					pfree(lchan);	/* only now: the Assert above still reads it */
+ 					break;
+ 				}
+ 				prev = q;
+ 			}
+ 		}
+ 	}
+ 
+ 	if (listenChannels == NIL && ActionPhaseInOutList == NIL)
+ 	{
+ 		bool advanceTail = false;
+ 
+ 		LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ 		QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
+ 		Assert(QUEUE_BACKEND_XID(MyBackendId) == InvalidTransactionId);
+ 		if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL))
+ 			advanceTail = true;
+ 		LWLockRelease(AsyncQueueLock);
+ 
+ 		if (advanceTail)
+ 			/* Move forward the tail pointer and try to truncate. */
+ 			asyncQueueAdvanceTail();
+ 	}
+ }
+ 
+ static void
+ asyncQueueReadAllNotifications(void)
+ {
+ 	QueuePosition	pos;
+ 	QueuePosition	oldpos;
+ 	QueuePosition	head;
+ 	List		   *notifications;
+ 	ListCell	   *lc;
+ 	Notification   *n;
+ 	bool			advanceTail = false;
+ 	bool			reachedStop;
+ 
+ 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ 	pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
+ 	head = QUEUE_HEAD;
+ 	/*
+ 	 * We may have signalled ourselves because nobody was listening when we
+ 	 * sent out notifications; our slot being unused, just advance the tail.
+ 	 */
+ 	if (QUEUE_BACKEND_PID(MyBackendId) == InvalidPid)
+ 		advanceTail = true;
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	if (advanceTail)
+ 	{
+ 		asyncQueueAdvanceTail();
+ 		return;
+ 	}
+ 
+ 	/* Nothing to do, we have read all notifications already. */
+ 	if (QUEUE_POS_EQUAL(pos, head))
+ 		return;
+ 
+ 	do 
+ 	{
+ 		/*
+ 		 * Our stop position is what we found to be the head's position when
+ 		 * we entered this function. It might have changed already. But if it
+ 		 * has, we will receive (or have already received and queued) another
+ 		 * signal and come here again.
+ 		 *
+ 		 * We are not holding AsyncQueueLock here! The queue can only extend
+ 		 * beyond the head pointer (see above) and we leave our backend's
+ 		 * pointer where it is so nobody will truncate or rewrite pages under
+ 		 * us.
+ 		 */
+ 		reachedStop = false;
+ 
+ 		notifications = NIL;
+ 		reachedStop = asyncQueueGetEntriesByPage(&pos, head, &notifications);
+ 
+ 		foreach(lc, notifications)
+ 		{
+ 			n = (Notification *) lfirst(lc);
+ 			if (asyncQueueCheckDelivery(n, head))
+ 				NotifyMyFrontEnd(n->channel, n->payload, n->srcPid);
+ 		}
+ 		asyncQueueCleanUpPhaseInOut(pos, head);
+ 	} while (!reachedStop);
+ 
+ 	/* Publish our new position; only our own slot is written here. */
+ 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ 	QUEUE_BACKEND_POS(MyBackendId) = pos;
+ 	if (QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL))
+ 		advanceTail = true;
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	if (advanceTail)
+ 		/* Move forward the tail pointer and try to truncate. */
+ 		asyncQueueAdvanceTail();
+ }
+ 
+ static void
+ asyncQueueAdvanceTail(void)
+ {
+ 	QueuePosition	min;
+ 	int				i;
+ 	int				tailp;
+ 	int				headp;
+ 	/* New tail: the minimum position over all slots with a valid pid. */
+ 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+ 	min = QUEUE_HEAD;
+ 	for (i = 0; i < MaxBackends; i++)
+ 		if (QUEUE_BACKEND_PID(i) != InvalidPid)
+ 			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD);
+ 
+ 	tailp = QUEUE_POS_PAGE(QUEUE_TAIL);	/* page of the old tail */
+ 	headp = QUEUE_POS_PAGE(QUEUE_HEAD);
+ 	QUEUE_TAIL = min;
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	/* This is our wraparound check */
+ 	if ((asyncQueuePagePrecedesLogically(tailp, QUEUE_POS_PAGE(min), headp)
+ 			&& asyncQueuePagePrecedesPhysically(tailp, headp))
+ 		|| tailp == QUEUE_POS_PAGE(min))
+ 	{
+ 		/*
+ 		 * SimpleLruTruncate() will ask for AsyncCtlLock but will also
+ 		 * release the lock again.
+ 		 *
+ 		 * XXX this could be optimized, to call SimpleLruTruncate only when we
+ 		 * know we can truncate something.
+ 		 */
+ 		SimpleLruTruncate(AsyncCtl, QUEUE_POS_PAGE(min));
+ 	}
+ }
+ 
+ /*
   * ProcessIncomingNotify
   *
   *		Deal with arriving NOTIFYs from other backends.
   *		This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
   *		signal handler, or the next time control reaches the outer idle loop.
!  *		Scan the queue for arriving notifications and report them to my front
!  *		end.
   *
!  *		NOTE: we are outside of any transaction here.
   */
  static void
  ProcessIncomingNotify(void)
  {
! 	bool			catchup_enabled;
! 
! 	Assert(GetCurrentTransactionIdIfAny() == InvalidTransactionId);
  
  	/* Must prevent catchup interrupt while I am running */
  	catchup_enabled = DisableCatchupInterrupt();
***************
*** 974,1037 ****
  
  	notifyInterruptOccurred = 0;
  
! 	StartTransactionCommand();
! 
! 	lRel = heap_open(ListenerRelationId, ExclusiveLock);
! 	tdesc = RelationGetDescr(lRel);
! 
! 	/* Scan only entries with my listenerPID */
! 	ScanKeyInit(&key[0],
! 				Anum_pg_listener_listenerpid,
! 				BTEqualStrategyNumber, F_INT4EQ,
! 				Int32GetDatum(MyProcPid));
! 	scan = heap_beginscan(lRel, SnapshotNow, 1, key);
! 
! 	/* Prepare data for rewriting 0 into notification field */
! 	memset(nulls, false, sizeof(nulls));
! 	memset(repl, false, sizeof(repl));
! 	repl[Anum_pg_listener_notification - 1] = true;
! 	memset(value, 0, sizeof(value));
! 	value[Anum_pg_listener_notification - 1] = Int32GetDatum(0);
! 
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
! 		char	   *relname = NameStr(listener->relname);
! 		int32		sourcePID = listener->notification;
! 
! 		if (sourcePID != 0)
! 		{
! 			/* Notify the frontend */
! 
! 			if (Trace_notify)
! 				elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
! 					 relname, (int) sourcePID);
! 
! 			NotifyMyFrontEnd(relname, sourcePID);
! 
! 			/*
! 			 * Rewrite the tuple with 0 in notification column.
! 			 */
! 			rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
! 			simple_heap_update(lRel, &lTuple->t_self, rTuple);
! 
! #ifdef NOT_USED					/* currently there are no indexes */
! 			CatalogUpdateIndexes(lRel, rTuple);
! #endif
! 		}
! 	}
! 	heap_endscan(scan);
! 
! 	/*
! 	 * We do NOT release the lock on pg_listener here; we need to hold it
! 	 * until end of transaction (which is about to happen, anyway) to ensure
! 	 * that other backends see our tuple updates when they look. Otherwise, a
! 	 * transaction started after this one might mistakenly think it doesn't
! 	 * need to send this backend a new NOTIFY.
! 	 */
! 	heap_close(lRel, NoLock);
! 
! 	CommitTransactionCommand();
  
  	/*
  	 * Must flush the notify messages to ensure frontend gets them promptly.
--- 2011,2017 ----
  
  	notifyInterruptOccurred = 0;
  
! 	asyncQueueReadAllNotifications();
  
  	/*
  	 * Must flush the notify messages to ensure frontend gets them promptly.
***************
*** 1051,1070 ****
   * Send NOTIFY message to my front end.
   */
  static void
! NotifyMyFrontEnd(char *relname, int32 listenerPID)
  {
  	if (whereToSendOutput == DestRemote)
  	{
  		StringInfoData buf;
  
  		pq_beginmessage(&buf, 'A');
! 		pq_sendint(&buf, listenerPID, sizeof(int32));
! 		pq_sendstring(&buf, relname);
  		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
! 		{
! 			/* XXX Add parameter string here later */
! 			pq_sendstring(&buf, "");
! 		}
  		pq_endmessage(&buf);
  
  		/*
--- 2031,2047 ----
   * Send NOTIFY message to my front end.
   */
  static void
! NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
  {
  	if (whereToSendOutput == DestRemote)
  	{
  		StringInfoData buf;
  
  		pq_beginmessage(&buf, 'A');
! 		pq_sendint(&buf, srcPid, sizeof(int32));
! 		pq_sendstring(&buf, channel);
  		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
! 			pq_sendstring(&buf, payload);
  		pq_endmessage(&buf);
  
  		/*
***************
*** 1074,1096 ****
  		 */
  	}
  	else
! 		elog(INFO, "NOTIFY for %s", relname);
  }
  
! /* Does pendingNotifies include the given relname? */
  static bool
! AsyncExistsPendingNotify(const char *relname)
  {
  	ListCell   *p;
  
! 	foreach(p, pendingNotifies)
! 	{
! 		const char *prelname = (const char *) lfirst(p);
  
! 		if (strcmp(prelname, relname) == 0)
  			return true;
  	}
  
  	return false;
  }
  
--- 2051,2107 ----
  		 */
  	}
  	else
! 		elog(INFO, "NOTIFY for %s", channel);
  }
  
! /* Does pendingNotifies include the given channel/payload? */
  static bool
! AsyncExistsPendingNotify(const char *channel, const char *payload)
  {
  	ListCell   *p;
+ 	Notification *n;
  
! 	if (pendingNotifies == NIL)
! 		return false;
! 
! 	if (payload == NULL)
! 		payload = "";
! 
! 	/*
! 	 * New elements must be appended to the end of the list to preserve their
! 	 * order. On the other hand, we would like to search the list backwards
! 	 * to make duplicate elimination a tad faster when the same condition is
! 	 * signaled many times in a row. As a compromise we check the tail
! 	 * element first, which we can access directly. If it does not match, we
! 	 * check the rest of the list.
! 	 */
  
! 	n = (Notification *) llast(pendingNotifies);
! 	if (strcmp(n->channel, channel) == 0)
! 	{
! 		Assert(n->payload != NULL);
! 		if (strcmp(n->payload, payload) == 0)
  			return true;
  	}
  
+ 	/*
+ 	 * Note the difference from foreach(): the loop stops once p is the last
+ 	 * element, so the tail, which was already checked above, is skipped.
+ 	 */
+ 	for(p = list_head(pendingNotifies);
+ 		p != list_tail(pendingNotifies);
+ 		p = lnext(p))
+ 	{
+ 		n = (Notification *) lfirst(p);
+ 
+ 		if (strcmp(n->channel, channel) == 0)
+ 		{
+ 			Assert(n->payload != NULL);
+ 			if (strcmp(n->payload, payload) == 0)
+ 				return true;
+ 		}
+ 	}
+ 
  	return false;
  }
  
***************
*** 1107,1112 ****
--- 2118,2127 ----
  	 */
  	pendingActions = NIL;
  	pendingNotifies = NIL;
+ 
+ 	backendSendsNotifications = false;
+ 	backendExecutesListen = false;
+ 	backendExecutesUnlisten = false;
  }
  
  /*
***************
*** 1124,1128 ****
  	 * there is any significant delay before I commit.	OK for now because we
  	 * disallow COMMIT PREPARED inside a transaction block.)
  	 */
! 	Async_Notify((char *) recdata);
  }
--- 2139,2149 ----
  	 * there is any significant delay before I commit.	OK for now because we
  	 * disallow COMMIT PREPARED inside a transaction block.)
  	 */
! 	AsyncQueueEntry		*qe = (AsyncQueueEntry *) recdata;
! 
! 	Assert(qe->dboid == MyDatabaseId);
! 	Assert(qe->length == len);
! 
! 	Async_Notify(qe->channel, qe->payload);
  }
+ 
diff -cr cvs.head/src/backend/nodes/copyfuncs.c cvs.build/src/backend/nodes/copyfuncs.c
*** cvs.head/src/backend/nodes/copyfuncs.c	2010-01-06 22:30:06.000000000 +0100
--- cvs.build/src/backend/nodes/copyfuncs.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 2770,2775 ****
--- 2770,2776 ----
  	NotifyStmt *newnode = makeNode(NotifyStmt);
  
  	COPY_STRING_FIELD(conditionname);
+ 	COPY_STRING_FIELD(payload);
  
  	return newnode;
  }
diff -cr cvs.head/src/backend/nodes/equalfuncs.c cvs.build/src/backend/nodes/equalfuncs.c
*** cvs.head/src/backend/nodes/equalfuncs.c	2010-01-06 22:30:06.000000000 +0100
--- cvs.build/src/backend/nodes/equalfuncs.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 1324,1329 ****
--- 1324,1330 ----
  _equalNotifyStmt(NotifyStmt *a, NotifyStmt *b)
  {
  	COMPARE_STRING_FIELD(conditionname);
+ 	COMPARE_STRING_FIELD(payload);
  
  	return true;
  }
diff -cr cvs.head/src/backend/nodes/outfuncs.c cvs.build/src/backend/nodes/outfuncs.c
*** cvs.head/src/backend/nodes/outfuncs.c	2010-01-06 22:30:06.000000000 +0100
--- cvs.build/src/backend/nodes/outfuncs.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 1817,1822 ****
--- 1817,1823 ----
  	WRITE_NODE_TYPE("NOTIFY");
  
  	WRITE_STRING_FIELD(conditionname);
+ 	WRITE_STRING_FIELD(payload);
  }
  
  static void
diff -cr cvs.head/src/backend/nodes/readfuncs.c cvs.build/src/backend/nodes/readfuncs.c
*** cvs.head/src/backend/nodes/readfuncs.c	2010-01-05 12:39:25.000000000 +0100
--- cvs.build/src/backend/nodes/readfuncs.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 231,236 ****
--- 231,237 ----
  	READ_LOCALS(NotifyStmt);
  
  	READ_STRING_FIELD(conditionname);
+ 	READ_STRING_FIELD(payload);
  
  	READ_DONE();
  }
diff -cr cvs.head/src/backend/parser/gram.y cvs.build/src/backend/parser/gram.y
*** cvs.head/src/backend/parser/gram.y	2010-01-06 22:30:07.000000000 +0100
--- cvs.build/src/backend/parser/gram.y	2010-01-08 11:05:17.000000000 +0100
***************
*** 399,405 ****
  
  %type <ival>	Iconst SignedIconst
  %type <list>	Iconst_list
! %type <str>		Sconst comment_text
  %type <str>		RoleId opt_granted_by opt_boolean ColId_or_Sconst
  %type <list>	var_list
  %type <str>		ColId ColLabel var_name type_function_name param_name
--- 399,405 ----
  
  %type <ival>	Iconst SignedIconst
  %type <list>	Iconst_list
! %type <str>		Sconst comment_text notify_payload
  %type <str>		RoleId opt_granted_by opt_boolean ColId_or_Sconst
  %type <list>	var_list
  %type <str>		ColId ColLabel var_name type_function_name param_name
***************
*** 6074,6083 ****
   *
   *****************************************************************************/
  
! NotifyStmt: NOTIFY ColId
  				{
  					NotifyStmt *n = makeNode(NotifyStmt);
  					n->conditionname = $2;
  					$$ = (Node *)n;
  				}
  		;
--- 6074,6089 ----
   *
   *****************************************************************************/
  
! notify_payload:
! 			Sconst								{ $$ = $1; }
! 			| /*EMPTY*/							{ $$ = NULL; }
! 		;
! 
! NotifyStmt: NOTIFY ColId notify_payload
  				{
  					NotifyStmt *n = makeNode(NotifyStmt);
  					n->conditionname = $2;
+ 					n->payload = $3;
  					$$ = (Node *)n;
  				}
  		;
diff -cr cvs.head/src/backend/storage/ipc/ipci.c cvs.build/src/backend/storage/ipc/ipci.c
*** cvs.head/src/backend/storage/ipc/ipci.c	2010-01-05 12:39:28.000000000 +0100
--- cvs.build/src/backend/storage/ipc/ipci.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 219,224 ****
--- 219,225 ----
  	 */
  	BTreeShmemInit();
  	SyncScanShmemInit();
+ 	AsyncShmemInit();
  
  #ifdef EXEC_BACKEND
  
diff -cr cvs.head/src/backend/storage/lmgr/lwlock.c cvs.build/src/backend/storage/lmgr/lwlock.c
*** cvs.head/src/backend/storage/lmgr/lwlock.c	2010-01-05 12:39:29.000000000 +0100
--- cvs.build/src/backend/storage/lmgr/lwlock.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 24,29 ****
--- 24,30 ----
  #include "access/clog.h"
  #include "access/multixact.h"
  #include "access/subtrans.h"
+ #include "commands/async.h"
  #include "miscadmin.h"
  #include "pg_trace.h"
  #include "storage/ipc.h"
***************
*** 174,179 ****
--- 175,183 ----
  	/* multixact.c needs two SLRU areas */
  	numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
  
+ 	/* async.c needs one lock per page slot of the AsyncQueue */
+ 	numLocks += NUM_ASYNC_BUFFERS;
+ 
  	/*
  	 * Add any requested by loadable modules; for backwards-compatibility
  	 * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
diff -cr cvs.head/src/backend/tcop/utility.c cvs.build/src/backend/tcop/utility.c
*** cvs.head/src/backend/tcop/utility.c	2010-01-06 22:30:08.000000000 +0100
--- cvs.build/src/backend/tcop/utility.c	2010-01-08 11:04:38.000000000 +0100
***************
*** 928,936 ****
  		case T_NotifyStmt:
  			{
  				NotifyStmt *stmt = (NotifyStmt *) parsetree;
  				PreventCommandDuringRecovery();
  
! 				Async_Notify(stmt->conditionname);
  			}
  			break;
  
--- 928,943 ----
  		case T_NotifyStmt:
  			{
  				NotifyStmt *stmt = (NotifyStmt *) parsetree;
+ 				/* XXX the new listen/notify version can be enabled
+ 				 * for Hot Standby */
  				PreventCommandDuringRecovery();
  
! 				if (stmt->payload
! 					&& strlen(stmt->payload) > NOTIFY_PAYLOAD_MAX_LENGTH - 1)
! 					ereport(ERROR,
! 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
! 							 errmsg("payload string too long")));
! 				Async_Notify(stmt->conditionname, stmt->payload);
  			}
  			break;
  
diff -cr cvs.head/src/bin/initdb/initdb.c cvs.build/src/bin/initdb/initdb.c
*** cvs.head/src/bin/initdb/initdb.c	2010-01-08 10:48:31.000000000 +0100
--- cvs.build/src/bin/initdb/initdb.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 2458,2463 ****
--- 2458,2464 ----
  		"pg_xlog",
  		"pg_xlog/archive_status",
  		"pg_clog",
+ 		"pg_notify",
  		"pg_subtrans",
  		"pg_twophase",
  		"pg_multixact/members",
diff -cr cvs.head/src/bin/psql/common.c cvs.build/src/bin/psql/common.c
*** cvs.head/src/bin/psql/common.c	2010-01-05 12:39:33.000000000 +0100
--- cvs.build/src/bin/psql/common.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 555,562 ****
  
  	while ((notify = PQnotifies(pset.db)))
  	{
! 		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"),
! 				notify->relname, notify->be_pid);
  		fflush(pset.queryFout);
  		PQfreemem(notify);
  	}
--- 555,562 ----
  
  	while ((notify = PQnotifies(pset.db)))
  	{
! 		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" (%s) received from server process with PID %d.\n"),
! 				notify->relname, notify->extra, notify->be_pid);
  		fflush(pset.queryFout);
  		PQfreemem(notify);
  	}
diff -cr cvs.head/src/bin/psql/tab-complete.c cvs.build/src/bin/psql/tab-complete.c
*** cvs.head/src/bin/psql/tab-complete.c	2010-01-05 12:39:33.000000000 +0100
--- cvs.build/src/bin/psql/tab-complete.c	2010-01-08 11:00:55.000000000 +0100
***************
*** 2099,2105 ****
  
  /* UNLISTEN */
  	else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0)
! 		COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s' UNION SELECT '*'");
  
  /* UPDATE */
  	/* If prev. word is UPDATE suggest a list of tables */
--- 2099,2105 ----
  
  /* UNLISTEN */
  	else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0)
! 		COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s' UNION SELECT '*'");
  
  /* UPDATE */
  	/* If prev. word is UPDATE suggest a list of tables */
diff -cr cvs.head/src/include/access/slru.h cvs.build/src/include/access/slru.h
*** cvs.head/src/include/access/slru.h	2010-01-05 12:39:34.000000000 +0100
--- cvs.build/src/include/access/slru.h	2010-01-08 11:00:56.000000000 +0100
***************
*** 16,21 ****
--- 16,40 ----
  #include "access/xlogdefs.h"
  #include "storage/lwlock.h"
  
+ /*
+  * Define segment size.  A page is the same BLCKSZ as is used everywhere
+  * else in Postgres.  The segment size can be chosen somewhat arbitrarily;
+  * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
+  * or 64K transactions for SUBTRANS.
+  *
+  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+  * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
+  * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
+  * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
+  * take no explicit notice of that fact in this module, except when comparing
+  * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
+  *
+  * Note: this file currently assumes that segment file names will be four
+  * hex digits.	This sets a lower bound on the segment size (64K transactions
+  * for 32-bit TransactionIds).
+  */
+ #define SLRU_PAGES_PER_SEGMENT	32
+ 
  
  /*
   * Page status codes.  Note that these do not include the "dirty" bit.
diff -cr cvs.head/src/include/catalog/pg_proc.h cvs.build/src/include/catalog/pg_proc.h
*** cvs.head/src/include/catalog/pg_proc.h	2010-01-08 10:48:32.000000000 +0100
--- cvs.build/src/include/catalog/pg_proc.h	2010-01-08 11:00:56.000000000 +0100
***************
*** 4084,4089 ****
--- 4084,4091 ----
  DESCR("get the prepared statements for this session");
  DATA(insert OID = 2511 (  pg_cursor PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,25,16,16,16,1184}" "{o,o,o,o,o,o}" "{name,statement,is_holdable,is_binary,is_scrollable,creation_time}" _null_ pg_cursor _null_ _null_ _null_ ));
  DESCR("get the open cursors for this session");
+ DATA(insert OID = 2187 (  pg_listening	PGNSP	PGUID 12 1 10 0 f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ pg_listening _null_ _null_ _null_ ));
+ DESCR("get the channels that the current backend listens to");
  DATA(insert OID = 2599 (  pg_timezone_abbrevs	PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,1186,16}" "{o,o,o}" "{abbrev,utc_offset,is_dst}" _null_ pg_timezone_abbrevs _null_ _null_ _null_ ));
  DESCR("get the available time zone abbreviations");
  DATA(insert OID = 2856 (  pg_timezone_names		PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,25,1186,16}" "{o,o,o,o}" "{name,abbrev,utc_offset,is_dst}" _null_ pg_timezone_names _null_ _null_ _null_ ));
diff -cr cvs.head/src/include/commands/async.h cvs.build/src/include/commands/async.h
*** cvs.head/src/include/commands/async.h	2010-01-05 12:39:35.000000000 +0100
--- cvs.build/src/include/commands/async.h	2010-01-08 11:00:56.000000000 +0100
***************
*** 13,28 ****
  #ifndef ASYNC_H
  #define ASYNC_H
  
  extern bool Trace_notify;
  
  /* notify-related SQL statements */
! extern void Async_Notify(const char *relname);
  extern void Async_Listen(const char *relname);
  extern void Async_Unlisten(const char *relname);
  extern void Async_UnlistenAll(void);
  
  /* perform (or cancel) outbound notify processing at transaction commit */
! extern void AtCommit_Notify(void);
  extern void AtAbort_Notify(void);
  extern void AtSubStart_Notify(void);
  extern void AtSubCommit_Notify(void);
--- 13,42 ----
  #ifndef ASYNC_H
  #define ASYNC_H
  
+ /*
+  * Maximum size of a payload string in bytes, including the terminating
+  * '\0'; the string itself can thus be at most one byte shorter than this.
+  */
+ #define NOTIFY_PAYLOAD_MAX_LENGTH	8000
+ 
+ /*
+  * How many page slots do we reserve for the async queue?
+  */
+ #define NUM_ASYNC_BUFFERS			4
+ 
  extern bool Trace_notify;
  
+ extern void AsyncShmemInit(void);
+ 
  /* notify-related SQL statements */
! extern void Async_Notify(const char *relname, const char *payload);
  extern void Async_Listen(const char *relname);
  extern void Async_Unlisten(const char *relname);
  extern void Async_UnlistenAll(void);
  
  /* perform (or cancel) outbound notify processing at transaction commit */
! extern void AtCommit_NotifyBeforeCommit(void);
! extern void AtCommit_NotifyAfterCommit(void);
  extern void AtAbort_Notify(void);
  extern void AtSubStart_Notify(void);
  extern void AtSubCommit_Notify(void);
***************
*** 43,46 ****
--- 57,62 ----
  extern void notify_twophase_postcommit(TransactionId xid, uint16 info,
  						   void *recdata, uint32 len);
  
+ extern Datum pg_listening(PG_FUNCTION_ARGS);
+ 
  #endif   /* ASYNC_H */
diff -cr cvs.head/src/include/nodes/parsenodes.h cvs.build/src/include/nodes/parsenodes.h
*** cvs.head/src/include/nodes/parsenodes.h	2010-01-06 22:30:09.000000000 +0100
--- cvs.build/src/include/nodes/parsenodes.h	2010-01-08 11:00:56.000000000 +0100
***************
*** 2082,2087 ****
--- 2082,2088 ----
  {
  	NodeTag		type;
  	char	   *conditionname;	/* condition name to notify */
+ 	char	   *payload;		/* the payload string to be conveyed */
  } NotifyStmt;
  
  /* ----------------------
diff -cr cvs.head/src/include/storage/lwlock.h cvs.build/src/include/storage/lwlock.h
*** cvs.head/src/include/storage/lwlock.h	2010-01-05 12:39:36.000000000 +0100
--- cvs.build/src/include/storage/lwlock.h	2010-01-08 11:00:56.000000000 +0100
***************
*** 67,72 ****
--- 67,75 ----
  	AutovacuumLock,
  	AutovacuumScheduleLock,
  	SyncScanLock,
+ 	AsyncCtlLock,
+ 	AsyncQueueLock,
+ 	AsyncCommitOrderLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff -cr cvs.head/src/include/utils/errcodes.h cvs.build/src/include/utils/errcodes.h
*** cvs.head/src/include/utils/errcodes.h	2010-01-05 12:39:36.000000000 +0100
--- cvs.build/src/include/utils/errcodes.h	2010-01-08 11:00:56.000000000 +0100
***************
*** 318,323 ****
--- 318,324 ----
  #define ERRCODE_STATEMENT_TOO_COMPLEX		MAKE_SQLSTATE('5','4', '0','0','1')
  #define ERRCODE_TOO_MANY_COLUMNS			MAKE_SQLSTATE('5','4', '0','1','1')
  #define ERRCODE_TOO_MANY_ARGUMENTS			MAKE_SQLSTATE('5','4', '0','2','3')
+ #define ERRCODE_TOO_MANY_ENTRIES			MAKE_SQLSTATE('5','4', '0','3','1')
  
  /* Class 55 - Object Not In Prerequisite State (class borrowed from DB2) */
  #define ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE	MAKE_SQLSTATE('5','5', '0','0','0')
diff -cr cvs.head/src/test/regress/expected/guc.out cvs.build/src/test/regress/expected/guc.out
*** cvs.head/src/test/regress/expected/guc.out	2009-11-22 06:20:41.000000000 +0100
--- cvs.build/src/test/regress/expected/guc.out	2010-01-08 11:00:56.000000000 +0100
***************
*** 532,540 ****
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT relname FROM pg_listener;
!   relname  
! -----------
   foo_event
  (1 row)
  
--- 532,540 ----
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT pg_listening();
!  pg_listening 
! --------------
   foo_event
  (1 row)
  
***************
*** 571,579 ****
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT relname FROM pg_listener;
!  relname 
! ---------
  (0 rows)
  
  SELECT name FROM pg_prepared_statements;
--- 571,579 ----
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT pg_listening();
!  pg_listening 
! --------------
  (0 rows)
  
  SELECT name FROM pg_prepared_statements;
diff -cr cvs.head/src/test/regress/expected/sanity_check.out cvs.build/src/test/regress/expected/sanity_check.out
*** cvs.head/src/test/regress/expected/sanity_check.out	2010-01-05 12:39:38.000000000 +0100
--- cvs.build/src/test/regress/expected/sanity_check.out	2010-01-08 14:28:26.000000000 +0100
***************
*** 107,113 ****
   pg_language             | t
   pg_largeobject          | t
   pg_largeobject_metadata | t
-  pg_listener             | f
   pg_namespace            | t
   pg_opclass              | t
   pg_operator             | t
--- 107,112 ----
***************
*** 154,160 ****
   timetz_tbl              | f
   tinterval_tbl           | f
   varchar_tbl             | f
! (143 rows)
  
  --
  -- another sanity check: every system catalog that has OIDs should have
--- 153,159 ----
   timetz_tbl              | f
   tinterval_tbl           | f
   varchar_tbl             | f
! (142 rows)
  
  --
  -- another sanity check: every system catalog that has OIDs should have
diff -cr cvs.head/src/test/regress/sql/guc.sql cvs.build/src/test/regress/sql/guc.sql
*** cvs.head/src/test/regress/sql/guc.sql	2009-10-21 22:38:58.000000000 +0200
--- cvs.build/src/test/regress/sql/guc.sql	2010-01-08 11:00:56.000000000 +0100
***************
*** 165,171 ****
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT relname FROM pg_listener;
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
--- 165,171 ----
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT pg_listening();
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
***************
*** 174,180 ****
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT relname FROM pg_listener;
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
--- 174,180 ----
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT pg_listening();
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
