[PATCH] Avoid pallocs in async.c's SignalBackends critical section
Hi hackers,
This patch addresses this comment in async.c's SignalBackends:
* XXX in principle these pallocs could fail, which would be bad.
* Maybe preallocate the arrays? They're not that large, though.
This is unsafe, since AtCommit_Notify effectively runs in a critical
section, so an OOM there would PANIC ("AbortTransaction while in COMMIT
state"), as we can no longer abort safely.
This patch fixes this by adding two static arrays, notifySignalPids and
notifySignalProcs, allocated lazily in TopMemoryContext by
initSignalArrays. PreCommit_Notify now calls initSignalArrays while it's
still safe to ERROR, ensuring the arrays exist before entering the
commit path.
SignalBackends is updated to use these preallocated arrays instead of
allocating temporary ones.
This removes the risk of palloc in a critical section and eliminates
repeated palloc/pfree cycles.
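The preallocation idea described above can be sketched outside PostgreSQL as follows. This is a minimal standalone illustration, not the patch itself: it uses malloc instead of palloc in TopMemoryContext, and the names signal_pids, signal_procs, init_signal_arrays and collect_targets are hypothetical stand-ins for notifySignalPids, notifySignalProcs, initSignalArrays and the collection loop in SignalBackends.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the patch's static arrays. */
static int *signal_pids = NULL;
static int *signal_procs = NULL;

/*
 * Called while a failure is still recoverable. Returns false on
 * out-of-memory so the caller can abort cleanly. Calling it again after
 * a partial failure retries only the allocation that is still missing.
 */
static bool
init_signal_arrays(size_t max_backends)
{
    if (signal_pids == NULL)
        signal_pids = malloc(max_backends * sizeof(int));
    if (signal_procs == NULL)
        signal_procs = malloc(max_backends * sizeof(int));
    return signal_pids != NULL && signal_procs != NULL;
}

/*
 * Runs after the point of no return, so it must not allocate. Copies the
 * nonzero PIDs and their slot numbers into the preallocated arrays and
 * returns how many were collected. The caller must ensure nlisteners
 * does not exceed the max_backends passed to init_signal_arrays.
 */
static size_t
collect_targets(const int *listener_pids, size_t nlisteners)
{
    size_t count = 0;

    assert(signal_pids != NULL && signal_procs != NULL);
    for (size_t i = 0; i < nlisteners; i++)
    {
        if (listener_pids[i] == 0)
            continue;           /* unused slot */
        signal_pids[count] = listener_pids[i];
        signal_procs[count] = (int) i;
        count++;
    }
    return count;
}
```

The point of the split is that the only step that can fail (allocation) happens in init_signal_arrays, while collect_targets only writes into memory that already exists.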
/Joel
Attachments:
0001-async-avoid-pallocs-in-critical-section-v1.patch
From 8b5dd645a523f2725c9c473b5831a0d7cbe14ae9 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Sun, 23 Nov 2025 15:25:16 +0100
Subject: [PATCH] Avoid pallocs in SignalBackends critical section.
SignalBackends, reached from AtCommit_Notify, previously palloc'd arrays
on every call to collect target PIDs and ProcNumbers. That's unsafe:
AtCommit_Notify effectively runs in a critical section, so an OOM there
would PANIC ("AbortTransaction while in COMMIT state"), as we can no
longer abort safely.
Fix by adding two static arrays, notifySignalPids and notifySignalProcs,
allocated lazily in TopMemoryContext by initSignalArrays.
PreCommit_Notify now calls initSignalArrays while it's still safe to
ERROR, ensuring the arrays exist before entering the commit path.
SignalBackends is updated to use these preallocated arrays instead of
allocating temporary ones.
This removes the risk of palloc in a critical section and eliminates
repeated palloc/pfree cycles.
---
src/backend/commands/async.c | 51 +++++++++++++++++++++++++++---------
1 file changed, 38 insertions(+), 13 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index e1cf659485a..6d753079553 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -418,6 +418,12 @@ static bool unlistenExitRegistered = false;
/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;
+/*
+ * Arrays for SignalBackends.
+ */
+static int32 *notifySignalPids = NULL;
+static ProcNumber *notifySignalProcs = NULL;
+
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;
@@ -477,6 +483,27 @@ asyncQueuePagePrecedes(int64 p, int64 q)
return p < q;
}
+/*
+ * initSignalArrays
+ * Lazy initialization of the signal arrays.
+ */
+static void
+initSignalArrays(void)
+{
+ MemoryContext oldcontext;
+
+ if (notifySignalProcs != NULL)
+ return;
+
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+ if (notifySignalPids == NULL)
+ notifySignalPids = (int32 *) palloc(MaxBackends * sizeof(int32));
+ notifySignalProcs = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
/*
* Report space needed for our shared memory area
*/
@@ -902,6 +929,13 @@ PreCommit_Notify(void)
*/
(void) GetCurrentTransactionId();
+ /*
+ * We will be calling SignalBackends() at AtCommit_Notify time, so
+ * make sure its auxiliary data structures exist now, where an ERROR
+ * will still abort the transaction cleanly.
+ */
+ initSignalArrays();
+
/*
* Serialize writers by acquiring a special lock that we hold till
* after commit. This ensures that queue entries appear in commit
@@ -1580,20 +1614,13 @@ asyncQueueFillWarning(void)
static void
SignalBackends(void)
{
- int32 *pids;
- ProcNumber *procnos;
int count;
/*
* Identify backends that we need to signal. We don't want to send
* signals while holding the NotifyQueueLock, so this loop just builds a
* list of target PIDs.
- *
- * XXX in principle these pallocs could fail, which would be bad. Maybe
- * preallocate the arrays? They're not that large, though.
*/
- pids = (int32 *) palloc(MaxBackends * sizeof(int32));
- procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
count = 0;
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
@@ -1624,8 +1651,8 @@ SignalBackends(void)
continue;
}
/* OK, need to signal this one */
- pids[count] = pid;
- procnos[count] = i;
+ notifySignalPids[count] = pid;
+ notifySignalProcs[count] = i;
count++;
}
LWLockRelease(NotifyQueueLock);
@@ -1633,7 +1660,7 @@ SignalBackends(void)
/* Now send signals */
for (int i = 0; i < count; i++)
{
- int32 pid = pids[i];
+ int32 pid = notifySignalPids[i];
/*
* If we are signaling our own process, no need to involve the kernel;
@@ -1651,12 +1678,10 @@ SignalBackends(void)
* NotifyQueueLock; which is unlikely but certainly possible. So we
* just log a low-level debug message if it happens.
*/
- if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
+ if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, notifySignalProcs[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
}
- pfree(pids);
- pfree(procnos);
}
/*
--
2.50.1
On 23/11/2025 16:45, Joel Jacobson wrote:
Hi hackers,
This patch addresses this comment in async.c's SignalBackends:
* XXX in principle these pallocs could fail, which would be bad.
* Maybe preallocate the arrays? They're not that large, though.
This is unsafe, since AtCommit_Notify effectively runs in a critical
section, so an OOM there would PANIC ("AbortTransaction while in COMMIT
state"), as we can no longer abort safely.
Ugh. I wonder if we should put an actual critical section around those
post-commit cleanup actions? As you said, it's effectively a critical
section already, except that we don't get the benefit of the
AssertNotInCriticalSection assertions.
Or even better, could we move things out of that effective critical
section? It's painful to write code that cannot palloc.
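To illustrate what an actual critical section would buy, here is a toy sketch of an allocator that notices when it is called inside one. It is an assumption-laden illustration and does not use PostgreSQL's real machinery: crit_section_depth, enter_critical, leave_critical and checked_alloc are hypothetical names, and the wrapper counts violations instead of trapping so that the behaviour can be observed.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical nesting counter for "we cannot fail from here on". */
static int crit_section_depth = 0;

/* Number of allocation attempts seen inside a critical section. */
static int alloc_violations = 0;

static void
enter_critical(void)
{
    crit_section_depth++;
}

static void
leave_critical(void)
{
    assert(crit_section_depth > 0);
    crit_section_depth--;
}

/*
 * Allocation wrapper. Inside a critical section it records a violation
 * and refuses to allocate; an assertion-enabled build could trap here
 * instead. Outside a critical section it simply forwards to malloc.
 */
static void *
checked_alloc(size_t size)
{
    if (crit_section_depth > 0)
    {
        alloc_violations++;
        return NULL;
    }
    return malloc(size);
}
```

With a check like this, a stray allocation in post-commit code would show up in testing rather than only under real memory pressure, which is the benefit being weighed against the notational cost.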
This patch fixes this by adding two static arrays, notifySignalPids and
notifySignalProcs, allocated lazily in TopMemoryContext by
initSignalArrays. PreCommit_Notify now calls initSignalArrays while it's
still safe to ERROR, ensuring the arrays exist before entering the
commit path.
SignalBackends is updated to use these preallocated arrays instead of
allocating temporary ones.
This removes the risk of palloc in a critical section and eliminates
repeated palloc/pfree cycles.
Makes sense as far as it goes. But there's more: Exec_ListenCommit()
also palloc's, and AtCommit_Notify also calls asyncQueueAdvanceTail(),
which calls SimpleLruTruncate(). I didn't analyze SimpleLruTruncate() in
detail, but I bet it also palloc's, and it's not nice to panic e.g.
because of a failed unlink() call either.
- Heikki
Heikki Linnakangas <hlinnaka@iki.fi> writes:
On 23/11/2025 16:45, Joel Jacobson wrote:
This patch addresses this comment in async.c's SignalBackends:
* XXX in principle these pallocs could fail, which would be bad.
* Maybe preallocate the arrays? They're not that large, though.
Ugh. I wonder if we should put an actual critical section around those
post-commit cleanup actions? As you said, it's effectively a critical
section already, except that we don't get the benefit of the
AssertNotInCriticalSection assertions.
Or even better, could we move things out of that effective critical
section? It's painful to write code that cannot palloc.
I don't think Joel did anybody any favors by separating this patch
fragment from its larger context [1]. Given the infrequency of
complaints about failures in this area, I'm not sure that the
notational pain of an actual critical section is justified.
But I complained that the changes contemplated in [1] were raising
the probability of failure, and while working on tamping that back
down we decided to do something about this old gripe too.
There's a relevant comment in CommitTransaction():
* This is all post-commit cleanup. Note that if an error is raised here,
* it's too late to abort the transaction. This should be just
* noncritical resource releasing.
Unfortunately, releasing locks, sending notifies, etc is not all
that "noncritical" if you want the DB to keep functioning well.
But there's a good deal of code in there and making it all obey
the critical-section rules looks painful.
regards, tom lane
[1]: /messages/by-id/6899c044-4a82-49be-8117-e6f669765f7e@app.fastmail.com
On Mon, Nov 24, 2025, at 17:06, Tom Lane wrote:
I don't think Joel did anybody any favors by separating this patch
fragment from its larger context [1].
I'm a bit surprised by this. My intention in splitting it out
was based on earlier advice in [1] that I think made a lot of sense:
[...my idea of a bgworker to kick lagging backends...]
If you feel that that's not robust enough,
you should split it out as a separate patch that's advertised as a
robustness improvement not a performance improvement, and see if you
can get anyone to bite.
In general, I think it's nice when a bigger change can be split into
smaller meaningful committable changes, which seemed possible in this
case.
Heikki also raised a point that seems worth exploring:
AtCommit_Notify currently calls asyncQueueAdvanceTail.
After PreCommit_Notify, we already know whether tryAdvanceTail is
needed, so it looks feasible to move the asyncQueueAdvanceTail call to
the end of PreCommit_Notify.
Given the infrequency of
complaints about failures in this area, I'm not sure that the
notational pain of an actual critical section is justified.
But I complained that the changes contemplated in [1] were raising
the probability of failure, and while working on tamping that back
down we decided to do something about this old gripe too.
There's a relevant comment in CommitTransaction():
* This is all post-commit cleanup. Note that if an error is raised here,
* it's too late to abort the transaction. This should be just
* noncritical resource releasing.
Unfortunately, releasing locks, sending notifies, etc is not all
that "noncritical" if you want the DB to keep functioning well.
But there's a good deal of code in there and making it all obey
the critical-section rules looks painful.
I see why a critical-section is probably too painful. But since the
direction in [1] is to avoid adding new possibly risky operations to
AtCommit_Notify, I don't think it's completely unreasonable to consider
moving some existing ones into PreCommit_Notify, when feasible.
If it's preferable, I'm fine dropping this standalone patch and folding
any such adjustments into v29 in [1], or I can just leave the existing
code untouched?
/Joel
On Mon, Nov 24, 2025, at 22:53, Joel Jacobson wrote:
On Mon, Nov 24, 2025, at 17:06, Tom Lane wrote:
Unfortunately, releasing locks, sending notifies, etc is not all
that "noncritical" if you want the DB to keep functioning well.
But there's a good deal of code in there and making it all obey
the critical-section rules looks painful.
I see why a critical-section is probably too painful. But since the
direction in [1] is to avoid adding new possibly risky operations to
AtCommit_Notify, I don't think it's completely unreasonable to consider
moving some existing ones into PreCommit_Notify, when feasible.
If it's preferable, I'm fine dropping this standalone patch and folding
any such adjustments into v29 in [1], or I can just leave the existing
code untouched?
With the following three changes, I think the only remaining
potentially-risky code in AtCommit_Notify, is the acquire/release of
locks.
* 0001-async-avoid-pallocs-in-critical-section-v2.patch:
Preallocate signal arrays to avoid pallocs AtCommit.
* 0002-async-avoid-pallocs-in-critical-section-v2.patch:
Move asyncQueueAdvanceTail from AtCommit to PreCommit.
* 0003-async-avoid-pallocs-in-critical-section-v2.patch:
Convert listenChannels to hash table.
This is based on Heikki's suggestion
"We really should turn that into a hash table."
from the bug fix thread [2] combined with Tom's idea of a boolean
"is it REALLY listening?"
field [1].
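The "reserve before commit, activate after commit" scheme behind 0003 can be sketched in standalone C. This is a simplified illustration under stated assumptions: it uses a singly linked list and malloc rather than a dynahash table in TopMemoryContext, and ChannelEntry, CHAN_NAME_MAX, reserve_channel, activate_channel, cleanup_inactive and is_listening_on are hypothetical names, not the patch's identifiers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define CHAN_NAME_MAX 64        /* hypothetical limit, stands in for NAMEDATALEN */

typedef struct ChannelEntry
{
    char        name[CHAN_NAME_MAX];
    bool        active;         /* true once the LISTEN has committed */
    struct ChannelEntry *next;
} ChannelEntry;

static ChannelEntry *channels = NULL;

static ChannelEntry *
find_channel(const char *name)
{
    for (ChannelEntry *e = channels; e != NULL; e = e->next)
        if (strcmp(e->name, name) == 0)
            return e;
    return NULL;
}

/* Pre-commit: may allocate. Returns false on OOM or an over-long name. */
static bool
reserve_channel(const char *name)
{
    ChannelEntry *e;

    if (find_channel(name) != NULL)
        return true;            /* already present, active or reserved */
    if (strlen(name) >= CHAN_NAME_MAX)
        return false;
    e = malloc(sizeof(ChannelEntry));
    if (e == NULL)
        return false;
    strcpy(e->name, name);
    e->active = false;
    e->next = channels;
    channels = e;
    return true;
}

/* Post-commit: only flips a flag, so it cannot run out of memory. */
static void
activate_channel(const char *name)
{
    ChannelEntry *e = find_channel(name);

    assert(e != NULL);          /* reserve_channel must have run first */
    e->active = true;
}

/* Commit or abort cleanup: drop entries that never became active. */
static void
cleanup_inactive(void)
{
    ChannelEntry **link = &channels;

    while (*link != NULL)
    {
        ChannelEntry *e = *link;

        if (!e->active)
        {
            *link = e->next;
            free(e);
        }
        else
            link = &e->next;
    }
}

/* A reserved but not yet activated entry does not count as listening. */
static bool
is_listening_on(const char *name)
{
    ChannelEntry *e = find_channel(name);

    return e != NULL && e->active;
}
```

The design choice is that every step that can fail runs in reserve_channel, while activate_channel and cleanup_inactive only flip flags or free memory.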
Together, these patches allow us to get rid of the following comments:
0001:
- * XXX in principle these pallocs could fail, which would be bad. Maybe
- * preallocate the arrays? They're not that large, though.
0002:
- * This is (usually) called during CommitTransaction(), so it's important for
- * it to have very low probability of failure.
0003:
- * XXX It is theoretically possible to get an out-of-memory failure here,
- * which would be bad because we already committed. For the moment it
- * doesn't seem worth trying to guard against that, but maybe improve this
- * later.
Please advise if we want these changes, and if so, if they should be
folded into [1], i.e. closing this thread, or if we want to keep this thread.
/Joel
[1]: /messages/by-id/6899c044-4a82-49be-8117-e6f669765f7e@app.fastmail.com
[2]: /messages/by-id/CAK98qZ3wZLE-RZJN_Y+TFjiTRPPFPBwNBpBi5K5CU8hUHkzDpw@mail.gmail.com
Attachments:
0001-async-avoid-pallocs-in-critical-section-v2.patch
From 7ce69b77548c78f615777ab528323c0996463c18 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Tue, 25 Nov 2025 10:09:51 +0100
Subject: [PATCH 1/3] Preallocate signal arrays to avoid pallocs AtCommit
---
src/backend/commands/async.c | 51 +++++++++++++++++++++++++++---------
1 file changed, 38 insertions(+), 13 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index e1cf659485a..6d753079553 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -418,6 +418,12 @@ static bool unlistenExitRegistered = false;
/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;
+/*
+ * Arrays for SignalBackends.
+ */
+static int32 *notifySignalPids = NULL;
+static ProcNumber *notifySignalProcs = NULL;
+
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;
@@ -477,6 +483,27 @@ asyncQueuePagePrecedes(int64 p, int64 q)
return p < q;
}
+/*
+ * initSignalArrays
+ * Lazy initialization of the signal arrays.
+ */
+static void
+initSignalArrays(void)
+{
+ MemoryContext oldcontext;
+
+ if (notifySignalProcs != NULL)
+ return;
+
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+ if (notifySignalPids == NULL)
+ notifySignalPids = (int32 *) palloc(MaxBackends * sizeof(int32));
+ notifySignalProcs = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
/*
* Report space needed for our shared memory area
*/
@@ -902,6 +929,13 @@ PreCommit_Notify(void)
*/
(void) GetCurrentTransactionId();
+ /*
+ * We will be calling SignalBackends() at AtCommit_Notify time, so
+ * make sure its auxiliary data structures exist now, where an ERROR
+ * will still abort the transaction cleanly.
+ */
+ initSignalArrays();
+
/*
* Serialize writers by acquiring a special lock that we hold till
* after commit. This ensures that queue entries appear in commit
@@ -1580,20 +1614,13 @@ asyncQueueFillWarning(void)
static void
SignalBackends(void)
{
- int32 *pids;
- ProcNumber *procnos;
int count;
/*
* Identify backends that we need to signal. We don't want to send
* signals while holding the NotifyQueueLock, so this loop just builds a
* list of target PIDs.
- *
- * XXX in principle these pallocs could fail, which would be bad. Maybe
- * preallocate the arrays? They're not that large, though.
*/
- pids = (int32 *) palloc(MaxBackends * sizeof(int32));
- procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
count = 0;
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
@@ -1624,8 +1651,8 @@ SignalBackends(void)
continue;
}
/* OK, need to signal this one */
- pids[count] = pid;
- procnos[count] = i;
+ notifySignalPids[count] = pid;
+ notifySignalProcs[count] = i;
count++;
}
LWLockRelease(NotifyQueueLock);
@@ -1633,7 +1660,7 @@ SignalBackends(void)
/* Now send signals */
for (int i = 0; i < count; i++)
{
- int32 pid = pids[i];
+ int32 pid = notifySignalPids[i];
/*
* If we are signaling our own process, no need to involve the kernel;
@@ -1651,12 +1678,10 @@ SignalBackends(void)
* NotifyQueueLock; which is unlikely but certainly possible. So we
* just log a low-level debug message if it happens.
*/
- if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
+ if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, notifySignalProcs[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
}
- pfree(pids);
- pfree(procnos);
}
/*
--
2.50.1
0003-async-avoid-pallocs-in-critical-section-v2.patch
From 3d9c474bc60dded52df5673724f9e793b58b1bcd Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Tue, 25 Nov 2025 10:11:30 +0100
Subject: [PATCH 3/3] Convert listenChannels to hash table
---
src/backend/commands/async.c | 230 ++++++++++++++++++++++---------
src/tools/pgindent/typedefs.list | 1 +
2 files changed, 168 insertions(+), 63 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index d5676d43a30..ebc06ba771d 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -68,7 +68,7 @@
* CommitTransaction() which will then do the actual transaction commit.
*
* After commit we are called another time (AtCommit_Notify()). Here we
- * make any actual updates to the effective listen state (listenChannels).
+ * make any actual updates to the effective listen state (listenChannelsHash).
* Then we signal any backends that may be interested in our messages
* (including our own backend, if listening). This is done by
* SignalBackends(), which scans the list of listening backends and sends a
@@ -313,16 +313,25 @@ static SlruCtlData NotifyCtlData;
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
/*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on). It is a simple list of channel names,
- * allocated in TopMemoryContext.
+ * listenChannelsHash identifies the channels we are listening to (or will be
+ * after the current transaction commits). Entries with active=true are
+ * committed listens; entries with active=false are pre-allocated for pending
+ * LISTENs that will become active at commit.
+ *
+ * This hash table is allocated in TopMemoryContext.
*/
-static List *listenChannels = NIL; /* list of C strings */
+typedef struct ListenChannelEntry
+{
+ char channel[NAMEDATALEN]; /* hash key - must be first */
+ bool active; /* true if committed listen */
+} ListenChannelEntry;
+
+static HTAB *listenChannelsHash = NULL;
/*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
* all actions requested in the current transaction. As explained above,
- * we don't actually change listenChannels until we reach transaction commit.
+ * we don't actually change listenChannelsHash until we reach transaction commit.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
@@ -438,7 +447,9 @@ static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_ListenPreCommit(void);
+static void initListenChannelsHash(void);
+static void CleanupInactiveListenChannels(void);
+static void Exec_ListenPreCommit(const char *channel);
static void Exec_ListenCommit(const char *channel);
static void Exec_UnlistenCommit(const char *channel);
static void Exec_UnlistenAllCommit(void);
@@ -504,6 +515,60 @@ initSignalArrays(void)
MemoryContextSwitchTo(oldcontext);
}
+/*
+ * initListenChannelsHash
+ * Lazy initialization of the listen channels hash table.
+ */
+static void
+initListenChannelsHash(void)
+{
+ HASHCTL hash_ctl;
+
+ if (listenChannelsHash != NULL)
+ return;
+
+ memset(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = NAMEDATALEN;
+ hash_ctl.entrysize = sizeof(ListenChannelEntry);
+
+ listenChannelsHash = hash_create("Listen Channels", 64, &hash_ctl,
+ HASH_ELEM | HASH_STRINGS);
+}
+
+/*
+ * CleanupInactiveListenChannels
+ * Remove all entries with active=false from the hash table.
+ * If the hash table becomes empty, destroy it.
+ */
+static void
+CleanupInactiveListenChannels(void)
+{
+ HASH_SEQ_STATUS status;
+ ListenChannelEntry *entry;
+
+ if (listenChannelsHash == NULL)
+ return;
+
+ hash_seq_init(&status, listenChannelsHash);
+
+ while ((entry = hash_seq_search(&status)) != NULL)
+ {
+ if (!entry->active)
+ {
+ if (hash_search(listenChannelsHash, entry->channel,
+ HASH_REMOVE, NULL) == NULL)
+ elog(ERROR, "hash table corrupted");
+ }
+ }
+
+ /* Destroy hash table if now empty */
+ if (hash_get_num_entries(listenChannelsHash) == 0)
+ {
+ hash_destroy(listenChannelsHash);
+ listenChannelsHash = NULL;
+ }
+}
+
/*
* Report space needed for our shared memory area
*/
@@ -709,7 +774,7 @@ Async_Notify(const char *channel, const char *payload)
* Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
- * Actual update of the listenChannels list happens during transaction
+ * Actual update of the listenChannelsHash happens during transaction
* commit.
*/
static void
@@ -809,30 +874,52 @@ Async_UnlistenAll(void)
* SQL function: return a set of the channel names this backend is actively
* listening to.
*
- * Note: this coding relies on the fact that the listenChannels list cannot
+ * Note: this coding relies on the fact that the listenChannelsHash cannot
* change within a transaction.
*/
Datum
pg_listening_channels(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
+ HASH_SEQ_STATUS *status;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
+ MemoryContext oldcontext;
+
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
+
+ if (listenChannelsHash != NULL)
+ {
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ status = palloc(sizeof(HASH_SEQ_STATUS));
+ hash_seq_init(status, listenChannelsHash);
+ funcctx->user_fctx = status;
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ funcctx->user_fctx = NULL;
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
+ status = funcctx->user_fctx;
- if (funcctx->call_cntr < list_length(listenChannels))
+ if (status != NULL)
{
- char *channel = (char *) list_nth(listenChannels,
- funcctx->call_cntr);
+ ListenChannelEntry *entry = hash_seq_search(status);
- SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+ if (entry != NULL)
+ {
+ /*
+ * All entries should be active; inactive ones are cleaned up at
+ * commit/abort
+ */
+ Assert(entry->active);
+ SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
+ }
}
SRF_RETURN_DONE(funcctx);
@@ -849,6 +936,7 @@ static void
Async_UnlistenOnExit(int code, Datum arg)
{
Exec_UnlistenAllCommit();
+ CleanupInactiveListenChannels();
asyncQueueUnregister();
}
@@ -904,7 +992,7 @@ PreCommit_Notify(void)
switch (actrec->action)
{
case LISTEN_LISTEN:
- Exec_ListenPreCommit();
+ Exec_ListenPreCommit(actrec->channel);
break;
case LISTEN_UNLISTEN:
/* there is no Exec_UnlistenPreCommit() */
@@ -1005,7 +1093,7 @@ PreCommit_Notify(void)
*
* This is called at transaction commit, after committing to clog.
*
- * Update listenChannels and clear transaction-local state.
+ * Update listenChannelsHash and clear transaction-local state.
*
* If we issued any notifications in the transaction, send signals to
* listening backends (possibly including ourselves) to process them.
@@ -1049,8 +1137,11 @@ AtCommit_Notify(void)
}
}
+ /* Clean up inactive entries from the hash table */
+ CleanupInactiveListenChannels();
+
/* If no longer listening to anything, get out of listener array */
- if (amRegisteredListener && listenChannels == NIL)
+ if (amRegisteredListener && listenChannelsHash == NULL)
asyncQueueUnregister();
/*
@@ -1071,11 +1162,22 @@ AtCommit_Notify(void)
* This function must make sure we are ready to catch any incoming messages.
*/
static void
-Exec_ListenPreCommit(void)
+Exec_ListenPreCommit(const char *channel)
{
QueuePosition head;
QueuePosition max;
ProcNumber prevListener;
+ ListenChannelEntry *entry;
+ bool found;
+
+ /*
+ * Pre-allocate the hash table entry for this channel. This happens before
+ * commit, so an OOM error here is safe (causes transaction abort).
+ */
+ initListenChannelsHash();
+ entry = hash_search(listenChannelsHash, channel, HASH_ENTER, &found);
+ if (!found)
+ entry->active = false;
/*
* Nothing to do if we are already listening to something, nor if we
@@ -1085,7 +1187,7 @@ Exec_ListenPreCommit(void)
return;
if (Trace_notify)
- elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
+ elog(DEBUG1, "Exec_ListenPreCommit(%s,%d)", channel, MyProcPid);
/*
* Before registering, make sure we will unlisten before dying. (Note:
@@ -1163,54 +1265,48 @@ Exec_ListenPreCommit(void)
/*
* Exec_ListenCommit --- subroutine for AtCommit_Notify
*
- * Add the channel to the list of channels we are listening on.
+ * Set the channel's hash entry to active.
*/
static void
Exec_ListenCommit(const char *channel)
{
- MemoryContext oldcontext;
+ ListenChannelEntry *entry;
- /* Do nothing if we are already listening on this channel */
- if (IsListeningOn(channel))
- return;
+ if (Trace_notify)
+ elog(DEBUG1, "Exec_ListenCommit(%s,%d)", channel, MyProcPid);
/*
- * Add the new channel name to listenChannels.
- *
- * XXX It is theoretically possible to get an out-of-memory failure here,
- * which would be bad because we already committed. For the moment it
- * doesn't seem worth trying to guard against that, but maybe improve this
- * later.
+ * Find the pre-allocated entry and mark it active. The entry must exist
+ * because Exec_ListenPreCommit created it.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- listenChannels = lappend(listenChannels, pstrdup(channel));
- MemoryContextSwitchTo(oldcontext);
+ Assert(listenChannelsHash != NULL);
+ entry = hash_search(listenChannelsHash, channel, HASH_FIND, NULL);
+ Assert(entry != NULL);
+
+ entry->active = true;
}
/*
* Exec_UnlistenCommit --- subroutine for AtCommit_Notify
*
- * Remove the specified channel name from listenChannels.
+ * Mark the specified channel as inactive. We unset active rather than
+ * removing the entry, to avoid needing to allocate memory if a subsequent
+ * LISTEN in the same transaction re-adds it.
*/
static void
Exec_UnlistenCommit(const char *channel)
{
- ListCell *q;
+ ListenChannelEntry *entry;
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
- foreach(q, listenChannels)
- {
- char *lchan = (char *) lfirst(q);
+ if (listenChannelsHash == NULL)
+ return;
- if (strcmp(lchan, channel) == 0)
- {
- listenChannels = foreach_delete_current(listenChannels, q);
- pfree(lchan);
- break;
- }
- }
+ entry = hash_search(listenChannelsHash, channel, HASH_FIND, NULL);
+ if (entry != NULL)
+ entry->active = false;
/*
* We do not complain about unlistening something not being listened;
@@ -1226,34 +1322,35 @@ Exec_UnlistenCommit(const char *channel)
static void
Exec_UnlistenAllCommit(void)
{
+ HASH_SEQ_STATUS seq;
+ ListenChannelEntry *entry;
+
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
- list_free_deep(listenChannels);
- listenChannels = NIL;
+ if (listenChannelsHash == NULL)
+ return;
+
+ hash_seq_init(&seq, listenChannelsHash);
+ while ((entry = hash_seq_search(&seq)) != NULL)
+ entry->active = false;
}
/*
* Test whether we are actively listening on the given channel name.
*
* Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it. In practice the list is likely to be
- * fairly short, though.
*/
static bool
IsListeningOn(const char *channel)
{
- ListCell *p;
+ ListenChannelEntry *entry;
- foreach(p, listenChannels)
- {
- char *lchan = (char *) lfirst(p);
+ if (listenChannelsHash == NULL)
+ return false;
- if (strcmp(lchan, channel) == 0)
- return true;
- }
- return false;
+ entry = hash_search(listenChannelsHash, channel, HASH_FIND, NULL);
+ return (entry != NULL && entry->active);
}
/*
@@ -1263,7 +1360,7 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister(void)
{
- Assert(listenChannels == NIL); /* else caller error */
+ Assert(listenChannelsHash == NULL); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
return;
@@ -1695,12 +1792,19 @@ SignalBackends(void)
void
AtAbort_Notify(void)
{
+ /*
+ * Clean up any pre-allocated hash entries that weren't committed. These
+ * have active=false and would have been set to active=true in
+ * Exec_ListenCommit if the transaction had committed.
+ */
+ CleanupInactiveListenChannels();
+
/*
* If we LISTEN but then roll back the transaction after PreCommit_Notify,
- * we have registered as a listener but have not made any entry in
- * listenChannels. In that case, deregister again.
+ * we have registered as a listener but have not made any active entry in
+ * listenChannelsHash. In that case, deregister again.
*/
- if (amRegisteredListener && listenChannels == NIL)
+ if (amRegisteredListener && listenChannelsHash == NULL)
asyncQueueUnregister();
/* And clean up */
@@ -2080,7 +2184,7 @@ asyncQueueProcessPageEntries(QueuePosition *current,
* over it on the first LISTEN in a session, and not get stuck on
* it indefinitely.
*/
- if (listenChannels == NIL)
+ if (listenChannelsHash == NULL)
continue;
if (TransactionIdDidCommit(qe->xid))
@@ -2332,7 +2436,7 @@ ProcessIncomingNotify(bool flush)
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
- if (listenChannels == NIL)
+ if (listenChannelsHash == NULL)
return;
if (Trace_notify)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c751c25a04d..46ea8f2ff8e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1564,6 +1564,7 @@ ListDictionary
ListParsedLex
ListenAction
ListenActionKind
+ListenChannelEntry
ListenStmt
LoInfo
LoadStmt
--
2.50.1
0002-async-avoid-pallocs-in-critical-section-v2.patch
From 3ef887301927fe1c4215c8950f0b706b9454c5f0 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Tue, 25 Nov 2025 10:10:51 +0100
Subject: [PATCH 2/3] Move asyncQueueAdvanceTail from AtCommit to PreCommit
---
src/backend/commands/async.c | 33 +++++++++++++++------------------
1 file changed, 15 insertions(+), 18 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6d753079553..d5676d43a30 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -981,6 +981,21 @@ PreCommit_Notify(void)
LWLockRelease(NotifyQueueLock);
}
+ /*
+ * If it's time to try to advance the global tail pointer, do that.
+ *
+ * (It might seem odd to do this in the sender, when more than likely the
+ * listeners won't yet have read the messages we just sent. However,
+ * there's less contention if only the sender does it, and there is little
+ * need for urgency in advancing the global tail. So this typically will
+ * be clearing out messages that were sent some time ago.)
+ */
+ if (tryAdvanceTail)
+ {
+ tryAdvanceTail = false;
+ asyncQueueAdvanceTail();
+ }
+
/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
}
}
@@ -1046,21 +1061,6 @@ AtCommit_Notify(void)
if (pendingNotifies != NULL)
SignalBackends();
- /*
- * If it's time to try to advance the global tail pointer, do that.
- *
- * (It might seem odd to do this in the sender, when more than likely the
- * listeners won't yet have read the messages we just sent. However,
- * there's less contention if only the sender does it, and there is little
- * need for urgency in advancing the global tail. So this typically will
- * be clearing out messages that were sent some time ago.)
- */
- if (tryAdvanceTail)
- {
- tryAdvanceTail = false;
- asyncQueueAdvanceTail();
- }
-
/* And clean up */
ClearPendingActionsAndNotifies();
}
@@ -2135,9 +2135,6 @@ asyncQueueProcessPageEntries(QueuePosition *current,
/*
* Advance the shared queue tail variable to the minimum of all the
* per-backend tail pointers. Truncate pg_notify space if possible.
- *
- * This is (usually) called during CommitTransaction(), so it's important for
- * it to have very low probability of failure.
*/
static void
asyncQueueAdvanceTail(void)
--
2.50.1
On Tue, Nov 25, 2025, at 11:15, Joel Jacobson wrote:
With the following three changes, I think the only remaining
potentially-risky code in AtCommit_Notify is the acquire/release of
locks.
Patches 0001 and 0002 are unchanged since v2.
0003:
Since this thread is specifically about avoiding pallocs in the
effective "critical section", I realize we shouldn't change
listenChannels from a list to a hash table (in this patch), but just move
the existing potentially-risky code out of AtCommit_Notify.
Thanks to a sharper focus on that, I realized Tom's alternative design
idea from [1], to just go ahead and perform the LISTEN/UNLISTEN updates
in PreCommit_Notify, is an excellent simplification, with no real
downsides that I can identify.
This allowed simplifying 0003 a lot, by just doing all LISTEN/UNLISTEN
operations during PreCommit:
1 file changed, 100 insertions(+), 139 deletions(-)
I really hope this approach is acceptable, because then the optimization
patch will become much simpler.
Please advise whether you think this makes sense, and whether we should
keep this thread open or fold the result directly into [1].
Patches:
* 0001-async-avoid-pallocs-in-critical-section-v3.patch:
Preallocate signal arrays to avoid pallocs AtCommit
* 0002-async-avoid-pallocs-in-critical-section-v3.patch:
Move asyncQueueAdvanceTail from AtCommit to PreCommit
* 0003-async-avoid-pallocs-in-critical-section-v3.patch:
Execute LISTEN/UNLISTEN during PreCommit
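As an illustration of the idea behind 0001 (not the patch itself), here is
a minimal standalone sketch of the pattern: allocate the scratch arrays
once, in a phase where a failure can still be reported cleanly, and only
fill them in the phase that must not fail. It uses plain malloc rather than
palloc in TopMemoryContext, and MAX_BACKENDS, init_signal_arrays and
collect_targets are hypothetical names invented for this sketch.

```c
#include <assert.h>
#include <stdlib.h>

#define MAX_BACKENDS 8			/* hypothetical stand-in for MaxBackends */

/* Scratch arrays, allocated once and kept for the life of the process. */
static int *signal_pids = NULL;
static int *signal_procs = NULL;

/*
 * Call this while failure is still harmless (the "pre-commit" phase).
 * Returns 0 on success, -1 on out-of-memory.  It is safe to call again
 * after a partial failure, because each array is checked separately.
 */
int
init_signal_arrays(void)
{
	if (signal_pids == NULL)
	{
		signal_pids = malloc(MAX_BACKENDS * sizeof(int));
		if (signal_pids == NULL)
			return -1;
	}
	if (signal_procs == NULL)
	{
		signal_procs = malloc(MAX_BACKENDS * sizeof(int));
		if (signal_procs == NULL)
			return -1;
	}
	return 0;
}

/*
 * The "must not fail" phase: no allocation happens here.  A slot holding
 * pid 0 is treated as empty.  Returns the number of targets collected.
 */
int
collect_targets(const int *listener_pids, int nlisteners)
{
	int			count = 0;

	assert(signal_pids != NULL && signal_procs != NULL);

	for (int i = 0; i < nlisteners && count < MAX_BACKENDS; i++)
	{
		if (listener_pids[i] == 0)
			continue;
		signal_pids[count] = listener_pids[i];
		signal_procs[count] = i;
		count++;
	}
	return count;
}
```

A caller would run init_signal_arrays() before the point of no return and
bail out if it returns -1; collect_targets() can then be called later
without any risk of an allocation failure.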
/Joel
[1]: /messages/by-id/6899c044-4a82-49be-8117-e6f669765f7e@app.fastmail.com
Attachments:
0001-async-avoid-pallocs-in-critical-section-v3.patch
From 7e96c9e2c556714ab2a6920013c2ce347b5da03e Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Tue, 25 Nov 2025 10:09:51 +0100
Subject: [PATCH 1/3] Preallocate signal arrays to avoid pallocs AtCommit
---
src/backend/commands/async.c | 51 +++++++++++++++++++++++++++---------
1 file changed, 38 insertions(+), 13 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index eb86402cae4..37526c7b726 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -418,6 +418,12 @@ static bool unlistenExitRegistered = false;
/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;
+/*
+ * Arrays for SignalBackends.
+ */
+static int32 *notifySignalPids = NULL;
+static ProcNumber *notifySignalProcs = NULL;
+
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;
@@ -477,6 +483,27 @@ asyncQueuePagePrecedes(int64 p, int64 q)
return p < q;
}
+/*
+ * initSignalArrays
+ * Lazy initialization of the signal arrays.
+ */
+static void
+initSignalArrays(void)
+{
+ MemoryContext oldcontext;
+
+ if (notifySignalProcs != NULL)
+ return;
+
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+ if (notifySignalPids == NULL)
+ notifySignalPids = (int32 *) palloc(MaxBackends * sizeof(int32));
+ notifySignalProcs = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
/*
* Report space needed for our shared memory area
*/
@@ -902,6 +929,13 @@ PreCommit_Notify(void)
*/
(void) GetCurrentTransactionId();
+ /*
+ * We will be calling SignalBackends() at AtCommit_Notify time, so
+ * make sure its auxiliary data structures exist now, where an ERROR
+ * will still abort the transaction cleanly.
+ */
+ initSignalArrays();
+
/*
* Serialize writers by acquiring a special lock that we hold till
* after commit. This ensures that queue entries appear in commit
@@ -1580,20 +1614,13 @@ asyncQueueFillWarning(void)
static void
SignalBackends(void)
{
- int32 *pids;
- ProcNumber *procnos;
int count;
/*
* Identify backends that we need to signal. We don't want to send
* signals while holding the NotifyQueueLock, so this loop just builds a
* list of target PIDs.
- *
- * XXX in principle these pallocs could fail, which would be bad. Maybe
- * preallocate the arrays? They're not that large, though.
*/
- pids = (int32 *) palloc(MaxBackends * sizeof(int32));
- procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
count = 0;
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
@@ -1624,8 +1651,8 @@ SignalBackends(void)
continue;
}
/* OK, need to signal this one */
- pids[count] = pid;
- procnos[count] = i;
+ notifySignalPids[count] = pid;
+ notifySignalProcs[count] = i;
count++;
}
LWLockRelease(NotifyQueueLock);
@@ -1633,7 +1660,7 @@ SignalBackends(void)
/* Now send signals */
for (int i = 0; i < count; i++)
{
- int32 pid = pids[i];
+ int32 pid = notifySignalPids[i];
/*
* If we are signaling our own process, no need to involve the kernel;
@@ -1651,12 +1678,10 @@ SignalBackends(void)
* NotifyQueueLock; which is unlikely but certainly possible. So we
* just log a low-level debug message if it happens.
*/
- if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
+ if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, notifySignalProcs[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
}
- pfree(pids);
- pfree(procnos);
}
/*
--
2.50.1
0002-async-avoid-pallocs-in-critical-section-v3.patch
From 7fa517cd8767cd69aa820bb74173880e282e5a38 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Tue, 25 Nov 2025 10:10:51 +0100
Subject: [PATCH 2/3] Move asyncQueueAdvanceTail from AtCommit to PreCommit
---
src/backend/commands/async.c | 33 +++++++++++++++------------------
1 file changed, 15 insertions(+), 18 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 37526c7b726..ea6ea9e09a8 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -981,6 +981,21 @@ PreCommit_Notify(void)
LWLockRelease(NotifyQueueLock);
}
+ /*
+ * If it's time to try to advance the global tail pointer, do that.
+ *
+ * (It might seem odd to do this in the sender, when more than likely the
+ * listeners won't yet have read the messages we just sent. However,
+ * there's less contention if only the sender does it, and there is little
+ * need for urgency in advancing the global tail. So this typically will
+ * be clearing out messages that were sent some time ago.)
+ */
+ if (tryAdvanceTail)
+ {
+ tryAdvanceTail = false;
+ asyncQueueAdvanceTail();
+ }
+
/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
}
}
@@ -1046,21 +1061,6 @@ AtCommit_Notify(void)
if (pendingNotifies != NULL)
SignalBackends();
- /*
- * If it's time to try to advance the global tail pointer, do that.
- *
- * (It might seem odd to do this in the sender, when more than likely the
- * listeners won't yet have read the messages we just sent. However,
- * there's less contention if only the sender does it, and there is little
- * need for urgency in advancing the global tail. So this typically will
- * be clearing out messages that were sent some time ago.)
- */
- if (tryAdvanceTail)
- {
- tryAdvanceTail = false;
- asyncQueueAdvanceTail();
- }
-
/* And clean up */
ClearPendingActionsAndNotifies();
}
@@ -2131,9 +2131,6 @@ asyncQueueProcessPageEntries(QueuePosition *current,
/*
* Advance the shared queue tail variable to the minimum of all the
* per-backend tail pointers. Truncate pg_notify space if possible.
- *
- * This is (usually) called during CommitTransaction(), so it's important for
- * it to have very low probability of failure.
*/
static void
asyncQueueAdvanceTail(void)
--
2.50.1
0003-async-avoid-pallocs-in-critical-section-v3.patch
From 0ab676629c405a030603274872788c11e7508d59 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Tue, 25 Nov 2025 15:42:58 +0100
Subject: [PATCH 3/3] Execute LISTEN/UNLISTEN during PreCommit
---
src/backend/commands/async.c | 239 +++++++++++++++--------------------
1 file changed, 100 insertions(+), 139 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index ea6ea9e09a8..36b83896363 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -438,10 +438,9 @@ static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_ListenPreCommit(void);
-static void Exec_ListenCommit(const char *channel);
-static void Exec_UnlistenCommit(const char *channel);
-static void Exec_UnlistenAllCommit(void);
+static void Exec_Listen(const char *channel);
+static void Exec_Unlisten(const char *channel);
+static void Exec_UnlistenAll(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
@@ -848,7 +847,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
static void
Async_UnlistenOnExit(int code, Datum arg)
{
- Exec_UnlistenAllCommit();
+ Exec_UnlistenAll();
asyncQueueUnregister();
}
@@ -877,7 +876,7 @@ AtPrepare_Notify(void)
* If there are pending LISTEN actions, make sure we are listed in the
* shared-memory listener array. This must happen before commit to
* ensure we don't miss any notifies from transactions that commit
- * just after ours.
+ * just after ours. This function also updates listenChannels.
*
* If there are outbound notify requests in the pendingNotifies list,
* add them to the global queue. We do that before commit so that
@@ -894,7 +893,7 @@ PreCommit_Notify(void)
if (Trace_notify)
elog(DEBUG1, "PreCommit_Notify");
- /* Preflight for any pending listen/unlisten actions */
+ /* Perform any pending listen/unlisten actions */
if (pendingActions != NULL)
{
foreach(p, pendingActions->actions)
@@ -904,18 +903,22 @@ PreCommit_Notify(void)
switch (actrec->action)
{
case LISTEN_LISTEN:
- Exec_ListenPreCommit();
+ Exec_Listen(actrec->channel);
break;
case LISTEN_UNLISTEN:
- /* there is no Exec_UnlistenPreCommit() */
+ Exec_Unlisten(actrec->channel);
break;
case LISTEN_UNLISTEN_ALL:
- /* there is no Exec_UnlistenAllPreCommit() */
+ Exec_UnlistenAll();
break;
}
}
}
+ /* If no longer listening to anything, get out of listener array */
+ if (amRegisteredListener && listenChannels == NIL)
+ asyncQueueUnregister();
+
/* Queue any pending notifies (must happen after the above) */
if (pendingNotifies)
{
@@ -1005,18 +1008,16 @@ PreCommit_Notify(void)
*
* This is called at transaction commit, after committing to clog.
*
- * Update listenChannels and clear transaction-local state.
- *
* If we issued any notifications in the transaction, send signals to
* listening backends (possibly including ourselves) to process them.
* Also, if we filled enough queue pages with new notifies, try to
* advance the queue tail pointer.
+ *
+ * Finally, clear transaction-local state.
*/
void
AtCommit_Notify(void)
{
- ListCell *p;
-
/*
* Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
* return as soon as possible
@@ -1027,32 +1028,6 @@ AtCommit_Notify(void)
if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify");
- /* Perform any pending listen/unlisten actions */
- if (pendingActions != NULL)
- {
- foreach(p, pendingActions->actions)
- {
- ListenAction *actrec = (ListenAction *) lfirst(p);
-
- switch (actrec->action)
- {
- case LISTEN_LISTEN:
- Exec_ListenCommit(actrec->channel);
- break;
- case LISTEN_UNLISTEN:
- Exec_UnlistenCommit(actrec->channel);
- break;
- case LISTEN_UNLISTEN_ALL:
- Exec_UnlistenAllCommit();
- break;
- }
- }
- }
-
- /* If no longer listening to anything, get out of listener array */
- if (amRegisteredListener && listenChannels == NIL)
- asyncQueueUnregister();
-
/*
* Send signals to listening backends. We need do this only if there are
* pending notifies, which were previously added to the shared queue by
@@ -1066,109 +1041,100 @@ AtCommit_Notify(void)
}
/*
- * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
+ * Exec_Listen --- subroutine for PreCommit_Notify
*
- * This function must make sure we are ready to catch any incoming messages.
+ * This function must make sure we are ready to catch any incoming messages,
+ * and adds the channel to the list of channels we are listening on.
*/
static void
-Exec_ListenPreCommit(void)
+Exec_Listen(const char *channel)
{
QueuePosition head;
QueuePosition max;
ProcNumber prevListener;
+ MemoryContext oldcontext;
+
+ if (Trace_notify)
+ elog(DEBUG1, "Exec_Listen(%s,%d)", channel, MyProcPid);
/*
* Nothing to do if we are already listening to something, nor if we
* already ran this routine in this transaction.
*/
- if (amRegisteredListener)
- return;
-
- if (Trace_notify)
- elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
-
- /*
- * Before registering, make sure we will unlisten before dying. (Note:
- * this action does not get undone if we abort later.)
- */
- if (!unlistenExitRegistered)
+ if (!amRegisteredListener)
{
- before_shmem_exit(Async_UnlistenOnExit, 0);
- unlistenExitRegistered = true;
- }
+ /*
+ * Before registering, make sure we will unlisten before dying. (Note:
+ * this action does not get undone if we abort later.)
+ */
+ if (!unlistenExitRegistered)
+ {
+ before_shmem_exit(Async_UnlistenOnExit, 0);
+ unlistenExitRegistered = true;
+ }
- /*
- * This is our first LISTEN, so establish our pointer.
- *
- * We set our pointer to the global tail pointer and then move it forward
- * over already-committed notifications. This ensures we cannot miss any
- * not-yet-committed notifications. We might get a few more but that
- * doesn't hurt.
- *
- * In some scenarios there might be a lot of committed notifications that
- * have not yet been pruned away (because some backend is being lazy about
- * reading them). To reduce our startup time, we can look at other
- * backends and adopt the maximum "pos" pointer of any backend that's in
- * our database; any notifications it's already advanced over are surely
- * committed and need not be re-examined by us. (We must consider only
- * backends connected to our DB, because others will not have bothered to
- * check committed-ness of notifications in our DB.)
- *
- * We need exclusive lock here so we can look at other backends' entries
- * and manipulate the list links.
- */
- LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
- head = QUEUE_HEAD;
- max = QUEUE_TAIL;
- prevListener = INVALID_PROC_NUMBER;
- for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
- {
- if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
- max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
- /* Also find last listening backend before this one */
- if (i < MyProcNumber)
- prevListener = i;
- }
- QUEUE_BACKEND_POS(MyProcNumber) = max;
- QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
- QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
- /* Insert backend into list of listeners at correct position */
- if (prevListener != INVALID_PROC_NUMBER)
- {
- QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
- QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
- }
- else
- {
- QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
- QUEUE_FIRST_LISTENER = MyProcNumber;
- }
- LWLockRelease(NotifyQueueLock);
+ /*
+ * This is our first LISTEN, so establish our pointer.
+ *
+ * We set our pointer to the global tail pointer and then move it forward
+ * over already-committed notifications. This ensures we cannot miss any
+ * not-yet-committed notifications. We might get a few more but that
+ * doesn't hurt.
+ *
+ * In some scenarios there might be a lot of committed notifications that
+ * have not yet been pruned away (because some backend is being lazy about
+ * reading them). To reduce our startup time, we can look at other
+ * backends and adopt the maximum "pos" pointer of any backend that's in
+ * our database; any notifications it's already advanced over are surely
+ * committed and need not be re-examined by us. (We must consider only
+ * backends connected to our DB, because others will not have bothered to
+ * check committed-ness of notifications in our DB.)
+ *
+ * We need exclusive lock here so we can look at other backends' entries
+ * and manipulate the list links.
+ */
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ head = QUEUE_HEAD;
+ max = QUEUE_TAIL;
+ prevListener = INVALID_PROC_NUMBER;
+ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+ {
+ if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+ max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+ /* Also find last listening backend before this one */
+ if (i < MyProcNumber)
+ prevListener = i;
+ }
+ QUEUE_BACKEND_POS(MyProcNumber) = max;
+ QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
+ QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+ /* Insert backend into list of listeners at correct position */
+ if (prevListener != INVALID_PROC_NUMBER)
+ {
+ QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
+ QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
+ }
+ else
+ {
+ QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
+ QUEUE_FIRST_LISTENER = MyProcNumber;
+ }
+ LWLockRelease(NotifyQueueLock);
- /* Now we are listed in the global array, so remember we're listening */
- amRegisteredListener = true;
+ /* Now we are listed in the global array, so remember we're listening */
+ amRegisteredListener = true;
- /*
- * Try to move our pointer forward as far as possible. This will skip
- * over already-committed notifications, which we want to do because they
- * might be quite stale. Note that we are not yet listening on anything,
- * so we won't deliver such notifications to our frontend. Also, although
- * our transaction might have executed NOTIFY, those message(s) aren't
- * queued yet so we won't skip them here.
- */
- if (!QUEUE_POS_EQUAL(max, head))
- asyncQueueReadAllNotifications();
-}
-
-/*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
- *
- * Add the channel to the list of channels we are listening on.
- */
-static void
-Exec_ListenCommit(const char *channel)
-{
- MemoryContext oldcontext;
+ /*
+ * Try to move our pointer forward as far as possible. This will skip
+ * over already-committed notifications, which we want to do because they
+ * might be quite stale. Note that we are not yet listening on anything,
+ * so we won't deliver such notifications to our frontend. Also, although
+ * our transaction might have executed NOTIFY, those message(s) aren't
+ * queued yet so we won't skip them here.
+ */
+ if (!QUEUE_POS_EQUAL(max, head))
+ asyncQueueReadAllNotifications();
+ }
/* Do nothing if we are already listening on this channel */
if (IsListeningOn(channel))
@@ -1176,11 +1142,6 @@ Exec_ListenCommit(const char *channel)
/*
* Add the new channel name to listenChannels.
- *
- * XXX It is theoretically possible to get an out-of-memory failure here,
- * which would be bad because we already committed. For the moment it
- * doesn't seem worth trying to guard against that, but maybe improve this
- * later.
*/
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
listenChannels = lappend(listenChannels, pstrdup(channel));
@@ -1188,17 +1149,17 @@ Exec_ListenCommit(const char *channel)
}
/*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * Exec_Unlisten --- subroutine for PreCommit_Notify
*
* Remove the specified channel name from listenChannels.
*/
static void
-Exec_UnlistenCommit(const char *channel)
+Exec_Unlisten(const char *channel)
{
ListCell *q;
if (Trace_notify)
- elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
+ elog(DEBUG1, "Exec_Unlisten(%s,%d)", channel, MyProcPid);
foreach(q, listenChannels)
{
@@ -1219,15 +1180,15 @@ Exec_UnlistenCommit(const char *channel)
}
/*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenAll --- subroutine for PreCommit_Notify
*
* Unlisten on all channels for this backend.
*/
static void
-Exec_UnlistenAllCommit(void)
+Exec_UnlistenAll(void)
{
if (Trace_notify)
- elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
+ elog(DEBUG1, "Exec_UnlistenAll(%d)", MyProcPid);
list_free_deep(listenChannels);
listenChannels = NIL;
@@ -1927,7 +1888,7 @@ asyncQueueReadAllNotifications(void)
*
* What we do guarantee is that we'll see all notifications from
* transactions committing after the snapshot we take here.
- * Exec_ListenPreCommit has already added us to the listener array,
+ * Exec_Listen has already added us to the listener array,
* so no not-yet-committed messages can be removed from the queue
* before we see them.
*----------
--
2.50.1
On Tue, Nov 25, 2025, at 16:43, Joel Jacobson wrote:
On Tue, Nov 25, 2025, at 11:15, Joel Jacobson wrote:
With the following three changes, I think the only remaining
potentially-risky code in AtCommit_Notify is the acquire/release of
locks.
Patches 0001 and 0002 are unchanged since v2.
0003:
Since this thread is specifically about avoiding pallocs in the
effective "critical section", I realize we shouldn't change
listenChannels from a list to a hash table (in this patch), but just move
the existing potentially-risky code out of AtCommit_Notify.
Thanks to a sharper focus on that, I realized Tom's alternative design
idea from [1], to just go ahead and perform the LISTEN/UNLISTEN updates
in PreCommit_Notify, is an excellent simplification, with no real
downsides that I can identify.
This allowed simplifying 0003 a lot, by just doing all LISTEN/UNLISTEN
operations during PreCommit:
Darn, I forgot about the edge case where something fails in xact.c
between PreCommit_Notify() and AtCommit_Notify(), causing an abort.
In this case, AtAbort_Notify() would need to undo the effect
of the LISTEN/UNLISTEN commands that would already have been
executed, with the 0003 patch.
A normal BEGIN; LISTEN foo; ROLLBACK; is not a problem though,
since that will never reach PreCommit_Notify().
Just curious, what type of problems could cause an abort between
PreCommit_Notify and AtCommit_Notify?
I wonder if we can think of a clever and cheap way to do the
cleanup in AtAbort_Notify(), without needing the boolean flag?
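To make the required cleanup concrete, here is a standalone sketch of one
possible shape for it, under stated assumptions: each pending action
remembers whether the pre-commit step actually changed the channel set, and
an abort walks the actions in reverse to undo only those changes. This is
not what 0003 does, and it does rely on a per-action flag, so it does not
answer the question above; all names (Action, precommit_apply, abort_undo,
the fixed-size channels array) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_CHANNELS 16			/* fixed capacity, so undo never allocates */

typedef enum { ACT_LISTEN, ACT_UNLISTEN } ActionKind;

typedef struct
{
	ActionKind	kind;
	const char *channel;
	bool		applied;		/* did pre-commit change the channel set? */
} Action;

static const char *channels[MAX_CHANNELS];
static int	nchannels = 0;

static int
find_channel(const char *name)
{
	for (int i = 0; i < nchannels; i++)
		if (strcmp(channels[i], name) == 0)
			return i;
	return -1;
}

/* Returns true only if the set changed. */
static bool
add_channel(const char *name)
{
	if (find_channel(name) >= 0 || nchannels == MAX_CHANNELS)
		return false;
	channels[nchannels++] = name;
	return true;
}

/* Returns true only if the set changed. */
static bool
remove_channel(const char *name)
{
	int			i = find_channel(name);

	if (i < 0)
		return false;
	channels[i] = channels[--nchannels];	/* order is not preserved */
	return true;
}

bool
is_listening(const char *name)
{
	return find_channel(name) >= 0;
}

/* Pre-commit: apply every action, remembering which ones had an effect. */
void
precommit_apply(Action *acts, int n)
{
	for (int i = 0; i < n; i++)
		acts[i].applied = (acts[i].kind == ACT_LISTEN)
			? add_channel(acts[i].channel)
			: remove_channel(acts[i].channel);
}

/* Abort after pre-commit: undo the effective actions in reverse order. */
void
abort_undo(Action *acts, int n)
{
	for (int i = n - 1; i >= 0; i--)
	{
		if (!acts[i].applied)
			continue;
		if (acts[i].kind == ACT_LISTEN)
			remove_channel(acts[i].channel);
		else
			add_channel(acts[i].channel);
		acts[i].applied = false;
	}
}
```

In a real backend the undo path would also have to avoid allocating memory,
since re-adding a channel name during abort must not fail; the fixed-size
array here sidesteps that question rather than solving it.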
I still believe 0001 and 0002 are correct though.
/Joel
Hi,
On Tue, Nov 25, 2025 at 9:14 PM Joel Jacobson <joel@compiler.org> wrote:
...
Darn, I forgot about the edge case where something fails in xact.c
between PreCommit_Notify() and AtCommit_Notify(), causing an abort.
In this case, AtAbort_Notify() would need to undo the effect
of the LISTEN/UNLISTEN commands that would already have been
executed, with the 0003 patch.
A normal BEGIN; LISTEN foo; ROLLBACK; is not a problem though,
since that will never reach PreCommit_Notify().
Just curious, what type of problems could cause an abort between
PreCommit_Notify and AtCommit_Notify?
A serializable conflict could cause an abort after PreCommit_Notify.
Patch 0002 in [0] is an example of how it can be reproduced.
[0]: /messages/by-id/CAE7r3MJgJj4D_6mPHMr-4xCrYK7q04M3jM1J_=4baphjA2WeBA@mail.gmail.com
Best regards,
Arseniy Mukhin