proposal: make NOTIFY list de-duplication optional
- new GUC in "Statement Behaviour" section, notify_duplicate_removal
(default true)
Initial discussion in this thread:
/messages/by-id/CAP_rwwmpzk9=SbjRZTOd05bDctyC43wNKnu_m37dYGvL4SAeSw@mail.gmail.com
Rationale: for some legitimate use cases, duplicate removal is not
required, and it gets O(N^2) cost on large COPY/ insert transactions.
Attachments:
postgres-async-notify-duplicate-removal-guc.patch (text/x-patch; charset=US-ASCII)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 392eb70..9fb5504 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -6095,6 +6095,24 @@ SET XML OPTION { DOCUMENT | CONTENT };
</listitem>
</varlistentry>
+ <varlistentry id="guc-notify-duplicate-removal" xreflabel="notify_duplicate_removal">
+ <term><varname>notify_duplicate_removal</varname> (<type>bool</type>)
+ <indexterm>
+ <primary><varname>notify_duplicate_removal</> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Try to remove duplicate messages while processing NOTIFY. When
+ <literal>on</> (the default), the server will avoid duplicate messages
+ (with same channel and payload). Setting this variable to
+ <literal>off</> can increase performance in some situations - at the
+ cost of having duplicate messages in NOTIFY queue. See <xref
+ linkend="SQL-NOTIFY"> for more information.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</sect2>
<sect2 id="runtime-config-client-format">
diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index 4dd5608..86a9bed 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -95,16 +95,17 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
<para>
If the same channel name is signaled multiple times from the same
- transaction with identical payload strings, the
- database server can decide to deliver a single notification only.
- On the other hand, notifications with distinct payload strings will
- always be delivered as distinct notifications. Similarly, notifications from
- different transactions will never get folded into one notification.
- Except for dropping later instances of duplicate notifications,
+ transaction with identical payload strings, and
+ <varname>notify_duplicate_removal</> is set to true, the database server
+ can decide to deliver a single notification only. On the other hand,
+ notifications with distinct payload strings will always be delivered as
+ distinct notifications. Similarly, notifications from different
+ transactions will never get folded into one notification. Except for
+ dropping later instances of duplicate notifications,
<command>NOTIFY</command> guarantees that notifications from the same
transaction get delivered in the order they were sent. It is also
- guaranteed that messages from different transactions are delivered in
- the order in which the transactions committed.
+ guaranteed that messages from different transactions are delivered in the
+ order in which the transactions committed.
</para>
<para>
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index c39ac3a..a7bc9f1 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -364,8 +364,9 @@ static bool amRegisteredListener = false;
/* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false;
-/* GUC parameter */
+/* GUC parameters */
bool Trace_notify = false;
+bool notify_duplicate_removal = true;
/* local function prototypes */
static bool asyncQueuePagePrecedes(int p, int q);
@@ -570,9 +571,12 @@ Async_Notify(const char *channel, const char *payload)
errmsg("payload string too long")));
}
- /* no point in making duplicate entries in the list ... */
- if (AsyncExistsPendingNotify(channel, payload))
- return;
+ if (notify_duplicate_removal)
+ {
+ /* check for duplicate entries in the list */
+ if (AsyncExistsPendingNotify(channel, payload))
+ return;
+ }
/*
* The notification list needs to live until end of transaction, so store
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 83b8388..b737c29 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1617,6 +1617,15 @@ static struct config_bool ConfigureNamesBool[] =
false,
NULL, NULL, NULL
},
+ {
+ {"notify_duplicate_removal", PGC_USERSET, CLIENT_CONN_STATEMENT,
+ gettext_noop("Remove duplicate messages during NOTIFY."),
+ NULL
+ },
+ &notify_duplicate_removal,
+ true,
+ NULL, NULL, NULL
+ },
/* End-of-list marker */
{
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 029114f..2831c1b 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -536,6 +536,7 @@
#xmloption = 'content'
#gin_fuzzy_search_limit = 0
#gin_pending_list_limit = 4MB
+#notify_duplicate_removal = on
# - Locale and Formatting -
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index b4c13fa..c572691 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -23,6 +23,7 @@
#define NUM_ASYNC_BUFFERS 8
extern bool Trace_notify;
+extern bool notify_duplicate_removal;
extern volatile sig_atomic_t notifyInterruptPending;
extern Size AsyncShmemSize(void);
On Fri, Feb 5, 2016 at 10:17 AM, Filip Rembiałkowski
<filip.rembialkowski@gmail.com> wrote:
- new GUC in "Statement Behaviour" section, notify_duplicate_removal
(default true)
Initial discussion in this thread:
/messages/by-id/CAP_rwwmpzk9=SbjRZTOd05bDctyC43wNKnu_m37dYGvL4SAeSw@mail.gmail.com
Rationale: for some legitimate use cases, duplicate removal is not required,
and it gets O(N^2) cost on large COPY/ insert transactions.
I agree with what Merlin said about this:
/messages/by-id/CAHyXU0yoHe8Qc=yC10AHU1nFiA1tbHsg+35Ds-oEueUapo7t4g@mail.gmail.com
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Robert Haas <robertmhaas@gmail.com> writes:
On Fri, Feb 5, 2016 at 10:17 AM, Filip Rembiałkowski
<filip.rembialkowski@gmail.com> wrote:
- new GUC in "Statement Behaviour" section, notify_duplicate_removal
I agree with what Merlin said about this:
/messages/by-id/CAHyXU0yoHe8Qc=yC10AHU1nFiA1tbHsg+35Ds-oEueUapo7t4g@mail.gmail.com
Yeah, I agree that a GUC for this is quite unappetizing.
One idea would be to build a hashtable to aid with duplicate detection
(perhaps only once the pending-notify list gets long).
Another thought is that it's already the case that duplicate detection is
something of a "best effort" activity; note for example the comment in
AsyncExistsPendingNotify pointing out that we don't collapse duplicates
across subtransactions. Would it be acceptable to relax the standards
a bit further? For example, if we only checked for duplicates among the
last N notification list entries (for N say around 100), we'd probably
cover just about all the useful cases, and the runtime would stay linear.
The data structure isn't tremendously conducive to that, but it could be
done.
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Sat, 6 Feb 2016 at 12:50 Tom Lane <tgl@sss.pgh.pa.us> wrote:
Robert Haas <robertmhaas@gmail.com> writes:
I agree with what Merlin said about this:
/messages/by-id/CAHyXU0yoHe8Qc=yC10AHU1nFiA1tbHsg+35Ds-oEueUapo7t4g@mail.gmail.com
Yeah, I agree that a GUC for this is quite unappetizing.
How would you feel about a variant for calling NOTIFY?
The SQL syntax could be something like "NOTIFY [ALL] channel, payload"
where the ALL means "just send the notification already, nobody cares
whether there's an identical one in the queue".
Likewise we could introduce a three-argument form of pg_notify(text, text,
bool) where the final argument is whether you are interested in removing
duplicates.
Optimising the remove-duplicates path is still probably a worthwhile
endeavour, but if the user really doesn't care at all about duplication, it
seems silly to force them to pay any performance price for a behaviour they
didn't want, no?
Cheers,
BJ
Brendan Jurd <direvus@gmail.com> writes:
On Sat, 6 Feb 2016 at 12:50 Tom Lane <tgl@sss.pgh.pa.us> wrote:
Yeah, I agree that a GUC for this is quite unappetizing.
How would you feel about a variant for calling NOTIFY?
If we decide that this ought to be user-visible, then an extra NOTIFY
parameter would be the way to do it. I'd much rather it "just works"
though. In particular, if we do start advertising user control of
de-duplication, we are likely to start getting bug reports about every
case where it's inexact, eg the no-checks-across-subxact-boundaries
business.
Optimising the remove-duplicates path is still probably a worthwhile
endeavour, but if the user really doesn't care at all about duplication, it
seems silly to force them to pay any performance price for a behaviour they
didn't want, no?
I would only be impressed with that argument if it could be shown that
de-duplication was a significant fraction of the total cost of a typical
NOTIFY cycle. Obviously, you can make the O(N^2) term dominate if you
try, but I really doubt that it's significant for reasonable numbers of
notify events per transaction. One should also keep in mind that
duplicate events are going to cost extra processing on the
client-application side, too. In my experience with using NOTIFY, that
cost probably dwarfs the cost of emitting the messages.
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 02/05/2016 08:49 PM, Tom Lane wrote:
Yeah, I agree that a GUC for this is quite unappetizing.
Agreed.
One idea would be to build a hashtable to aid with duplicate detection
(perhaps only once the pending-notify list gets long).
Another thought is that it's already the case that duplicate detection is
something of a "best effort" activity; note for example the comment in
AsyncExistsPendingNotify pointing out that we don't collapse duplicates
across subtransactions. Would it be acceptable to relax the standards
a bit further? For example, if we only checked for duplicates among the
last N notification list entries (for N say around 100), we'd probably
cover just about all the useful cases, and the runtime would stay linear.
The data structure isn't tremendously conducive to that, but it could be
done.
I like the hashtable idea if it can be made workable.
cheers
andrew
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Sat, Feb 6, 2016 at 5:52 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Brendan Jurd <direvus@gmail.com> writes:
On Sat, 6 Feb 2016 at 12:50 Tom Lane <tgl@sss.pgh.pa.us> wrote:
Yeah, I agree that a GUC for this is quite unappetizing.
How would you feel about a variant for calling NOTIFY?
If we decide that this ought to be user-visible, then an extra NOTIFY
parameter would be the way to do it. I'd much rather it "just works"
though. In particular, if we do start advertising user control of
de-duplication, we are likely to start getting bug reports about every
case where it's inexact, eg the no-checks-across-subxact-boundaries
business.
Is it not enough to say "database server can decide to deliver a
single notification only." - which is already said in the docs?
The ALL keyword would be a clearly separated "do-nothing" version.
Optimising the remove-duplicates path is still probably a worthwhile
endeavour, but if the user really doesn't care at all about duplication, it
seems silly to force them to pay any performance price for a behaviour they
didn't want, no?
I would only be impressed with that argument if it could be shown that
de-duplication was a significant fraction of the total cost of a typical
NOTIFY cycle.
Even if a typical NOTIFY cycle excludes processing 10k or 100k
messages, why penalize users who have bigger transactions?
Obviously, you can make the O(N^2) term dominate if you
try, but I really doubt that it's significant for reasonable numbers of
notify events per transaction.
Yes, it is hard to observe for fewer than a few thousand messages in one
transaction.
But big data happens. And then the numbers get really bad.
In my test for 40k messages, it is 400 ms versus 9 seconds. 22 times
slower. For 200k messages, it is 2 seconds versus 250 seconds. 125
times slower.
And I tested with very short payload strings, so strcmp() had not much to do.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
+1
... and a patch (only adding ALL keyword, no hash table implemented yet).
Show quoted text
On Sat, Feb 6, 2016 at 2:35 PM, Brendan Jurd <direvus@gmail.com> wrote:
On Sat, 6 Feb 2016 at 12:50 Tom Lane <tgl@sss.pgh.pa.us> wrote:
Robert Haas <robertmhaas@gmail.com> writes:
I agree with what Merlin said about this:
/messages/by-id/CAHyXU0yoHe8Qc=yC10AHU1nFiA1tbHsg+35Ds-oEueUapo7t4g@mail.gmail.com
Yeah, I agree that a GUC for this is quite unappetizing.
How would you feel about a variant for calling NOTIFY?
The SQL syntax could be something like "NOTIFY [ALL] channel, payload" where
the ALL means "just send the notification already, nobody cares whether
there's an identical one in the queue".
Likewise we could introduce a three-argument form of pg_notify(text, text,
bool) where the final argument is whether you are interested in removing
duplicates.
Optimising the remove-duplicates path is still probably a worthwhile
endeavour, but if the user really doesn't care at all about duplication, it
seems silly to force them to pay any performance price for a behaviour they
didn't want, no?
Cheers,
BJ
Attachments:
postgres-notify-all.patch (text/x-patch; charset=US-ASCII)
diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index 4dd5608..c148859 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable class="PARAMETER">payload</replaceable> ]
+NOTIFY [ ALL | DISTINCT ] <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable class="PARAMETER">payload</replaceable> ]
</synopsis>
</refsynopsisdiv>
@@ -105,6 +105,8 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
transaction get delivered in the order they were sent. It is also
guaranteed that messages from different transactions are delivered in
the order in which the transactions committed.
+ If <literal>ALL</> is specified (contrary to <literal>DISTINCT</>, the
+ default), the server will deliver all notifications, including duplicates.
</para>
<para>
@@ -184,11 +186,14 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
<para>
To send a notification you can also use the function
- <literal><function>pg_notify</function>(<type>text</type>,
- <type>text</type>)</literal>. The function takes the channel name as the
- first argument and the payload as the second. The function is much easier
- to use than the <command>NOTIFY</command> command if you need to work with
- non-constant channel names and payloads.
+ <literal><function>pg_notify(<parameter>channel</> <type>text</>,
+ <parameter>payload</parameter> <type>text</> <optional>,
+ <parameter>use_all</> <type>boolean</></optional>)</function></literal>.
+ The function takes the channel name as the first argument and the payload
+ as the second. The third argument, <literal>false</> by default, represents
+ the <literal>ALL</> keyword. The function is much easier to use than the
+ <command>NOTIFY</command> command if you need to work with non-constant
+ channel names and payloads.
</para>
</refsect2>
</refsect1>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 923fe58..9df5301 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -965,3 +965,10 @@ RETURNS jsonb
LANGUAGE INTERNAL
STRICT IMMUTABLE
AS 'jsonb_set';
+
+CREATE OR REPLACE FUNCTION
+ pg_notify(channel text, payload text, use_all boolean DEFAULT false)
+RETURNS void
+LANGUAGE INTERNAL
+VOLATILE
+AS 'pg_notify';
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index c39ac3a..d374a00 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -510,6 +510,7 @@ pg_notify(PG_FUNCTION_ARGS)
{
const char *channel;
const char *payload;
+ bool use_all;
if (PG_ARGISNULL(0))
channel = "";
@@ -521,10 +522,12 @@ pg_notify(PG_FUNCTION_ARGS)
else
payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ use_all = PG_GETARG_BOOL(2);
+
/* For NOTIFY as a statement, this is checked in ProcessUtility */
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(channel, payload);
+ Async_Notify(channel, payload, use_all);
PG_RETURN_VOID();
}
@@ -540,7 +543,7 @@ pg_notify(PG_FUNCTION_ARGS)
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify(const char *channel, const char *payload, bool use_all)
{
Notification *n;
MemoryContext oldcontext;
@@ -570,9 +573,10 @@ Async_Notify(const char *channel, const char *payload)
errmsg("payload string too long")));
}
- /* no point in making duplicate entries in the list ... */
- if (AsyncExistsPendingNotify(channel, payload))
- return;
+ if (!use_all)
+ /* remove duplicate entries in the list */
+ if (AsyncExistsPendingNotify(channel, payload))
+ return;
/*
* The notification list needs to live until end of transaction, so store
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b307b48..7203f4a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -8528,11 +8528,12 @@ DropRuleStmt:
*
*****************************************************************************/
-NotifyStmt: NOTIFY ColId notify_payload
+NotifyStmt: NOTIFY all_or_distinct ColId notify_payload
{
NotifyStmt *n = makeNode(NotifyStmt);
- n->conditionname = $2;
- n->payload = $3;
+ n->use_all = $2;
+ n->conditionname = $3;
+ n->payload = $4;
$$ = (Node *)n;
}
;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 045f7f0..0e50561 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -599,7 +599,7 @@ standard_ProcessUtility(Node *parsetree,
NotifyStmt *stmt = (NotifyStmt *) parsetree;
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(stmt->conditionname, stmt->payload);
+ Async_Notify(stmt->conditionname, stmt->payload, stmt->use_all);
}
break;
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index c6b4916..14b1ed0 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4005,7 +4005,7 @@ DESCR("trigger description with pretty-print option");
/* asynchronous notifications */
DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s r 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
DESCR("get the channels that the current backend listens to");
-DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v r 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
+DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v r 3 0 2278 "25 25 16" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
DESCR("send a notification event");
DATA(insert OID = 3296 ( pg_notification_queue_usage PGNSP PGUID 12 1 0 0 0 f f f f t f v s 0 0 701 "" _null_ _null_ _null_ _null_ _null_ pg_notification_queue_usage _null_ _null_ _null_ ));
DESCR("get the fraction of the asynchronous notification queue currently in use");
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index b4c13fa..3a2c1c2 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -29,7 +29,7 @@ extern Size AsyncShmemSize(void);
extern void AsyncShmemInit(void);
/* notify-related SQL statements */
-extern void Async_Notify(const char *channel, const char *payload);
+extern void Async_Notify(const char *channel, const char *payload, bool use_all);
extern void Async_Listen(const char *channel);
extern void Async_Unlisten(const char *channel);
extern void Async_UnlistenAll(void);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2fd0629..7f1f01d 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2590,6 +2590,7 @@ typedef struct NotifyStmt
NodeTag type;
char *conditionname; /* condition name to notify */
char *payload; /* the payload string, or NULL if none */
+ bool use_all; /* ALL option */
} NotifyStmt;
/* ----------------------
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38..c650b90 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -8,6 +8,18 @@ SELECT pg_notify('notify_async1','sample message1');
(1 row)
+SELECT pg_notify('notify_async1','sample message1',false);
+ pg_notify
+-----------
+
+(1 row)
+
+SELECT pg_notify('notify_async1','sample_message1',true);
+ pg_notify
+-----------
+
+(1 row)
+
SELECT pg_notify('notify_async1','');
pg_notify
-----------
@@ -29,6 +41,8 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
ERROR: channel name too long
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY DISTINCT notify_async2;
+NOTIFY ALL notify_async2;
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01..6e53b86 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -4,6 +4,8 @@
--Should work. Send a valid message via a valid channel name
SELECT pg_notify('notify_async1','sample message1');
+SELECT pg_notify('notify_async1','sample message1',false);
+SELECT pg_notify('notify_async1','sample_message1',true);
SELECT pg_notify('notify_async1','');
SELECT pg_notify('notify_async1',NULL);
@@ -14,6 +16,8 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY DISTINCT notify_async2;
+NOTIFY ALL notify_async2;
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
On 02/07/2016 03:42 AM, Filip Rembiałkowski wrote:
+1
... and a patch (only adding ALL keyword, no hash table implemented yet).
Please stop top-posting, it's very disruptive. My comments are below,
where they belong.
On Sat, Feb 6, 2016 at 2:35 PM, Brendan Jurd <direvus@gmail.com> wrote:
On Sat, 6 Feb 2016 at 12:50 Tom Lane <tgl@sss.pgh.pa.us> wrote:
Robert Haas <robertmhaas@gmail.com> writes:
I agree with what Merlin said about this:
/messages/by-id/CAHyXU0yoHe8Qc=yC10AHU1nFiA1tbHsg+35Ds-oEueUapo7t4g@mail.gmail.com
Yeah, I agree that a GUC for this is quite unappetizing.
How would you feel about a variant for calling NOTIFY?
The SQL syntax could be something like "NOTIFY [ALL] channel, payload" where
the ALL means "just send the notification already, nobody cares whether
there's an identical one in the queue".
Likewise we could introduce a three-argument form of pg_notify(text, text,
bool) where the final argument is whether you are interested in removing
duplicates.
Optimising the remove-duplicates path is still probably a worthwhile
endeavour, but if the user really doesn't care at all about duplication, it
seems silly to force them to pay any performance price for a behaviour they
didn't want, no?
On 02/07/2016 03:42 AM, Filip Rembiałkowski wrote:
+1
... and a patch (only adding ALL keyword, no hash table implemented yet).
I only read through the patch, I didn't compile it or test it, but I
have a few comments:
You left the duplicate behavior with subtransactions, but didn't mention
it in the documentation. If I do NOTIFY DISTINCT chan, 'msg'; then I
expect only distinct notifications but I'll get duplicates if I'm in a
subtransaction. Either the documentation or the code needs to be fixed.
I seem to remember some discussion about not using DEFAULT parameters in
system functions so you should leave the old function alone and create a
new function with your use_all parameter. I don't recall the exact
reason why so hopefully someone else will enlighten me.
There is also no mention in the documentation about what happens if I do:
NOTIFY ALL chan, 'msg';
NOTIFY ALL chan, 'msg';
NOTIFY DISTINCT chan, 'msg';
NOTIFY ALL chan, 'msg';
Without testing, I'd say I'd get two messages, but it should be
explicitly mentioned somewhere.
--
Vik Fearing +33 6 46 75 15 36
http://2ndQuadrant.fr PostgreSQL : Expertise, Formation et Support
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Sun, Feb 7, 2016 at 11:54 AM, Vik Fearing <vik@2ndquadrant.fr> wrote:
On 02/07/2016 03:42 AM, Filip Rembiałkowski wrote:
You left the duplicate behavior with subtransactions, but didn't mention
it in the documentation. If I do NOTIFY DISTINCT chan, 'msg'; then I
expect only distinct notifications but I'll get duplicates if I'm in a
subtransaction. Either the documentation or the code needs to be fixed.
agreed
I seem to remember some discussion about not using DEFAULT parameters in
system functions so you should leave the old function alone and create a
new function with your use_all parameter. I don't recall the exact
reason why so hopefully someone else will enlighten me.
I'm quite new to this; how do I pinpoint proper OID for a new catalog
object (function, in this case)?
Is there a better way than browsing fmgr files and guessing next available oid?
There is also no mention in the documentation about what happens if I do:
NOTIFY ALL chan, 'msg';
NOTIFY ALL chan, 'msg';
NOTIFY DISTINCT chan, 'msg';
NOTIFY ALL chan, 'msg';
Without testing, I'd say I'd get two messages, but it should be
explicitly mentioned somewhere.
If it's four separate transactions, LISTEN'er should get four.
If it's in one transaction, LISTEN'er should get three.
--
Vik Fearing +33 6 46 75 15 36
http://2ndQuadrant.fr PostgreSQL : Expertise, Formation et Support
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 02/07/2016 04:00 PM, Filip Rembiałkowski wrote:
On Sun, Feb 7, 2016 at 11:54 AM, Vik Fearing <vik@2ndquadrant.fr> wrote:
I seem to remember some discussion about not using DEFAULT parameters in
system functions so you should leave the old function alone and create a
new function with your use_all parameter. I don't recall the exact
reason why so hopefully someone else will enlighten me.
I'm quite new to this; how do I pinpoint proper OID for a new catalog
object (function, in this case)?
Is there a better way than browsing fmgr files and guessing next available oid?
There is a shell script called `unused_oids` in src/include/catalog/.
There is also no mention in the documentation about what happens if I do:
NOTIFY ALL chan, 'msg';
NOTIFY ALL chan, 'msg';
NOTIFY DISTINCT chan, 'msg';
NOTIFY ALL chan, 'msg';
Without testing, I'd say I'd get two messages, but it should be
explicitly mentioned somewhere.
If it's four separate transactions, LISTEN'er should get four.
The question was for one transaction, I should have been clearer about that.
If it's in one transaction, LISTEN'er should get three.
This is surprising to me, I would think it would get only two. What is
your rationale for three?
Compare with the behavior of:
select 1
union all
select 1
union distinct
select 1
union all
select 1;
--
Vik Fearing +33 6 46 75 15 36
http://2ndQuadrant.fr PostgreSQL : Expertise, Formation et Support
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Sun, Feb 7, 2016 at 4:37 PM, Vik Fearing <vik@2ndquadrant.fr> wrote:
There is also no mention in the documentation about what happens if I do:
NOTIFY ALL chan, 'msg';
NOTIFY ALL chan, 'msg';
NOTIFY DISTINCT chan, 'msg';
NOTIFY ALL chan, 'msg';
Without testing, I'd say I'd get two messages, but it should be
explicitly mentioned somewhere.
If it's four separate transactions, LISTEN'er should get four.
The question was for one transaction, I should have been clearer about that.
If it's in one transaction, LISTEN'er should get three.
This is surprising to me, I would think it would get only two. What is
your rationale for three?
It is a single transaction, but four separate commands.
NOTIFY ALL chan, 'msg';
-- send the message, save in the list/hash
NOTIFY ALL chan, 'msg';
-- ALL was specified, send the message even if it is on the list/hash
NOTIFY DISTINCT chan, 'msg';
-- default mode, skip message because it's in the list/hash
NOTIFY ALL chan, 'msg';
-- ALL was specified, send the message even if it is hashed/saved
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 8 February 2016 at 09:37, Filip Rembiałkowski <
filip.rembialkowski@gmail.com> wrote:
On Sun, Feb 7, 2016 at 4:37 PM, Vik Fearing <vik@2ndquadrant.fr> wrote:
There is also no mention in the documentation about what happens if I
do:
NOTIFY ALL chan, 'msg';
NOTIFY ALL chan, 'msg';
NOTIFY DISTINCT chan, 'msg';
NOTIFY ALL chan, 'msg';
Without testing, I'd say I'd get two messages, but it should be
explicitly mentioned somewhere.
If it's four separate transactions, LISTEN'er should get four.
The question was for one transaction, I should have been clearer about
that.
If it's in one transaction, LISTEN'er should get three.
This is surprising to me, I would think it would get only two. What is
your rationale for three?
It is a single transaction, but four separate commands.
NOTIFY ALL chan, 'msg';
-- send the message, save in the list/hash
NOTIFY ALL chan, 'msg';
-- ALL was specified, send the message even if it is on the list/hash
NOTIFY DISTINCT chan, 'msg';
-- default mode, skip message because it's in the list/hash
NOTIFY ALL chan, 'msg';
-- ALL was specified, send the message even if it is hashed/saved
So in total three messages are sent?
Would it be correct to say that if ALL is specified then a message is
queued no matter what. If DISTINCT is specified then it is only queued if
no message with the same channel and argument is already queued for
delivery. Using DISTINCT can never decrease the total number of messages to
be sent.
Right?
If so, I think that's the right behaviour and the docs just need to be
explicit - an example like the above would be good, translated to be
friendlier to those who don't know the internal mechanics.
I've found the deduplication functionality of NOTIFY very frustrating in
the past and I see this as a significant improvement. Sometimes the *number
of times* something happened is significant too...
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Mon, Feb 8, 2016 at 1:52 PM, Craig Ringer <craig@2ndquadrant.com> wrote:
Would it be correct to say that if ALL is specified then a message is queued
no matter what. If DISTINCT is specified then it is only queued if no
message with the same channel and argument is already queued for delivery.
Yes, exactly.
Using DISTINCT can never decrease the total number of messages to be sent.
This sentence does not sound true. DISTINCT is the default, old
behaviour. It *can* decrease total number of messages (by
deduplication)
I've found the deduplication functionality of NOTIFY very frustrating in the past
and I see this as a significant improvement. Sometimes the *number of times*
something happened is significant too...
yep, same idea here.
Here is my next try, after suggestions from -perf and -hackers list:
* no GUC
* small addition to NOTIFY grammar: NOTIFY ALL/DISTINCT
* corresponding, 3-argument version of pg_notify(text,text,bool)
* updated the docs to include new syntax and clarify behavior
* no hashtable in AsyncExistsPendingNotify
(I don't see much sense in that part; and it can be well done
separately from this)
Attachments:
postgres-notify-all-v2.patch (text/x-patch; charset=US-ASCII)
diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index 4dd5608..933c76c 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable class="PARAMETER">payload</replaceable> ]
+NOTIFY [ ALL | DISTINCT ] <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable class="PARAMETER">payload</replaceable> ]
</synopsis>
</refsynopsisdiv>
@@ -105,6 +105,10 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
transaction get delivered in the order they were sent. It is also
guaranteed that messages from different transactions are delivered in
the order in which the transactions committed.
+ If <literal>ALL</> is specified (contrary to <literal>DISTINCT</>, the
+ default), the server will deliver all notifications, including duplicates.
+ Removal of duplicate notifications takes place within transaction block,
+ finished with <literal>COMMIT</>, <literal>END</> or <literal>SAVEPOINT</>.
</para>
<para>
@@ -190,6 +194,12 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
to use than the <command>NOTIFY</command> command if you need to work with
non-constant channel names and payloads.
</para>
+ <para>
+ There is a three-argument version, <literal>pg_notify(<type>text</>,
+ <type>text</>, <type>boolean</>)</literal>. The third argument acts like
+ the <literal>ALL</> keyword when set to <literal>true</>, and
+ <literal>DISTINCT</> when set to <literal>false</>.
+ </para>
</refsect2>
</refsect1>
@@ -210,6 +220,21 @@ Asynchronous notification "virtual" with payload "This is the payload" received
LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.
+
+/* Identical messages from same (sub-) transaction can be eliminated - unless you use the ALL keyword */
+LISTEN bar;
+BEGIN;
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Milk please';
+NOTIFY ALL bar, 'Milk please';
+SAVEPOINT s;
+NOTIFY bar, 'Coffee please';
+COMMIT;
+Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517.
</programlisting></para>
</refsect1>
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index c39ac3a..38a8246 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -510,6 +510,7 @@ pg_notify(PG_FUNCTION_ARGS)
{
const char *channel;
const char *payload;
+ bool use_all;
if (PG_ARGISNULL(0))
channel = "";
@@ -521,10 +522,15 @@ pg_notify(PG_FUNCTION_ARGS)
else
payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ if (PG_NARGS() > 2 && ! PG_ARGISNULL(2))
+ use_all = PG_GETARG_BOOL(2);
+ else
+ use_all = false;
+
/* For NOTIFY as a statement, this is checked in ProcessUtility */
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(channel, payload);
+ Async_Notify(channel, payload, use_all);
PG_RETURN_VOID();
}
@@ -540,7 +546,7 @@ pg_notify(PG_FUNCTION_ARGS)
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify(const char *channel, const char *payload, bool use_all)
{
Notification *n;
MemoryContext oldcontext;
@@ -570,9 +576,10 @@ Async_Notify(const char *channel, const char *payload)
errmsg("payload string too long")));
}
- /* no point in making duplicate entries in the list ... */
- if (AsyncExistsPendingNotify(channel, payload))
- return;
+ if (!use_all)
+ /* remove duplicate entries in the list */
+ if (AsyncExistsPendingNotify(channel, payload))
+ return;
/*
* The notification list needs to live until end of transaction, so store
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b307b48..7203f4a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -8528,11 +8528,12 @@ DropRuleStmt:
*
*****************************************************************************/
-NotifyStmt: NOTIFY ColId notify_payload
+NotifyStmt: NOTIFY all_or_distinct ColId notify_payload
{
NotifyStmt *n = makeNode(NotifyStmt);
- n->conditionname = $2;
- n->payload = $3;
+ n->use_all = $2;
+ n->conditionname = $3;
+ n->payload = $4;
$$ = (Node *)n;
}
;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 045f7f0..0e50561 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -599,7 +599,7 @@ standard_ProcessUtility(Node *parsetree,
NotifyStmt *stmt = (NotifyStmt *) parsetree;
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(stmt->conditionname, stmt->payload);
+ Async_Notify(stmt->conditionname, stmt->payload, stmt->use_all);
}
break;
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 1c0ef9a..53ed138 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4005,6 +4005,8 @@ DESCR("trigger description with pretty-print option");
/* asynchronous notifications */
DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s r 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
DESCR("get the channels that the current backend listens to");
+DATA(insert OID = 2561 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v r 3 0 2278 "25 25 16" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
+DESCR("send a notification event");
DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v r 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
DESCR("send a notification event");
DATA(insert OID = 3296 ( pg_notification_queue_usage PGNSP PGUID 12 1 0 0 0 f f f f t f v s 0 0 701 "" _null_ _null_ _null_ _null_ _null_ pg_notification_queue_usage _null_ _null_ _null_ ));
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index b4c13fa..3a2c1c2 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -29,7 +29,7 @@ extern Size AsyncShmemSize(void);
extern void AsyncShmemInit(void);
/* notify-related SQL statements */
-extern void Async_Notify(const char *channel, const char *payload);
+extern void Async_Notify(const char *channel, const char *payload, bool use_all);
extern void Async_Listen(const char *channel);
extern void Async_Unlisten(const char *channel);
extern void Async_UnlistenAll(void);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2fd0629..7f1f01d 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2590,6 +2590,7 @@ typedef struct NotifyStmt
NodeTag type;
char *conditionname; /* condition name to notify */
char *payload; /* the payload string, or NULL if none */
+ bool use_all; /* ALL option */
} NotifyStmt;
/* ----------------------
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38..c650b90 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -8,6 +8,18 @@ SELECT pg_notify('notify_async1','sample message1');
(1 row)
+SELECT pg_notify('notify_async1','sample message1',false);
+ pg_notify
+-----------
+
+(1 row)
+
+SELECT pg_notify('notify_async1','sample_message1',true);
+ pg_notify
+-----------
+
+(1 row)
+
SELECT pg_notify('notify_async1','');
pg_notify
-----------
@@ -29,6 +41,8 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
ERROR: channel name too long
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY DISTINCT notify_async2;
+NOTIFY ALL notify_async2;
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01..6e53b86 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -4,6 +4,8 @@
--Should work. Send a valid message via a valid channel name
SELECT pg_notify('notify_async1','sample message1');
+SELECT pg_notify('notify_async1','sample message1',false);
+SELECT pg_notify('notify_async1','sample_message1',true);
SELECT pg_notify('notify_async1','');
SELECT pg_notify('notify_async1',NULL);
@@ -14,6 +16,8 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY DISTINCT notify_async2;
+NOTIFY ALL notify_async2;
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
On 02/08/2016 09:33 PM, Filip Rembiałkowski wrote:
Here is my next try, after suggestions from -perf and -hackers list:
* no GUC
* small addition to NOTIFY grammar: NOTIFY ALL/DISTINCT
* corresponding, 3-argument version of pg_notify(text,text,bool)
* updated the docs to include new syntax and clarify behavior
* no hashtable in AsyncExistsPendingNotify
(I don't see much sense in that part; and it can be well done
separately from this)
Please add this to the next commitfest:
https://commitfest.postgresql.org/9/new/
--
Vik Fearing +33 6 46 75 15 36
http://2ndQuadrant.fr PostgreSQL : Expertise, Formation et Support
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, Feb 8, 2016 at 2:33 PM, Filip Rembiałkowski
<filip.rembialkowski@gmail.com> wrote:
Here is my next try, after suggestions from -perf and -hackers list:
* no GUC
* small addition to NOTIFY grammar: NOTIFY ALL/DISTINCT
* corresponding, 3-argument version of pg_notify(text,text,bool)
This is all sounding pretty good. I wonder if the third argument
should be a boolean, however. If we make it a text 'send mode'
argument instead, we could leave some room for more specialization
of the queuing behavior.
For example, we've had a couple of requests over the years to have an
'immediate' mode which dumps the notification immediately to the
client without waiting for tx commit. This may or may not be a good
idea, but if it was ultimately proved to be, it could be introduced as
an alternate mode without adding an extra function.
merlin
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Feb 9, 2016 at 12:15 AM, Merlin Moncure <mmoncure@gmail.com> wrote:
I wonder if the third argument
should be a boolean however. If we make it 'text, 'send mode',
instead, we could leave some room for more specialization of the
queuing behavior.
For example, we've had a couple of requests over the years to have an
'immediate' mode which dumps the notification immediately to the
client without waiting for tx commit. This may or may not be a good
idea, but if it was ultimately proved to be, it could be introduced as
an alternate mode without adding an extra function.
But then it becomes disputable if SQL syntax change makes sense.
---we had this,
NOTIFY channel [ , payload ]
---and in this patch we have this
NOTIFY [ ALL | DISTINCT ] channel [ , payload ]
--- but maybe we should have this?
NOTIFY channel [ , payload [ , mode ] ]
I'm not sure which direction is better with non-standard SQL additions.
Recycling keywords or adding more commas?
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Feb 9, 2016 at 2:16 PM, Filip Rembiałkowski
<filip.rembialkowski@gmail.com> wrote:
But then it becomes disputable if SQL syntax change makes sense.
---we had this,
NOTIFY channel [ , payload ]
---and in this patch we have this
NOTIFY [ ALL | DISTINCT ] channel [ , payload ]
--- but maybe we should have this?
NOTIFY channel [ , payload [ , mode ] ]
I think using ALL to mean "don't worry about de-duplication" could be
a bit confusing, especially as there was some interest recently in
supporting wildcard notifications:
/messages/by-id/52693FC5.7070507@gmail.com
and conceivably we might want to support a way to notify all
listeners, i.e. NOTIFY * as proposed in that thread. If we ever
supported wildcard notifies, ALL may be easily confused to mean "all
channel names".
What about adopting the options-inside-parentheses format, the way
EXPLAIN does nowadays, something like:
NOTIFY (DEDUPLICATE FALSE, MODE IMMEDIATE) mychannel;
Josh
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Josh Kupershmidt <schmiddy@gmail.com> writes:
On Tue, Feb 9, 2016 at 2:16 PM, Filip Rembiałkowski
<filip.rembialkowski@gmail.com> wrote:
But then it becomes disputable if SQL syntax change makes sense.
---we had this,
NOTIFY channel [ , payload ]
---and in this patch we have this
NOTIFY [ ALL | DISTINCT ] channel [ , payload ]
--- but maybe we should have this?
NOTIFY channel [ , payload [ , mode ] ]
What about adopting the options-inside-parentheses format, the way
EXPLAIN does nowadays, something like:
NOTIFY (DEDUPLICATE FALSE, MODE IMMEDIATE) mychannel;
FWIW, I think it would be a good thing if the NOTIFY statement syntax were
not remarkably different from the syntax used in the pg_notify() function
call. To do otherwise would certainly be confusing. So on the whole
I'd go with the "NOTIFY channel [ , payload [ , mode ] ]" option.
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Small update. I had to add one thing in /contrib/tcn/.
Attachments:
postgres-notify-all-v3.patchtext/x-patch; charset=US-ASCII; name=postgres-notify-all-v3.patchDownload
diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c
index 7352b29..3a6d4f5 100644
--- a/contrib/tcn/tcn.c
+++ b/contrib/tcn/tcn.c
@@ -160,7 +160,7 @@ triggered_change_notification(PG_FUNCTION_ARGS)
strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
}
- Async_Notify(channel, payload->data);
+ Async_Notify(channel, payload->data, false);
}
ReleaseSysCache(indexTuple);
break;
diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index 4dd5608..933c76c 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable class="PARAMETER">payload</replaceable> ]
+NOTIFY [ ALL | DISTINCT ] <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable class="PARAMETER">payload</replaceable> ]
</synopsis>
</refsynopsisdiv>
@@ -105,6 +105,10 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
transaction get delivered in the order they were sent. It is also
guaranteed that messages from different transactions are delivered in
the order in which the transactions committed.
+ If <literal>ALL</> is specified (contrary to <literal>DISTINCT</>, the
+ default), the server will deliver all notifications, including duplicates.
+ Removal of duplicate notifications takes place within transaction block,
+ finished with <literal>COMMIT</>, <literal>END</> or <literal>SAVEPOINT</>.
</para>
<para>
@@ -190,6 +194,12 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
to use than the <command>NOTIFY</command> command if you need to work with
non-constant channel names and payloads.
</para>
+ <para>
+ There is a three-argument version, <literal>pg_notify(<type>text</>,
+ <type>text</>, <type>boolean</>)</literal>. The third argument acts like
+ the <literal>ALL</> keyword when set to <literal>true</>, and
+ <literal>DISTINCT</> when set to <literal>false</>.
+ </para>
</refsect2>
</refsect1>
@@ -210,6 +220,21 @@ Asynchronous notification "virtual" with payload "This is the payload" received
LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.
+
+/* Identical messages from same (sub-) transaction can be eliminated - unless you use the ALL keyword */
+LISTEN bar;
+BEGIN;
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Milk please';
+NOTIFY ALL bar, 'Milk please';
+SAVEPOINT s;
+NOTIFY bar, 'Coffee please';
+COMMIT;
+Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517.
</programlisting></para>
</refsect1>
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index c39ac3a..38a8246 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -510,6 +510,7 @@ pg_notify(PG_FUNCTION_ARGS)
{
const char *channel;
const char *payload;
+ bool use_all;
if (PG_ARGISNULL(0))
channel = "";
@@ -521,10 +522,15 @@ pg_notify(PG_FUNCTION_ARGS)
else
payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ if (PG_NARGS() > 2 && ! PG_ARGISNULL(2))
+ use_all = PG_GETARG_BOOL(2);
+ else
+ use_all = false;
+
/* For NOTIFY as a statement, this is checked in ProcessUtility */
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(channel, payload);
+ Async_Notify(channel, payload, use_all);
PG_RETURN_VOID();
}
@@ -540,7 +546,7 @@ pg_notify(PG_FUNCTION_ARGS)
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify(const char *channel, const char *payload, bool use_all)
{
Notification *n;
MemoryContext oldcontext;
@@ -570,9 +576,10 @@ Async_Notify(const char *channel, const char *payload)
errmsg("payload string too long")));
}
- /* no point in making duplicate entries in the list ... */
- if (AsyncExistsPendingNotify(channel, payload))
- return;
+ if (!use_all)
+ /* remove duplicate entries in the list */
+ if (AsyncExistsPendingNotify(channel, payload))
+ return;
/*
* The notification list needs to live until end of transaction, so store
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b307b48..7203f4a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -8528,11 +8528,12 @@ DropRuleStmt:
*
*****************************************************************************/
-NotifyStmt: NOTIFY ColId notify_payload
+NotifyStmt: NOTIFY all_or_distinct ColId notify_payload
{
NotifyStmt *n = makeNode(NotifyStmt);
- n->conditionname = $2;
- n->payload = $3;
+ n->use_all = $2;
+ n->conditionname = $3;
+ n->payload = $4;
$$ = (Node *)n;
}
;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 045f7f0..0e50561 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -599,7 +599,7 @@ standard_ProcessUtility(Node *parsetree,
NotifyStmt *stmt = (NotifyStmt *) parsetree;
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(stmt->conditionname, stmt->payload);
+ Async_Notify(stmt->conditionname, stmt->payload, stmt->use_all);
}
break;
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 1c0ef9a..53ed138 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4005,6 +4005,8 @@ DESCR("trigger description with pretty-print option");
/* asynchronous notifications */
DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s r 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
DESCR("get the channels that the current backend listens to");
+DATA(insert OID = 2561 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v r 3 0 2278 "25 25 16" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
+DESCR("send a notification event");
DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v r 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
DESCR("send a notification event");
DATA(insert OID = 3296 ( pg_notification_queue_usage PGNSP PGUID 12 1 0 0 0 f f f f t f v s 0 0 701 "" _null_ _null_ _null_ _null_ _null_ pg_notification_queue_usage _null_ _null_ _null_ ));
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index b4c13fa..3a2c1c2 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -29,7 +29,7 @@ extern Size AsyncShmemSize(void);
extern void AsyncShmemInit(void);
/* notify-related SQL statements */
-extern void Async_Notify(const char *channel, const char *payload);
+extern void Async_Notify(const char *channel, const char *payload, bool use_all);
extern void Async_Listen(const char *channel);
extern void Async_Unlisten(const char *channel);
extern void Async_UnlistenAll(void);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2fd0629..7f1f01d 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2590,6 +2590,7 @@ typedef struct NotifyStmt
NodeTag type;
char *conditionname; /* condition name to notify */
char *payload; /* the payload string, or NULL if none */
+ bool use_all; /* ALL option */
} NotifyStmt;
/* ----------------------
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38..c650b90 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -8,6 +8,18 @@ SELECT pg_notify('notify_async1','sample message1');
(1 row)
+SELECT pg_notify('notify_async1','sample message1',false);
+ pg_notify
+-----------
+
+(1 row)
+
+SELECT pg_notify('notify_async1','sample_message1',true);
+ pg_notify
+-----------
+
+(1 row)
+
SELECT pg_notify('notify_async1','');
pg_notify
-----------
@@ -29,6 +41,8 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
ERROR: channel name too long
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY DISTINCT notify_async2;
+NOTIFY ALL notify_async2;
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01..6e53b86 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -4,6 +4,8 @@
--Should work. Send a valid message via a valid channel name
SELECT pg_notify('notify_async1','sample message1');
+SELECT pg_notify('notify_async1','sample message1',false);
+SELECT pg_notify('notify_async1','sample_message1',true);
SELECT pg_notify('notify_async1','');
SELECT pg_notify('notify_async1',NULL);
@@ -14,6 +16,8 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY DISTINCT notify_async2;
+NOTIFY ALL notify_async2;
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
Another update: the three-argument variant is now a separate internal
function (pg_notify_3args), to satisfy opr_sanity.sql.
Attachments:
postgres-notify-all-v4.patchtext/x-patch; charset=US-ASCII; name=postgres-notify-all-v4.patchDownload
diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c
index 7352b29..3a6d4f5 100644
--- a/contrib/tcn/tcn.c
+++ b/contrib/tcn/tcn.c
@@ -160,7 +160,7 @@ triggered_change_notification(PG_FUNCTION_ARGS)
strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
}
- Async_Notify(channel, payload->data);
+ Async_Notify(channel, payload->data, false);
}
ReleaseSysCache(indexTuple);
break;
diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index 4dd5608..933c76c 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable class="PARAMETER">payload</replaceable> ]
+NOTIFY [ ALL | DISTINCT ] <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable class="PARAMETER">payload</replaceable> ]
</synopsis>
</refsynopsisdiv>
@@ -105,6 +105,10 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
transaction get delivered in the order they were sent. It is also
guaranteed that messages from different transactions are delivered in
the order in which the transactions committed.
+ If <literal>ALL</> is specified (contrary to <literal>DISTINCT</>, the
+ default), the server will deliver all notifications, including duplicates.
+ Removal of duplicate notifications takes place within transaction block,
+ finished with <literal>COMMIT</>, <literal>END</> or <literal>SAVEPOINT</>.
</para>
<para>
@@ -190,6 +194,12 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
to use than the <command>NOTIFY</command> command if you need to work with
non-constant channel names and payloads.
</para>
+ <para>
+ There is a three-argument version, <literal>pg_notify(<type>text</>,
+ <type>text</>, <type>boolean</>)</literal>. The third argument acts like
+ the <literal>ALL</> keyword when set to <literal>true</>, and
+ <literal>DISTINCT</> when set to <literal>false</>.
+ </para>
</refsect2>
</refsect1>
@@ -210,6 +220,21 @@ Asynchronous notification "virtual" with payload "This is the payload" received
LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.
+
+/* Identical messages from same (sub-) transaction can be eliminated - unless you use the ALL keyword */
+LISTEN bar;
+BEGIN;
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Milk please';
+NOTIFY ALL bar, 'Milk please';
+SAVEPOINT s;
+NOTIFY bar, 'Coffee please';
+COMMIT;
+Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517.
</programlisting></para>
</refsect1>
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index c39ac3a..54d1680 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -524,7 +524,42 @@ pg_notify(PG_FUNCTION_ARGS)
/* For NOTIFY as a statement, this is checked in ProcessUtility */
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(channel, payload);
+ Async_Notify(channel, payload, false);
+
+ PG_RETURN_VOID();
+}
+
+
+/*
+ * pg_notify_3args
+ * SQL function to send a notification event, 3-argument version
+ */
+Datum
+pg_notify_3args(PG_FUNCTION_ARGS)
+{
+ const char *channel;
+ const char *payload;
+ bool use_all;
+
+ if (PG_ARGISNULL(0))
+ channel = "";
+ else
+ channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+ if (PG_ARGISNULL(1))
+ payload = "";
+ else
+ payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
+
+ if (PG_ARGISNULL(2))
+ use_all = false;
+ else
+ use_all = PG_GETARG_BOOL(2);
+
+ /* For NOTIFY as a statement, this is checked in ProcessUtility */
+ PreventCommandDuringRecovery("NOTIFY");
+
+ Async_Notify(channel, payload, use_all);
PG_RETURN_VOID();
}
@@ -540,7 +575,7 @@ pg_notify(PG_FUNCTION_ARGS)
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify(const char *channel, const char *payload, bool use_all)
{
Notification *n;
MemoryContext oldcontext;
@@ -570,9 +605,10 @@ Async_Notify(const char *channel, const char *payload)
errmsg("payload string too long")));
}
- /* no point in making duplicate entries in the list ... */
- if (AsyncExistsPendingNotify(channel, payload))
- return;
+ if (!use_all)
+ /* remove duplicate entries in the list */
+ if (AsyncExistsPendingNotify(channel, payload))
+ return;
/*
* The notification list needs to live until end of transaction, so store
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b307b48..7203f4a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -8528,11 +8528,12 @@ DropRuleStmt:
*
*****************************************************************************/
-NotifyStmt: NOTIFY ColId notify_payload
+NotifyStmt: NOTIFY all_or_distinct ColId notify_payload
{
NotifyStmt *n = makeNode(NotifyStmt);
- n->conditionname = $2;
- n->payload = $3;
+ n->use_all = $2;
+ n->conditionname = $3;
+ n->payload = $4;
$$ = (Node *)n;
}
;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 045f7f0..0e50561 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -599,7 +599,7 @@ standard_ProcessUtility(Node *parsetree,
NotifyStmt *stmt = (NotifyStmt *) parsetree;
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(stmt->conditionname, stmt->payload);
+ Async_Notify(stmt->conditionname, stmt->payload, stmt->use_all);
}
break;
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 2222e8f..7df163e 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4005,6 +4005,8 @@ DESCR("trigger description with pretty-print option");
/* asynchronous notifications */
DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s r 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
DESCR("get the channels that the current backend listens to");
+DATA(insert OID = 2561 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v r 3 0 2278 "25 25 16" _null_ _null_ _null_ _null_ _null_ pg_notify_3args _null_ _null_ _null_ ));
+DESCR("send a notification event");
DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v r 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
DESCR("send a notification event");
DATA(insert OID = 3296 ( pg_notification_queue_usage PGNSP PGUID 12 1 0 0 0 f f f f t f v s 0 0 701 "" _null_ _null_ _null_ _null_ _null_ pg_notification_queue_usage _null_ _null_ _null_ ));
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index b4c13fa..3a2c1c2 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -29,7 +29,7 @@ extern Size AsyncShmemSize(void);
extern void AsyncShmemInit(void);
/* notify-related SQL statements */
-extern void Async_Notify(const char *channel, const char *payload);
+extern void Async_Notify(const char *channel, const char *payload, bool use_all);
extern void Async_Listen(const char *channel);
extern void Async_Unlisten(const char *channel);
extern void Async_UnlistenAll(void);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2fd0629..7f1f01d 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2590,6 +2590,7 @@ typedef struct NotifyStmt
NodeTag type;
char *conditionname; /* condition name to notify */
char *payload; /* the payload string, or NULL if none */
+ bool use_all; /* ALL option */
} NotifyStmt;
/* ----------------------
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38..c650b90 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -8,6 +8,18 @@ SELECT pg_notify('notify_async1','sample message1');
(1 row)
+SELECT pg_notify('notify_async1','sample message1',false);
+ pg_notify
+-----------
+
+(1 row)
+
+SELECT pg_notify('notify_async1','sample_message1',true);
+ pg_notify
+-----------
+
+(1 row)
+
SELECT pg_notify('notify_async1','');
pg_notify
-----------
@@ -29,6 +41,8 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
ERROR: channel name too long
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY DISTINCT notify_async2;
+NOTIFY ALL notify_async2;
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01..6e53b86 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -4,6 +4,8 @@
--Should work. Send a valid message via a valid channel name
SELECT pg_notify('notify_async1','sample message1');
+SELECT pg_notify('notify_async1','sample message1',false);
+SELECT pg_notify('notify_async1','sample_message1',true);
SELECT pg_notify('notify_async1','');
SELECT pg_notify('notify_async1',NULL);
@@ -14,6 +16,8 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY DISTINCT notify_async2;
+NOTIFY ALL notify_async2;
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
On 2/9/16, Tom Lane <tgl@sss.pgh.pa.us> wrote:
FWIW, I think it would be a good thing if the NOTIFY statement syntax were
not remarkably different from the syntax used in the pg_notify() function
call. To do otherwise would certainly be confusing. So on the whole
I'd go with the "NOTIFY channel [ , payload [ , mode ] ]" option.
I'm quite interested in getting this addressed in time for 9.6 as I'll
be using NOTIFY extensively in a project and I agree with Craig that
the deduplication is frustrating both because you sometimes want every
event and because it can apparently cause O(n^2) behaviour (which I
didn't know before this thread). If another use case for suppressing
deduplication is needed, consider publishing events like "inserted
tuple", "deleted tuple" from triggers and a transaction that does
"insert, delete, insert" which the client then sees as "insert,
delete, oops nothing else".
Tom's proposal allows for more flexible modes than just the ALL and
DISTINCT keywords and accommodates the concern that DISTINCT will lead
to bug reports about not really being distinct due to savepoints.
Android has a similar thing for push notifications to mobile devices
which they call collapse:
https://developers.google.com/cloud-messaging/concept-options, search
for collapse_key.
So I propose NOTIFY channel [ , payload [ , collapse_mode ] ] with
collapse mode being:
* 'never'
** Filip's proposed behaviour for the ALL option
** if specified, every notification is queued regardless what's in the queue
* 'maybe'
** vague word allowing for flexibility in what the server decides to do
** current behaviour
** improves performance for big transactions if a row trigger
creates the same payload over and over one after the other due to the
current optimization of checking the tail of the list
** has performance problems O(n^2) for big transactions with
different payloads
*** the performance problems can be addressed by a different
patch which uses a hash table, or decides to collapse less
aggressively (Tom's check last 100 idea), or whatever else
*** in the meantime the 'never' mode acts as a good workaround
In the future we might support an 'always' collapse_mode which would
really be always, including across savepoints. Or an
'only_inside_savepoints' which guarantees the current behaviour.
Filip, do you agree with Tom's proposal? Do you plan to rework the
patch on these lines? If you are I'll try to review it, if not I could
give it a shot as I'm interested in having this in 9.6.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Fri, Feb 19, 2016 at 10:09 PM, Catalin Iacob <iacobcatalin@gmail.com>
wrote:
On 2/9/16, Tom Lane <tgl@sss.pgh.pa.us> wrote:
FWIW, I think it would be a good thing if the NOTIFY statement syntax
were
not remarkably different from the syntax used in the pg_notify() function
call. To do otherwise would certainly be confusing. So on the whole
I'd go with the "NOTIFY channel [ , payload [ , mode ] ]" option.
I'm quite interested in getting this addressed in time for 9.6 as I'll
be using NOTIFY extensively in a project and I agree with Craig that
the deduplication is frustrating both because you sometimes want every
event and because it can apparently cause O(n^2) behaviour (which I
didn't know before this thread). If another use case for suppressing
deduplication is needed, consider publishing events like "inserted
tuple", "deleted tuple" from triggers and a transaction that does
"insert, delete, insert" which the client then sees as "insert,
delete, oops nothing else".
Tom's proposal allows for more flexible modes than just the ALL and
DISTINCT keywords and accommodates the concern that DISTINCT will lead
to bug reports about not really being distinct due to savepoints.
Android has a similar thing for push notifications to mobile devices
which they call collapse:
https://developers.google.com/cloud-messaging/concept-options, search
for collapse_key.
So I propose NOTIFY channel [ , payload [ , collapse_mode ] ] with
collapse mode being:
* 'never'
** Filip's proposed behaviour for the ALL option
** if specified, every notification is queued regardless what's in the
queue
* 'maybe'
** vague word allowing for flexibility in what the server decides to do
** current behaviour
** improves performance for big transactions if a row trigger
creates the same payload over and over one after the other due to the
current optimization of checking the tail of the list
** has performance problems O(n^2) for big transactions with
different payloads
*** the performance problems can be addressed by a different
patch which uses a hash table, or decides to collapse less
aggressively (Tom's check last 100 idea), or whatever else
*** in the meantime the 'never' mode acts as a good workaround
In the future we might support an 'always' collapse_mode which would
really be always, including across savepoints. Or an
'only_inside_savepoints' which guarantees the current behaviour.
Filip, do you agree with Tom's proposal? Do you plan to rework the
patch on these lines? If you are I'll try to review it, if not I could
give it a shot as I'm interested in having this in 9.6.
I see that Tom's remarks give more flexibility, and your refinement makes
sense.
I was stuck because both syntaxes have their ugliness. NOTIFY allows the
payload to be NULL:
NOTIFY chan01;
How would this look like in "never" mode?
NOTIFY chan01, NULL, 'never'; -- seems very cryptic.
On Sat, Feb 20, 2016 at 2:00 PM, Filip Rembiałkowski
<filip.rembialkowski@gmail.com> wrote:
I was stuck because both syntaxes have their ugliness. NOTIFY allows the
payload to be NULL:
NOTIFY chan01;
How would this look like in "never" mode?
NOTIFY chan01, NULL, 'never'; -- seems very cryptic.
The docs say:
"The information passed to the client for a notification event
includes the notification channel name, the notifying session's server
process PID, and the payload string, which is an empty string if it
has not been specified."
So a missing payload is not a SQL NULL but an empty string. This means
you would have:
NOTIFY chan01;
NOTIFY chan01, ''; -- same as above
NOTIFY chan01, '', 'maybe'; -- same as above
NOTIFY chan01, '', 'never'; -- send this all the time
Seems ok to me.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hi Filip,
On 2/20/16 8:00 AM, Filip Rembiałkowski wrote:
On Fri, Feb 19, 2016 at 10:09 PM, Catalin Iacob <iacobcatalin@gmail.com
On 2/9/16, Tom Lane <tgl@sss.pgh.pa.us <mailto:tgl@sss.pgh.pa.us>>
wrote:
FWIW, I think it would be a good thing if the NOTIFY statement syntax were
not remarkably different from the syntax used in the pg_notify() function
call. To do otherwise would certainly be confusing. So on the whole
I'd go with the "NOTIFY channel [ , payload [ , mode ] ]" option.
Filip, do you agree with Tom's proposal? Do you plan to rework the
patch on these lines? If you are I'll try to review it, if not I could
give it a shot as I'm interested in having this in 9.6.
I see that Tom's remarks give more flexibility, and your refinement
makes sense.
It looks like we are waiting on a new patch from you before this can be
reviewed. Are you close to having that done?
Meanwhile, I have marked it "Waiting on Author".
--
-David
david@pgmasters.net
On 3/11/16 1:46 PM, David Steele wrote:
Hi Filip,
On 2/20/16 8:00 AM, Filip Rembiałkowski wrote:
On Fri, Feb 19, 2016 at 10:09 PM, Catalin Iacob <iacobcatalin@gmail.com
On 2/9/16, Tom Lane <tgl@sss.pgh.pa.us <mailto:tgl@sss.pgh.pa.us>>
wrote:
FWIW, I think it would be a good thing if the NOTIFY statement syntax were
not remarkably different from the syntax used in the pg_notify() function
call. To do otherwise would certainly be confusing. So on the whole
I'd go with the "NOTIFY channel [ , payload [ , mode ] ]" option.
Filip, do you agree with Tom's proposal? Do you plan to rework the
patch on these lines? If you are I'll try to review it, if not I could
give it a shot as I'm interested in having this in 9.6.
I see that Tom's remarks give more flexibility, and your refinement
makes sense.
It looks like we are waiting on a new patch from you before this can be
reviewed. Are you close to having that done?
Meanwhile, I have marked it "Waiting on Author".
Since there has been no activity on this thread since before the CF and
no response from the author I have marked this "returned with feedback".
Please feel free to resubmit for 9.7!
--
-David
david@pgmasters.net
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Thanks for all the input.
Finally I found time and motivation to revive this.
See attached patch... I'm ready to work on it so it can get merged in the next CF.
Show quoted text
On Thu, Mar 17, 2016 at 12:44 AM David Steele <david@pgmasters.net> wrote:
On 3/11/16 1:46 PM, David Steele wrote:
Hi Filip,
On 2/20/16 8:00 AM, Filip Rembiałkowski wrote:
On Fri, Feb 19, 2016 at 10:09 PM, Catalin Iacob <iacobcatalin@gmail.com
On 2/9/16, Tom Lane <tgl@sss.pgh.pa.us <mailto:tgl@sss.pgh.pa.us>>
wrote:
FWIW, I think it would be a good thing if the NOTIFY statement syntax were
not remarkably different from the syntax used in the pg_notify() function
call. To do otherwise would certainly be confusing. So on the whole
I'd go with the "NOTIFY channel [ , payload [ , mode ] ]" option.
Filip, do you agree with Tom's proposal? Do you plan to rework the
patch on these lines? If you are I'll try to review it, if not I could
give it a shot as I'm interested in having this in 9.6.
I see that Tom's remarks give more flexibility, and your refinement
makes sense.
It looks like we are waiting on a new patch from you before this can be
reviewed. Are you close to having that done?
Meanwhile, I have marked it "Waiting on Author".
Since there has been no activity on this thread since before the CF and
no response from the author I have marked this "returned with feedback".
Please feel free to resubmit for 9.7!
--
-David
david@pgmasters.net
Attachments:
postgres-notify-send-mode-v1.patchtext/x-patch; charset=US-ASCII; name=postgres-notify-send-mode-v1.patchDownload
contrib/tcn/tcn.c | 2 +-
doc/src/sgml/ref/notify.sgml | 63 +++++++++++++++++++++++++--------
src/backend/commands/async.c | 69 +++++++++++++++++++++++++++++++------
src/backend/nodes/copyfuncs.c | 7 ++--
src/backend/nodes/equalfuncs.c | 7 ++--
src/backend/nodes/outfuncs.c | 3 +-
src/backend/nodes/readfuncs.c | 3 +-
src/backend/parser/gram.y | 44 +++++++++++++++++------
src/backend/tcop/utility.c | 8 ++---
src/backend/utils/adt/ruleutils.c | 4 ++-
src/include/catalog/pg_proc.dat | 4 +++
src/include/commands/async.h | 8 ++++-
src/include/nodes/parsenodes.h | 18 +++++++---
src/test/regress/expected/async.out | 20 +++++++++++
src/test/regress/sql/async.sql | 9 +++++
15 files changed, 212 insertions(+), 57 deletions(-)
diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c
index 5355a64c5e..196e0989a4 100644
--- a/contrib/tcn/tcn.c
+++ b/contrib/tcn/tcn.c
@@ -161,7 +161,7 @@ triggered_change_notification(PG_FUNCTION_ARGS)
strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
}
- Async_Notify(channel, payload->data);
+ Async_Notify(channel, payload->data, NOTIFY_SEND_UNIQUE);
}
ReleaseSysCache(indexTuple);
break;
diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index e0e125a2a2..ad0795fd2d 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable class="parameter">payload</replaceable> ]
+NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable class="parameter">payload</replaceable> [ , <replaceable class="parameter">send_mode</replaceable> ] ]
</synopsis>
</refsynopsisdiv>
@@ -47,10 +47,10 @@ NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable cla
</para>
<para>
- The information passed to the client for a notification event includes the
- notification channel
- name, the notifying session's server process <acronym>PID</acronym>, and the
- payload string, which is an empty string if it has not been specified.
+ The information passed to the client for a notification event includes
+ the notification channel name, the notifying session's server process
+ <acronym>PID</acronym>, and the payload string, which is an empty string
+ if it has not been specified.
</para>
<para>
@@ -92,21 +92,13 @@ NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable cla
is that applications using <command>NOTIFY</command> for real-time signaling
should try to keep their transactions short.
</para>
-
<para>
- If the same channel name is signaled multiple times from the same
- transaction with identical payload strings, the
- database server can decide to deliver a single notification only.
- On the other hand, notifications with distinct payload strings will
- always be delivered as distinct notifications. Similarly, notifications from
- different transactions will never get folded into one notification.
Except for dropping later instances of duplicate notifications,
<command>NOTIFY</command> guarantees that notifications from the same
transaction get delivered in the order they were sent. It is also
- guaranteed that messages from different transactions are delivered in
- the order in which the transactions committed.
+ guaranteed that messages from different transactions are delivered
+ in the order in which the transactions committed.
</para>
-
<para>
It is common for a client that executes <command>NOTIFY</command>
to be listening on the same notification channel itself. In that case
@@ -147,6 +139,23 @@ NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable cla
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><replaceable class="parameter">send_mode</replaceable></term>
+ <listitem>
+ <para>
+ Controls collapsing of repeated messages. Can be either
+ <literal>'unique'</literal> (the default) or <literal>'all'</literal>.
+
+ When set to <literal>'unique'</literal>, notification will be skipped if
+ the same channel was already signaled with identical payload in the same
+ transaction block (finished with <literal>COMMIT</literal>,
+ <literal>END</literal> or <literal>SAVEPOINT</literal>).
+
+ When set to <literal>'all'</literal>, duplicates will not be collapsed.
+
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect1>
@@ -190,6 +199,14 @@ NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable cla
to use than the <command>NOTIFY</command> command if you need to work with
non-constant channel names and payloads.
</para>
+ <para>
+ There is a three-argument version,
+ <literal><function>pg_notify</function>(<type>text</type>,
+ <type>text</type>, <type>text</type>)</literal>. The third argument
+ corresponds to the <replaceable
+ class="parameter">send_mode</replaceable> parameter of the
+ <literal>NOTIFY</literal> command.
+ </para>
</refsect2>
</refsect1>
@@ -210,6 +227,22 @@ Asynchronous notification "virtual" with payload "This is the payload" received
LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.
+
+/* Identical messages from same (sub-) transaction will be eliminated,
+ unless you use send_mode = 'all' */
+LISTEN bar;
+BEGIN;
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Milk please';
+NOTIFY bar, 'Milk please', 'all';
+SAVEPOINT s;
+NOTIFY bar, 'Coffee please';
+COMMIT;
+Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517.
</programlisting></para>
</refsect1>
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 5a7ee0de4c..f953424986 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -49,12 +49,14 @@
* 4. The NOTIFY statement (routine Async_Notify) stores the notification in
* a backend-local list which will not be processed until transaction end.
*
- * Duplicate notifications from the same transaction are sent out as one
- * notification only. This is done to save work when for example a trigger
- * on a 2 million row table fires a notification for each row that has been
- * changed. If the application needs to receive every single notification
- * that has been sent, it can easily add some unique string into the extra
- * payload parameter.
+ * In the default send_mode ('unique'), duplicate notifications from the
+ * same transaction are sent out as one notification only. This is done to
+ * save work when for example a trigger on a 2 million row table fires a
+ * notification for each row that has been changed. If the application needs
+ * to receive every single notification that has been sent, it can easily
+ * add some unique string into the extra payload parameter.
+ *
+ * If send_mode is 'all', de-duplication is not performed.
*
* When the transaction is ready to commit, PreCommit_Notify() adds the
* pending notifications to the head of the queue. The head pointer of the
@@ -523,7 +525,51 @@ pg_notify(PG_FUNCTION_ARGS)
/* For NOTIFY as a statement, this is checked in ProcessUtility */
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(channel, payload);
+ Async_Notify(channel, payload, true);
+
+ PG_RETURN_VOID();
+}
+
+
+/*
+ * pg_notify_mode
+ * SQL function to send a notification event, 3-argument version
+ */
+Datum
+pg_notify_mode(PG_FUNCTION_ARGS)
+{
+ const char *channel;
+ const char *payload;
+ const char *p_send_mode;
+ NotifySendMode send_mode;
+
+ if (PG_ARGISNULL(0))
+ channel = "";
+ else
+ channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+ if (PG_ARGISNULL(1))
+ payload = "";
+ else
+ payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
+
+ if (PG_ARGISNULL(2))
+ {
+ send_mode = NOTIFY_SEND_UNIQUE;
+ }
+ else
+ {
+ p_send_mode = text_to_cstring(PG_GETARG_TEXT_PP(2));
+ if (strcmp(p_send_mode, "all") == 0)
+ send_mode = NOTIFY_SEND_ALL;
+ else
+ send_mode = NOTIFY_SEND_UNIQUE;
+ }
+
+ /* For NOTIFY as a statement, this is checked in ProcessUtility */
+ PreventCommandDuringRecovery("NOTIFY");
+
+ Async_Notify(channel, payload, send_mode);
PG_RETURN_VOID();
}
@@ -539,7 +585,7 @@ pg_notify(PG_FUNCTION_ARGS)
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify(const char *channel, const char *payload, NotifySendMode send_mode)
{
Notification *n;
MemoryContext oldcontext;
@@ -569,9 +615,10 @@ Async_Notify(const char *channel, const char *payload)
errmsg("payload string too long")));
}
- /* no point in making duplicate entries in the list ... */
- if (AsyncExistsPendingNotify(channel, payload))
- return;
+ if (send_mode == NOTIFY_SEND_UNIQUE)
+ /* remove duplicate entries in the list */
+ if (AsyncExistsPendingNotify(channel, payload))
+ return;
/*
* The notification list needs to live until end of transaction, so store
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index a0c1389488..ddefeda8d3 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3622,8 +3622,9 @@ _copyNotifyStmt(const NotifyStmt *from)
{
NotifyStmt *newnode = makeNode(NotifyStmt);
- COPY_STRING_FIELD(conditionname);
+ COPY_STRING_FIELD(channel);
COPY_STRING_FIELD(payload);
+ COPY_SCALAR_FIELD(send_mode);
return newnode;
}
@@ -3633,7 +3634,7 @@ _copyListenStmt(const ListenStmt *from)
{
ListenStmt *newnode = makeNode(ListenStmt);
- COPY_STRING_FIELD(conditionname);
+ COPY_STRING_FIELD(channel);
return newnode;
}
@@ -3643,7 +3644,7 @@ _copyUnlistenStmt(const UnlistenStmt *from)
{
UnlistenStmt *newnode = makeNode(UnlistenStmt);
- COPY_STRING_FIELD(conditionname);
+ COPY_STRING_FIELD(channel);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3cab90e9f8..1c60b55b94 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1480,8 +1480,9 @@ _equalRuleStmt(const RuleStmt *a, const RuleStmt *b)
static bool
_equalNotifyStmt(const NotifyStmt *a, const NotifyStmt *b)
{
- COMPARE_STRING_FIELD(conditionname);
+ COMPARE_STRING_FIELD(channel);
COMPARE_STRING_FIELD(payload);
+ COMPARE_SCALAR_FIELD(send_mode);
return true;
}
@@ -1489,7 +1490,7 @@ _equalNotifyStmt(const NotifyStmt *a, const NotifyStmt *b)
static bool
_equalListenStmt(const ListenStmt *a, const ListenStmt *b)
{
- COMPARE_STRING_FIELD(conditionname);
+ COMPARE_STRING_FIELD(channel);
return true;
}
@@ -1497,7 +1498,7 @@ _equalListenStmt(const ListenStmt *a, const ListenStmt *b)
static bool
_equalUnlistenStmt(const UnlistenStmt *a, const UnlistenStmt *b)
{
- COMPARE_STRING_FIELD(conditionname);
+ COMPARE_STRING_FIELD(channel);
return true;
}
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index d9a5e8cb6a..d228cd51bd 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2660,8 +2660,9 @@ _outNotifyStmt(StringInfo str, const NotifyStmt *node)
{
WRITE_NODE_TYPE("NOTIFY");
- WRITE_STRING_FIELD(conditionname);
+ WRITE_STRING_FIELD(channel);
WRITE_STRING_FIELD(payload);
+ WRITE_ENUM_FIELD(send_mode, NotifySendMode);
}
static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 0a34fd9684..a98211da45 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -296,8 +296,9 @@ _readNotifyStmt(void)
{
READ_LOCALS(NotifyStmt);
- READ_STRING_FIELD(conditionname);
+ READ_STRING_FIELD(channel);
READ_STRING_FIELD(payload);
+ READ_ENUM_FIELD(send_mode, NotifySendMode);
READ_DONE();
}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 753af6073f..ce44312ac6 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -525,6 +525,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <ival> Iconst SignedIconst
%type <str> Sconst comment_text notify_payload
+%type <boolean> notify_send_mode
%type <str> RoleId opt_boolean_or_string
%type <list> var_list
%type <str> ColId ColLabel var_name type_function_name param_name
@@ -9741,24 +9742,45 @@ opt_instead:
*
*****************************************************************************/
-NotifyStmt: NOTIFY ColId notify_payload
- {
- NotifyStmt *n = makeNode(NotifyStmt);
- n->conditionname = $2;
- n->payload = $3;
- $$ = (Node *)n;
- }
+NotifyStmt:
+ NOTIFY ColId
+ {
+ NotifyStmt *n = makeNode(NotifyStmt);
+ n->channel = $2;
+ n->payload = NULL;
+ n->send_mode = NOTIFY_SEND_UNIQUE;
+ $$ = (Node *)n;
+ }
+ | NOTIFY ColId notify_payload
+ {
+ NotifyStmt *n = makeNode(NotifyStmt);
+ n->channel = $2;
+ n->payload = $3;
+ n->send_mode = NOTIFY_SEND_UNIQUE;
+ $$ = (Node *)n;
+ }
+ | NOTIFY ColId notify_payload notify_send_mode
+ {
+ NotifyStmt *n = makeNode(NotifyStmt);
+ n->channel = $2;
+ n->payload = $3;
+ n->send_mode = $4;
+ $$ = (Node *)n;
+ }
;
notify_payload:
',' Sconst { $$ = $2; }
- | /*EMPTY*/ { $$ = NULL; }
+ ;
+
+notify_send_mode:
+ ',' opt_boolean_or_string { $$ = $2; }
;
ListenStmt: LISTEN ColId
{
ListenStmt *n = makeNode(ListenStmt);
- n->conditionname = $2;
+ n->channel = $2;
$$ = (Node *)n;
}
;
@@ -9767,13 +9789,13 @@ UnlistenStmt:
UNLISTEN ColId
{
UnlistenStmt *n = makeNode(UnlistenStmt);
- n->conditionname = $2;
+ n->channel = $2;
$$ = (Node *)n;
}
| UNLISTEN '*'
{
UnlistenStmt *n = makeNode(UnlistenStmt);
- n->conditionname = NULL;
+ n->channel = NULL;
$$ = (Node *)n;
}
;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 6ec795f1b4..c0fc895535 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -611,7 +611,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
NotifyStmt *stmt = (NotifyStmt *) parsetree;
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(stmt->conditionname, stmt->payload);
+ Async_Notify(stmt->channel, stmt->payload, stmt->send_mode);
}
break;
@@ -621,7 +621,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
PreventCommandDuringRecovery("LISTEN");
CheckRestrictedOperation("LISTEN");
- Async_Listen(stmt->conditionname);
+ Async_Listen(stmt->channel);
}
break;
@@ -631,8 +631,8 @@ standard_ProcessUtility(PlannedStmt *pstmt,
/* we allow UNLISTEN during recovery, as it's a noop */
CheckRestrictedOperation("UNLISTEN");
- if (stmt->conditionname)
- Async_Unlisten(stmt->conditionname);
+ if (stmt->channel)
+ Async_Unlisten(stmt->channel);
else
Async_UnlistenAll();
}
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 85055bbb95..c4b83c50f3 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -6581,11 +6581,13 @@ get_utility_query_def(Query *query, deparse_context *context)
appendContextKeyword(context, "",
0, PRETTYINDENT_STD, 1);
appendStringInfo(buf, "NOTIFY %s",
- quote_identifier(stmt->conditionname));
+ quote_identifier(stmt->channel));
if (stmt->payload)
{
appendStringInfoString(buf, ", ");
simple_quote_literal(buf, stmt->payload);
+ appendStringInfo(buf, ", '%s'",
+ (stmt->send_mode == NOTIFY_SEND_ALL ? "all" : "unique"));
}
}
else
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 2e9d6cf2f5..6d38bbcfc0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7549,6 +7549,10 @@
proname => 'pg_notify', proisstrict => 'f', provolatile => 'v',
proparallel => 'r', prorettype => 'void', proargtypes => 'text text',
prosrc => 'pg_notify' },
+{ oid => '3998', descr => 'send a notification event',
+ proname => 'pg_notify', proisstrict => 'f', provolatile => 'v',
+ proparallel => 'r', prorettype => 'void', proargtypes => 'text text text',
+ prosrc => 'pg_notify_mode' },
{ oid => '3296',
descr => 'get the fraction of the asynchronous notification queue currently in use',
proname => 'pg_notification_queue_usage', provolatile => 'v',
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index cfea78e039..1154f3c281 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -22,6 +22,12 @@
*/
#define NUM_ASYNC_BUFFERS 8
+typedef enum NotifySendMode
+{
+ NOTIFY_SEND_UNIQUE,
+ NOTIFY_SEND_ALL
+} NotifySendMode;
+
extern bool Trace_notify;
extern volatile sig_atomic_t notifyInterruptPending;
@@ -33,7 +39,7 @@ extern void NotifyMyFrontEnd(const char *channel,
int32 srcPid);
/* notify-related SQL statements */
-extern void Async_Notify(const char *channel, const char *payload);
+extern void Async_Notify(const char *channel, const char *payload, NotifySendMode send_mode);
extern void Async_Listen(const char *channel);
extern void Async_Unlisten(const char *channel);
extern void Async_UnlistenAll(void);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index fe35783359..e669b76b39 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -27,6 +27,7 @@
#include "nodes/primnodes.h"
#include "nodes/value.h"
#include "partitioning/partdefs.h"
+#include "commands/async.h"
typedef enum OverridingKind
@@ -2944,12 +2945,19 @@ typedef struct RuleStmt
/* ----------------------
* Notify Statement
* ----------------------
+typedef enum NotifySendMode
+{
+ NOTIFY_SEND_UNIQUE,
+ NOTIFY_SEND_ALL
+} NotifySendMode;
*/
+
typedef struct NotifyStmt
{
- NodeTag type;
- char *conditionname; /* condition name to notify */
- char *payload; /* the payload string, or NULL if none */
+ NodeTag type;
+ char *channel; /* channel name to notify */
+ char *payload; /* the payload string, or NULL if none */
+ NotifySendMode send_mode; /* Send mode */
} NotifyStmt;
/* ----------------------
@@ -2959,7 +2967,7 @@ typedef struct NotifyStmt
typedef struct ListenStmt
{
NodeTag type;
- char *conditionname; /* condition name to listen on */
+ char *channel; /* channel name to listen on */
} ListenStmt;
/* ----------------------
@@ -2969,7 +2977,7 @@ typedef struct ListenStmt
typedef struct UnlistenStmt
{
NodeTag type;
- char *conditionname; /* name to unlisten on, or NULL for all */
+ char *channel; /* channel name to unlisten, or NULL for all */
} UnlistenStmt;
/* ----------------------
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38e63..7555d0bcc2 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -8,6 +8,18 @@ SELECT pg_notify('notify_async1','sample message1');
(1 row)
+SELECT pg_notify('notify_async1','sample_message1','unique');
+ pg_notify
+-----------
+
+(1 row)
+
+SELECT pg_notify('notify_async1','sample_message1','all');
+ pg_notify
+-----------
+
+(1 row)
+
SELECT pg_notify('notify_async1','');
pg_notify
-----------
@@ -27,8 +39,16 @@ SELECT pg_notify(NULL,'sample message1');
ERROR: channel name cannot be empty
SELECT pg_notify('notify_async_channel_name_too_long______________________________','sample_message1');
ERROR: channel name too long
+-- Should fail. Invalid 3rd parameter
+NOTIFY notify_async2, 'test', 'invalid';
+ERROR: ?
+NOTIFY notify_async2, 'test', true;
+ERROR: ?
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY notify_async2, '';
+NOTIFY notify_async2, '', 'unique';
+NOTIFY notify_async2, '', 'all';
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01538..ad349cce37 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -4,6 +4,8 @@
--Should work. Send a valid message via a valid channel name
SELECT pg_notify('notify_async1','sample message1');
+SELECT pg_notify('notify_async1','sample_message1','unique');
+SELECT pg_notify('notify_async1','sample_message1','all');
SELECT pg_notify('notify_async1','');
SELECT pg_notify('notify_async1',NULL);
@@ -12,8 +14,15 @@ SELECT pg_notify('','sample message1');
SELECT pg_notify(NULL,'sample message1');
SELECT pg_notify('notify_async_channel_name_too_long______________________________','sample_message1');
+-- Should fail. Invalid 3rd parameter
+NOTIFY notify_async2, 'test', 'invalid';
+NOTIFY notify_async2, 'test', true;
+
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY notify_async2, '';
+NOTIFY notify_async2, '', 'unique';
+NOTIFY notify_async2, '', 'all';
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
On Fri, Mar 8, 2019 at 1:37 PM Filip Rembiałkowski
<filip.rembialkowski@gmail.com> wrote:
See attached patch... I'm ready to work on so it can get merged in the next CF.
Hi Filip,
Seen on Travis:
async ... FAILED 126 ms
Looks like the new error isn't being raised for invalid send mode?
(What kind of error message is "?" anyway? :-))
ERROR: channel name too long
-- Should fail. Invalid 3rd parameter
NOTIFY notify_async2, 'test', 'invalid';
-ERROR: ?
NOTIFY notify_async2, 'test', true;
-ERROR: ?
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
NOTIFY notify_async2, '';
--
Thomas Munro
https://enterprisedb.com
Thank you.
Here is my latest attempt, with actual syntax error handling.
Also, the syntax is updated to what Tom Lane suggested in another
thread (with another variant of the same thing, from Julien Demoor):
NOTIFY [ ( option [, ...] ) ] channel [ , payload ]
Still no hash table fallback is implemented, so this is *not* a
performance improvement. Only a little more flexibility.
Show quoted text
On Sat, Mar 9, 2019 at 3:31 AM Thomas Munro <thomas.munro@gmail.com> wrote:
On Fri, Mar 8, 2019 at 1:37 PM Filip Rembiałkowski
<filip.rembialkowski@gmail.com> wrote:
See attached patch... I'm ready to work on so it can get merged in the next CF.
Hi Filip,
Seen on Travis:
async ... FAILED 126 ms
Looks like the new error isn't being raised for invalid send mode?
(What kind of error message is "?" anyway? :-))
ERROR: channel name too long
-- Should fail. Invalid 3rd parameter
NOTIFY notify_async2, 'test', 'invalid';
-ERROR: ?
NOTIFY notify_async2, 'test', true;
-ERROR: ?
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
NOTIFY notify_async2, '';
--
Thomas Munro
https://enterprisedb.com
Attachments:
postgres-notify-options-20190310_01.patch (text/x-patch; charset=US-ASCII; name=postgres-notify-options-20190310_01.patch) [Download]
contrib/tcn/tcn.c | 2 +-
doc/src/sgml/ref/notify.sgml | 64 +++++++++++++++++++------
src/backend/commands/async.c | 93 +++++++++++++++++++++++++++++++------
src/backend/nodes/copyfuncs.c | 7 +--
src/backend/nodes/equalfuncs.c | 7 +--
src/backend/nodes/outfuncs.c | 3 +-
src/backend/nodes/readfuncs.c | 3 +-
src/backend/parser/gram.y | 78 ++++++++++++++++++++++++++-----
src/backend/tcop/utility.c | 8 ++--
src/backend/utils/adt/ruleutils.c | 2 +-
src/include/catalog/pg_proc.dat | 4 ++
src/include/commands/async.h | 4 +-
src/include/nodes/parsenodes.h | 12 +++--
src/test/regress/expected/async.out | 21 +++++++++
src/test/regress/sql/async.sql | 10 ++++
15 files changed, 257 insertions(+), 61 deletions(-)
diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c
index 5355a64c5e..b80337a5ce 100644
--- a/contrib/tcn/tcn.c
+++ b/contrib/tcn/tcn.c
@@ -161,7 +161,7 @@ triggered_change_notification(PG_FUNCTION_ARGS)
strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
}
- Async_Notify(channel, payload->data);
+ Async_Notify(channel, payload->data, true);
}
ReleaseSysCache(indexTuple);
break;
diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index e0e125a2a2..e06b00f1f3 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -21,7 +21,11 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable class="parameter">payload</replaceable> ]
+NOTIFY [ ( option [, ...] ) ] <replaceable class="parameter">channel</replaceable> [ , <replaceable class="parameter">payload</replaceable> ]
+
+<phrase>where <replaceable class="parameter">option</replaceable> can be one of:</phrase>
+
+COLLAPSE [ boolean ]
</synopsis>
</refsynopsisdiv>
@@ -47,10 +51,10 @@ NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable cla
</para>
<para>
- The information passed to the client for a notification event includes the
- notification channel
- name, the notifying session's server process <acronym>PID</acronym>, and the
- payload string, which is an empty string if it has not been specified.
+ The information passed to the client for a notification event includes
+ the notification channel name, the notifying session's server process
+ <acronym>PID</acronym>, and the payload string, which is an empty string
+ if it has not been specified.
</para>
<para>
@@ -92,21 +96,13 @@ NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable cla
is that applications using <command>NOTIFY</command> for real-time signaling
should try to keep their transactions short.
</para>
-
<para>
- If the same channel name is signaled multiple times from the same
- transaction with identical payload strings, the
- database server can decide to deliver a single notification only.
- On the other hand, notifications with distinct payload strings will
- always be delivered as distinct notifications. Similarly, notifications from
- different transactions will never get folded into one notification.
Except for dropping later instances of duplicate notifications,
<command>NOTIFY</command> guarantees that notifications from the same
transaction get delivered in the order they were sent. It is also
- guaranteed that messages from different transactions are delivered in
- the order in which the transactions committed.
+ guaranteed that messages from different transactions are delivered
+ in the order in which the transactions committed.
</para>
-
<para>
It is common for a client that executes <command>NOTIFY</command>
to be listening on the same notification channel itself. In that case
@@ -147,6 +143,21 @@ NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable cla
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><replaceable class="parameter">COLLAPSE</replaceable></term>
+ <listitem>
+ <para>
+ Controls collapsing of repeated messages.
+
+ When set to <literal>on</literal>, notification will be skipped if
+ the same channel was already signaled with identical payload in the same
+ transaction block (finished with <literal>COMMIT</literal>,
+ <literal>END</literal> or <literal>SAVEPOINT</literal>).
+
+ When set to <literal>off</literal>, duplicates will not be collapsed.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect1>
@@ -190,6 +201,13 @@ NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable cla
to use than the <command>NOTIFY</command> command if you need to work with
non-constant channel names and payloads.
</para>
+ <para>
+ There is a three-argument version,
+ <literal><function>pg_notify</function>(<type>text</type>,
+ <type>text</type>, <type>boolean</type>)</literal>. The third argument
+ corresponds to the <replaceable class="parameter">COLLAPSE</replaceable>
+ option of the <literal>NOTIFY</literal> command.
+ </para>
</refsect2>
</refsect1>
@@ -210,6 +228,22 @@ Asynchronous notification "virtual" with payload "This is the payload" received
LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.
+
+/* Identical messages from same (sub-) transaction will be eliminated,
+ unless you use COLLAPSE off */
+LISTEN bar;
+BEGIN;
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Milk please';
+NOTIFY (COLLAPSE off) bar, 'Milk please';
+SAVEPOINT s;
+NOTIFY bar, 'Coffee please';
+COMMIT;
+Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517.
+Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517.
</programlisting></para>
</refsect1>
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 5a7ee0de4c..0f1280b3f5 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -49,12 +49,14 @@
* 4. The NOTIFY statement (routine Async_Notify) stores the notification in
* a backend-local list which will not be processed until transaction end.
*
- * Duplicate notifications from the same transaction are sent out as one
- * notification only. This is done to save work when for example a trigger
- * on a 2 million row table fires a notification for each row that has been
- * changed. If the application needs to receive every single notification
- * that has been sent, it can easily add some unique string into the extra
- * payload parameter.
+ * By default (COLLAPSE on), duplicate notifications from the
+ * same transaction are sent out as one notification only. This is done to
+ * save work when for example a trigger on a 2 million row table fires a
+ * notification for each row that has been changed. If the application needs
+ * to receive every single notification that has been sent, it can easily
+ * add some unique string into the extra payload parameter.
+ *
+ * If COLLAPSE off, de-duplication is not performed.
*
* When the transaction is ready to commit, PreCommit_Notify() adds the
* pending notifications to the head of the queue. The head pointer of the
@@ -123,6 +125,7 @@
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
+#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -523,7 +526,42 @@ pg_notify(PG_FUNCTION_ARGS)
/* For NOTIFY as a statement, this is checked in ProcessUtility */
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(channel, payload);
+ Async_Notify(channel, payload, true);
+
+ PG_RETURN_VOID();
+}
+
+
+/*
+ * pg_notify_3args
+ * SQL function to send a notification event, 3-argument version
+ */
+Datum
+pg_notify_3args(PG_FUNCTION_ARGS)
+{
+ const char *channel;
+ const char *payload;
+ bool collapse;
+
+ if (PG_ARGISNULL(0))
+ channel = "";
+ else
+ channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+ if (PG_ARGISNULL(1))
+ payload = "";
+ else
+ payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
+
+ if (PG_ARGISNULL(2))
+ collapse = true;
+ else
+ collapse = PG_GETARG_BOOL(2);
+
+ /* For NOTIFY as a statement, this is checked in ProcessUtility */
+ PreventCommandDuringRecovery("NOTIFY");
+
+ Async_Notify(channel, payload, collapse);
PG_RETURN_VOID();
}
@@ -539,10 +577,10 @@ pg_notify(PG_FUNCTION_ARGS)
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify(const char *channel, const char *payload, bool collapse)
{
- Notification *n;
- MemoryContext oldcontext;
+ Notification *n;
+ MemoryContext oldcontext;
if (IsParallelWorker())
elog(ERROR, "cannot send notifications from a parallel worker");
@@ -550,7 +588,7 @@ Async_Notify(const char *channel, const char *payload)
if (Trace_notify)
elog(DEBUG1, "Async_Notify(%s)", channel);
- /* a channel name must be specified */
+ /* channel name must be specified */
if (!channel || !strlen(channel))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -569,9 +607,10 @@ Async_Notify(const char *channel, const char *payload)
errmsg("payload string too long")));
}
- /* no point in making duplicate entries in the list ... */
- if (AsyncExistsPendingNotify(channel, payload))
- return;
+ if (collapse)
+ /* remove duplicate entries in the list */
+ if (AsyncExistsPendingNotify(channel, payload))
+ return;
/*
* The notification list needs to live until end of transaction, so store
@@ -595,6 +634,32 @@ Async_Notify(const char *channel, const char *payload)
MemoryContextSwitchTo(oldcontext);
}
+void
+Async_Notify_WithOptions(const char *channel,
+ const char *payload, List *options)
+{
+ ListCell *lc;
+
+ bool collapse;
+
+ collapse = true;
+
+ /* Parse options list. */
+ foreach(lc, options)
+ {
+ DefElem *opt = (DefElem *) lfirst(lc);
+
+ if (strcmp(opt->defname, "collapse") == 0)
+ collapse = defGetBoolean(opt);
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized NOTIFY option \"%s\"",
+ opt->defname)));
+ }
+ Async_Notify(channel, payload, collapse);
+}
+
/*
* queue_listen
* Common code for listen, unlisten, unlisten all commands.
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index a8a735c247..df91526429 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3623,8 +3623,9 @@ _copyNotifyStmt(const NotifyStmt *from)
{
NotifyStmt *newnode = makeNode(NotifyStmt);
- COPY_STRING_FIELD(conditionname);
+ COPY_STRING_FIELD(channel);
COPY_STRING_FIELD(payload);
+ COPY_NODE_FIELD(options);
return newnode;
}
@@ -3634,7 +3635,7 @@ _copyListenStmt(const ListenStmt *from)
{
ListenStmt *newnode = makeNode(ListenStmt);
- COPY_STRING_FIELD(conditionname);
+ COPY_STRING_FIELD(channel);
return newnode;
}
@@ -3644,7 +3645,7 @@ _copyUnlistenStmt(const UnlistenStmt *from)
{
UnlistenStmt *newnode = makeNode(UnlistenStmt);
- COPY_STRING_FIELD(conditionname);
+ COPY_STRING_FIELD(channel);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3cab90e9f8..3db593e63c 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1480,8 +1480,9 @@ _equalRuleStmt(const RuleStmt *a, const RuleStmt *b)
static bool
_equalNotifyStmt(const NotifyStmt *a, const NotifyStmt *b)
{
- COMPARE_STRING_FIELD(conditionname);
+ COMPARE_STRING_FIELD(channel);
COMPARE_STRING_FIELD(payload);
+ COMPARE_NODE_FIELD(options);
return true;
}
@@ -1489,7 +1490,7 @@ _equalNotifyStmt(const NotifyStmt *a, const NotifyStmt *b)
static bool
_equalListenStmt(const ListenStmt *a, const ListenStmt *b)
{
- COMPARE_STRING_FIELD(conditionname);
+ COMPARE_STRING_FIELD(channel);
return true;
}
@@ -1497,7 +1498,7 @@ _equalListenStmt(const ListenStmt *a, const ListenStmt *b)
static bool
_equalUnlistenStmt(const UnlistenStmt *a, const UnlistenStmt *b)
{
- COMPARE_STRING_FIELD(conditionname);
+ COMPARE_STRING_FIELD(channel);
return true;
}
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 69179a07c3..bebdd0a9e6 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2661,8 +2661,9 @@ _outNotifyStmt(StringInfo str, const NotifyStmt *node)
{
WRITE_NODE_TYPE("NOTIFY");
- WRITE_STRING_FIELD(conditionname);
+ WRITE_STRING_FIELD(channel);
WRITE_STRING_FIELD(payload);
+ WRITE_NODE_FIELD(options);
}
static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 4b845b1bb7..dfe64b5fec 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -296,8 +296,9 @@ _readNotifyStmt(void)
{
READ_LOCALS(NotifyStmt);
- READ_STRING_FIELD(conditionname);
+ READ_STRING_FIELD(channel);
READ_STRING_FIELD(payload);
+ READ_NODE_FIELD(options);
READ_DONE();
}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index e23e68fdb3..582c2ef692 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -524,7 +524,12 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <boolean> opt_varying opt_timezone opt_no_inherit
%type <ival> Iconst SignedIconst
-%type <str> Sconst comment_text notify_payload
+%type <str> Sconst comment_text
+%type <str> notify_option_name
+%type <node> notify_option_arg
+%type <defelt> notify_option_elem
+%type <list> notify_option_list
+
%type <str> RoleId opt_boolean_or_string
%type <list> var_list
%type <str> ColId ColLabel var_name type_function_name param_name
@@ -9741,24 +9746,73 @@ opt_instead:
*
*****************************************************************************/
-NotifyStmt: NOTIFY ColId notify_payload
+NotifyStmt:
+ NOTIFY ColId
+ {
+ NotifyStmt *n = makeNode(NotifyStmt);
+ n->channel = $2;
+ n->payload = NULL;
+ n->options = NIL;
+ $$ = (Node *)n;
+ }
+ | NOTIFY ColId ',' Sconst
+ {
+ NotifyStmt *n = makeNode(NotifyStmt);
+ n->channel = $2;
+ n->payload = $4;
+ n->options = NIL;
+ $$ = (Node *)n;
+ }
+ | NOTIFY '(' notify_option_list ')' ColId
+ {
+ NotifyStmt *n = makeNode(NotifyStmt);
+ n->options = $3;
+ n->channel = $5;
+ n->payload = NULL;
+ $$ = (Node *)n;
+ }
+ | NOTIFY '(' notify_option_list ')' ColId ',' Sconst
+ {
+ NotifyStmt *n = makeNode(NotifyStmt);
+ n->options = $3;
+ n->channel = $5;
+ n->payload = $7;
+ $$ = (Node *)n;
+ }
+ ;
+
+notify_option_list:
+ notify_option_elem
{
- NotifyStmt *n = makeNode(NotifyStmt);
- n->conditionname = $2;
- n->payload = $3;
- $$ = (Node *)n;
+ $$ = list_make1($1);
+ }
+ | notify_option_list ',' notify_option_elem
+ {
+ $$ = lappend($1, $3);
}
;
-notify_payload:
- ',' Sconst { $$ = $2; }
- | /*EMPTY*/ { $$ = NULL; }
+notify_option_elem:
+ notify_option_name notify_option_arg
+ {
+ $$ = makeDefElem($1, $2, @1);
+ }
+ ;
+
+notify_option_name:
+ NonReservedWord { $$ = $1; }
+ ;
+
+notify_option_arg:
+ opt_boolean_or_string { $$ = (Node *) makeString($1); }
+ | NumericOnly { $$ = (Node *) $1; }
+ | /* EMPTY */ { $$ = NULL; }
;
ListenStmt: LISTEN ColId
{
ListenStmt *n = makeNode(ListenStmt);
- n->conditionname = $2;
+ n->channel = $2;
$$ = (Node *)n;
}
;
@@ -9767,13 +9821,13 @@ UnlistenStmt:
UNLISTEN ColId
{
UnlistenStmt *n = makeNode(UnlistenStmt);
- n->conditionname = $2;
+ n->channel = $2;
$$ = (Node *)n;
}
| UNLISTEN '*'
{
UnlistenStmt *n = makeNode(UnlistenStmt);
- n->conditionname = NULL;
+ n->channel = NULL;
$$ = (Node *)n;
}
;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 6ec795f1b4..d6f8d09174 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -611,7 +611,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
NotifyStmt *stmt = (NotifyStmt *) parsetree;
PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(stmt->conditionname, stmt->payload);
+ Async_Notify_WithOptions(stmt->channel, stmt->payload, stmt->options);
}
break;
@@ -621,7 +621,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
PreventCommandDuringRecovery("LISTEN");
CheckRestrictedOperation("LISTEN");
- Async_Listen(stmt->conditionname);
+ Async_Listen(stmt->channel);
}
break;
@@ -631,8 +631,8 @@ standard_ProcessUtility(PlannedStmt *pstmt,
/* we allow UNLISTEN during recovery, as it's a noop */
CheckRestrictedOperation("UNLISTEN");
- if (stmt->conditionname)
- Async_Unlisten(stmt->conditionname);
+ if (stmt->channel)
+ Async_Unlisten(stmt->channel);
else
Async_UnlistenAll();
}
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 85055bbb95..1300347387 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -6581,7 +6581,7 @@ get_utility_query_def(Query *query, deparse_context *context)
appendContextKeyword(context, "",
0, PRETTYINDENT_STD, 1);
appendStringInfo(buf, "NOTIFY %s",
- quote_identifier(stmt->conditionname));
+ quote_identifier(stmt->channel));
if (stmt->payload)
{
appendStringInfoString(buf, ", ");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 562c5408f8..94dbe5c70a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7554,6 +7554,10 @@
proname => 'pg_notify', proisstrict => 'f', provolatile => 'v',
proparallel => 'r', prorettype => 'void', proargtypes => 'text text',
prosrc => 'pg_notify' },
+{ oid => '3998', descr => 'send a notification event',
+ proname => 'pg_notify', proisstrict => 'f', provolatile => 'v',
+ proparallel => 'r', prorettype => 'void', proargtypes => 'text text bool',
+ prosrc => 'pg_notify_3args' },
{ oid => '3296',
descr => 'get the fraction of the asynchronous notification queue currently in use',
proname => 'pg_notification_queue_usage', provolatile => 'v',
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index cfea78e039..36b39ff8bd 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -16,6 +16,7 @@
#include <signal.h>
#include "fmgr.h"
+#include "nodes/parsenodes.h"
/*
* The number of SLRU page buffers we use for the notification queue.
@@ -33,7 +34,8 @@ extern void NotifyMyFrontEnd(const char *channel,
int32 srcPid);
/* notify-related SQL statements */
-extern void Async_Notify(const char *channel, const char *payload);
+extern void Async_Notify(const char *channel, const char *payload, bool collapse);
+extern void Async_Notify_WithOptions(const char *channel, const char *payload, List *options);
extern void Async_Listen(const char *channel);
extern void Async_Unlisten(const char *channel);
extern void Async_UnlistenAll(void);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index fe35783359..94f24568e9 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -27,6 +27,7 @@
#include "nodes/primnodes.h"
#include "nodes/value.h"
#include "partitioning/partdefs.h"
+#include "commands/async.h"
typedef enum OverridingKind
@@ -2947,9 +2948,10 @@ typedef struct RuleStmt
*/
typedef struct NotifyStmt
{
- NodeTag type;
- char *conditionname; /* condition name to notify */
- char *payload; /* the payload string, or NULL if none */
+ NodeTag type;
+ char *channel; /* channel name to notify */
+ char *payload; /* the payload string, or NULL if none */
+ List *options; /* List of DefElem nodes */
} NotifyStmt;
/* ----------------------
@@ -2959,7 +2961,7 @@ typedef struct NotifyStmt
typedef struct ListenStmt
{
NodeTag type;
- char *conditionname; /* condition name to listen on */
+ char *channel; /* channel name to listen on */
} ListenStmt;
/* ----------------------
@@ -2969,7 +2971,7 @@ typedef struct ListenStmt
typedef struct UnlistenStmt
{
NodeTag type;
- char *conditionname; /* name to unlisten on, or NULL for all */
+ char *channel; /* channel name to unlisten, or NULL for all */
} UnlistenStmt;
/* ----------------------
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38e63..6d5726127c 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -8,6 +8,18 @@ SELECT pg_notify('notify_async1','sample message1');
(1 row)
+SELECT pg_notify('notify_async1','sample_message1',true);
+ pg_notify
+-----------
+
+(1 row)
+
+SELECT pg_notify('notify_async1','sample_message1',false);
+ pg_notify
+-----------
+
+(1 row)
+
SELECT pg_notify('notify_async1','');
pg_notify
-----------
@@ -29,9 +41,18 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
ERROR: channel name too long
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY (collapse off) notify_async2;
+NOTIFY notify_async2, '';
+NOTIFY (collapse on) notify_async2, '';
+NOTIFY (collapse off) notify_async2, '';
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
+-- Should fail. Invalid option syntax
+NOTIFY (collapse maybe) notify_async2;
+ERROR: collapse requires a Boolean value
+NOTIFY (wrong opt) notify_async2, 'test';
+ERROR: unrecognized NOTIFY option "wrong"
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
SELECT pg_notification_queue_usage();
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01538..29f9681d2a 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -4,6 +4,8 @@
--Should work. Send a valid message via a valid channel name
SELECT pg_notify('notify_async1','sample message1');
+SELECT pg_notify('notify_async1','sample_message1',true);
+SELECT pg_notify('notify_async1','sample_message1',false);
SELECT pg_notify('notify_async1','');
SELECT pg_notify('notify_async1',NULL);
@@ -14,10 +16,18 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
+NOTIFY (collapse off) notify_async2;
+NOTIFY notify_async2, '';
+NOTIFY (collapse on) notify_async2, '';
+NOTIFY (collapse off) notify_async2, '';
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
+-- Should fail. Invalid option syntax
+NOTIFY (collapse maybe) notify_async2;
+NOTIFY (wrong opt) notify_async2, 'test';
+
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
SELECT pg_notification_queue_usage();
Filip Rembiałkowski <filip.rembialkowski@gmail.com> writes:
Still no hash table fallback is implemented, so this is *not* a
performance improvement. Only a little more flexibility.
I think that we'd probably be better off fixing the root performance issue
than adding semantic complexity to bypass it. Especially since, if you
don't de-duplicate, that's going to cost you when it comes time to
actually send the notifications, receive them, forward them to clients,
and process them in the clients.
Admittedly, if the app *knows* that it's generating non-duplicate events,
maybe it'd be all right to skip checking that. But if we can make the
check cheap, I'd just as soon keep it.
Accordingly, I looked into making a hash table when there are more than
a small number of notifications pending, and attached is a lightly-tested
version of that. This seems to be more or less similar speed to the
existing code for up to 100 or so distinct notifies, but it soon pulls
away above that.
A point that needs discussion is that this patch, unlike the existing
code, *does* de-duplicate fully: any events generated by a subtransaction
that duplicate events already emitted by a parent will get removed when
the subxact is merged to its parent. I did this partly because we have
to expend O(N) work to merge N subtransaction notifies in any case,
now that we have to make new hashtable entries in the parent xact;
so the old excuse that subxact-end processing is really cheap no
longer applies. Also because the Assert(!found) assertions in the
attached hash coding fall over if we cheat on this. If we really want
to maintain exactly the old semantics here, we could relax the hashtable
code to just ignore duplicate entries. But, per the argument above,
de-duplication is a good thing so I'm inclined to keep it like this.
I also noticed that as things stand, it costs us two or three pallocs to
construct a Notification event. It wouldn't be terribly hard to reduce
that to one palloc, and maybe it'd be worthwhile if we're thinking that
transactions with many many notifies are a case worth optimizing.
But I didn't do that here; it seems like a separable patch.
I also thought for awhile about not having the hashtable as an auxiliary
data structure, but making it the main data structure. We could preserve
the required notification ordering information by threading the live
hashtable entries into an slist, say. However, this would greatly
increase the overhead for transactions with just one or a few distinct
NOTIFY events, since we'd have to set up the hashtable at the first one.
I think that's a common enough case that we shouldn't de-optimize it.
A smaller objection is that such a data structure would absolutely commit
us to de-duplication semantics, whereas the list plus separate hashtable
can cope with not de-duping if someone persuades us that's sane.
Thoughts?
regards, tom lane
Attachments:
detect-dup-notifies-by-hashing-1.patch (text/x-diff; charset=us-ascii; name=detect-dup-notifies-by-hashing-1.patch) [Download]
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e9c580..c21daa5 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -135,6 +135,7 @@
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/hashutils.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
@@ -323,14 +324,25 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
/*
* State for outbound notifies consists of a list of all channels+payloads
- * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
- * until and unless the transaction commits. pendingNotifies is NIL if no
- * NOTIFYs have been done in the current transaction.
+ * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
+ * until and unless the transaction commits. pendingNotifies is NULL if no
+ * NOTIFYs have been done in the current (sub) transaction.
+ *
+ * We discard duplicate notify events issued in the same transaction.
+ * Hence, in addition to the list proper (which we need to track the order
+ * of the events, since we guarantee to deliver them in order), we build a
+ * hash table which we can probe to detect duplicates. Since building the
+ * hash table is somewhat expensive, we do so only once we have at least
+ * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
+ * before that we just scan the events linearly.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
- * successful subtransactions attach their lists to their parent's list.
- * Failed subtransactions simply discard their lists.
+ * successful subtransactions add their entries to their parent's list.
+ * Failed subtransactions simply discard their lists. Since these lists
+ * are independent, there may be notify events in a subtransaction's list
+ * that duplicate events in some ancestor (sub) transaction; we get rid of
+ * the dups when merging the subtransaction's list into its parent's.
*
* Note: the action and notify lists do not interact within a transaction.
* In particular, if a transaction does NOTIFY and then LISTEN on the same
@@ -343,7 +355,20 @@ typedef struct Notification
char *payload; /* payload string (can be empty) */
} Notification;
-static List *pendingNotifies = NIL; /* list of Notifications */
+typedef struct NotificationList
+{
+ List *events; /* list of Notification structs */
+ HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
+} NotificationList;
+
+#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
+
+typedef struct NotificationHash
+{
+ Notification *event; /* => the actual Notification struct */
+} NotificationHash;
+
+static NotificationList *pendingNotifies = NULL; /* current list, if any */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
@@ -393,6 +418,9 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
+static void AddEventToPendingNotifies(Notification *n);
+static uint32 notification_hash(const void *key, Size keysize);
+static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
/*
@@ -586,11 +614,19 @@ Async_Notify(const char *channel, const char *payload)
else
n->payload = "";
- /*
- * We want to preserve the order so we need to append every notification.
- * See comments at AsyncExistsPendingNotify().
- */
- pendingNotifies = lappend(pendingNotifies, n);
+ if (pendingNotifies == NULL)
+ {
+ /* First notify event in current (sub)xact */
+ pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList));
+ pendingNotifies->events = list_make1(n);
+ /* We certainly don't need a hashtable yet */
+ pendingNotifies->hashtab = NULL;
+ }
+ else
+ {
+ /* Append more events to existing list */
+ AddEventToPendingNotifies(n);
+ }
MemoryContextSwitchTo(oldcontext);
}
@@ -761,7 +797,7 @@ PreCommit_Notify(void)
{
ListCell *p;
- if (pendingActions == NIL && pendingNotifies == NIL)
+ if (!pendingActions && !pendingNotifies)
return; /* no relevant statements in this xact */
if (Trace_notify)
@@ -821,7 +857,7 @@ PreCommit_Notify(void)
/* Now push the notifications into the queue */
backendHasSentNotifications = true;
- nextNotify = list_head(pendingNotifies);
+ nextNotify = list_head(pendingNotifies->events);
while (nextNotify != NULL)
{
/*
@@ -1294,7 +1330,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
* database OID in order to fill the page. So every page is always used up to
* the last byte which simplifies reading the page later.
*
- * We are passed the list cell (in pendingNotifies) containing the next
+ * We are passed the list cell (in pendingNotifies->events) containing the next
* notification to write and return the first still-unwritten cell back.
* Eventually we will return NULL indicating all is done.
*
@@ -1345,7 +1381,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
if (offset + qe.length <= QUEUE_PAGESIZE)
{
/* OK, so advance nextNotify past this item */
- nextNotify = lnext(pendingNotifies, nextNotify);
+ nextNotify = lnext(pendingNotifies->events, nextNotify);
}
else
{
@@ -1607,7 +1643,7 @@ AtSubStart_Notify(void)
Assert(list_length(upperPendingNotifies) ==
GetCurrentTransactionNestLevel() - 1);
- pendingNotifies = NIL;
+ pendingNotifies = NULL;
MemoryContextSwitchTo(old_cxt);
}
@@ -1621,7 +1657,7 @@ void
AtSubCommit_Notify(void)
{
List *parentPendingActions;
- List *parentPendingNotifies;
+ NotificationList *parentPendingNotifies;
parentPendingActions = linitial_node(List, upperPendingActions);
upperPendingActions = list_delete_first(upperPendingActions);
@@ -1634,16 +1670,41 @@ AtSubCommit_Notify(void)
*/
pendingActions = list_concat(parentPendingActions, pendingActions);
- parentPendingNotifies = linitial_node(List, upperPendingNotifies);
+ parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
upperPendingNotifies = list_delete_first(upperPendingNotifies);
Assert(list_length(upperPendingNotifies) ==
GetCurrentTransactionNestLevel() - 2);
- /*
- * We could try to eliminate duplicates here, but it seems not worthwhile.
- */
- pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+ if (pendingNotifies == NULL)
+ {
+ /* easy, no notify events happened in current subxact */
+ pendingNotifies = parentPendingNotifies;
+ }
+ else if (parentPendingNotifies == NULL)
+ {
+ /* easy, subxact's list becomes parent's */
+ }
+ else
+ {
+ /*
+ * Formerly, we didn't need to eliminate duplicates here, but now we
+ * must, else we fall foul of "Assert(!found)", either here or during
+ * a later attempt to build the parent-level hashtable.
+ */
+ NotificationList *childPendingNotifies = pendingNotifies;
+ ListCell *l;
+
+ pendingNotifies = parentPendingNotifies;
+ /* Insert all the subxact's events into parent, except for dups */
+ foreach(l, childPendingNotifies->events)
+ {
+ Notification *childn = (Notification *) lfirst(l);
+
+ if (!AsyncExistsPendingNotify(childn->channel, childn->payload))
+ AddEventToPendingNotifies(childn);
+ }
+ }
}
/*
@@ -1672,7 +1733,7 @@ AtSubAbort_Notify(void)
while (list_length(upperPendingNotifies) > my_level - 2)
{
- pendingNotifies = linitial_node(List, upperPendingNotifies);
+ pendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
upperPendingNotifies = list_delete_first(upperPendingNotifies);
}
}
@@ -2102,50 +2163,149 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
static bool
AsyncExistsPendingNotify(const char *channel, const char *payload)
{
- ListCell *p;
- Notification *n;
-
- if (pendingNotifies == NIL)
+ if (pendingNotifies == NULL)
return false;
if (payload == NULL)
payload = "";
- /*----------
- * We need to append new elements to the end of the list in order to keep
- * the order. However, on the other hand we'd like to check the list
- * backwards in order to make duplicate-elimination a tad faster when the
- * same condition is signaled many times in a row. So as a compromise we
- * check the tail element first which we can access directly. If this
- * doesn't match, we check the whole list.
- *
- * As we are not checking our parents' lists, we can still get duplicates
- * in combination with subtransactions, like in:
- *
- * begin;
- * notify foo '1';
- * savepoint foo;
- * notify foo '1';
- * commit;
- *----------
- */
- n = (Notification *) llast(pendingNotifies);
- if (strcmp(n->channel, channel) == 0 &&
- strcmp(n->payload, payload) == 0)
- return true;
-
- foreach(p, pendingNotifies)
+ if (pendingNotifies->hashtab != NULL)
{
- n = (Notification *) lfirst(p);
-
- if (strcmp(n->channel, channel) == 0 &&
- strcmp(n->payload, payload) == 0)
+ /* Use the hash table to probe for a match */
+ Notification n;
+ Notification *k;
+
+ /* set up a dummy Notification struct */
+ n.channel = unconstify(char *, channel);
+ n.payload = unconstify(char *, payload);
+ k = &n;
+ /* ... and probe */
+ if (hash_search(pendingNotifies->hashtab,
+ &k,
+ HASH_FIND,
+ NULL))
return true;
}
+ else
+ {
+ /* Must scan the event list */
+ ListCell *l;
+
+ foreach(l, pendingNotifies->events)
+ {
+ Notification *n = (Notification *) lfirst(l);
+
+ if (strcmp(n->channel, channel) == 0 &&
+ strcmp(n->payload, payload) == 0)
+ return true;
+ }
+ }
return false;
}
+/*
+ * Add a notification event to a pre-existing pendingNotifies list.
+ *
+ * Because pendingNotifies->events is already nonempty, this works
+ * correctly no matter what CurrentMemoryContext is.
+ */
+static void
+AddEventToPendingNotifies(Notification *n)
+{
+ Assert(pendingNotifies->events != NIL);
+
+ /* Create the hash table if it's time to */
+ if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
+ pendingNotifies->hashtab == NULL)
+ {
+ HASHCTL hash_ctl;
+ ListCell *l;
+
+ /* Create the hash table */
+ MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(Notification *);
+ hash_ctl.entrysize = sizeof(NotificationHash);
+ hash_ctl.hash = notification_hash;
+ hash_ctl.match = notification_match;
+ hash_ctl.hcxt = CurTransactionContext;
+ pendingNotifies->hashtab =
+ hash_create("Pending Notifies",
+ 256L,
+ &hash_ctl,
+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
+
+ /* Insert all the already-existing events */
+ foreach(l, pendingNotifies->events)
+ {
+ Notification *oldn = (Notification *) lfirst(l);
+ NotificationHash *hentry;
+ bool found;
+
+ hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+ &oldn,
+ HASH_ENTER,
+ &found);
+ Assert(!found);
+ hentry->event = oldn;
+ }
+ }
+
+ /* Add new event to the list, in order */
+ pendingNotifies->events = lappend(pendingNotifies->events, n);
+
+ /* Add event to the hash table if needed */
+ if (pendingNotifies->hashtab != NULL)
+ {
+ NotificationHash *hentry;
+ bool found;
+
+ hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+ &n,
+ HASH_ENTER,
+ &found);
+ Assert(!found);
+ hentry->event = n;
+ }
+}
+
+/*
+ * notification_hash: hash function for notification hash table
+ *
+ * The hash "keys" are pointers to Notification structs.
+ */
+static uint32
+notification_hash(const void *key, Size keysize)
+{
+ const Notification *k = *(const Notification *const *) key;
+ uint32 hashc;
+ uint32 hashp;
+
+ Assert(keysize == sizeof(Notification *));
+ /* We just XOR the hashes for the two strings */
+ hashc = DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+ (int) strlen((const char *) k->channel)));
+ hashp = DatumGetUInt32(hash_any((const unsigned char *) k->payload,
+ (int) strlen((const char *) k->payload)));
+ return hashc ^ hashp;
+}
+
+/*
+ * notification_match: match function to use with notification_hash
+ */
+static int
+notification_match(const void *key1, const void *key2, Size keysize)
+{
+ const Notification *k1 = *(const Notification *const *) key1;
+ const Notification *k2 = *(const Notification *const *) key2;
+
+ Assert(keysize == sizeof(Notification *));
+ if (strcmp(k1->channel, k2->channel) == 0 &&
+ strcmp(k1->payload, k2->payload) == 0)
+ return 0; /* equal */
+ return 1; /* not equal */
+}
+
/* Clear the pendingActions and pendingNotifies lists. */
static void
ClearPendingActionsAndNotifies(void)
@@ -2158,5 +2318,5 @@ ClearPendingActionsAndNotifies(void)
* pointers.
*/
pendingActions = NIL;
- pendingNotifies = NIL;
+ pendingNotifies = NULL;
}
I wrote:
> I think that we'd probably be better off fixing the root performance issue
> than adding semantic complexity to bypass it. ...
> Accordingly, I looked into making a hash table when there are more than
> a small number of notifications pending, and attached is a lightly-tested
> version of that. This seems to be more or less the same speed as the
> existing code for up to 100 or so distinct notifies, but it soon pulls
> away above that.
I noticed that the cfbot was unhappy with this, because it (intentionally)
changes the results of the async-notify isolation tests I added a while
ago. So here's an updated version that adjusts that test, and also
changes the NOTIFY documentation to remove the old weasel wording about
whether we de-dup or not.
> I also noticed that as things stand, it costs us two or three pallocs to
> construct a Notification event. It wouldn't be terribly hard to reduce
> that to one palloc, and maybe it'd be worthwhile if we're thinking that
> transactions with many, many notifies are a case worth optimizing.
> But I didn't do that here; it seems like a separable patch.
I also did that, attached as the second patch below. This way ends up
requiring us to palloc the Notification event and then pfree it again,
if it turns out to be a dup. Despite that, it's faster than the first
patch alone, and also faster than HEAD in every case I tried. It's not
much faster if there aren't a lot of dups, but as far as I can find
there isn't any case where it loses compared to HEAD. That holds even
with subtransactions, where in principle the time to merge subtransaction
event lists into the parent transaction ought to cost us. You can't
get that to matter unless the subtransaction had a lot of distinct
events, and then HEAD hits its O(N^2) behavior inside the subxact.
So I can't really see any reason not to commit these.
That leaves the question of whether we want to continue pursuing
the proposed feature for user control of de-duping. I'd tend
to vote against, because it seems like semantic complexity we
don't need. While the idea sounds straightforward, I think it
is much less so once you start to think hard about how notifies
issued with and without "collapse" ought to interact.
regards, tom lane
Attachments:
0001-detect-dup-notifies-by-hashing-2.patchtext/x-diff; charset=us-ascii; name=0001-detect-dup-notifies-by-hashing-2.patchDownload
diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index e0e125a..d7dcbea 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -94,9 +94,9 @@ NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable cla
</para>
<para>
- If the same channel name is signaled multiple times from the same
- transaction with identical payload strings, the
- database server can decide to deliver a single notification only.
+ If the same channel name is signaled multiple times with identical
+ payload strings within the same transaction, only one instance of the
+ notification event is delivered to listeners.
On the other hand, notifications with distinct payload strings will
always be delivered as distinct notifications. Similarly, notifications from
different transactions will never get folded into one notification.
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e9c580..3f5f054 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -135,6 +135,7 @@
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/hashutils.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
@@ -323,14 +324,25 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
/*
* State for outbound notifies consists of a list of all channels+payloads
- * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
- * until and unless the transaction commits. pendingNotifies is NIL if no
- * NOTIFYs have been done in the current transaction.
+ * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
+ * until and unless the transaction commits. pendingNotifies is NULL if no
+ * NOTIFYs have been done in the current (sub) transaction.
+ *
+ * We discard duplicate notify events issued in the same transaction.
+ * Hence, in addition to the list proper (which we need to track the order
+ * of the events, since we guarantee to deliver them in order), we build a
+ * hash table which we can probe to detect duplicates. Since building the
+ * hash table is somewhat expensive, we do so only once we have at least
+ * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
+ * before that we just scan the events linearly.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
- * successful subtransactions attach their lists to their parent's list.
- * Failed subtransactions simply discard their lists.
+ * successful subtransactions add their entries to their parent's list.
+ * Failed subtransactions simply discard their lists. Since these lists
+ * are independent, there may be notify events in a subtransaction's list
+ * that duplicate events in some ancestor (sub) transaction; we get rid of
+ * the dups when merging the subtransaction's list into its parent's.
*
* Note: the action and notify lists do not interact within a transaction.
* In particular, if a transaction does NOTIFY and then LISTEN on the same
@@ -343,7 +355,20 @@ typedef struct Notification
char *payload; /* payload string (can be empty) */
} Notification;
-static List *pendingNotifies = NIL; /* list of Notifications */
+typedef struct NotificationList
+{
+ List *events; /* list of Notification structs */
+ HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
+} NotificationList;
+
+#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
+
+typedef struct NotificationHash
+{
+ Notification *event; /* => the actual Notification struct */
+} NotificationHash;
+
+static NotificationList *pendingNotifies = NULL; /* current list, if any */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
@@ -393,6 +418,9 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
+static void AddEventToPendingNotifies(Notification *n);
+static uint32 notification_hash(const void *key, Size keysize);
+static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
/*
@@ -586,11 +614,19 @@ Async_Notify(const char *channel, const char *payload)
else
n->payload = "";
- /*
- * We want to preserve the order so we need to append every notification.
- * See comments at AsyncExistsPendingNotify().
- */
- pendingNotifies = lappend(pendingNotifies, n);
+ if (pendingNotifies == NULL)
+ {
+ /* First notify event in current (sub)xact */
+ pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList));
+ pendingNotifies->events = list_make1(n);
+ /* We certainly don't need a hashtable yet */
+ pendingNotifies->hashtab = NULL;
+ }
+ else
+ {
+ /* Append more events to existing list */
+ AddEventToPendingNotifies(n);
+ }
MemoryContextSwitchTo(oldcontext);
}
@@ -761,7 +797,7 @@ PreCommit_Notify(void)
{
ListCell *p;
- if (pendingActions == NIL && pendingNotifies == NIL)
+ if (!pendingActions && !pendingNotifies)
return; /* no relevant statements in this xact */
if (Trace_notify)
@@ -821,7 +857,7 @@ PreCommit_Notify(void)
/* Now push the notifications into the queue */
backendHasSentNotifications = true;
- nextNotify = list_head(pendingNotifies);
+ nextNotify = list_head(pendingNotifies->events);
while (nextNotify != NULL)
{
/*
@@ -1294,7 +1330,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
* database OID in order to fill the page. So every page is always used up to
* the last byte which simplifies reading the page later.
*
- * We are passed the list cell (in pendingNotifies) containing the next
+ * We are passed the list cell (in pendingNotifies->events) containing the next
* notification to write and return the first still-unwritten cell back.
* Eventually we will return NULL indicating all is done.
*
@@ -1345,7 +1381,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
if (offset + qe.length <= QUEUE_PAGESIZE)
{
/* OK, so advance nextNotify past this item */
- nextNotify = lnext(pendingNotifies, nextNotify);
+ nextNotify = lnext(pendingNotifies->events, nextNotify);
}
else
{
@@ -1607,7 +1643,7 @@ AtSubStart_Notify(void)
Assert(list_length(upperPendingNotifies) ==
GetCurrentTransactionNestLevel() - 1);
- pendingNotifies = NIL;
+ pendingNotifies = NULL;
MemoryContextSwitchTo(old_cxt);
}
@@ -1621,7 +1657,7 @@ void
AtSubCommit_Notify(void)
{
List *parentPendingActions;
- List *parentPendingNotifies;
+ NotificationList *parentPendingNotifies;
parentPendingActions = linitial_node(List, upperPendingActions);
upperPendingActions = list_delete_first(upperPendingActions);
@@ -1634,16 +1670,41 @@ AtSubCommit_Notify(void)
*/
pendingActions = list_concat(parentPendingActions, pendingActions);
- parentPendingNotifies = linitial_node(List, upperPendingNotifies);
+ parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
upperPendingNotifies = list_delete_first(upperPendingNotifies);
Assert(list_length(upperPendingNotifies) ==
GetCurrentTransactionNestLevel() - 2);
- /*
- * We could try to eliminate duplicates here, but it seems not worthwhile.
- */
- pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+ if (pendingNotifies == NULL)
+ {
+ /* easy, no notify events happened in current subxact */
+ pendingNotifies = parentPendingNotifies;
+ }
+ else if (parentPendingNotifies == NULL)
+ {
+ /* easy, subxact's list becomes parent's */
+ }
+ else
+ {
+ /*
+ * Formerly, we didn't bother to eliminate duplicates here, but now we
+ * must, else we fall foul of "Assert(!found)", either here or during
+ * a later attempt to build the parent-level hashtable.
+ */
+ NotificationList *childPendingNotifies = pendingNotifies;
+ ListCell *l;
+
+ pendingNotifies = parentPendingNotifies;
+ /* Insert all the subxact's events into parent, except for dups */
+ foreach(l, childPendingNotifies->events)
+ {
+ Notification *childn = (Notification *) lfirst(l);
+
+ if (!AsyncExistsPendingNotify(childn->channel, childn->payload))
+ AddEventToPendingNotifies(childn);
+ }
+ }
}
/*
@@ -1672,7 +1733,7 @@ AtSubAbort_Notify(void)
while (list_length(upperPendingNotifies) > my_level - 2)
{
- pendingNotifies = linitial_node(List, upperPendingNotifies);
+ pendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
upperPendingNotifies = list_delete_first(upperPendingNotifies);
}
}
@@ -2102,50 +2163,149 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
static bool
AsyncExistsPendingNotify(const char *channel, const char *payload)
{
- ListCell *p;
- Notification *n;
-
- if (pendingNotifies == NIL)
+ if (pendingNotifies == NULL)
return false;
if (payload == NULL)
payload = "";
- /*----------
- * We need to append new elements to the end of the list in order to keep
- * the order. However, on the other hand we'd like to check the list
- * backwards in order to make duplicate-elimination a tad faster when the
- * same condition is signaled many times in a row. So as a compromise we
- * check the tail element first which we can access directly. If this
- * doesn't match, we check the whole list.
- *
- * As we are not checking our parents' lists, we can still get duplicates
- * in combination with subtransactions, like in:
- *
- * begin;
- * notify foo '1';
- * savepoint foo;
- * notify foo '1';
- * commit;
- *----------
- */
- n = (Notification *) llast(pendingNotifies);
- if (strcmp(n->channel, channel) == 0 &&
- strcmp(n->payload, payload) == 0)
- return true;
-
- foreach(p, pendingNotifies)
+ if (pendingNotifies->hashtab != NULL)
{
- n = (Notification *) lfirst(p);
-
- if (strcmp(n->channel, channel) == 0 &&
- strcmp(n->payload, payload) == 0)
+ /* Use the hash table to probe for a match */
+ Notification n;
+ Notification *k;
+
+ /* set up a dummy Notification struct */
+ n.channel = unconstify(char *, channel);
+ n.payload = unconstify(char *, payload);
+ k = &n;
+ /* ... and probe */
+ if (hash_search(pendingNotifies->hashtab,
+ &k,
+ HASH_FIND,
+ NULL))
return true;
}
+ else
+ {
+ /* Must scan the event list */
+ ListCell *l;
+
+ foreach(l, pendingNotifies->events)
+ {
+ Notification *n = (Notification *) lfirst(l);
+
+ if (strcmp(n->channel, channel) == 0 &&
+ strcmp(n->payload, payload) == 0)
+ return true;
+ }
+ }
return false;
}
+/*
+ * Add a notification event to a pre-existing pendingNotifies list.
+ *
+ * Because pendingNotifies->events is already nonempty, this works
+ * correctly no matter what CurrentMemoryContext is.
+ */
+static void
+AddEventToPendingNotifies(Notification *n)
+{
+ Assert(pendingNotifies->events != NIL);
+
+ /* Create the hash table if it's time to */
+ if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
+ pendingNotifies->hashtab == NULL)
+ {
+ HASHCTL hash_ctl;
+ ListCell *l;
+
+ /* Create the hash table */
+ MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(Notification *);
+ hash_ctl.entrysize = sizeof(NotificationHash);
+ hash_ctl.hash = notification_hash;
+ hash_ctl.match = notification_match;
+ hash_ctl.hcxt = CurTransactionContext;
+ pendingNotifies->hashtab =
+ hash_create("Pending Notifies",
+ 256L,
+ &hash_ctl,
+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
+
+ /* Insert all the already-existing events */
+ foreach(l, pendingNotifies->events)
+ {
+ Notification *oldn = (Notification *) lfirst(l);
+ NotificationHash *hentry;
+ bool found;
+
+ hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+ &oldn,
+ HASH_ENTER,
+ &found);
+ Assert(!found);
+ hentry->event = oldn;
+ }
+ }
+
+ /* Add new event to the list, in order */
+ pendingNotifies->events = lappend(pendingNotifies->events, n);
+
+ /* Add event to the hash table if needed */
+ if (pendingNotifies->hashtab != NULL)
+ {
+ NotificationHash *hentry;
+ bool found;
+
+ hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+ &n,
+ HASH_ENTER,
+ &found);
+ Assert(!found);
+ hentry->event = n;
+ }
+}
+
+/*
+ * notification_hash: hash function for notification hash table
+ *
+ * The hash "keys" are pointers to Notification structs.
+ */
+static uint32
+notification_hash(const void *key, Size keysize)
+{
+ const Notification *k = *(const Notification *const *) key;
+ uint32 hashc;
+ uint32 hashp;
+
+ Assert(keysize == sizeof(Notification *));
+ /* We just XOR the hashes for the two strings */
+ hashc = DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+ (int) strlen((const char *) k->channel)));
+ hashp = DatumGetUInt32(hash_any((const unsigned char *) k->payload,
+ (int) strlen((const char *) k->payload)));
+ return hashc ^ hashp;
+}
+
+/*
+ * notification_match: match function to use with notification_hash
+ */
+static int
+notification_match(const void *key1, const void *key2, Size keysize)
+{
+ const Notification *k1 = *(const Notification *const *) key1;
+ const Notification *k2 = *(const Notification *const *) key2;
+
+ Assert(keysize == sizeof(Notification *));
+ if (strcmp(k1->channel, k2->channel) == 0 &&
+ strcmp(k1->payload, k2->payload) == 0)
+ return 0; /* equal */
+ return 1; /* not equal */
+}
+
/* Clear the pendingActions and pendingNotifies lists. */
static void
ClearPendingActionsAndNotifies(void)
@@ -2158,5 +2318,5 @@ ClearPendingActionsAndNotifies(void)
* pointers.
*/
pendingActions = NIL;
- pendingNotifies = NIL;
+ pendingNotifies = NULL;
}
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 60ba506..7ad26b7 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -42,8 +42,6 @@ step notifys1:
notifier: NOTIFY "c1" with payload "payload" from notifier
notifier: NOTIFY "c2" with payload "payload" from notifier
-notifier: NOTIFY "c1" with payload "payload" from notifier
-notifier: NOTIFY "c2" with payload "payload" from notifier
notifier: NOTIFY "c1" with payload "payloads" from notifier
notifier: NOTIFY "c2" with payload "payloads" from notifier
0002-better-notification-structure-1.patchtext/x-diff; charset=us-ascii; name=0002-better-notification-structure-1.patchDownload
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 3f5f054..6cb2d44 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -351,8 +351,10 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
*/
typedef struct Notification
{
- char *channel; /* channel name */
- char *payload; /* payload string (can be empty) */
+ uint16 channel_len; /* length of channel-name string */
+ uint16 payload_len; /* length of payload string */
+ /* null-terminated channel name, then null-terminated payload follow */
+ char data[FLEXIBLE_ARRAY_MEMBER];
} Notification;
typedef struct NotificationList
@@ -417,7 +419,7 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
-static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
+static bool AsyncExistsPendingNotify(Notification *n);
static void AddEventToPendingNotifies(Notification *n);
static uint32 notification_hash(const void *key, Size keysize);
static int notification_match(const void *key1, const void *key2, Size keysize);
@@ -569,6 +571,8 @@ pg_notify(PG_FUNCTION_ARGS)
void
Async_Notify(const char *channel, const char *payload)
{
+ size_t channel_len;
+ size_t payload_len;
Notification *n;
MemoryContext oldcontext;
@@ -578,41 +582,53 @@ Async_Notify(const char *channel, const char *payload)
if (Trace_notify)
elog(DEBUG1, "Async_Notify(%s)", channel);
+ channel_len = channel ? strlen(channel) : 0;
+ payload_len = payload ? strlen(payload) : 0;
+
/* a channel name must be specified */
- if (!channel || !strlen(channel))
+ if (channel_len == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("channel name cannot be empty")));
- if (strlen(channel) >= NAMEDATALEN)
+ /* enforce length limits */
+ if (channel_len >= NAMEDATALEN)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("channel name too long")));
- if (payload)
- {
- if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("payload string too long")));
- }
-
- /* no point in making duplicate entries in the list ... */
- if (AsyncExistsPendingNotify(channel, payload))
- return;
+ if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("payload string too long")));
/*
+ * We must construct the Notification entry, even if we end up not using
+ * it, in order to compare it cheaply to existing list entries.
+ *
* The notification list needs to live until end of transaction, so store
* it in the transaction context.
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
- n = (Notification *) palloc(sizeof(Notification));
- n->channel = pstrdup(channel);
+ n = (Notification *) palloc(offsetof(Notification, data) +
+ channel_len + payload_len + 2);
+ n->channel_len = channel_len;
+ n->payload_len = payload_len;
+ strcpy(n->data, channel);
if (payload)
- n->payload = pstrdup(payload);
+ strcpy(n->data + channel_len + 1, payload);
else
- n->payload = "";
+ n->data[channel_len + 1] = '\0';
+
+ /* Now check for duplicates */
+ if (AsyncExistsPendingNotify(n))
+ {
+ /* It's a dup, so forget it */
+ pfree(n);
+ MemoryContextSwitchTo(oldcontext);
+ return;
+ }
if (pendingNotifies == NULL)
{
@@ -1303,8 +1319,8 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
static void
asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
{
- size_t channellen = strlen(n->channel);
- size_t payloadlen = strlen(n->payload);
+ size_t channellen = n->channel_len;
+ size_t payloadlen = n->payload_len;
int entryLength;
Assert(channellen < NAMEDATALEN);
@@ -1317,8 +1333,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
qe->dboid = MyDatabaseId;
qe->xid = GetCurrentTransactionId();
qe->srcPid = MyProcPid;
- memcpy(qe->data, n->channel, channellen + 1);
- memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1);
+ memcpy(qe->data, n->data, channellen + payloadlen + 2);
}
/*
@@ -1701,7 +1716,7 @@ AtSubCommit_Notify(void)
{
Notification *childn = (Notification *) lfirst(l);
- if (!AsyncExistsPendingNotify(childn->channel, childn->payload))
+ if (!AsyncExistsPendingNotify(childn))
AddEventToPendingNotifies(childn);
}
}
@@ -2159,29 +2174,18 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
}
-/* Does pendingNotifies include the given channel/payload? */
+/* Does pendingNotifies include a match for the given event? */
static bool
-AsyncExistsPendingNotify(const char *channel, const char *payload)
+AsyncExistsPendingNotify(Notification *n)
{
if (pendingNotifies == NULL)
return false;
- if (payload == NULL)
- payload = "";
-
if (pendingNotifies->hashtab != NULL)
{
/* Use the hash table to probe for a match */
- Notification n;
- Notification *k;
-
- /* set up a dummy Notification struct */
- n.channel = unconstify(char *, channel);
- n.payload = unconstify(char *, payload);
- k = &n;
- /* ... and probe */
if (hash_search(pendingNotifies->hashtab,
- &k,
+ &n,
HASH_FIND,
NULL))
return true;
@@ -2193,10 +2197,12 @@ AsyncExistsPendingNotify(const char *channel, const char *payload)
foreach(l, pendingNotifies->events)
{
- Notification *n = (Notification *) lfirst(l);
+ Notification *oldn = (Notification *) lfirst(l);
- if (strcmp(n->channel, channel) == 0 &&
- strcmp(n->payload, payload) == 0)
+ if (n->channel_len == oldn->channel_len &&
+ n->payload_len == oldn->payload_len &&
+ memcmp(n->data, oldn->data,
+ n->channel_len + n->payload_len + 2) == 0)
return true;
}
}
@@ -2278,16 +2284,11 @@ static uint32
notification_hash(const void *key, Size keysize)
{
const Notification *k = *(const Notification *const *) key;
- uint32 hashc;
- uint32 hashp;
Assert(keysize == sizeof(Notification *));
- /* We just XOR the hashes for the two strings */
- hashc = DatumGetUInt32(hash_any((const unsigned char *) k->channel,
- (int) strlen((const char *) k->channel)));
- hashp = DatumGetUInt32(hash_any((const unsigned char *) k->payload,
- (int) strlen((const char *) k->payload)));
- return hashc ^ hashp;
+ /* We don't bother to include the payload's trailing null in the hash */
+ return DatumGetUInt32(hash_any((const unsigned char *) k->data,
+ k->channel_len + k->payload_len + 1));
}
/*
@@ -2300,8 +2301,10 @@ notification_match(const void *key1, const void *key2, Size keysize)
const Notification *k2 = *(const Notification *const *) key2;
Assert(keysize == sizeof(Notification *));
- if (strcmp(k1->channel, k2->channel) == 0 &&
- strcmp(k1->payload, k2->payload) == 0)
+ if (k1->channel_len == k2->channel_len &&
+ k1->payload_len == k2->payload_len &&
+ memcmp(k1->data, k2->data,
+ k1->channel_len + k1->payload_len + 2) == 0)
return 0; /* equal */
return 1; /* not equal */
}