Anyone working on asynchronous NOTIFY reception?
Hi folks,
Hope I'm not making a fool of myself by posting to a list I just
joined ... but I couldn't find much about this in the list archives.
I'm looking at an application that involves several client processes
communicating in real time via a pgsql database. "Real time" means
that when one client writes something, any other clients that are
interested need to know about it within a few seconds at most.
The other clients can use LISTEN/NOTIFY to detect updates --- but
I don't think I can accept the notion of continuously doing empty
queries to receive the notifies. That'll drive performance into the
ground. What I want is for a client to be able to sleep until
something interesting happens.
As near as I can tell from backend/commands/async.c, notify messages
actually are sent out to the frontends asynchronously, as soon as
possible (either immediately or at the end of the current transaction).
The problem is simply that libpq is designed in such a way that it can't
read in the notify message except while processing a new query.
I am thinking about revising libpq so that it doesn't force synchronous
reading, but can be called from an application's main loop whenever the
backend connection is ready for reading according to select(). This
would seem to be a major win for Tcl and other environments, as well as
for my problem: an app waiting for a server response would not have to
be dead to the rest of the world.
Is this a correct description of the situation? Has anyone already
started to work on this issue? If not, would someone who knows the
code be willing to give me guidance? I'm entirely new to Postgres
and am likely to make some dumb choices without advice...
regards, tom lane
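A minimal sketch of the "sleep until the backend connection is readable" idea from the message above. It is not libpq code: `wait_readable` is a hypothetical helper, and `fd` stands in for whatever descriptor the backend connection uses. It only shows how select() lets a client block without sending empty queries.

```c
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Block until `fd` is readable or the timeout expires.
 * Returns 1 if readable, 0 on timeout, -1 on error (including EINTR).
 * A negative timeout_ms means "wait indefinitely".
 * ASSUMPTION: `fd` is a stand-in for the backend connection's socket.
 */
int wait_readable(int fd, int timeout_ms)
{
    fd_set readfds;
    struct timeval tv;
    struct timeval *tvp = NULL;   /* NULL makes select() wait forever */
    int rc;

    FD_ZERO(&readfds);
    FD_SET(fd, &readfds);

    if (timeout_ms >= 0) {
        tv.tv_sec = timeout_ms / 1000;
        tv.tv_usec = (timeout_ms % 1000) * 1000;
        tvp = &tv;
    }

    rc = select(fd + 1, &readfds, NULL, NULL, tvp);
    if (rc < 0)
        return -1;
    return (rc > 0 && FD_ISSET(fd, &readfds)) ? 1 : 0;
}
```

An application main loop would call something like this with a negative timeout and only go on to read the notify message once it returns 1, so an idle client generates no traffic at all.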
The person who knows the most about this is:
dz@cs.unitn.it (Massimo Dal Zotto)
--
Bruce Momjian | 830 Blythe Avenue
maillist@candle.pha.pa.us | Drexel Hill, Pennsylvania 19026
+ If your life is a hard drive, | (610) 353-9879(w)
+ Christ can be your backup. | (610) 853-3000(h)
I don't know how many clients you're running, but sending an empty
query every second or three isn't too much of a performance hit (I
know it still sucks though)..
could you cc copies of mail to the list? I'm sure we're very
interested..
On Wed, 15 April 1998, at 16:57:43, Bruce Momjian wrote:
> The person who knows the most about this is:
> dz@cs.unitn.it (Massimo Dal Zotto)
Brett McCormick <brett@work.chicken.org> writes:
> I don't know how many clients you're running, but sending an empty
> query every second or three isn't too much of a performance hit (I
> know it still sucks though)..
Well, raw performance is only part of it. Some of the clients will
be accessing the database server across interstate dial-on-demand
ISDN links. Popping up the link for a minute whenever something
happens (which is likely to be only a few times a day) is cool.
Nailing it up 24x7 to pass a steady flow of empty queries is not cool.
> could you cc copies of mail to the list? I'm sure we're very
> interested..
Sure, I'll keep you posted. If anything comes of this I'll be
submitting the mods, of course.
regards, tom lane
Mattias Kregert <matti@algonet.se> writes:
> Async communication between backend and frontend would be really nice.
> I use Tcl a lot and I really miss this. It would be wonderful to have
> libpgtcl do callbacks, so that info on-screen could be automagically
> updated whenever something changes.
Yes, if anything were to be done along this line it'd also make sense
to revise libpgtcl. I think it ought to work more like this:
(a) the idle loop is invoked while waiting for a query response
(so that a pg_exec statement behaves sort of like "tkwait");
(b) a "listen" command is sent via a new pg_listen statement that
specifies a callback command string. Subsequent notify responses
can occur whenever a callback is possible.
I suppose (a) had better be an option to pg_exec statements so that
we don't break existing Tcl code...
regards, tom lane
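The callback scheme in (b) can be sketched as a small registry that maps a notification name to a handler. This is an illustration in C rather than Tcl and is not libpgtcl code: `listen_register`, `notify_dispatch`, `count_hits`, and `MAX_LISTENERS` are all hypothetical names invented for the sketch.

```c
#include <stddef.h>
#include <string.h>

#define MAX_LISTENERS 16   /* arbitrary limit for this sketch */

typedef void (*notify_cb)(const char *name, void *arg);

struct listener {
    const char *name;   /* notification name; caller keeps the string alive */
    notify_cb   cb;     /* handler to run when `name` is notified */
    void       *arg;    /* opaque pointer handed back to the handler */
};

static struct listener listeners[MAX_LISTENERS];
static size_t listener_count = 0;

/* Register a callback for a name. Returns 0 on success, -1 if the table is full. */
int listen_register(const char *name, notify_cb cb, void *arg)
{
    if (listener_count >= MAX_LISTENERS)
        return -1;
    listeners[listener_count].name = name;
    listeners[listener_count].cb = cb;
    listeners[listener_count].arg = arg;
    listener_count++;
    return 0;
}

/* Run every callback registered for `name`; returns how many were invoked. */
int notify_dispatch(const char *name)
{
    size_t i;
    int fired = 0;

    for (i = 0; i < listener_count; i++) {
        if (strcmp(listeners[i].name, name) == 0) {
            listeners[i].cb(name, listeners[i].arg);
            fired++;
        }
    }
    return fired;
}

/* Example handler: counts notifications in the int that `arg` points to. */
void count_hits(const char *name, void *arg)
{
    (void) name;
    ++*(int *) arg;
}
```

In an event-loop design, `notify_dispatch` would be called from the idle loop whenever a notify message has been read from the connection, which is the point where "a callback is possible".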
There is already some support for async notify in libpgtcl; it was an old
patch of mine. It does exactly what you are thinking of, except for the
idle loop stuff. You can set up callbacks for specific relations and then
periodically issue a command which checks for pending notifications and does
the callbacks if any are found. Note also that you can listen on any name,
not just for existing relations.
I have a Tcl/Tk application (used concurrently by more than 30 users)
which makes heavy use of async notifications to notify clients when some
events occur. It uses a timer inside the application which polls the server
every n seconds (actually 1 second) for pending notifies.
It works well, but it is a really big bottleneck for the application, and I
had to spend a lot of time debugging and patching the code in async.c.
It seems that nobody besides me has ever used this feature, because I didn't
see any bug reports on the mailing lists (and there were many bugs).
The biggest problem is that if you have many clients listening on the same
thing they are signaled at the same time and all of them try to access the
pg_listener table for write. The result is that you have a lot of waits on
the table and sometimes also deadlocks if you don't do things carefully.
From the Tcl side, a better solution would be to define a tcl event handler,
like the standard Tcl filehandler, which would be invoked automatically by
the Tk event loop or by tkwait if using pure Tcl.
I have also some new patches which try to reduce the notify overhead by
avoiding unnecessary unlocks of the table. If you are interested I can
post them.
--
Massimo Dal Zotto
+----------------------------------------------------------------------+
| Massimo Dal Zotto e-mail: dz@cs.unitn.it |
| Via Marconi, 141 phone: ++39-461-534251 |
| 38057 Pergine Valsugana (TN) www: http://www.cs.unitn.it/~dz/ |
| Italy pgp: finger dz@tango.cs.unitn.it |
+----------------------------------------------------------------------+
> The biggest problem is that if you have many clients listening on the same
> thing they are signaled at the same time and all of them try to access the
> pg_listener table for write. The result is that you have a lot of waits on
> the table and sometimes also deadlocks if you don't do things carefully.
Right, I recall seeing some things about that in the mailing list
archives (from you, no doubt?). I had the impression that async.c
had been changed to handle this better as of the current release.
Is there still a problem?
(Fortunately, I don't expect a *lot* of clients waiting on the same
table, but deadlock would still be very bad news...)
> From the Tcl side, a better solution would be to define a tcl event handler,
> like the standard Tcl filehandler, which would be invoked automatically by
> the Tk event loop or by tkwait if using pure Tcl.
I agree.
I don't have an immediate need for Tcl-based clients, so I was just
going to revise libpq and libpq++. Do you want to redo libpgtcl?
I'd probably get to that eventually, but splitting the work sounds
better :-).
I'll post something later today about what the extensions to the
libpq API should look like.
> I have also some new patches which try to reduce the notify overhead by
> avoiding unnecessary unlocks of the table. If you are interested I can
> post them.
Please do.
regards, tom lane
> I don't have an immediate need for Tcl-based clients, so I was just
> going to revise libpq and libpq++. Do you want to redo libpgtcl?
> I'd probably get to that eventually, but splitting the work sounds
> better :-).
Not now, I am too busy.
>> I have also some new patches which try to reduce the notify overhead by
>> avoiding unnecessary unlocks of the table. If you are interested I can
>> post them.
> Please do.
This is the patch against 6.2.1p7. I haven't had the time to port it to 6.3.1.
The idea is to notify the backends while we have a write lock on the table
before doing the first CommitTransactionCommand. Otherwise, if we must also
notify our frontend, we almost certainly get the lock again only after all the
other backends have processed the notify, and this may take a lot of time.
Note however that there is a little problem with releasing the lock before the
end of the transaction: you may get duplicate records in pg_listener if several
backends are notifying the same relation at the same time. I don't know why
this happens and haven't had time to investigate, so I wrote a quick hack in
Async_NotifyFrontEnd_Aux() to avoid the problem (search for "notifyHack").
This is what I found in my pg_listener:
mytable | 627| 0
mytable | 627| 0
mytable | 627| 0
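The duplicate-suppression idea behind "notifyHack" can be sketched outside the backend as follows. This is only an illustration on plain C strings, not the patch's code: `drop_duplicate_names` and `SEEN_MAX` are hypothetical names, and where the patch deletes the duplicate tuple with heap_delete, this sketch simply drops the entry from an array. The bound of 32 remembered names mirrors the `hack[32]` array in the patch.

```c
#include <stddef.h>
#include <string.h>

#define SEEN_MAX 32   /* same bound as the hack[] array in the patch */

/*
 * Compact names[0..n-1] in place so that each distinct name is kept once
 * (the first occurrence wins). Returns the new number of entries.
 * Once SEEN_MAX distinct names are remembered, later names are passed
 * through unchecked, which mirrors the limit in the patch.
 */
size_t drop_duplicate_names(const char **names, size_t n)
{
    const char *seen[SEEN_MAX];
    size_t seen_count = 0;
    size_t out = 0;
    size_t i, j;

    for (i = 0; i < n; i++) {
        int duplicate = 0;

        for (j = 0; j < seen_count; j++) {
            if (strcmp(names[i], seen[j]) == 0) {
                duplicate = 1;
                break;
            }
        }
        if (duplicate)
            continue;               /* the patch deletes the tuple here */
        if (seen_count < SEEN_MAX)
            seen[seen_count++] = names[i];
        names[out++] = names[i];    /* keep the first occurrence */
    }
    return out;
}
```

Applied to the three identical "mytable" rows shown above, a filter like this would leave a single entry, so the frontend is notified once per name.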
And this is the patch for 6.2.1p7:
*** async.c.orig Tue Jan 27 17:06:42 1998
--- async.c Thu Mar 19 01:09:49 1998
***************
*** 22,30 ****
* notification (we are notifying something that we are listening),
* signal the corresponding frontend over the comm channel using the
* out-of-band channel.
! * 2.b For all other listening processes, we send kill(2) to wake up
! * the listening backend.
! * 3. Upon receiving a kill(2) signal from another backend process notifying
* that one of the relation that we are listening is being notified,
* we can be in either of two following states:
* 3.a We are sleeping, wake up and signal our frontend.
--- 22,30 ----
* notification (we are notifying something that we are listening),
* signal the corresponding frontend over the comm channel using the
* out-of-band channel.
! * 2.b For all other listening processes, we send a SIGUSR2 signal
! * to wake up the listening backend.
! * 3. Upon receiving a SIGUSR2 signal from another backend process notifying
* that one of the relation that we are listening is being notified,
* we can be in either of two following states:
* 3.a We are sleeping, wake up and signal our frontend.
***************
*** 85,99 ****
#include <port-protos.h> /* for strdup() */
#include <storage/lmgr.h>
static int notifyFrontEndPending = 0;
static int notifyIssued = 0;
static Dllist *pendingNotifies = NULL;
-
static int AsyncExistsPendingNotify(char *);
static void ClearPendingNotify(void);
static void Async_NotifyFrontEnd(void);
void Async_Unlisten(char *relname, int pid);
static void Async_UnlistenOnExit(int code, char *relname);
--- 85,105 ----
#include <port-protos.h> /* for strdup() */
#include <storage/lmgr.h>
+ #include <utils/trace.h>
+
+ #define notifyUnlock pg_options[OPT_NOTIFYUNLOCK]
+ #define notifyHack pg_options[OPT_NOTIFYHACK]
+
+ GlobalMemory notifyContext = NULL;
static int notifyFrontEndPending = 0;
static int notifyIssued = 0;
static Dllist *pendingNotifies = NULL;
static int AsyncExistsPendingNotify(char *);
static void ClearPendingNotify(void);
static void Async_NotifyFrontEnd(void);
+ static void Async_NotifyFrontEnd_Aux(void);
void Async_Unlisten(char *relname, int pid);
static void Async_UnlistenOnExit(int code, char *relname);
***************
*** 121,145 ****
{
extern TransactionState CurrentTransactionState;
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
!
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Waking up sleeping backend process");
! #endif
Async_NotifyFrontEnd();
-
}
else
{
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
! CurrentTransactionState->state,
! CurrentTransactionState->blockState);
! #endif
notifyFrontEndPending = 1;
}
}
/*
--- 127,152 ----
{
extern TransactionState CurrentTransactionState;
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
+
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
! TPRINTF(TRACE_NOTIFY, "Waking up sleeping backend process");
Async_NotifyFrontEnd();
}
else
{
! TPRINTF(TRACE_NOTIFY,
! "Process is in the middle of another transaction, "
! "state = %d, block state = %d",
! CurrentTransactionState->state,
! CurrentTransactionState->blockState);
notifyFrontEndPending = 1;
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
}
+
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler done");
}
/*
***************
*** 184,192 ****
char *notifyName;
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Async_Notify: %s", relname);
! #endif
if (!pendingNotifies)
pendingNotifies = DLNewList();
--- 191,197 ----
char *notifyName;
! TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
if (!pendingNotifies)
pendingNotifies = DLNewList();
***************
*** 224,234 ****
heap_replace(lRel, &lTuple->t_ctid, rTuple);
}
ReleaseBuffer(b);
}
heap_endscan(sRel);
! RelationUnsetLockForWrite(lRel);
heap_close(lRel);
! notifyIssued = 1;
}
/*
--- 229,249 ----
heap_replace(lRel, &lTuple->t_ctid, rTuple);
}
ReleaseBuffer(b);
+ notifyIssued = 1;
}
heap_endscan(sRel);
!
! /*
! * Note: if we unset the lock or we could get multiple tuples
! * with same oid if other backends notify the same relation.
! */
! if (notifyUnlock) {
! RelationUnsetLockForWrite(lRel);
! }
!
heap_close(lRel);
!
! TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
}
/*
***************
*** 278,286 ****
{ /* 'notify <relname>' issued by us */
notifyIssued = 0;
StartTransactionCommand();
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Async_NotifyAtCommit.");
! #endif
ScanKeyEntryInitialize(&key, 0,
Anum_pg_listener_notify,
Integer32EqualRegProcedure,
--- 293,299 ----
{ /* 'notify <relname>' issued by us */
notifyIssued = 0;
StartTransactionCommand();
! TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
ScanKeyEntryInitialize(&key, 0,
Anum_pg_listener_notify,
Integer32EqualRegProcedure,
***************
*** 303,318 ****
if (ourpid == DatumGetInt32(d))
{
- #ifdef ASYNC_DEBUG
- elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
- #endif
notifyFrontEndPending = 1;
}
else
{
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Notifying others");
! #endif
#ifdef HAVE_KILL
if (kill(DatumGetInt32(d), SIGUSR2) < 0)
{
--- 316,330 ----
if (ourpid == DatumGetInt32(d))
{
notifyFrontEndPending = 1;
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyAtCommit notifying self");
}
else
{
! TPRINTF(TRACE_NOTIFY,
! "Async_NotifyAtCommit notifying %d",
! DatumGetInt32(d));
#ifdef HAVE_KILL
if (kill(DatumGetInt32(d), SIGUSR2) < 0)
{
***************
*** 327,344 ****
ReleaseBuffer(b);
}
heap_endscan(sRel);
- RelationUnsetLockForWrite(lRel);
heap_close(lRel);
-
- CommitTransactionCommand();
ClearPendingNotify();
- }
! if (notifyFrontEndPending)
! { /* we need to notify the frontend of all
! * pending notifies. */
! notifyFrontEndPending = 1;
! Async_NotifyFrontEnd();
}
}
}
--- 339,361 ----
ReleaseBuffer(b);
}
heap_endscan(sRel);
heap_close(lRel);
ClearPendingNotify();
! if (notifyFrontEndPending)
! {
! /* Notify the frontend inside the current transaction! */
! Async_NotifyFrontEnd_Aux();
! }
!
! TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit done");
! CommitTransactionCommand();
! } else {
! /* Notify the frontend of pending notifies from other backends. */
! if (notifyFrontEndPending)
! {
! Async_NotifyFrontEnd();
! }
}
}
}
***************
*** 422,430 ****
char *relnamei;
TupleDesc tupDesc;
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Async_Listen: %s", relname);
! #endif
for (i = 0; i < Natts_pg_listener; i++)
{
nulls[i] = ' ';
--- 439,445 ----
char *relnamei;
TupleDesc tupDesc;
! TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
for (i = 0; i < Natts_pg_listener; i++)
{
nulls[i] = ' ';
***************
*** 457,462 ****
--- 472,480 ----
}
}
ReleaseBuffer(b);
+ if (alreadyListener) {
+ break;
+ }
}
heap_endscan(s);
***************
*** 464,485 ****
{
elog(NOTICE, "Async_Listen: We are already listening on %s",
relname);
return;
}
tupDesc = lDesc->rd_att;
! tup = heap_formtuple(tupDesc,
! values,
! nulls);
heap_insert(lDesc, tup);
-
pfree(tup);
- /*
- * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
- * listener on %s (possibly dead)",relname); }
- */
-
RelationUnsetLockForWrite(lDesc);
heap_close(lDesc);
--- 482,497 ----
{
elog(NOTICE, "Async_Listen: We are already listening on %s",
relname);
+ RelationUnsetLockForWrite(lDesc);
+ heap_close(lDesc);
return;
}
tupDesc = lDesc->rd_att;
! tup = heap_formtuple(tupDesc, values, nulls);
heap_insert(lDesc, tup);
pfree(tup);
RelationUnsetLockForWrite(lDesc);
heap_close(lDesc);
***************
*** 519,534 ****
lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
Int32GetDatum(pid),
0, 0);
- lDesc = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lDesc);
-
if (lTuple != NULL)
{
heap_delete(lDesc, &lTuple->t_ctid);
- }
! RelationUnsetLockForWrite(lDesc);
! heap_close(lDesc);
}
static void
--- 531,545 ----
lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
Int32GetDatum(pid),
0, 0);
if (lTuple != NULL)
{
+ lDesc = heap_openr(ListenerRelationName);
+ RelationSetLockForWrite(lDesc);
heap_delete(lDesc, &lTuple->t_ctid);
! RelationUnsetLockForWrite(lDesc);
! heap_close(lDesc);
! }
}
static void
***************
*** 560,570 ****
*
* --------------------------------------------------------------
*/
- GlobalMemory notifyContext = NULL;
-
static void
Async_NotifyFrontEnd()
{
extern CommandDest whereToSendOutput;
HeapTuple lTuple,
rTuple;
--- 571,595 ----
*
* --------------------------------------------------------------
*/
static void
Async_NotifyFrontEnd()
{
+ StartTransactionCommand();
+ Async_NotifyFrontEnd_Aux();
+ CommitTransactionCommand();
+ }
+
+ /*
+ * --------------------------------------------------------------
+ * Async_NotifyFrontEnd_Aux --
+ *
+ * Like Async_NotifyFrontEnd but MUST be called inside a transaction.
+ *
+ * --------------------------------------------------------------
+ */
+ static void
+ Async_NotifyFrontEnd_Aux()
+ {
extern CommandDest whereToSendOutput;
HeapTuple lTuple,
rTuple;
***************
*** 580,592 ****
int ourpid;
bool isnull;
! notifyFrontEndPending = 0;
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
! #endif
! StartTransactionCommand();
ourpid = getpid();
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_notify,
--- 605,616 ----
int ourpid;
bool isnull;
! char *hack[32];
! int i, hack_count = 0;
! notifyFrontEndPending = 0;
! TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
ourpid = getpid();
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_notify,
***************
*** 611,620 ****
--- 635,664 ----
{
d = heap_getattr(lTuple, b, Anum_pg_listener_relname,
tdesc, &isnull);
+
+ /* Hack to delete duplicate tuples (possible if notifyUnlock is set) */
+ if (notifyHack) {
+ for (i=0; i<hack_count; i++) {
+ if (strcmp(DatumGetName(d)->data, hack[i]) == 0) {
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyFrontEnd duplicate %s",
+ DatumGetName(d)->data);
+ heap_delete(lRel, &lTuple->t_ctid);
+ goto release_buffer;
+ }
+ }
+ if (hack_count < 32) {
+ hack[hack_count++] = pstrdup(DatumGetName(d)->data);
+ }
+ }
+
rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);
/* notifying the front end */
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyFrontEnd notifying %s",
+ DatumGetName(d)->data);
if (whereToSendOutput == Remote)
{
***************
*** 625,635 ****
}
else
{
! elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
}
ReleaseBuffer(b);
}
! CommitTransactionCommand();
}
static int
--- 669,686 ----
}
else
{
! elog(NOTICE,
! "Async_NotifyFrontEnd: no asynchronous notification "
! "to frontend on interactive sessions");
}
+
+ release_buffer:
ReleaseBuffer(b);
}
! heap_endscan(sRel);
! heap_close(lRel);
! RelationUnsetLockForWrite(lRel);
! TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd done");
}
static int
Massimo Dal Zotto
+----------------------------------------------------------------------+
| Massimo Dal Zotto e-mail: dz@cs.unitn.it |
| Via Marconi, 141 phone: ++39-461-534251 |
| 38057 Pergine Valsugana (TN) www: http://www.cs.unitn.it/~dz/ |
| Italy pgp: finger dz@tango.cs.unitn.it |
+----------------------------------------------------------------------+