From 0ab676629c405a030603274872788c11e7508d59 Mon Sep 17 00:00:00 2001 From: Joel Jacobson Date: Tue, 25 Nov 2025 15:42:58 +0100 Subject: [PATCH 3/3] Execute LISTEN/UNLISTEN during PreCommit --- src/backend/commands/async.c | 239 +++++++++++++++-------------------- 1 file changed, 100 insertions(+), 139 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index ea6ea9e09a8..36b83896363 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -438,10 +438,9 @@ static inline int64 asyncQueuePageDiff(int64 p, int64 q); static inline bool asyncQueuePagePrecedes(int64 p, int64 q); static void queue_listen(ListenActionKind action, const char *channel); static void Async_UnlistenOnExit(int code, Datum arg); -static void Exec_ListenPreCommit(void); -static void Exec_ListenCommit(const char *channel); -static void Exec_UnlistenCommit(const char *channel); -static void Exec_UnlistenAllCommit(void); +static void Exec_Listen(const char *channel); +static void Exec_Unlisten(const char *channel); +static void Exec_UnlistenAll(void); static bool IsListeningOn(const char *channel); static void asyncQueueUnregister(void); static bool asyncQueueIsFull(void); @@ -848,7 +847,7 @@ pg_listening_channels(PG_FUNCTION_ARGS) static void Async_UnlistenOnExit(int code, Datum arg) { - Exec_UnlistenAllCommit(); + Exec_UnlistenAll(); asyncQueueUnregister(); } @@ -877,7 +876,7 @@ AtPrepare_Notify(void) * If there are pending LISTEN actions, make sure we are listed in the * shared-memory listener array. This must happen before commit to * ensure we don't miss any notifies from transactions that commit - * just after ours. + * just after ours. This function also updates listenChannels. * * If there are outbound notify requests in the pendingNotifies list, * add them to the global queue.
We do that before commit so that @@ -894,7 +893,7 @@ PreCommit_Notify(void) if (Trace_notify) elog(DEBUG1, "PreCommit_Notify"); - /* Preflight for any pending listen/unlisten actions */ + /* Perform any pending listen/unlisten actions */ if (pendingActions != NULL) { foreach(p, pendingActions->actions) @@ -904,18 +903,22 @@ PreCommit_Notify(void) switch (actrec->action) { case LISTEN_LISTEN: - Exec_ListenPreCommit(); + Exec_Listen(actrec->channel); break; case LISTEN_UNLISTEN: - /* there is no Exec_UnlistenPreCommit() */ + Exec_Unlisten(actrec->channel); break; case LISTEN_UNLISTEN_ALL: - /* there is no Exec_UnlistenAllPreCommit() */ + Exec_UnlistenAll(); break; } } } + /* If no longer listening to anything, get out of listener array */ + if (amRegisteredListener && listenChannels == NIL) + asyncQueueUnregister(); + /* Queue any pending notifies (must happen after the above) */ if (pendingNotifies) { @@ -1005,18 +1008,16 @@ PreCommit_Notify(void) * * This is called at transaction commit, after committing to clog. * - * Update listenChannels and clear transaction-local state. - * * If we issued any notifications in the transaction, send signals to * listening backends (possibly including ourselves) to process them. * Also, if we filled enough queue pages with new notifies, try to * advance the queue tail pointer. + * + * Finally, clear transaction-local state. 
*/ void AtCommit_Notify(void) { - ListCell *p; - /* * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to * return as soon as possible @@ -1027,32 +1028,6 @@ AtCommit_Notify(void) if (Trace_notify) elog(DEBUG1, "AtCommit_Notify"); - /* Perform any pending listen/unlisten actions */ - if (pendingActions != NULL) - { - foreach(p, pendingActions->actions) - { - ListenAction *actrec = (ListenAction *) lfirst(p); - - switch (actrec->action) - { - case LISTEN_LISTEN: - Exec_ListenCommit(actrec->channel); - break; - case LISTEN_UNLISTEN: - Exec_UnlistenCommit(actrec->channel); - break; - case LISTEN_UNLISTEN_ALL: - Exec_UnlistenAllCommit(); - break; - } - } - } - - /* If no longer listening to anything, get out of listener array */ - if (amRegisteredListener && listenChannels == NIL) - asyncQueueUnregister(); - /* * Send signals to listening backends. We need do this only if there are * pending notifies, which were previously added to the shared queue by @@ -1066,109 +1041,100 @@ AtCommit_Notify(void) } /* - * Exec_ListenPreCommit --- subroutine for PreCommit_Notify + * Exec_Listen --- subroutine for PreCommit_Notify * - * This function must make sure we are ready to catch any incoming messages. + * This function must make sure we are ready to catch any incoming messages, + * and adds the channel to the list of channels we are listening on. */ static void -Exec_ListenPreCommit(void) +Exec_Listen(const char *channel) { QueuePosition head; QueuePosition max; ProcNumber prevListener; + MemoryContext oldcontext; + + if (Trace_notify) + elog(DEBUG1, "Exec_Listen(%s,%d)", channel, MyProcPid); /* * Nothing to do if we are already listening to something, nor if we * already ran this routine in this transaction. */ - if (amRegisteredListener) - return; - - if (Trace_notify) - elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); - - /* - * Before registering, make sure we will unlisten before dying. (Note: - * this action does not get undone if we abort later.) 
- */ - if (!unlistenExitRegistered) + if (!amRegisteredListener) { - before_shmem_exit(Async_UnlistenOnExit, 0); - unlistenExitRegistered = true; - } + /* + * Before registering, make sure we will unlisten before dying. (Note: + * this action does not get undone if we abort later.) + */ + if (!unlistenExitRegistered) + { + before_shmem_exit(Async_UnlistenOnExit, 0); + unlistenExitRegistered = true; + } - /* - * This is our first LISTEN, so establish our pointer. - * - * We set our pointer to the global tail pointer and then move it forward - * over already-committed notifications. This ensures we cannot miss any - * not-yet-committed notifications. We might get a few more but that - * doesn't hurt. - * - * In some scenarios there might be a lot of committed notifications that - * have not yet been pruned away (because some backend is being lazy about - * reading them). To reduce our startup time, we can look at other - * backends and adopt the maximum "pos" pointer of any backend that's in - * our database; any notifications it's already advanced over are surely - * committed and need not be re-examined by us. (We must consider only - * backends connected to our DB, because others will not have bothered to - * check committed-ness of notifications in our DB.) - * - * We need exclusive lock here so we can look at other backends' entries - * and manipulate the list links. 
- */ - LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); - head = QUEUE_HEAD; - max = QUEUE_TAIL; - prevListener = INVALID_PROC_NUMBER; - for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) - { - if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) - max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); - /* Also find last listening backend before this one */ - if (i < MyProcNumber) - prevListener = i; - } - QUEUE_BACKEND_POS(MyProcNumber) = max; - QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; - QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; - /* Insert backend into list of listeners at correct position */ - if (prevListener != INVALID_PROC_NUMBER) - { - QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener); - QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber; - } - else - { - QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER; - QUEUE_FIRST_LISTENER = MyProcNumber; - } - LWLockRelease(NotifyQueueLock); + /* + * This is our first LISTEN, so establish our pointer. + * + * We set our pointer to the global tail pointer and then move it forward + * over already-committed notifications. This ensures we cannot miss any + * not-yet-committed notifications. We might get a few more but that + * doesn't hurt. + * + * In some scenarios there might be a lot of committed notifications that + * have not yet been pruned away (because some backend is being lazy about + * reading them). To reduce our startup time, we can look at other + * backends and adopt the maximum "pos" pointer of any backend that's in + * our database; any notifications it's already advanced over are surely + * committed and need not be re-examined by us. (We must consider only + * backends connected to our DB, because others will not have bothered to + * check committed-ness of notifications in our DB.) + * + * We need exclusive lock here so we can look at other backends' entries + * and manipulate the list links. 
+ */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + head = QUEUE_HEAD; + max = QUEUE_TAIL; + prevListener = INVALID_PROC_NUMBER; + for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) + { + if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); + /* Also find last listening backend before this one */ + if (i < MyProcNumber) + prevListener = i; + } + QUEUE_BACKEND_POS(MyProcNumber) = max; + QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; + QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; + /* Insert backend into list of listeners at correct position */ + if (prevListener != INVALID_PROC_NUMBER) + { + QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener); + QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber; + } + else + { + QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER; + QUEUE_FIRST_LISTENER = MyProcNumber; + } + LWLockRelease(NotifyQueueLock); - /* Now we are listed in the global array, so remember we're listening */ - amRegisteredListener = true; + /* Now we are listed in the global array, so remember we're listening */ + amRegisteredListener = true; - /* - * Try to move our pointer forward as far as possible. This will skip - * over already-committed notifications, which we want to do because they - * might be quite stale. Note that we are not yet listening on anything, - * so we won't deliver such notifications to our frontend. Also, although - * our transaction might have executed NOTIFY, those message(s) aren't - * queued yet so we won't skip them here. - */ - if (!QUEUE_POS_EQUAL(max, head)) - asyncQueueReadAllNotifications(); -} - -/* - * Exec_ListenCommit --- subroutine for AtCommit_Notify - * - * Add the channel to the list of channels we are listening on. - */ -static void -Exec_ListenCommit(const char *channel) -{ - MemoryContext oldcontext; + /* + * Try to move our pointer forward as far as possible. 
This will skip + * over already-committed notifications, which we want to do because they + * might be quite stale. Note that we are not yet listening on anything, + * so we won't deliver such notifications to our frontend. Also, although + * our transaction might have executed NOTIFY, those message(s) aren't + * queued yet so we won't skip them here. + */ + if (!QUEUE_POS_EQUAL(max, head)) + asyncQueueReadAllNotifications(); + } /* Do nothing if we are already listening on this channel */ if (IsListeningOn(channel)) @@ -1176,11 +1142,6 @@ Exec_ListenCommit(const char *channel) /* * Add the new channel name to listenChannels. - * - * XXX It is theoretically possible to get an out-of-memory failure here, - * which would be bad because we already committed. For the moment it - * doesn't seem worth trying to guard against that, but maybe improve this - * later. */ oldcontext = MemoryContextSwitchTo(TopMemoryContext); listenChannels = lappend(listenChannels, pstrdup(channel)); @@ -1188,17 +1149,17 @@ Exec_ListenCommit(const char *channel) } /* - * Exec_UnlistenCommit --- subroutine for AtCommit_Notify + * Exec_Unlisten --- subroutine for PreCommit_Notify * * Remove the specified channel name from listenChannels. */ static void -Exec_UnlistenCommit(const char *channel) +Exec_Unlisten(const char *channel) { ListCell *q; if (Trace_notify) - elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); + elog(DEBUG1, "Exec_Unlisten(%s,%d)", channel, MyProcPid); foreach(q, listenChannels) { @@ -1219,15 +1180,15 @@ Exec_UnlistenCommit(const char *channel) } /* - * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify + * Exec_UnlistenAll --- subroutine for PreCommit_Notify * * Unlisten on all channels for this backend.
*/ static void -Exec_UnlistenAllCommit(void) +Exec_UnlistenAll(void) { if (Trace_notify) - elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); + elog(DEBUG1, "Exec_UnlistenAll(%d)", MyProcPid); list_free_deep(listenChannels); listenChannels = NIL; @@ -1927,7 +1888,7 @@ asyncQueueReadAllNotifications(void) * * What we do guarantee is that we'll see all notifications from * transactions committing after the snapshot we take here. - * Exec_ListenPreCommit has already added us to the listener array, + * Exec_Listen has already added us to the listener array, * so no not-yet-committed messages can be removed from the queue * before we see them. *---------- -- 2.50.1