From e75751ccc1cc5c201320b809d3d99f517dda5074 Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Mon, 8 Jun 2026 14:54:34 -0300 Subject: [PATCH v0] Add pattern matching support for LISTEN/NOTIFY This adds glob-style pattern matching for LISTEN channel names, allowing clients to listen on multiple channels with a single LISTEN command using wildcards. The supported wildcards are * (matches zero or more characters), ? (matches exactly one character), and \ (escapes the next character to match literal * or ?). Pattern channels are stored in a backend-local list rather than the global hash table, since they cannot be looked up by exact name. A new hasPatterns flag in QueueBackendStatus tracks whether a backend is listening on any patterns. When signaling backends, those with hasPatterns set are always woken for notifications in their database so they can check if any notifications match their patterns locally using the MatchPattern function. --- src/backend/commands/async.c | 249 ++++++++++++++++++- src/test/isolation/expected/async-notify.out | 89 ++++++- src/test/isolation/specs/async-notify.spec | 53 ++++ src/test/regress/expected/async.out | 21 ++ src/test/regress/sql/async.sql | 27 ++ 5 files changed, 436 insertions(+), 3 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index eee8bc29f38..0823db944db 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -292,6 +292,7 @@ typedef struct QueueBackendStatus QueuePosition pos; /* backend has read queue up to here */ bool wakeupPending; /* signal sent to backend, not yet processed */ bool isAdvancing; /* backend is advancing its position */ + bool hasPatterns; /* backend is listening on pattern channels */ } QueueBackendStatus; /* @@ -365,6 +366,7 @@ const ShmemCallbacks AsyncShmemCallbacks = { #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending) #define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing) +#define QUEUE_BACKEND_HAS_PATTERNS(i) (asyncQueueControl->backend[i].hasPatterns) /* * The SLRU buffer area through which we access the notification queue @@ -423,7 +425,8 @@ static HTAB *localChannelTable = NULL; /* We test this condition to detect that we're not listening at all */ #define LocalChannelTableIsEmpty() \ - (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0) + ((localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0) && \ + localPatternList == NIL) /* * State for pending LISTEN/UNLISTEN actions consists of an ordered list of @@ -474,6 +477,7 @@ typedef enum typedef struct PendingListenEntry { char channel[NAMEDATALEN]; /* hash key */ + bool is_pattern; /* channel contains wildcards? */ PendingListenAction action; /* which action should we perform? */ } PendingListenEntry; @@ -557,6 +561,13 @@ static bool unlistenExitRegistered = false; /* True if we're currently registered as a listener in asyncQueueControl */ static bool amRegisteredListener = false; +/* + * List of pattern channel names. These are stored separately because they + * cannot be matched via hash lookup. The list contains palloc'd strings in + * TopMemoryContext. + */ +static List *localPatternList = NIL; + /* * Queue head positions for direct advancement. * These are captured during PreCommit_Notify while holding the heavyweight @@ -602,6 +613,9 @@ static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, int idx); static void ApplyPendingListenActions(bool isCommit); static void CleanupListenersOnExit(void); +static bool IsPattern(const char *channel); +static bool MatchPattern(const char *channel, const char *pattern); +static bool IsListeningOnPattern(const char *channel); static bool IsListeningOn(const char *channel); static void asyncQueueUnregister(void); static bool asyncQueueIsFull(void); @@ -843,6 +857,7 @@ AsyncShmemInit(void *arg) SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); QUEUE_BACKEND_WAKEUP_PENDING(i) = false; QUEUE_BACKEND_IS_ADVANCING(i) = false; + QUEUE_BACKEND_HAS_PATTERNS(i) = false; } /* @@ -1525,6 +1540,9 @@ BecomeRegisteredListener(void) * an entry in localChannelTable, and pre-allocating an entry in the shared * globalChannelTable with removeOnAbort set. AtCommit_Notify will clear * removeOnAbort; abort processing will remove entries still marked so. + * + * For pattern channels, we add them to the local pattern list instead of the + * global channel table, since they cannot be looked up by exact name. */ static void PrepareTableEntriesForListen(const char *channel) @@ -1534,6 +1552,7 @@ PrepareTableEntriesForListen(const char *channel) bool found; ListenerEntry *listeners; PendingListenEntry *pending; + bool is_pattern = IsPattern(channel); /* * Record in local pending hash that we want to LISTEN, overwriting any @@ -1542,6 +1561,7 @@ PrepareTableEntriesForListen(const char *channel) pending = (PendingListenEntry *) hash_search(pendingListenActions, channel, HASH_ENTER, NULL); pending->action = PENDING_LISTEN; + pending->is_pattern = is_pattern; /* * Ensure that there is an entry for the channel in localChannelTable. @@ -1556,6 +1576,38 @@ PrepareTableEntriesForListen(const char *channel) */ (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL); + /* + * For pattern channels, add to the local pattern list and set the + * hasPatterns flag so we get signaled for all notifications. We don't add + * patterns to globalChannelTable since they can't be matched by exact + * channel name lookup. + */ + if (is_pattern) + { + MemoryContext oldcxt; + char *pattern_copy; + + /* Check if we already have this pattern */ + foreach_ptr(char, existing, localPatternList) + { + if (strcmp(existing, channel) == 0) + return; /* already have it */ + } + + /* Add pattern to the list in TopMemoryContext so it persists */ + oldcxt = MemoryContextSwitchTo(TopMemoryContext); + pattern_copy = pstrdup(channel); + localPatternList = lappend(localPatternList, pattern_copy); + MemoryContextSwitchTo(oldcxt); + + /* Set the flag so we get woken up for any notification */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = true; + LWLockRelease(NotifyQueueLock); + + return; + } + /* Pre-allocate entry in shared globalChannelTable */ GlobalChannelKeyInit(&key, MyDatabaseId, channel); entry = dshash_find_or_insert(globalChannelTable, &key, &found); @@ -1650,6 +1702,7 @@ PrepareTableEntriesForUnlisten(const char *channel) pending = (PendingListenEntry *) hash_search(pendingListenActions, channel, HASH_ENTER, NULL); pending->action = PENDING_UNLISTEN; + pending->is_pattern = IsPattern(channel); } /* @@ -1676,6 +1729,16 @@ PrepareTableEntriesForUnlistenAll(void) pending = (PendingListenEntry *) hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL); pending->action = PENDING_UNLISTEN; + pending->is_pattern = IsPattern(channelEntry->channel); + } + + /* Also unlisten from all pattern channels */ + foreach_ptr(char, pattern, localPatternList) + { + pending = (PendingListenEntry *) + hash_search(pendingListenActions, pattern, HASH_ENTER, NULL); + pending->action = PENDING_UNLISTEN; + pending->is_pattern = true; } } @@ -1738,6 +1801,54 @@ ApplyPendingListenActions(bool isCommit) bool removeLocal = true; bool foundListener = false; + /* + * Handle pattern channels specially - they're not in + * globalChannelTable. + */ + if (pending->is_pattern) + { + if (isCommit && pending->action == PENDING_LISTEN) + { + /* Pattern LISTEN committed - keep in localPatternList */ + removeLocal = false; + } + + /* + * If removeLocal is true, it means we are either aborting or + * committing an UNLISTEN. In both cases, remove from the list. + */ + if (removeLocal) + { + ListCell *lc; + + foreach(lc, localPatternList) + { + char *pattern = (char *) lfirst(lc); + + if (strcmp(pattern, pending->channel) == 0) + { + localPatternList = foreach_delete_current(localPatternList, lc); + pfree(pattern); + break; + } + } + + /* Update hasPatterns flag if we have no more patterns */ + if (localPatternList == NIL) + { + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = false; + LWLockRelease(NotifyQueueLock); + } + } + + /* Remove from localChannelTable if needed */ + if (removeLocal && localChannelTable != NULL) + (void) hash_search(localChannelTable, pending->channel, + HASH_REMOVE, NULL); + continue; + } + /* * Find the global entry for this channel. If isCommit, it had better * exist (it was created in PreCommit). In an abort, it might not @@ -1855,6 +1966,13 @@ CleanupListenersOnExit(void) localChannelTable = NULL; } + /* Clear the pattern list */ + if (localPatternList != NIL) + { + list_free_deep(localPatternList); + localPatternList = NIL; + } + /* Now remove our entries from the shared globalChannelTable */ if (globalChannelTable == NULL) return; @@ -1891,6 +2009,107 @@ CleanupListenersOnExit(void) dshash_seq_term(&status); } +/* + * Check if a channel name contains pattern wildcards (* or ?). + */ +static bool +IsPattern(const char *channel) +{ + return (strchr(channel, '*') != NULL || strchr(channel, '?') != NULL); +} + +/* + * Match a channel name against a glob-style pattern. + * Supports: + * * - matches zero or more characters + * ? - matches exactly one character + * \ - escapes the next character (to match literal * or ?) + * + * Returns true if the channel matches the pattern. + */ +static bool +MatchPattern(const char *channel, const char *pattern) +{ + const char *c = channel; + const char *p = pattern; + + /* Position in channel and pattern after last * match */ + const char *c_star = NULL; + const char *p_star = NULL; + + while (*c != '\0') + { + if (*p == '\\' && *(p + 1) != '\0') + { + /* Escaped character - must match literally */ + p++; + if (*c == *p) + { + c++; + p++; + } + else if (p_star != NULL) + { + /* Backtrack to last * */ + p = p_star + 1; + c = ++c_star; + } + else + return false; + } + else if (*p == '?') + { + /* ? matches any single character */ + c++; + p++; + } + else if (*p == '*') + { + /* Remember position for backtracking */ + p_star = p++; + c_star = c; + } + else if (*c == *p) + { + /* Literal match */ + c++; + p++; + } + else if (p_star != NULL) + { + /* Mismatch, but we have a * to backtrack to */ + p = p_star + 1; + c = ++c_star; + } + else + { + /* Mismatch with no * to backtrack to */ + return false; + } + } + + /* Skip trailing *'s in pattern */ + while (*p == '*') + p++; + + return (*p == '\0'); +} + +/* + * Check if the channel matches any pattern we are listening on. + */ +static bool +IsListeningOnPattern(const char *channel) +{ + foreach_ptr(char, pattern, localPatternList) + { + if (MatchPattern(channel, pattern)) + return true; + } + + return false; +} + /* * Test whether we are actively listening on the given channel name. * @@ -1902,7 +2121,10 @@ IsListeningOn(const char *channel) if (localChannelTable == NULL) return false; - return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL); + if (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL) + return true; + + return IsListeningOnPattern(channel); } /* @@ -1926,6 +2148,7 @@ asyncQueueUnregister(void) QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid; QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false; + QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = false; /* and remove it from the list */ if (QUEUE_FIRST_LISTENER == MyProcNumber) QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber); @@ -2337,6 +2560,10 @@ SignalBackends(void) * flags above). Check to see if we can directly advance their queue * pointers to save a wakeup. Otherwise, if they are far behind, wake * them anyway so they will catch up. + * + * Also signal backends that are listening on pattern channels, since they + * may match our notifications but weren't found in the per-channel lookup + * above. */ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { @@ -2353,6 +2580,24 @@ SignalBackends(void) pid = QUEUE_BACKEND_PID(i); pos = QUEUE_BACKEND_POS(i); + /* + * If this backend has pattern listeners and is in the same database, + * we need to wake it up so it can check if any notifications match + * its patterns. + */ + if (QUEUE_BACKEND_HAS_PATTERNS(i) && + QUEUE_BACKEND_DBOID(i) == MyDatabaseId && + !QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) + { + Assert(pid != InvalidPid); + + QUEUE_BACKEND_WAKEUP_PENDING(i) = true; + signalPids[count] = pid; + signalProcnos[count] = i; + count++; + continue; + } + /* * We can directly advance the other backend's queue pointer if it's * not currently advancing (else there are race conditions), and its diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 55b7cbc6e02..95980ded634 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -1,4 +1,4 @@ -Parsed test spec with 6 sessions +Parsed test spec with 8 sessions starting permutation: listenc notify1 notify2 notify3 notifyf step listenc: LISTEN c1; LISTEN c2; @@ -238,3 +238,90 @@ nonzero t (1 row) + +starting permutation: plisten_star pnotify_user_login pnotify_user_logout pnotify_user_empty pcheck +step plisten_star: LISTEN "user_*"; +step pnotify_user_login: NOTIFY user_login, 'login payload'; +step pnotify_user_logout: NOTIFY user_logout, 'logout payload'; +step pnotify_user_empty: NOTIFY user_, 'empty suffix'; +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "user_login" with payload "login payload" from pattern_notifier +pattern_listener: NOTIFY "user_logout" with payload "logout payload" from pattern_notifier +pattern_listener: NOTIFY "user_" with payload "empty suffix" from pattern_notifier + +starting permutation: plisten_question pnotify_msg_1 pnotify_msg_a pnotify_msg_12 pcheck +step plisten_question: LISTEN "msg_?"; +step pnotify_msg_1: NOTIFY msg_1, 'message 1'; +step pnotify_msg_a: NOTIFY msg_a, 'message a'; +step pnotify_msg_12: NOTIFY msg_12, 'should not match'; +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "msg_1" with payload "message 1" from pattern_notifier +pattern_listener: NOTIFY "msg_a" with payload "message a" from pattern_notifier + +starting permutation: plisten_complex pnotify_prefix_mid_suffix pnotify_prefix_suffix pnotify_prefix_a_b_suffix pcheck +step plisten_complex: LISTEN "prefix_*_suffix"; +step pnotify_prefix_mid_suffix: NOTIFY prefix_mid_suffix, 'complex match'; +step pnotify_prefix_suffix: NOTIFY prefix_suffix, 'no middle'; +step pnotify_prefix_a_b_suffix: NOTIFY prefix_a_b_suffix, 'multi char middle'; +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "prefix_mid_suffix" with payload "complex match" from pattern_notifier +pattern_listener: NOTIFY "prefix_a_b_suffix" with payload "multi char middle" from pattern_notifier + +starting permutation: plisten_multi pnotify_event_created pnotify_alert_x pnotify_nomatch pcheck +step plisten_multi: LISTEN "event_*"; LISTEN "alert_?"; +step pnotify_event_created: NOTIFY event_created, 'event'; +step pnotify_alert_x: NOTIFY alert_x, 'alert'; +step pnotify_nomatch: NOTIFY completely_different, 'no match'; +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "event_created" with payload "event" from pattern_notifier +pattern_listener: NOTIFY "alert_x" with payload "alert" from pattern_notifier + +starting permutation: plisten_star pnotify_user_login pcheck +step plisten_star: LISTEN "user_*"; +step pnotify_user_login: NOTIFY user_login, 'login payload'; +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "user_login" with payload "login payload" from pattern_notifier + +starting permutation: llisten plisten_star notify1 pnotify_user_login lcheck pcheck +step llisten: LISTEN c1; LISTEN c2; +step plisten_star: LISTEN "user_*"; +step notify1: NOTIFY c1; +step pnotify_user_login: NOTIFY user_login, 'login payload'; +step lcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "user_login" with payload "login payload" from pattern_notifier diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec index 7aef2e8d180..ab4cc2449dc 100644 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@ -94,6 +94,30 @@ step lsnotify { NOTIFY c1, 'subxact_test'; } step lsnotify_check { NOTIFY c1, 'should_not_receive'; } teardown { UNLISTEN *; } +# Session for pattern listening tests +session pattern_listener +step plisten_star { LISTEN "user_*"; } +step plisten_question { LISTEN "msg_?"; } +step plisten_complex { LISTEN "prefix_*_suffix"; } +step plisten_multi { LISTEN "event_*"; LISTEN "alert_?"; } +step pcheck { SELECT 1 AS x; } +teardown { UNLISTEN *; } + +# Session for sending notifications to pattern listeners +session pattern_notifier +step pnotify_user_login { NOTIFY user_login, 'login payload'; } +step pnotify_user_logout { NOTIFY user_logout, 'logout payload'; } +step pnotify_user_empty { NOTIFY user_, 'empty suffix'; } +step pnotify_msg_1 { NOTIFY msg_1, 'message 1'; } +step pnotify_msg_a { NOTIFY msg_a, 'message a'; } +step pnotify_msg_12 { NOTIFY msg_12, 'should not match'; } +step pnotify_prefix_mid_suffix { NOTIFY prefix_mid_suffix, 'complex match'; } +step pnotify_prefix_suffix { NOTIFY prefix_suffix, 'no middle'; } +step pnotify_prefix_a_b_suffix { NOTIFY prefix_a_b_suffix, 'multi char middle'; } +step pnotify_event_created { NOTIFY event_created, 'event'; } +step pnotify_alert_x { NOTIFY alert_x, 'alert'; } +step pnotify_nomatch { NOTIFY completely_different, 'no match'; } + # Trivial cases. permutation listenc notify1 notify2 notify3 notifyf @@ -145,3 +169,32 @@ permutation lch_listen nch_notify lch_check # Hence, this should be the last test in this script. permutation llisten lbegin usage bignotify usage + +# +# Pattern LISTEN/NOTIFY tests +# + +# Basic pattern with * (zero or more characters) +# Should receive: user_login, user_logout, user_ (empty suffix matches *) +permutation plisten_star pnotify_user_login pnotify_user_logout pnotify_user_empty pcheck + +# Pattern with ? (exactly one character) +# Should receive: msg_1, msg_a +# Should NOT receive: msg_12 (two chars after _) +permutation plisten_question pnotify_msg_1 pnotify_msg_a pnotify_msg_12 pcheck + +# Complex pattern with * in the middle +# Should receive: prefix_mid_suffix, prefix_a_b_suffix +# Should NOT receive: prefix_suffix (requires at least one char for *) +permutation plisten_complex pnotify_prefix_mid_suffix pnotify_prefix_suffix pnotify_prefix_a_b_suffix pcheck + +# Multiple patterns on same session +# Should receive: event_created (matches event_*), alert_x (matches alert_?) +# Should NOT receive: completely_different +permutation plisten_multi pnotify_event_created pnotify_alert_x pnotify_nomatch pcheck + +# Cross-session pattern test: listener listens before notifier sends +permutation plisten_star pnotify_user_login pcheck + +# Pattern listener with regular listener (mixed) +permutation llisten plisten_star notify1 pnotify_user_login lcheck pcheck diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out index 19cbe38e636..c5236c15fb9 100644 --- a/src/test/regress/expected/async.out +++ b/src/test/regress/expected/async.out @@ -40,3 +40,24 @@ SELECT pg_notification_queue_usage(); 0 (1 row) +-- +-- Pattern LISTEN tests +-- +-- Pattern with * (matches zero or more characters) +LISTEN "user_*"; +UNLISTEN "user_*"; +-- Pattern with ? (matches exactly one character) +LISTEN "msg_?"; +UNLISTEN "msg_?"; +-- Pattern with wildcards in the middle +LISTEN "prefix_*_suffix"; +UNLISTEN "prefix_*_suffix"; +-- Escaped wildcards (for literal * and ? matching) +LISTEN "literal\*star"; +LISTEN "literal\?question"; +UNLISTEN "literal\*star"; +UNLISTEN "literal\?question"; +-- Multiple patterns +LISTEN "event_*"; +LISTEN "alert_?"; +UNLISTEN *; diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql index 40f6e015387..ca666c3019e 100644 --- a/src/test/regress/sql/async.sql +++ b/src/test/regress/sql/async.sql @@ -21,3 +21,30 @@ UNLISTEN *; -- Should return zero while there are no pending notifications. -- src/test/isolation/specs/async-notify.spec tests for actual usage. SELECT pg_notification_queue_usage(); + +-- +-- Pattern LISTEN tests +-- + +-- Pattern with * (matches zero or more characters) +LISTEN "user_*"; +UNLISTEN "user_*"; + +-- Pattern with ? (matches exactly one character) +LISTEN "msg_?"; +UNLISTEN "msg_?"; + +-- Pattern with wildcards in the middle +LISTEN "prefix_*_suffix"; +UNLISTEN "prefix_*_suffix"; + +-- Escaped wildcards (for literal * and ? matching) +LISTEN "literal\*star"; +LISTEN "literal\?question"; +UNLISTEN "literal\*star"; +UNLISTEN "literal\?question"; + +-- Multiple patterns +LISTEN "event_*"; +LISTEN "alert_?"; +UNLISTEN *; -- 2.50.1 (Apple Git-155)