diff --git a/doc/src/sgml/ref/listen.sgml b/doc/src/sgml/ref/listen.sgml
index 9cd53b0..b6fae56 100644
--- a/doc/src/sgml/ref/listen.sgml
+++ b/doc/src/sgml/ref/listen.sgml
@@ -22,6 +22,7 @@ PostgreSQL documentation
LISTEN channel
+LISTEN SIMILAR TO 'pattern'
@@ -31,9 +32,11 @@ LISTEN channel
LISTEN registers the current session as a
listener on the notification channel named channel.
+ class="PARAMETER">channel. Session can listen
+ to multiple notification channels simultaneously by using
+ LISTEN SIMILAR TO pattern.
If the current session is already registered as a listener for
- this notification channel, nothing is done.
+ this notification channel or pattern, nothing is done.
@@ -84,6 +87,16 @@ LISTEN channel
+
+
+ pattern
+
+
+ SIMILAR TO>
+ pattern (any string constant). Used to match notification channel names.
+
+
+
@@ -113,6 +126,9 @@ LISTEN channel
LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
+LISTEN SIMILAR TO 'foo%';
+NOTIFY foobar;
+Asynchronous notification "foobar" received from server process with PID 8448.
diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index 3389aa0..c660754 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -33,7 +33,10 @@ NOTIFY channel [ , payload> string to each client application that
has previously executed
LISTEN channel>
- for the specified channel name in the current database.
+ for the specified channel name in the current database. Client application
+ can also listen to multiple channel names using pattern matching by
+ LISTEN SIMILAR TO 'pattern>'
+ command.
Notifications are visible to all users.
@@ -210,6 +213,10 @@ Asynchronous notification "virtual" with payload "This is the payload" received
LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.
+
+LISTEN SIMILAR TO 'foo%';
+NOTIFY foobar;
+Asynchronous notification "foobar" received from server process with PID 8448.
diff --git a/doc/src/sgml/ref/unlisten.sgml b/doc/src/sgml/ref/unlisten.sgml
index f7c3c47..46908f9 100644
--- a/doc/src/sgml/ref/unlisten.sgml
+++ b/doc/src/sgml/ref/unlisten.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
-UNLISTEN { channel | * }
+UNLISTEN { channel | 'channel_pattern' | * }
@@ -34,9 +34,10 @@ UNLISTEN { channel | * }
UNLISTEN cancels any existing registration of
the current PostgreSQL session as a
listener on the notification channel named channel. The special wildcard
- * cancels all listener registrations for the
- current session.
+ class="PARAMETER">channel. Existing pattern based listeners
+ are unregistered using 'channel_pattern'
+ string constant. The special wildcard * cancels all listener
+ registrations for the current session.
@@ -59,6 +60,15 @@ UNLISTEN { channel | * }
+
+
+ channel_pattern
+
+
+ Pattern or channel name to unregister (any string constant).
+
+
+
*
@@ -88,6 +98,12 @@ UNLISTEN { channel | * }
A transaction that has executed UNLISTEN cannot be
prepared for two-phase commit.
+
+
+ UNLISTEN 'channel_pattern'
+ only unregisters a single pattern that is equal to a previously
+ registered pattern. See below for an example.
+
@@ -100,6 +116,9 @@ UNLISTEN { channel | * }
LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
+LISTEN SIMILAR TO 'foo%';
+NOTIFY foobar;
+Asynchronous notification "foobar" received from server process with PID 8448.
@@ -109,9 +128,22 @@ Asynchronous notification "virtual" received from server process with PID 8448.
UNLISTEN virtual;
+UNLISTEN 'foo%';
NOTIFY virtual;
+NOTIFY foobar;
-- no NOTIFY event is received
+
+
+ UNLISTEN 'channel_pattern'
+ cannot be used in a pattern matching way:
+
+
+LISTEN SIMILAR TO 'foobar%';
+UNLISTEN 'foo%'; -- this has no effect
+NOTIFY foobarbaz;
+Asynchronous notification "foobarbaz" received from server process with PID 8448.
+
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index bacc08e..16af303 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -138,7 +138,10 @@
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
-
+#include "utils/elog.h"
+#include "regex/regex.h"
+#include "catalog/pg_collation.h"
+#include "miscadmin.h"
/*
* Maximum size of a NOTIFY payload, including terminating NULL. This
@@ -286,9 +289,19 @@ static SlruCtlData AsyncCtlData;
*/
#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
+ /*
+ * Currently listened channel consisting of compiled RE
+ * used for pattern listeners and the pattern send by the user.
+ */
+typedef struct ListenChannel
+{
+ regex_t *compiledRegex;
+ char userPattern[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
+} ListenChannel;
+
/*
* listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on). It is a simple list of channel names,
+ * (ie, have committed a LISTEN on). It is a list of ListenChannel,
* allocated in TopMemoryContext.
*/
static List *listenChannels = NIL; /* list of C strings */
@@ -313,7 +326,9 @@ typedef enum
typedef struct
{
ListenActionKind action;
- char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
+ bool actionApplied;
+ regex_t *compiledRegex;
+ char userPattern[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
} ListenAction;
static List *pendingActions = NIL; /* list of ListenAction */
@@ -369,12 +384,12 @@ bool Trace_notify = false;
/* local function prototypes */
static bool asyncQueuePagePrecedes(int p, int q);
-static void queue_listen(ListenActionKind action, const char *channel);
+static void queue_listen(ListenActionKind action, const char *pattern, bool isSimilarToPattern);
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void);
-static void Exec_ListenCommit(const char *channel);
-static void Exec_UnlistenCommit(const char *channel);
-static void Exec_UnlistenAllCommit(void);
+static bool Exec_ListenCommit(const char *pattern, regex_t *compiledRegex);
+static bool Exec_UnlistenCommit(const char *pattern);
+static bool Exec_UnlistenAllCommit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
@@ -594,6 +609,36 @@ Async_Notify(const char *channel, const char *payload)
}
/*
+* compile_regex
+* Compiles RE pattern into a compiled RE.
+*
+* Returns result code from pg_regcomp.
+*/
+static int
+compile_regex(const char *pattern, regex_t *compiled_regex)
+{
+ pg_wchar *wcharpattern;
+ int resregcomp;
+ int lenwchar;
+ int lenpattern = strlen(pattern);
+
+ wcharpattern = (pg_wchar *)palloc((lenpattern + 1) * sizeof(pg_wchar));
+ lenwchar = pg_mb2wchar_with_len(pattern,
+ wcharpattern,
+ lenpattern);
+
+ resregcomp = pg_regcomp(compiled_regex,
+ wcharpattern,
+ lenwchar,
+ REG_ADVANCED,
+ DEFAULT_COLLATION_OID);
+
+ pfree(wcharpattern);
+
+ return resregcomp;
+}
+
+/*
* queue_listen
* Common code for listen, unlisten, unlisten all commands.
*
@@ -602,10 +647,15 @@ Async_Notify(const char *channel, const char *payload)
* commit.
*/
static void
-queue_listen(ListenActionKind action, const char *channel)
+queue_listen(ListenActionKind action, const char *pattern, bool isSimilarToPattern)
{
MemoryContext oldcontext;
ListenAction *actrec;
+ regex_t compreg;
+ regex_t *pcompreg;
+ int resregcomp;
+ char errormsg[100];
+ Datum datum;
/*
* Unlike Async_Notify, we don't try to collapse out duplicates. It would
@@ -615,11 +665,46 @@ queue_listen(ListenActionKind action, const char *channel)
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+ if (isSimilarToPattern)
+ {
+ /* convert to regex pattern */
+ datum = DirectFunctionCall2(similar_escape,
+ CStringGetTextDatum(pattern),
+ CStringGetTextDatum("\\"));
+
+ /*
+ * Regex pattern is now compiled to ensure any errors are captured at this point.
+ * Compiled regex is copied to top memory context when we reach transaction commit.
+ * If compiled RE was not applied as a listener then it is freed at transaction commit.
+ */
+ resregcomp = compile_regex(TextDatumGetCString(datum), &compreg);
+
+ if (resregcomp != REG_OKAY)
+ {
+ MemoryContextSwitchTo(oldcontext);
+
+ CHECK_FOR_INTERRUPTS();
+ pg_regerror(resregcomp, &compreg, errormsg, sizeof(errormsg));
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_REGULAR_EXPRESSION),
+ errmsg("invalid regular expression: %s", errormsg)));
+ }
+
+ pcompreg = palloc(sizeof(regex_t));
+ memcpy(pcompreg, &compreg, sizeof(regex_t));
+ }
+ else
+ {
+ pcompreg = NULL;
+ }
+
/* space for terminating null is included in sizeof(ListenAction) */
- actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
- strlen(channel) + 1);
+ actrec = (ListenAction *) palloc(offsetof(ListenAction, userPattern) +
+ strlen(pattern) + 1);
actrec->action = action;
- strcpy(actrec->channel, channel);
+ actrec->actionApplied = false;
+ actrec->compiledRegex = pcompreg;
+ strcpy(actrec->userPattern, pattern);
pendingActions = lappend(pendingActions, actrec);
@@ -632,12 +717,12 @@ queue_listen(ListenActionKind action, const char *channel)
* This is executed by the SQL listen command.
*/
void
-Async_Listen(const char *channel)
+Async_Listen(const char *pattern, bool isSimilarToPattern)
{
if (Trace_notify)
- elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
+ elog(DEBUG1, "Async_Listen(%s,%d)", pattern, MyProcPid);
- queue_listen(LISTEN_LISTEN, channel);
+ queue_listen(LISTEN_LISTEN, pattern, isSimilarToPattern);
}
/*
@@ -646,16 +731,16 @@ Async_Listen(const char *channel)
* This is executed by the SQL unlisten command.
*/
void
-Async_Unlisten(const char *channel)
+Async_Unlisten(const char *pattern)
{
if (Trace_notify)
- elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
+ elog(DEBUG1, "Async_Unlisten(%s,%d)", pattern, MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NIL && !unlistenExitRegistered)
return;
- queue_listen(LISTEN_UNLISTEN, channel);
+ queue_listen(LISTEN_UNLISTEN, pattern, false);
}
/*
@@ -673,7 +758,7 @@ Async_UnlistenAll(void)
if (pendingActions == NIL && !unlistenExitRegistered)
return;
- queue_listen(LISTEN_UNLISTEN_ALL, "");
+ queue_listen(LISTEN_UNLISTEN_ALL, "", false);
}
/*
@@ -714,10 +799,10 @@ pg_listening_channels(PG_FUNCTION_ARGS)
while (*lcp != NULL)
{
- char *channel = (char *) lfirst(*lcp);
+ ListenChannel *channel = (ListenChannel *) lfirst(*lcp);
*lcp = lnext(*lcp);
- SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+ SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel->userPattern));
}
SRF_RETURN_DONE(funcctx);
@@ -890,13 +975,13 @@ AtCommit_Notify(void)
switch (actrec->action)
{
case LISTEN_LISTEN:
- Exec_ListenCommit(actrec->channel);
+ actrec->actionApplied = Exec_ListenCommit(actrec->userPattern, actrec->compiledRegex);
break;
case LISTEN_UNLISTEN:
- Exec_UnlistenCommit(actrec->channel);
+ actrec->actionApplied = Exec_UnlistenCommit(actrec->userPattern);
break;
case LISTEN_UNLISTEN_ALL:
- Exec_UnlistenAllCommit();
+ actrec->actionApplied = Exec_UnlistenAllCommit();
break;
}
}
@@ -1000,14 +1085,23 @@ Exec_ListenPreCommit(void)
*
* Add the channel to the list of channels we are listening on.
*/
-static void
-Exec_ListenCommit(const char *channel)
+static bool
+Exec_ListenCommit(const char *pattern, regex_t *compiledRegex)
{
+ ListCell *p;
MemoryContext oldcontext;
+ ListenChannel *lchan;
+ regex_t *copiedcompreg;
- /* Do nothing if we are already listening on this channel */
- if (IsListeningOn(channel))
- return;
+ /* Do nothing if we are already using this pattern for listening */
+
+ foreach(p, listenChannels)
+ {
+ ListenChannel *lchan = (ListenChannel *)lfirst(p);
+
+ if (strcmp(lchan->userPattern, pattern) == 0)
+ return false;
+ }
/*
* Add the new channel name to listenChannels.
@@ -1017,9 +1111,31 @@ Exec_ListenCommit(const char *channel)
* doesn't seem worth trying to guard against that, but maybe improve this
* later.
*/
+
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- listenChannels = lappend(listenChannels, pstrdup(channel));
+
+ if (compiledRegex != NULL)
+ {
+ /* copy the compiled RE to top memory context */
+
+ copiedcompreg = (regex_t *)palloc(sizeof(regex_t));
+ memcpy(copiedcompreg, compiledRegex, sizeof(regex_t));
+ }
+ else
+ {
+ copiedcompreg = NULL;
+ }
+
+ lchan = (ListenChannel *)palloc(offsetof(ListenChannel, userPattern) +
+ strlen(pattern) + 1);
+ lchan->compiledRegex = copiedcompreg;
+ strcpy(lchan->userPattern, pattern);
+
+ listenChannels = lappend(listenChannels, lchan);
+
MemoryContextSwitchTo(oldcontext);
+
+ return true;
}
/*
@@ -1027,24 +1143,35 @@ Exec_ListenCommit(const char *channel)
*
* Remove the specified channel name from listenChannels.
*/
-static void
-Exec_UnlistenCommit(const char *channel)
+static bool
+Exec_UnlistenCommit(const char *pattern)
{
ListCell *q;
ListCell *prev;
+ bool found;
if (Trace_notify)
- elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
+ elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", pattern, MyProcPid);
+ found = false;
prev = NULL;
foreach(q, listenChannels)
{
- char *lchan = (char *) lfirst(q);
+ ListenChannel *lchan = (ListenChannel *) lfirst(q);
- if (strcmp(lchan, channel) == 0)
+ if (strcmp(lchan->userPattern, pattern) == 0)
{
+ if (lchan->compiledRegex != NULL)
+ {
+ pg_regfree(lchan->compiledRegex);
+ pfree(lchan->compiledRegex);
+ }
+
listenChannels = list_delete_cell(listenChannels, q, prev);
+
pfree(lchan);
+
+ found = true;
break;
}
prev = q;
@@ -1054,6 +1181,8 @@ Exec_UnlistenCommit(const char *channel)
* We do not complain about unlistening something not being listened;
* should we?
*/
+
+ return found;
}
/*
@@ -1061,14 +1190,34 @@ Exec_UnlistenCommit(const char *channel)
*
* Unlisten on all channels for this backend.
*/
-static void
+static bool
Exec_UnlistenAllCommit(void)
{
+ ListCell *p;
+ bool is_empty;
+
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
+ is_empty = true;
+
+ foreach(p, listenChannels)
+ {
+ ListenChannel *lchan = (ListenChannel *)lfirst(p);
+
+ if (lchan->compiledRegex != NULL)
+ {
+ pg_regfree(lchan->compiledRegex);
+ pfree(lchan->compiledRegex);
+ }
+
+ is_empty = false;
+ }
+
list_free_deep(listenChannels);
listenChannels = NIL;
+
+ return is_empty;
}
/*
@@ -1163,16 +1312,66 @@ ProcessCompletedNotifies(void)
static bool
IsListeningOn(const char *channel)
{
- ListCell *p;
+ ListCell *p;
+ pg_wchar *wcharchannel;
+ int lenwchar;
+ int resregexec;
+ char errormsg[100];
+ bool matches;
+
+ wcharchannel = NULL;
+ matches = false;
foreach(p, listenChannels)
{
- char *lchan = (char *) lfirst(p);
+ ListenChannel *lchan = (ListenChannel *) lfirst(p);
- if (strcmp(lchan, channel) == 0)
- return true;
+ if (lchan->compiledRegex == NULL)
+ {
+ if (strcmp(lchan->userPattern, channel) == 0)
+ {
+ matches = true;
+ break;
+ }
+ }
+ else
+ {
+ if (wcharchannel == NULL)
+ {
+ /* Convert channel string to wide characters */
+ wcharchannel = (pg_wchar *)palloc((strlen(channel) + 1) * sizeof(pg_wchar));
+ lenwchar = pg_mb2wchar_with_len(channel, wcharchannel, strlen(channel));
+ }
+
+ /* Check RE match */
+ resregexec = pg_regexec(lchan->compiledRegex, wcharchannel, lenwchar, 0, NULL, 0, NULL, 0);
+
+ if (resregexec != REG_OKAY && resregexec != REG_NOMATCH)
+ {
+ pfree(wcharchannel);
+ wcharchannel = NULL;
+
+ CHECK_FOR_INTERRUPTS();
+ pg_regerror(resregexec, lchan->compiledRegex, errormsg, sizeof(errormsg));
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_REGULAR_EXPRESSION),
+ errmsg("regular expression failed: %s", errormsg)));
+ }
+
+ if (resregexec == REG_OKAY)
+ {
+ matches = true;
+ break;
+ }
+ }
}
- return false;
+
+ if (wcharchannel != NULL)
+ {
+ pfree(wcharchannel);
+ }
+
+ return matches;
}
/*
@@ -2149,6 +2348,20 @@ AsyncExistsPendingNotify(const char *channel, const char *payload)
static void
ClearPendingActionsAndNotifies(void)
{
+ ListCell *p;
+
+ /* free compiled REs that were not added as new listeners */
+ foreach(p, pendingActions)
+ {
+ ListenAction *actrec = (ListenAction *)lfirst(p);
+
+ if (!actrec->actionApplied && actrec->compiledRegex != NULL)
+ {
+ pg_regfree(actrec->compiledRegex);
+ pfree(actrec->compiledRegex);
+ }
+ }
+
/*
* We used to have to explicitly deallocate the list members and nodes,
* because they were malloc'd. Now, since we know they are palloc'd in
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 7204169..ebc2e16 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3547,7 +3547,8 @@ _copyListenStmt(const ListenStmt *from)
{
ListenStmt *newnode = makeNode(ListenStmt);
- COPY_STRING_FIELD(conditionname);
+ COPY_STRING_FIELD(pattern);
+ COPY_SCALAR_FIELD(isSimilarToPattern);
return newnode;
}
@@ -3557,7 +3558,7 @@ _copyUnlistenStmt(const UnlistenStmt *from)
{
UnlistenStmt *newnode = makeNode(UnlistenStmt);
- COPY_STRING_FIELD(conditionname);
+ COPY_STRING_FIELD(pattern);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 8d92c03..a3ac36b 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1482,7 +1482,8 @@ _equalNotifyStmt(const NotifyStmt *a, const NotifyStmt *b)
static bool
_equalListenStmt(const ListenStmt *a, const ListenStmt *b)
{
- COMPARE_STRING_FIELD(conditionname);
+ COMPARE_STRING_FIELD(pattern);
+ COMPARE_SCALAR_FIELD(isSimilarToPattern);
return true;
}
@@ -1490,7 +1491,7 @@ _equalListenStmt(const ListenStmt *a, const ListenStmt *b)
static bool
_equalUnlistenStmt(const UnlistenStmt *a, const UnlistenStmt *b)
{
- COMPARE_STRING_FIELD(conditionname);
+ COMPARE_STRING_FIELD(pattern);
return true;
}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7d0de99..3bb1605 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9420,7 +9420,15 @@ notify_payload:
ListenStmt: LISTEN ColId
{
ListenStmt *n = makeNode(ListenStmt);
- n->conditionname = $2;
+ n->pattern = $2;
+ n->isSimilarToPattern = false;
+ $$ = (Node *)n;
+ }
+ | LISTEN SIMILAR TO Sconst
+ {
+ ListenStmt *n = makeNode(ListenStmt);
+ n->pattern = $4;
+ n->isSimilarToPattern = true;
$$ = (Node *)n;
}
;
@@ -9429,13 +9437,19 @@ UnlistenStmt:
UNLISTEN ColId
{
UnlistenStmt *n = makeNode(UnlistenStmt);
- n->conditionname = $2;
+ n->pattern = $2;
+ $$ = (Node *)n;
+ }
+ | UNLISTEN Sconst
+ {
+ UnlistenStmt *n = makeNode(UnlistenStmt);
+ n->pattern = $2;
$$ = (Node *)n;
}
| UNLISTEN '*'
{
UnlistenStmt *n = makeNode(UnlistenStmt);
- n->conditionname = NULL;
+ n->pattern = NULL;
$$ = (Node *)n;
}
;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 775477c..d7b4fb6 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -630,7 +630,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
PreventCommandDuringRecovery("LISTEN");
CheckRestrictedOperation("LISTEN");
- Async_Listen(stmt->conditionname);
+ Async_Listen(stmt->pattern, stmt->isSimilarToPattern);
}
break;
@@ -640,8 +640,8 @@ standard_ProcessUtility(PlannedStmt *pstmt,
PreventCommandDuringRecovery("UNLISTEN");
CheckRestrictedOperation("UNLISTEN");
- if (stmt->conditionname)
- Async_Unlisten(stmt->conditionname);
+ if (stmt->pattern)
+ Async_Unlisten(stmt->pattern);
else
Async_UnlistenAll();
}
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index 939711d..f86a482 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -34,8 +34,8 @@ extern void NotifyMyFrontEnd(const char *channel,
/* notify-related SQL statements */
extern void Async_Notify(const char *channel, const char *payload);
-extern void Async_Listen(const char *channel);
-extern void Async_Unlisten(const char *channel);
+extern void Async_Listen(const char *pattern, bool isSimilarToPattern);
+extern void Async_Unlisten(const char *pattern);
extern void Async_UnlistenAll(void);
/* perform (or cancel) outbound notify processing at transaction commit */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 5f2a4a7..3f9fe9d 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2889,7 +2889,8 @@ typedef struct NotifyStmt
typedef struct ListenStmt
{
NodeTag type;
- char *conditionname; /* condition name to listen on */
+ bool isSimilarToPattern;
+ char *pattern; /* condition pattern to listen on */
} ListenStmt;
/* ----------------------
@@ -2899,7 +2900,7 @@ typedef struct ListenStmt
typedef struct UnlistenStmt
{
NodeTag type;
- char *conditionname; /* name to unlisten on, or NULL for all */
+ char *pattern; /* condition pattern to unlisten on, or NULL for all */
} UnlistenStmt;
/* ----------------------
diff --git a/src/test/isolation/expected/async-notify-2.out b/src/test/isolation/expected/async-notify-2.out
new file mode 100644
index 0000000..081cae3
--- /dev/null
+++ b/src/test/isolation/expected/async-notify-2.out
@@ -0,0 +1,118 @@
+Parsed test spec with 2 sessions
+
+starting permutation: listen_normal notify consume
+step listen_normal: LISTEN test;
+step notify:
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count
+
+5
+count
+
+5
+step consume: BEGIN; END;
+ASYNC NOTIFY of 'test' with payload '1' received
+ASYNC NOTIFY of 'test' with payload '2' received
+ASYNC NOTIFY of 'test' with payload '3' received
+ASYNC NOTIFY of 'test' with payload '4' received
+ASYNC NOTIFY of 'test' with payload '5' received
+
+starting permutation: listen_pattern_1 notify consume
+step listen_pattern_1: LISTEN SIMILAR TO 'te%';
+step notify:
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count
+
+5
+count
+
+5
+step consume: BEGIN; END;
+ASYNC NOTIFY of 'test' with payload '1' received
+ASYNC NOTIFY of 'test' with payload '2' received
+ASYNC NOTIFY of 'test' with payload '3' received
+ASYNC NOTIFY of 'test' with payload '4' received
+ASYNC NOTIFY of 'test' with payload '5' received
+ASYNC NOTIFY of 'test_2' with payload '1' received
+ASYNC NOTIFY of 'test_2' with payload '2' received
+ASYNC NOTIFY of 'test_2' with payload '3' received
+ASYNC NOTIFY of 'test_2' with payload '4' received
+ASYNC NOTIFY of 'test_2' with payload '5' received
+
+starting permutation: listen_pattern_2 notify consume
+step listen_pattern_2: LISTEN SIMILAR TO 'test';
+step notify:
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count
+
+5
+count
+
+5
+step consume: BEGIN; END;
+ASYNC NOTIFY of 'test' with payload '1' received
+ASYNC NOTIFY of 'test' with payload '2' received
+ASYNC NOTIFY of 'test' with payload '3' received
+ASYNC NOTIFY of 'test' with payload '4' received
+ASYNC NOTIFY of 'test' with payload '5' received
+
+starting permutation: listen_pattern_3 notify consume
+step listen_pattern_3: LISTEN SIMILAR TO 'te';
+step notify:
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count
+
+5
+count
+
+5
+step consume: BEGIN; END;
+
+starting permutation: listen_pattern_invalid notify consume
+step listen_pattern_invalid: LISTEN SIMILAR TO '*';
+ERROR: invalid regular expression: quantifier operand invalid
+step notify:
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count
+
+5
+count
+
+5
+step consume: BEGIN; END;
+
+starting permutation: listen_normal listen_pattern_1 unlisten_1 notify consume
+step listen_normal: LISTEN test;
+step listen_pattern_1: LISTEN SIMILAR TO 'te%';
+step unlisten_1: UNLISTEN 't%';
+step notify:
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count
+
+5
+count
+
+5
+step consume: BEGIN; END;
+ASYNC NOTIFY of 'test' with payload '1' received
+ASYNC NOTIFY of 'test' with payload '2' received
+ASYNC NOTIFY of 'test' with payload '3' received
+ASYNC NOTIFY of 'test' with payload '4' received
+ASYNC NOTIFY of 'test' with payload '5' received
+ASYNC NOTIFY of 'test_2' with payload '1' received
+ASYNC NOTIFY of 'test_2' with payload '2' received
+ASYNC NOTIFY of 'test_2' with payload '3' received
+ASYNC NOTIFY of 'test_2' with payload '4' received
+ASYNC NOTIFY of 'test_2' with payload '5' received
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 32c965b..a25cdeb 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -60,5 +60,6 @@ test: alter-table-3
test: create-trigger
test: sequence-ddl
test: async-notify
+test: async-notify-2
test: vacuum-reltuples
test: timeouts
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c
index ba8082c..b1eb17d 100644
--- a/src/test/isolation/isolationtester.c
+++ b/src/test/isolation/isolationtester.c
@@ -46,7 +46,8 @@ static bool try_complete_step(Step *step, int flags);
static int step_qsort_cmp(const void *a, const void *b);
static int step_bsearch_cmp(const void *a, const void *b);
-static void printResultSet(PGresult *res);
+static void printResultSet(PGresult *res, PGconn *conn);
+static void printAsyncNotify(PGconn *conn);
/* close all connections and exit */
static void
@@ -487,7 +488,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
res = PQexec(conns[0], testspec->setupsqls[i]);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
- printResultSet(res);
+ printResultSet(res, conns[i + 1]);
}
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
@@ -505,7 +506,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
res = PQexec(conns[i + 1], testspec->sessions[i]->setupsql);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
- printResultSet(res);
+ printResultSet(res, conns[i + 1]);
}
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
@@ -640,7 +641,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
res = PQexec(conns[i + 1], testspec->sessions[i]->teardownsql);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
- printResultSet(res);
+ printResultSet(res, conns[i + 1]);
}
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
@@ -659,7 +660,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
res = PQexec(conns[0], testspec->teardownsql);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
- printResultSet(res);
+ printResultSet(res, conns[0]);
}
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
@@ -821,9 +822,10 @@ try_complete_step(Step *step, int flags)
switch (PQresultStatus(res))
{
case PGRES_COMMAND_OK:
+ printAsyncNotify(conn);
break;
case PGRES_TUPLES_OK:
- printResultSet(res);
+ printResultSet(res, conn);
break;
case PGRES_FATAL_ERROR:
if (step->errormsg != NULL)
@@ -860,7 +862,7 @@ try_complete_step(Step *step, int flags)
}
static void
-printResultSet(PGresult *res)
+printResultSet(PGresult *res, PGconn *conn)
{
int nFields;
int i,
@@ -879,4 +881,24 @@ printResultSet(PGresult *res)
printf("%-15s", PQgetvalue(res, i, j));
printf("\n");
}
+
+ printAsyncNotify(conn);
+}
+
+static void
+printAsyncNotify(PGconn *conn)
+{
+ PGnotify *notify;
+
+ while ((notify = PQnotifies(conn)) != NULL)
+ {
+ if (notify->extra[0])
+ printf("ASYNC NOTIFY of '%s' with payload '%s' received\n",
+ notify->relname, notify->extra);
+ else
+ printf("ASYNC NOTIFY of '%s' received\n",
+ notify->relname);
+
+ PQfreemem(notify);
+ }
}
diff --git a/src/test/isolation/specs/async-notify-2.spec b/src/test/isolation/specs/async-notify-2.spec
new file mode 100644
index 0000000..dd1d38f
--- /dev/null
+++ b/src/test/isolation/specs/async-notify-2.spec
@@ -0,0 +1,31 @@
+# Verify that messages are consumed from the notify queue.
+
+session "listener"
+step "listen_normal" { LISTEN test; }
+step "listen_pattern_1" { LISTEN SIMILAR TO 'te%'; }
+step "listen_pattern_2" { LISTEN SIMILAR TO 'test'; }
+step "listen_pattern_3" { LISTEN SIMILAR TO 'te'; }
+step "listen_pattern_invalid" { LISTEN SIMILAR TO '*'; }
+step "unlisten_1" { UNLISTEN 't%'; }
+step "consume" { BEGIN; END; }
+teardown { UNLISTEN *; }
+
+session "notifier"
+step "notify"
+{
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+}
+
+# Should print first notify channel
+permutation "listen_normal" "notify" "consume"
+# Should print both notify channels
+permutation "listen_pattern_1" "notify" "consume"
+# Should print first notify channel
+permutation "listen_pattern_2" "notify" "consume"
+# Should not print either notify channels
+permutation "listen_pattern_3" "notify" "consume"
+# Should fail to invalid RE pattern
+permutation "listen_pattern_invalid" "notify" "consume"
+# Test that UNLISTEN with a pattern does not work as a RE matcher
+permutation "listen_normal" "listen_pattern_1" "unlisten_1" "notify" "consume"
\ No newline at end of file
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38..963b1d0 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -27,11 +27,54 @@ SELECT pg_notify(NULL,'sample message1');
ERROR: channel name cannot be empty
SELECT pg_notify('notify_async_channel_name_too_long______________________________','sample_message1');
ERROR: channel name too long
---Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
+-- Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
+-- src/test/isolation/specs/async-notify-2.spec tests for actual usage.
NOTIFY notify_async2;
LISTEN notify_async2;
+LISTEN SIMILAR TO 'notify_%';
UNLISTEN notify_async2;
+UNLISTEN 'notify_%';
+UNLISTEN 'notify_(%';
UNLISTEN *;
+-- Should fail. Invalid LISTEN command
+LISTEN *;
+ERROR: syntax error at or near "*"
+LINE 1: LISTEN *;
+ ^
+LISTEN notify_%;
+ERROR: syntax error at or near "%"
+LINE 1: LISTEN notify_%;
+ ^
+LISTEN SIMILAR TO 'notify_(%';
+ERROR: invalid regular expression: parentheses () not balanced
+LISTEN SIMILAR TO '*';
+ERROR: invalid regular expression: quantifier operand invalid
+-- Should contain two listeners
+LISTEN notify_async2;
+LISTEN SIMILAR TO 'notify_async2';
+LISTEN SIMILAR TO 'notify_%';
+SELECT pg_listening_channels();
+ pg_listening_channels
+-----------------------
+ notify_async2
+ notify_%
+(2 rows)
+
+-- Should contain one listener
+UNLISTEN 'notify_%';
+SELECT pg_listening_channels();
+ pg_listening_channels
+-----------------------
+ notify_async2
+(1 row)
+
+-- Should not contain listeners
+UNLISTEN *;
+SELECT pg_listening_channels();
+ pg_listening_channels
+-----------------------
+(0 rows)
+
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
SELECT pg_notification_queue_usage();
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01..69d8f98 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -12,12 +12,36 @@ SELECT pg_notify('','sample message1');
SELECT pg_notify(NULL,'sample message1');
SELECT pg_notify('notify_async_channel_name_too_long______________________________','sample_message1');
---Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
+-- Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
+-- src/test/isolation/specs/async-notify-2.spec tests for actual usage.
NOTIFY notify_async2;
LISTEN notify_async2;
+LISTEN SIMILAR TO 'notify_%';
UNLISTEN notify_async2;
+UNLISTEN 'notify_%';
+UNLISTEN 'notify_(%';
UNLISTEN *;
+-- Should fail. Invalid LISTEN command
+LISTEN *;
+LISTEN notify_%;
+LISTEN SIMILAR TO 'notify_(%';
+LISTEN SIMILAR TO '*';
+
+-- Should contain two listeners
+LISTEN notify_async2;
+LISTEN SIMILAR TO 'notify_async2';
+LISTEN SIMILAR TO 'notify_%';
+SELECT pg_listening_channels();
+
+-- Should contain one listener
+UNLISTEN 'notify_%';
+SELECT pg_listening_channels();
+
+-- Should not contain listeners
+UNLISTEN *;
+SELECT pg_listening_channels();
+
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
SELECT pg_notification_queue_usage();