Listen / Notify rewrite

Started by Joachim Wieland, about 16 years ago, 63 messages
#1 Joachim Wieland
joe@mcknight.de
1 attachment(s)

Hi,

Attached is a patch for a new listen/notify implementation.

In a few words, the patch reimplements listen/notify as an slru-based queue
which works similarly to the sinval structure. Essentially it is a ring buffer
on disk with pages mapped into shared memory for read/write access.

Additionally, the patch does the following (see below for details):

1. It removes the pg_listener relation.
2. It adds the possibility to specify a payload parameter, i.e. you can execute
"NOTIFY foo 'payload';" in SQL and 'payload' will be delivered to every
listening backend.
3. Every distinct notification is delivered.
4. Order is preserved, i.e. if txn 1 first does NOTIFY foo, then NOTIFY bar, a
backend (listening to both "foo" and "bar") will always first receive the
notification "foo" and then the notification "bar".
5. It's now "listen to a channel", not "listen to a relation" anymore...

Details:

1. Instead of placing the queue into shared memory only, I propose to create a
new subdirectory pg_notify/ and make the queue slru-based, such that we do not
risk blocking. Several people here have pointed out that blocking is a true
no-go for a new listen/notify implementation. With an slru-based queue we have
so much space that blocking is really unlikely, even in periods with extreme
notify bursts.
Regarding performance, the slru queue is not fsync-ed to disk, so most activity
happens in the OS file cache anyway and most backends will probably work on the
same pages most of the time. However, more locking overhead is required
compared to a shared-memory-only implementation.

There is one doubt that I have: currently the patch adds notifications to the
queue after the transaction has committed to clog. The advantage is that we do
not need to take care of visibility: when we add notifications to the queue, we
have already committed our transaction, and the reading backends are not in a
transaction anyway, so everything is visible to everyone and we can just write
to and read from the queue.

However, if for some reason we cannot write to the slru files in the pg_notify/
directory, we might want to roll back the current transaction, but with the
proposed patch we cannot because we have already committed...
But... if there is a problem with the pg_notify/ directory, then something is
fundamentally wrong with the file system of the database server, and
pg_subtrans/ and pg_clog/ are probably affected by the same problem... One
possible solution would be to write to the queue before committing and to
include the TransactionId in each entry. Other backends could then check
whether our TransactionId has committed successfully or not. Not sure if this
is worth the overhead, however...

2. The payload parameter is optional. A notifying client can call either
"NOTIFY foo;" or "NOTIFY foo 'payload';". The length of the payload is
currently limited to 128 characters... Not sure if we should allow longer
payload strings... If there is more complex data to be transferred, the sending
transaction can always put all of that data into a relation and just send the
id of the entry. If no payload is specified, this is treated internally like an
empty payload. Consequently, an empty string will be delivered as the payload
to the listening backend.

3. Not every notification is delivered, only distinct notifications (per
transaction).
In other words, each sending transaction eliminates duplicate notifications
that have exactly the same payload and channel name as a notification that is
already in its list of pending notifications to be sent out.

Should we have an upper limit on the number of notifications that a transaction
is allowed to send? Without an upper limit, a client can open a transaction and
send a series of NOTIFYs, each with a different payload until its backend runs
out of memory...

4.
Sending Transaction does:       This will be delivered (on commit):

NOTIFY foo;                     NOTIFY foo;
NOTIFY foo;                     NOTIFY bar 'value1';
NOTIFY bar 'value1';            NOTIFY bar 'value2';
NOTIFY foo;
NOTIFY bar 'value2';
NOTIFY bar 'value1';

Note that we do _not_ guarantee that notifications from transaction 1 are
always delivered before the notifications of transaction 2 just because
transaction 1 committed before transaction 2.

Let me know what you think,

Regards,
Joachim

Attachments:

listennotify.1.diff (text/x-diff; charset=US-ASCII; name=listennotify.1.diff)
diff -cr cvs/src/backend/access/transam/slru.c cvs.build/src/backend/access/transam/slru.c
*** cvs/src/backend/access/transam/slru.c	2009-05-10 19:49:47.000000000 +0200
--- cvs.build/src/backend/access/transam/slru.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 59,83 ****
  #include "miscadmin.h"
  
  
- /*
-  * Define segment size.  A page is the same BLCKSZ as is used everywhere
-  * else in Postgres.  The segment size can be chosen somewhat arbitrarily;
-  * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
-  * or 64K transactions for SUBTRANS.
-  *
-  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
-  * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
-  * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
-  * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
-  * take no explicit notice of that fact in this module, except when comparing
-  * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
-  *
-  * Note: this file currently assumes that segment file names will be four
-  * hex digits.	This sets a lower bound on the segment size (64K transactions
-  * for 32-bit TransactionIds).
-  */
- #define SLRU_PAGES_PER_SEGMENT	32
- 
  #define SlruFileName(ctl, path, seg) \
  	snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
  
--- 59,64 ----
diff -cr cvs/src/backend/access/transam/xact.c cvs.build/src/backend/access/transam/xact.c
*** cvs/src/backend/access/transam/xact.c	2009-09-06 08:58:59.000000000 +0200
--- cvs.build/src/backend/access/transam/xact.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 1604,1612 ****
  	/* close large objects before lower-level cleanup */
  	AtEOXact_LargeObject(true);
  
- 	/* NOTIFY commit must come before lower-level cleanup */
- 	AtCommit_Notify();
- 
  	/* Prevent cancel/die interrupt while cleaning up */
  	HOLD_INTERRUPTS();
  
--- 1604,1609 ----
***************
*** 1690,1695 ****
--- 1687,1693 ----
  	/* Check we've released all catcache entries */
  	AtEOXact_CatCache(true);
  
+ 	AtCommit_Notify();
  	AtEOXact_GUC(true, 1);
  	AtEOXact_SPI(true);
  	AtEOXact_on_commit_actions(true);
diff -cr cvs/src/backend/catalog/Makefile cvs.build/src/backend/catalog/Makefile
*** cvs/src/backend/catalog/Makefile	2009-10-31 14:47:46.000000000 +0100
--- cvs.build/src/backend/catalog/Makefile	2009-11-11 01:11:08.000000000 +0100
***************
*** 30,36 ****
  	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
  	pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \
! 	pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \
  	pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
  	pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
  	pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
--- 30,36 ----
  	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
  	pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \
! 	pg_rewrite.h pg_trigger.h pg_description.h pg_cast.h \
  	pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
  	pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
  	pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
diff -cr cvs/src/backend/commands/async.c cvs.build/src/backend/commands/async.c
*** cvs/src/backend/commands/async.c	2009-09-06 08:59:06.000000000 +0200
--- cvs.build/src/backend/commands/async.c	2009-11-11 20:32:20.000000000 +0100
***************
*** 14,44 ****
  
  /*-------------------------------------------------------------------------
   * New Async Notification Model:
!  * 1. Multiple backends on same machine.  Multiple backends listening on
!  *	  one relation.  (Note: "listening on a relation" is not really the
!  *	  right way to think about it, since the notify names need not have
!  *	  anything to do with the names of relations actually in the database.
!  *	  But this terminology is all over the code and docs, and I don't feel
!  *	  like trying to replace it.)
!  *
!  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
!  *	  ie, each relname/listenerPID pair.  The "notification" field of the
!  *	  tuple is zero when no NOTIFY is pending for that listener, or the PID
!  *	  of the originating backend when a cross-backend NOTIFY is pending.
!  *	  (We skip writing to pg_listener when doing a self-NOTIFY, so the
!  *	  notification field should never be equal to the listenerPID field.)
!  *
!  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
!  *	  relname to a list of outstanding NOTIFY requests.  Actual processing
!  *	  happens if and only if we reach transaction commit.  At that time (in
!  *	  routine AtCommit_Notify) we scan pg_listener for matching relnames.
!  *	  If the listenerPID in a matching tuple is ours, we just send a notify
!  *	  message to our own front end.  If it is not ours, and "notification"
!  *	  is not already nonzero, we set notification to our own PID and send a
!  *	  PROCSIG_NOTIFY_INTERRUPT signal to the receiving process (indicated by
!  *	  listenerPID).
!  *	  BTW: if the signal operation fails, we presume that the listener backend
!  *	  crashed without removing this tuple, and remove the tuple for it.
   *
   * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
   *	  can call inbound-notify processing immediately if this backend is idle
--- 14,60 ----
  
  /*-------------------------------------------------------------------------
   * New Async Notification Model:
!  *
!  * 1. Multiple backends on same machine. Multiple backends listening on
!  *	  several channels. (This was previously called a "relation" even though it
!  *	  is just an identifier and has nothing to do with a database relation.)
!  *
!  * 2. There is one central queue in the form of SLRU-backed, file-based storage
!  *    (directory pg_notify/), with several pages mapped into shared memory.
!  *
!  *    There is no central storage of which backend listens on which channel,
!  *    every backend has its own list.
!  *
!  *    Every backend that is listening on at least one channel registers by
!  *    entering its Pid into the array of all backends. It then scans all
!  *    incoming notifications and compares the notified channels with its list.
!  *
!  *    In case there is a match it delivers the corresponding notification to
!  *    its frontend.
!  *
!  * 3. The NOTIFY statement (routine Async_Notify) registers the notification
!  *    in a list which will not be processed until transaction end. Every
!  *    notification can additionally send a "payload" which is an extra text
!  *    parameter to convey arbitrary information to the recipient.
!  *
!  *    Duplicate notifications from the same transaction are sent out as one
!  *    notification only. This is done to save work when for example a trigger
!  *    on a 2 million row table fires a notification for each row that has been
!  *    changed. If the application needs to receive every notification that has
!  *    been sent, it can easily add some unique string into the extra payload
!  *    parameter.
!  *
!  *    Once the transaction commits, AtCommit_Notify performs the required
!  *    changes regarding listeners (Listen/Unlisten) and then adds the pending
!  *    notifications to the beginning of the queue. The head pointer of the
!  *    queue always points to the next free position and a position is just a
!  *    page number and the offset in that page.
!  *
!  *    After adding the notifications and adjusting the head pointer, the list
!  *    of listening backends is scanned and we send a PROCSIG_NOTIFY_INTERRUPT
!  *    to every backend that has set its Pid (We don't know which backend is
!  *    listening on which channel so we need to send a signal to every listening
!  *    backend).
   *
   * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
   *	  can call inbound-notify processing immediately if this backend is idle
***************
*** 46,93 ****
   *	  block).  Otherwise the handler may only set a flag, which will cause the
   *	  processing to occur just before we next go idle.
   *
!  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
!  *	  matching our own listenerPID and having nonzero notification fields.
!  *	  For each such tuple, we send a message to our frontend and clear the
!  *	  notification field.  BTW: this routine has to start/commit its own
!  *	  transaction, since by assumption it is only called from outside any
!  *	  transaction.
!  *
!  * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
!  * of pending actions.	If we reach transaction commit, the changes are
!  * applied to pg_listener just before executing any pending NOTIFYs.  This
!  * method is necessary because to avoid race conditions, we must hold lock
!  * on pg_listener from when we insert a new listener tuple until we commit.
!  * To do that and not create undue hazard of deadlock, we don't want to
!  * touch pg_listener until we are otherwise done with the transaction;
!  * in particular it'd be uncool to still be taking user-commanded locks
!  * while holding the pg_listener lock.
!  *
!  * Although we grab ExclusiveLock on pg_listener for any operation,
!  * the lock is never held very long, so it shouldn't cause too much of
!  * a performance problem.  (Previously we used AccessExclusiveLock, but
!  * there's no real reason to forbid concurrent reads.)
   *
!  * An application that listens on the same relname it notifies will get
   * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
   * by comparing be_pid in the NOTIFY message to the application's own backend's
!  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
   * frontend during startup.)  The above design guarantees that notifies from
!  * other backends will never be missed by ignoring self-notifies.  Note,
!  * however, that we do *not* guarantee that a separate frontend message will
!  * be sent for every outside NOTIFY.  Since there is only room for one
!  * originating PID in pg_listener, outside notifies occurring at about the
!  * same time may be collapsed into a single message bearing the PID of the
!  * first outside backend to perform the NOTIFY.
   *-------------------------------------------------------------------------
   */
  
  #include "postgres.h"
  
  #include <unistd.h>
  #include <signal.h>
  
  #include "access/heapam.h"
  #include "access/twophase_rmgr.h"
  #include "access/xact.h"
  #include "catalog/pg_listener.h"
--- 62,103 ----
   *	  block).  Otherwise the handler may only set a flag, which will cause the
   *	  processing to occur just before we next go idle.
   *
!  * 5. Inbound-notify processing consists of reading all of the notifications
!  *	  that have arrived since scanning last time. We read every notification
!  *	  until we reach the head pointer's position. Then we check if we were the
!  *	  laziest backend: if our pointer is set to the same position as the global
!  *	  tail pointer is set, then we set it further to the second-laziest
!  *	  backend (This we identify by inspecting the positions of all other
!  *	  backends' pointers). Whenever we move the tail pointer we also truncate
!  *	  now unused pages (i.e. delete files in pg_notify/ that are no longer
!  *	  used).
   *
!  * An application that listens on the same channel it notifies will get
   * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
   * by comparing be_pid in the NOTIFY message to the application's own backend's
!  * Pid.  (As of FE/BE protocol 2.0, the backend's Pid is provided to the
   * frontend during startup.)  The above design guarantees that notifies from
!  * other backends will never be missed by ignoring self-notifies.
   *-------------------------------------------------------------------------
   */
  
+ /* XXX 
+  *
+  * TODO:
+  *  - does a variable length for payload make sense ??
+  *  - tests with multiple dbs
+  *  - guc parameter max_notifies_per_txn ??
+  *  - adapt comments
+  */
+ 
  #include "postgres.h"
  
  #include <unistd.h>
  #include <signal.h>
  
  #include "access/heapam.h"
+ #include "access/slru.h"
+ #include "access/transam.h"
  #include "access/twophase_rmgr.h"
  #include "access/xact.h"
  #include "catalog/pg_listener.h"
***************
*** 108,115 ****
  
  /*
   * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
!  * all actions requested in the current transaction.  As explained above,
!  * we don't actually modify pg_listener until we reach transaction commit.
   *
   * The list is kept in CurTransactionContext.  In subtransactions, each
   * subtransaction has its own list in its own CurTransactionContext, but
--- 118,125 ----
  
  /*
   * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
!  * all actions requested in the current transaction. As explained above,
!  * we don't actually send notifications until we reach transaction commit.
   *
   * The list is kept in CurTransactionContext.  In subtransactions, each
   * subtransaction has its own list in its own CurTransactionContext, but
***************
*** 134,140 ****
  static List *upperPendingActions = NIL; /* list of upper-xact lists */
  
  /*
!  * State for outbound notifies consists of a list of all relnames NOTIFYed
   * in the current transaction.	We do not actually perform a NOTIFY until
   * and unless the transaction commits.	pendingNotifies is NIL if no
   * NOTIFYs have been done in the current transaction.
--- 144,150 ----
  static List *upperPendingActions = NIL; /* list of upper-xact lists */
  
  /*
!  * State for outbound notifies consists of a list of all channels NOTIFYed
   * in the current transaction.	We do not actually perform a NOTIFY until
   * and unless the transaction commits.	pendingNotifies is NIL if no
   * NOTIFYs have been done in the current transaction.
***************
*** 149,158 ****
   * condition name, it will get a self-notify at commit.  This is a bit odd
   * but is consistent with our historical behavior.
   */
! static List *pendingNotifies = NIL;		/* list of C strings */
  
  static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */
  
  /*
   * State for inbound notifies consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessIncomingNotify
--- 159,274 ----
   * condition name, it will get a self-notify at commit.  This is a bit odd
   * but is consistent with our historical behavior.
   */
! 
! typedef struct Notification
! {
! 	char   *channel;
! 	char   *payload;
! 	/* we only need one of the two, depending on whether we send a notification
! 	 * or receive one. */
! 	union {
! 		int32 dstPid;
! 		int32 srcPid;
! 	};
! } Notification;
! 
! typedef struct AsyncQueueEntry
! {
! 	Oid		dboid;
! 	int32	srcPid;
! 	char	channel[NAMEDATALEN];
! 	char	payload[NOTIFY_PAYLOAD_MAX_LENGTH];
! } AsyncQueueEntry;
! 
! #define	INVALID_PID (-1)
! #define QUEUE_POS_PAGE(x) ((x).page)
! #define QUEUE_POS_OFFSET(x) ((x).offset)
! #define QUEUE_POS_EQUAL(x,y) \
! 	 ((x).page == (y).page ? (y).offset == (x).offset : false)
! #define SET_QUEUE_POS(x,y,z) \
! 	do { \
! 		(x).page = (y); \
! 		(x).offset = (z); \
! 	} while (0);
! /* does page x precede page y with z = HEAD ? */
! #define QUEUE_POS_MIN(x,y,z) \
! 		AsyncPagePrecedesLogically((x).page, (y).page, (z).page) ? (x) : \
! 			 AsyncPagePrecedesLogically((y).page, (x).page, (z).page) ? (y) : \
! 				 (x).offset < (y).offset ? (x) : \
! 				 	(y)
! #define QUEUE_BACKEND_POS(i) asyncQueueControl->backend[(i)].pos
! #define QUEUE_BACKEND_PID(i) asyncQueueControl->backend[(i)].pid
! #define QUEUE_HEAD asyncQueueControl->head
! #define QUEUE_TAIL asyncQueueControl->tail
! 
! typedef struct QueuePosition
! {
! 	int			page;
! 	int			offset;
! } QueuePosition;
! 
! typedef struct QueueBackendStatus
! {
! 	int32			pid;
! 	QueuePosition	pos;
! } QueueBackendStatus;
! 
! /*
!  * The AsyncQueueControl structure is protected by the AsyncQueueLock.
!  *
!  * In SHARED mode, backends will only inspect their own entries as well as
!  * head and tail pointers. Consequently we can allow a backend to update its
!  * own record while holding only a shared lock (since no other backend will
!  * inspect it).
!  *
!  * In EXCLUSIVE mode, backends can inspect the entries of other backends and
!  * also change head and tail pointers.
!  *
!  * In order to avoid deadlocks, whenever we need both locks, we always first
!  * get AsyncQueueLock and then AsyncCtlLock.
!  */
! typedef struct AsyncQueueControl
! {
! 	QueuePosition		head;		/* head points to the next free location */
! 	QueuePosition 		tail;		/* the global tail is equivalent to the
! 									   tail of the "slowest" backend */
! 	TimestampTz			lastQueueFullWarn;	/* when the queue is full we only
! 											   want to log that once in a
! 											   while */
! 	QueueBackendStatus	backend[1];	/* actually this one has as many entries as
! 									 * connections are allowed (MaxBackends) */
! 	/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
! } AsyncQueueControl;
! 
! static AsyncQueueControl   *asyncQueueControl;
! static SlruCtlData			AsyncCtlData;
! 
! #define AsyncCtl					(&AsyncCtlData)
! #define NUM_ASYNC_BUFFER_SLOTS		4
! #define QUEUE_PAGESIZE				BLCKSZ
! #define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
! 
! /*
!  * slru.c currently assumes that all filenames are four characters of hex
!  * digits. That means that we can use segments 0000 through FFFF.
!  * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
!  * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0xFFFF.
!  *
!  * It's of course easy to enhance slru.c but those pages give us so much
!  * space already that it doesn't seem worth the trouble...
!  *
!  * It's a legal test case to define QUEUE_MAX_PAGE to a very small multiple of
!  * SLRU_PAGES_PER_SEGMENT to test queue full behaviour.
!  */
! #define QUEUE_MAX_PAGE			(SLRU_PAGES_PER_SEGMENT * 0xFFFF)
! 
! 
! static List *pendingNotifies = NIL;		/* list of Notifications */
  
  static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */
  
+ static List *listenChannels = NIL;	/* list of channels we are listening to */
+ 
  /*
   * State for inbound notifies consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessIncomingNotify
***************
*** 174,207 ****
  
  static void queue_listen(ListenActionKind action, const char *condname);
  static void Async_UnlistenOnExit(int code, Datum arg);
! static void Exec_Listen(Relation lRel, const char *relname);
! static void Exec_Unlisten(Relation lRel, const char *relname);
! static void Exec_UnlistenAll(Relation lRel);
! static void Send_Notify(Relation lRel);
  static void ProcessIncomingNotify(void);
! static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
! static bool AsyncExistsPendingNotify(const char *relname);
  static void ClearPendingActionsAndNotifies(void);
  
  
  /*
   * Async_Notify
   *
   *		This is executed by the SQL notify command.
   *
!  *		Adds the relation to the list of pending notifies.
   *		Actual notification happens during transaction commit.
   *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   */
  void
! Async_Notify(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Notify(%s)", relname);
  
  	/* no point in making duplicate entries in the list ... */
! 	if (!AsyncExistsPendingNotify(relname))
  	{
  		/*
  		 * The name list needs to live until end of transaction, so store it
  		 * in the transaction context.
--- 290,431 ----
  
  static void queue_listen(ListenActionKind action, const char *condname);
  static void Async_UnlistenOnExit(int code, Datum arg);
! static bool IsListeningOn(const char *channel);
! static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
! static void Exec_Listen(const char *channel);
! static void Exec_Unlisten(const char *channel);
! static void Exec_UnlistenAll(void);
! static void SignalBackends(void);
! static void Send_Notify(void);
! static bool AsyncPagePrecedesPhysically(int p, int q);
! static bool AsyncPagePrecedesLogically(int p, int q, int head);
! static bool asyncQueueAdvance(QueuePosition *position);
! static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
! static List *asyncQueueAddEntries(List *notifications);
! static List *asyncQueueGetEntriesByPage(QueuePosition *current,
! 										QueuePosition stop);
! static void asyncQueueAdvanceTail();
  static void ProcessIncomingNotify(void);
! static void NotifyMyFrontEnd(const char *channel,
! 							 const char *payload,
! 							 int32 dstPid);
! static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
  static void ClearPendingActionsAndNotifies(void);
  
+ static void asyncQueueAdvanceTail(void);
+ 
+ 
+ /*
+  * We will work on the page range of 0..(SLRU_PAGES_PER_SEGMENT * 0xFFFF).
+  * AsyncPagePrecedesPhysically just checks numerically without any magic if
+  * one page precedes another one.
+  *
+  * On the other hand, when AsyncPagePrecedesLogically does that check, it
+  * takes the current head page number into account. Now if we have wrapped
+  * around, it can happen that p precedes q, even though p > q (if the head page
+  * is in between the two).
+  */ 
+ static bool
+ AsyncPagePrecedesPhysically(int p, int q)
+ {
+ 	return p < q;
+ }
+ 
+ static bool
+ AsyncPagePrecedesLogically(int p, int q, int head)
+ {
+ 	if (p <= head && q <= head)
+ 		return p < q;
+ 	if (p > head && q > head)
+ 		return p < q;
+ 	if (p <= head)
+ 	{
+ 		Assert(q > head);
+ 		/* q is older */
+ 		return false;
+ 	}
+ 	else
+ 	{
+ 		Assert(p > head && q <= head);
+ 		/* p is older */
+ 		return true;
+ 	}
+ }
+ 
+ void
+ AsyncShmemInit(void)
+ {
+ 	bool	found;
+ 	int		slotno;
+ 	Size	size;
+ 
+ 	/*
+ 	 * Remember that sizeof(AsyncQueueControl) already contains one member of
+ 	 * QueueBackendStatus, so we only need to add the status space requirement
+ 	 * for MaxBackends-1 backends.
+ 	 */
+ 	size = mul_size(MaxBackends-1, sizeof(QueueBackendStatus));
+ 	size = add_size(size, sizeof(AsyncQueueControl));
+ 
+ 	asyncQueueControl = (AsyncQueueControl *)
+ 		ShmemInitStruct("Async Queue Control", size, &found);
+ 
+ 	if (!asyncQueueControl)
+ 		elog(ERROR, "out of memory");
+ 
+ 	if (!found)
+ 	{
+ 		int		i;
+ 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
+ 		SET_QUEUE_POS(QUEUE_TAIL, QUEUE_MAX_PAGE, 0);
+ 		for (i = 0; i < MaxBackends; i++)
+ 		{
+ 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ 			QUEUE_BACKEND_PID(i) = INVALID_PID;
+ 		}
+ 	}
+ 
+ 	AsyncCtl->PagePrecedes = AsyncPagePrecedesPhysically;
+ 	SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFER_SLOTS, 0,
+ 				  AsyncCtlLock, "pg_notify");
+ 	AsyncCtl->do_fsync = false;
+ 	asyncQueueControl->lastQueueFullWarn = GetCurrentTimestamp();
+ 
+ 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+ 	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
+ 	slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+ 	SimpleLruWritePage(AsyncCtl, slotno, NULL);
+ 	LWLockRelease(AsyncCtlLock);
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	SlruScanDirectory(AsyncCtl, QUEUE_MAX_PAGE, true);
+ }
+ 
  
  /*
   * Async_Notify
   *
   *		This is executed by the SQL notify command.
   *
!  *		Adds the channel to the list of pending notifies.
   *		Actual notification happens during transaction commit.
   *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   */
  void
! Async_Notify(const char *channel, const char *payload)
  {
+ 
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Notify(%s)", channel);
! 
! 	/*
! 	 * XXX - do we now need a guc parameter max_notifies_per_txn?
! 	 */ 
  
  	/* no point in making duplicate entries in the list ... */
! 	if (!AsyncExistsPendingNotify(channel, payload))
  	{
+ 		Notification *n;
  		/*
  		 * The name list needs to live until end of transaction, so store it
  		 * in the transaction context.
***************
*** 210,221 ****
  
  		oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
  		/*
! 		 * Ordering of the list isn't important.  We choose to put new entries
! 		 * on the front, as this might make duplicate-elimination a tad faster
! 		 * when the same condition is signaled many times in a row.
  		 */
! 		pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
  
  		MemoryContextSwitchTo(oldcontext);
  	}
--- 434,452 ----
  
  		oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
+ 		n = (Notification *) palloc(sizeof(Notification));
+ 		n->channel = pstrdup(channel);
+ 		if (payload)
+ 			n->payload = pstrdup(payload);
+ 		else
+ 			n->payload = "";
+ 		n->dstPid = INVALID_PID;
+ 
  		/*
! 		 * We want to preserve the order so we need to append every
! 		 * notification. See comments at AsyncExistsPendingNotify().
  		 */
! 		pendingNotifies = lappend(pendingNotifies, n);
  
  		MemoryContextSwitchTo(oldcontext);
  	}
***************
*** 259,270 ****
   *		This is executed by the SQL listen command.
   */
  void
! Async_Listen(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
  
! 	queue_listen(LISTEN_LISTEN, relname);
  }
  
  /*
--- 490,501 ----
   *		This is executed by the SQL listen command.
   */
  void
! Async_Listen(const char *channel)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
  
! 	queue_listen(LISTEN_LISTEN, channel);
  }
  
  /*
***************
*** 273,288 ****
   *		This is executed by the SQL unlisten command.
   */
  void
! Async_Unlisten(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
  
  	/* If we couldn't possibly be listening, no need to queue anything */
  	if (pendingActions == NIL && !unlistenExitRegistered)
  		return;
  
! 	queue_listen(LISTEN_UNLISTEN, relname);
  }
  
  /*
--- 504,519 ----
   *		This is executed by the SQL unlisten command.
   */
  void
! Async_Unlisten(const char *channel)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
  
  	/* If we couldn't possibly be listening, no need to queue anything */
  	if (pendingActions == NIL && !unlistenExitRegistered)
  		return;
  
! 	queue_listen(LISTEN_UNLISTEN, channel);
  }
  
  /*
***************
*** 306,313 ****
  /*
   * Async_UnlistenOnExit
   *
-  *		Clean up the pg_listener table at backend exit.
-  *
   *		This is executed if we have done any LISTENs in this backend.
   *		It might not be necessary anymore, if the user UNLISTENed everything,
   *		but we don't try to detect that case.
--- 537,542 ----
***************
*** 315,331 ****
  static void
  Async_UnlistenOnExit(int code, Datum arg)
  {
- 	/*
- 	 * We need to start/commit a transaction for the unlisten, but if there is
- 	 * already an active transaction we had better abort that one first.
- 	 * Otherwise we'd end up committing changes that probably ought to be
- 	 * discarded.
- 	 */
  	AbortOutOfAnyTransaction();
! 	/* Now we can do the unlisten */
! 	StartTransactionCommand();
! 	Async_UnlistenAll();
! 	CommitTransactionCommand();
  }
  
  /*
--- 544,551 ----
  static void
  Async_UnlistenOnExit(int code, Datum arg)
  {
  	AbortOutOfAnyTransaction();
! 	Exec_UnlistenAll();
  }
  
  /*
***************
*** 348,357 ****
  	/* We can deal with pending NOTIFY though */
  	foreach(p, pendingNotifies)
  	{
! 		const char *relname = (const char *) lfirst(p);
  
  		RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
! 							   relname, strlen(relname) + 1);
  	}
  
  	/*
--- 568,582 ----
  	/* We can deal with pending NOTIFY though */
  	foreach(p, pendingNotifies)
  	{
! 		AsyncQueueEntry qe;
! 		Notification   *n;
! 
! 		n = (Notification *) lfirst(p);
! 
! 		asyncQueueNotificationToEntry(n, &qe);
  
  		RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
! 							   &qe, sizeof(AsyncQueueEntry));
  	}
  
  	/*
***************
*** 367,386 ****
   *
   *		This is called at transaction commit.
   *
!  *		If there are pending LISTEN/UNLISTEN actions, insert or delete
!  *		tuples in pg_listener accordingly.
   *
!  *		If there are outbound notify requests in the pendingNotifies list,
!  *		scan pg_listener for matching tuples, and either signal the other
!  *		backend or send a message to our own frontend.
!  *
!  *		NOTE: we are still inside the current transaction, therefore can
!  *		piggyback on its committing of changes.
   */
  void
  AtCommit_Notify(void)
  {
- 	Relation	lRel;
  	ListCell   *p;
  
  	if (pendingActions == NIL && pendingNotifies == NIL)
--- 592,606 ----
   *
   *		This is called at transaction commit.
   *
!  *		If there are pending LISTEN/UNLISTEN actions, update our
!  *		"listenChannels" list.
   *
!  *		If there are outbound notify requests in the pendingNotifies list, add
!  *		them to the global queue and signal any backend that is listening.
   */
  void
  AtCommit_Notify(void)
  {
  	ListCell   *p;
  
  	if (pendingActions == NIL && pendingNotifies == NIL)
***************
*** 399,407 ****
  	if (Trace_notify)
  		elog(DEBUG1, "AtCommit_Notify");
  
- 	/* Acquire ExclusiveLock on pg_listener */
- 	lRel = heap_open(ListenerRelationId, ExclusiveLock);
- 
  	/* Perform any pending listen/unlisten actions */
  	foreach(p, pendingActions)
  	{
--- 619,624 ----
***************
*** 410,442 ****
  		switch (actrec->action)
  		{
  			case LISTEN_LISTEN:
! 				Exec_Listen(lRel, actrec->condname);
  				break;
  			case LISTEN_UNLISTEN:
! 				Exec_Unlisten(lRel, actrec->condname);
  				break;
  			case LISTEN_UNLISTEN_ALL:
! 				Exec_UnlistenAll(lRel);
  				break;
  		}
- 
- 		/* We must CCI after each action in case of conflicting actions */
- 		CommandCounterIncrement();
  	}
  
- 	/* Perform any pending notifies */
- 	if (pendingNotifies)
- 		Send_Notify(lRel);
- 
  	/*
! 	 * We do NOT release the lock on pg_listener here; we need to hold it
! 	 * until end of transaction (which is about to happen, anyway) to ensure
! 	 * that notified backends see our tuple updates when they look. Else they
! 	 * might disregard the signal, which would make the application programmer
! 	 * very unhappy.  Also, this prevents race conditions when we have just
! 	 * inserted a listening tuple.
  	 */
! 	heap_close(lRel, NoLock);
  
  	ClearPendingActionsAndNotifies();
  
--- 627,649 ----
  		switch (actrec->action)
  		{
  			case LISTEN_LISTEN:
! 				Exec_Listen(actrec->condname);
  				break;
  			case LISTEN_UNLISTEN:
! 				Exec_Unlisten(actrec->condname);
  				break;
  			case LISTEN_UNLISTEN_ALL:
! 				Exec_UnlistenAll();
  				break;
  		}
  	}
  
  	/*
! 	 * Perform any pending notifies. Note that whatever we add to the queue
! 	 * is immediately visible to any other process.
  	 */
! 	if (pendingNotifies)
! 		Send_Notify();
  
  	ClearPendingActionsAndNotifies();
  
***************
*** 445,508 ****
  }
  
  /*
   * Exec_Listen --- subroutine for AtCommit_Notify
   *
!  *		Register the current backend as listening on the specified relation.
   */
  static void
! Exec_Listen(Relation lRel, const char *relname)
  {
! 	HeapScanDesc scan;
! 	HeapTuple	tuple;
! 	Datum		values[Natts_pg_listener];
! 	bool		nulls[Natts_pg_listener];
! 	NameData	condname;
! 	bool		alreadyListener = false;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
! 
! 	/* Detect whether we are already listening on this relname */
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
! 
! 		if (listener->listenerpid == MyProcPid &&
! 			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
! 		{
! 			alreadyListener = true;
! 			/* No need to scan the rest of the table */
! 			break;
! 		}
! 	}
! 	heap_endscan(scan);
  
! 	if (alreadyListener)
  		return;
  
  	/*
! 	 * OK to insert a new tuple
  	 */
- 	memset(nulls, false, sizeof(nulls));
- 
- 	namestrcpy(&condname, relname);
- 	values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
- 	values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid);
- 	values[Anum_pg_listener_notification - 1] = Int32GetDatum(0);		/* no notifies pending */
- 
- 	tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls);
- 
- 	simple_heap_insert(lRel, tuple);
  
! #ifdef NOT_USED					/* currently there are no indexes */
! 	CatalogUpdateIndexes(lRel, tuple);
! #endif
  
! 	heap_freetuple(tuple);
  
  	/*
! 	 * now that we are listening, make sure we will unlisten before dying.
  	 */
  	if (!unlistenExitRegistered)
  	{
--- 652,722 ----
  }
  
  /*
+  * This function is executed for every notification found in the queue, to
+  * check whether the current backend is listening on that channel. It is an
+  * open question whether this needs further optimization, for example by
+  * converting listenChannels to a sorted array and binary-searching it.
+  */
+ static bool
+ IsListeningOn(const char *channel)
+ {
+ 	ListCell   *p;
+ 
+ 	foreach(p, listenChannels)
+ 	{
+ 		char *lchan = (char *) lfirst(p);
+ 		if (strcmp(lchan, channel) == 0)
+ 			/* already listening on this channel */
+ 			return true;
+ 	}
+ 	return false;
+ }
+ 
+ 
+ /*
   * Exec_Listen --- subroutine for AtCommit_Notify
   *
!  *		Register the current backend as listening on the specified channel.
   */
  static void
! Exec_Listen(const char *channel)
  {
! 	MemoryContext oldcontext;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Listen(%s,%d)", channel, MyProcPid);
  
! 	/* Detect whether we are already listening on this channel */
! 	if (IsListeningOn(channel))
  		return;
  
  	/*
! 	 * OK to insert to the list.
  	 */
  
! 	if (listenChannels == NIL)
! 	{
! 		/*
! 		 * This is our first LISTEN, establish our pointer.
! 		 */
! 		LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 		QUEUE_BACKEND_POS(MyBackendId) = QUEUE_HEAD;
! 		QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
! 		LWLockRelease(AsyncQueueLock);
! 		/*
! 		 * Strictly speaking this is only necessary if we are the very first
! 		 * listener: the tail pointer must always be identical to the
! 		 * position of at least one backend.
! 		 */
! 		asyncQueueAdvanceTail();
! 	}
  
! 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
! 	listenChannels = lappend(listenChannels, pstrdup(channel));
! 	MemoryContextSwitchTo(oldcontext);
  
  	/*
! 	 * Now that we are listening, make sure we will unlisten before dying.
  	 */
  	if (!unlistenExitRegistered)
  	{
***************
*** 514,551 ****
  /*
   * Exec_Unlisten --- subroutine for AtCommit_Notify
   *
!  *		Remove the current backend from the list of listening backends
!  *		for the specified relation.
   */
  static void
! Exec_Unlisten(Relation lRel, const char *relname)
  {
! 	HeapScanDesc scan;
! 	HeapTuple	tuple;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);
  
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
  	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
! 
! 		if (listener->listenerpid == MyProcPid &&
! 			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
  		{
- 			/* Found the matching tuple, delete it */
- 			simple_heap_delete(lRel, &tuple->t_self);
- 
  			/*
! 			 * We assume there can be only one match, so no need to scan the
! 			 * rest of the table
  			 */
! 			break;
  		}
  	}
! 	heap_endscan(scan);
! 
  	/*
  	 * We do not complain about unlistening something not being listened;
  	 * should we?
--- 728,780 ----
  /*
   * Exec_Unlisten --- subroutine for AtCommit_Notify
   *
!  *		Remove the specified channel from "listenChannels".
   */
  static void
! Exec_Unlisten(const char *channel)
  {
! 	ListCell   *p;
! 	ListCell   *prev = NULL;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Unlisten(%s,%d)", channel, MyProcPid);
  
! 	/* Detect whether we are already listening on this channel */
! 	foreach(p, listenChannels)
  	{
! 		char *lchan = (char *) lfirst(p);
! 		if (strcmp(lchan, channel) == 0)
  		{
  			/*
! 			 * Since the list is living in the TopMemoryContext, we free
! 			 * the memory. The ListCell is freed by list_delete_cell().
  			 */
! 			pfree(lchan);
! 			listenChannels = list_delete_cell(listenChannels, p, prev);
! 			if (listenChannels == NIL)
! 			{
! 				bool advanceTail = false;
! 				/*
! 				 * This backend is not listening anymore.
! 				 */
! 				LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 				QUEUE_BACKEND_PID(MyBackendId) = INVALID_PID;
! 
! 				/*
! 				 * If we have been the last backend, advance the tail pointer.
! 				 */
! 				if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL))
! 					advanceTail = true;
! 				LWLockRelease(AsyncQueueLock);
! 
! 				if (advanceTail)
! 					asyncQueueAdvanceTail();
! 			}
! 			return;
  		}
+ 		prev = p;
  	}
! 
  	/*
  	 * We do not complain about unlistening something not being listened;
  	 * should we?
***************
*** 555,677 ****
  /*
   * Exec_UnlistenAll --- subroutine for AtCommit_Notify
   *
!  *		Update pg_listener to unlisten all relations for this backend.
   */
  static void
! Exec_UnlistenAll(Relation lRel)
  {
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple;
! 	ScanKeyData key[1];
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_UnlistenAll");
  
! 	/* Find and delete all entries with my listenerPID */
! 	ScanKeyInit(&key[0],
! 				Anum_pg_listener_listenerpid,
! 				BTEqualStrategyNumber, F_INT4EQ,
! 				Int32GetDatum(MyProcPid));
! 	scan = heap_beginscan(lRel, SnapshotNow, 1, key);
  
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 		simple_heap_delete(lRel, &lTuple->t_self);
  
! 	heap_endscan(scan);
  }
  
  /*
!  * Send_Notify --- subroutine for AtCommit_Notify
!  *
!  *		Scan pg_listener for tuples matching our pending notifies, and
!  *		either signal the other backend or send a message to our own frontend.
   */
  static void
! Send_Notify(Relation lRel)
  {
! 	TupleDesc	tdesc = RelationGetDescr(lRel);
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple,
! 				rTuple;
! 	Datum		value[Natts_pg_listener];
! 	bool		repl[Natts_pg_listener],
! 				nulls[Natts_pg_listener];
! 
! 	/* preset data to update notify column to MyProcPid */
! 	memset(nulls, false, sizeof(nulls));
! 	memset(repl, false, sizeof(repl));
! 	repl[Anum_pg_listener_notification - 1] = true;
! 	memset(value, 0, sizeof(value));
! 	value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid);
! 
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
! 		char	   *relname = NameStr(listener->relname);
! 		int32		listenerPID = listener->listenerpid;
  
! 		if (!AsyncExistsPendingNotify(relname))
! 			continue;
  
! 		if (listenerPID == MyProcPid)
! 		{
! 			/*
! 			 * Self-notify: no need to bother with table update. Indeed, we
! 			 * *must not* clear the notification field in this path, or we
! 			 * could lose an outside notify, which'd be bad for applications
! 			 * that ignore self-notify messages.
! 			 */
! 			if (Trace_notify)
! 				elog(DEBUG1, "AtCommit_Notify: notifying self");
  
! 			NotifyMyFrontEnd(relname, listenerPID);
! 		}
! 		else
  		{
! 			if (Trace_notify)
! 				elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
! 					 listenerPID);
  
! 			/*
! 			 * If someone has already notified this listener, we don't bother
! 			 * modifying the table, but we do still send a NOTIFY_INTERRUPT
! 			 * signal, just in case that backend missed the earlier signal for
! 			 * some reason.  It's OK to send the signal first, because the
! 			 * other guy can't read pg_listener until we unlock it.
! 			 *
! 			 * Note: we don't have the other guy's BackendId available, so
! 			 * this will incur a search of the ProcSignal table.  That's
! 			 * probably not worth worrying about.
! 			 */
! 			if (SendProcSignal(listenerPID, PROCSIG_NOTIFY_INTERRUPT,
! 							   InvalidBackendId) < 0)
! 			{
! 				/*
! 				 * Get rid of pg_listener entry if it refers to a PID that no
! 				 * longer exists.  Presumably, that backend crashed without
! 				 * deleting its pg_listener entries. This code used to only
! 				 * delete the entry if errno==ESRCH, but as far as I can see
! 				 * we should just do it for any failure (certainly at least
! 				 * for EPERM too...)
! 				 */
! 				simple_heap_delete(lRel, &lTuple->t_self);
! 			}
! 			else if (listener->notification == 0)
! 			{
! 				/* Rewrite the tuple with my PID in notification column */
! 				rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
! 				simple_heap_update(lRel, &lTuple->t_self, rTuple);
! 
! #ifdef NOT_USED					/* currently there are no indexes */
! 				CatalogUpdateIndexes(lRel, rTuple);
! #endif
! 			}
  		}
  	}
  
! 	heap_endscan(scan);
  }
  
  /*
--- 784,1028 ----
  /*
   * Exec_UnlistenAll --- subroutine for AtCommit_Notify
   *
!  *		Unlisten on all channels for this backend.
   */
  static void
! Exec_UnlistenAll(void)
  {
! 	bool advanceTail = false;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_UnlistenAll(%d)", MyProcPid);
! 
! 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 	QUEUE_BACKEND_PID(MyBackendId) = INVALID_PID;
! 
! 	/*
! 	 * The list lives in TopMemoryContext, so we must free it explicitly.
! 	 */
! 	list_free_deep(listenChannels);
! 	listenChannels = NIL;
! 
! 	/*
! 	 * If we have been the last backend, advance the tail pointer.
! 	 */
! 	if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL))
! 		advanceTail = true;
! 	LWLockRelease(AsyncQueueLock);
! 
! 	if (advanceTail)
! 		asyncQueueAdvanceTail();
! }
  
! static bool
! asyncQueueIsFull(void)
! {
! 	QueuePosition	lookahead = QUEUE_HEAD;
  
! 	/*
! 	 * Check what would happen if we wrote one more entry: would we move to a
! 	 * new page? If not, the queue cannot be full, because the current page
! 	 * still has room for at least one more entry.
! 	 */
! 	if (!asyncQueueAdvance(&lookahead))
! 		return false;
  
! 	/*
! 	 * The queue is full if with a switch to a new page we reach the page
! 	 * of the tail pointer.
! 	 */
! 	return QUEUE_POS_PAGE(lookahead) == QUEUE_POS_PAGE(QUEUE_TAIL);
  }
  
  /*
!  * Advance *position to the next entry, wrapping around at the end of the
!  * ring buffer. Return true if we moved to a new page, else false.
   */
+ static bool
+ asyncQueueAdvance(QueuePosition *position)
+ {
+ 	int		pageno = QUEUE_POS_PAGE(*position);
+ 	int		offset = QUEUE_POS_OFFSET(*position);
+ 	bool	pageJump = false;
+ 
+ 	/*
+ 	 * Move to the next position: first jump over the entry we have just
+ 	 * written or read.
+ 	 */
+ 	offset += sizeof(AsyncQueueEntry);
+ 	Assert(offset < QUEUE_PAGESIZE);
+ 
+ 	/*
+ 	 * Then check whether another entry would still fit on this page. If so,
+ 	 * stay here; we have reached the next position. If not, move on to the
+ 	 * beginning of the next page.
+ 	 */
+ 	if (offset + sizeof(AsyncQueueEntry) >= QUEUE_PAGESIZE)
+ 	{
+ 		pageno++;
+ 		if (pageno > QUEUE_MAX_PAGE)
+ 			/* wrap around */
+ 			pageno = 0;
+ 		offset = 0;
+ 		pageJump = true;
+ 	}
+ 
+ 	SET_QUEUE_POS(*position, pageno, offset);
+ 	return pageJump;
+ }
+ 
  static void
! asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
  {
! 	Assert(n->channel);
! 	Assert(n->payload);
  
! 	qe->srcPid = MyProcPid;
! 	qe->dboid = MyDatabaseId;
! 	strcpy(qe->channel, n->channel);
! 	strcpy(qe->payload, n->payload);
! }
  
! static List *
! asyncQueueAddEntries(List *notifications)
! {
! 	int			pageno;
! 	int			offset;
! 	int			slotno;
  
! 	/*
! 	 * Note that we are holding exclusive AsyncQueueLock already.
! 	 */
! 	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
! 	pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
! 	slotno = SimpleLruReadPage(AsyncCtl, pageno,
! 							   true, InvalidTransactionId);
! 	AsyncCtl->shared->page_dirty[slotno] = true;
! 
! 	do
! 	{
! 		AsyncQueueEntry	qe;
! 		Notification   *n;
! 
! 		if (asyncQueueIsFull())
  		{
! 			/* document that the prepare-new-page branch below is not taken */
! 			Assert(QUEUE_POS_OFFSET(QUEUE_HEAD) != 0);
! 			break;
! 		}
  
! 		n = (Notification *) linitial(notifications);
! 
! 		asyncQueueNotificationToEntry(n, &qe);
! 
! 		offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
! 		Assert(offset + sizeof(AsyncQueueEntry) < QUEUE_PAGESIZE);
! 		memcpy((char*) AsyncCtl->shared->page_buffer[slotno] + offset,
! 			   &qe, sizeof(AsyncQueueEntry));
! 
! 		notifications = list_delete_first(notifications);
! 
! 	} while (!asyncQueueAdvance(&(QUEUE_HEAD)) && notifications != NIL);
! 
! 	if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
! 	{
! 		/*
! 		 * If the next entry needs to go to a new page, prepare that page
! 		 * already.
! 		 */
! 		slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
! 		AsyncCtl->shared->page_dirty[slotno] = true;
! 		SimpleLruWritePage(AsyncCtl, slotno, NULL);
! 	}
! 	LWLockRelease(AsyncCtlLock);
! 
! 	return notifications;
! }
! 
! static void
! asyncQueueFullWarning(void)
! {
! 	/*
! 	 * Caller must hold exclusive AsyncQueueLock.
! 	 */
! 	TimestampTz t = GetCurrentTimestamp();
! 	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFullWarn,
! 								   t, QUEUE_FULL_WARN_INTERVAL))
! 	{
! 		ereport(WARNING, (errmsg("pg_notify ring buffer is full")));
! 		asyncQueueControl->lastQueueFullWarn = t;
! 	}
! }
! 
! /*
!  * Send_Notify --- subroutine for AtCommit_Notify
!  *
!  * Add the pending notifications to the queue and signal the listening
!  * backends.
!  *
!  * A full queue is very uncommon and should really not happen, given that we
!  * have so much space available in our slru pages. Nevertheless we need to
!  * deal with this possibility. Note that when we get here we are in the
!  * process of committing our transaction: we have already committed to clog,
!  * but we have not yet released all of our resources, and we keep them
!  * blocked for as long as we block here.
!  */
! static void
! Send_Notify(void)
! {
! 	while (pendingNotifies != NIL)
! 	{
! 		LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! 		while (asyncQueueIsFull())
! 		{
! 			asyncQueueFullWarning();
! 			LWLockRelease(AsyncQueueLock);
! 			SignalBackends();
! 			ProcessIncomingNotify();
! 			asyncQueueAdvanceTail();
! 			pg_usleep(100 * 1000L); /* 100ms */
! 			LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
  		}
+ 		Assert(pendingNotifies != NIL);
+ 		pendingNotifies = asyncQueueAddEntries(pendingNotifies);
+ 		LWLockRelease(AsyncQueueLock);
  	}
+ 	SignalBackends();
+ }
+ 
  
! static void
! SignalBackends(void)
! {
! 	ListCell	   *p1, *p2;
! 	int				i;
! 	int32			pid;
! 	List		   *pids = NIL;
! 	List		   *ids = NIL;
! 
! 	/* Signal everybody who is LISTENing on any channel */
! 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! 	for (i = 0; i < MaxBackends; i++)
! 	{
! 		pid = QUEUE_BACKEND_PID(i);
! 		if (pid != INVALID_PID)
! 		{
! 			pids = lappend_int(pids, pid);
! 			ids = lappend_int(ids, i);
! 		}
! 	}
! 	LWLockRelease(AsyncQueueLock);
! 
! 	forboth(p1, pids, p2, ids)
! 	{
! 		pid = (int32) lfirst_int(p1);
! 		i = lfirst_int(p2);
! 		/* XXX
! 		 * Should we check for failure? Can it happen that a backend
! 		 * has crashed without the postmaster starting over?
! 		 */
! 		SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, i);
! 	}
  }
  
  /*
***************
*** 940,968 ****
  }
  
  /*
   * ProcessIncomingNotify
   *
   *		Deal with arriving NOTIFYs from other backends.
   *		This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
   *		signal handler, or the next time control reaches the outer idle loop.
!  *		Scan pg_listener for arriving notifies, report them to my front end,
!  *		and clear the notification field in pg_listener until next time.
   *
!  *		NOTE: since we are outside any transaction, we must create our own.
   */
  static void
  ProcessIncomingNotify(void)
  {
! 	Relation	lRel;
! 	TupleDesc	tdesc;
! 	ScanKeyData key[1];
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple,
! 				rTuple;
! 	Datum		value[Natts_pg_listener];
! 	bool		repl[Natts_pg_listener],
! 				nulls[Natts_pg_listener];
! 	bool		catchup_enabled;
  
  	/* Must prevent catchup interrupt while I am running */
  	catchup_enabled = DisableCatchupInterrupt();
--- 1291,1398 ----
  }
  
  /*
+  * Read one page of the queue with read-only access (the call to
+  * SimpleLruReadPage_ReadOnly() takes AsyncCtlLock for us) and pass back a
+  * list of Notifications that the caller will then deliver.
+  *
+  * We stop once we either reach the stop position or cross into a new page.
+  */
+ static List *
+ asyncQueueGetEntriesByPage(QueuePosition *current, QueuePosition stop)
+ {
+ 	int				slotno;
+ 	AsyncQueueEntry	qe;
+ 	List	  	   *notifications = NIL;
+ 	Notification   *n;
+ 
+ 	if (QUEUE_POS_EQUAL(*current, stop))
+ 		return NIL;
+ 
+ 	slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, current->page,
+ 										InvalidTransactionId);
+ 	do {
+ 		memcpy(&qe, 
+ 			   (char *) (AsyncCtl->shared->page_buffer[slotno]) + current->offset,
+ 			   sizeof(AsyncQueueEntry));
+ 
+ 		if (qe.dboid == MyDatabaseId && IsListeningOn(qe.channel))
+ 		{
+ 			n = (Notification *) palloc(sizeof(Notification));
+ 			n->channel = pstrdup(qe.channel);
+ 			n->payload = pstrdup(qe.payload);
+ 			n->srcPid = qe.srcPid;
+ 
+ 			notifications = lappend(notifications, n);
+ 		}
+ 
+ 		/*
+ 		 * asyncQueueAdvance() jumps over the entry we have just read. If
+ 		 * there is no more room for another record on the current page, it
+ 		 * also moves to the beginning of the next page, ending this loop.
+ 		 */
+ 	} while(!asyncQueueAdvance(current) && !QUEUE_POS_EQUAL(*current, stop));
+ 
+ 	LWLockRelease(AsyncCtlLock);
+ 
+ 	return notifications;
+ }
+ 
+ static void
+ asyncQueueAdvanceTail(void)
+ {
+ 	QueuePosition	min;
+ 	int				i;
+ 	int				tailPage;
+ 	int				headPage;
+ 
+ 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+ 	min = QUEUE_HEAD;
+ 	for (i = 0; i < MaxBackends; i++)
+ 		if (QUEUE_BACKEND_PID(i) != INVALID_PID)
+ 			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD);
+ 
+ 	tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
+ 	headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
+ 	QUEUE_TAIL = min;
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	if (QUEUE_POS_PAGE(min) == tailPage)
+ 		return;
+ 
+ 	/* This is our wraparound check */
+ 	if (AsyncPagePrecedesLogically(tailPage, QUEUE_POS_PAGE(min), headPage)
+ 		&& AsyncPagePrecedesPhysically(tailPage, headPage))
+ 	{
+ 		/*
+ 		 * SimpleLruTruncate() will ask for AsyncCtlLock but will also
+ 		 * release the lock again.
+ 		 */
+ 		SimpleLruTruncate(AsyncCtl, QUEUE_POS_PAGE(min));
+ 	}
+ }
+ 
+ /*
   * ProcessIncomingNotify
   *
   *		Deal with arriving NOTIFYs from other backends.
   *		This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
   *		signal handler, or the next time control reaches the outer idle loop.
!  *		Scan the queue for arriving notifications and report them to my front
!  *		end.
   *
!  *		NOTE: we are outside of any transaction here.
   */
  static void
  ProcessIncomingNotify(void)
  {
! 	QueuePosition	pos;
! 	QueuePosition	oldpos;
! 	QueuePosition	head;
! 	List		   *notifications;
! 	bool			catchup_enabled;
! 	bool			advanceTail = false;
! 
! 	Assert(GetCurrentTransactionIdIfAny() == InvalidTransactionId);
  
  	/* Must prevent catchup interrupt while I am running */
  	catchup_enabled = DisableCatchupInterrupt();
***************
*** 974,1037 ****
  
  	notifyInterruptOccurred = 0;
  
! 	StartTransactionCommand();
  
! 	lRel = heap_open(ListenerRelationId, ExclusiveLock);
! 	tdesc = RelationGetDescr(lRel);
  
! 	/* Scan only entries with my listenerPID */
! 	ScanKeyInit(&key[0],
! 				Anum_pg_listener_listenerpid,
! 				BTEqualStrategyNumber, F_INT4EQ,
! 				Int32GetDatum(MyProcPid));
! 	scan = heap_beginscan(lRel, SnapshotNow, 1, key);
! 
! 	/* Prepare data for rewriting 0 into notification field */
! 	memset(nulls, false, sizeof(nulls));
! 	memset(repl, false, sizeof(repl));
! 	repl[Anum_pg_listener_notification - 1] = true;
! 	memset(value, 0, sizeof(value));
! 	value[Anum_pg_listener_notification - 1] = Int32GetDatum(0);
! 
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
! 		char	   *relname = NameStr(listener->relname);
! 		int32		sourcePID = listener->notification;
  
! 		if (sourcePID != 0)
  		{
! 			/* Notify the frontend */
! 
! 			if (Trace_notify)
! 				elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
! 					 relname, (int) sourcePID);
! 
! 			NotifyMyFrontEnd(relname, sourcePID);
! 
! 			/*
! 			 * Rewrite the tuple with 0 in notification column.
! 			 */
! 			rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
! 			simple_heap_update(lRel, &lTuple->t_self, rTuple);
! 
! #ifdef NOT_USED					/* currently there are no indexes */
! 			CatalogUpdateIndexes(lRel, rTuple);
! #endif
  		}
! 	}
! 	heap_endscan(scan);
! 
! 	/*
! 	 * We do NOT release the lock on pg_listener here; we need to hold it
! 	 * until end of transaction (which is about to happen, anyway) to ensure
! 	 * that other backends see our tuple updates when they look. Otherwise, a
! 	 * transaction started after this one might mistakenly think it doesn't
! 	 * need to send this backend a new NOTIFY.
! 	 */
! 	heap_close(lRel, NoLock);
  
! 	CommitTransactionCommand();
  
  	/*
  	 * Must flush the notify messages to ensure frontend gets them promptly.
--- 1404,1452 ----
  
  	notifyInterruptOccurred = 0;
  
! 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 	pos = QUEUE_BACKEND_POS(MyBackendId);
! 	oldpos = QUEUE_BACKEND_POS(MyBackendId);
! 	head = QUEUE_HEAD;
! 	LWLockRelease(AsyncQueueLock);
  
! 	if (QUEUE_POS_EQUAL(pos, head))
! 	{
! 		/* Nothing to do, we have read all notifications already. */
! 		if (catchup_enabled)
! 			EnableCatchupInterrupt();
! 		return;
! 	}
  
! 	do
! 	{
! 		ListCell	   *lc;
! 		Notification   *n;
  
! 		/*
! 		 * Our stop position is what we found to be the head's position when
! 		 * we entered this function. It might have changed already. But if it
! 		 * has, we will receive (or have already received and queued) another
! 		 * signal and come here again.
! 		 *
! 		 * We are not holding AsyncQueueLock here! The queue can only extend
! 		 * beyond the head pointer (see above) and we leave our backend's
! 		 * pointer where it is so nobody will truncate or rewrite pages under
! 		 * us.
! 		 */
! 		notifications = asyncQueueGetEntriesByPage(&pos, head);
! 		foreach(lc, notifications)
  		{
! 			n = (Notification *) lfirst(lc);
! 			NotifyMyFrontEnd(n->channel, n->payload, n->srcPid);
  		}
! 	} while (!QUEUE_POS_EQUAL(pos, head));
  
! 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 	QUEUE_BACKEND_POS(MyBackendId) = pos;
! 	if (QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL))
! 		advanceTail = true;
! 	LWLockRelease(AsyncQueueLock);
! 
! 	if (advanceTail)
! 		/* Move forward the tail pointer and try to truncate. */
! 		asyncQueueAdvanceTail();
  
  	/*
  	 * Must flush the notify messages to ensure frontend gets them promptly.
***************
*** 1051,1070 ****
   * Send NOTIFY message to my front end.
   */
  static void
! NotifyMyFrontEnd(char *relname, int32 listenerPID)
  {
  	if (whereToSendOutput == DestRemote)
  	{
  		StringInfoData buf;
  
  		pq_beginmessage(&buf, 'A');
! 		pq_sendint(&buf, listenerPID, sizeof(int32));
! 		pq_sendstring(&buf, relname);
  		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
! 		{
! 			/* XXX Add parameter string here later */
! 			pq_sendstring(&buf, "");
! 		}
  		pq_endmessage(&buf);
  
  		/*
--- 1466,1482 ----
   * Send NOTIFY message to my front end.
   */
  static void
! NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
  {
  	if (whereToSendOutput == DestRemote)
  	{
  		StringInfoData buf;
  
  		pq_beginmessage(&buf, 'A');
! 		pq_sendint(&buf, srcPid, sizeof(int32));
! 		pq_sendstring(&buf, channel);
  		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
! 			pq_sendstring(&buf, payload);
  		pq_endmessage(&buf);
  
  		/*
***************
*** 1074,1096 ****
  		 */
  	}
  	else
! 		elog(INFO, "NOTIFY for %s", relname);
  }
  
  /* Does pendingNotifies include the given relname? */
  static bool
! AsyncExistsPendingNotify(const char *relname)
  {
  	ListCell   *p;
  
! 	foreach(p, pendingNotifies)
! 	{
! 		const char *prelname = (const char *) lfirst(p);
  
! 		if (strcmp(prelname, relname) == 0)
  			return true;
  	}
  
  	return false;
  }
  
--- 1486,1538 ----
  		 */
  	}
  	else
! 		elog(INFO, "NOTIFY for %s", channel);
  }
  
  /* Does pendingNotifies include the given relname? */
  static bool
! AsyncExistsPendingNotify(const char *channel, const char *payload)
  {
  	ListCell   *p;
+ 	Notification *n;
  
! 	if (pendingNotifies == NIL)
! 		return false;
! 
! 	if (payload == NULL)
! 		payload = "";
! 
! 	/*
! 	 * We must append new elements to the end of the list to preserve their
! 	 * order, but we'd like to scan the list backwards to make duplicate
! 	 * elimination a tad faster when the same condition is signaled many
! 	 * times in a row. As a compromise, we check the tail element first,
! 	 * which we can access directly; if it doesn't match, we scan the rest
! 	 * of the list.
! 	 */
  
! 	n = (Notification *) llast(pendingNotifies);
! 	if (strcmp(n->channel, channel) == 0)
! 	{
! 		Assert(n->payload != NULL);
! 		if (strcmp(n->payload, payload) == 0)
  			return true;
  	}
  
+ 	for (p = list_head(pendingNotifies);
+ 		p != list_tail(pendingNotifies);
+ 		p = lnext(p))
+ 	{
+ 		n = (Notification *) lfirst(p);
+ 
+ 		if (strcmp(n->channel, channel) == 0)
+ 		{
+ 			Assert(n->payload != NULL);
+ 			if (strcmp(n->payload, payload) == 0)
+ 				return true;
+ 		}
+ 	}
+ 
  	return false;
  }
  
***************
*** 1124,1128 ****
  	 * there is any significant delay before I commit.	OK for now because we
  	 * disallow COMMIT PREPARED inside a transaction block.)
  	 */
! 	Async_Notify((char *) recdata);
  }
--- 1566,1576 ----
  	 * there is any significant delay before I commit.	OK for now because we
  	 * disallow COMMIT PREPARED inside a transaction block.)
  	 */
! 	AsyncQueueEntry		*qe = (AsyncQueueEntry *) recdata;
! 
! 	Assert(len == sizeof(AsyncQueueEntry));
! 	Assert(qe->dboid == MyDatabaseId);
! 
! 	Async_Notify(qe->channel, qe->payload);
  }
+ 
diff -cr cvs/src/backend/nodes/copyfuncs.c cvs.build/src/backend/nodes/copyfuncs.c
*** cvs/src/backend/nodes/copyfuncs.c	2009-10-31 14:47:48.000000000 +0100
--- cvs.build/src/backend/nodes/copyfuncs.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 2761,2766 ****
--- 2761,2767 ----
  	NotifyStmt *newnode = makeNode(NotifyStmt);
  
  	COPY_STRING_FIELD(conditionname);
+ 	COPY_STRING_FIELD(payload);
  
  	return newnode;
  }
diff -cr cvs/src/backend/nodes/equalfuncs.c cvs.build/src/backend/nodes/equalfuncs.c
*** cvs/src/backend/nodes/equalfuncs.c	2009-10-31 14:47:48.000000000 +0100
--- cvs.build/src/backend/nodes/equalfuncs.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 1321,1326 ****
--- 1321,1327 ----
  _equalNotifyStmt(NotifyStmt *a, NotifyStmt *b)
  {
  	COMPARE_STRING_FIELD(conditionname);
+ 	COMPARE_STRING_FIELD(payload);
  
  	return true;
  }
diff -cr cvs/src/backend/nodes/outfuncs.c cvs.build/src/backend/nodes/outfuncs.c
*** cvs/src/backend/nodes/outfuncs.c	2009-10-31 14:47:48.000000000 +0100
--- cvs.build/src/backend/nodes/outfuncs.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 1810,1815 ****
--- 1810,1816 ----
  	WRITE_NODE_TYPE("NOTIFY");
  
  	WRITE_STRING_FIELD(conditionname);
+ 	WRITE_STRING_FIELD(payload);
  }
  
  static void
diff -cr cvs/src/backend/nodes/readfuncs.c cvs.build/src/backend/nodes/readfuncs.c
*** cvs/src/backend/nodes/readfuncs.c	2009-10-31 14:47:48.000000000 +0100
--- cvs.build/src/backend/nodes/readfuncs.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 231,236 ****
--- 231,237 ----
  	READ_LOCALS(NotifyStmt);
  
  	READ_STRING_FIELD(conditionname);
+ 	READ_STRING_FIELD(payload);
  
  	READ_DONE();
  }
diff -cr cvs/src/backend/parser/gram.y cvs.build/src/backend/parser/gram.y
*** cvs/src/backend/parser/gram.y	2009-11-11 01:09:14.000000000 +0100
--- cvs.build/src/backend/parser/gram.y	2009-11-11 01:11:08.000000000 +0100
***************
*** 393,399 ****
  %type <boolean> opt_varying opt_timezone
  
  %type <ival>	Iconst SignedIconst
! %type <str>		Sconst comment_text
  %type <str>		RoleId opt_granted_by opt_boolean ColId_or_Sconst
  %type <list>	var_list
  %type <str>		ColId ColLabel var_name type_function_name param_name
--- 393,399 ----
  %type <boolean> opt_varying opt_timezone
  
  %type <ival>	Iconst SignedIconst
! %type <str>		Sconst comment_text notify_payload
  %type <str>		RoleId opt_granted_by opt_boolean ColId_or_Sconst
  %type <list>	var_list
  %type <str>		ColId ColLabel var_name type_function_name param_name
***************
*** 5977,5986 ****
   *
   *****************************************************************************/
  
! NotifyStmt: NOTIFY ColId
  				{
  					NotifyStmt *n = makeNode(NotifyStmt);
  					n->conditionname = $2;
  					$$ = (Node *)n;
  				}
  		;
--- 5977,5992 ----
   *
   *****************************************************************************/
  
! notify_payload:
! 			Sconst								{ $$ = $1; }
! 			| /*EMPTY*/							{ $$ = NULL; }
! 		;
! 
! NotifyStmt: NOTIFY ColId notify_payload
  				{
  					NotifyStmt *n = makeNode(NotifyStmt);
  					n->conditionname = $2;
+ 					n->payload = $3;
  					$$ = (Node *)n;
  				}
  		;
diff -cr cvs/src/backend/storage/ipc/ipci.c cvs.build/src/backend/storage/ipc/ipci.c
*** cvs/src/backend/storage/ipc/ipci.c	2009-09-06 09:06:21.000000000 +0200
--- cvs.build/src/backend/storage/ipc/ipci.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 219,224 ****
--- 219,225 ----
  	 */
  	BTreeShmemInit();
  	SyncScanShmemInit();
+ 	AsyncShmemInit();
  
  #ifdef EXEC_BACKEND
  
diff -cr cvs/src/backend/tcop/utility.c cvs.build/src/backend/tcop/utility.c
*** cvs/src/backend/tcop/utility.c	2009-10-31 14:47:55.000000000 +0100
--- cvs.build/src/backend/tcop/utility.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 875,882 ****
  		case T_NotifyStmt:
  			{
  				NotifyStmt *stmt = (NotifyStmt *) parsetree;
! 
! 				Async_Notify(stmt->conditionname);
  			}
  			break;
  
--- 875,886 ----
  		case T_NotifyStmt:
  			{
  				NotifyStmt *stmt = (NotifyStmt *) parsetree;
! 				if (stmt->payload
! 					&& strlen(stmt->payload) > NOTIFY_PAYLOAD_MAX_LENGTH - 1)
! 					ereport(ERROR,
! 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
! 							 errmsg("payload string too long")));
! 				Async_Notify(stmt->conditionname, stmt->payload);
  			}
  			break;
  
diff -cr cvs/src/bin/initdb/initdb.c cvs.build/src/bin/initdb/initdb.c
*** cvs/src/bin/initdb/initdb.c	2009-09-06 09:06:44.000000000 +0200
--- cvs.build/src/bin/initdb/initdb.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 2476,2481 ****
--- 2476,2482 ----
  		"pg_xlog",
  		"pg_xlog/archive_status",
  		"pg_clog",
+ 		"pg_notify",
  		"pg_subtrans",
  		"pg_twophase",
  		"pg_multixact/members",
diff -cr cvs/src/bin/psql/common.c cvs.build/src/bin/psql/common.c
*** cvs/src/bin/psql/common.c	2009-05-10 19:50:30.000000000 +0200
--- cvs.build/src/bin/psql/common.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 555,562 ****
  
  	while ((notify = PQnotifies(pset.db)))
  	{
! 		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"),
! 				notify->relname, notify->be_pid);
  		fflush(pset.queryFout);
  		PQfreemem(notify);
  	}
--- 555,562 ----
  
  	while ((notify = PQnotifies(pset.db)))
  	{
! 		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" (%s) received from server process with PID %d.\n"),
! 				notify->relname, notify->extra, notify->be_pid);
  		fflush(pset.queryFout);
  		PQfreemem(notify);
  	}
diff -cr cvs/src/include/access/slru.h cvs.build/src/include/access/slru.h
*** cvs/src/include/access/slru.h	2009-05-10 19:50:35.000000000 +0200
--- cvs.build/src/include/access/slru.h	2009-11-11 01:11:08.000000000 +0100
***************
*** 16,21 ****
--- 16,40 ----
  #include "access/xlogdefs.h"
  #include "storage/lwlock.h"
  
+ /*
+  * Define segment size.  A page is the same BLCKSZ as is used everywhere
+  * else in Postgres.  The segment size can be chosen somewhat arbitrarily;
+  * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
+  * or 64K transactions for SUBTRANS.
+  *
+  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+  * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
+  * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
+  * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
+  * take no explicit notice of that fact in this module, except when comparing
+  * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
+  *
+  * Note: this file currently assumes that segment file names will be four
+  * hex digits.	This sets a lower bound on the segment size (64K transactions
+  * for 32-bit TransactionIds).
+  */
+ #define SLRU_PAGES_PER_SEGMENT	32
+ 
  
  /*
   * Page status codes.  Note that these do not include the "dirty" bit.
diff -cr cvs/src/include/commands/async.h cvs.build/src/include/commands/async.h
*** cvs/src/include/commands/async.h	2009-09-06 09:08:02.000000000 +0200
--- cvs.build/src/include/commands/async.h	2009-11-11 01:11:08.000000000 +0100
***************
*** 13,22 ****
  #ifndef ASYNC_H
  #define ASYNC_H
  
  extern bool Trace_notify;
  
  /* notify-related SQL statements */
! extern void Async_Notify(const char *relname);
  extern void Async_Listen(const char *relname);
  extern void Async_Unlisten(const char *relname);
  extern void Async_UnlistenAll(void);
--- 13,30 ----
  #ifndef ASYNC_H
  #define ASYNC_H
  
+ /*
+  * How long can a payload string possibly be? Actually it needs to be one
+  * byte less to provide space for the trailing terminating '\0'
+  */
+ #define NOTIFY_PAYLOAD_MAX_LENGTH (128)
+ 
  extern bool Trace_notify;
  
+ extern void AsyncShmemInit(void);
+ 
  /* notify-related SQL statements */
! extern void Async_Notify(const char *relname, const char *payload);
  extern void Async_Listen(const char *relname);
  extern void Async_Unlisten(const char *relname);
  extern void Async_UnlistenAll(void);
diff -cr cvs/src/include/nodes/parsenodes.h cvs.build/src/include/nodes/parsenodes.h
*** cvs/src/include/nodes/parsenodes.h	2009-11-11 01:09:15.000000000 +0100
--- cvs.build/src/include/nodes/parsenodes.h	2009-11-11 01:11:08.000000000 +0100
***************
*** 2059,2064 ****
--- 2059,2065 ----
  {
  	NodeTag		type;
  	char	   *conditionname;	/* condition name to notify */
+ 	char	   *payload;		/* the payload string to be conveyed */
  } NotifyStmt;
  
  /* ----------------------
diff -cr cvs/src/include/storage/lwlock.h cvs.build/src/include/storage/lwlock.h
*** cvs/src/include/storage/lwlock.h	2009-05-10 19:53:12.000000000 +0200
--- cvs.build/src/include/storage/lwlock.h	2009-11-11 01:11:08.000000000 +0100
***************
*** 67,72 ****
--- 67,74 ----
  	AutovacuumLock,
  	AutovacuumScheduleLock,
  	SyncScanLock,
+ 	AsyncCtlLock,
+ 	AsyncQueueLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
#2A.M.
agentm@themactionfaction.com
In reply to: Joachim Wieland (#1)
Re: Listen / Notify rewrite

On Nov 11, 2009, at 4:25 PM, Joachim Wieland wrote:

Hi,

Attached is a patch for a new listen/notify implementation.

In a few words, the patch reimplements listen/notify as an slru-based queue
which works similar to the sinval structure. Essentially it is a ring buffer
on disk with pages mapped into shared memory for read/write access.

Additionally the patch does the following (see below for details):

1. It removes the pg_listener relation and
2. adds the possibility to specify a payload parameter, i.e. executing in SQL
"NOTIFY foo 'payload';" and 'payload' will be delivered to any listening
backend.
3. Every distinct notification is delivered.
4. Order is preserved, i.e. if txn 1 first does NOTIFY foo, then NOTIFY bar,
a backend (listening to both "foo" and "bar") will always first receive the
notification "foo" and then the notification "bar".
5. It's now "listen to a channel", not "listen to a relation" anymore...

Hi Joachim,

Thank you for implementing this- LISTEN/NOTIFY without a payload has
been a major problem to work around for me.

I understand that coalescing multiple notifications of the same name
happens now, but I never understood why. I had hoped that someone
revisiting this "feature" would remove it. My use case is autonomous
transactions.

From a developer's standpoint, NOTIFY specifies a form of remote
trigger for further action- outside the transaction- to occur. From
that point of view, I don't see why NOTIFY foo; NOTIFY foo; is
equivalent to NOTIFY foo;. I understand the use case where a per-row
trigger could generate lots of spurious notifications, but it seems
that if anything is generating spurious notifications, the
notification is in the wrong place.

The documentation makes the strong implication that notification names
are usually table names, but with the new payload, this becomes even
less the case.

"'I changed this table, take a look at it to see what's new.' But no such
association is enforced by the NOTIFY and LISTEN commands." http://www.postgresql.org/docs/8.4/static/sql-notify.html

With the coalescing, I feel like I am being punished for using NOTIFY
with something other than a table name use case.

At least with this new payload, I can set the payload to the
transaction ID and be certain that all the notifications I sent are
processed (and in order even!) but could you explain why the
coalescing is still necessary?

While coalescing could be achieved on the receiving-end NOTIFY
callback ( if(payload ID was already processed) continue; ), non-
coalescing behavior cannot be achieved when the backend does the
coalescing.

For backwards compatibility, payload-less NOTIFY could coalesce while
NOTIFYs with a payload would not coalesce, but, on the other hand,
that might be confusing.

In any case, thank you for improving this long-neglected subsystem!

Best regards,
M

#3Martijn van Oosterhout
kleptog@svana.org
In reply to: Joachim Wieland (#1)
Re: Listen / Notify rewrite

On Wed, Nov 11, 2009 at 10:25:05PM +0100, Joachim Wieland wrote:

Hi,

Attached is a patch for a new listen/notify implementation.

In a few words, the patch reimplements listen/notify as an slru-based queue
which works similar to the sinval structure. Essentially it is a ring buffer on
disk with pages mapped into shared memory for read/write access.

While I can't really comment on the implementation, from your
description it looks like a big improvement.

Nice work!

Have a nice day,
--
Martijn van Oosterhout <kleptog@svana.org> http://svana.org/kleptog/

Please line up in a tree and maintain the heap invariant while
boarding. Thank you for flying nlogn airlines.

#4Andrew Chernow
ac@esilo.com
In reply to: A.M. (#2)
Re: Listen / Notify rewrite

2. adds the possibility to specify a payload parameter, i.e. executing in SQL
"NOTIFY foo 'payload';" and 'payload' will be delivered to any listening
backend.

Thank you for implementing this- LISTEN/NOTIFY without a payload has
been a major problem to work around for me.

+1

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#5Andrew Chernow
ac@esilo.com
In reply to: Martijn van Oosterhout (#3)
Re: Listen / Notify rewrite
   /*
+  * This function is executed for every notification found in the queue in order
+  * to check if the current backend is listening on that channel. Not sure if we
+  * should further optimize this, for example convert to a sorted array and
+  * allow binary search on it...
+  */
+ static bool
+ IsListeningOn(const char *channel)

I think a bsearch would be needed. Very busy servers that make heavy use of
notifies would be quite a penalty.

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#6Joachim Wieland
joe@mcknight.de
In reply to: Andrew Chernow (#5)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 1:04 AM, Andrew Chernow <ac@esilo.com> wrote:

I think a bsearch would be needed.  Very busy servers that make heavy use of
notifies would be quite a penalty.

In such an environment, how many relations/channels is a backend
typically listening to?
Do you have average / maximal numbers?

Regards,
Joachim

#7Tom Lane
tgl@sss.pgh.pa.us
In reply to: Andrew Chernow (#5)
Re: Listen / Notify rewrite

Andrew Chernow <ac@esilo.com> writes:

+  * This function is executed for every notification found in the queue in order
+  * to check if the current backend is listening on that channel. Not sure if we
+  * should further optimize this, for example convert to a sorted array and
+  * allow binary search on it...

I think a bsearch would be needed. Very busy servers that make heavy use of
notifies would be quite a penalty.

Premature optimization is the root of all evil ;-). Unless you've done
some profiling and can show that this is a hot spot, making it more
complicated isn't the thing to be doing now.

regards, tom lane

#8Andrew Chernow
ac@esilo.com
In reply to: Joachim Wieland (#6)
Re: Listen / Notify rewrite

Joachim Wieland wrote:

On Thu, Nov 12, 2009 at 1:04 AM, Andrew Chernow <ac@esilo.com> wrote:

I think a bsearch would be needed. Very busy servers that make heavy use of
notifies would be quite a penalty.

In such an environment, how many relations/channels is a backend
typically listening to?
Do you have average / maximal numbers?

We have a system where many libpq clients, ~2000 - 4000 per server (depends on
hardware), maintain a persistent backend connection. Each connection listens
for notifies, LISTEN 'client_xxx'. There are maybe 10 different reasons why a
NOTIFY 'client_xxx' is fired. Sometimes, notifies are broadcasted to all client
connections, or just portions of them.

The goal is real-time messaging to large groups of computers/devices. Past
4000, the problem is distributed to a second, third, etc... server.

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#9Andrew Gierth
andrew@tao11.riddles.org.uk
In reply to: Martijn van Oosterhout (#3)
Re: Listen / Notify rewrite

"Martijn" == Martijn van Oosterhout <kleptog@svana.org> writes:

Hi,

Attached is a patch for a new listen/notify implementation.

In a few words, the patch reimplements listen/notify as an
slru-based queue which works similar to the sinval
structure. Essentially it is a ring buffer on disk with pages
mapped into shared memory for read/write access.

Martijn> While I can't really comment on the implementation, from your
Martijn> description it looks like a big improvement.

Does it cope with the case where a trigger is doing NOTIFY, and you do
a whole-table update, therefore dumping potentially millions of
notifications in at once?

(for example a rare maintenance operation on a table which has a
listen/notify arrangement triggered by single inserts or updates)

The existing implementation copes with that just fine.

--
Andrew (irc:RhodiumToad)

#10Andrew Chernow
ac@esilo.com
In reply to: Tom Lane (#7)
Re: Listen / Notify rewrite

Premature optimization is the root of all evil ;-). Unless you've done
some profiling and can show that this is a hot spot, making it more
complicated isn't the thing to be doing now.

I'm thinking of how our system uses/abuses notifies, and began wondering
if several thousand backends listening with a large queue would perform
decently behind a linear search. At this point, I have no data either
way; only an assumption based on being burnt by sequential scans in the
past ;)

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#11Merlin Moncure
mmoncure@gmail.com
In reply to: A.M. (#2)
Re: Listen / Notify rewrite

On Wed, Nov 11, 2009 at 5:48 PM, A.M. <agentm@themactionfaction.com> wrote:

At least with this new payload, I can set the payload to the transaction ID
and be certain that all the notifications I sent are processed (and in order
even!) but could you explain why the coalescing is still necessary?

Christmas comes early this year! :-).

three reasons:
*) it works that way now...a lot of people use this feature for all
kinds of subtle things and the behavior should change as little as
possible
*) legacy issues aside, I think it's generally better behavior (how
many times do you need to be tapped on the shoulder?)
*) since you can trivially differentiate it (using xid, sequence,
etc), what's the fuss?

merlin

#12A.M.
agentm@themactionfaction.com
In reply to: Merlin Moncure (#11)
Re: Listen / Notify rewrite

On Nov 11, 2009, at 9:28 PM, Merlin Moncure wrote:

On Wed, Nov 11, 2009 at 5:48 PM, A.M. <agentm@themactionfaction.com> wrote:

At least with this new payload, I can set the payload to the transaction ID
and be certain that all the notifications I sent are processed (and in order
even!) but could you explain why the coalescing is still necessary?

Christmas comes early this year! :-).

three reasons:
*) it works that way now...a lot of people use this feature for all
kinds of subtle things and the behavior should change as little as
possible
*) legacy issues aside, I think it's generally better behavior (how
many times do you need to be tapped on the shoulder?)
*) since you can trivially differentiate it (using xid, sequence,
etc), what's the fuss?

Except for the fact that the number of times a notification occurred
may be valuable information.

I thought of a compromise: add the number of times a notification was
generated (coalesced count+1) to the callback data. That would
satisfy any backwards compatibility concerns and my use case too!

Cheers,
M

#13Tom Lane
tgl@sss.pgh.pa.us
In reply to: Joachim Wieland (#1)
Re: Listen / Notify rewrite

Joachim Wieland <joe@mcknight.de> writes:

However, if for some reason we cannot write to the slru files in the pg_notify/
directory we might want to roll back the current transaction but with the
proposed patch we cannot because we have already committed...

I think this is a deal-breaker, and arguing about how the pg_notify
directory ought to be writable is not even slightly acceptable --- out
of disk space is an obvious risk. The existing implementation
guarantees that notifications obey ACID: if you commit, they are sent,
and if you don't, they aren't. You can't just put in something that
works most of the time instead.

One possible solution would be to write to the queue before committing
and adding the TransactionID. Then other backends can check if our
TransactionID has successfully committed or not. Not sure if this is
worth the overhead however...

That sounds reasonable, and it's certainly no more overhead than the
tuple visibility checks that happen in the current implementation.

2. The payload parameter is optional. A notifying client can either call
"NOTIFY foo;" or "NOTIFY foo 'payload';". The length of the payload is
currently limited to 128 characters... Not sure if we should allow longer
payload strings...

Might be a good idea to make the max the same as the max length for
prepared transaction GUIDs? Not sure anyone would be shipping those
around, but it's a pre-existing limit of about the same size.

regards, tom lane

#14Andrew Chernow
ac@esilo.com
In reply to: A.M. (#12)
Re: Listen / Notify rewrite

I thought of a compromise: add the number of times a notification was
generated (coalesced count+1) to the callback data. That would satisfy
any backwards compatibility concerns and my use case too!

If you are suggesting that the server poke data into the notifier's opaque
payload, I vote no. Maybe the NOTIFY command can include a switch to enable
this behavior. No syntax suggestions at this point.

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#15Tom Lane
tgl@sss.pgh.pa.us
In reply to: Andrew Chernow (#14)
Re: Listen / Notify rewrite

Andrew Chernow <ac@esilo.com> writes:

I thought of a compromise: add the number of times a notification was
generated (coalesced count+1) to the callback data. That would satisfy
any backwards compatibility concerns and my use case too!

If you are suggesting that the server poke data into the notifier's opaque
payload, I vote no. Maybe the NOTIFY command can include a switch to enable
this behavior. No syntax suggestions at this point.

I agree, we should not have the system modifying the payload string for
this. And don't bother suggesting a third column in the result ---
we'd have to change the FE/BE protocol for that, and it's not going
to happen.

The existing precedent is that the system collapses identical
notifications without payloads. So we could possibly get away with
saying that identical payload-less notifies are collapsed but those
with a payload are not. That doesn't really seem to satisfy the
POLA though. I think Joachim's definition is fine, and anyone who
needs delivery of distinct notifications can easily make his payload
strings unique to ensure it.

regards, tom lane

#16Andrew Chernow
ac@esilo.com
In reply to: Tom Lane (#13)
Re: Listen / Notify rewrite

2. The payload parameter is optional. A notifying client can either call
"NOTIFY foo;" or "NOTIFY foo 'payload';". The length of the payload is
currently limited to 128 characters... Not sure if we should allow longer
payload strings...

Might be a good idea to make the max the same as the max length for
prepared transaction GUIDs? Not sure anyone would be shipping those
around, but it's a pre-existing limit of about the same size.

I don't see any reason to impose such a small limit on payload size. Surely
some limit must exist, but 128 characters seems awfully small. I already have
a few places in mind that would require more bytes.

Why not 8K, 64K, 256K, 1M or even more? Is there some other factor in play
forcing this limitation?

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#17A.M.
agentm@themactionfaction.com
In reply to: Tom Lane (#15)
Re: Listen / Notify rewrite

On Nov 11, 2009, at 10:43 PM, Tom Lane wrote:

Andrew Chernow <ac@esilo.com> writes:

I thought of a compromise: add the number of times a notification was
generated (coalesced count+1) to the callback data. That would satisfy
any backwards compatibility concerns and my use case too!

If you are suggesting that the server poke data into the notifier's
opaque payload, I vote no. Maybe the NOTIFY command can include a switch
to enable this behavior. No syntax suggestions at this point.

I agree, we should not have the system modifying the payload string for
this. And don't bother suggesting a third column in the result ---
we'd have to change the FE/BE protocol for that, and it's not going
to happen.

The existing precedent is that the system collapses identical
notifications without payloads. So we could possibly get away with
saying that identical payload-less notifies are collapsed but those
with a payload are not. That doesn't really seem to satisfy the
POLA though. I think Joachim's definition is fine, and anyone who
needs delivery of distinct notifications can easily make his payload
strings unique to ensure it.

The notification count could be a secondary "payload" which does not
affect the first, but I guess I'm the only one complaining about the
coalescing...

-M

#18Joachim Wieland
joe@mcknight.de
In reply to: Andrew Gierth (#9)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 2:12 AM, Andrew Gierth
<andrew@tao11.riddles.org.uk> wrote:

Does it cope with the case where a trigger is doing NOTIFY, and you do
a whole-table update, therefore dumping potentially millions of
notifications in at once?

(for example a rare maintenance operation on a table which has a
listen/notify arrangement triggered by single inserts or updates)

The existing implementation copes with that just fine.

As long as you use the existing way to send out notifications by just
sending "NOTIFY foo", then yes, it will cope with millions of them
just fine because it will collapse them into one notification, as does
the current implementation (unlike the current implementation, though,
a notification will be received from every transaction that has sent
one - the current implementation even collapses notifications from
different transactions).

However if you decide to use the new syntax and add a distinct payload
to every NOTIFY, then you also send out millions of notifications...

With the proposed patch the queue has space for up to about 81,787,680
notifications. The limits are mainly imposed by slru.c which defines
page size, pages per segment and the number of segments available. We
could easily bump that up to a multiple of the current limits if we
have decided that we need to...

Joachim

#19Joachim Wieland
joe@mcknight.de
In reply to: Tom Lane (#13)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 4:25 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

One possible solution would be to write to the queue before committing
and adding the TransactionID.  Then other backends can check if our
TransactionID has successfully committed or not. Not sure if this is
worth the overhead however...

That sounds reasonable, and it's certainly no more overhead than the
tuple visibility checks that happen in the current implementation.

I am not too concerned about the runtime of the visibility checks,
instead I suppose that most of the overhead will come from waiting for
another transaction to either commit or abort...

If transaction t1 scans the queue and at some point finds
notifications from t2, then it will ask for the status of t2. If it
finds out that t2 is still running, then it has no other option than
to stop working on the queue and wait (it will be signalled again
later once t2 has finished).

This also means that we cannot at the same time write notifications to
the queue and read from it and if a transaction places a few million
notifications into the queue, readers need to wait until it has
finished and only after that they can continue and read the
notifications...

And it means that if the queue is full, we might run into a
deadlock... A transaction adding notifications will wait for the
readers to proceed and the readers wait for the transaction to commit
or abort...

One option could be to write new notifications to a subdirectory, and
create a bunch of new segment files there. Once this is done, the
segment files could be moved over and renamed, so that they continue
the slru queue... If we run out of disk space while filling that
temporary subdirectory, then we can just delete the subdirectory and
nobody has been blocked. We could still run into errors moving and
renaming the segment files (e.g. permission problems) so that we still
might need to abort the transaction...

2. The payload parameter is optional. A notifying client can either call
"NOTIFY foo;" or "NOTIFY foo 'payload';". The length of the payload is
currently limited to 128 characters... Not sure if we should allow longer
payload strings...

Might be a good idea to make the max the same as the max length for
prepared transaction GUIDs?  Not sure anyone would be shipping those
around, but it's a pre-existing limit of about the same size.

Yes, sounds reasonable to have the same limit for user-defined identifiers...

Joachim

#20Andrew Chernow
ac@esilo.com
In reply to: Joachim Wieland (#19)
Re: Listen / Notify rewrite

2. The payload parameter is optional. A notifying client can either call
"NOTIFY foo;" or "NOTIFY foo 'payload';". The length of the payload is
currently limited to 128 characters... Not sure if we should allow longer
payload strings...

Might be a good idea to make the max the same as the max length for
prepared transaction GUIDs? Not sure anyone would be shipping those
around, but it's a pre-existing limit of about the same size.

Yes, sounds reasonable to have the same limit for user-defined identifiers...

[..begging..] Can this be increased significantly? I don't get it, is there any
technical reason to make the limit so small? This drastically reduces the
usefulness of the payload. I've wanted this feature for quite some time and it
is quite disappointing that I could not even use it because it is unjustifiably
limited.

One use case I need is making the payload an absolute path, which saves us a
round trip (commonly internet latency) and a query in a section of the system
that's extremely performance sensitive. That sure ain't going to fit in 128
bytes.

I'm sure I'm not the only one who finds this limit too small. I can almost
guarantee complaints would come in if released that way.

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#21Merlin Moncure
mmoncure@gmail.com
In reply to: Andrew Chernow (#20)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 8:25 AM, Andrew Chernow <ac@esilo.com> wrote:

2. The payload parameter is optional. A notifying client can either call
"NOTIFY foo;" or "NOTIFY foo 'payload';". The length of the payload is
currently limited to 128 characters... Not sure if we should allow
longer
payload strings...

Might be a good idea to make the max the same as the max length for
prepared transaction GUIDs?  Not sure anyone would be shipping those
around, but it's a pre-existing limit of about the same size.

Yes, sounds reasonable to have the same limit for user-defined
identifiers...

[..begging..] Can this be increased significantly?  I don't get it, is there
any technical reason to make the limit so small?  This drastically reduces
the usefulness of the payload.  I've wanted this feature for quite some time
and it is quite disappointing that I could not even use it because it is
unjustifiably limited.

+1

What advantage is there in limiting it to a tiny size? This is a
'payload' after all...an arbitrary data block. Looking at the patch I
noticed the payload structure (AsyncQueueEntry) is fixed length and
designed to lay into QUEUE_PAGESIZE (set to) BLCKSZ sized pages.

Couple of questions:

*) is BLCKSZ a hard requirement, that is, coming from the slru
implementation, or can QUEUE_PAGESIZE be bumped independently of block
size.

*) why not make the AsyncQueueEntry divide evenly into BLCKSZ, that
is, make the whole structure a size that is a power of two? (this
would make the payload length 'weird')

*) is there any downside you see to making the AsyncQueueEntry
structure exactly BLCKSZ bytes in size? Are we worried about the
downsides of spinning the notifications out to disk?

*) Is a variable-length AsyncQueueEntry possible? (presumably bounded
by the max page size). Or does that complicate the implementation too
much?

merlin


#22Andrew Chernow
ac@esilo.com
In reply to: Merlin Moncure (#21)
Re: Listen / Notify rewrite

What advantage is there in limiting it to a tiny size? This is a
'payload' after all...an arbitrary data block. Looking at the patch I
noticed the payload structure (AsyncQueueEntry) is fixed length and
designed to fit into QUEUE_PAGESIZE (set to BLCKSZ) pages.

Hmmmm. Looks like the limitation comes from slru. The true payload
limit is (8K - struct members) the way this is implemented.

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#23Greg Stark
gsstark@mit.edu
In reply to: Andrew Chernow (#22)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 3:09 PM, Andrew Chernow <ac@esilo.com> wrote:

What advantage is there in limiting it to a tiny size? This is a
'payload' after all...an arbitrary data block. Looking at the patch I
noticed the payload structure (AsyncQueueEntry) is fixed length and
designed to fit into QUEUE_PAGESIZE (set to BLCKSZ) pages.

Hmmmm. Looks like the limitation comes from slru. The true payload limit
is (8K - struct members) the way this is implemented.

So I'm beginning to think that adding a "payload" to NOTIFY is a bad
idea altogether.

The original use case NOTIFY was designed to handle was to implement
condition variables. You have clients wait on some data structure and
whenever someone changes that data structure they notify all waiting
clients to reread the data structure and check if their condition is
true. The condition might be that the data structure doesn't match
their locally cached copy and they have to rebuild their cache, it
might be that some work queue is non-empty, etc.

So the way to build a queue would be to use a table to hold your work
queue and then use NOTIFY whenever you insert into the queue and any
idle workers would know to go dequeue something from the queue.

It sounds like what people are trying to do is use NOTIFY as a queue
directly. The problem with this is that it was specifically not
designed to do this. As evidenced by the fact that it can't store
arbitrary data structures (nothing else in postgres is limited to only
handling text data, this would be an annoying arbitrary limitation),
can't handle even moderately large data efficiently, etc.

I'm beginning to think the right solution is to reject the idea of
adding a payload to the NOTIFY mechanism and instead provide a queue
contrib module which provides the interface people want.

--
greg

#24Joachim Wieland
joe@mcknight.de
In reply to: Merlin Moncure (#21)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 3:30 PM, Merlin Moncure <mmoncure@gmail.com> wrote:

Couple of questions:

*) is BLCKSZ a hard requirement, that is, coming from the slru
implementation, or can QUEUE_PAGESIZE be bumped independently of block
size.

It's the size of slru's pages.

*) why not make the AsyncQueueEntry divide evenly into BLCKSZ, that
is, make the whole structure a power-of-two size?  (this
would make the payload length 'weird')

It's possible and yes, it would make the payload length weird. Also, if
we wanted to add further structure members later on (commit time, xid,
notify counter, whatever) we'd have to announce that the payload in a new
release cannot have the same length as in a previous one, so I tried
to keep both independent right from the beginning.

*) is there any downside you see to making the AsyncQueueEntry
structure exactly BLCKSZ bytes in size?  Are we worried about the
downsides of spinning the notifications out to disk?

Yes, and also we'd need a lot more page slots to work efficiently and
I fear quite some queue managing overhead (locking). Currently we are
using 4 page slots and can have ~160 notifications mapped into memory.

*) Is a variable length AsyncQueueEntry possible? (presumably bounded
by the max page size).  Or does that complicate the implementation too
much?

That's an option to look at. I think it's technically possible, we'd
just need to throw in some more logic, but I didn't want to invest too
much effort before we have a clear way to go.

However I share Greg's concerns that people are trying to use NOTIFY
as a message queue which it is not designed to be.

Joachim

#25Bernd Helmle
mailings@oopsware.de
In reply to: Greg Stark (#23)
Re: Listen / Notify rewrite

--On 12. November 2009 15:48:44 +0000 Greg Stark <gsstark@mit.edu> wrote:

I'm beginning to think the right solution is to reject the idea of
adding a payload to the NOTIFY mechanism and instead provide a queue
contrib module which provides the interface people want.

Isn't PgQ already the solution you have in mind?

--
Thanks

Bernd

#26Andrew Chernow
ac@esilo.com
In reply to: Joachim Wieland (#24)
Re: Listen / Notify rewrite

However I share Greg's concerns that people are trying to use NOTIFY
as a message queue which it is not designed to be.

When you have an established libpq connection waiting for notifies it is
not unreasonable to expect/desire a payload. ISTM the problem is that
the initial design was half-baked. NOTIFY is event-driven, i.e. no
polling!

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#27Tom Lane
tgl@sss.pgh.pa.us
In reply to: Joachim Wieland (#24)
Re: Listen / Notify rewrite

Joachim Wieland <joe@mcknight.de> writes:

However I share Greg's concerns that people are trying to use NOTIFY
as a message queue which it is not designed to be.

Yes. Particularly those complaining that they want to have very large
payload strings --- that's pretty much a dead giveaway that it's not
being used as a condition signal.

Now you might say that yeah, that's the point, we're trying to enable
using NOTIFY in a different style. The problem is that if you are
trying to use NOTIFY as a queue, you will soon realize that it has
the wrong semantics for that --- in particular, losing notifies across
a system crash or client crash is OK for a condition notification,
not so OK for a message queue. The difference is that the former style
assumes that the authoritative data is in a table somewhere, so you can
still find out what you need to know after reconnecting. If you are
doing messaging you are likely to think that you don't need any backing
store for the system state.

So while a payload string for NOTIFY has been on the to-do list since
forever, I have to think that Greg's got a good point questioning
whether it is actually a good idea.

regards, tom lane

#28Dave Page
dpage@pgadmin.org
In reply to: Tom Lane (#27)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 4:30 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

So while a payload string for NOTIFY has been on the to-do list since
forever, I have to think that Greg's got a good point questioning
whether it is actually a good idea.

Here's an example of why I'd like a payload (and not a queue in an
add-on module). Say you have multiple users running pgAdmin. One of
them creates a new table. Currently, unless the other user refreshes
his view of the database, he won't see it. I'd like to be able to
notify the other pgAdmin sessions that a new table has been created,
so they can automatically display it, without having to poll pg_class
or be manually refreshed. The payload could contain details of the type of
object that has been created, and its OID/identifier to minimise the
work required of the other sessions to find and display the new
object.

And as I'm sure you're already thinking it, yes, I know it doesn't
help if the new table is created using psql, but there are lots of
shops where pgAdmin is the default tool, and it could help them and
just exhibit the current behaviour if someone does break out psql.

--
Dave Page
EnterpriseDB UK: http://www.enterprisedb.com

#29Robert Haas
robertmhaas@gmail.com
In reply to: Tom Lane (#27)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 11:30 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Joachim Wieland <joe@mcknight.de> writes:

However I share Greg's concerns that people are trying to use NOTIFY
as a message queue which it is not designed to be.

Yes.  Particularly those complaining that they want to have very large
payload strings --- that's pretty much a dead giveaway that it's not
being used as a condition signal.

Now you might say that yeah, that's the point, we're trying to enable
using NOTIFY in a different style.  The problem is that if you are
trying to use NOTIFY as a queue, you will soon realize that it has
the wrong semantics for that --- in particular, losing notifies across
a system crash or client crash is OK for a condition notification,
not so OK for a message queue.  The difference is that the former style
assumes that the authoritative data is in a table somewhere, so you can
still find out what you need to know after reconnecting.  If you are
doing messaging you are likely to think that you don't need any backing
store for the system state.

So while a payload string for NOTIFY has been on the to-do list since
forever, I have to think that Greg's got a good point questioning
whether it is actually a good idea.

I think there could be cases where the person writing the code can
know, extrinsic to the system, that lost notifications are OK, and
still want to deliver a payload. But I think the idea of enabling a
huge payload is not wise, as it sounds like it will sacrifice
performance for a feature that is by definition not essential to
anyone who is using this now. A small payload seems like a reasonable
compromise.

...Robert

#30Merlin Moncure
mmoncure@gmail.com
In reply to: Tom Lane (#27)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 11:30 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Joachim Wieland <joe@mcknight.de> writes:

However I share Greg's concerns that people are trying to use NOTIFY
as a message queue which it is not designed to be.

Yes.  Particularly those complaining that they want to have very large
payload strings --- that's pretty much a dead giveaway that it's not
being used as a condition signal.

Now you might say that yeah, that's the point, we're trying to enable
using NOTIFY in a different style.  The problem is that if you are
trying to use NOTIFY as a queue, you will soon realize that it has
the wrong semantics for that --- in particular, losing notifies across
a system crash or client crash is OK for a condition notification,
not so OK for a message queue.  The difference is that the former style
assumes that the authoritative data is in a table somewhere, so you can
still find out what you need to know after reconnecting.  If you are
doing messaging you are likely to think that you don't need any backing
store for the system state.

So while a payload string for NOTIFY has been on the to-do list since
forever, I have to think that Greg's got a good point questioning
whether it is actually a good idea.

You guys are assuming it's being used as a queue, which is only a
fraction of cases where this feature is useful. In fact, having a
payload can remove the need for a queue completely where one is currently
required for no other reason than to deliver payload messages.

I'm sorry, the 128 character limit is simply lame (other than for
unsolvable implementation/performance complexity, which I doubt is the
case here), and if that constraint is put in by the implementation,
then the implementation is busted and should be reworked until it's
right. A feature that is being used for things not intended is a sign
of a strong feature, not a weak one, and the idea that a payload
should be length limited in order to prevent use in ways that are
'wrong' is a very weak argument IMO. People have been asking for this
feature since the beginning of time, and nobody said: 'please limit it
to 128 bytes'. A limit of 4k - 64k is much more appropriate if you
even want a hard limit at all...

merlin

#31Merlin Moncure
mmoncure@gmail.com
In reply to: Robert Haas (#29)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 11:39 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Thu, Nov 12, 2009 at 11:30 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Joachim Wieland <joe@mcknight.de> writes:

However I share Greg's concerns that people are trying to use NOTIFY
as a message queue which it is not designed to be.

Yes.  Particularly those complaining that they want to have very large
payload strings --- that's pretty much a dead giveaway that it's not
being used as a condition signal.

Now you might say that yeah, that's the point, we're trying to enable
using NOTIFY in a different style.  The problem is that if you are
trying to use NOTIFY as a queue, you will soon realize that it has
the wrong semantics for that --- in particular, losing notifies across
a system crash or client crash is OK for a condition notification,
not so OK for a message queue.  The difference is that the former style
assumes that the authoritative data is in a table somewhere, so you can
still find out what you need to know after reconnecting.  If you are
doing messaging you are likely to think that you don't need any backing
store for the system state.

So while a payload string for NOTIFY has been on the to-do list since
forever, I have to think that Greg's got a good point questioning
whether it is actually a good idea.

I think there could be cases where the person writing the code can
know, extrinsic to the system, that lost notifications are OK, and
still want to deliver a payload.  But I think the idea of enabling a
huge payload is not wise, as it sounds like it will sacrifice
performance for a feature that is by definition not essential to

'premature optimization is the root of all evil' :-)

merlin

#32Andrew Chernow
ac@esilo.com
In reply to: Tom Lane (#27)
Re: Listen / Notify rewrite

Now you might say that yeah, that's the point, we're trying to enable
using NOTIFY in a different style. The problem is that if you are
trying to use NOTIFY as a queue, you will soon realize that it has
the wrong semantics for that --- in particular, losing notifies across
a system crash or client crash is OK for a condition notification,
not so OK for a message queue. The difference is that the former style
assumes that the authoritative data is in a table somewhere, so you can
still find out what you need to know after reconnecting. If you are
doing messaging you are likely to think that you don't need any backing
store for the system state.

I simply don't agree that the semantics have to change. You call it a
queue, I call it session data. There is no reason why the documentation
can't state that notifies may not be delivered due to crashes, so make
sure to use persistent storage for any payload worth keeping post-session.

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#33Merlin Moncure
mmoncure@gmail.com
In reply to: Merlin Moncure (#30)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 11:40 AM, Merlin Moncure <mmoncure@gmail.com> wrote:

I'm sorry, the 128 character limit is simply lame (other than for
unsolvable implementation/performance complexity which I doubt is the
case here), and if that constraint is put in by the implementation,
then the implementation is busted and should be reworked until it's
right.

After some reflection, I realized this was an overly strong statement
and impolite to the OP. It's easy to yarp from the gallery with the
other peanuts :-). It's not the implementation I have an issue with,
just the _idea_ that we should be restricted to small payloads for
religious reasons...until that came up I was already scheming on how to
both extend the patch to be more flexible in terms of payload size,
and to backpatch and test it on 8.4 (no point if the community has no
interest however). In any event, sorry for the strong words.

merlin

#34Greg Sabino Mullane
greg@turnstep.com
In reply to: A.M. (#17)
Re: Listen / Notify rewrite


Tom Lane writes:

Yes. Particularly those complaining that they want to have very large
payload strings --- that's pretty much a dead giveaway that it's not
being used as a condition signal.

I don't want "large" but I do think an arbitrary limit of 128 is odd without
some justfication. I could do 128, but would probably be happier with a bit
more room.

Now you might say that yeah, that's the point, we're trying to enable
using NOTIFY in a different style. The problem is that if you are
trying to use NOTIFY as a queue, you will soon realize that it has
the wrong semantics for that --- in particular, losing notifies across
a system crash or client crash is OK for a condition notification,
not so OK for a message queue. The difference is that the former style
assumes that the authoritative data is in a table somewhere, so you can
still find out what you need to know after reconnecting. If you are
doing messaging you are likely to think that you don't need any backing
store for the system state.

That's putting an awful lot of assumptions on what people are going to do
in the future, and also not being imaginative enough about circumstances
in which a payload that does not survive a system crash is a completely
acceptable condition. In most of my use cases, even desired. People wanting
a real queue can continue to use something other than NOTIFY.

So while a payload string for NOTIFY has been on the to-do list since
forever, I have to think that Greg's got a good point questioning
whether it is actually a good idea.

Absolutely is a good idea.

Agent M asks:

The notification count could be a secondary "payload" which does not
affect the first, but I guess I'm the only one complaining about the
coalescing...

Er...this thread is only a few hours old. For the record, I'm fine with the
coalescing as we do now (at least as far as my own selfish purposes go :)

--
Greg Sabino Mullane greg@turnstep.com
PGP Key: 0x14964AC8 200911121836
http://biglumber.com/x/web?pk=2529DF6AB8F79407E94445B4BC9B906714964AC8

#35Josh Berkus
josh@agliodbs.com
In reply to: Tom Lane (#27)
Re: Listen / Notify rewrite

On 11/12/09 8:30 AM, Tom Lane wrote:

So while a payload string for NOTIFY has been on the to-do list since
forever, I have to think that Greg's got a good point questioning
whether it is actually a good idea.

Sure, people will abuse it as a queue. But people abuse arrays when
they should be using child tables, use composite types to make data
non-atomic, and use dblink when they really should be using schemas.
Does the potential for misuse mean that we should drop the features? No.

Payloads are also quite useful even in a lossy environment, where you
understand that LISTEN is not a queue. For example, I'd like to be
using LISTEN/NOTIFY for cache invalidation for some applications; if it
misses a few, or double-counts them, it's not an issue. However, I'd
like to be able to send message like players_updated|45321 where 45321
is the ID of the player updated.

--Josh Berkus

#36Robert Haas
robertmhaas@gmail.com
In reply to: Josh Berkus (#35)
Re: Listen / Notify rewrite

On Thu, Nov 12, 2009 at 8:44 PM, Josh Berkus <josh@agliodbs.com> wrote:

On 11/12/09 8:30 AM, Tom Lane wrote:

So while a payload string for NOTIFY has been on the to-do list since
forever, I have to think that Greg's got a good point questioning
whether it is actually a good idea.

Sure, people will abuse it as a queue.  But people abuse arrays when
they should be using child tables, use composite types to make data
non-atomic, and use dblink when they really should be using schema.
Does the potential for misuse mean that we should drop the features?  No.

I agree. We frequently reject features on the basis that someone
might do something stupid with them. It's lame and counterproductive,
and we should stop. The world contains infinite amounts of lameness,
but that's the world's problem, not ours. There is zero evidence that
this feature is only useful for stupid purposes, and some evidence
(namely, the opinions of esteemed community members) that it is useful
for at least some non-stupid purposes.

...Robert

#37Andrew Chernow
ac@esilo.com
In reply to: Robert Haas (#36)
Re: Listen / Notify rewrite

and we should stop. The world contains infinite amounts of lameness,
but that's the world's problem, not ours. There is zero evidence that

+1

this feature is only useful for stupid purposes, and some evidence
(namely, the opinions of esteemed community members) that it is useful
for at least some non-stupid purposes.

The unexpected application of a feature can be creative or innovative, which I
firmly believe is something this community embraces. How many ways can a
screwdriver be used ... think MacGyver :) Determining whether it's creative vs.
stupid would require an understanding of the context in which it was applied.
For example, using our screwdriver to remove a splinter would be rather stupid,
IMHO ;)

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#38Steve Atkins
steve@blighty.com
In reply to: Robert Haas (#36)
Re: Listen / Notify rewrite

On Nov 12, 2009, at 5:57 PM, Robert Haas wrote:

On Thu, Nov 12, 2009 at 8:44 PM, Josh Berkus <josh@agliodbs.com> wrote:

On 11/12/09 8:30 AM, Tom Lane wrote:

So while a payload string for NOTIFY has been on the to-do list since
forever, I have to think that Greg's got a good point questioning
whether it is actually a good idea.

Sure, people will abuse it as a queue. But people abuse arrays when
they should be using child tables, use composite types to make data
non-atomic, and use dblink when they really should be using schema.
Does the potential for misuse mean that we should drop the features? No.

I agree. We frequently reject features on the basis that someone
might do something stupid with them. It's lame and counterproductive,
and we should stop. The world contains infinite amounts of lameness,
but that's the world's problem, not ours. There is zero evidence that
this feature is only useful for stupid purposes, and some evidence
(namely, the opinions of esteemed community members) that it is useful
for at least some non-stupid purposes.

Speaking as a consumer of this feature, my (mild) concern is not that by
adding functionality it will make it possible to do new things badly; it's that
it might make it harder (or impossible) to do currently supported things as well
as we can today.

I like the current notify. It's a good match for handling table based
queues or for app-level-cache invalidation. Any changes that make
it less good at that would be a step backwards. The more complex
and flexible and "heavier" the changes, the more that concerns me.

(Though a small payload - I'd settle for at least an integer - would be
convenient, to allow invalidation of 20 different caches without needing
20 different LISTENs).

Cheers,
Steve

#39Joachim Wieland
joe@mcknight.de
In reply to: Joachim Wieland (#19)
Re: Listen / Notify rewrite

Unfortunately with all that payload-length discussion, the other part
of my email regarding ACID compliant behavior got completely lost. I
would appreciate some input on that part also...

Thanks,
Joachim


On Thu, Nov 12, 2009 at 12:17 PM, Joachim Wieland <joe@mcknight.de> wrote:

On Thu, Nov 12, 2009 at 4:25 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

One possible solution would be to write to the queue before committing
and adding the TransactionID.  Then other backends can check if our
TransactionID has successfully committed or not. Not sure if this is
worth the overhead however...

That sounds reasonable, and it's certainly no more overhead than the
tuple visibility checks that happen in the current implementation.

I am not too concerned about the runtime of the visibility checks;
instead I suppose that most of the overhead will come from waiting for
another transaction to either commit or abort...

If transaction t1 scans the queue and at some point finds
notifications from t2, then it will ask for the status of t2. If it
finds out that t2 is still running, then it has no other option than
to stop working on the queue and wait (it will be signalled again
later once t2 has finished).

This also means that we cannot at the same time write notifications to
the queue and read from it and if a transaction places a few million
notifications into the queue, readers need to wait until it has
finished and only after that they can continue and read the
notifications...

And it means that if the queue is full, we might run into a
deadlock... A transaction adding notifications will wait for the
readers to proceed and the readers wait for the transaction to commit
or abort...

One option could be to write new notifications to a subdirectory, and
create a bunch of new segment files there. Once this is done, the
segment files could be moved over and renamed, so that they continue
the slru queue... If we run out of disk space while filling that
temporary subdirectory, then we can just delete the subdirectory and
nobody has been blocked. We could still run into errors moving and
renaming the segment files (e.g. permission problems) so that we still
might need to abort the transaction...

2. The payload parameter is optional. A notifying client can either call
"NOTIFY foo;" or "NOTIFY foo 'payload';". The length of the payload is
currently limited to 128 characters... Not sure if we should allow longer
payload strings...

Might be a good idea to make the max the same as the max length for
prepared transaction GUIDs?  Not sure anyone would be shipping those
around, but it's a pre-existing limit of about the same size.

Yes, sounds reasonable to have the same limit for user-defined identifiers...

Joachim

#40Greg Stark
gsstark@mit.edu
In reply to: Robert Haas (#36)
Re: Listen / Notify rewrite

On Fri, Nov 13, 2009 at 1:57 AM, Robert Haas <robertmhaas@gmail.com> wrote:

I agree.  We frequently reject features on the basis that someone
might do something stupid with them.  It's lame and counterproductive,
and we should stop.  The world contains infinite amounts of lameness,
but that's the world's problem, not ours.  There is zero evidence that
this feature is only useful for stupid purposes, and some evidence
(namely, the opinions of esteemed community members) that it is useful
for at least some non-stupid purposes.

This is BS. The problem is not that someone might do something stupid
with this feature. The problem is that we're making these other use
cases into requirements which will influence the design. This is a
classic "feature creep" situation and the result is normally products
which solve none of the use cases especially well.

Remember this queue has to live in shared memory which is a fixed size
resource. If you're designing a queue mechanism then you would
naturally use something like a queue or priority queue. You expect to
spill to disk and need an efficient storage mechanism. The natural
implementation of this in Postgres would be a table, not the slru. If
you're designing a condition-variable mechanism then you would
naturally use a hash table which can probably live in a single page
with a simple flag for each variable. The comment in another thread
that this mechanism should implement ACID properties just reinforces
my reaction.

--
greg

#41Merlin Moncure
mmoncure@gmail.com
In reply to: Greg Stark (#40)
Re: Listen / Notify rewrite

On Fri, Nov 13, 2009 at 5:35 AM, Greg Stark <gsstark@mit.edu> wrote:

On Fri, Nov 13, 2009 at 1:57 AM, Robert Haas <robertmhaas@gmail.com> wrote:

I agree.  We frequently reject features on the basis that someone
might do something stupid with them.  It's lame and counterproductive,
and we should stop.  The world contains infinite amounts of lameness,
but that's the world's problem, not ours.  There is zero evidence that
this feature is only useful for stupid purposes, and some evidence
(namely, the opinions of esteemed community members) that it is useful
for at least some non-stupid purposes.

This is BS. The problem is not that someone might do something stupid
with this feature. The problem is that we're making these other use
cases into requirements which will influence the design. This is a
classic "feature creep" situation and the result is normally products
which solve none of the use cases especially well.

Removing a length restriction is feature creep?

Having a flexible payload mechanism improves on notify in the same
way that epoll improves on poll. Yes, epoll is overdesigned, highly
overused, etc. but it does vastly improve server
handling/responsiveness in some situations. Delivering a notification
with data saves a round trip back to the server and a transaction,
which is helpful both in terms of server load and in improving latency.
On top of that, I don't think saying "hello; here's some data" is
groundbreaking in terms of network communication paradigms.

My interest in this feature is not academic; the project I'm working
on could use it with great benefit immediately. Arguments that I should
confine notify to the set list of use cases envisioned by the original
authors are not going to hold much water with me :-).

IMNSHO, the argument that keeping payloads limited to a tiny size
'improves' this feature is not winnable. That said, I do
appreciate simple designs and very much understand trying to keep
things simple. So let me ask you this:

*) Are you sure that putting a variable length payload into the slru
is going to complicate things that badly in terms of implementing this
feature? If so, how?

*) Wouldn't you agree that variable length would actually benefit
'proper' (small) payloads by allowing more of them to fit in the slru
page?

*) 8k should be enough for anybody :-) ...so if a variable length
structure can be made, why not cap the payload length at blcksz-hdrsz
and call it a day (yes, I am aware that extending the structure will
reduce the payload's maximum length)? I think this should fit quite nicely
into the OP's approach and benefits both people who use small payloads
and large ones... (I DO think spanning pages is complex and probably
unnecessary)

merlin

#42Alvaro Herrera
alvherre@commandprompt.com
In reply to: Joachim Wieland (#1)
Re: Listen / Notify rewrite

Joachim Wieland wrote:

1. Instead of placing the queue into shared memory only I propose to create a
new subdirectory pg_notify/ and make the queue slru-based, such that we do not
risk blocking. Several people here have pointed out that blocking is a true
no-go for a new listen/notify implementation. With an slru-based queue we have
so much space that blocking is really unlikely even in periods with extreme
notify bursts.
Regarding performance, the slru-queue is not fsync-ed to disk so most activity
would be in the OS file cache memory anyway and most backends will probably
work on the same pages most of the time. However more locking overhead is
required in comparison to a shared-memory-only implementation.

Note, however, that pg_subtrans is also supposed to use "the same pages
from memory most of the time" and it still is a performance bottleneck
in some cases, and enlarging NUM_SUBTRANS_BUFFERS is a huge boon. I
think holding AsyncCtlLock in exclusive mode for every notify add (which
may involve I/O) is going to be a hard hit on scalability.

--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.

#43Andrew Chernow
ac@esilo.com
In reply to: Greg Stark (#40)
Re: Listen / Notify rewrite

spill to disk and need an efficient storage mechanism. The natural
implementation of this in Postgres would be a table, not the slru. If

This is what I think people's real problem is: the implementation becomes
more complex when including payloads (larger ones even more so). I think it's a
side-track to discuss queues vs condition variables. Whether a notify is 20
bytes through the network or 8192 bytes doesn't change its design or purpose,
only its size.

Calling this a creeping feature is quite a leap.

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#44Greg Stark
gsstark@mit.edu
In reply to: Andrew Chernow (#43)
Re: Listen / Notify rewrite

On Fri, Nov 13, 2009 at 1:47 PM, Andrew Chernow <ac@esilo.com> wrote:

This is what I think people's real problem is: the implementation
becomes more complex when including payloads (larger ones even more so).
 I think it's a side-track to discuss queues vs condition variables.  Whether
a notify is 20 bytes through the network or 8192 bytes doesn't change its
design or purpose, only its size.

Calling this a creeping feature is quite a leap.

It's true that the real creep is having the payload at all rather than
not having it. But having fixed-size data also makes the implementation
much simpler.

One person described stuffing the payload with the primary key of the
record being invalidated. This means the requirements have just gone
from holding at most some small fixed number of records bounded by the
number of tables or other shared data structures to holding a large
number of records bounded only by the number of records in their
tables which is usually much much larger.

Now you're talking about making the payloads variable size, which
means you need to do free space management within shared pages to keep
track of how much space is free and available for reuse.

So we've gone from a simple hash table of fixed-size entries containing
an oid or "name" datum, where we expect the hash table to fit in memory
and a simple LRU can handle old pages that aren't part of the working
set, to something that's going to look a lot like a database table: it
has to handle reusing space in collections of variable-size data and
scale up to millions of entries.

And I note someone else in the thread was suggesting it needed ACID
properties which makes space reuse even more complex and will need
something like vacuum to implement it.
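To make the free-space point concrete, here is a toy Python sketch (illustrative only) of a ring buffer holding variable-size, length-prefixed entries. Space can only be reclaimed contiguously from the tail; reusing holes in the middle, as a table must, needs real free-space management:

```python
import struct

class VarQueue:
    """Toy ring buffer of variable-size, length-prefixed entries."""

    def __init__(self, size=64):
        self.buf = bytearray(size)
        self.head = 0   # absolute write offset
        self.tail = 0   # absolute offset of the oldest unconsumed entry

    def _read(self, pos, count):
        n = len(self.buf)
        return bytes(self.buf[(pos + i) % n] for i in range(count))

    def push(self, payload):
        # Each record is a 2-byte little-endian length plus the payload.
        rec = struct.pack("<H", len(payload)) + payload
        if len(rec) > len(self.buf) - (self.head - self.tail):
            raise BufferError("no free space")
        for i, byte in enumerate(rec):
            self.buf[(self.head + i) % len(self.buf)] = byte
        self.head += len(rec)

    def pop(self):
        (length,) = struct.unpack("<H", self._read(self.tail, 2))
        payload = self._read(self.tail + 2, length)
        self.tail += 2 + length
        return payload
```

Even this toy only works because consumption is strictly FIFO; anything fancier (per-reader positions over variable-size records, reclaiming space out of order) quickly heads toward table-like storage.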

--
greg

#45 Andrew Chernow
ac@esilo.com
In reply to: Greg Stark (#44)
Re: Listen / Notify rewrite

Calling this a creeping feature is quite a leap.

It's true that the real creep is having the payload at all rather than
not having it.

Not having the payload at all is like Santa showing up without his bag
of toys. Instead, you have to drive/fly to the North Pole, where he just
came from, to get them.

One person described stuffing the payload with the primary key of the
record being invalidated. This means the requirements have just gone
from holding at most some small fixed number of records bounded by the
number of tables or other shared data structures to holding a large
number of records bounded only by the number of records in their
tables which is usually much much larger.

Now you're talking about making the payloads variable size, which
means you need to do free space management within shared pages to keep
track of how much space is free and available for reuse.

So we've gone from a simple hash table of fixed size entries
containing an oid or "name" datum where we expect the hash table to
fit in memory and a simple lru can handle old pages that aren't part
of the working set to something that's going to look a lot like a
database table -- it has to handle reusing space in collections of
variable size data and scale up to millions of entries.

And I note someone else in the thread was suggesting it needed ACID
properties which makes space reuse even more complex and will need
something like vacuum to implement it.

I think the original OP was close. The structure can still be fixed
length but maybe we can bump it to 8k (BLCKSZ)?

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#46 Merlin Moncure
mmoncure@gmail.com
In reply to: Andrew Chernow (#45)
Re: Listen / Notify rewrite

On Fri, Nov 13, 2009 at 10:00 AM, Andrew Chernow <ac@esilo.com> wrote:

I think the original OP was close.  The structure can still be fixed length
but maybe we can bump it to 8k (BLCKSZ)?

The problem with this (which I basically agree with) is that it will
greatly increase the size of the queue for all participants of this
feature, whether they use the payload or not. I think it boils down to
this: is there a reasonably effective way of making the payload
variable length (now or in the future)? If not, let's compromise and
maybe go with a larger size, maybe 256 or 512 bytes.
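The size argument is easy to quantify (illustrative arithmetic only): with fixed-size slots, queue space is slots × slot size regardless of how small the actual payloads are.

```python
def queue_bytes(slots, slot_size):
    # Fixed-size entries: every notification occupies a full slot.
    return slots * slot_size

# 10,000 pending notifications:
assert queue_bytes(10_000, 8192) == 81_920_000   # ~80 MB with BLCKSZ slots
assert queue_bytes(10_000, 256) == 2_560_000     # ~2.5 MB with 256-byte slots
```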

merlin

#47 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Merlin Moncure (#46)
Re: Listen / Notify rewrite

Merlin Moncure <mmoncure@gmail.com> writes:

The problem with this (which I basically agree with) is that this will
greatly increase the size of the queue for all participants of this
feature if they use the payload or not. I think it boils down to
this: is there a reasonably effective way of making the payload
variable length (now or in the future)? If not, let's compromise and
maybe go with a larger size, maybe 256 or 512 bytes.

Yeah, if the payload is not variable length then we are not going to be
able to make it more than a couple hundred bytes without taking a
significant performance hit. (By the way, has anyone yet tried to
compare the speed of this implementation to the old code?)

regards, tom lane

#48 Andrew Dunstan
andrew@dunslane.net
In reply to: Merlin Moncure (#46)
Re: Listen / Notify rewrite

Merlin Moncure wrote:

On Fri, Nov 13, 2009 at 10:00 AM, Andrew Chernow <ac@esilo.com> wrote:

I think the original OP was close. The structure can still be fixed length
but maybe we can bump it to 8k (BLCKSZ)?

The problem with this (which I basically agree with) is that this will
greatly increase the size of the queue for all participants of this
feature if they use the payload or not. I think it boils down to
this: is there a reasonably effective way of making the payload
variable length (now or in the future)? If not, let's compromise and
maybe go with a larger size, maybe 256 or 512 bytes.

My original intention was to have the queue as a circular buffer where
the size of the entries was variable, but possibly bounded. Certainly
using fixed length records of large size seems somewhat wasteful.

But maybe that doesn't fit with what Joachim has done.

Incidentally, I'd like to thank Joachim personally for having done this
work, which I have been trying to get to for a couple of years and which
circumstances kept conspiring to prevent me from doing. It's been a big
monkey on my back.

cheers

andrew

#49 Greg Sabino Mullane
greg@turnstep.com
In reply to: Greg Stark (#40)
Re: Listen / Notify rewrite

-----BEGIN PGP SIGNED MESSAGE-----
Hash: RIPEMD160

This is BS. The problem is not that someone might do something stupid
with this feature. The problem is that we're making these other use
cases into requirements which will influence the design. This is a
classic "feature creep" situation and the result is normally products
which solve none of the use cases especially well.

Feature creep? The payload has been on the roadmap for a long time. I don't
recall anyone objecting when Andrew was working on the next version of
Listen/Notify, probably a couple of years ago now.

Remember this queue has to live in shared memory which is a fixed size
resource. If you're designing a queue mechanism then you would
naturally use something like a queue or priority queue.

Right, but we're not discussing a queue, we're discussing the listen/notify
system. If people want to misuse it as a queue when they should be using
something else, so be it. Talk of efficiency also seems silly here - using
shared memory is already way more efficient than our current listen/notify
system.

- --
Greg Sabino Mullane greg@turnstep.com
PGP Key: 0x14964AC8 200911131234
http://biglumber.com/x/web?pk=2529DF6AB8F79407E94445B4BC9B906714964AC8
-----BEGIN PGP SIGNATURE-----

iEYEAREDAAYFAkr9mL0ACgkQvJuQZxSWSshkvACg6OQ/SRjkvmozzUogTX3weuio
4ZoAnRVfvcrdMmo+iKtkyXmhAlZqViqF
=6fzv
-----END PGP SIGNATURE-----

#50 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Greg Sabino Mullane (#49)
Re: Listen / Notify rewrite

"Greg Sabino Mullane" <greg@turnstep.com> writes:

Talk of efficiency also seems silly here - using
shared memory is already way more efficient than our current listen/notify
system.

Except that the proposed implementation spills to disk. Particularly if
it has to have support for large payloads, it could very well end up
being a lot SLOWER than what we have now.

regards, tom lane

#51 Andrew Chernow
ac@esilo.com
In reply to: Tom Lane (#50)
Re: Listen / Notify rewrite

Tom Lane wrote:

"Greg Sabino Mullane" <greg@turnstep.com> writes:

Talk of efficiency also seems silly here - using
shared memory is already way more efficient than our current listen/notify
system.

Except that the proposed implementation spills to disk. Particularly if
it has to have support for large payloads, it could very well end up
being a lot SLOWER than what we have now.

True, but do you really consider it to be a common case that the notify
system gets so bogged down that it starts to crawl? The problem would
be the collective size of notify structures + payloads and whether that
would fit in memory or not. This leads me to believe that the only
safety in smaller payloads is *possibly* a smaller chance of bogging it
down, but that all depends on the usage pattern of smaller vs. larger
payloads, which is system-specific.

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#52 Andrew Chernow
ac@esilo.com
In reply to: Andrew Dunstan (#48)
Re: Listen / Notify rewrite

My original intention was to have the queue as a circular buffer where
the size of the entries was variable, but possibly bounded. Certainly
using fixed length records of large size seems somewhat wasteful.

Maybe we should do away with 'spill to disk' altogether and either
hard-code an overflow behavior or make it a knob. Possible overflow
behaviors could be: block until space is available, throw an error, or
silently drop the notification.
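A minimal sketch of that knob (Python toy, with made-up policy names; a 'block' policy would additionally need real synchronization, so it is omitted):

```python
from collections import deque

class BoundedNotifyQueue:
    """Toy bounded in-memory queue with a configurable overflow policy."""

    def __init__(self, max_entries, on_overflow="error"):
        assert on_overflow in ("error", "drop")
        self.entries = deque()
        self.max_entries = max_entries
        self.on_overflow = on_overflow

    def notify(self, payload):
        if len(self.entries) >= self.max_entries:
            if self.on_overflow == "error":
                raise BufferError("notification queue full")
            return False          # "drop": silently discard
        self.entries.append(payload)
        return True
```

The DBA would size max_entries (or the shared memory segment) to match the expected notification volume.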

Can the size of the shared memory segment for notifications be
configurable? That would allow those with large payloads or a huge
number of notifications to bump memory to avoid overflow cases. By
removing the disk and making shmem usage configurable, I think the
notify system would be flexible and could scale nicely.

Another added benefit is the payload limit can be much higher than
previously considered, because memory/performance concerns are now in
the hands of the DBA.

Incidentally, I'd like to thank Joachim personally for having done this
work,

+1

--
Andrew Chernow
eSilo, LLC
every bit counts
http://www.esilo.com/

#53 James Mansion
james@mansionfamily.plus.com
In reply to: Josh Berkus (#35)
Re: Listen / Notify rewrite

Josh Berkus wrote:

Payloads are also quite useful even in a lossy environment, where you
understand that LISTEN is not a queue. For example, I'd like to be
using LISTEN/NOTIFY for cache invalidation for some applications; if it
misses a few, or double-counts them, it's not an issue. However, I'd

Not sure how missing a few can not be an issue for cache invalidation,
but never mind.

In the use-cases I've considered, I've used a trigger to write a change
notification to a table and a notify to indicate that notification
record(s) have been changed. The notifications contain copies of the
primary keys and the action, so the cache processor knows what's changed
and the notify is a wakeup signal.

If this is in fact the most common use case, perhaps an alternative
approach would be to automate it directly, so that writing the triggers
(and using the trigger processing engines) would be unnecessary, so:
- the queue definition can be automated with reference to the parent
table, by DDL stating that one is required
- the notification 'name' is effectively the queue name and the
subscription says 'tell me when a change note is placed in the queue'

Doing this in the database engine core allows a number of potential
optimisations:
- the mechanism does not require general trigger execution
- the queue does not have to be a real table, and can have custom
semantics: it may not actually be necessary to store copies of the
primary key data if it can refer to the rows, so the data can be
retrieved as the queue is consumed
- if there are no subscribers to the queue then the insertion into it
can be elided
- if the server crashes, connected consumers should assume caches are
invalid and there is no ACID requirement for the queue data
- if the queue fills then slow consumer(s) can be dropped and can
receive a data-loss indicator
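The "elide the insertion when there are no subscribers" optimisation can be modelled in a few lines (Python toy, all names hypothetical):

```python
class ChangeQueue:
    """Toy per-table change queue, populated automatically on writes."""

    def __init__(self, table):
        self.table = table
        self.subscribers = set()
        self.pending = []            # (primary key, action) records

    def subscribe(self, who):
        self.subscribers.add(who)

    def record(self, pk, action):
        # The optimisation: skip the insertion entirely if nobody listens.
        if not self.subscribers:
            return
        self.pending.append((pk, action))

    def drain(self):
        out, self.pending = self.pending, []
        return out
```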

like to be able to send message like players_updated|45321 where 45321
is the ID of the player updated.

Indeed.

Just a thought, anyway. (FWIW, I was initially concerned about the lack
of payload, but with any sort of lossy compression I figured it wasn't a
problem, actually, and I needed a notification queue.)

James

#54 Merlin Moncure
mmoncure@gmail.com
In reply to: Tom Lane (#47)
Re: Listen / Notify rewrite

On Fri, Nov 13, 2009 at 11:08 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

(By the way, has anyone yet tried to compare the speed of this
implementation to the old code?)

I quickly hacked pgbench to take a custom script on connection (for
listen), and made it run 'notify x' with all clients doing 'listen x'.

The old method (measured on a 4 core high performance server) has
severe scaling issues due to table bloat (we knew that):
./pgbench -c 10 -t 1000 -n -b listen.sql -f notify.sql
run #1 tps = 1364.948079 (including connections establishing)
run #2 tps = 573.988495 (including connections establishing)
<vac full pg_listener>
./pgbench -c 50 -t 200 -n -b listen.sql -f notify.sql
tps = 844.033498 (including connections establishing)

new method on my dual core workstation (max payload 128):
./pgbench -c 10 -t 10000 -n -b listen.sql -f notify.sql -hlocalhost postgres
tps = 16343.012373 (including connections establishing)
./pgbench -c 20 -t 5000 -n -b listen.sql -f notify.sql -hlocalhost postgres
tps = 7642.104941 (including connections establishing)
./pgbench -c 50 -t 5000 -n -b listen.sql -f notify.sql -hlocalhost postgres
tps = 3184.049268 (including connections establishing)

max payload 2048:
./pgbench -c 10 -t 10000 -n -b listen.sql -f notify.sql -hlocalhost postgres
tps = 12062.906610 (including connections establishing)
./pgbench -c 20 -t 5000 -n -b listen.sql -f notify.sql -hlocalhost postgres
tps = 7229.505869 (including connections establishing)
./pgbench -c 50 -t 5000 -n -b listen.sql -f notify.sql -hlocalhost postgres
tps = 3219.511372 (including connections establishing)

getting sporadic 'LOG: could not send data to client: Broken pipe'
throughout the test.

merlin

#55 Alex
alex323@gmail.com
In reply to: Andrew Chernow (#26)
Re: Listen / Notify rewrite

On Thu, 12 Nov 2009 11:22:32 -0500
Andrew Chernow <ac@esilo.com> wrote:

However I share Greg's concerns that people are trying to use NOTIFY
as a message queue which it is not designed to be.

When you have an established libpq connection waiting for notifies, it
is not unreasonable to expect/desire a payload. ISTM the problem is
that the initial design was half-baked. NOTIFY is event-driven, i.e.
no polling!

I agree. Wouldn't it make sense to allow the user to pass libpq a
callback function which is executed when NOTIFY events happen? Currently
we are forced to poll the connection, which means that we'll be checking
for a NOTIFY every time we have new data.

That just doesn't make sense.

--
Alex

#56 Simon Riggs
simon@2ndQuadrant.com
In reply to: Joachim Wieland (#1)
Re: Listen / Notify rewrite

On Wed, 2009-11-11 at 22:25 +0100, Joachim Wieland wrote:

3. Every distinct notification is delivered.

Regarding performance, the slru-queue is not fsync-ed to disk

These two statements seem to be in opposition. How do you know a
notification will be delivered if the queue is non-recoverable? Surely
the idea is to send information externally to the database, so why
should that stream of info be altered depending upon whether the
database crashes? You couldn't use it to reliably update an external
cache for example.

Why do we need this as well as PgQ? For me, I would need a good reason
why this shouldn't be implemented using a normal table, plus bells and
whistles. If normal tables don't do what you need, perhaps that's a
place to add value.

--
Simon Riggs www.2ndQuadrant.com

#57 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Simon Riggs (#56)
Re: Listen / Notify rewrite

Simon Riggs <simon@2ndQuadrant.com> writes:

On Wed, 2009-11-11 at 22:25 +0100, Joachim Wieland wrote:

3. Every distinct notification is delivered.
Regarding performance, the slru-queue is not fsync-ed to disk

These two statements seem to be in opposition. How do you know a
notification will be delivered if the queue is non-recoverable?

You misunderstand the requirements. LISTEN notifications are *not*
meant to survive a database crash, and never have been. However,
so long as both client and server stay up, they must be reliable.
If the client has to poll database state because it might have
missed a notification, the feature is just a waste of time.

regards, tom lane

#58 Simon Riggs
simon@2ndQuadrant.com
In reply to: Tom Lane (#57)
Re: Listen / Notify rewrite

On Sun, 2009-11-15 at 16:48 -0500, Tom Lane wrote:

Simon Riggs <simon@2ndQuadrant.com> writes:

On Wed, 2009-11-11 at 22:25 +0100, Joachim Wieland wrote:

3. Every distinct notification is delivered.
Regarding performance, the slru-queue is not fsync-ed to disk

These two statements seem to be in opposition. How do you know a
notification will be delivered if the queue is non-recoverable?

You misunderstand the requirements. LISTEN notifications are *not*
meant to survive a database crash, and never have been. However,
so long as both client and server stay up, they must be reliable.
If the client has to poll database state because it might have
missed a notification, the feature is just a waste of time.

Why would it be so important for messages to be reliable if the database
is up, yet it's OK to lose messages if it crashes? The application must
still allow for the case that messages are lost.

--
Simon Riggs www.2ndQuadrant.com

#59 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Simon Riggs (#58)
Re: Listen / Notify rewrite

Simon Riggs <simon@2ndQuadrant.com> writes:

On Sun, 2009-11-15 at 16:48 -0500, Tom Lane wrote:

You misunderstand the requirements. LISTEN notifications are *not*
meant to survive a database crash, and never have been. However,
so long as both client and server stay up, they must be reliable.
If the client has to poll database state because it might have
missed a notification, the feature is just a waste of time.

Why would it be so important for messages to be reliable if the database
is up, yet it's OK to lose messages if it crashes? The application must
still allow for the case that messages are lost.

No, that's the point. The design center for LISTEN is that you have a
client that needs to respond to changes in the DB state. When it first
connects it will issue LISTEN and then (order is important) it will
examine the current state of the database. After that it can just wait
for NOTIFY to tell it that something interesting has happened. If it
crashes, or sees a disconnect indicating that the server has crashed,
it goes back to the startup point. No problem. But if it can't be sure
that it will get a NOTIFY every time something happens to the DB state,
then it has to do active polling of the state instead, and the NOTIFY
feature is really worthless to it.
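The startup protocol can be modelled with a toy in-memory server (Python, illustrative only). Because the client subscribes *before* taking its snapshot, a change racing in between the two steps is still delivered, so no polling is needed:

```python
class Server:
    def __init__(self):
        self.state = 0          # stand-in for database state
        self.listeners = []

    def listen(self, inbox):
        self.listeners.append(inbox)

    def update(self):
        self.state += 1
        for inbox in self.listeners:
            inbox.append(self.state)   # NOTIFY every listener

client_inbox = []
server = Server()
server.listen(client_inbox)        # step 1: LISTEN
server.update()                    # a change races in here...
snapshot = server.state            # step 2: examine current state
# ...and the client still hears about it:
assert snapshot == 1 and client_inbox == [1]
```

If the two steps were swapped, a change landing between the snapshot and the LISTEN would be lost, and the client would be forced back to active polling.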

This is an entirely useful and reliable feature within these parameters
--- the first application I ever wrote using PG relied on NOTIFY to work
this way.  (In fact it wouldn't be overstating the case to say that
I wouldn't be a PG hacker today if it weren't for LISTEN/NOTIFY.)

regards, tom lane

#60 Greg Sabino Mullane
greg@turnstep.com
In reply to: Simon Riggs (#58)
Re: Listen / Notify rewrite

-----BEGIN PGP SIGNED MESSAGE-----
Hash: RIPEMD160

You misunderstand the requirements. LISTEN notifications are *not*
meant to survive a database crash, and never have been. However,
so long as both client and server stay up, they must be reliable.
If the client has to poll database state because it might have
missed a notification, the feature is just a waste of time.

Why would it be so important for messages to be reliable if
the database is up, yet it's OK to lose messages if it crashes? The
application must still allow for the case that messages are lost.

Well, there are many use cases. For example, Bucardo uses notifications
to let it know that a table has changed. If the database crashes,
Bucardo is going to restart - as part of its startup routine, it checks
all tables manually for changes, eliminating the need for the NOTIFYs
to survive the crash.

- --
Greg Sabino Mullane greg@turnstep.com
End Point Corporation
PGP Key: 0x14964AC8 200911160910
http://biglumber.com/x/web?pk=2529DF6AB8F79407E94445B4BC9B906714964AC8
-----BEGIN PGP SIGNATURE-----

iEYEAREDAAYFAksBXkAACgkQvJuQZxSWSsjEWACePcT+65HQ0dvx52PjjTkdMzVS
ELMAnAhR3Ll016/EwPdizzS5BcsuXaw9
=jds6
-----END PGP SIGNATURE-----

#61 Joachim Wieland
joe@mcknight.de
In reply to: Merlin Moncure (#54)
Re: Listen / Notify rewrite

On Sat, Nov 14, 2009 at 11:06 PM, Merlin Moncure <mmoncure@gmail.com> wrote:

The old method (measured on a 4 core high performance server) has
severe scaling issues due to table bloat (we knew that):
./pgbench -c 10 -t 1000 -n -b listen.sql -f notify.sql
run #1 tps = 1364.948079 (including connections establishing)

new method on my dual core workstation (max payload 128):
./pgbench -c 10 -t 10000 -n -b listen.sql -f notify.sql -hlocalhost postgres
tps = 16343.012373 (including connections establishing)

That looks fine and is similar to my tests where I also see a
performance increase of about 10x, and unlike pg_listener it is
constant.

getting sporadic 'LOG:  could not send data to client: Broken pipe'
throughout the test.

This looks like the server is trying to send a notification down to
the client but the client has already terminated the connection...

Joachim

#62 Merlin Moncure
mmoncure@gmail.com
In reply to: Joachim Wieland (#61)
Re: Listen / Notify rewrite

On Mon, Nov 16, 2009 at 4:41 PM, Joachim Wieland <joe@mcknight.de> wrote:

On Sat, Nov 14, 2009 at 11:06 PM, Merlin Moncure <mmoncure@gmail.com> wrote:

The old method (measured on a 4 core high performance server) has
severe scaling issues due to table bloat (we knew that):
./pgbench -c 10 -t 1000 -n -b listen.sql -f notify.sql
run #1 tps = 1364.948079 (including connections establishing)

new method on my dual core workstation (max payload 128):
./pgbench -c 10 -t 10000 -n -b listen.sql -f notify.sql -hlocalhost postgres
tps = 16343.012373 (including connections establishing)

That looks fine and is similar to my tests where I also see a
performance increase of about 10x, and unlike pg_listener it is
constant.

The old method scaled (badly) on volume of notifications, and your stuff
seems to scale based on the # of clients sending simultaneous
notifications. Well, you're better all day long, but it shows that
your fears regarding locking were not completely unfounded. Do the
Bucardo people have any insight into how many simultaneous notifies are
generated from different backends?

merlin

#63 Greg Sabino Mullane
greg@turnstep.com
In reply to: Merlin Moncure (#62)
Re: Listen / Notify rewrite

-----BEGIN PGP SIGNED MESSAGE-----
Hash: RIPEMD160

The old method scaled (badly) on volume of notifications, and your stuff
seems to scale based on the # of clients sending simultaneous
notifications. Well, you're better all day long, but it shows that
your fears regarding locking were not completely unfounded. Do the
Bucardo people have any insight into how many simultaneous notifies are
generated from different backends?

Very low. On a busy system I know of there are about 90 entries in
pg_listener, and I would guess that the maximum rate of simultaneous
notifies is not more than 3 per second, tops. If someone knows an
easy way to measure such a thing and is really curious, I can see about
getting better numbers.

- --
Greg Sabino Mullane greg@turnstep.com
End Point Corporation
PGP Key: 0x14964AC8 200911162127
http://biglumber.com/x/web?pk=2529DF6AB8F79407E94445B4BC9B906714964AC8
-----BEGIN PGP SIGNATURE-----

iEYEAREDAAYFAksCCjsACgkQvJuQZxSWSsgTogCfS5Xg8N2JhsUpi2r96IbxX+Tm
pMsAnAktBVkEblzx6Ux/netXkP9u4AVG
=SO/k
-----END PGP SIGNATURE-----