From 3d9c474bc60dded52df5673724f9e793b58b1bcd Mon Sep 17 00:00:00 2001 From: Joel Jacobson Date: Tue, 25 Nov 2025 10:11:30 +0100 Subject: [PATCH 3/3] Convert listenChannels to hash table --- src/backend/commands/async.c | 230 ++++++++++++++++++++++--------- src/tools/pgindent/typedefs.list | 1 + 2 files changed, 168 insertions(+), 63 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index d5676d43a30..ebc06ba771d 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -68,7 +68,7 @@ * CommitTransaction() which will then do the actual transaction commit. * * After commit we are called another time (AtCommit_Notify()). Here we - * make any actual updates to the effective listen state (listenChannels). + * make any actual updates to the effective listen state (listenChannelsHash). * Then we signal any backends that may be interested in our messages * (including our own backend, if listening). This is done by * SignalBackends(), which scans the list of listening backends and sends a @@ -313,16 +313,25 @@ static SlruCtlData NotifyCtlData; #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ /* - * listenChannels identifies the channels we are actually listening to - * (ie, have committed a LISTEN on). It is a simple list of channel names, - * allocated in TopMemoryContext. + * listenChannelsHash identifies the channels we are listening to (or will be + * after the current transaction commits). Entries with active=true are + * committed listens; entries with active=false are pre-allocated for pending + * LISTENs that will become active at commit. + * + * This hash table is allocated in TopMemoryContext. 
*/ -static List *listenChannels = NIL; /* list of C strings */ +typedef struct ListenChannelEntry +{ + char channel[NAMEDATALEN]; /* hash key - must be first */ + bool active; /* true if committed listen */ +} ListenChannelEntry; + +static HTAB *listenChannelsHash = NULL; /* * State for pending LISTEN/UNLISTEN actions consists of an ordered list of * all actions requested in the current transaction. As explained above, - * we don't actually change listenChannels until we reach transaction commit. + * we don't change the active state of listenChannelsHash entries until commit. * * The list is kept in CurTransactionContext. In subtransactions, each * subtransaction has its own list in its own CurTransactionContext, but @@ -438,7 +447,9 @@ static inline int64 asyncQueuePageDiff(int64 p, int64 q); static inline bool asyncQueuePagePrecedes(int64 p, int64 q); static void queue_listen(ListenActionKind action, const char *channel); static void Async_UnlistenOnExit(int code, Datum arg); -static void Exec_ListenPreCommit(void); +static void initListenChannelsHash(void); +static void CleanupInactiveListenChannels(void); +static void Exec_ListenPreCommit(const char *channel); static void Exec_ListenCommit(const char *channel); static void Exec_UnlistenCommit(const char *channel); static void Exec_UnlistenAllCommit(void); @@ -504,6 +515,60 @@ initSignalArrays(void) MemoryContextSwitchTo(oldcontext); } +/* + * initListenChannelsHash + * Lazy initialization of the listen channels hash table. + */ +static void +initListenChannelsHash(void) +{ + HASHCTL hash_ctl; + + if (listenChannelsHash != NULL) + return; + + memset(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(ListenChannelEntry); + + listenChannelsHash = hash_create("Listen Channels", 64, &hash_ctl, + HASH_ELEM | HASH_STRINGS); +} + +/* + * CleanupInactiveListenChannels + * Remove all entries with active=false from the hash table.
+ * If the hash table becomes empty, destroy it. + */ +static void +CleanupInactiveListenChannels(void) +{ + HASH_SEQ_STATUS status; + ListenChannelEntry *entry; + + if (listenChannelsHash == NULL) + return; + + hash_seq_init(&status, listenChannelsHash); + + while ((entry = hash_seq_search(&status)) != NULL) + { + if (!entry->active) + { + if (hash_search(listenChannelsHash, entry->channel, + HASH_REMOVE, NULL) == NULL) + elog(ERROR, "hash table corrupted"); + } + } + + /* Destroy hash table if now empty */ + if (hash_get_num_entries(listenChannelsHash) == 0) + { + hash_destroy(listenChannelsHash); + listenChannelsHash = NULL; + } +} + /* * Report space needed for our shared memory area */ @@ -709,7 +774,7 @@ Async_Notify(const char *channel, const char *payload) * Common code for listen, unlisten, unlisten all commands. * * Adds the request to the list of pending actions. - * Actual update of the listenChannels list happens during transaction + * Actual update of the listenChannelsHash happens during transaction * commit. */ static void @@ -809,30 +874,52 @@ Async_UnlistenAll(void) * SQL function: return a set of the channel names this backend is actively * listening to. * - * Note: this coding relies on the fact that the listenChannels list cannot + * Note: this coding relies on the fact that the listenChannelsHash cannot * change within a transaction. 
*/ Datum pg_listening_channels(PG_FUNCTION_ARGS) { FuncCallContext *funcctx; + HASH_SEQ_STATUS *status; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { + MemoryContext oldcontext; + /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); + + if (listenChannelsHash != NULL) + { + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + status = palloc(sizeof(HASH_SEQ_STATUS)); + hash_seq_init(status, listenChannelsHash); + funcctx->user_fctx = status; + MemoryContextSwitchTo(oldcontext); + } + else + funcctx->user_fctx = NULL; } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); + status = funcctx->user_fctx; - if (funcctx->call_cntr < list_length(listenChannels)) + if (status != NULL) { - char *channel = (char *) list_nth(listenChannels, - funcctx->call_cntr); + ListenChannelEntry *entry = hash_seq_search(status); - SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); + if (entry != NULL) + { + /* + * All entries should be active; inactive ones are cleaned up at + * commit/abort + */ + Assert(entry->active); + SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel)); + } } SRF_RETURN_DONE(funcctx); @@ -849,6 +936,7 @@ static void Async_UnlistenOnExit(int code, Datum arg) { Exec_UnlistenAllCommit(); + CleanupInactiveListenChannels(); asyncQueueUnregister(); } @@ -904,7 +992,7 @@ PreCommit_Notify(void) switch (actrec->action) { case LISTEN_LISTEN: - Exec_ListenPreCommit(); + Exec_ListenPreCommit(actrec->channel); break; case LISTEN_UNLISTEN: /* there is no Exec_UnlistenPreCommit() */ @@ -1005,7 +1093,7 @@ PreCommit_Notify(void) * * This is called at transaction commit, after committing to clog. * - * Update listenChannels and clear transaction-local state. + * Update listenChannelsHash and clear transaction-local state. 
* * If we issued any notifications in the transaction, send signals to * listening backends (possibly including ourselves) to process them. @@ -1049,8 +1137,11 @@ AtCommit_Notify(void) } } + /* Clean up inactive entries from the hash table */ + CleanupInactiveListenChannels(); + /* If no longer listening to anything, get out of listener array */ - if (amRegisteredListener && listenChannels == NIL) + if (amRegisteredListener && listenChannelsHash == NULL) asyncQueueUnregister(); /* @@ -1071,11 +1162,22 @@ AtCommit_Notify(void) * This function must make sure we are ready to catch any incoming messages. */ static void -Exec_ListenPreCommit(void) +Exec_ListenPreCommit(const char *channel) { QueuePosition head; QueuePosition max; ProcNumber prevListener; + ListenChannelEntry *entry; + bool found; + + /* + * Pre-allocate the hash table entry for this channel. This happens before + * commit, so an OOM error here is safe (causes transaction abort). + */ + initListenChannelsHash(); + entry = hash_search(listenChannelsHash, channel, HASH_ENTER, &found); + if (!found) + entry->active = false; /* * Nothing to do if we are already listening to something, nor if we @@ -1085,7 +1187,7 @@ Exec_ListenPreCommit(void) return; if (Trace_notify) - elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); + elog(DEBUG1, "Exec_ListenPreCommit(%s,%d)", channel, MyProcPid); /* * Before registering, make sure we will unlisten before dying. (Note: @@ -1163,54 +1265,48 @@ Exec_ListenPreCommit(void) /* * Exec_ListenCommit --- subroutine for AtCommit_Notify * - * Add the channel to the list of channels we are listening on. + * Set the channel's hash entry to active. 
*/ static void Exec_ListenCommit(const char *channel) { - MemoryContext oldcontext; + ListenChannelEntry *entry; - /* Do nothing if we are already listening on this channel */ - if (IsListeningOn(channel)) - return; + if (Trace_notify) + elog(DEBUG1, "Exec_ListenCommit(%s,%d)", channel, MyProcPid); /* - * Add the new channel name to listenChannels. - * - * XXX It is theoretically possible to get an out-of-memory failure here, - * which would be bad because we already committed. For the moment it - * doesn't seem worth trying to guard against that, but maybe improve this - * later. + * Find the pre-allocated entry and mark it active. The entry must exist + * because Exec_ListenPreCommit created it. */ - oldcontext = MemoryContextSwitchTo(TopMemoryContext); - listenChannels = lappend(listenChannels, pstrdup(channel)); - MemoryContextSwitchTo(oldcontext); + Assert(listenChannelsHash != NULL); + entry = hash_search(listenChannelsHash, channel, HASH_FIND, NULL); + Assert(entry != NULL); + + entry->active = true; } /* * Exec_UnlistenCommit --- subroutine for AtCommit_Notify * - * Remove the specified channel name from listenChannels. + * Mark the specified channel as inactive. We unset active rather than + * removing the entry, to avoid needing to allocate memory if a subsequent + * LISTEN in the same transaction re-adds it. 
*/ static void Exec_UnlistenCommit(const char *channel) { - ListCell *q; + ListenChannelEntry *entry; if (Trace_notify) elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); - foreach(q, listenChannels) - { - char *lchan = (char *) lfirst(q); + if (listenChannelsHash == NULL) + return; - if (strcmp(lchan, channel) == 0) - { - listenChannels = foreach_delete_current(listenChannels, q); - pfree(lchan); - break; - } - } + entry = hash_search(listenChannelsHash, channel, HASH_FIND, NULL); + if (entry != NULL) + entry->active = false; /* * We do not complain about unlistening something not being listened; @@ -1226,34 +1322,35 @@ Exec_UnlistenCommit(const char *channel) static void Exec_UnlistenAllCommit(void) { + HASH_SEQ_STATUS seq; + ListenChannelEntry *entry; + if (Trace_notify) elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); - list_free_deep(listenChannels); - listenChannels = NIL; + if (listenChannelsHash == NULL) + return; + + hash_seq_init(&seq, listenChannelsHash); + while ((entry = hash_seq_search(&seq)) != NULL) + entry->active = false; } /* * Test whether we are actively listening on the given channel name. * * Note: this function is executed for every notification found in the queue. - * Perhaps it is worth further optimization, eg convert the list to a sorted - * array so we can binary-search it. In practice the list is likely to be - * fairly short, though. 
*/ static bool IsListeningOn(const char *channel) { - ListCell *p; + ListenChannelEntry *entry; - foreach(p, listenChannels) - { - char *lchan = (char *) lfirst(p); + if (listenChannelsHash == NULL) + return false; - if (strcmp(lchan, channel) == 0) - return true; - } - return false; + entry = hash_search(listenChannelsHash, channel, HASH_FIND, NULL); + return (entry != NULL && entry->active); } /* @@ -1263,7 +1360,7 @@ IsListeningOn(const char *channel) static void asyncQueueUnregister(void) { - Assert(listenChannels == NIL); /* else caller error */ + Assert(listenChannelsHash == NULL); /* else caller error */ if (!amRegisteredListener) /* nothing to do */ return; @@ -1695,12 +1792,19 @@ SignalBackends(void) void AtAbort_Notify(void) { + /* + * Clean up any pre-allocated hash entries that weren't committed. These + * have active=false and would have been set to active=true in + * Exec_ListenCommit if the transaction had committed. + */ + CleanupInactiveListenChannels(); + /* * If we LISTEN but then roll back the transaction after PreCommit_Notify, - * we have registered as a listener but have not made any entry in - * listenChannels. In that case, deregister again. + * we have registered as a listener but have not made any active entry in + * listenChannelsHash. In that case, deregister again. */ - if (amRegisteredListener && listenChannels == NIL) + if (amRegisteredListener && listenChannelsHash == NULL) asyncQueueUnregister(); /* And clean up */ @@ -2080,7 +2184,7 @@ asyncQueueProcessPageEntries(QueuePosition *current, * over it on the first LISTEN in a session, and not get stuck on * it indefinitely. 
*/ - if (listenChannels == NIL) + if (listenChannelsHash == NULL) continue; if (TransactionIdDidCommit(qe->xid)) @@ -2332,7 +2436,7 @@ ProcessIncomingNotify(bool flush) notifyInterruptPending = false; /* Do nothing else if we aren't actively listening */ - if (listenChannels == NIL) + if (listenChannelsHash == NULL) return; if (Trace_notify) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c751c25a04d..46ea8f2ff8e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1564,6 +1564,7 @@ ListDictionary ListParsedLex ListenAction ListenActionKind +ListenChannelEntry ListenStmt LoInfo LoadStmt -- 2.50.1