Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

Started by Rishu Bagga, 6 months ago, 25 messages
#1Rishu Bagga
rishu.postgres@gmail.com
1 attachment(s)

Hi all,

There’s been some renewed attention on improving the performance of the
LISTEN/NOTIFY system, which historically hasn’t scaled well under high
notify frequency. Joel Jacobson recently shared some work on optimizing
the LISTEN path [1], and I wanted to follow up with a proposal focused on
the NOTIFY side.

One of the main bottlenecks in the current implementation is the global
lock taken in `PreCommit_Notify`, which serializes all notifications.
In many use cases (especially where NOTIFY is used for non–mission-critical
caching or coordination), users may not care about strict notification
ordering or delivery semantics in the event of a transaction rollback.

To explore this further, I’ve drafted a patch that introduces a new GUC:
`publish_out_of_order_notifications`. When enabled, this skips the global
lock in `PreCommit_Notify`, allowing notifications to be queued in parallel.
This comes at the cost of possible out-of-order delivery and the potential
for notifications to be delivered from rolled-back transactions.

For benchmarking, I used pgbench with a custom SQL script that sends a
single NOTIFY message per transaction. The test was run using 8 connections
and 2000 transactions per client.

Here are the results on a MacBook Air (Apple M2 chip, 8 cores, 16 GB memory):

publish_out_of_order_notifications = off:

• Run 1: 158,190 TPS (latency: 0.051 ms)
• Run 2: 189,771 TPS (latency: 0.042 ms)
• Run 3: 189,401 TPS (latency: 0.042 ms)
• Run 4: 190,288 TPS (latency: 0.042 ms)
• Run 5: 185,001 TPS (latency: 0.043 ms)

publish_out_of_order_notifications = on:

• Run 1: 298,982 TPS (latency: 0.027 ms)
• Run 2: 345,162 TPS (latency: 0.023 ms)
• Run 3: 351,309 TPS (latency: 0.023 ms)
• Run 4: 333,035 TPS (latency: 0.024 ms)
• Run 5: 353,834 TPS (latency: 0.023 ms)

This shows roughly a 2x improvement in TPS in this basic benchmark.
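
As a quick check of the "roughly 2x" figure, the following C sketch averages the five runs from each configuration and takes the ratio of the means. The function and array names are illustrative only; the numbers are copied from the runs listed above.

```c
#include <assert.h>
#include <stddef.h>

/* TPS figures copied from the five runs listed above. */
const double tps_off[] = {158190, 189771, 189401, 190288, 185001};
const double tps_on[]  = {298982, 345162, 351309, 333035, 353834};

/* Arithmetic mean of n TPS samples. */
double mean_tps(const double *v, size_t n)
{
    double sum = 0.0;

    assert(n > 0);
    for (size_t i = 0; i < n; i++)
        sum += v[i];
    return sum / (double) n;
}

/* Ratio of mean TPS with the GUC on to mean TPS with it off. */
double speedup(void)
{
    return mean_tps(tps_on, 5) / mean_tps(tps_off, 5);
}
```

Calling `speedup()` from a small `main` gives a ratio of about 1.84 for these runs, so "roughly 2x" is a slightly generous rounding.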

I believe this could serve as a useful knob for users who want performance
over guarantees, and it may help guide future efforts to reduce contention
in NOTIFY more generally. I also have some ideas for stricter-but-faster
implementations that preserve ordering, but I wanted to start with a
minimal and illustrative patch.

I'd appreciate thoughts on the direction and whether this seems worth
pursuing further.

Relevant prior discussions:
[1]: /messages/by-id/6899c044-4a82-49be-8117-e6f669765f7e@app.fastmail.com
[2]: /messages/by-id/CAM527d_s8coiXDA4xbJRyVOcNnnjnf+ezPYpn214y3-5ixn75w@mail.gmail.com

Thanks,
Rishu

Attachments:

0001-allow-out-of-order-notifications.patch
From 0f4a1aa14cb6d8378f3de6f863a49e80829d797e Mon Sep 17 00:00:00 2001
From: Rishu Bagga <rishu@WA-TP-1104.localdomain>
Date: Thu, 17 Jul 2025 17:10:26 -0700
Subject: [PATCH] add option to allow out of order notifications for
 performance

---
 src/backend/commands/async.c        | 17 +++++++++++++++--
 src/backend/utils/misc/guc_tables.c | 13 ++++++++++++-
 src/include/commands/async.h        |  1 +
 3 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..293449321af 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -150,6 +150,7 @@
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
+#include "utils/guc_tables.h"
 
 
 /*
@@ -423,6 +424,7 @@ static bool tryAdvanceTail = false;
 
 /* GUC parameters */
 bool		Trace_notify = false;
+bool		publish_notifications_out_of_order = false;
 
 /* For 8 KB pages this gives 8 GB of disk space */
 int			max_notify_queue_pages = 1048576;
@@ -919,9 +921,20 @@ PreCommit_Notify(void)
 		 * (Historical note: before PG 9.0, a similar lock on "database 0" was
 		 * used by the flatfiles mechanism.)
 		 */
-		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-						 AccessExclusiveLock);
 
+		/*
+		 * Allow notifications to be written out of commit order
+		 * for performance gains. Note that some notifications
+		 * may be published even if the transaction fails
+		 * later on.
+		 */
+		if (!publish_notifications_out_of_order)
+		{
+			LockSharedObject(DatabaseRelationId, InvalidOid, 0,
+							AccessExclusiveLock);
+	
+		}
+	
 		/* Now push the notifications into the queue */
 		nextNotify = list_head(pendingNotifies->events);
 		while (nextNotify != NULL)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..400face33dd 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -638,7 +638,6 @@ char	   *role_string;
 /* should be static, but guc.c needs to get at this */
 bool		in_hot_standby_guc;
 
-
 /*
  * Displayable names for context types (enum GucContext)
  *
@@ -1549,6 +1548,18 @@ struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"publish_notifications_out_of_order", PGC_USERSET, DEVELOPER_OPTIONS,
+			gettext_noop("Allows NOTIFY messages to be published out of order for increased performance."),
+			gettext_noop("When enabled, NOTIFY messages may be delivered to listeners out of order, which can improve performance " 
+						"but may break applications that rely on strict ordering."),
+			GUC_NOT_IN_SAMPLE
+		},
+		&publish_notifications_out_of_order,
+		false,
+		NULL, NULL, NULL
+	},
+
 #ifdef LOCK_DEBUG
 	{
 		{"trace_locks", PGC_SUSET, DEVELOPER_OPTIONS,
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..6e7981d9981 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -16,6 +16,7 @@
 #include <signal.h>
 
 extern PGDLLIMPORT bool Trace_notify;
+extern PGDLLIMPORT bool publish_notifications_out_of_order;
 extern PGDLLIMPORT int max_notify_queue_pages;
 extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
-- 
2.47.1

#2Tom Lane
tgl@sss.pgh.pa.us
In reply to: Rishu Bagga (#1)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

Rishu Bagga <rishu.postgres@gmail.com> writes:

To explore this further, I’ve drafted a patch that introduces a new GUC:
`publish_out_of_order_notifications`.

We have generally found that GUCs that change query semantics turn
out to be very regrettable choices. They break applications that
aren't expecting it, and the granularity of the effect is frequently
not what you want either.

In the case at hand, I fear that a USERSET GUC is particularly
inappropriate, because what this proposal does is to break the
notification order guarantees system-wide, even if only one
issuer of notifications has it set.

When enabled, this skips the global
lock in `PreCommit_Notify`, allowing notifications to be queued in parallel.

How much does that really help, considering that we'll still serialize
on the NotifyQueueLock? I think that you'd need some rather
fundamental redesign to allow truly parallel queueing.

Stepping back a bit, my recollection is that "queue entries appear in
commit order" is a rather fundamental assumption in async.c, which we
rely on while dequeuing notifications. If that stops being true,
I think you'd have cases where listening backends fail to collect
available (committed) notifications because they appear in the queue
beyond not-yet-committed notifications. Maybe the window in which a
notification would remain uncommitted is short enough that we could
avert our eyes from that problem, but I'm not convinced.
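
The failure mode described here can be illustrated with a toy model. This is only a sketch under simplifying assumptions: the `ToyEntry` type and both functions are hypothetical, and the real queue in async.c tracks far more state. The listener scans forward and stops at the first entry whose sender has not committed, so any committed entry queued behind it stays undelivered.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy queue entry; the real queue in async.c stores more than this. */
typedef struct ToyEntry
{
    int  xid;        /* hypothetical transaction id */
    bool committed;  /* has the sending transaction committed yet? */
} ToyEntry;

/*
 * Scan from *pos and deliver entries until the first one whose sender is
 * still in progress. Returns the number delivered and advances *pos.
 */
size_t toy_listener_scan(const ToyEntry *queue, size_t len, size_t *pos)
{
    size_t delivered = 0;

    assert(*pos <= len);
    while (*pos < len && queue[*pos].committed)
    {
        delivered++;
        (*pos)++;
    }
    return delivered;
}

/* Count committed entries left behind at or after pos. */
size_t toy_stranded(const ToyEntry *queue, size_t len, size_t pos)
{
    size_t stranded = 0;

    for (size_t i = pos; i < len; i++)
        if (queue[i].committed)
            stranded++;
    return stranded;
}
```

With entries queued in commit order, the scan never leaves a committed entry behind. If a committed entry is queued after an uncommitted one, `toy_stranded()` returns a nonzero count until the earlier sender finishes.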

So I sympathize with concerns about how well the notification code
scales, but I think you're going to have to do a ton more work than
this to get to anything that would pass muster to get committed.
In particular, I'd really want to see something that involves
explicitly opting-into out-of-order delivery on a per-NOTIFY basis,
because anything else will break too many applications. The
underlying queue mechanism is going to need a serious rethink, too.
My guess is that we'd need to move to something involving multiple
queues rather than just one, and I'm not very sure how that ought
to work. (But perhaps queuing in-order notifications separately
from not-guaranteed-in-order notifications would help? Or maybe
the latter should be transmitted in some other way entirely.)

regards, tom lane

#3Joel Jacobson
joel@compiler.org
In reply to: Tom Lane (#2)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Fri, Jul 18, 2025, at 03:28, Tom Lane wrote:

So I sympathize with concerns about how well the notification code
scales, but I think you're going to have to do a ton more work than
this to get to anything that would pass muster to get committed.
In particular, I'd really want to see something that involves
explicitly opting-into out-of-order delivery on a per-NOTIFY basis,
because anything else will break too many applications. The
underlying queue mechanism is going to need a serious rethink, too.
My guess is that we'd need to move to something involving multiple
queues rather than just one, and I'm not very sure how that ought
to work. (But perhaps queuing in-order notifications separately
from not-guaranteed-in-order notifications would help? Or maybe
the latter should be transmitted in some other way entirely.)

I agree opting-into out-of-order delivery on a per-NOTIFY basis
sounds like a great idea.

For all the existing users that rely on in-order delivery,
and those who are not really sure they would dare to make the switch,
do we want to try to do something to improve their use-cases?

It doesn't seem possible to do better than what we already do
when all backends listen on the same channel.

However, for cases when up to K backends listen on the same channel (multicast),
my patch and benchmark in the other thread suggest we can achieve massive
improvements of parallelization of both LISTEN/UNLISTEN as well as NOTIFY
without introducing that much extra complexity.

Do we find this goal worth pursuing on its own?

Or should we only focus on exposing a new third parameter to pg_notify()
to indicate out-of-order delivery?

/Joel

#4Joel Jacobson
joel@compiler.org
In reply to: Rishu Bagga (#1)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Fri, Jul 18, 2025, at 02:49, Rishu Bagga wrote:

Hi all,

There’s been some renewed attention on improving the performance of the
LISTEN/NOTIFY system, which historically hasn’t scaled well under high
notify frequency. Joel Jacobson recently shared some work on optimizing
the LISTEN path [1], and I wanted to follow up with a proposal focused on
the NOTIFY side.

To clarify, my patch optimizes parallelizability of NOTIFY, without degrading parallelizability of LISTEN/UNLISTEN.

/Joel

#5Joel Jacobson
joel@compiler.org
In reply to: Joel Jacobson (#3)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Fri, Jul 18, 2025, at 08:46, Joel Jacobson wrote:

However, for cases when up to K backends listen on the same channel (multicast),
my patch and benchmark in the other thread suggest we can achieve massive
improvements of parallelization of both LISTEN/UNLISTEN as well as NOTIFY
without introducing that much extra complexity.

Do we find this goal worth pursuing on its own?

Or should we only focus on exposing a new third parameter to pg_notify()
to indicate out-of-order delivery?

I guess the best would be if we could do both, i.e. improve existing use-cases as well as out-of-order delivery per notification, within acceptable levels of increased complexity?

One thing I wonder, though, and haven't yet benchmarked, is whether even more parallelization than my in-order optimizations of NOTIFY already achieve would significantly improve throughput of real workloads, where you do some actual DML in the same txn that sends a NOTIFY. My in-order optimizations now scale to 3000 tps at 1000 connections. I wonder if PostgreSQL can really push that much DML tps, or if the serialization effect of LISTEN/NOTIFY would be marginal compared to other serialization caused by the DML.

/Joel

#6Joel Jacobson
joel@compiler.org
In reply to: Joel Jacobson (#4)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Fri, Jul 18, 2025, at 10:15, Joel Jacobson wrote:

On Fri, Jul 18, 2025, at 02:49, Rishu Bagga wrote:

Hi all,

There’s been some renewed attention on improving the performance of the
LISTEN/NOTIFY system, which historically hasn’t scaled well under high
notify frequency. Joel Jacobson recently shared some work on optimizing
the LISTEN path [1], and I wanted to follow up with a proposal focused on
the NOTIFY side.

To clarify, my patch optimizes parallelizability of NOTIFY, without
degrading parallelizability of LISTEN/UNLISTEN.

I realize my above clarification is not technically true,
what I meant to say is:

My patch improves NOTIFY TPS when many backends are listening on multiple
channels by eliminating unnecessary syscall wake‑ups, but it doesn't increase
the internal parallelism of the NOTIFY queue itself.

/Joel

#7Tom Lane
tgl@sss.pgh.pa.us
In reply to: Joel Jacobson (#6)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

"Joel Jacobson" <joel@compiler.org> writes:

My patch improves NOTIFY TPS when many backends are listening on multiple
channels by eliminating unnecessary syscall wake‑ups, but it doesn't increase
the internal parallelism of the NOTIFY queue itself.

After thinking about this for awhile, I have a rough idea of
something we could do to improve parallelism of NOTIFY.
As a bonus, this'd allow processes on hot standby servers to
receive NOTIFYs from processes on the primary, which is a
feature many have asked for.

The core thought here was to steal some implementation ideas
from two-phase commit. I initially thought maybe we could
remove the SLRU queue entirely, and maybe we can still find
a way to do that, but in this sketch it's still there with
substantially reduced traffic.

The idea basically is to use the WAL log rather than SLRU
as transport for notify messages.

1. In PreCommit_Notify(), gather up all the notifications this
transaction has emitted, and write them into a WAL log message.
Remember the LSN of this message. (I think this part should be
parallelizable, because of work that's previously been done to
allow parallel writes to WAL.)

2. When writing the transaction's commit WAL log entry, include
the LSN of the previous notify-data entry.

3. Concurrently with writing the commit entry, send a message
to the notify SLRU queue. This would be a small fixed-size
message with the transaction's XID, database ID, and the LSN
of the notify-data WAL entry. (The DBID is there to let
listeners quickly ignore traffic from senders in other DBs.)

4. Signal listening backends to check the queue, as we do now.

5. Listeners read the SLRU queue and then, if in same database,
pull the notify data out of the WAL log. (I believe we already
have enough infrastructure to make that cheap, because 2-phase
commit does it too.)

In the simplest implementation of this idea, step 3 would still
require a global lock, to ensure that SLRU entries are made in
commit order. However, that lock only needs to be held for the
duration of step 3, which is much shorter than what happens now.
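
The fixed-size queue entry from step 3 and the listener-side database filter from step 5 might look roughly like the following sketch. All type and function names here are hypothetical stand-ins (for TransactionId, Oid, and XLogRecPtr), not existing PostgreSQL definitions, and the actual WAL read is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for PostgreSQL's TransactionId, Oid and XLogRecPtr. */
typedef uint32_t SketchXid;
typedef uint32_t SketchDbId;
typedef uint64_t SketchLsn;

/* Step 3: small fixed-size queue entry pointing at the notify data in WAL. */
typedef struct SketchQueueEntry
{
    SketchXid  xid;         /* sending transaction */
    SketchDbId dbid;        /* lets listeners skip other databases cheaply */
    SketchLsn  notify_lsn;  /* location of the notify-data WAL record */
} SketchQueueEntry;

/*
 * Step 5: collect the LSNs a listener in database my_db would have to read
 * from WAL. Entries from other databases are skipped without touching WAL.
 * Returns the number of LSNs written to out (at most out_cap).
 */
size_t sketch_collect_lsns(const SketchQueueEntry *q, size_t len,
                           SketchDbId my_db, SketchLsn *out, size_t out_cap)
{
    size_t n = 0;

    assert(q != NULL || len == 0);
    for (size_t i = 0; i < len && n < out_cap; i++)
        if (q[i].dbid == my_db)
            out[n++] = q[i].notify_lsn;
    return n;
}
```

Because every entry has the same size regardless of payload length, the time spent appending under the global lock no longer depends on how much notify data a transaction emitted.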

A more advanced idea could be to send the SLRU message in step 1, as
soon as we've pushed out the notify-data message. In this approach,
listening backends would become responsible for figuring out whether
senders have committed yet and processing the messages in correct
commit order. This is quite handwavy yet because I don't have a
clear idea of how they'd do that reliably, but maybe it's possible.

In a hot standby server, the WAL replay process would simply have to
send the proper SLRU message and issue signals when it sees a commit
message containing a notify-data LSN. (One small detail to be worked
out is who's responsible for truncating the notify SLRU in a hot
standby server. In current usage the sending backends do it, but
there won't be any in hot standby, and there aren't necessarily any
listeners either.)

An area that needs a bit of thought is how to ensure that we don't
truncate away WAL that contains still-unread notify messages.
We have mechanisms already to prevent too-soon truncation of WAL,
so I doubt there's anything too difficult here. (Also note that
we have an existing unsolved problem of preventing CLOG truncation
while the notify SLRU still contains references to some old XID.
Perhaps that could be dealt with at the same time.)

This isn't something I'm likely to work on anytime soon, but
perhaps someone else would like to push on these ideas.

regards, tom lane

#8Joel Jacobson
joel@compiler.org
In reply to: Tom Lane (#7)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Fri, Jul 18, 2025, at 19:06, Tom Lane wrote:

"Joel Jacobson" <joel@compiler.org> writes:

My patch improves NOTIFY TPS when many backends are listening on multiple
channels by eliminating unnecessary syscall wake‑ups, but it doesn't increase
the internal parallelism of the NOTIFY queue itself.

After thinking about this for awhile, I have a rough idea of
something we could do to improve parallelism of NOTIFY.
As a bonus, this'd allow processes on hot standby servers to
receive NOTIFYs from processes on the primary, which is a
feature many have asked for.

LISTEN/NOTIFY on standbys sounds interesting on its own.

However, I don't think reducing the time we hold the exclusive lock,
would do anything at all, to help the users who have been
reporting problems with LISTEN/NOTIFY. I'll explain why I think so.

I assume Rishu in his original post, with "renewed attention"
was referring to the post "Postgres LISTEN/NOTIFY does not scale" [1]
that was on the front page of Hacker News with 319 comments [2].

I think the reported "still waiting for AccessExclusiveLock"
they saw in the logs, is probably just a *symptom* but not the *cause*
of their problems.

Unfortunately, the author of [1] jumped to conclusions and assumed
the global lock was the problem. I'm quite sure it is probably not,
because:

We know for sure, that current users do LISTEN and NOTIFY
in the same database. And there is no point in doing NOTIFY
unless you also do LISTEN.

Their plots show a y-axis with a few hundred "active sessions".
If we assume at least ~100 of them would be listening backends,
that would explain their problems, due to the syscall thundering
herd wake-up bomb, that each NOTIFY currently causes.
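
To make the scaling argument concrete, here is a toy count of wake-up signals under two models. This is a sketch only: the `ToyListener` type and both functions are made up for illustration, and the "targeted" model is a generic idea, not a description of any posted patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy listener: one backend listening on one channel (names are made up). */
typedef struct ToyListener
{
    int         pid;
    const char *channel;
} ToyListener;

/* Broadcast model: every listening backend is signalled for every NOTIFY. */
size_t wakeups_broadcast(size_t n_listeners, size_t n_notifies)
{
    return n_listeners * n_notifies;
}

/* Targeted model: only backends listening on the notified channel are signalled. */
size_t wakeups_targeted(const ToyListener *l, size_t n_listeners,
                        const char *channel, size_t n_notifies)
{
    size_t interested = 0;

    assert(channel != NULL);
    for (size_t i = 0; i < n_listeners; i++)
        if (strcmp(l[i].channel, channel) == 0)
            interested++;
    return interested * n_notifies;
}
```

In the broadcast model the signal count grows with the total number of listening backends, which is the scaling problem described above. When all backends listen on the same channel, the two models give the same count.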

So instead of saying
"Postgres LISTEN/NOTIFY does not scale",
like in the article [1], I think it would be much more fair and meaningful to say
"Postgres LISTEN/NOTIFY does not scale, with the number of listening backends".

All my benchmarks support this hypothesis. I've already posted a lot of them,
but can of course provide more specific additional benchmarks if desired.

/Joel

[1]:  https://www.recall.ai/blog/postgres-listen-notify-does-not-scale
[2]:  https://news.ycombinator.com/item?id=44490510

#9Rishu Bagga
rishu.postgres@gmail.com
In reply to: Joel Jacobson (#8)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

"Joel Jacobson" <joel@compiler.org> writes:

Unfortunately, the author of [1] jumped to conclusions and assumed
the global lock was the problem. I'm quite sure it is probably not,
because:

We know for sure, that current users do LISTEN and NOTIFY
in the same database. And there is no point in doing NOTIFY
unless you also do LISTEN.

I agree that, in practice, there is likely no use of NOTIFY without
LISTEN. That being said, I disagree that the global lock is not at
least one of the bottlenecks from a performance perspective.

The global lock is not released until after the call to
RecordTransactionCommit and the rest of CommitTransaction. So if
there are multiple transactions sending notifications, each must
become durable before the next can proceed, which introduces a
significant bottleneck.
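
A back-of-the-envelope way to see the effect: if each notifying commit must become durable before the next one can start, throughput is capped at one commit per durability round trip, no matter how many connections are active. The function below is a hypothetical illustration of that bound, and any latency passed to it is an assumed figure, not a measurement.

```c
#include <assert.h>

/*
 * Upper bound on notifying commits per second when the lock is held across
 * the durable commit, so commits cannot overlap. The latency argument is an
 * assumed per-commit durability latency in milliseconds.
 */
double serialized_tps_cap(double commit_latency_ms)
{
    assert(commit_latency_ms > 0.0);
    return 1000.0 / commit_latency_ms;
}
```

For example, an assumed 1 ms durable-commit latency caps throughput at 1000 notifying commits per second, and halving the latency doubles the cap. This is why higher-latency storage makes the lock more costly.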

This becomes especially expensive in environments where compute and
storage are separated, such as certain cloud-based variants of Postgres.

Tom Lane <tgl@sss.pgh.pa.us> writes:

1. In PreCommit_Notify(), gather up all the notifications this
transaction has emitted, and write them into a WAL log message.
Remember the LSN of this message. (I think this part should be
parallelizable, because of work that's previously been done to
allow parallel writes to WAL.)

2. When writing the transaction's commit WAL log entry, include
the LSN of the previous notify-data entry.

Thanks for the input — this is a compelling idea. I'll work on
implementing a proof of concept.

Thanks,
Rishu

#10Joel Jacobson
joel@compiler.org
In reply to: Rishu Bagga (#9)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Mon, Jul 21, 2025, at 00:06, Rishu Bagga wrote:

"Joel Jacobson" <joel@compiler.org> writes:

Unfortunately, the author of [1] jumped to conclusions and assumed
the global lock was the problem. I'm quite sure it is probably not,
because:

We know for sure, that current users do LISTEN and NOTIFY
in the same database. And there is no point in doing NOTIFY
unless you also do LISTEN.

I agree that, in practice, there is likely no use of NOTIFY without
LISTEN. That being said, I disagree that the global lock is not at
least one of the bottlenecks from a performance perspective.

The global lock is not released until after the call to
RecordTransactionCommit and the rest of CommitTransaction. So if
there are multiple transactions sending notifications, each must
become durable before the next can proceed, which introduces a
significant bottleneck.

This becomes especially expensive in environments where compute and
storage are separated, such as certain cloud-based variants of Postgres.

Can you please share the benchmark script and pgbench commands from your initial
post? When you skipped the global lock, TPS went from max 190k to max 350k.
Based on how high these TPS are, I would guess you're not doing LISTEN in your
benchmark, but only NOTIFY?

Since there is no point of just doing NOTIFY if nobody is LISTENing,
a realistic benchmark would also need to do LISTEN.
What you will then see is that TPS will be severely impacted,
and the gains from removing the global exclusive lock will
drown in the huge cost for all the syscalls.

On my end, I will work on reducing the expensive syscalls,
which I believe is the current bottleneck.
Once that's fixed though, the next bottleneck might be the global
exclusive lock, so I think it's great you're working on improving that as well.

/Joel

#11Joel Jacobson
joel@compiler.org
In reply to: Joel Jacobson (#10)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Mon, Jul 21, 2025, at 08:16, Joel Jacobson wrote:

Since there is no point of just doing NOTIFY if nobody is LISTENing,
a realistic benchmark would also need to do LISTEN.
What you will then see is that TPS will be severely impacted,
and the gains from removing the global exclusive lock will
drown in the huge cost for all the syscalls.

I thought I better put my money where my mouth is,
and decided to try to replicate Rishu's benchmark results,
and in addition, benchmark my own hypothesis above,
that if not just doing NOTIFY but both LISTEN+NOTIFY,
the 2x improvement would be heavily reduced.

The benchmarks I've run below confirm this hypothesis.
The observed 2x improvement shrinks to roughly a 1.14x
improvement (about 14%) when doing both LISTEN+NOTIFY.

Benchmark from original post:

Here are the results on a MacBook Air (Apple M2 chip, 8 cores, 16 GB memory):

publish_out_of_order_notifications = off:

• Run 1: 158,190 TPS (latency: 0.051 ms)
• Run 2: 189,771 TPS (latency: 0.042 ms)
• Run 3: 189,401 TPS (latency: 0.042 ms)
• Run 4: 190,288 TPS (latency: 0.042 ms)
• Run 5: 185,001 TPS (latency: 0.043 ms)

publish_out_of_order_notifications = on:

• Run 1: 298,982 TPS (latency: 0.027 ms)
• Run 2: 345,162 TPS (latency: 0.023 ms)
• Run 3: 351,309 TPS (latency: 0.023 ms)
• Run 4: 333,035 TPS (latency: 0.024 ms)
• Run 5: 353,834 TPS (latency: 0.023 ms)

This shows roughly a 2x improvement in TPS in this basic benchmark.

# Benchmarks on my MacBook Pro (Apple M3 Max, 16 cores, 128 GB memory)

## NOTIFY only

~160k TPS master (HEAD)
~340k TPS (0001-allow-out-of-order-notifications.patch)
=> ~110% improvement (about 2.1x)

### master (HEAD)

% cat notify_common.sql
NOTIFY channel_common;

% for n in `seq 1 5` ; do pgbench -f notify_common.sql -c 8 -t 2000 -n | grep -E '^(latency|tps)' ; done
latency average = 0.052 ms
tps = 154326.941626 (without initial connection time)
latency average = 0.049 ms
tps = 162334.368215 (without initial connection time)
latency average = 0.050 ms
tps = 160703.883008 (without initial connection time)
latency average = 0.048 ms
tps = 165296.086615 (without initial connection time)
latency average = 0.049 ms
tps = 163706.310878 (without initial connection time)

### 0001-allow-out-of-order-notifications.patch

% cat notify_common.sql
NOTIFY channel_common;

% for n in `seq 1 5` ; do PGOPTIONS='-c publish_notifications_out_of_order=true' pgbench -f notify_common.sql -c 8 -t 2000 -n | grep -E '^(latency|tps)' ; done

latency average = 0.026 ms
tps = 310149.647205 (without initial connection time)
latency average = 0.021 ms
tps = 380427.029340 (without initial connection time)
latency average = 0.025 ms
tps = 320108.837005 (without initial connection time)
latency average = 0.024 ms
tps = 333500.083375 (without initial connection time)
latency average = 0.022 ms
tps = 357965.859006 (without initial connection time)

## LISTEN+NOTIFY

~73k TPS master (HEAD)
~83k TPS (0001-allow-out-of-order-notifications.patch)
=> ~14% improvement

### master (HEAD)

% cat listen_notify_common.sql
LISTEN channel_common;
NOTIFY channel_common;

% for n in `seq 1 5` ; do pgbench -f listen_notify_common.sql -c 8 -t 2000 -n | grep -E '^(latency|tps)' ; done
latency average = 0.112 ms
tps = 71677.201722 (without initial connection time)
latency average = 0.109 ms
tps = 73228.220325 (without initial connection time)
latency average = 0.109 ms
tps = 73310.423826 (without initial connection time)
latency average = 0.108 ms
tps = 73995.625009 (without initial connection time)
latency average = 0.113 ms
tps = 70970.431944 (without initial connection time)

### 0001-allow-out-of-order-notifications.patch

% for n in `seq 1 5` ; do PGOPTIONS='-c publish_notifications_out_of_order=true' pgbench -f listen_notify_common.sql -c 8 -t 2000 -n | grep -E '^(latency|tps)' ; done
latency average = 0.098 ms
tps = 81620.992919 (without initial connection time)
latency average = 0.095 ms
tps = 84173.755675 (without initial connection time)
latency average = 0.096 ms
tps = 83634.329802 (without initial connection time)
latency average = 0.095 ms
tps = 84311.700356 (without initial connection time)
latency average = 0.096 ms
tps = 83340.278357 (without initial connection time)

For a normal PostgreSQL with the CPU and storage on the same physical machine,
I think the results above clearly demonstrate that the global exclusive lock
is at least not the bottleneck, which I strongly believe instead is the flood of
unnecessary kill(pid, SIGUSR1) syscalls.

If anyone has access to a cloud environment with compute and storage
separated, as suggested by Rishu, it would be interesting to see
what benchmark results you get.

/Joel

#12Joel Jacobson
joel@compiler.org
In reply to: Joel Jacobson (#11)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Tue, Jul 22, 2025, at 14:48, Joel Jacobson wrote:

Benchmark from original post:

...

For a normal PostgreSQL with the CPU and storage on the same physical machine,
I think the results above clearly demonstrate that the global exclusive lock
is at least not the bottleneck, which I strongly believe instead is the flood of
unnecessary kill(pid, SIGUSR1) syscalls.

I was wrong here. This is much more complex than I initially thought.

After some additional benchmarking and analyzing perf results,
I realize the bottleneck depends on the workload,
which is either the kill() syscalls *or* the heavyweight lock.

Here is one scenario where the heavyweight lock actually *is* the bottleneck:

1 session does LISTEN
pgbench -f notify.sql -c 1000 -j 8 -T 60 -n

Simply commenting out the heavyweight lock gives a dramatic difference:
tps = 7679 (with heavyweight lock; in commit order)
tps = 95430 (without heavyweight lock; not in commit order)

My conclusion so far is that we would greatly benefit both from
reducing/eliminating kill() syscalls, as well as finding ways to avoid
the heavyweight lock while preserving commit order.

/Joel

#13Rishu Bagga
rishu.postgres@gmail.com
In reply to: Tom Lane (#7)
1 attachment(s)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Fri, Jul 18, 2025 at 10:06 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

After thinking about this for awhile, I have a rough idea of
something we could do to improve parallelism of NOTIFY.
As a bonus, this'd allow processes on hot standby servers to
receive NOTIFYs from processes on the primary, which is a
feature many have asked for.

The core thought here was to steal some implementation ideas
from two-phase commit. I initially thought maybe we could
remove the SLRU queue entirely, and maybe we can still find
a way to do that, but in this sketch it's still there with
substantially reduced traffic.

The idea basically is to use the WAL log rather than SLRU
as transport for notify messages.

1. In PreCommit_Notify(), gather up all the notifications this
transaction has emitted, and write them into a WAL log message.
Remember the LSN of this message. (I think this part should be
parallelizable, because of work that's previously been done to
allow parallel writes to WAL.)

2. When writing the transaction's commit WAL log entry, include
the LSN of the previous notify-data entry.

3. Concurrently with writing the commit entry, send a message
to the notify SLRU queue. This would be a small fixed-size
message with the transaction's XID, database ID, and the LSN
of the notify-data WAL entry. (The DBID is there to let
listeners quickly ignore traffic from senders in other DBs.)

4. Signal listening backends to check the queue, as we do now.

5. Listeners read the SLRU queue and then, if in same database,
pull the notify data out of the WAL log. (I believe we already
have enough infrastructure to make that cheap, because 2-phase
commit does it too.)

In the simplest implementation of this idea, step 3 would still
require a global lock, to ensure that SLRU entries are made in
commit order. However, that lock only needs to be held for the
duration of step 3, which is much shorter than what happens now.

Attached is an initial patch that implements this idea.

There is still some work to be done around how to handle truncation / vacuum
for the new approach, and testing replication of notifications onto a reader
instance.

That being said, I ran some basic benchmarking to stress concurrent
notifications.

With the following sql script, I ran
pgbench -T 100 -c 100 -j 8 -f pgbench_transaction_notify.sql -d postgres

BEGIN;
INSERT INTO test VALUES(1);
NOTIFY benchmark_channel, 'transaction_completed';
COMMIT;

With the patch 3 runs showed the following TPS:

tps = 66372.705917
tps = 63445.909465
tps = 64412.544339

Without the patch, we got the following TPS:

tps = 30212.390982
tps = 30908.865812
tps = 29191.388601

So there is roughly a 2x increase in TPS at 100 connections, which suggests
the approach is promising.
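
As a quick check of the "about 2x" figure, the sketch below averages the three runs in each configuration and takes the ratio, which comes out at a bit over 2x (about 2.15x). The helper names are hypothetical; the numbers are copied from the runs above.

```c
#include <assert.h>
#include <stddef.h>

/* Arithmetic mean of n TPS samples. */
static double
mean_tps(const double *runs, size_t n)
{
    double sum = 0.0;

    assert(n > 0);
    for (size_t i = 0; i < n; i++)
        sum += runs[i];
    return sum / (double) n;
}

/* TPS figures copied from the pgbench runs reported above. */
static const double tps_patched[]   = {66372.705917, 63445.909465, 64412.544339};
static const double tps_unpatched[] = {30212.390982, 30908.865812, 29191.388601};

/* Ratio of mean patched TPS to mean unpatched TPS. */
static double
speedup(void)
{
    return mean_tps(tps_patched, 3) / mean_tps(tps_unpatched, 3);
}
```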

Additionally, this would help solve the issue being discussed in a
separate thread [1], where listeners currently rely on the transaction
log to verify whether a transaction they read has indeed committed, but
the relevant portion of the transaction log may already have been
truncated by vacuum.

Would appreciate any thoughts on the direction of this patch.

Thanks, Rishu

[1]: /messages/by-id/CAK98qZ3wZLE-RZJN_Y%25 2BTFjiTRPPFPBwNBpBi5K5CU8hUHkzDpw%40mail.gmail.com

Attachments:

notify-through-wal.patch (application/octet-stream)
From bc638e2220d5b82ea3b289f646617100a44adab6 Mon Sep 17 00:00:00 2001
From: rbagga <bangalorian@gmail.com>
Date: Thu, 28 Aug 2025 16:09:10 -0700
Subject: [PATCH] Implement WAL-based async notifications for improved
 throughput

- Added WAL logging for async notifications to improve scalability
- Implemented async resource manager for WAL-based notification handling
- Added new async descriptor files for pg_waldump support
- Updated makefiles and build configuration for new components
---
 src/backend/access/rmgrdesc/Makefile    |   1 +
 src/backend/access/rmgrdesc/asyncdesc.c |  58 +++
 src/backend/access/rmgrdesc/meson.build |   1 +
 src/backend/access/transam/rmgr.c       |   1 +
 src/backend/commands/async.c            | 568 ++++++++++++++++++------
 src/bin/pg_rewind/parsexlog.c           |   1 +
 src/bin/pg_waldump/asyncdesc.c          |   1 +
 src/bin/pg_waldump/rmgrdesc.c           |   1 +
 src/include/access/async_xlog.h         |  56 +++
 src/include/access/rmgrlist.h           |   1 +
 src/include/commands/async.h            |  19 +
 src/include/storage/proc.h              |   3 +
 12 files changed, 587 insertions(+), 124 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/asyncdesc.c
 create mode 120000 src/bin/pg_waldump/asyncdesc.c
 create mode 100644 src/include/access/async_xlog.h

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index cd95eec37f1..6e6e75b12bd 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,6 +9,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	asyncdesc.o \
 	brindesc.o \
 	clogdesc.o \
 	committsdesc.o \
diff --git a/src/backend/access/rmgrdesc/asyncdesc.c b/src/backend/access/rmgrdesc/asyncdesc.c
new file mode 100644
index 00000000000..b110457431f
--- /dev/null
+++ b/src/backend/access/rmgrdesc/asyncdesc.c
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * asyncdesc.c
+ *	  rmgr descriptor routines for commands/async.c
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/asyncdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/async_xlog.h"
+
+void
+async_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_ASYNC_NOTIFY_DATA)
+	{
+		xl_async_notify_data *xlrec = (xl_async_notify_data *) rec;
+
+		appendStringInfo(buf, "notify data: db %u xid %u pid %d notifications %u",
+						 xlrec->dbid, xlrec->xid, xlrec->srcPid, xlrec->nnotifications);
+	}
+	else if (info == XLOG_ASYNC_NOTIFY_COMMIT)
+	{
+		xl_async_notify_commit *xlrec = (xl_async_notify_commit *) rec;
+
+		appendStringInfo(buf, "notify commit: db %u xid %u notify_lsn %X/%X",
+						 xlrec->dbid, xlrec->xid,
+						 LSN_FORMAT_ARGS(xlrec->notify_lsn));
+	}
+}
+
+const char *
+async_identify(uint8 info)
+{
+	const char *id = NULL;
+
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			id = "NOTIFY_DATA";
+			break;
+		case XLOG_ASYNC_NOTIFY_COMMIT:
+			id = "NOTIFY_COMMIT";
+			break;
+	}
+
+	return id;
+}
\ No newline at end of file
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index 96c98e800c2..38bef2e87f6 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -2,6 +2,7 @@
 
 # used by frontend programs like pg_waldump
 rmgr_desc_sources = files(
+  'asyncdesc.c',
   'brindesc.c',
   'clogdesc.c',
   'committsdesc.c',
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 1b7499726eb..f8c25e6597a 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -19,6 +19,7 @@
 
 /* includes needed for "access/rmgrlist.h" */
 /* IWYU pragma: begin_keep */
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..8520dbe8920 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -129,10 +129,16 @@
 #include <unistd.h>
 #include <signal.h>
 
+#include "access/async_xlog.h"
 #include "access/parallel.h"
 #include "access/slru.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "access/xlogrecovery.h"
 #include "catalog/pg_database.h"
 #include "commands/async.h"
 #include "common/hashfn.h"
@@ -151,6 +157,16 @@
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
 
+/* Missing definitions for WAL-based notification system */
+#define AsyncQueueEntryEmptySize ASYNC_QUEUE_ENTRY_SIZE
+#define SLRU_PAGE_SIZE BLCKSZ
+#define AsyncCtl NotifyCtl
+
+/* WAL record types */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00
+#define XLOG_ASYNC_NOTIFY_COMMIT	0x10
+
+
 
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.  This
@@ -163,30 +179,13 @@
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
 /*
- * Struct representing an entry in the global notify queue
- *
- * This struct declaration has the maximal length, but in a real queue entry
- * the data area is only big enough for the actual channel and payload strings
- * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
- * entry size, if both channel and payload strings are empty (but note it
- * doesn't include alignment padding).
- *
- * The "length" field should always be rounded up to the next QUEUEALIGN
- * multiple so that all fields are properly aligned.
+ * NOTE: The AsyncQueueEntry structure is now defined in commands/async.h
+ * as a compact metadata-only structure for the new WAL-based notification system.
+ * The old variable-length structure with full notification content is no longer used.
  */
-typedef struct AsyncQueueEntry
-{
-	int			length;			/* total allocated length of entry */
-	Oid			dboid;			/* sender's database OID */
-	TransactionId xid;			/* sender's XID */
-	int32		srcPid;			/* sender's PID */
-	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
-} AsyncQueueEntry;
-
-/* Currently, no field of AsyncQueueEntry requires more than int alignment */
-#define QUEUEALIGN(len)		INTALIGN(len)
 
-#define AsyncQueueEntryEmptySize	(offsetof(AsyncQueueEntry, data) + 2)
+/* Queue alignment is still needed for SLRU page management */
+#define QUEUEALIGN(len)		INTALIGN(len)
 
 /*
  * Struct describing a queue position, and assorted macros for working with it
@@ -440,8 +439,6 @@ static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
 static bool asyncQueueIsFull(void);
 static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
-static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
-static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
 static void SignalBackends(void);
@@ -457,6 +454,8 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
+static void asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn);
+static void processNotificationFromWAL(XLogRecPtr notify_lsn);
 
 /*
  * Compute the difference between two queue page numbers.
@@ -890,65 +889,81 @@ PreCommit_Notify(void)
 		}
 	}
 
-	/* Queue any pending notifies (must happen after the above) */
+	/* Write notification data to WAL if we have any */
 	if (pendingNotifies)
 	{
-		ListCell   *nextNotify;
+		TransactionId currentXid;
+		ListCell   *l;
+		size_t		total_size = 0;
+		uint32		nnotifications = 0;
+		char	   *notifications_data;
+		char	   *ptr;
+		XLogRecPtr	notify_lsn;
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
 		 * GetCurrentTransactionId is cheap if we already have an XID, but not
-		 * so cheap if we don't, and we'd prefer not to do that work while
-		 * holding NotifyQueueLock.
+		 * so cheap if we don't.
 		 */
-		(void) GetCurrentTransactionId();
+		currentXid = GetCurrentTransactionId();
 
 		/*
-		 * Serialize writers by acquiring a special lock that we hold till
-		 * after commit.  This ensures that queue entries appear in commit
-		 * order, and in particular that there are never uncommitted queue
-		 * entries ahead of committed ones, so an uncommitted transaction
-		 * can't block delivery of deliverable notifications.
-		 *
-		 * We use a heavyweight lock so that it'll automatically be released
-		 * after either commit or abort.  This also allows deadlocks to be
-		 * detected, though really a deadlock shouldn't be possible here.
-		 *
-		 * The lock is on "database 0", which is pretty ugly but it doesn't
-		 * seem worth inventing a special locktag category just for this.
-		 * (Historical note: before PG 9.0, a similar lock on "database 0" was
-		 * used by the flatfiles mechanism.)
+		 * Step 1: Write notification data to WAL.
+		 * This can be done in parallel with other transactions since we're
+		 * not holding any global locks yet.
 		 */
-		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-						 AccessExclusiveLock);
+		
+		/* First pass: calculate total size needed for serialization */
+		foreach(l, pendingNotifies->events)
+		{
+			Notification *n = (Notification *) lfirst(l);
+			
+			/* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */
+			total_size += 4 + n->channel_len + 1 + n->payload_len + 1;
+			nnotifications++;
+		}
+
+		/* Allocate buffer for notification data */
+		notifications_data = palloc(total_size);
+		ptr = notifications_data;
 
-		/* Now push the notifications into the queue */
-		nextNotify = list_head(pendingNotifies->events);
-		while (nextNotify != NULL)
+		/* Second pass: serialize all notifications */
+		foreach(l, pendingNotifies->events)
 		{
-			/*
-			 * Add the pending notifications to the queue.  We acquire and
-			 * release NotifyQueueLock once per page, which might be overkill
-			 * but it does allow readers to get in while we're doing this.
-			 *
-			 * A full queue is very uncommon and should really not happen,
-			 * given that we have so much space available in the SLRU pages.
-			 * Nevertheless we need to deal with this possibility. Note that
-			 * when we get here we are in the process of committing our
-			 * transaction, but we have not yet committed to clog, so at this
-			 * point in time we can still roll the transaction back.
-			 */
-			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-			asyncQueueFillWarning();
-			if (asyncQueueIsFull())
-				ereport(ERROR,
-						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-						 errmsg("too many notifications in the NOTIFY queue")));
-			nextNotify = asyncQueueAddEntries(nextNotify);
-			LWLockRelease(NotifyQueueLock);
+			Notification *n = (Notification *) lfirst(l);
+			char	   *channel = n->data;
+			char	   *payload = n->data + n->channel_len + 1;
+
+			/* Write channel length, payload length, channel, and payload */
+			memcpy(ptr, &n->channel_len, 2);
+			ptr += 2;
+			memcpy(ptr, &n->payload_len, 2);
+			ptr += 2;
+			memcpy(ptr, channel, n->channel_len + 1);
+			ptr += n->channel_len + 1;
+			memcpy(ptr, payload, n->payload_len + 1);
+			ptr += n->payload_len + 1;
 		}
 
-		/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
+		/* Write notification data to WAL */
+		notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid,
+										nnotifications, total_size,
+										notifications_data);
+
+		pfree(notifications_data);
+
+		/*
+		 * Step 2: Record the notification LSN in transaction state.
+		 * This will be included in the commit record later.
+		 */
+		MyProc->notifyCommitLsn = notify_lsn;
+
+		/*
+		 * Note: We don't add to the traditional SLRU queue here anymore.
+		 * Instead, AtCommit_Notify will add a compact entry to the queue
+		 * pointing to the WAL data after the transaction commits.
+		 * We also don't clear pendingNotifies here; AtCommit_Notify will.
+		 */
 	}
 }
 
@@ -1006,13 +1021,34 @@ AtCommit_Notify(void)
 		asyncQueueUnregister();
 
 	/*
-	 * Send signals to listening backends.  We need do this only if there are
-	 * pending notifies, which were previously added to the shared queue by
-	 * PreCommit_Notify().
+	 * Step 3: If we have notifications, add compact metadata to SLRU queue
+	 * and signal listeners. This happens after transaction commit so the
+	 * notification LSN in our commit record is now durable.
 	 */
-	if (pendingNotifies != NULL)
+	if (pendingNotifies != NULL && !XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		/*
+		 * Write commit record with reference to notification data.
+		 * This establishes the connection between commit and notifications.
+		 */
+		LogAsyncNotifyCommit(MyDatabaseId, GetCurrentTransactionId(), MyProc->notifyCommitLsn);
+
+		/*
+		 * Add compact entry to SLRU queue pointing to WAL data.
+		 * This is much faster than the old approach since we're only
+		 * writing metadata, not the full notification content.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		asyncQueueAddCompactEntry(MyDatabaseId, GetCurrentTransactionId(), MyProc->notifyCommitLsn);
+		LWLockRelease(NotifyQueueLock);
+
+		/* Signal listening backends to check the queue */
 		SignalBackends();
 
+		/* Clear the notification LSN now that we're done with it */
+		MyProc->notifyCommitLsn = InvalidXLogRecPtr;
+	}
+
 	/*
 	 * If it's time to try to advance the global tail pointer, do that.
 	 *
@@ -1319,21 +1355,11 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
 static void
 asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
 {
-	size_t		channellen = n->channel_len;
-	size_t		payloadlen = n->payload_len;
-	int			entryLength;
-
-	Assert(channellen < NAMEDATALEN);
-	Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
-
-	/* The terminators are already included in AsyncQueueEntryEmptySize */
-	entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
-	entryLength = QUEUEALIGN(entryLength);
-	qe->length = entryLength;
-	qe->dboid = MyDatabaseId;
+	/* For the new WAL-based system, we create a compact entry with metadata only */
+	qe->dbid = MyDatabaseId;
 	qe->xid = GetCurrentTransactionId();
-	qe->srcPid = MyProcPid;
-	memcpy(qe->data, n->data, channellen + payloadlen + 2);
+	/* notify_lsn will be set later when we write to WAL */
+	qe->notify_lsn = InvalidXLogRecPtr;
 }
 
 /*
@@ -1405,7 +1431,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
 		offset = QUEUE_POS_OFFSET(queue_head);
 
 		/* Check whether the entry really fits on the current page */
-		if (offset + qe.length <= QUEUE_PAGESIZE)
+		if (offset + ASYNC_QUEUE_ENTRY_SIZE <= QUEUE_PAGESIZE)
 		{
 			/* OK, so advance nextNotify past this item */
 			nextNotify = lnext(pendingNotifies->events, nextNotify);
@@ -1414,22 +1440,21 @@ asyncQueueAddEntries(ListCell *nextNotify)
 		{
 			/*
 			 * Write a dummy entry to fill up the page. Actually readers will
-			 * only check dboid and since it won't match any reader's database
+			 * only check dbid and since it won't match any reader's database
 			 * OID, they will ignore this entry and move on.
 			 */
-			qe.length = QUEUE_PAGESIZE - offset;
-			qe.dboid = InvalidOid;
-			qe.data[0] = '\0';	/* empty channel */
-			qe.data[1] = '\0';	/* empty payload */
+			qe.dbid = InvalidOid;
+			qe.xid = InvalidTransactionId;
+			qe.notify_lsn = InvalidXLogRecPtr;
 		}
 
 		/* Now copy qe into the shared buffer page */
 		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
 			   &qe,
-			   qe.length);
+			   ASYNC_QUEUE_ENTRY_SIZE);
 
 		/* Advance queue_head appropriately, and detect if page is full */
-		if (asyncQueueAdvance(&(queue_head), qe.length))
+		if (asyncQueueAdvance(&(queue_head), ASYNC_QUEUE_ENTRY_SIZE))
 		{
 			LWLock	   *lock;
 
@@ -2032,14 +2057,13 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
 
 		/*
-		 * Advance *current over this message, possibly to the next page. As
-		 * noted in the comments for asyncQueueReadAllNotifications, we must
-		 * do this before possibly failing while processing the message.
+		 * Advance *current over this compact entry. The new compact entries are
+		 * fixed-size, making this much simpler than the old variable-length entries.
 		 */
-		reachedEndOfPage = asyncQueueAdvance(current, qe->length);
+		reachedEndOfPage = asyncQueueAdvance(current, sizeof(AsyncQueueEntry));
 
 		/* Ignore messages destined for other databases */
-		if (qe->dboid == MyDatabaseId)
+		if (qe->dbid == MyDatabaseId)
 		{
 			if (XidInMVCCSnapshot(qe->xid, snapshot))
 			{
@@ -2047,20 +2071,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 				 * The source transaction is still in progress, so we can't
 				 * process this message yet.  Break out of the loop, but first
 				 * back up *current so we will reprocess the message next
-				 * time.  (Note: it is unlikely but not impossible for
-				 * TransactionIdDidCommit to fail, so we can't really avoid
-				 * this advance-then-back-up behavior when dealing with an
-				 * uncommitted message.)
-				 *
-				 * Note that we must test XidInMVCCSnapshot before we test
-				 * TransactionIdDidCommit, else we might return a message from
-				 * a transaction that is not yet visible to snapshots; compare
-				 * the comments at the head of heapam_visibility.c.
-				 *
-				 * Also, while our own xact won't be listed in the snapshot,
-				 * we need not check for TransactionIdIsCurrentTransactionId
-				 * because our transaction cannot (yet) have queued any
-				 * messages.
+				 * time.
 				 */
 				*current = thisentry;
 				reachedStop = true;
@@ -2068,16 +2079,12 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 			}
 			else if (TransactionIdDidCommit(qe->xid))
 			{
-				/* qe->data is the null-terminated channel name */
-				char	   *channel = qe->data;
-
-				if (IsListeningOn(channel))
-				{
-					/* payload follows channel name */
-					char	   *payload = qe->data + strlen(channel) + 1;
-
-					NotifyMyFrontEnd(channel, payload, qe->srcPid);
-				}
+				/*
+				 * Step 5: Read notification data from WAL using stored LSN.
+				 * The compact entry only contains metadata; actual notification
+				 * content is retrieved from WAL on demand.
+				 */
+				processNotificationFromWAL(qe->notify_lsn);
 			}
 			else
 			{
@@ -2097,6 +2104,228 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 	return reachedStop;
 }
 
+/*
+ * processNotificationFromWAL
+ *
+ * Fetch notification data from WAL using the stored LSN and process
+ * the individual notifications for delivery to listening frontend.
+ * This implements Step 5 of the new WAL-based notification system.
+ */
+static void
+processNotificationFromWAL(XLogRecPtr notify_lsn)
+{
+	XLogReaderState *xlogreader;
+	DecodedXLogRecord *record;
+	xl_async_notify_data *xlrec;
+	char	   *data;
+	char	   *ptr;
+	uint32_t	remaining;
+	int			srcPid;
+	char	   *errormsg;
+
+	/*
+	 * Create XLog reader to fetch the notification data record.
+	 * We use a temporary reader since this is called during normal
+	 * notification processing, not during recovery.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									NULL);
+	if (!xlogreader)
+		elog(ERROR, "failed to allocate XLog reader for notification data");
+
+	/* Begin reading from the specified LSN */
+	{
+		XLogRecPtr startptr;
+		/* notify_lsn can be the end LSN; back up one byte and find next record */
+		startptr = XLogFindNextRecord(xlogreader, notify_lsn - 1);
+		if (XLogRecPtrIsInvalid(startptr))
+			elog(ERROR, "could not locate WAL record preceding %X/%X",
+				 LSN_FORMAT_ARGS(notify_lsn));
+		XLogBeginRead(xlogreader, startptr);
+	}
+
+	/* Read the WAL record containing notification data */
+	record = (DecodedXLogRecord *) XLogReadRecord(xlogreader, &errormsg);
+	if (record == NULL)
+		elog(ERROR, "failed to read notification data from WAL at %X/%X: %s",
+			 LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message");
+
+	/* Verify this is the expected record type */
+	if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID ||
+		(XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA)
+	{
+		elog(LOG, "Unexpected WAL record type for notification data");
+		elog(LOG, "XLogRecGetRmid(xlogreader): %d", XLogRecGetRmid(xlogreader));
+		elog(LOG, "XLogRecGetInfo(xlogreader): %d", XLogRecGetInfo(xlogreader));
+	}
+
+	/* Extract the notification data from the WAL record */
+	xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader);
+	srcPid = xlrec->srcPid;
+	data = (char *) xlrec + SizeOfAsyncNotifyData;
+	ptr = data;
+	remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData;
+
+	/*
+	 * Process each notification in the serialized data.
+	 * The format is: 2-byte channel_len, 2-byte payload_len,
+	 * null-terminated channel, null-terminated payload.
+	 */
+	for (uint32_t i = 0; i < xlrec->nnotifications && remaining >= 4; i++)
+	{
+		uint16		channel_len;
+		uint16		payload_len;
+		char	   *channel;
+		char	   *payload;
+
+		/* Read lengths */
+		memcpy(&channel_len, ptr, 2);
+		ptr += 2;
+		memcpy(&payload_len, ptr, 2);
+		ptr += 2;
+		remaining -= 4;
+
+		/* Verify we have enough data */
+		if (remaining < channel_len + 1 + payload_len + 1)
+			break;
+
+		/* Extract channel and payload strings */
+		channel = ptr;
+		ptr += channel_len + 1;
+		payload = ptr;
+		ptr += payload_len + 1;
+		remaining -= (channel_len + 1 + payload_len + 1);
+
+		/* Deliver notification if we're listening on this channel */
+		if (IsListeningOn(channel))
+			NotifyMyFrontEnd(channel, payload, srcPid);
+	}
+
+	/* Clean up */
+	XLogReaderFree(xlogreader);
+}
+
+/*
+ * asyncQueueAddCompactEntry
+ *
+ * Add a compact entry to the notification SLRU queue containing only
+ * metadata (dbid, xid, notify_lsn) that points to the full notification 
+ * data in WAL. This is much more efficient than the old approach of
+ * storing complete notification content in the SLRU queue.
+ */
+static void
+asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	AsyncQueueEntry entry;
+	QueuePosition queue_head;
+	int64		pageno;
+	int			offset;
+	int			slotno;
+	LWLock	   *banklock;
+
+	/*
+	 * Fill in the compact entry with just the metadata.
+	 * No payload data is stored here - it's all in WAL.
+	 */
+	entry.dbid = dbid;
+	entry.xid = xid;
+	entry.notify_lsn = notify_lsn;
+
+	/* Caller should already hold NotifyQueueLock in exclusive mode */
+	queue_head = QUEUE_HEAD;
+
+	/*
+	 * Get the current page. If this is the first write since postmaster
+	 * started, initialize the first page.
+	 */
+	pageno = QUEUE_POS_PAGE(queue_head);
+	banklock = SimpleLruGetBankLock(NotifyCtl, pageno);
+
+	LWLockAcquire(banklock, LW_EXCLUSIVE);
+
+	if (QUEUE_POS_IS_ZERO(queue_head))
+		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
+	else
+		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+								   InvalidTransactionId);
+
+	/* Mark the page dirty before writing */
+	NotifyCtl->shared->page_dirty[slotno] = true;
+
+	offset = QUEUE_POS_OFFSET(queue_head);
+
+	/* Check if the compact entry fits on the current page */
+	if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE)
+	{
+		/* Copy the compact entry to the shared buffer */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &entry,
+			   sizeof(AsyncQueueEntry));
+
+		/* Advance queue head by the size of our compact entry */
+		if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry)))
+		{
+			/*
+			 * Page became full. Initialize the next page to ensure SLRU
+			 * consistency (similar to what asyncQueueAddEntries does).
+			 */
+			LWLock	   *nextlock;
+
+			pageno = QUEUE_POS_PAGE(queue_head);
+			nextlock = SimpleLruGetBankLock(NotifyCtl, pageno);
+			if (nextlock != banklock)
+			{
+				LWLockRelease(banklock);
+				LWLockAcquire(nextlock, LW_EXCLUSIVE);
+			}
+			SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
+			if (nextlock != banklock)
+			{
+				LWLockRelease(nextlock);
+				LWLockAcquire(banklock, LW_EXCLUSIVE);
+			}
+
+			/* Set cleanup flag if appropriate */
+			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+				tryAdvanceTail = true;
+		}
+
+		/* Update the global queue head */
+		QUEUE_HEAD = queue_head;
+	}
+	else
+	{
+		/*
+		 * Entry doesn't fit on current page. This should be very rare with
+		 * our small compact entries, but handle it by padding the page and
+		 * writing to the next page.
+		 */
+		AsyncQueueEntry padding;
+
+		memset(&padding, 0, sizeof(padding));
+		padding.dbid = InvalidOid;  /* Mark as padding */
+
+		/* Fill the rest of the page with padding */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &padding,
+			   QUEUE_PAGESIZE - offset);
+
+		/* Advance to next page */
+		asyncQueueAdvance(&queue_head, QUEUE_PAGESIZE - offset);
+
+		/* Recursively add the entry on the new page */
+		QUEUE_HEAD = queue_head;
+		LWLockRelease(banklock);
+		asyncQueueAddCompactEntry(dbid, xid, notify_lsn);
+		return;
+	}
+
+	LWLockRelease(banklock);
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
@@ -2395,3 +2624,94 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Write a WAL record containing async notification data
+ *
+ * This logs notification data to WAL, allowing us to release locks earlier
+ * and maintain commit ordering through WAL's natural ordering guarantees.
+ */
+XLogRecPtr
+LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+				   uint32 nnotifications, Size data_len, char *data)
+{
+	xl_async_notify_data xlrec;
+	XLogRecPtr	recptr;
+
+	xlrec.dbid = dboid;
+	xlrec.xid = xid;
+	xlrec.srcPid = srcPid;
+	xlrec.nnotifications = nnotifications;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyData);
+	XLogRegisterData(data, data_len);
+
+	recptr = XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_DATA);
+
+	return recptr;
+}
+
+/*
+ * Write a WAL record marking commit with notification reference
+ *
+ * This creates a link between the transaction commit and its notification data,
+ * allowing listeners to efficiently locate notification data in WAL.
+ */
+XLogRecPtr
+LogAsyncNotifyCommit(Oid dboid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	xl_async_notify_commit xlrec;
+	XLogRecPtr	recptr;
+
+	xlrec.dbid = dboid;
+	xlrec.xid = xid;
+	xlrec.notify_lsn = notify_lsn;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyCommit);
+
+	recptr = XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_COMMIT);
+
+	return recptr;
+}
+
+
+
+/*
+ * Redo function for async notification WAL records
+ *
+ * During recovery, we need to replay notification records. For now,
+ * we'll add them to the traditional notification queue. In a complete
+ * implementation, replaying backends would read directly from WAL.
+ */
+void
+async_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			/* 
+			 * For notification data records, we don't need to do anything
+			 * during recovery since listeners will read directly from WAL.
+			 * The data is already durably stored in the WAL record itself.
+			 */
+			break;
+
+		case XLOG_ASYNC_NOTIFY_COMMIT:
+			/*
+			 * For commit records, we could add the compact entry to the
+			 * SLRU queue during recovery, but it's not strictly necessary
+			 * since recovery typically happens with no active listeners.
+			 * The important thing is that the WAL data is preserved.
+			 */
+			break;
+
+		default:
+			elog(PANIC, "async_redo: unknown op code %u", info);
+	}
+}
+
+
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 8f4b282c6b1..a2e536cc910 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -13,6 +13,7 @@
 
 #include <unistd.h>
 
+#include "access/async_xlog.h"
 #include "access/rmgr.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
diff --git a/src/bin/pg_waldump/asyncdesc.c b/src/bin/pg_waldump/asyncdesc.c
new file mode 120000
index 00000000000..0f6512e98ef
--- /dev/null
+++ b/src/bin/pg_waldump/asyncdesc.c
@@ -0,0 +1 @@
+../../../src/backend/access/rmgrdesc/asyncdesc.c
\ No newline at end of file
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index fac509ed134..b06c85bf0e7 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -8,6 +8,7 @@
 #define FRONTEND 1
 #include "postgres.h"
 
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
diff --git a/src/include/access/async_xlog.h b/src/include/access/async_xlog.h
new file mode 100644
index 00000000000..1214be82099
--- /dev/null
+++ b/src/include/access/async_xlog.h
@@ -0,0 +1,56 @@
+/*-------------------------------------------------------------------------
+ *
+ * async_xlog.h
+ *	  Async notification WAL definitions
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/async_xlog.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ASYNC_XLOG_H
+#define ASYNC_XLOG_H
+
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+
+/*
+ * WAL record types for async notifications
+ */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00	/* notification data */
+#define XLOG_ASYNC_NOTIFY_COMMIT	0x10	/* commit with notify reference */
+
+/*
+ * WAL record for notification data (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	int32		srcPid;			/* source backend PID */
+	uint32		nnotifications;	/* number of notifications */
+	/* followed by serialized notification data */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData	(offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
+/*
+ * WAL record for commit with notification reference
+ */
+typedef struct xl_async_notify_commit
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	XLogRecPtr	notify_lsn;		/* LSN of corresponding notify data record */
+} xl_async_notify_commit;
+
+#define SizeOfAsyncNotifyCommit	(offsetof(xl_async_notify_commit, notify_lsn) + sizeof(XLogRecPtr))
+
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+#endif							/* ASYNC_XLOG_H */
\ No newline at end of file
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 8e7fc9db877..58293e05165 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_ASYNC_ID, "Async", async_redo, async_desc, async_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..1d204542840 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -14,11 +14,24 @@
 #define ASYNC_H
 
 #include <signal.h>
+#include "access/xlogreader.h"
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
 extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
+/*
+ * Compact SLRU queue entry - stores metadata pointing to WAL data
+ */
+typedef struct AsyncQueueEntry
+{
+	Oid			dbid;			/* database ID for quick filtering */
+	TransactionId	xid;			/* transaction ID */
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} AsyncQueueEntry;
+
+#define ASYNC_QUEUE_ENTRY_SIZE	sizeof(AsyncQueueEntry)
+
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
@@ -46,4 +59,10 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* WAL-based notification functions */
+extern XLogRecPtr LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+									 uint32 nnotifications, Size data_len, char *data);
+extern XLogRecPtr LogAsyncNotifyCommit(Oid dboid, TransactionId xid, XLogRecPtr notify_lsn);
+extern void async_redo(XLogReaderState *record);
+
 #endif							/* ASYNC_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c6f5ebceefd..71459fe5529 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -301,6 +301,9 @@ struct PGPROC
 
 	uint32		wait_event_info;	/* proc's wait information */
 
+	/* Support for async notifications */
+	XLogRecPtr	notifyCommitLsn;	/* LSN of notification data for current xact */
+
 	/* Support for group transaction status update. */
 	bool		clogGroupMember;	/* true, if member of clog group */
 	pg_atomic_uint32 clogGroupNext; /* next clog group member */
-- 
2.47.1

#14Joel Jacobson
joel@compiler.org
In reply to: Rishu Bagga (#13)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Fri, Sep 5, 2025, at 00:53, Rishu Bagga wrote:

With the following sql script, I ran
pgbench -T 100 -c 100 -j 8 -f pgbench_transaction_notify.sql -d postgres

BEGIN;
INSERT INTO test VALUES(1);
NOTIFY benchmark_channel, 'transaction_completed';
COMMIT;

Thanks for working on this.
I haven't looked at the code yet, but have some questions on the benchmark.

What's the definition of the test table?

With the patch 3 runs showed the following TPS:

tps = 66372.705917
tps = 63445.909465
tps = 64412.544339

Without the patch, we got the following TPS:

tps = 30212.390982
tps = 30908.865812
tps = 29191.388601

So, there is about a 2x increase in TPS at 100 connections, which establishes
some promise in the approach.

In your benchmark, how many backends were doing `LISTEN benchmark_channel;`?

It would be interesting if you could run the benchmark and vary the number of listeners,
e.g. with 1, 10, 100 listeners, to understand the effect of this patch for different types of
workloads.

Additionally, this would help solve the issue being discussed in a
separate thread [1],
where listeners currently rely on the transaction log to verify if a
transaction that it reads
has indeed committed, but it is possible that the portion of the
transaction log has
been truncated by vacuum.

Nice!

/Joel

#15Arseniy Mukhin
arseniy.mukhin.dev@gmail.com
In reply to: Rishu Bagga (#13)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

Hi,

On Fri, Sep 5, 2025 at 1:53 AM Rishu Bagga <rishu.postgres@gmail.com> wrote:

Attached is an initial patch that implements this idea.

Thank you for sharing the patch and for working on this!

I read through the patch briefly and have a few comments:

1) There are some compilation warnings about unused functions.

2) I thought the idea was to add notification_data_entry_lsn to the
transaction's commit WAL record, but the patch introduces a new
XLOG_ASYNC_NOTIFY_COMMIT WAL record and puts the LSN in it. Also,
XLOG_ASYNC_NOTIFY_COMMIT is written in AtCommit_Notify, when the
transaction is already committed, so it is written after the commit
record. I don't think we can write anything about a transaction to
the WAL after its commit record has been written. In my
understanding, everything a transaction writes to the WAL should come
before its commit / abort record, if only because at commit time we
flush the WAL to disk up to the commit record, and we can't guarantee
that anything after the commit record will survive a crash. Anyway, I
think we just don't need this new WAL record.

3) I see you removed the global lock, but I don't see where you take a
lock for step 3 [0] to keep notifications in commit order. I thought
step 3 would look something like this:

    if (have_notifications)
        acquire_lock();

    ...

    XactLogCommitRecord();

    ...

    if (have_notifications)
    {
        asyncQueueAddCompactEntry();
        release_lock();
    }

This way the commit records in the WAL and the entries in the
listen/notify queue will be in the same order. I'm not sure at what
level we should add this, but it seems that to minimise the time we
hold the lock we need to release it before calling XLogFlush(). I'm
not sure about it.
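
To make the ordering argument concrete, here is a small single-threaded toy model. It is only a sketch, not code from the patch: ToyLog, toy_commit and toy_queue_matches_commit_order are made-up names, the "lock" is just a flag, and the arrays stand in for the WAL and the notify queue. It assumes the queue entry is added while the lock taken before the commit record is still held.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_MAX_RECORDS 64

/* Hypothetical stand-ins for the WAL and the notify queue. */
typedef struct ToyLog
{
	uint32_t	commit_xids[TOY_MAX_RECORDS];	/* order of commit records */
	size_t		ncommits;
	uint32_t	queue_xids[TOY_MAX_RECORDS];	/* order of queue entries */
	size_t		nqueued;
	bool		notify_lock_held;	/* models the exclusive lock */
} ToyLog;

/* Commit one transaction, following the three-part shape sketched above. */
static void
toy_commit(ToyLog *log, uint32_t xid, bool have_notifications)
{
	assert(log->ncommits < TOY_MAX_RECORDS);

	if (have_notifications)
	{
		assert(!log->notify_lock_held);
		log->notify_lock_held = true;	/* acquire_lock() */
	}

	/* stands in for XactLogCommitRecord() */
	log->commit_xids[log->ncommits++] = xid;

	if (have_notifications)
	{
		/* stands in for asyncQueueAddCompactEntry() */
		log->queue_xids[log->nqueued++] = xid;
		log->notify_lock_held = false;	/* release_lock() */
	}
}

/* True if the queue entries appear in the same relative order as commits. */
static bool
toy_queue_matches_commit_order(const ToyLog *log)
{
	size_t		c = 0;

	for (size_t q = 0; q < log->nqueued; q++)
	{
		while (c < log->ncommits &&
			   log->commit_xids[c] != log->queue_xids[q])
			c++;
		if (c == log->ncommits)
			return false;		/* queue entry has no later commit record */
		c++;
	}
	return true;
}
```

Because a transaction with notifications holds the lock across both the commit record and the queue insert, no other notifying transaction can slip in between the two, which is what keeps the two sequences aligned. Transactions without notifications never touch the lock.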

Best regards,
Arseniy Mukhin

#16Matheus Alcantara
matheusssilv97@gmail.com
In reply to: Rishu Bagga (#13)
1 attachment(s)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Thu Sep 4, 2025 at 7:53 PM -03, Rishu Bagga wrote:

Attached is an initial patch that implements this idea.

Thanks for working on this! I haven't looked at the code yet, but I have
some questions related to the issue discussed at [1].

Additionally, this would help solve the issue being discussed in a
separate thread [1],
where listeners currently rely on the transaction log to verify if a
transaction that it reads
has indeed committed, but it is possible that the portion of the
transaction log has
been truncated by vacuum.

Does your patch already aim to fix the issue? In [2] I implemented a TAP
test that reproduces the issue; I tried running it with your patch applied
and I still see the error. I'm attaching the TAP test on its own; maybe
we could incorporate it into your patch series to ensure that the issue is
fixed? What do you think?

[1]: /messages/by-id/CAK98qZ3wZLE-RZJN_Y+TFjiTRPPFPBwNBpBi5K5CU8hUHkzDpw@mail.gmail.com
[2]: /messages/by-id/CAFY6G8cJm73_MM9SuynZUqtqcaTuepUDgDuvS661oLW7U0dgsg@mail.gmail.com

--
Matheus Alcantara

Attachments:

0001-Add-TAP-test-for-LISTEN-NOTIFY-xid-vacuum-freeze-bug.patch.nocfbot (text/plain; charset=utf-8)
From c93fb480c7676b02bac5813a69d7df3d717fd9a6 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Sat, 6 Sep 2025 11:29:02 -0300
Subject: [PATCH] Add TAP test for LISTEN/NOTIFY xid vacuum freeze bug

---
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 src/test/modules/test_listen_notify/Makefile  | 17 +++++
 .../modules/test_listen_notify/meson.build    | 13 ++++
 .../test_listen_notify/t/001_xid_freeze.pl    | 73 +++++++++++++++++++
 5 files changed, 105 insertions(+)
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl

diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 903a8ac151a..4c0160df341 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -28,6 +28,7 @@ SUBDIRS = \
 		  test_int128 \
 		  test_integerset \
 		  test_json_parser \
+		  test_listen_notify \
 		  test_lfind \
 		  test_misc \
 		  test_oat_hooks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 93be0f57289..144379b619b 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -27,6 +27,7 @@ subdir('test_ginpostinglist')
 subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
+subdir('test_listen_notify')
 subdir('test_lfind')
 subdir('test_misc')
 subdir('test_oat_hooks')
diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..da1bf5bb1b7
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,17 @@
+# src/test/modules/test_listen_notify/Makefile
+
+MODULE = test_listen_notify
+PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY support"
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..8119e6c761f
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,13 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl',
+    ],
+  },
+}
+
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..79dcd73ed65
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,73 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+	'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,100000) g;'
+);
+$node->safe_psql('postgres',
+	'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# --- Session 2, multiple notify's, and commit ---
+for my $i (1 .. 10)
+{
+	$node->safe_psql(
+		'postgres', "
+		BEGIN;
+		NOTIFY s, '$i';
+		COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+
+# Run an UPDATE so that the frozen xid of table "t" advances to an xid
+# greater than the consume_xids() result
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember current datfrozenxid before vacuum freeze to ensure that it is advanced.
+my $datafronzenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+# Get the new datfrozenxid after vacuum freeze to ensure that it has advanced
+# but we can still look up the commit status of the notifying transactions
+my $datafronzenxid_partial_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datafronzenxid_partial_freeze > $datafronzenxid, 'datfrozenxid is partially advanced');
+
+# On Session 1, commit and ensure that all notifications are received
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+	$notifications_count++;
+	like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+# Execute vacuum freeze on all databases again and ensure that datfrozenxid is fully advanced.
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+my $datafronzenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datafronzenxid_freeze > $datafronzenxid_partial_freeze, 'datfrozenxid is advanced after notification is consumed');
+
+done_testing();
-- 
2.39.5 (Apple Git-154)

#17Rishu Bagga
rishu.postgres@gmail.com
In reply to: Matheus Alcantara (#16)
1 attachment(s)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

Hi Joel, Arseniy, Matheus, thanks for taking a look. I've attached an
updated patch, rebased on the latest commits, that fixes the
correctness issues.

On Fri, Sep 5, 2025 at 2:38 AM Joel Jacobson <joel@compiler.org> wrote:

What's the definition of the test table?

It's just a single-column integer table, defined as follows:

CREATE TABLE test(x int);

In your benchmark, how many backends were doing
`LISTEN benchmark_channel;`?

I just tested the notify codepath throughput, with no listeners.

It would be interesting if you could run the benchmark and vary the
number of listeners, e.g. with 1, 10, 100 listeners, to understand the
effect of this patch for different types of workloads.

IIUC, the current benchmark wouldn't be affected by listeners, since the
commit of the notifying transaction is independent of listeners reading
its notifications. I agree that it would be useful to test listener
performance as well; I will work on this shortly.

On Fri, Sep 5, 2025 at 10:31 AM Arseniy Mukhin
<arseniy.mukhin.dev@gmail.com> wrote:

1) There are some compilation warnings about unused functions.

Taken care of in the attached patch.

2) I thought the idea was to add notification_data_entry_lsn to the
transaction's commit wal entry, but the patch introduced the new
XLOG_ASYNC_NOTIFY_COMMIT wal entry and put lsn in it. Also
XLOG_ASYNC_NOTIFY_COMMIT is written in AtCommit_Notify, when a
transaction is already committed, so we write it after the commit
entry. I don't think we can write anything about the transaction to
the wal after we wrote the transaction commit record. In my
understanding all transaction stuff in wal should be before commit /
abort entries. At least because during committing we sync wal with
disk up to the commit entry, and we can't guarantee that anything
after a commit entry will survive a crash. Anyway I think we just
don't need this new wal entry.

I removed the XLOG_ASYNC_NOTIFY_COMMIT record in the new patch,
in favor of including the LSN of the notification data as a new field in
the commit record itself. The reasoning here is that notify data is
WAL-logged before transaction commit, so during WAL replay we need some
way to know whether the notification should be added to the notify queue.
The previous patch used the XLOG_ASYNC_NOTIFY_COMMIT record for this,
but that doesn't actually work, because we could write this record
before commit, or we could commit and crash before writing it. So the
best solution is to have a field within the commit record that tells us
which log record contains the notifications for that transaction, if
there are any.
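
As a rough illustration of the replay rule described above, here is a toy model. It is a sketch only, not code from the patch: ToyRecord, ToyQueueEntry and toy_replay are hypothetical names, and record i is simply assumed to live at LSN i + 1. A notification data record on its own never produces a queue entry; only a commit record that carries a valid notify LSN does.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t ToyLsn;

#define TOY_INVALID_LSN ((ToyLsn) 0)

typedef enum
{
	TOY_NOTIFY_DATA,			/* notification payload, written pre-commit */
	TOY_COMMIT					/* commit record */
} ToyRecType;

typedef struct ToyRecord
{
	ToyRecType	type;
	uint32_t	xid;
	ToyLsn		notify_lsn;		/* TOY_COMMIT only; invalid if no notifies */
} ToyRecord;

typedef struct ToyQueueEntry
{
	uint32_t	xid;
	ToyLsn		notify_lsn;
} ToyQueueEntry;

/*
 * Replay the toy WAL and rebuild the notify queue.
 * Returns the number of queue entries written.
 */
static size_t
toy_replay(const ToyRecord *wal, size_t nrecords,
		   ToyQueueEntry *queue, size_t queue_cap)
{
	size_t		n = 0;

	for (size_t i = 0; i < nrecords && n < queue_cap; i++)
	{
		const ToyRecord *rec = &wal[i];

		/* Data records alone enqueue nothing: the xact may never commit. */
		if (rec->type != TOY_COMMIT)
			continue;

		/* Commit without notifications: nothing to enqueue either. */
		if (rec->notify_lsn == TOY_INVALID_LSN)
			continue;

		queue[n].xid = rec->xid;
		queue[n].notify_lsn = rec->notify_lsn;
		n++;
	}
	return n;
}
```

With a log holding data records for xids 100 and 101 but a commit record (pointing at LSN 1) only for xid 100, replay produces a single queue entry for xid 100. The data written by xid 101 is simply never referenced.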

I see you removed global lock, but I don't see where you use lock
for step 3 [0] to provide the commit order of notifications.

Fixed this in the updated patch.

On Sat, Sep 6, 2025 at 7:52 AM Matheus Alcantara
<matheusssilv97@gmail.com> wrote:

Does your patch already aim to fix the issue? In [2] I implemented a TAP
test that reproduces the issue; I tried running it with your patch applied
and I still see the error. I'm attaching the TAP test on its own; maybe
we could incorporate it into your patch series to ensure that the issue is
fixed? What do you think?

I wasn't able to run the TAP tests; however, in the updated patch, we
can be confident that entries in the queue are from committed
transactions. If there is a crash after writing to the queue but before
committing, this would be within the critical section, so a notification
from an uncommitted transaction would never be read from the queue.
That allows us to remove the XidInMVCCSnapshot and
TransactionIdDidCommit checks.
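
To sketch what that means for the reader side (a toy model only, not the patch's code; ToyCompactEntry and toy_collect_for_db are made-up names): a listener walks the compact entries, skips those for other databases, and collects the notify LSNs it would then read from WAL, with no commit-status lookup at all. This relies on the assumption stated above that every entry in the queue belongs to a committed transaction.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical mirror of the compact queue entry: metadata only. */
typedef struct ToyCompactEntry
{
	uint32_t	dbid;			/* database the notifier ran in */
	uint32_t	xid;			/* notifying transaction */
	uint64_t	notify_lsn;		/* where the notification data lives */
} ToyCompactEntry;

/*
 * Collect the LSNs a listener in database my_dbid would have to read.
 * There is no transaction-status check: entries are assumed committed.
 * Returns the number of LSNs stored in out_lsns.
 */
static size_t
toy_collect_for_db(const ToyCompactEntry *queue, size_t nentries,
				   uint32_t my_dbid, uint64_t *out_lsns, size_t out_cap)
{
	size_t		n = 0;

	for (size_t i = 0; i < nentries && n < out_cap; i++)
	{
		if (queue[i].dbid != my_dbid)
			continue;			/* other database: skip cheaply */
		out_lsns[n++] = queue[i].notify_lsn;
	}
	return n;
}
```

The only per-entry decision left is the database filter, which is why the compact entry keeps dbid next to the LSN.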

Thanks,
Rishu

Attachments:

notify-through-wal-v2.patch (application/octet-stream)
From 0bdf76b72f346f8056bee29c4bd9e738e57e17fd Mon Sep 17 00:00:00 2001
From: rbagga <bangalorian@gmail.com>
Date: Sun, 7 Sep 2025 16:55:57 -0700
Subject: [PATCH] Implement WAL-based async notifications for improved
 throughput

- Added WAL logging for async notifications to improve scalability
- Implemented async resource manager for WAL-based notification handling
- Added new async descriptor files for pg_waldump support
- Updated makefiles and build configuration for new components
---
 src/backend/access/rmgrdesc/Makefile    |   1 +
 src/backend/access/rmgrdesc/meson.build |   1 +
 src/backend/access/rmgrdesc/xactdesc.c  |  13 +
 src/backend/access/transam/rmgr.c       |   1 +
 src/backend/access/transam/xact.c       |  48 +-
 src/backend/commands/async.c            | 800 ++++++++++++------------
 src/bin/pg_rewind/parsexlog.c           |   1 +
 src/bin/pg_waldump/rmgrdesc.c           |   2 +
 src/include/access/rmgrlist.h           |   1 +
 src/include/access/xact.h               |   9 +
 src/include/commands/async.h            |  25 +
 src/include/storage/proc.h              |   3 +
 12 files changed, 501 insertions(+), 404 deletions(-)

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index cd95eec37f1..6e6e75b12bd 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,6 +9,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	asyncdesc.o \
 	brindesc.o \
 	clogdesc.o \
 	committsdesc.o \
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index 96c98e800c2..38bef2e87f6 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -2,6 +2,7 @@
 
 # used by frontend programs like pg_waldump
 rmgr_desc_sources = files(
+  'asyncdesc.c',
   'brindesc.c',
   'clogdesc.c',
   'committsdesc.c',
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index f0f696855b9..4f32f7fc591 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -135,6 +135,19 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY)
+	{
+		xl_xact_notify xl_notify;
+
+		/* no alignment is guaranteed, so copy onto stack */
+		memcpy(&xl_notify, data, sizeof(xl_notify));
+
+		parsed->notify_lsn = xl_notify.notify_lsn;
+
+		data += sizeof(xl_xact_notify);
+	}
+
 }
 
 void
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 1b7499726eb..f8c25e6597a 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -19,6 +19,7 @@
 
 /* includes needed for "access/rmgrlist.h" */
 /* IWYU pragma: begin_keep */
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b46e7e9c2a6..33b16ff4746 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5841,10 +5841,24 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_notify xl_notify;
 	uint8		info;
+	XLogRecPtr	result;
 
 	Assert(CritSectionCount > 0);
 
+	/*
+	 * Handle notification commit ordering: if this transaction has pending
+	 * notifications, we must write the queue entry just before the commit
+	 * record while holding NotifyQueueLock to ensure proper ordering.
+	 */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		TransactionId xid = GetCurrentTransactionId();
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		asyncQueueAddCompactEntry(MyDatabaseId, xid, MyProc->notifyCommitLsn);
+	}
+
 	xl_xinfo.xinfo = 0;
 
 	/* decide between a plain and 2pc commit */
@@ -5926,9 +5940,17 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	/* include notification information if present */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_NOTIFY;
+		xl_notify.notify_lsn = MyProc->notifyCommitLsn;
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
+
 	/* Then include all the collected data into the commit record. */
 
 	XLogBeginInsert();
@@ -5982,10 +6004,28 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData(&xl_origin, sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_NOTIFY)
+		XLogRegisterData(&xl_notify, sizeof(xl_xact_notify));
+
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_XACT_ID, info);
+	/* Insert the commit record */
+	result = XLogInsert(RM_XACT_ID, info);
+
+	/*
+	 * Release NotifyQueueLock if we held it. The queue entry is now
+	 * associated with a committed transaction, so readers can process it.
+	 */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		LWLockRelease(NotifyQueueLock);
+		
+		/* Signal listening backends to check for new notifications */
+		SignalBackends();
+	}
+
+	return result;
 }
 
 /*
@@ -6227,6 +6267,12 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 						   false /* backward */ , false /* WAL */ );
 	}
 
+	/* Add notification queue entry if this commit has notifications */
+	if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY)
+	{
+		asyncQueueAddCompactEntry(parsed->dbId, xid, parsed->notify_lsn);
+	}
+
 	/* Make sure files supposed to be dropped are dropped */
 	if (parsed->nrels > 0)
 	{
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..57fa732e9b8 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -133,6 +133,12 @@
 #include "access/slru.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "access/xlogrecovery.h"
+#include "access/xlog_internal.h"
 #include "catalog/pg_database.h"
 #include "commands/async.h"
 #include "common/hashfn.h"
@@ -151,6 +157,29 @@
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
 
+/* Missing definitions for WAL-based notification system */
+#define AsyncQueueEntryEmptySize ASYNC_QUEUE_ENTRY_SIZE
+#define SLRU_PAGE_SIZE BLCKSZ
+#define AsyncCtl NotifyCtl
+
+/* WAL record types */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00
+
+/*
+ * WAL record for notification data (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	int32		srcPid;			/* source backend PID */
+	uint32		nnotifications;	/* number of notifications */
+	/* followed by serialized notification data */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData	(offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
+
 
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.  This
@@ -163,30 +192,13 @@
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
 /*
- * Struct representing an entry in the global notify queue
- *
- * This struct declaration has the maximal length, but in a real queue entry
- * the data area is only big enough for the actual channel and payload strings
- * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
- * entry size, if both channel and payload strings are empty (but note it
- * doesn't include alignment padding).
- *
- * The "length" field should always be rounded up to the next QUEUEALIGN
- * multiple so that all fields are properly aligned.
+ * NOTE: The AsyncQueueEntry structure is now defined in commands/async.h
+ * as a compact metadata-only structure for the new WAL-based notification system.
+ * The old variable-length structure with full notification content is no longer used.
  */
-typedef struct AsyncQueueEntry
-{
-	int			length;			/* total allocated length of entry */
-	Oid			dboid;			/* sender's database OID */
-	TransactionId xid;			/* sender's XID */
-	int32		srcPid;			/* sender's PID */
-	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
-} AsyncQueueEntry;
-
-/* Currently, no field of AsyncQueueEntry requires more than int alignment */
-#define QUEUEALIGN(len)		INTALIGN(len)
 
-#define AsyncQueueEntryEmptySize	(offsetof(AsyncQueueEntry, data) + 2)
+/* Queue alignment is still needed for SLRU page management */
+#define QUEUEALIGN(len)		INTALIGN(len)
 
 /*
  * Struct describing a queue position, and assorted macros for working with it
@@ -438,18 +450,13 @@ static void Exec_UnlistenCommit(const char *channel);
 static void Exec_UnlistenAllCommit(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
-static bool asyncQueueIsFull(void);
 static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
-static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
-static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
-static void asyncQueueFillWarning(void);
-static void SignalBackends(void);
+void SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
-										 char *page_buffer,
-										 Snapshot snapshot);
+										 char *page_buffer);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
@@ -457,6 +464,7 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
+static void processNotificationFromWAL(XLogRecPtr notify_lsn);
 
 /*
  * Compute the difference between two queue page numbers.
@@ -890,65 +898,75 @@ PreCommit_Notify(void)
 		}
 	}
 
-	/* Queue any pending notifies (must happen after the above) */
+	/* Write notification data to WAL if we have any */
 	if (pendingNotifies)
 	{
-		ListCell   *nextNotify;
+		TransactionId currentXid;
+		ListCell   *l;
+		size_t		total_size = 0;
+		uint32		nnotifications = 0;
+		char	   *notifications_data;
+		char	   *ptr;
+		XLogRecPtr	notify_lsn;
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
 		 * GetCurrentTransactionId is cheap if we already have an XID, but not
-		 * so cheap if we don't, and we'd prefer not to do that work while
-		 * holding NotifyQueueLock.
+		 * so cheap if we don't.
 		 */
-		(void) GetCurrentTransactionId();
+		currentXid = GetCurrentTransactionId();
 
 		/*
-		 * Serialize writers by acquiring a special lock that we hold till
-		 * after commit.  This ensures that queue entries appear in commit
-		 * order, and in particular that there are never uncommitted queue
-		 * entries ahead of committed ones, so an uncommitted transaction
-		 * can't block delivery of deliverable notifications.
-		 *
-		 * We use a heavyweight lock so that it'll automatically be released
-		 * after either commit or abort.  This also allows deadlocks to be
-		 * detected, though really a deadlock shouldn't be possible here.
-		 *
-		 * The lock is on "database 0", which is pretty ugly but it doesn't
-		 * seem worth inventing a special locktag category just for this.
-		 * (Historical note: before PG 9.0, a similar lock on "database 0" was
-		 * used by the flatfiles mechanism.)
+		 * Step 1: Write notification data to WAL.
+		 * This can be done in parallel with other transactions since we're
+		 * not holding any global locks yet.
 		 */
-		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-						 AccessExclusiveLock);
+		
+		/* First pass: calculate total size needed for serialization */
+		foreach(l, pendingNotifies->events)
+		{
+			Notification *n = (Notification *) lfirst(l);
+			
+			/* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */
+			total_size += 4 + n->channel_len + 1 + n->payload_len + 1;
+			nnotifications++;
+		}
 
-		/* Now push the notifications into the queue */
-		nextNotify = list_head(pendingNotifies->events);
-		while (nextNotify != NULL)
+		/* Allocate buffer for notification data */
+		notifications_data = palloc(total_size);
+		ptr = notifications_data;
+
+		/* Second pass: serialize all notifications */
+		foreach(l, pendingNotifies->events)
 		{
-			/*
-			 * Add the pending notifications to the queue.  We acquire and
-			 * release NotifyQueueLock once per page, which might be overkill
-			 * but it does allow readers to get in while we're doing this.
-			 *
-			 * A full queue is very uncommon and should really not happen,
-			 * given that we have so much space available in the SLRU pages.
-			 * Nevertheless we need to deal with this possibility. Note that
-			 * when we get here we are in the process of committing our
-			 * transaction, but we have not yet committed to clog, so at this
-			 * point in time we can still roll the transaction back.
-			 */
-			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-			asyncQueueFillWarning();
-			if (asyncQueueIsFull())
-				ereport(ERROR,
-						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-						 errmsg("too many notifications in the NOTIFY queue")));
-			nextNotify = asyncQueueAddEntries(nextNotify);
-			LWLockRelease(NotifyQueueLock);
+			Notification *n = (Notification *) lfirst(l);
+			char	   *channel = n->data;
+			char	   *payload = n->data + n->channel_len + 1;
+
+			/* Write channel length, payload length, channel, and payload */
+			memcpy(ptr, &n->channel_len, 2);
+			ptr += 2;
+			memcpy(ptr, &n->payload_len, 2);
+			ptr += 2;
+			memcpy(ptr, channel, n->channel_len + 1);
+			ptr += n->channel_len + 1;
+			memcpy(ptr, payload, n->payload_len + 1);
+			ptr += n->payload_len + 1;
 		}
 
-		/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
+		/* Write notification data to WAL */
+		notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid,
+										nnotifications, total_size,
+										notifications_data);
+
+		pfree(notifications_data);
+
+		/*
+		 * Step 2: Store the notification LSN in PROC for use during commit.
+		 * The queue entry will be written just before the commit record
+		 * while holding the global notification commit lock to ensure proper ordering.
+		 */
+		MyProc->notifyCommitLsn = notify_lsn;
 	}
 }
 
@@ -1006,12 +1024,19 @@ AtCommit_Notify(void)
 		asyncQueueUnregister();
 
 	/*
-	 * Send signals to listening backends.  We need do this only if there are
-	 * pending notifies, which were previously added to the shared queue by
-	 * PreCommit_Notify().
+	 * If we had notifications, PreCommit_Notify wrote their data to WAL and
+	 * the compact queue entry was added just before the commit record. Now
+	 * that we've committed, signal listening backends to check the queue;
+	 * every entry they find there belongs to a committed transaction.
 	 */
-	if (pendingNotifies != NULL)
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		/* Signal listening backends to check the queue */
 		SignalBackends();
+		
+		/* Clear the stored LSN now that we're done */
+		MyProc->notifyCommitLsn = InvalidXLogRecPtr;
+	}
 
 	/*
 	 * If it's time to try to advance the global tail pointer, do that.
@@ -1263,21 +1288,6 @@ asyncQueueUnregister(void)
 	amRegisteredListener = false;
 }
 
-/*
- * Test whether there is room to insert more notification messages.
- *
- * Caller must hold at least shared NotifyQueueLock.
- */
-static bool
-asyncQueueIsFull(void)
-{
-	int64		headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
-	int64		tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
-	int64		occupied = headPage - tailPage;
-
-	return occupied >= max_notify_queue_pages;
-}
-
 /*
  * Advance the QueuePosition to the next entry, assuming that the current
  * entry is of length entryLength.  If we jump to a new page the function
@@ -1313,166 +1323,6 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
 	return pageJump;
 }
 
-/*
- * Fill the AsyncQueueEntry at *qe with an outbound notification message.
- */
-static void
-asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
-{
-	size_t		channellen = n->channel_len;
-	size_t		payloadlen = n->payload_len;
-	int			entryLength;
-
-	Assert(channellen < NAMEDATALEN);
-	Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
-
-	/* The terminators are already included in AsyncQueueEntryEmptySize */
-	entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
-	entryLength = QUEUEALIGN(entryLength);
-	qe->length = entryLength;
-	qe->dboid = MyDatabaseId;
-	qe->xid = GetCurrentTransactionId();
-	qe->srcPid = MyProcPid;
-	memcpy(qe->data, n->data, channellen + payloadlen + 2);
-}
-
-/*
- * Add pending notifications to the queue.
- *
- * We go page by page here, i.e. we stop once we have to go to a new page but
- * we will be called again and then fill that next page. If an entry does not
- * fit into the current page, we write a dummy entry with an InvalidOid as the
- * database OID in order to fill the page. So every page is always used up to
- * the last byte which simplifies reading the page later.
- *
- * We are passed the list cell (in pendingNotifies->events) containing the next
- * notification to write and return the first still-unwritten cell back.
- * Eventually we will return NULL indicating all is done.
- *
- * We are holding NotifyQueueLock already from the caller and grab
- * page specific SLRU bank lock locally in this function.
- */
-static ListCell *
-asyncQueueAddEntries(ListCell *nextNotify)
-{
-	AsyncQueueEntry qe;
-	QueuePosition queue_head;
-	int64		pageno;
-	int			offset;
-	int			slotno;
-	LWLock	   *prevlock;
-
-	/*
-	 * We work with a local copy of QUEUE_HEAD, which we write back to shared
-	 * memory upon exiting.  The reason for this is that if we have to advance
-	 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
-	 * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
-	 * subsequent insertions would try to put entries into a page that slru.c
-	 * thinks doesn't exist yet.)  So, use a local position variable.  Note
-	 * that if we do fail, any already-inserted queue entries are forgotten;
-	 * this is okay, since they'd be useless anyway after our transaction
-	 * rolls back.
-	 */
-	queue_head = QUEUE_HEAD;
-
-	/*
-	 * If this is the first write since the postmaster started, we need to
-	 * initialize the first page of the async SLRU.  Otherwise, the current
-	 * page should be initialized already, so just fetch it.
-	 */
-	pageno = QUEUE_POS_PAGE(queue_head);
-	prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
-
-	/* We hold both NotifyQueueLock and SLRU bank lock during this operation */
-	LWLockAcquire(prevlock, LW_EXCLUSIVE);
-
-	if (QUEUE_POS_IS_ZERO(queue_head))
-		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
-	else
-		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
-								   InvalidTransactionId);
-
-	/* Note we mark the page dirty before writing in it */
-	NotifyCtl->shared->page_dirty[slotno] = true;
-
-	while (nextNotify != NULL)
-	{
-		Notification *n = (Notification *) lfirst(nextNotify);
-
-		/* Construct a valid queue entry in local variable qe */
-		asyncQueueNotificationToEntry(n, &qe);
-
-		offset = QUEUE_POS_OFFSET(queue_head);
-
-		/* Check whether the entry really fits on the current page */
-		if (offset + qe.length <= QUEUE_PAGESIZE)
-		{
-			/* OK, so advance nextNotify past this item */
-			nextNotify = lnext(pendingNotifies->events, nextNotify);
-		}
-		else
-		{
-			/*
-			 * Write a dummy entry to fill up the page. Actually readers will
-			 * only check dboid and since it won't match any reader's database
-			 * OID, they will ignore this entry and move on.
-			 */
-			qe.length = QUEUE_PAGESIZE - offset;
-			qe.dboid = InvalidOid;
-			qe.data[0] = '\0';	/* empty channel */
-			qe.data[1] = '\0';	/* empty payload */
-		}
-
-		/* Now copy qe into the shared buffer page */
-		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
-			   &qe,
-			   qe.length);
-
-		/* Advance queue_head appropriately, and detect if page is full */
-		if (asyncQueueAdvance(&(queue_head), qe.length))
-		{
-			LWLock	   *lock;
-
-			pageno = QUEUE_POS_PAGE(queue_head);
-			lock = SimpleLruGetBankLock(NotifyCtl, pageno);
-			if (lock != prevlock)
-			{
-				LWLockRelease(prevlock);
-				LWLockAcquire(lock, LW_EXCLUSIVE);
-				prevlock = lock;
-			}
-
-			/*
-			 * Page is full, so we're done here, but first fill the next page
-			 * with zeroes.  The reason to do this is to ensure that slru.c's
-			 * idea of the head page is always the same as ours, which avoids
-			 * boundary problems in SimpleLruTruncate.  The test in
-			 * asyncQueueIsFull() ensured that there is room to create this
-			 * page without overrunning the queue.
-			 */
-			slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
-
-			/*
-			 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
-			 * set flag to remember that we should try to advance the tail
-			 * pointer (we don't want to actually do that right here).
-			 */
-			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
-				tryAdvanceTail = true;
-
-			/* And exit the loop */
-			break;
-		}
-	}
-
-	/* Success, so update the global QUEUE_HEAD */
-	QUEUE_HEAD = queue_head;
-
-	LWLockRelease(prevlock);
-
-	return nextNotify;
-}
-
 /*
  * SQL function to return the fraction of the notification queue currently
  * occupied.
@@ -1515,52 +1365,6 @@ asyncQueueUsage(void)
 	return (double) occupied / (double) max_notify_queue_pages;
 }
 
-/*
- * Check whether the queue is at least half full, and emit a warning if so.
- *
- * This is unlikely given the size of the queue, but possible.
- * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
- *
- * Caller must hold exclusive NotifyQueueLock.
- */
-static void
-asyncQueueFillWarning(void)
-{
-	double		fillDegree;
-	TimestampTz t;
-
-	fillDegree = asyncQueueUsage();
-	if (fillDegree < 0.5)
-		return;
-
-	t = GetCurrentTimestamp();
-
-	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
-								   t, QUEUE_FULL_WARN_INTERVAL))
-	{
-		QueuePosition min = QUEUE_HEAD;
-		int32		minPid = InvalidPid;
-
-		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
-		{
-			Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
-			if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
-				minPid = QUEUE_BACKEND_PID(i);
-		}
-
-		ereport(WARNING,
-				(errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
-				 (minPid != InvalidPid ?
-				  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
-				  : 0),
-				 (minPid != InvalidPid ?
-				  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
-				  : 0)));
-
-		asyncQueueControl->lastQueueFillWarn = t;
-	}
-}
 
 /*
  * Send signals to listening backends.
@@ -1577,7 +1381,7 @@ asyncQueueFillWarning(void)
  * This is called during CommitTransaction(), so it's important for it
  * to have very low probability of failure.
  */
-static void
+void
 SignalBackends(void)
 {
 	int32	   *pids;
@@ -1844,15 +1648,13 @@ ProcessNotifyInterrupt(bool flush)
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
- * ones to my frontend.  Stop when we reach queue head or an uncommitted
- * notification.
+ * ones to my frontend.  Stop when we reach queue head.
  */
 static void
 asyncQueueReadAllNotifications(void)
 {
 	volatile QueuePosition pos;
 	QueuePosition head;
-	Snapshot	snapshot;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -1875,46 +1677,6 @@ asyncQueueReadAllNotifications(void)
 		return;
 	}
 
-	/*----------
-	 * Get snapshot we'll use to decide which xacts are still in progress.
-	 * This is trickier than it might seem, because of race conditions.
-	 * Consider the following example:
-	 *
-	 * Backend 1:					 Backend 2:
-	 *
-	 * transaction starts
-	 * UPDATE foo SET ...;
-	 * NOTIFY foo;
-	 * commit starts
-	 * queue the notify message
-	 *								 transaction starts
-	 *								 LISTEN foo;  -- first LISTEN in session
-	 *								 SELECT * FROM foo WHERE ...;
-	 * commit to clog
-	 *								 commit starts
-	 *								 add backend 2 to array of listeners
-	 *								 advance to queue head (this code)
-	 *								 commit to clog
-	 *
-	 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
-	 * wasn't committed yet.  Ideally we'd ensure that client 2 would
-	 * eventually get transaction 1's notify message, but there's no way
-	 * to do that; until we're in the listener array, there's no guarantee
-	 * that the notify message doesn't get removed from the queue.
-	 *
-	 * Therefore the coding technique transaction 2 is using is unsafe:
-	 * applications must commit a LISTEN before inspecting database state,
-	 * if they want to ensure they will see notifications about subsequent
-	 * changes to that state.
-	 *
-	 * What we do guarantee is that we'll see all notifications from
-	 * transactions committing after the snapshot we take here.
-	 * Exec_ListenPreCommit has already added us to the listener array,
-	 * so no not-yet-committed messages can be removed from the queue
-	 * before we see them.
-	 *----------
-	 */
-	snapshot = RegisterSnapshot(GetLatestSnapshot());
 
 	/*
 	 * It is possible that we fail while trying to send a message to our
@@ -1979,8 +1741,7 @@ asyncQueueReadAllNotifications(void)
 			 * while sending the notifications to the frontend.
 			 */
 			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf,
-													   snapshot);
+													   page_buffer.buf);
 		} while (!reachedStop);
 	}
 	PG_FINALLY();
@@ -1992,8 +1753,6 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_END_TRY();
 
-	/* Done with snapshot */
-	UnregisterSnapshot(snapshot);
 }
 
 /*
@@ -2004,19 +1763,17 @@ asyncQueueReadAllNotifications(void)
  * memory.  (We could access the page right in shared memory, but that
  * would imply holding the SLRU bank lock throughout this routine.)
  *
- * We stop if we reach the "stop" position, or reach a notification from an
- * uncommitted transaction, or reach the end of the page.
+ * We stop if we reach the "stop" position or reach the end of the page.
  *
- * The function returns true once we have reached the stop position or an
- * uncommitted notification, and false if we have finished with the page.
+ * The function returns true once we have reached the stop position, and false
+ * if we have finished with the page.
  * In other words: once it returns true there is no need to look further.
  * The QueuePosition *current is advanced past all processed messages.
  */
 static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
-							 char *page_buffer,
-							 Snapshot snapshot)
+							 char *page_buffer)
 {
 	bool		reachedStop = false;
 	bool		reachedEndOfPage;
@@ -2032,60 +1789,24 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
 
 		/*
-		 * Advance *current over this message, possibly to the next page. As
-		 * noted in the comments for asyncQueueReadAllNotifications, we must
-		 * do this before possibly failing while processing the message.
+		 * Advance *current over this compact entry. The new compact entries are
+		 * fixed-size, making this much simpler than the old variable-length entries.
 		 */
-		reachedEndOfPage = asyncQueueAdvance(current, qe->length);
+		reachedEndOfPage = asyncQueueAdvance(current, sizeof(AsyncQueueEntry));
 
 		/* Ignore messages destined for other databases */
-		if (qe->dboid == MyDatabaseId)
+		if (qe->dbid == MyDatabaseId)
 		{
-			if (XidInMVCCSnapshot(qe->xid, snapshot))
-			{
-				/*
-				 * The source transaction is still in progress, so we can't
-				 * process this message yet.  Break out of the loop, but first
-				 * back up *current so we will reprocess the message next
-				 * time.  (Note: it is unlikely but not impossible for
-				 * TransactionIdDidCommit to fail, so we can't really avoid
-				 * this advance-then-back-up behavior when dealing with an
-				 * uncommitted message.)
-				 *
-				 * Note that we must test XidInMVCCSnapshot before we test
-				 * TransactionIdDidCommit, else we might return a message from
-				 * a transaction that is not yet visible to snapshots; compare
-				 * the comments at the head of heapam_visibility.c.
-				 *
-				 * Also, while our own xact won't be listed in the snapshot,
-				 * we need not check for TransactionIdIsCurrentTransactionId
-				 * because our transaction cannot (yet) have queued any
-				 * messages.
-				 */
-				*current = thisentry;
-				reachedStop = true;
-				break;
-			}
-			else if (TransactionIdDidCommit(qe->xid))
-			{
-				/* qe->data is the null-terminated channel name */
-				char	   *channel = qe->data;
-
-				if (IsListeningOn(channel))
-				{
-					/* payload follows channel name */
-					char	   *payload = qe->data + strlen(channel) + 1;
-
-					NotifyMyFrontEnd(channel, payload, qe->srcPid);
-				}
-			}
-			else
-			{
-				/*
-				 * The source transaction aborted or crashed, so we just
-				 * ignore its notifications.
-				 */
-			}
+			/*
+			 * Since queue entries are written atomically with commit records
+			 * while holding NotifyQueueLock exclusively, all entries in the queue
+			 * are guaranteed to be from committed transactions.
+			 *
+			 * Step 5: Read notification data from WAL using stored LSN.
+			 * The compact entry only contains metadata; actual notification
+			 * content is retrieved from WAL on demand.
+			 */
+			processNotificationFromWAL(qe->notify_lsn);
 		}
 
 		/* Loop back if we're not at end of page */
@@ -2097,6 +1818,220 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 	return reachedStop;
 }
 
+/*
+ * processNotificationFromWAL
+ *
+ * Fetch notification data from WAL using the stored LSN and process
+ * the individual notifications for delivery to listening frontend.
+ * This implements Step 5 of the new WAL-based notification system.
+ */
+static void
+processNotificationFromWAL(XLogRecPtr notify_lsn)
+{
+	XLogReaderState *xlogreader;
+	DecodedXLogRecord *record;
+	xl_async_notify_data *xlrec;
+	char	   *data;
+	char	   *ptr;
+	uint32_t	remaining;
+	int			srcPid;
+	char	   *errormsg;
+
+	/*
+	 * Create XLog reader to fetch the notification data record.
+	 * We use a temporary reader since this is called during normal
+	 * notification processing, not during recovery.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									NULL);
+	if (!xlogreader)
+		elog(ERROR, "failed to allocate XLog reader for notification data");
+
+	/* Start reading exactly at the NOTIFY_DATA record begin LSN */
+	XLogBeginRead(xlogreader, notify_lsn);
+
+	/* Read the NOTIFY_DATA record */
+	record = (DecodedXLogRecord *) XLogReadRecord(xlogreader, &errormsg);
+	if (record == NULL)
+		elog(ERROR, "failed to read notification data from WAL at %X/%X: %s",
+			 LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message");
+
+	/* Verify this is the expected record type */
+	if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID ||
+		(XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA)
+		elog(ERROR, "expected NOTIFY_DATA at %X/%X, found rmgr %u info %u",
+			 LSN_FORMAT_ARGS(notify_lsn),
+			 XLogRecGetRmid(xlogreader),
+			 (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK));
+
+	/* Extract the notification data from the WAL record */
+	xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader);
+	srcPid = xlrec->srcPid;
+	data = (char *) xlrec + SizeOfAsyncNotifyData;
+	ptr = data;
+	remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData;
+
+	/*
+	 * Process each notification in the serialized data.
+	 * The format is: 2-byte channel_len, 2-byte payload_len,
+	 * null-terminated channel, null-terminated payload.
+	 */
+	for (uint32_t i = 0; i < xlrec->nnotifications && remaining >= 4; i++)
+	{
+		uint16		channel_len;
+		uint16		payload_len;
+		char	   *channel;
+		char	   *payload;
+
+		/* Read lengths */
+		memcpy(&channel_len, ptr, 2);
+		ptr += 2;
+		memcpy(&payload_len, ptr, 2);
+		ptr += 2;
+		remaining -= 4;
+
+		/* Verify we have enough data */
+		if (remaining < channel_len + 1 + payload_len + 1)
+			break;
+
+		/* Extract channel and payload strings */
+		channel = ptr;
+		ptr += channel_len + 1;
+		payload = ptr;
+		ptr += payload_len + 1;
+		remaining -= (channel_len + 1 + payload_len + 1);
+
+		/* Deliver notification if we're listening on this channel */
+		if (IsListeningOn(channel))
+			NotifyMyFrontEnd(channel, payload, srcPid);
+	}
+
+	/* Clean up */
+	XLogReaderFree(xlogreader);
+}
+
+
+/*
+ * asyncQueueAddCompactEntry
+ *
+ * Add a compact entry to the notification SLRU queue containing only
+ * metadata (dbid, xid, notify_lsn) that points to the full notification 
+ * data in WAL. This is much more efficient than the old approach of
+ * storing complete notification content in the SLRU queue.
+ */
+void
+asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	AsyncQueueEntry entry;
+	QueuePosition queue_head;
+	int64		pageno;
+	int			offset;
+	int			slotno;
+	LWLock	   *banklock;
+
+	/*
+	 * Fill in the compact entry with just the metadata.
+	 * No payload data is stored here - it's all in WAL.
+	 */
+	entry.dbid = dbid;
+	entry.xid = xid;
+	entry.notify_lsn = notify_lsn;
+
+	/* Caller should already hold NotifyQueueLock in exclusive mode */
+	queue_head = QUEUE_HEAD;
+
+	/*
+	 * Get the current page. If this is the first write since postmaster
+	 * started, initialize the first page.
+	 */
+	pageno = QUEUE_POS_PAGE(queue_head);
+	banklock = SimpleLruGetBankLock(NotifyCtl, pageno);
+
+	LWLockAcquire(banklock, LW_EXCLUSIVE);
+
+	if (QUEUE_POS_IS_ZERO(queue_head))
+		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
+	else
+		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+								   InvalidTransactionId);
+
+	/* Mark the page dirty before writing */
+	NotifyCtl->shared->page_dirty[slotno] = true;
+
+	offset = QUEUE_POS_OFFSET(queue_head);
+
+	/* Check if the compact entry fits on the current page */
+	if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE)
+	{
+		/* Copy the compact entry to the shared buffer */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &entry,
+			   sizeof(AsyncQueueEntry));
+
+		/* Advance queue head by the size of our compact entry */
+		if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry)))
+		{
+			/*
+			 * Page became full. Initialize the next page to ensure SLRU
+			 * consistency (similar to what asyncQueueAddEntries does).
+			 */
+			LWLock	   *nextlock;
+
+			pageno = QUEUE_POS_PAGE(queue_head);
+			nextlock = SimpleLruGetBankLock(NotifyCtl, pageno);
+			if (nextlock != banklock)
+			{
+				LWLockRelease(banklock);
+				LWLockAcquire(nextlock, LW_EXCLUSIVE);
+			}
+			SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
+			if (nextlock != banklock)
+			{
+				LWLockRelease(nextlock);
+				LWLockAcquire(banklock, LW_EXCLUSIVE);
+			}
+
+			/* Set cleanup flag if appropriate */
+			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+				tryAdvanceTail = true;
+		}
+
+		/* Update the global queue head */
+		QUEUE_HEAD = queue_head;
+	}
+	else
+	{
+		/*
+		 * Entry doesn't fit on current page. This should be very rare with
+		 * our small compact entries, but handle it by padding the page and
+		 * writing to the next page.
+		 */
+		AsyncQueueEntry padding;
+
+		memset(&padding, 0, sizeof(padding));
+		padding.dbid = InvalidOid;  /* Mark as padding */
+
+		/* Fill the rest of the page with padding */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &padding,
+			   QUEUE_PAGESIZE - offset);
+
+		/* Advance to next page */
+		asyncQueueAdvance(&queue_head, QUEUE_PAGESIZE - offset);
+
+		/* Recursively add the entry on the new page */
+		QUEUE_HEAD = queue_head;
+		LWLockRelease(banklock);
+		asyncQueueAddCompactEntry(dbid, xid, notify_lsn);
+		return;
+	}
+
+	LWLockRelease(banklock);
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
@@ -2395,3 +2330,62 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Write a WAL record containing async notification data
+ *
+ * This logs notification data to WAL, allowing us to release locks earlier
+ * and maintain commit ordering through WAL's natural ordering guarantees.
+ */
+XLogRecPtr
+LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+			   uint32 nnotifications, Size data_len, char *data)
+{
+	xl_async_notify_data xlrec;
+
+
+	xlrec.dbid = dboid;
+	xlrec.xid = xid;
+	xlrec.srcPid = srcPid;
+	xlrec.nnotifications = nnotifications;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyData);
+	XLogRegisterData(data, data_len);
+
+	(void) XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_DATA);
+
+	/* Return the begin LSN of the record we just inserted. */
+	return ProcLastRecPtr;
+}
+
+
+
+
+/*
+ * Redo function for async notification WAL records
+ *
+ * Notification data records need no replay action: the data already
+ * lives in the WAL record itself, and listeners fetch it from WAL by
+ * LSN, so nothing is added to the notification queue here.
+ */
+void
+async_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			/* 
+			 * For notification data records, we don't need to do anything
+			 * during recovery since listeners will read directly from WAL.
+			 * The data is already durably stored in the WAL record itself.
+			 */
+			break;
+
+
+		default:
+			elog(PANIC, "async_redo: unknown op code %u", info);
+	}
+}
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 8f4b282c6b1..b35b007e51c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -19,6 +19,7 @@
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "catalog/storage_xlog.h"
+#include "commands/async.h"
 #include "commands/dbcommands_xlog.h"
 #include "fe_utils/archive.h"
 #include "filemap.h"
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index fac509ed134..03e73ae33c9 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -8,6 +8,7 @@
 #define FRONTEND 1
 #include "postgres.h"
 
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
@@ -23,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/storage_xlog.h"
+#include "commands/async.h"
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 8e7fc9db877..58293e05165 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_ASYNC_ID, "Async", async_redo, async_desc, async_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b2bc10ee041..aa1e2733976 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -194,6 +194,7 @@ typedef struct SavedTransactionCharacteristics
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
 #define XACT_XINFO_HAS_DROPPED_STATS	(1U << 8)
+#define XACT_XINFO_HAS_NOTIFY			(1U << 9)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -317,6 +318,11 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_notify
+{
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} xl_xact_notify;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -330,6 +336,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_notify follows if XINFO_HAS_NOTIFY, stored unaligned! */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -403,6 +410,8 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	XLogRecPtr	notify_lsn;		/* LSN of notification data */
 } xl_xact_parsed_commit;
 
 typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..7e9f10cb84b 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -14,11 +14,25 @@
 #define ASYNC_H
 
 #include <signal.h>
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
 extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
+/*
+ * Compact SLRU queue entry - stores metadata pointing to WAL data
+ */
+typedef struct AsyncQueueEntry
+{
+	Oid			dbid;			/* database ID for quick filtering */
+	TransactionId	xid;			/* transaction ID */
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} AsyncQueueEntry;
+
+#define ASYNC_QUEUE_ENTRY_SIZE	sizeof(AsyncQueueEntry)
+
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
@@ -46,4 +60,15 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* WAL-based notification functions */
+extern XLogRecPtr LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+									 uint32 nnotifications, Size data_len, char *data);
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+/* notification queue functions */
+extern void asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn);
+extern void SignalBackends(void);
+
 #endif							/* ASYNC_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c6f5ebceefd..71459fe5529 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -301,6 +301,9 @@ struct PGPROC
 
 	uint32		wait_event_info;	/* proc's wait information */
 
+	/* Support for async notifications */
+	XLogRecPtr	notifyCommitLsn;	/* LSN of notification data for current xact */
+
 	/* Support for group transaction status update. */
 	bool		clogGroupMember;	/* true, if member of clog group */
 	pg_atomic_uint32 clogGroupNext; /* next clog group member */
-- 
2.39.3 (Apple Git-145)
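
For readers following the patch above, the per-notification layout it writes into the NOTIFY_DATA record (2-byte channel_len, 2-byte payload_len, then the null-terminated channel and payload) can be tried outside the server. The following is only a minimal standalone sketch of that layout, not code from the patch: DemoNotification and the demo_* functions are hypothetical names, and unlike processNotificationFromWAL the decoder here does not advance its pointer when the input is truncated.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the backend's Notification struct. */
typedef struct DemoNotification
{
	uint16_t	channel_len;	/* strlen(channel) */
	uint16_t	payload_len;	/* strlen(payload) */
	const char *channel;
	const char *payload;
} DemoNotification;

/* Bytes for one entry: two 2-byte lengths plus both strings with terminators. */
static size_t
demo_serialized_size(const DemoNotification *n)
{
	return 4 + (size_t) n->channel_len + 1 + (size_t) n->payload_len + 1;
}

/* Write one entry at ptr and return the position just past it. */
static char *
demo_serialize(char *ptr, const DemoNotification *n)
{
	assert(n->channel != NULL && n->payload != NULL);

	memcpy(ptr, &n->channel_len, 2);
	ptr += 2;
	memcpy(ptr, &n->payload_len, 2);
	ptr += 2;
	memcpy(ptr, n->channel, (size_t) n->channel_len + 1);
	ptr += n->channel_len + 1;
	memcpy(ptr, n->payload, (size_t) n->payload_len + 1);
	ptr += n->payload_len + 1;
	return ptr;
}

/*
 * Decode the next entry.  Returns 1 and advances *ptr / *remaining on
 * success; returns 0 and leaves both untouched if the data is truncated.
 */
static int
demo_deserialize(const char **ptr, size_t *remaining,
				 const char **channel, const char **payload)
{
	uint16_t	channel_len;
	uint16_t	payload_len;
	size_t		body;

	if (*remaining < 4)
		return 0;
	memcpy(&channel_len, *ptr, 2);
	memcpy(&payload_len, *ptr + 2, 2);

	body = (size_t) channel_len + 1 + (size_t) payload_len + 1;
	if (*remaining - 4 < body)
		return 0;

	*channel = *ptr + 4;
	*payload = *channel + channel_len + 1;
	*ptr += 4 + body;
	*remaining -= 4 + body;
	return 1;
}
```

Sizing the buffer with demo_serialized_size for every entry first and then calling demo_serialize in a second pass mirrors the two-pass structure in PreCommit_Notify. The lengths are copied in host byte order, as in the patch, so the bytes are only meaningful to a reader with the same endianness as the writer.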

#18Matheus Alcantara
matheusssilv97@gmail.com
In reply to: Rishu Bagga (#17)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Mon Sep 8, 2025 at 9:08 PM -03, Rishu Bagga wrote:

Hi Joel, Arseniy, Matheus, thanks for taking a look. I’ve attached an
updated patch and rebased on the latest commits that fixes the
correctness issues.

I think your latest patch is missing the asyncdesc.c file. I'm
getting a compile error:

src/backend/access/rmgrdesc/meson.build:4:20: ERROR: File asyncdesc.c does not exist.

--
Matheus Alcantara

#19Rishu Bagga
rishu.postgres@gmail.com
In reply to: Matheus Alcantara (#18)
1 attachment(s)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Tue, Sep 9, 2025 at 3:34 PM Matheus Alcantara
<matheusssilv97@gmail.com> wrote:

On Mon Sep 8, 2025 at 9:08 PM -03, Rishu Bagga wrote:

Hi Joel, Arseniy, Matheus, thanks for taking a look. I’ve attached an
updated patch and rebased on the latest commits that fixes the
correctness issues.

I think your latest patch is missing the asyncdesc.c file. I'm
getting a compile error:

src/backend/access/rmgrdesc/meson.build:4:20: ERROR: File asyncdesc.c does not exist.

--
Matheus Alcantara

Sorry about that, added the asyncdesc.c file in the attached patch.

Attachments:

notify-through-wal-v3.patch (application/octet-stream)
From 02551a3981730c232af36375dca55cc42e8e1165 Mon Sep 17 00:00:00 2001
From: rbagga <bangalorian@gmail.com>
Date: Sun, 7 Sep 2025 16:55:57 -0700
Subject: [PATCH] Implement WAL-based async notifications for improved
 throughput

- Added WAL logging for async notifications to improve scalability
- Implemented async resource manager for WAL-based notification handling
- Added new async descriptor files for pg_waldump support
- Updated makefiles and build configuration for new components
---
 src/backend/access/rmgrdesc/Makefile    |   1 +
 src/backend/access/rmgrdesc/asyncdesc.c |  47 ++
 src/backend/access/rmgrdesc/meson.build |   1 +
 src/backend/access/rmgrdesc/xactdesc.c  |  13 +
 src/backend/access/transam/rmgr.c       |   1 +
 src/backend/access/transam/xact.c       |  48 +-
 src/backend/commands/async.c            | 800 ++++++++++++------------
 src/bin/pg_rewind/parsexlog.c           |   1 +
 src/bin/pg_waldump/rmgrdesc.c           |   2 +
 src/include/access/rmgrlist.h           |   1 +
 src/include/access/xact.h               |   9 +
 src/include/commands/async.h            |  25 +
 src/include/storage/proc.h              |   3 +
 13 files changed, 548 insertions(+), 404 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/asyncdesc.c

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index cd95eec37f1..6e6e75b12bd 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,6 +9,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	asyncdesc.o \
 	brindesc.o \
 	clogdesc.o \
 	committsdesc.o \
diff --git a/src/backend/access/rmgrdesc/asyncdesc.c b/src/backend/access/rmgrdesc/asyncdesc.c
new file mode 100644
index 00000000000..7f322849ff1
--- /dev/null
+++ b/src/backend/access/rmgrdesc/asyncdesc.c
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * asyncdesc.c
+ *	  rmgr descriptor routines for access/async.c
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/asyncdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/async_xlog.h"
+
+void
+async_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_ASYNC_NOTIFY_DATA)
+	{
+		xl_async_notify_data *xlrec = (xl_async_notify_data *) rec;
+
+		appendStringInfo(buf, "notify data: db %u xid %u pid %d notifications %u",
+						 xlrec->dbid, xlrec->xid, xlrec->srcPid, xlrec->nnotifications);
+	}
+}
+
+const char *
+async_identify(uint8 info)
+{
+	const char *id = NULL;
+
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			id = "NOTIFY_DATA";
+			break;
+	}
+
+	return id;
+}
\ No newline at end of file
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index 96c98e800c2..38bef2e87f6 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -2,6 +2,7 @@
 
 # used by frontend programs like pg_waldump
 rmgr_desc_sources = files(
+  'asyncdesc.c',
   'brindesc.c',
   'clogdesc.c',
   'committsdesc.c',
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index f0f696855b9..4f32f7fc591 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -135,6 +135,19 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY)
+	{
+		xl_xact_notify xl_notify;
+
+		/* no alignment is guaranteed, so copy onto stack */
+		memcpy(&xl_notify, data, sizeof(xl_notify));
+
+		parsed->notify_lsn = xl_notify.notify_lsn;
+
+		data += sizeof(xl_xact_notify);
+	}
+
 }
 
 void
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 1b7499726eb..f8c25e6597a 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -19,6 +19,7 @@
 
 /* includes needed for "access/rmgrlist.h" */
 /* IWYU pragma: begin_keep */
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b46e7e9c2a6..33b16ff4746 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5841,10 +5841,24 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_notify xl_notify;
 	uint8		info;
+	XLogRecPtr	result;
 
 	Assert(CritSectionCount > 0);
 
+	/*
+	 * Handle notification commit ordering: if this transaction has pending
+	 * notifications, we must write the queue entry just before the commit
+	 * record while holding NotifyQueueLock to ensure proper ordering.
+	 */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		TransactionId xid = GetCurrentTransactionId();
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		asyncQueueAddCompactEntry(MyDatabaseId, xid, MyProc->notifyCommitLsn);
+	}
+
 	xl_xinfo.xinfo = 0;
 
 	/* decide between a plain and 2pc commit */
@@ -5926,9 +5940,17 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	/* include notification information if present */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_NOTIFY;
+		xl_notify.notify_lsn = MyProc->notifyCommitLsn;
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
+
 	/* Then include all the collected data into the commit record. */
 
 	XLogBeginInsert();
@@ -5982,10 +6004,28 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData(&xl_origin, sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_NOTIFY)
+		XLogRegisterData(&xl_notify, sizeof(xl_xact_notify));
+
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_XACT_ID, info);
+	/* Insert the commit record */
+	result = XLogInsert(RM_XACT_ID, info);
+
+	/*
+	 * Release NotifyQueueLock if we held it. The queue entry is now
+	 * associated with a committed transaction, so readers can process it.
+	 */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		LWLockRelease(NotifyQueueLock);
+		
+		/* Signal listening backends to check for new notifications */
+		SignalBackends();
+	}
+
+	return result;
 }
 
 /*
@@ -6227,6 +6267,12 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 						   false /* backward */ , false /* WAL */ );
 	}
 
+	/* Add notification queue entry if this commit has notifications */
+	if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY)
+	{
+		asyncQueueAddCompactEntry(parsed->dbId, xid, parsed->notify_lsn);
+	}
+
 	/* Make sure files supposed to be dropped are dropped */
 	if (parsed->nrels > 0)
 	{
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..57fa732e9b8 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -133,6 +133,12 @@
 #include "access/slru.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "access/xlogrecovery.h"
+#include "access/xlog_internal.h"
 #include "catalog/pg_database.h"
 #include "commands/async.h"
 #include "common/hashfn.h"
@@ -151,6 +157,29 @@
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
 
+/* Missing definitions for WAL-based notification system */
+#define AsyncQueueEntryEmptySize ASYNC_QUEUE_ENTRY_SIZE
+#define SLRU_PAGE_SIZE BLCKSZ
+#define AsyncCtl NotifyCtl
+
+/* WAL record types */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00
+
+/*
+ * WAL record for notification data (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	int32		srcPid;			/* source backend PID */
+	uint32		nnotifications;	/* number of notifications */
+	/* followed by serialized notification data */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData	(offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
+
 
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.  This
@@ -163,30 +192,13 @@
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
 /*
- * Struct representing an entry in the global notify queue
- *
- * This struct declaration has the maximal length, but in a real queue entry
- * the data area is only big enough for the actual channel and payload strings
- * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
- * entry size, if both channel and payload strings are empty (but note it
- * doesn't include alignment padding).
- *
- * The "length" field should always be rounded up to the next QUEUEALIGN
- * multiple so that all fields are properly aligned.
+ * NOTE: The AsyncQueueEntry structure is now defined in commands/async.h
+ * as a compact metadata-only structure for the new WAL-based notification system.
+ * The old variable-length structure with full notification content is no longer used.
  */
-typedef struct AsyncQueueEntry
-{
-	int			length;			/* total allocated length of entry */
-	Oid			dboid;			/* sender's database OID */
-	TransactionId xid;			/* sender's XID */
-	int32		srcPid;			/* sender's PID */
-	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
-} AsyncQueueEntry;
-
-/* Currently, no field of AsyncQueueEntry requires more than int alignment */
-#define QUEUEALIGN(len)		INTALIGN(len)
 
-#define AsyncQueueEntryEmptySize	(offsetof(AsyncQueueEntry, data) + 2)
+/* Queue alignment is still needed for SLRU page management */
+#define QUEUEALIGN(len)		INTALIGN(len)
 
 /*
  * Struct describing a queue position, and assorted macros for working with it
@@ -438,18 +450,13 @@ static void Exec_UnlistenCommit(const char *channel);
 static void Exec_UnlistenAllCommit(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
-static bool asyncQueueIsFull(void);
 static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
-static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
-static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
-static void asyncQueueFillWarning(void);
-static void SignalBackends(void);
+void SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
-										 char *page_buffer,
-										 Snapshot snapshot);
+										 char *page_buffer);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
@@ -457,6 +464,7 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
+static void processNotificationFromWAL(XLogRecPtr notify_lsn);
 
 /*
  * Compute the difference between two queue page numbers.
@@ -890,65 +898,75 @@ PreCommit_Notify(void)
 		}
 	}
 
-	/* Queue any pending notifies (must happen after the above) */
+	/* Write notification data to WAL if we have any */
 	if (pendingNotifies)
 	{
-		ListCell   *nextNotify;
+		TransactionId currentXid;
+		ListCell   *l;
+		size_t		total_size = 0;
+		uint32		nnotifications = 0;
+		char	   *notifications_data;
+		char	   *ptr;
+		XLogRecPtr	notify_lsn;
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
 		 * GetCurrentTransactionId is cheap if we already have an XID, but not
-		 * so cheap if we don't, and we'd prefer not to do that work while
-		 * holding NotifyQueueLock.
+		 * so cheap if we don't.
 		 */
-		(void) GetCurrentTransactionId();
+		currentXid = GetCurrentTransactionId();
 
 		/*
-		 * Serialize writers by acquiring a special lock that we hold till
-		 * after commit.  This ensures that queue entries appear in commit
-		 * order, and in particular that there are never uncommitted queue
-		 * entries ahead of committed ones, so an uncommitted transaction
-		 * can't block delivery of deliverable notifications.
-		 *
-		 * We use a heavyweight lock so that it'll automatically be released
-		 * after either commit or abort.  This also allows deadlocks to be
-		 * detected, though really a deadlock shouldn't be possible here.
-		 *
-		 * The lock is on "database 0", which is pretty ugly but it doesn't
-		 * seem worth inventing a special locktag category just for this.
-		 * (Historical note: before PG 9.0, a similar lock on "database 0" was
-		 * used by the flatfiles mechanism.)
+		 * Step 1: Write notification data to WAL.
+		 * This can be done in parallel with other transactions since we're
+		 * not holding any global locks yet.
 		 */
-		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-						 AccessExclusiveLock);
+		
+		/* First pass: calculate total size needed for serialization */
+		foreach(l, pendingNotifies->events)
+		{
+			Notification *n = (Notification *) lfirst(l);
+			
+			/* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */
+			total_size += 4 + n->channel_len + 1 + n->payload_len + 1;
+			nnotifications++;
+		}
 
-		/* Now push the notifications into the queue */
-		nextNotify = list_head(pendingNotifies->events);
-		while (nextNotify != NULL)
+		/* Allocate buffer for notification data */
+		notifications_data = palloc(total_size);
+		ptr = notifications_data;
+
+		/* Second pass: serialize all notifications */
+		foreach(l, pendingNotifies->events)
 		{
-			/*
-			 * Add the pending notifications to the queue.  We acquire and
-			 * release NotifyQueueLock once per page, which might be overkill
-			 * but it does allow readers to get in while we're doing this.
-			 *
-			 * A full queue is very uncommon and should really not happen,
-			 * given that we have so much space available in the SLRU pages.
-			 * Nevertheless we need to deal with this possibility. Note that
-			 * when we get here we are in the process of committing our
-			 * transaction, but we have not yet committed to clog, so at this
-			 * point in time we can still roll the transaction back.
-			 */
-			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-			asyncQueueFillWarning();
-			if (asyncQueueIsFull())
-				ereport(ERROR,
-						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-						 errmsg("too many notifications in the NOTIFY queue")));
-			nextNotify = asyncQueueAddEntries(nextNotify);
-			LWLockRelease(NotifyQueueLock);
+			Notification *n = (Notification *) lfirst(l);
+			char	   *channel = n->data;
+			char	   *payload = n->data + n->channel_len + 1;
+
+			/* Write channel length, payload length, channel, and payload */
+			memcpy(ptr, &n->channel_len, 2);
+			ptr += 2;
+			memcpy(ptr, &n->payload_len, 2);
+			ptr += 2;
+			memcpy(ptr, channel, n->channel_len + 1);
+			ptr += n->channel_len + 1;
+			memcpy(ptr, payload, n->payload_len + 1);
+			ptr += n->payload_len + 1;
 		}
 
-		/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
+		/* Write notification data to WAL */
+		notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid,
+										nnotifications, total_size,
+										notifications_data);
+
+		pfree(notifications_data);
+
+		/*
+		 * Step 2: Store the notification LSN in PROC for use during commit.
+		 * The queue entry will be written just before the commit record
+		 * while holding the global notification commit lock to ensure proper ordering.
+		 */
+		MyProc->notifyCommitLsn = notify_lsn;
 	}
 }
 
@@ -1006,12 +1024,19 @@ AtCommit_Notify(void)
 		asyncQueueUnregister();
 
 	/*
-	 * Send signals to listening backends.  We need do this only if there are
-	 * pending notifies, which were previously added to the shared queue by
-	 * PreCommit_Notify().
+	 * If we had notifications, they were already written to the queue in
+	 * PreCommit_Notify. Now that we've committed, signal listening backends
+	 * to check the queue. The transaction visibility logic will now see our
+	 * XID as committed and process the notifications.
 	 */
-	if (pendingNotifies != NULL)
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		/* Signal listening backends to check the queue */
 		SignalBackends();
+		
+		/* Clear the flag now that we're done */
+		MyProc->notifyCommitLsn = InvalidXLogRecPtr;
+	}
 
 	/*
 	 * If it's time to try to advance the global tail pointer, do that.
@@ -1263,21 +1288,6 @@ asyncQueueUnregister(void)
 	amRegisteredListener = false;
 }
 
-/*
- * Test whether there is room to insert more notification messages.
- *
- * Caller must hold at least shared NotifyQueueLock.
- */
-static bool
-asyncQueueIsFull(void)
-{
-	int64		headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
-	int64		tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
-	int64		occupied = headPage - tailPage;
-
-	return occupied >= max_notify_queue_pages;
-}
-
 /*
  * Advance the QueuePosition to the next entry, assuming that the current
  * entry is of length entryLength.  If we jump to a new page the function
@@ -1313,166 +1323,6 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
 	return pageJump;
 }
 
-/*
- * Fill the AsyncQueueEntry at *qe with an outbound notification message.
- */
-static void
-asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
-{
-	size_t		channellen = n->channel_len;
-	size_t		payloadlen = n->payload_len;
-	int			entryLength;
-
-	Assert(channellen < NAMEDATALEN);
-	Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
-
-	/* The terminators are already included in AsyncQueueEntryEmptySize */
-	entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
-	entryLength = QUEUEALIGN(entryLength);
-	qe->length = entryLength;
-	qe->dboid = MyDatabaseId;
-	qe->xid = GetCurrentTransactionId();
-	qe->srcPid = MyProcPid;
-	memcpy(qe->data, n->data, channellen + payloadlen + 2);
-}
-
-/*
- * Add pending notifications to the queue.
- *
- * We go page by page here, i.e. we stop once we have to go to a new page but
- * we will be called again and then fill that next page. If an entry does not
- * fit into the current page, we write a dummy entry with an InvalidOid as the
- * database OID in order to fill the page. So every page is always used up to
- * the last byte which simplifies reading the page later.
- *
- * We are passed the list cell (in pendingNotifies->events) containing the next
- * notification to write and return the first still-unwritten cell back.
- * Eventually we will return NULL indicating all is done.
- *
- * We are holding NotifyQueueLock already from the caller and grab
- * page specific SLRU bank lock locally in this function.
- */
-static ListCell *
-asyncQueueAddEntries(ListCell *nextNotify)
-{
-	AsyncQueueEntry qe;
-	QueuePosition queue_head;
-	int64		pageno;
-	int			offset;
-	int			slotno;
-	LWLock	   *prevlock;
-
-	/*
-	 * We work with a local copy of QUEUE_HEAD, which we write back to shared
-	 * memory upon exiting.  The reason for this is that if we have to advance
-	 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
-	 * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
-	 * subsequent insertions would try to put entries into a page that slru.c
-	 * thinks doesn't exist yet.)  So, use a local position variable.  Note
-	 * that if we do fail, any already-inserted queue entries are forgotten;
-	 * this is okay, since they'd be useless anyway after our transaction
-	 * rolls back.
-	 */
-	queue_head = QUEUE_HEAD;
-
-	/*
-	 * If this is the first write since the postmaster started, we need to
-	 * initialize the first page of the async SLRU.  Otherwise, the current
-	 * page should be initialized already, so just fetch it.
-	 */
-	pageno = QUEUE_POS_PAGE(queue_head);
-	prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
-
-	/* We hold both NotifyQueueLock and SLRU bank lock during this operation */
-	LWLockAcquire(prevlock, LW_EXCLUSIVE);
-
-	if (QUEUE_POS_IS_ZERO(queue_head))
-		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
-	else
-		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
-								   InvalidTransactionId);
-
-	/* Note we mark the page dirty before writing in it */
-	NotifyCtl->shared->page_dirty[slotno] = true;
-
-	while (nextNotify != NULL)
-	{
-		Notification *n = (Notification *) lfirst(nextNotify);
-
-		/* Construct a valid queue entry in local variable qe */
-		asyncQueueNotificationToEntry(n, &qe);
-
-		offset = QUEUE_POS_OFFSET(queue_head);
-
-		/* Check whether the entry really fits on the current page */
-		if (offset + qe.length <= QUEUE_PAGESIZE)
-		{
-			/* OK, so advance nextNotify past this item */
-			nextNotify = lnext(pendingNotifies->events, nextNotify);
-		}
-		else
-		{
-			/*
-			 * Write a dummy entry to fill up the page. Actually readers will
-			 * only check dboid and since it won't match any reader's database
-			 * OID, they will ignore this entry and move on.
-			 */
-			qe.length = QUEUE_PAGESIZE - offset;
-			qe.dboid = InvalidOid;
-			qe.data[0] = '\0';	/* empty channel */
-			qe.data[1] = '\0';	/* empty payload */
-		}
-
-		/* Now copy qe into the shared buffer page */
-		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
-			   &qe,
-			   qe.length);
-
-		/* Advance queue_head appropriately, and detect if page is full */
-		if (asyncQueueAdvance(&(queue_head), qe.length))
-		{
-			LWLock	   *lock;
-
-			pageno = QUEUE_POS_PAGE(queue_head);
-			lock = SimpleLruGetBankLock(NotifyCtl, pageno);
-			if (lock != prevlock)
-			{
-				LWLockRelease(prevlock);
-				LWLockAcquire(lock, LW_EXCLUSIVE);
-				prevlock = lock;
-			}
-
-			/*
-			 * Page is full, so we're done here, but first fill the next page
-			 * with zeroes.  The reason to do this is to ensure that slru.c's
-			 * idea of the head page is always the same as ours, which avoids
-			 * boundary problems in SimpleLruTruncate.  The test in
-			 * asyncQueueIsFull() ensured that there is room to create this
-			 * page without overrunning the queue.
-			 */
-			slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
-
-			/*
-			 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
-			 * set flag to remember that we should try to advance the tail
-			 * pointer (we don't want to actually do that right here).
-			 */
-			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
-				tryAdvanceTail = true;
-
-			/* And exit the loop */
-			break;
-		}
-	}
-
-	/* Success, so update the global QUEUE_HEAD */
-	QUEUE_HEAD = queue_head;
-
-	LWLockRelease(prevlock);
-
-	return nextNotify;
-}
-
 /*
  * SQL function to return the fraction of the notification queue currently
  * occupied.
@@ -1515,52 +1365,6 @@ asyncQueueUsage(void)
 	return (double) occupied / (double) max_notify_queue_pages;
 }
 
-/*
- * Check whether the queue is at least half full, and emit a warning if so.
- *
- * This is unlikely given the size of the queue, but possible.
- * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
- *
- * Caller must hold exclusive NotifyQueueLock.
- */
-static void
-asyncQueueFillWarning(void)
-{
-	double		fillDegree;
-	TimestampTz t;
-
-	fillDegree = asyncQueueUsage();
-	if (fillDegree < 0.5)
-		return;
-
-	t = GetCurrentTimestamp();
-
-	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
-								   t, QUEUE_FULL_WARN_INTERVAL))
-	{
-		QueuePosition min = QUEUE_HEAD;
-		int32		minPid = InvalidPid;
-
-		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
-		{
-			Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
-			if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
-				minPid = QUEUE_BACKEND_PID(i);
-		}
-
-		ereport(WARNING,
-				(errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
-				 (minPid != InvalidPid ?
-				  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
-				  : 0),
-				 (minPid != InvalidPid ?
-				  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
-				  : 0)));
-
-		asyncQueueControl->lastQueueFillWarn = t;
-	}
-}
 
 /*
  * Send signals to listening backends.
@@ -1577,7 +1381,7 @@ asyncQueueFillWarning(void)
  * This is called during CommitTransaction(), so it's important for it
  * to have very low probability of failure.
  */
-static void
+void
 SignalBackends(void)
 {
 	int32	   *pids;
@@ -1844,15 +1648,13 @@ ProcessNotifyInterrupt(bool flush)
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
- * ones to my frontend.  Stop when we reach queue head or an uncommitted
- * notification.
+ * ones to my frontend.  Stop when we reach queue head.
  */
 static void
 asyncQueueReadAllNotifications(void)
 {
 	volatile QueuePosition pos;
 	QueuePosition head;
-	Snapshot	snapshot;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -1875,46 +1677,6 @@ asyncQueueReadAllNotifications(void)
 		return;
 	}
 
-	/*----------
-	 * Get snapshot we'll use to decide which xacts are still in progress.
-	 * This is trickier than it might seem, because of race conditions.
-	 * Consider the following example:
-	 *
-	 * Backend 1:					 Backend 2:
-	 *
-	 * transaction starts
-	 * UPDATE foo SET ...;
-	 * NOTIFY foo;
-	 * commit starts
-	 * queue the notify message
-	 *								 transaction starts
-	 *								 LISTEN foo;  -- first LISTEN in session
-	 *								 SELECT * FROM foo WHERE ...;
-	 * commit to clog
-	 *								 commit starts
-	 *								 add backend 2 to array of listeners
-	 *								 advance to queue head (this code)
-	 *								 commit to clog
-	 *
-	 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
-	 * wasn't committed yet.  Ideally we'd ensure that client 2 would
-	 * eventually get transaction 1's notify message, but there's no way
-	 * to do that; until we're in the listener array, there's no guarantee
-	 * that the notify message doesn't get removed from the queue.
-	 *
-	 * Therefore the coding technique transaction 2 is using is unsafe:
-	 * applications must commit a LISTEN before inspecting database state,
-	 * if they want to ensure they will see notifications about subsequent
-	 * changes to that state.
-	 *
-	 * What we do guarantee is that we'll see all notifications from
-	 * transactions committing after the snapshot we take here.
-	 * Exec_ListenPreCommit has already added us to the listener array,
-	 * so no not-yet-committed messages can be removed from the queue
-	 * before we see them.
-	 *----------
-	 */
-	snapshot = RegisterSnapshot(GetLatestSnapshot());
 
 	/*
 	 * It is possible that we fail while trying to send a message to our
@@ -1979,8 +1741,7 @@ asyncQueueReadAllNotifications(void)
 			 * while sending the notifications to the frontend.
 			 */
 			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf,
-													   snapshot);
+													   page_buffer.buf);
 		} while (!reachedStop);
 	}
 	PG_FINALLY();
@@ -1992,8 +1753,6 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_END_TRY();
 
-	/* Done with snapshot */
-	UnregisterSnapshot(snapshot);
 }
 
 /*
@@ -2004,19 +1763,17 @@ asyncQueueReadAllNotifications(void)
  * memory.  (We could access the page right in shared memory, but that
  * would imply holding the SLRU bank lock throughout this routine.)
  *
- * We stop if we reach the "stop" position, or reach a notification from an
- * uncommitted transaction, or reach the end of the page.
+ * We stop if we reach the "stop" position or reach the end of the page.
  *
- * The function returns true once we have reached the stop position or an
- * uncommitted notification, and false if we have finished with the page.
+ * The function returns true once we have reached the stop position, and false
+ * if we have finished with the page.
  * In other words: once it returns true there is no need to look further.
  * The QueuePosition *current is advanced past all processed messages.
  */
 static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
-							 char *page_buffer,
-							 Snapshot snapshot)
+							 char *page_buffer)
 {
 	bool		reachedStop = false;
 	bool		reachedEndOfPage;
@@ -2032,60 +1789,24 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
 
 		/*
-		 * Advance *current over this message, possibly to the next page. As
-		 * noted in the comments for asyncQueueReadAllNotifications, we must
-		 * do this before possibly failing while processing the message.
+		 * Advance *current over this compact entry. The new compact entries are
+		 * fixed-size, making this much simpler than the old variable-length entries.
 		 */
-		reachedEndOfPage = asyncQueueAdvance(current, qe->length);
+		reachedEndOfPage = asyncQueueAdvance(current, sizeof(AsyncQueueEntry));
 
 		/* Ignore messages destined for other databases */
-		if (qe->dboid == MyDatabaseId)
+		if (qe->dbid == MyDatabaseId)
 		{
-			if (XidInMVCCSnapshot(qe->xid, snapshot))
-			{
-				/*
-				 * The source transaction is still in progress, so we can't
-				 * process this message yet.  Break out of the loop, but first
-				 * back up *current so we will reprocess the message next
-				 * time.  (Note: it is unlikely but not impossible for
-				 * TransactionIdDidCommit to fail, so we can't really avoid
-				 * this advance-then-back-up behavior when dealing with an
-				 * uncommitted message.)
-				 *
-				 * Note that we must test XidInMVCCSnapshot before we test
-				 * TransactionIdDidCommit, else we might return a message from
-				 * a transaction that is not yet visible to snapshots; compare
-				 * the comments at the head of heapam_visibility.c.
-				 *
-				 * Also, while our own xact won't be listed in the snapshot,
-				 * we need not check for TransactionIdIsCurrentTransactionId
-				 * because our transaction cannot (yet) have queued any
-				 * messages.
-				 */
-				*current = thisentry;
-				reachedStop = true;
-				break;
-			}
-			else if (TransactionIdDidCommit(qe->xid))
-			{
-				/* qe->data is the null-terminated channel name */
-				char	   *channel = qe->data;
-
-				if (IsListeningOn(channel))
-				{
-					/* payload follows channel name */
-					char	   *payload = qe->data + strlen(channel) + 1;
-
-					NotifyMyFrontEnd(channel, payload, qe->srcPid);
-				}
-			}
-			else
-			{
-				/*
-				 * The source transaction aborted or crashed, so we just
-				 * ignore its notifications.
-				 */
-			}
+			/*
+			 * Since queue entries are written atomically with commit records
+			 * while holding NotifyQueueLock exclusively, all entries in the queue
+			 * are guaranteed to be from committed transactions.
+			 *
+			 * Step 5: Read notification data from WAL using stored LSN.
+			 * The compact entry only contains metadata; actual notification
+			 * content is retrieved from WAL on demand.
+			 */
+			processNotificationFromWAL(qe->notify_lsn);
 		}
 
 		/* Loop back if we're not at end of page */
@@ -2097,6 +1818,220 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 	return reachedStop;
 }
 
+/*
+ * processNotificationFromWAL
+ *
+ * Fetch notification data from WAL using the stored LSN and process
+ * the individual notifications for delivery to listening frontend.
+ * This implements Step 5 of the new WAL-based notification system.
+ */
+static void
+processNotificationFromWAL(XLogRecPtr notify_lsn)
+{
+	XLogReaderState *xlogreader;
+	DecodedXLogRecord *record;
+	xl_async_notify_data *xlrec;
+	char	   *data;
+	char	   *ptr;
+	uint32_t	remaining;
+	int			srcPid;
+	char	   *errormsg;
+
+	/*
+	 * Create XLog reader to fetch the notification data record.
+	 * We use a temporary reader since this is called during normal
+	 * notification processing, not during recovery.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									NULL);
+	if (!xlogreader)
+		elog(ERROR, "failed to allocate XLog reader for notification data");
+
+    /* Start reading exactly at the NOTIFY_DATA record begin LSN */
+    XLogBeginRead(xlogreader, notify_lsn);
+
+    /* Read the NOTIFY_DATA record */
+    record = (DecodedXLogRecord *) XLogReadRecord(xlogreader, &errormsg);
+    if (record == NULL)
+        elog(ERROR, "failed to read notification data from WAL at %X/%X: %s",
+             LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message");
+
+    /* Verify this is the expected record type */
+    if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID ||
+        (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA)
+        elog(ERROR, "expected NOTIFY_DATA at %X/%X, found rmgr %u info %u",
+             LSN_FORMAT_ARGS(notify_lsn),
+             XLogRecGetRmid(xlogreader),
+             (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK));
+
+	/* Extract the notification data from the WAL record */
+	xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader);
+	srcPid = xlrec->srcPid;
+	data = (char *) xlrec + SizeOfAsyncNotifyData;
+	ptr = data;
+	remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData;
+
+	/*
+	 * Process each notification in the serialized data.
+	 * The format is: 2-byte channel_len, 2-byte payload_len,
+	 * null-terminated channel, null-terminated payload.
+	 */
+	for (uint32_t i = 0; i < xlrec->nnotifications && remaining >= 4; i++)
+	{
+		uint16		channel_len;
+		uint16		payload_len;
+		char	   *channel;
+		char	   *payload;
+
+		/* Read lengths */
+		memcpy(&channel_len, ptr, 2);
+		ptr += 2;
+		memcpy(&payload_len, ptr, 2);
+		ptr += 2;
+		remaining -= 4;
+
+		/* Verify we have enough data */
+		if (remaining < channel_len + 1 + payload_len + 1)
+			break;
+
+		/* Extract channel and payload strings */
+		channel = ptr;
+		ptr += channel_len + 1;
+		payload = ptr;
+		ptr += payload_len + 1;
+		remaining -= (channel_len + 1 + payload_len + 1);
+
+		/* Deliver notification if we're listening on this channel */
+		if (IsListeningOn(channel))
+			NotifyMyFrontEnd(channel, payload, srcPid);
+	}
+
+	/* Clean up */
+	XLogReaderFree(xlogreader);
+}
+
+
+/*
+ * asyncQueueAddCompactEntry
+ *
+ * Add a compact entry to the notification SLRU queue containing only
+ * metadata (dbid, xid, notify_lsn) that points to the full notification 
+ * data in WAL. This is much more efficient than the old approach of
+ * storing complete notification content in the SLRU queue.
+ */
+void
+asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	AsyncQueueEntry entry;
+	QueuePosition queue_head;
+	int64		pageno;
+	int			offset;
+	int			slotno;
+	LWLock	   *banklock;
+
+	/*
+	 * Fill in the compact entry with just the metadata.
+	 * No payload data is stored here - it's all in WAL.
+	 */
+	entry.dbid = dbid;
+	entry.xid = xid;
+	entry.notify_lsn = notify_lsn;
+
+	/* Caller should already hold NotifyQueueLock in exclusive mode */
+	queue_head = QUEUE_HEAD;
+
+	/*
+	 * Get the current page. If this is the first write since postmaster
+	 * started, initialize the first page.
+	 */
+	pageno = QUEUE_POS_PAGE(queue_head);
+	banklock = SimpleLruGetBankLock(NotifyCtl, pageno);
+
+	LWLockAcquire(banklock, LW_EXCLUSIVE);
+
+	if (QUEUE_POS_IS_ZERO(queue_head))
+		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
+	else
+		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+								   InvalidTransactionId);
+
+	/* Mark the page dirty before writing */
+	NotifyCtl->shared->page_dirty[slotno] = true;
+
+	offset = QUEUE_POS_OFFSET(queue_head);
+
+	/* Check if the compact entry fits on the current page */
+	if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE)
+	{
+		/* Copy the compact entry to the shared buffer */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &entry,
+			   sizeof(AsyncQueueEntry));
+
+		/* Advance queue head by the size of our compact entry */
+		if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry)))
+		{
+			/*
+			 * Page became full. Initialize the next page to ensure SLRU
+			 * consistency (similar to what asyncQueueAddEntries does).
+			 */
+			LWLock	   *nextlock;
+
+			pageno = QUEUE_POS_PAGE(queue_head);
+			nextlock = SimpleLruGetBankLock(NotifyCtl, pageno);
+			if (nextlock != banklock)
+			{
+				LWLockRelease(banklock);
+				LWLockAcquire(nextlock, LW_EXCLUSIVE);
+			}
+			SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
+			if (nextlock != banklock)
+			{
+				LWLockRelease(nextlock);
+				LWLockAcquire(banklock, LW_EXCLUSIVE);
+			}
+
+			/* Set cleanup flag if appropriate */
+			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+				tryAdvanceTail = true;
+		}
+
+		/* Update the global queue head */
+		QUEUE_HEAD = queue_head;
+	}
+	else
+	{
+		/*
+		 * Entry doesn't fit on current page. This should be very rare with
+		 * our small compact entries, but handle it by padding the page and
+		 * writing to the next page.
+		 */
+		AsyncQueueEntry padding;
+
+		memset(&padding, 0, sizeof(padding));
+		padding.dbid = InvalidOid;  /* Mark as padding */
+
+		/* Fill the rest of the page with padding */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &padding,
+			   QUEUE_PAGESIZE - offset);
+
+		/* Advance to next page */
+		asyncQueueAdvance(&queue_head, QUEUE_PAGESIZE - offset);
+
+		/* Recursively add the entry on the new page */
+		QUEUE_HEAD = queue_head;
+		LWLockRelease(banklock);
+		asyncQueueAddCompactEntry(dbid, xid, notify_lsn);
+		return;
+	}
+
+	LWLockRelease(banklock);
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
@@ -2395,3 +2330,62 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Write a WAL record containing async notification data
+ *
+ * This logs notification data to WAL, allowing us to release locks earlier
+ * and maintain commit ordering through WAL's natural ordering guarantees.
+ */
+XLogRecPtr
+LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+			   uint32 nnotifications, Size data_len, char *data)
+{
+	xl_async_notify_data xlrec;
+
+
+	xlrec.dbid = dboid;
+	xlrec.xid = xid;
+	xlrec.srcPid = srcPid;
+	xlrec.nnotifications = nnotifications;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyData);
+    XLogRegisterData(data, data_len);
+
+    (void) XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_DATA);
+
+    /* Return the begin LSN of the record we just inserted. */
+    return ProcLastRecPtr;
+}
+
+
+
+
+/*
+ * Redo function for async notification WAL records
+ *
+ * NOTIFY_DATA records currently need no replay action of their own:
+ * the payload stays in the WAL record itself, and listeners read it
+ * from there by LSN, so nothing is added to the notification queue here.
+ */
+void
+async_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			/* 
+			 * For notification data records, we don't need to do anything
+			 * during recovery since listeners will read directly from WAL.
+			 * The data is already durably stored in the WAL record itself.
+			 */
+			break;
+
+
+		default:
+			elog(PANIC, "async_redo: unknown op code %u", info);
+	}
+}
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 8f4b282c6b1..b35b007e51c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -19,6 +19,7 @@
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "catalog/storage_xlog.h"
+#include "commands/async.h"
 #include "commands/dbcommands_xlog.h"
 #include "fe_utils/archive.h"
 #include "filemap.h"
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index fac509ed134..03e73ae33c9 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -8,6 +8,7 @@
 #define FRONTEND 1
 #include "postgres.h"
 
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
@@ -23,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/storage_xlog.h"
+#include "commands/async.h"
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 8e7fc9db877..58293e05165 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_ASYNC_ID, "Async", async_redo, async_desc, async_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b2bc10ee041..aa1e2733976 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -194,6 +194,7 @@ typedef struct SavedTransactionCharacteristics
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
 #define XACT_XINFO_HAS_DROPPED_STATS	(1U << 8)
+#define XACT_XINFO_HAS_NOTIFY			(1U << 9)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -317,6 +318,11 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_notify
+{
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} xl_xact_notify;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -330,6 +336,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_notify follows if XINFO_HAS_NOTIFY, stored unaligned! */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -403,6 +410,8 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	XLogRecPtr	notify_lsn;		/* LSN of notification data */
 } xl_xact_parsed_commit;
 
 typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..7e9f10cb84b 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -14,11 +14,25 @@
 #define ASYNC_H
 
 #include <signal.h>
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
 extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
+/*
+ * Compact SLRU queue entry - stores metadata pointing to WAL data
+ */
+typedef struct AsyncQueueEntry
+{
+	Oid			dbid;			/* database ID for quick filtering */
+	TransactionId	xid;			/* transaction ID */
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} AsyncQueueEntry;
+
+#define ASYNC_QUEUE_ENTRY_SIZE	sizeof(AsyncQueueEntry)
+
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
@@ -46,4 +60,15 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* WAL-based notification functions */
+extern XLogRecPtr LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+									 uint32 nnotifications, Size data_len, char *data);
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+/* notification queue functions */
+extern void asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn);
+extern void SignalBackends(void);
+
 #endif							/* ASYNC_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c6f5ebceefd..71459fe5529 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -301,6 +301,9 @@ struct PGPROC
 
 	uint32		wait_event_info;	/* proc's wait information */
 
+	/* Support for async notifications */
+	XLogRecPtr	notifyCommitLsn;	/* LSN of notification data for current xact */
+
 	/* Support for group transaction status update. */
 	bool		clogGroupMember;	/* true, if member of clog group */
 	pg_atomic_uint32 clogGroupNext; /* next clog group member */
-- 
2.39.3 (Apple Git-145)
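
For readers following the patch: the serialized layout that PreCommit_Notify writes and processNotificationFromWAL reads (2-byte channel_len, 2-byte payload_len, null-terminated channel, null-terminated payload) can be tried outside the backend with a small standalone sketch. This is only an illustration under stated assumptions, not code from the patch: notify_serialize, notify_deserialize, notify_tally and notify_tally_cb are hypothetical names, the callback stands in for NotifyMyFrontEnd, and the terminator check is an extra safeguard that the patch's loop does not perform.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical callback type, standing in for NotifyMyFrontEnd(). */
typedef void (*notify_cb) (const char *channel, const char *payload, void *arg);

/* Hypothetical collector: counts deliveries and remembers the last one. */
struct notify_tally
{
	uint32_t	count;
	const char *last_channel;
	const char *last_payload;
};

void
notify_tally_cb(const char *channel, const char *payload, void *arg)
{
	struct notify_tally *t = (struct notify_tally *) arg;

	t->count++;
	t->last_channel = channel;
	t->last_payload = payload;
}

/*
 * Write one notification into buf using the layout described in the patch.
 * Returns the number of bytes written, or 0 if it does not fit.
 */
size_t
notify_serialize(char *buf, size_t buflen,
				 const char *channel, const char *payload)
{
	size_t		clen = strlen(channel);
	size_t		plen = strlen(payload);
	size_t		need = 4 + clen + 1 + plen + 1;
	uint16_t	c16;
	uint16_t	p16;

	assert(buf != NULL);
	if (clen > UINT16_MAX || plen > UINT16_MAX || need > buflen)
		return 0;

	c16 = (uint16_t) clen;
	p16 = (uint16_t) plen;
	memcpy(buf, &c16, 2);
	memcpy(buf + 2, &p16, 2);
	memcpy(buf + 4, channel, clen + 1);				/* includes the NUL */
	memcpy(buf + 4 + clen + 1, payload, plen + 1);	/* includes the NUL */
	return need;
}

/*
 * Walk up to nnotifications records in buf and hand each one to cb.
 * Stops at the first truncated or malformed record and returns the
 * number of records actually delivered.
 */
uint32_t
notify_deserialize(const char *buf, size_t len, uint32_t nnotifications,
				   notify_cb cb, void *arg)
{
	const char *ptr = buf;
	size_t		remaining = len;
	uint32_t	done = 0;

	while (done < nnotifications && remaining >= 4)
	{
		uint16_t	c16;
		uint16_t	p16;
		size_t		body;

		memcpy(&c16, ptr, 2);
		memcpy(&p16, ptr + 2, 2);
		body = (size_t) c16 + 1 + (size_t) p16 + 1;

		/* Verify the lengths do not run past the end of the buffer */
		if (remaining - 4 < body)
			break;

		/* Extra safeguard (not in the patch): terminators must line up */
		if (ptr[4 + c16] != '\0' || ptr[4 + c16 + 1 + p16] != '\0')
			break;

		cb(ptr + 4, ptr + 4 + c16 + 1, arg);
		ptr += 4 + body;
		remaining -= 4 + body;
		done++;
	}
	return done;
}
```

A caller would serialize each pending notification back to back into one buffer, pass the total length and count to notify_deserialize, and compare the returned count with the expected one. A shortfall means the buffer was truncated or malformed, which the patch's loop currently handles by silently breaking out.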

#20Matheus Alcantara
matheusssilv97@gmail.com
In reply to: Rishu Bagga (#19)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Tue Sep 9, 2025 at 7:49 PM -03, Rishu Bagga wrote:

On Tue, Sep 9, 2025 at 3:34 PM Matheus Alcantara
<matheusssilv97@gmail.com> wrote:

On Mon Sep 8, 2025 at 9:08 PM -03, Rishu Bagga wrote:

Hi Joel, Arseniy, Matheus, thanks for taking a look. I’ve attached an
updated patch, rebased on the latest commits, that fixes the
correctness issues.

I think your latest patch is missing the asyncdesc.c file. I'm
getting a build error:

src/backend/access/rmgrdesc/meson.build:4:20: ERROR: File asyncdesc.c does not exist.

--
Matheus Alcantara

Sorry about that, added the asyncdesc.c file in the attached patch.

I'm still seeing some compiler errors.

FAILED: src/backend/postgres_lib.a.p/access_rmgrdesc_asyncdesc.c.o
ccache cc -Isrc/backend/postgres_lib.a.p -Isrc/include -I../src/include -I/opt/homebrew/Cellar/icu4c@77/77.1/include -I/opt/homebrew/opt/lz4/include -I/opt/homebrew/Cellar/openssl@3/3.5.2/include -I/opt/homebrew/opt/zstd/include -I/opt/homebrew/opt/openssl@3/include -fdiagnostics-color=always -Wall -Winvalid-pch -O0 -g -isysroot /Library/Developer/CommandLineTools/SDKs/MacOSX15.2.sdk -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wmissing-prototypes -Wpointer-arith -Werror=vla -Werror=unguarded-availability-new -Wendif-labels -Wmissing-format-attribute -Wcast-function-type -Wformat-security -Wdeclaration-after-statement -Wmissing-variable-declarations -Wno-unused-command-line-argument -Wno-compound-token-split-by-macro -Wno-cast-function-type-strict -DBUILDING_DLL -MD -MQ src/backend/postgres_lib.a.p/access_rmgrdesc_asyncdesc.c.o -MF src/backend/postgres_lib.a.p/access_rmgrdesc_asyncdesc.c.o.d -o src/backend/postgres_lib.a.p/access_rmgrdesc_asyncdesc.c.o -c ../src/backend/access/rmgrdesc/asyncdesc.c
../src/backend/access/rmgrdesc/asyncdesc.c:17:10: fatal error: 'access/async_xlog.h' file not found
17 | #include "access/async_xlog.h"
| ^~~~~~~~~~~~~~~~~~~~~
1 error generated.

Am I missing something here? I'm using meson+ninja: I removed the build
directory, ran meson setup and then ninja -C build install, and I
got this error.

--
Matheus Alcantara

#21Rishu Bagga
rishu.postgres@gmail.com
In reply to: Matheus Alcantara (#20)
1 attachment(s)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Tue, Sep 9, 2025 at 4:02 PM Matheus Alcantara
<matheusssilv97@gmail.com> wrote:

FAILED: src/backend/postgres_lib.a.p/access_rmgrdesc_asyncdesc.c.o
ccache cc -Isrc/backend/postgres_lib.a.p -Isrc/include -I../src/include -I/opt/homebrew/Cellar/icu4c@77/77.1/include -I/opt/homebrew/opt/lz4/include -I/opt/homebrew/Cellar/openssl@3/3.5.2/include -I/opt/homebrew/opt/zstd/include -I/opt/homebrew/opt/openssl@3/include -fdiagnostics-color=always -Wall -Winvalid-pch -O0 -g -isysroot /Library/Developer/CommandLineTools/SDKs/MacOSX15.2.sdk -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wmissing-prototypes -Wpointer-arith -Werror=vla -Werror=unguarded-availability-new -Wendif-labels -Wmissing-format-attribute -Wcast-function-type -Wformat-security -Wdeclaration-after-statement -Wmissing-variable-declarations -Wno-unused-command-line-argument -Wno-compound-token-split-by-macro -Wno-cast-function-type-strict -DBUILDING_DLL -MD -MQ src/backend/postgres_lib.a.p/access_rmgrdesc_asyncdesc.c.o -MF src/backend/postgres_lib.a.p/access_rmgrdesc_asyncdesc.c.o.d -o src/backend/postgres_lib.a.p/access_rmgrdesc_asyncdesc.c.o -c ../src/backend/access/rmgrdesc/asyncdesc.c
../src/backend/access/rmgrdesc/asyncdesc.c:17:10: fatal error: 'access/async_xlog.h' file not found
17 | #include "access/async_xlog.h"
| ^~~~~~~~~~~~~~~~~~~~~
1 error generated.

Am I missing something here? I'm using meson+ninja: I removed the build
directory, ran meson setup and then ninja -C build install, and I
got this error.

Oops again - I didn't "git add" the new files, so they weren't showing
up in the patch. I've now added async_xlog.h as well, and tested that
the patch applies and compiles. Sorry about that, it should work now.

Attachments:

notify-through-wal-v4.patch (application/octet-stream)
From d37772e3337a083e719be2fc9d93474792bc8c8d Mon Sep 17 00:00:00 2001
From: rbagga <bangalorian@gmail.com>
Date: Sun, 7 Sep 2025 16:55:57 -0700
Subject: [PATCH v4] Implement WAL-based async notifications for improved
 throughput

- Added WAL logging for async notifications to improve scalability
- Implemented async resource manager for WAL-based notification handling
- Added new async descriptor files for pg_waldump support
- Updated makefiles and build configuration for new components
---
 src/backend/access/rmgrdesc/Makefile    |   1 +
 src/backend/access/rmgrdesc/asyncdesc.c |  47 ++
 src/backend/access/rmgrdesc/meson.build |   1 +
 src/backend/access/rmgrdesc/xactdesc.c  |  13 +
 src/backend/access/transam/rmgr.c       |   1 +
 src/backend/access/transam/xact.c       |  48 +-
 src/backend/commands/async.c            | 800 ++++++++++++------------
 src/bin/pg_rewind/parsexlog.c           |   1 +
 src/bin/pg_waldump/rmgrdesc.c           |   2 +
 src/include/access/async_xlog.h         |  43 ++
 src/include/access/rmgrlist.h           |   1 +
 src/include/access/xact.h               |   9 +
 src/include/commands/async.h            |  25 +
 src/include/storage/proc.h              |   3 +
 14 files changed, 591 insertions(+), 404 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/asyncdesc.c
 create mode 100644 src/include/access/async_xlog.h

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index cd95eec37f1..6e6e75b12bd 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,6 +9,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	asyncdesc.o \
 	brindesc.o \
 	clogdesc.o \
 	committsdesc.o \
diff --git a/src/backend/access/rmgrdesc/asyncdesc.c b/src/backend/access/rmgrdesc/asyncdesc.c
new file mode 100644
index 00000000000..7f322849ff1
--- /dev/null
+++ b/src/backend/access/rmgrdesc/asyncdesc.c
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * asyncdesc.c
+ *	  rmgr descriptor routines for commands/async.c
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/asyncdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/async_xlog.h"
+
+void
+async_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_ASYNC_NOTIFY_DATA)
+	{
+		xl_async_notify_data *xlrec = (xl_async_notify_data *) rec;
+
+		appendStringInfo(buf, "notify data: db %u xid %u pid %d notifications %u",
+						 xlrec->dbid, xlrec->xid, xlrec->srcPid, xlrec->nnotifications);
+	}
+}
+
+const char *
+async_identify(uint8 info)
+{
+	const char *id = NULL;
+
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			id = "NOTIFY_DATA";
+			break;
+	}
+
+	return id;
+}
\ No newline at end of file
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index 96c98e800c2..38bef2e87f6 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -2,6 +2,7 @@
 
 # used by frontend programs like pg_waldump
 rmgr_desc_sources = files(
+  'asyncdesc.c',
   'brindesc.c',
   'clogdesc.c',
   'committsdesc.c',
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index f0f696855b9..4f32f7fc591 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -135,6 +135,19 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY)
+	{
+		xl_xact_notify xl_notify;
+
+		/* no alignment is guaranteed, so copy onto stack */
+		memcpy(&xl_notify, data, sizeof(xl_notify));
+
+		parsed->notify_lsn = xl_notify.notify_lsn;
+
+		data += sizeof(xl_xact_notify);
+	}
+
 }
 
 void
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 1b7499726eb..f8c25e6597a 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -19,6 +19,7 @@
 
 /* includes needed for "access/rmgrlist.h" */
 /* IWYU pragma: begin_keep */
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b46e7e9c2a6..33b16ff4746 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5841,10 +5841,24 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_notify xl_notify;
 	uint8		info;
+	XLogRecPtr	result;
 
 	Assert(CritSectionCount > 0);
 
+	/*
+	 * Handle notification commit ordering: if this transaction has pending
+	 * notifications, we must write the queue entry just before the commit
+	 * record while holding NotifyQueueLock to ensure proper ordering.
+	 */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		TransactionId xid = GetCurrentTransactionId();
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		asyncQueueAddCompactEntry(MyDatabaseId, xid, MyProc->notifyCommitLsn);
+	}
+
 	xl_xinfo.xinfo = 0;
 
 	/* decide between a plain and 2pc commit */
@@ -5926,9 +5940,17 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	/* include notification information if present */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_NOTIFY;
+		xl_notify.notify_lsn = MyProc->notifyCommitLsn;
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
+
 	/* Then include all the collected data into the commit record. */
 
 	XLogBeginInsert();
@@ -5982,10 +6004,28 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData(&xl_origin, sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_NOTIFY)
+		XLogRegisterData(&xl_notify, sizeof(xl_xact_notify));
+
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_XACT_ID, info);
+	/* Insert the commit record */
+	result = XLogInsert(RM_XACT_ID, info);
+
+	/*
+	 * Release NotifyQueueLock if we held it. The queue entry is now
+	 * associated with a committed transaction, so readers can process it.
+	 */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		LWLockRelease(NotifyQueueLock);
+		
+		/* Signal listening backends to check for new notifications */
+		SignalBackends();
+	}
+
+	return result;
 }
 
 /*
@@ -6227,6 +6267,12 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 						   false /* backward */ , false /* WAL */ );
 	}
 
+	/* Add notification queue entry if this commit has notifications */
+	if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY)
+	{
+		asyncQueueAddCompactEntry(parsed->dbId, xid, parsed->notify_lsn);
+	}
+
 	/* Make sure files supposed to be dropped are dropped */
 	if (parsed->nrels > 0)
 	{
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..57fa732e9b8 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -133,6 +133,12 @@
 #include "access/slru.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "access/xlogrecovery.h"
+#include "access/xlog_internal.h"
 #include "catalog/pg_database.h"
 #include "commands/async.h"
 #include "common/hashfn.h"
@@ -151,6 +157,29 @@
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
 
+/* Missing definitions for WAL-based notification system */
+#define AsyncQueueEntryEmptySize ASYNC_QUEUE_ENTRY_SIZE
+#define SLRU_PAGE_SIZE BLCKSZ
+#define AsyncCtl NotifyCtl
+
+/* WAL record types */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00
+
+/*
+ * WAL record for notification data (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	int32		srcPid;			/* source backend PID */
+	uint32		nnotifications;	/* number of notifications */
+	/* followed by serialized notification data */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData	(offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
+
 
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.  This
@@ -163,30 +192,13 @@
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
 /*
- * Struct representing an entry in the global notify queue
- *
- * This struct declaration has the maximal length, but in a real queue entry
- * the data area is only big enough for the actual channel and payload strings
- * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
- * entry size, if both channel and payload strings are empty (but note it
- * doesn't include alignment padding).
- *
- * The "length" field should always be rounded up to the next QUEUEALIGN
- * multiple so that all fields are properly aligned.
+ * NOTE: The AsyncQueueEntry structure is now defined in commands/async.h
+ * as a compact metadata-only structure for the new WAL-based notification system.
+ * The old variable-length structure with full notification content is no longer used.
  */
-typedef struct AsyncQueueEntry
-{
-	int			length;			/* total allocated length of entry */
-	Oid			dboid;			/* sender's database OID */
-	TransactionId xid;			/* sender's XID */
-	int32		srcPid;			/* sender's PID */
-	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
-} AsyncQueueEntry;
-
-/* Currently, no field of AsyncQueueEntry requires more than int alignment */
-#define QUEUEALIGN(len)		INTALIGN(len)
 
-#define AsyncQueueEntryEmptySize	(offsetof(AsyncQueueEntry, data) + 2)
+/* Queue alignment is still needed for SLRU page management */
+#define QUEUEALIGN(len)		INTALIGN(len)
 
 /*
  * Struct describing a queue position, and assorted macros for working with it
@@ -438,18 +450,13 @@ static void Exec_UnlistenCommit(const char *channel);
 static void Exec_UnlistenAllCommit(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
-static bool asyncQueueIsFull(void);
 static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
-static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
-static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
-static void asyncQueueFillWarning(void);
-static void SignalBackends(void);
+void SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
-										 char *page_buffer,
-										 Snapshot snapshot);
+										 char *page_buffer);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
@@ -457,6 +464,7 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
+static void processNotificationFromWAL(XLogRecPtr notify_lsn);
 
 /*
  * Compute the difference between two queue page numbers.
@@ -890,65 +898,75 @@ PreCommit_Notify(void)
 		}
 	}
 
-	/* Queue any pending notifies (must happen after the above) */
+	/* Write notification data to WAL if we have any */
 	if (pendingNotifies)
 	{
-		ListCell   *nextNotify;
+		TransactionId currentXid;
+		ListCell   *l;
+		size_t		total_size = 0;
+		uint32		nnotifications = 0;
+		char	   *notifications_data;
+		char	   *ptr;
+		XLogRecPtr	notify_lsn;
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
 		 * GetCurrentTransactionId is cheap if we already have an XID, but not
-		 * so cheap if we don't, and we'd prefer not to do that work while
-		 * holding NotifyQueueLock.
+		 * so cheap if we don't.
 		 */
-		(void) GetCurrentTransactionId();
+		currentXid = GetCurrentTransactionId();
 
 		/*
-		 * Serialize writers by acquiring a special lock that we hold till
-		 * after commit.  This ensures that queue entries appear in commit
-		 * order, and in particular that there are never uncommitted queue
-		 * entries ahead of committed ones, so an uncommitted transaction
-		 * can't block delivery of deliverable notifications.
-		 *
-		 * We use a heavyweight lock so that it'll automatically be released
-		 * after either commit or abort.  This also allows deadlocks to be
-		 * detected, though really a deadlock shouldn't be possible here.
-		 *
-		 * The lock is on "database 0", which is pretty ugly but it doesn't
-		 * seem worth inventing a special locktag category just for this.
-		 * (Historical note: before PG 9.0, a similar lock on "database 0" was
-		 * used by the flatfiles mechanism.)
+		 * Step 1: Write notification data to WAL.
+		 * This can be done in parallel with other transactions since we're
+		 * not holding any global locks yet.
 		 */
-		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-						 AccessExclusiveLock);
+		
+		/* First pass: calculate total size needed for serialization */
+		foreach(l, pendingNotifies->events)
+		{
+			Notification *n = (Notification *) lfirst(l);
+			
+			/* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */
+			total_size += 4 + n->channel_len + 1 + n->payload_len + 1;
+			nnotifications++;
+		}
 
-		/* Now push the notifications into the queue */
-		nextNotify = list_head(pendingNotifies->events);
-		while (nextNotify != NULL)
+		/* Allocate buffer for notification data */
+		notifications_data = palloc(total_size);
+		ptr = notifications_data;
+
+		/* Second pass: serialize all notifications */
+		foreach(l, pendingNotifies->events)
 		{
-			/*
-			 * Add the pending notifications to the queue.  We acquire and
-			 * release NotifyQueueLock once per page, which might be overkill
-			 * but it does allow readers to get in while we're doing this.
-			 *
-			 * A full queue is very uncommon and should really not happen,
-			 * given that we have so much space available in the SLRU pages.
-			 * Nevertheless we need to deal with this possibility. Note that
-			 * when we get here we are in the process of committing our
-			 * transaction, but we have not yet committed to clog, so at this
-			 * point in time we can still roll the transaction back.
-			 */
-			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-			asyncQueueFillWarning();
-			if (asyncQueueIsFull())
-				ereport(ERROR,
-						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-						 errmsg("too many notifications in the NOTIFY queue")));
-			nextNotify = asyncQueueAddEntries(nextNotify);
-			LWLockRelease(NotifyQueueLock);
+			Notification *n = (Notification *) lfirst(l);
+			char	   *channel = n->data;
+			char	   *payload = n->data + n->channel_len + 1;
+
+			/* Write channel length, payload length, channel, and payload */
+			memcpy(ptr, &n->channel_len, 2);
+			ptr += 2;
+			memcpy(ptr, &n->payload_len, 2);
+			ptr += 2;
+			memcpy(ptr, channel, n->channel_len + 1);
+			ptr += n->channel_len + 1;
+			memcpy(ptr, payload, n->payload_len + 1);
+			ptr += n->payload_len + 1;
 		}
 
-		/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
+		/* Write notification data to WAL */
+		notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid,
+										nnotifications, total_size,
+										notifications_data);
+
+		pfree(notifications_data);
+
+		/*
+		 * Step 2: Store the notification LSN in PROC for use during commit.
+		 * The queue entry will be written just before the commit record
+		 * while holding the global notification commit lock to ensure proper ordering.
+		 */
+		MyProc->notifyCommitLsn = notify_lsn;
 	}
 }
 
@@ -1006,12 +1024,19 @@ AtCommit_Notify(void)
 		asyncQueueUnregister();
 
 	/*
-	 * Send signals to listening backends.  We need do this only if there are
-	 * pending notifies, which were previously added to the shared queue by
-	 * PreCommit_Notify().
+	 * If we had notifications, PreCommit_Notify wrote their data to WAL and
+	 * the compact queue entry was added just before the commit record. Now
+	 * that we've committed, signal listening backends to check the queue;
+	 * every entry they find there is from a committed transaction.
 	 */
-	if (pendingNotifies != NULL)
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		/* Signal listening backends to check the queue */
 		SignalBackends();
+		
+		/* Clear the flag now that we're done */
+		MyProc->notifyCommitLsn = InvalidXLogRecPtr;
+	}
 
 	/*
 	 * If it's time to try to advance the global tail pointer, do that.
@@ -1263,21 +1288,6 @@ asyncQueueUnregister(void)
 	amRegisteredListener = false;
 }
 
-/*
- * Test whether there is room to insert more notification messages.
- *
- * Caller must hold at least shared NotifyQueueLock.
- */
-static bool
-asyncQueueIsFull(void)
-{
-	int64		headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
-	int64		tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
-	int64		occupied = headPage - tailPage;
-
-	return occupied >= max_notify_queue_pages;
-}
-
 /*
  * Advance the QueuePosition to the next entry, assuming that the current
  * entry is of length entryLength.  If we jump to a new page the function
@@ -1313,166 +1323,6 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
 	return pageJump;
 }
 
-/*
- * Fill the AsyncQueueEntry at *qe with an outbound notification message.
- */
-static void
-asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
-{
-	size_t		channellen = n->channel_len;
-	size_t		payloadlen = n->payload_len;
-	int			entryLength;
-
-	Assert(channellen < NAMEDATALEN);
-	Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
-
-	/* The terminators are already included in AsyncQueueEntryEmptySize */
-	entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
-	entryLength = QUEUEALIGN(entryLength);
-	qe->length = entryLength;
-	qe->dboid = MyDatabaseId;
-	qe->xid = GetCurrentTransactionId();
-	qe->srcPid = MyProcPid;
-	memcpy(qe->data, n->data, channellen + payloadlen + 2);
-}
-
-/*
- * Add pending notifications to the queue.
- *
- * We go page by page here, i.e. we stop once we have to go to a new page but
- * we will be called again and then fill that next page. If an entry does not
- * fit into the current page, we write a dummy entry with an InvalidOid as the
- * database OID in order to fill the page. So every page is always used up to
- * the last byte which simplifies reading the page later.
- *
- * We are passed the list cell (in pendingNotifies->events) containing the next
- * notification to write and return the first still-unwritten cell back.
- * Eventually we will return NULL indicating all is done.
- *
- * We are holding NotifyQueueLock already from the caller and grab
- * page specific SLRU bank lock locally in this function.
- */
-static ListCell *
-asyncQueueAddEntries(ListCell *nextNotify)
-{
-	AsyncQueueEntry qe;
-	QueuePosition queue_head;
-	int64		pageno;
-	int			offset;
-	int			slotno;
-	LWLock	   *prevlock;
-
-	/*
-	 * We work with a local copy of QUEUE_HEAD, which we write back to shared
-	 * memory upon exiting.  The reason for this is that if we have to advance
-	 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
-	 * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
-	 * subsequent insertions would try to put entries into a page that slru.c
-	 * thinks doesn't exist yet.)  So, use a local position variable.  Note
-	 * that if we do fail, any already-inserted queue entries are forgotten;
-	 * this is okay, since they'd be useless anyway after our transaction
-	 * rolls back.
-	 */
-	queue_head = QUEUE_HEAD;
-
-	/*
-	 * If this is the first write since the postmaster started, we need to
-	 * initialize the first page of the async SLRU.  Otherwise, the current
-	 * page should be initialized already, so just fetch it.
-	 */
-	pageno = QUEUE_POS_PAGE(queue_head);
-	prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
-
-	/* We hold both NotifyQueueLock and SLRU bank lock during this operation */
-	LWLockAcquire(prevlock, LW_EXCLUSIVE);
-
-	if (QUEUE_POS_IS_ZERO(queue_head))
-		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
-	else
-		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
-								   InvalidTransactionId);
-
-	/* Note we mark the page dirty before writing in it */
-	NotifyCtl->shared->page_dirty[slotno] = true;
-
-	while (nextNotify != NULL)
-	{
-		Notification *n = (Notification *) lfirst(nextNotify);
-
-		/* Construct a valid queue entry in local variable qe */
-		asyncQueueNotificationToEntry(n, &qe);
-
-		offset = QUEUE_POS_OFFSET(queue_head);
-
-		/* Check whether the entry really fits on the current page */
-		if (offset + qe.length <= QUEUE_PAGESIZE)
-		{
-			/* OK, so advance nextNotify past this item */
-			nextNotify = lnext(pendingNotifies->events, nextNotify);
-		}
-		else
-		{
-			/*
-			 * Write a dummy entry to fill up the page. Actually readers will
-			 * only check dboid and since it won't match any reader's database
-			 * OID, they will ignore this entry and move on.
-			 */
-			qe.length = QUEUE_PAGESIZE - offset;
-			qe.dboid = InvalidOid;
-			qe.data[0] = '\0';	/* empty channel */
-			qe.data[1] = '\0';	/* empty payload */
-		}
-
-		/* Now copy qe into the shared buffer page */
-		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
-			   &qe,
-			   qe.length);
-
-		/* Advance queue_head appropriately, and detect if page is full */
-		if (asyncQueueAdvance(&(queue_head), qe.length))
-		{
-			LWLock	   *lock;
-
-			pageno = QUEUE_POS_PAGE(queue_head);
-			lock = SimpleLruGetBankLock(NotifyCtl, pageno);
-			if (lock != prevlock)
-			{
-				LWLockRelease(prevlock);
-				LWLockAcquire(lock, LW_EXCLUSIVE);
-				prevlock = lock;
-			}
-
-			/*
-			 * Page is full, so we're done here, but first fill the next page
-			 * with zeroes.  The reason to do this is to ensure that slru.c's
-			 * idea of the head page is always the same as ours, which avoids
-			 * boundary problems in SimpleLruTruncate.  The test in
-			 * asyncQueueIsFull() ensured that there is room to create this
-			 * page without overrunning the queue.
-			 */
-			slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
-
-			/*
-			 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
-			 * set flag to remember that we should try to advance the tail
-			 * pointer (we don't want to actually do that right here).
-			 */
-			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
-				tryAdvanceTail = true;
-
-			/* And exit the loop */
-			break;
-		}
-	}
-
-	/* Success, so update the global QUEUE_HEAD */
-	QUEUE_HEAD = queue_head;
-
-	LWLockRelease(prevlock);
-
-	return nextNotify;
-}
-
 /*
  * SQL function to return the fraction of the notification queue currently
  * occupied.
@@ -1515,52 +1365,6 @@ asyncQueueUsage(void)
 	return (double) occupied / (double) max_notify_queue_pages;
 }
 
-/*
- * Check whether the queue is at least half full, and emit a warning if so.
- *
- * This is unlikely given the size of the queue, but possible.
- * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
- *
- * Caller must hold exclusive NotifyQueueLock.
- */
-static void
-asyncQueueFillWarning(void)
-{
-	double		fillDegree;
-	TimestampTz t;
-
-	fillDegree = asyncQueueUsage();
-	if (fillDegree < 0.5)
-		return;
-
-	t = GetCurrentTimestamp();
-
-	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
-								   t, QUEUE_FULL_WARN_INTERVAL))
-	{
-		QueuePosition min = QUEUE_HEAD;
-		int32		minPid = InvalidPid;
-
-		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
-		{
-			Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
-			if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
-				minPid = QUEUE_BACKEND_PID(i);
-		}
-
-		ereport(WARNING,
-				(errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
-				 (minPid != InvalidPid ?
-				  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
-				  : 0),
-				 (minPid != InvalidPid ?
-				  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
-				  : 0)));
-
-		asyncQueueControl->lastQueueFillWarn = t;
-	}
-}
 
 /*
  * Send signals to listening backends.
@@ -1577,7 +1381,7 @@ asyncQueueFillWarning(void)
  * This is called during CommitTransaction(), so it's important for it
  * to have very low probability of failure.
  */
-static void
+void
 SignalBackends(void)
 {
 	int32	   *pids;
@@ -1844,15 +1648,13 @@ ProcessNotifyInterrupt(bool flush)
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
- * ones to my frontend.  Stop when we reach queue head or an uncommitted
- * notification.
+ * ones to my frontend.  Stop when we reach queue head.
  */
 static void
 asyncQueueReadAllNotifications(void)
 {
 	volatile QueuePosition pos;
 	QueuePosition head;
-	Snapshot	snapshot;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -1875,46 +1677,6 @@ asyncQueueReadAllNotifications(void)
 		return;
 	}
 
-	/*----------
-	 * Get snapshot we'll use to decide which xacts are still in progress.
-	 * This is trickier than it might seem, because of race conditions.
-	 * Consider the following example:
-	 *
-	 * Backend 1:					 Backend 2:
-	 *
-	 * transaction starts
-	 * UPDATE foo SET ...;
-	 * NOTIFY foo;
-	 * commit starts
-	 * queue the notify message
-	 *								 transaction starts
-	 *								 LISTEN foo;  -- first LISTEN in session
-	 *								 SELECT * FROM foo WHERE ...;
-	 * commit to clog
-	 *								 commit starts
-	 *								 add backend 2 to array of listeners
-	 *								 advance to queue head (this code)
-	 *								 commit to clog
-	 *
-	 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
-	 * wasn't committed yet.  Ideally we'd ensure that client 2 would
-	 * eventually get transaction 1's notify message, but there's no way
-	 * to do that; until we're in the listener array, there's no guarantee
-	 * that the notify message doesn't get removed from the queue.
-	 *
-	 * Therefore the coding technique transaction 2 is using is unsafe:
-	 * applications must commit a LISTEN before inspecting database state,
-	 * if they want to ensure they will see notifications about subsequent
-	 * changes to that state.
-	 *
-	 * What we do guarantee is that we'll see all notifications from
-	 * transactions committing after the snapshot we take here.
-	 * Exec_ListenPreCommit has already added us to the listener array,
-	 * so no not-yet-committed messages can be removed from the queue
-	 * before we see them.
-	 *----------
-	 */
-	snapshot = RegisterSnapshot(GetLatestSnapshot());
 
 	/*
 	 * It is possible that we fail while trying to send a message to our
@@ -1979,8 +1741,7 @@ asyncQueueReadAllNotifications(void)
 			 * while sending the notifications to the frontend.
 			 */
 			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf,
-													   snapshot);
+													   page_buffer.buf);
 		} while (!reachedStop);
 	}
 	PG_FINALLY();
@@ -1992,8 +1753,6 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_END_TRY();
 
-	/* Done with snapshot */
-	UnregisterSnapshot(snapshot);
 }
 
 /*
@@ -2004,19 +1763,17 @@ asyncQueueReadAllNotifications(void)
  * memory.  (We could access the page right in shared memory, but that
  * would imply holding the SLRU bank lock throughout this routine.)
  *
- * We stop if we reach the "stop" position, or reach a notification from an
- * uncommitted transaction, or reach the end of the page.
+ * We stop if we reach the "stop" position or reach the end of the page.
  *
- * The function returns true once we have reached the stop position or an
- * uncommitted notification, and false if we have finished with the page.
+ * The function returns true once we have reached the stop position, and false
+ * if we have finished with the page.
  * In other words: once it returns true there is no need to look further.
  * The QueuePosition *current is advanced past all processed messages.
  */
 static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
-							 char *page_buffer,
-							 Snapshot snapshot)
+							 char *page_buffer)
 {
 	bool		reachedStop = false;
 	bool		reachedEndOfPage;
@@ -2032,60 +1789,24 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
 
 		/*
-		 * Advance *current over this message, possibly to the next page. As
-		 * noted in the comments for asyncQueueReadAllNotifications, we must
-		 * do this before possibly failing while processing the message.
+		 * Advance *current over this compact entry. The new compact entries are
+		 * fixed-size, making this much simpler than the old variable-length entries.
 		 */
-		reachedEndOfPage = asyncQueueAdvance(current, qe->length);
+		reachedEndOfPage = asyncQueueAdvance(current, sizeof(AsyncQueueEntry));
 
 		/* Ignore messages destined for other databases */
-		if (qe->dboid == MyDatabaseId)
+		if (qe->dbid == MyDatabaseId)
 		{
-			if (XidInMVCCSnapshot(qe->xid, snapshot))
-			{
-				/*
-				 * The source transaction is still in progress, so we can't
-				 * process this message yet.  Break out of the loop, but first
-				 * back up *current so we will reprocess the message next
-				 * time.  (Note: it is unlikely but not impossible for
-				 * TransactionIdDidCommit to fail, so we can't really avoid
-				 * this advance-then-back-up behavior when dealing with an
-				 * uncommitted message.)
-				 *
-				 * Note that we must test XidInMVCCSnapshot before we test
-				 * TransactionIdDidCommit, else we might return a message from
-				 * a transaction that is not yet visible to snapshots; compare
-				 * the comments at the head of heapam_visibility.c.
-				 *
-				 * Also, while our own xact won't be listed in the snapshot,
-				 * we need not check for TransactionIdIsCurrentTransactionId
-				 * because our transaction cannot (yet) have queued any
-				 * messages.
-				 */
-				*current = thisentry;
-				reachedStop = true;
-				break;
-			}
-			else if (TransactionIdDidCommit(qe->xid))
-			{
-				/* qe->data is the null-terminated channel name */
-				char	   *channel = qe->data;
-
-				if (IsListeningOn(channel))
-				{
-					/* payload follows channel name */
-					char	   *payload = qe->data + strlen(channel) + 1;
-
-					NotifyMyFrontEnd(channel, payload, qe->srcPid);
-				}
-			}
-			else
-			{
-				/*
-				 * The source transaction aborted or crashed, so we just
-				 * ignore its notifications.
-				 */
-			}
+			/*
+			 * Since queue entries are written atomically with commit records
+			 * while holding NotifyQueueLock exclusively, all entries in the queue
+			 * are guaranteed to be from committed transactions.
+			 *
+			 * Step 5: Read notification data from WAL using stored LSN.
+			 * The compact entry only contains metadata; actual notification
+			 * content is retrieved from WAL on demand.
+			 */
+			processNotificationFromWAL(qe->notify_lsn);
 		}
 
 		/* Loop back if we're not at end of page */
@@ -2097,6 +1818,220 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 	return reachedStop;
 }
 
+/*
+ * processNotificationFromWAL
+ *
+ * Fetch notification data from WAL using the stored LSN and process
+ * the individual notifications for delivery to listening frontend.
+ * This implements Step 5 of the new WAL-based notification system.
+ */
+static void
+processNotificationFromWAL(XLogRecPtr notify_lsn)
+{
+	XLogReaderState *xlogreader;
+	DecodedXLogRecord *record;
+	xl_async_notify_data *xlrec;
+	char	   *data;
+	char	   *ptr;
+	uint32		remaining;
+	int			srcPid;
+	char	   *errormsg;
+
+	/*
+	 * Create XLog reader to fetch the notification data record.
+	 * We use a temporary reader since this is called during normal
+	 * notification processing, not during recovery.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									NULL);
+	if (!xlogreader)
+		elog(ERROR, "failed to allocate XLog reader for notification data");
+
+	/* Start reading exactly at the NOTIFY_DATA record begin LSN */
+	XLogBeginRead(xlogreader, notify_lsn);
+
+	/* Read the NOTIFY_DATA record */
+	record = (DecodedXLogRecord *) XLogReadRecord(xlogreader, &errormsg);
+	if (record == NULL)
+		elog(ERROR, "failed to read notification data from WAL at %X/%X: %s",
+			 LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message");
+
+	/* Verify this is the expected record type */
+	if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID ||
+		(XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA)
+		elog(ERROR, "expected NOTIFY_DATA at %X/%X, found rmgr %u info %u",
+			 LSN_FORMAT_ARGS(notify_lsn),
+			 XLogRecGetRmid(xlogreader),
+			 (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK));
+
+	/* Extract the notification data from the WAL record */
+	xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader);
+	srcPid = xlrec->srcPid;
+	data = (char *) xlrec + SizeOfAsyncNotifyData;
+	ptr = data;
+	remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData;
+
+	/*
+	 * Process each notification in the serialized data.
+	 * The format is: 2-byte channel_len, 2-byte payload_len,
+	 * null-terminated channel, null-terminated payload.
+	 */
+	for (uint32 i = 0; i < xlrec->nnotifications && remaining >= 4; i++)
+	{
+		uint16		channel_len;
+		uint16		payload_len;
+		char	   *channel;
+		char	   *payload;
+
+		/* Read lengths */
+		memcpy(&channel_len, ptr, 2);
+		ptr += 2;
+		memcpy(&payload_len, ptr, 2);
+		ptr += 2;
+		remaining -= 4;
+
+		/* Verify we have enough data */
+		if (remaining < channel_len + 1 + payload_len + 1)
+			break;
+
+		/* Extract channel and payload strings */
+		channel = ptr;
+		ptr += channel_len + 1;
+		payload = ptr;
+		ptr += payload_len + 1;
+		remaining -= (channel_len + 1 + payload_len + 1);
+
+		/* Deliver notification if we're listening on this channel */
+		if (IsListeningOn(channel))
+			NotifyMyFrontEnd(channel, payload, srcPid);
+	}
+
+	/* Clean up */
+	XLogReaderFree(xlogreader);
+}
+
+
+/*
+ * asyncQueueAddCompactEntry
+ *
+ * Add a compact entry to the notification SLRU queue containing only
+ * metadata (dbid, xid, notify_lsn) that points to the full notification 
+ * data in WAL. This is much more efficient than the old approach of
+ * storing complete notification content in the SLRU queue.
+ */
+void
+asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	AsyncQueueEntry entry;
+	QueuePosition queue_head;
+	int64		pageno;
+	int			offset;
+	int			slotno;
+	LWLock	   *banklock;
+
+	/*
+	 * Fill in the compact entry with just the metadata.
+	 * No payload data is stored here - it's all in WAL.
+	 */
+	entry.dbid = dbid;
+	entry.xid = xid;
+	entry.notify_lsn = notify_lsn;
+
+	/* Caller should already hold NotifyQueueLock in exclusive mode */
+	queue_head = QUEUE_HEAD;
+
+	/*
+	 * Get the current page. If this is the first write since postmaster
+	 * started, initialize the first page.
+	 */
+	pageno = QUEUE_POS_PAGE(queue_head);
+	banklock = SimpleLruGetBankLock(NotifyCtl, pageno);
+
+	LWLockAcquire(banklock, LW_EXCLUSIVE);
+
+	if (QUEUE_POS_IS_ZERO(queue_head))
+		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
+	else
+		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+								   InvalidTransactionId);
+
+	/* Mark the page dirty before writing */
+	NotifyCtl->shared->page_dirty[slotno] = true;
+
+	offset = QUEUE_POS_OFFSET(queue_head);
+
+	/* Check if the compact entry fits on the current page */
+	if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE)
+	{
+		/* Copy the compact entry to the shared buffer */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &entry,
+			   sizeof(AsyncQueueEntry));
+
+		/* Advance queue head by the size of our compact entry */
+		if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry)))
+		{
+			/*
+			 * Page became full. Initialize the next page to ensure SLRU
+			 * consistency (as the removed asyncQueueAddEntries did).
+			 */
+			LWLock	   *nextlock;
+
+			pageno = QUEUE_POS_PAGE(queue_head);
+			nextlock = SimpleLruGetBankLock(NotifyCtl, pageno);
+			if (nextlock != banklock)
+			{
+				LWLockRelease(banklock);
+				LWLockAcquire(nextlock, LW_EXCLUSIVE);
+			}
+			SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
+			if (nextlock != banklock)
+			{
+				LWLockRelease(nextlock);
+				LWLockAcquire(banklock, LW_EXCLUSIVE);
+			}
+
+			/* Set cleanup flag if appropriate */
+			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+				tryAdvanceTail = true;
+		}
+
+		/* Update the global queue head */
+		QUEUE_HEAD = queue_head;
+	}
+	else
+	{
+		/*
+		 * Entry doesn't fit on current page. This should be very rare with
+		 * our small compact entries, but handle it by padding the page and
+		 * writing to the next page.
+		 */
+		AsyncQueueEntry padding;
+
+		memset(&padding, 0, sizeof(padding));
+		padding.dbid = InvalidOid;  /* Mark as padding */
+
+		/* Fill the rest of the page with padding */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &padding,
+			   QUEUE_PAGESIZE - offset);
+
+		/* Advance to next page */
+		asyncQueueAdvance(&queue_head, QUEUE_PAGESIZE - offset);
+
+		/* Recursively add the entry on the new page */
+		QUEUE_HEAD = queue_head;
+		LWLockRelease(banklock);
+		asyncQueueAddCompactEntry(dbid, xid, notify_lsn);
+		return;
+	}
+
+	LWLockRelease(banklock);
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
@@ -2395,3 +2330,62 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Write a WAL record containing async notification data
+ *
+ * This logs notification data to WAL, allowing us to release locks earlier
+ * and maintain commit ordering through WAL's natural ordering guarantees.
+ */
+XLogRecPtr
+LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+			   uint32 nnotifications, Size data_len, char *data)
+{
+	xl_async_notify_data xlrec;
+
+
+	xlrec.dbid = dboid;
+	xlrec.xid = xid;
+	xlrec.srcPid = srcPid;
+	xlrec.nnotifications = nnotifications;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyData);
+	XLogRegisterData(data, data_len);
+
+	(void) XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_DATA);
+
+	/* Return the begin LSN of the record we just inserted. */
+	return ProcLastRecPtr;
+}
+
+
+
+
+/*
+ * Redo function for async notification WAL records
+ *
+ * Notification data records need no replay action: the data lives in the
+ * WAL record itself, and listeners read it from WAL via the LSN stored in
+ * the notification queue.
+ */
+void
+async_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			/* 
+			 * For notification data records, we don't need to do anything
+			 * during recovery since listeners will read directly from WAL.
+			 * The data is already durably stored in the WAL record itself.
+			 */
+			break;
+
+
+		default:
+			elog(PANIC, "async_redo: unknown op code %u", info);
+	}
+}
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 8f4b282c6b1..b35b007e51c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -19,6 +19,7 @@
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "catalog/storage_xlog.h"
+#include "commands/async.h"
 #include "commands/dbcommands_xlog.h"
 #include "fe_utils/archive.h"
 #include "filemap.h"
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index fac509ed134..03e73ae33c9 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -8,6 +8,7 @@
 #define FRONTEND 1
 #include "postgres.h"
 
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
@@ -23,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/storage_xlog.h"
+#include "commands/async.h"
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
diff --git a/src/include/access/async_xlog.h b/src/include/access/async_xlog.h
new file mode 100644
index 00000000000..d4c0c828e84
--- /dev/null
+++ b/src/include/access/async_xlog.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * async_xlog.h
+ *	  Async notification WAL definitions
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/async_xlog.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ASYNC_XLOG_H
+#define ASYNC_XLOG_H
+
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+
+/*
+ * WAL record types for async notifications
+ */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00	/* notification data */
+
+/*
+ * WAL record for notification data (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	int32		srcPid;			/* source backend PID */
+	uint32		nnotifications;	/* number of notifications */
+	/* followed by serialized notification data */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData	(offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+#endif							/* ASYNC_XLOG_H */
\ No newline at end of file
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 8e7fc9db877..58293e05165 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_ASYNC_ID, "Async", async_redo, async_desc, async_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b2bc10ee041..aa1e2733976 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -194,6 +194,7 @@ typedef struct SavedTransactionCharacteristics
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
 #define XACT_XINFO_HAS_DROPPED_STATS	(1U << 8)
+#define XACT_XINFO_HAS_NOTIFY			(1U << 9)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -317,6 +318,11 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_notify
+{
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} xl_xact_notify;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -330,6 +336,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_notify follows if XINFO_HAS_NOTIFY, stored unaligned! */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -403,6 +410,8 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	XLogRecPtr	notify_lsn;		/* LSN of notification data */
 } xl_xact_parsed_commit;
 
 typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..7e9f10cb84b 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -14,11 +14,25 @@
 #define ASYNC_H
 
 #include <signal.h>
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
 extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
+/*
+ * Compact SLRU queue entry - stores metadata pointing to WAL data
+ */
+typedef struct AsyncQueueEntry
+{
+	Oid			dbid;			/* database ID for quick filtering */
+	TransactionId	xid;			/* transaction ID */
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} AsyncQueueEntry;
+
+#define ASYNC_QUEUE_ENTRY_SIZE	sizeof(AsyncQueueEntry)
+
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
@@ -46,4 +60,15 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* WAL-based notification functions */
+extern XLogRecPtr LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+									 uint32 nnotifications, Size data_len, char *data);
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+/* notification queue functions */
+extern void asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn);
+extern void SignalBackends(void);
+
 #endif							/* ASYNC_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c6f5ebceefd..71459fe5529 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -301,6 +301,9 @@ struct PGPROC
 
 	uint32		wait_event_info;	/* proc's wait information */
 
+	/* Support for async notifications */
+	XLogRecPtr	notifyCommitLsn;	/* LSN of notification data for current xact */
+
 	/* Support for group transaction status update. */
 	bool		clogGroupMember;	/* true, if member of clog group */
 	pg_atomic_uint32 clogGroupNext; /* next clog group member */
-- 
2.39.3 (Apple Git-145)

#22Matheus Alcantara
matheusssilv97@gmail.com
In reply to: Rishu Bagga (#21)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Tue Sep 9, 2025 at 8:14 PM -03, Rishu Bagga wrote:

Oops again - I didn't "git add" the new files, so they weren't showing
up in the patch. I added async_xlog.h as well now,
and tested to make sure the patch applies and compiles. Sorry about
that, it should work now.

Thanks, it's compiling now.

You wrote:

I wasn’t able to run the TAP tests; however, in the updated patch, we
can be confident that entries in the queue are from committed
transactions. If there is a crash before committing and after writing to
the queue, this would be within the critical section, so a notification
from an uncommitted transaction would never be read in the queue.
That allows us to remove the XidInMVCCSnapshot and
TransactionIdDidCommit check.

I ran the TAP test that I attached in [1], and it fails with an
assertion failure:
TRAP: failed Assert("CritSectionCount == 0 || (context)->allowInCritSection"), File: "../src/backend/utils/mmgr/mcxt.c", Line: 1372, PID: 79968
0 postgres 0x0000000105661860 ExceptionalCondition + 236
1 postgres 0x00000001056b7a0c palloc + 248
2 postgres 0x0000000104f96850 SignalBackends + 36
3 postgres 0x0000000104edd50c XactLogCommitRecord + 1632
4 postgres 0x0000000104ee0188 RecordTransactionCommit + 860
5 postgres 0x0000000104edcb8c CommitTransaction + 896
6 postgres 0x0000000104ed7b38 CommitTransactionCommandInternal + 256
7 postgres 0x0000000104ed7a24 CommitTransactionCommand + 16
8 postgres 0x0000000105402220 finish_xact_command + 32
9 postgres 0x00000001053ffb60 exec_simple_query + 1556
10 postgres 0x00000001053feb78 PostgresMain + 3424
11 postgres 0x00000001053f5504 BackendInitialize + 0
12 postgres 0x00000001052c11f4 postmaster_child_launch + 492
13 postgres 0x00000001052c9660 BackendStartup + 336
14 postgres 0x00000001052c70e0 ServerLoop + 432
15 postgres 0x00000001052c5a38 PostmasterMain + 7096
16 postgres 0x0000000105166488 main + 952
17 dyld 0x000000019145ab98 start + 6076

There are also some other tests failing, like isolation, regress and
others.

[1]: /messages/by-id/DCLSWKOKDAX4.3HS2NBE53P0M2@gmail.com

--
Matheus Alcantara

#23Arseniy Mukhin
arseniy.mukhin.dev@gmail.com
In reply to: Matheus Alcantara (#22)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

Hi,

Thank you for the new version! There are several points I noticed:

1)

On Tue, Sep 9, 2025 at 3:08 AM Rishu Bagga <rishu.postgres@gmail.com> wrote:

...
On Sat, Sep 6, 2025 at 7:52 AM Matheus Alcantara
<matheusssilv97@gmail.com> wrote:

Does your patch already aim to fix the issue? In [2] I implemented a TAP
test that reproduces the issue; I ran it with your patch applied and I
still see the error. I'm attaching the TAP test on its own, and maybe we
could incorporate it into your patch series to ensure that the issue is
fixed. What do you think?

I wasn’t able to run the TAP tests; however, in the updated patch, we
can be confident that entries in the queue are from committed
transactions. If there is a crash before committing and after writing to
the queue, this would be within the critical section, so a notification
from an uncommitted transaction would never be read in the queue.
That allows us to remove the XidInMVCCSnapshot and
TransactionIdDidCommit check.

I agree that listeners don't need to check if the notify transaction
was committed or not anymore, but it seems that listeners still need
to wait until the notify transaction is completed before sending its
notifications. I believe we should continue using XidInMVCCSnapshot as
the current version does.

2) Now we have two calls to SignalBackends (is that intentional?). The
first, in AtCommit_Notify(), seems correct to me. The second, in
XactLogCommitRecord(), seems too early, because other backends can't
process new notifications at this point (if the point about
XidInMVCCSnapshot is correct). It is also the call that triggers the
Assert failure Matheus reported.

3) I think we still need to check if the queue is full or not while
adding new entries and ereport if it is (entries are smaller now, but
we still can run out of space). And it seems to me that we should do
this check before entering the critical section, because in the
critical section it will result in PANIC. Master results in ERROR in
case of the full queue, so should we do the same here?

4) I'm not sure, but it seems to me that XactLogCommitRecord is not
the right place for adding listen/notify logic; it seems to be only
about WAL stuff.

5) Do we want to use NotifyQueueLock as a lock that provides the
commit order? Maybe we can introduce another lock to avoid unnecessary
contention? For example, if we use NotifyQueueLock, we block listeners
that want to read new notifications while we are inserting a commit
record, which seems unnecessary.

6) We have fixed size queue entries, so I think we don't need this
"padding" logic at the end of the page anymore, because we know how
many entries we can have on each page.
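
To illustrate point 3, here is a minimal standalone sketch of one way to
keep the "queue full" check out of the critical section: reserve a slot
beforehand, where failure can still be reported as an ordinary ERROR, and
only fill the reserved slot inside the critical section. Everything here
(the sketch_* names, the capacity, the reservation counter) is a
hypothetical stand-in, not code from the patch or from async.c, and a real
implementation would do the reservation under a lock.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical toy queue; the capacity is chosen only for illustration. */
#define SKETCH_QUEUE_CAPACITY 4

static size_t sketch_used = 0;          /* entries actually written */
static size_t sketch_reserved = 0;      /* slots promised, not yet written */
static int    sketch_in_critical = 0;   /* stand-in for CritSectionCount */

void sketch_enter_critical(void) { sketch_in_critical = 1; }
void sketch_exit_critical(void)  { sketch_in_critical = 0; }

/*
 * Called before the critical section. It may fail, and at this point the
 * caller can still raise an ordinary error instead of a PANIC.
 */
int
sketch_reserve_slot(void)
{
    assert(!sketch_in_critical);
    if (sketch_used + sketch_reserved >= SKETCH_QUEUE_CAPACITY)
        return -1;              /* queue full: report it as an error */
    sketch_reserved++;
    return 0;
}

/*
 * Called inside the critical section. It cannot run out of space because
 * the slot was reserved earlier.
 */
void
sketch_fill_reserved_slot(void)
{
    assert(sketch_in_critical);
    assert(sketch_reserved > 0);
    sketch_reserved--;
    sketch_used++;
}

size_t
sketch_queue_used(void)
{
    return sketch_used;
}
```

A caller would invoke sketch_reserve_slot() before entering the critical
section and sketch_fill_reserved_slot() while writing the commit record, so
the only step that can fail happens where an ERROR is still safe.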

BTW, what do you think about creating a separate thread for the patch?
The current thread's subject seems a bit irrelevant.

Best regards,
Arseniy Mukhin

#24Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Rishu Bagga (#13)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Thu, Sep 4, 2025 at 3:53 PM Rishu Bagga <rishu.postgres@gmail.com> wrote:

On Fri, Jul 18, 2025 at 10:06 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

After thinking about this for awhile, I have a rough idea of
something we could do to improve parallelism of NOTIFY.
As a bonus, this'd allow processes on hot standby servers to
receive NOTIFYs from processes on the primary, which is a
feature many have asked for.

The core thought here was to steal some implementation ideas
from two-phase commit. I initially thought maybe we could
remove the SLRU queue entirely, and maybe we can still find
a way to do that, but in this sketch it's still there with
substantially reduced traffic.

The idea basically is to use the WAL log rather than SLRU
as transport for notify messages.

1. In PreCommit_Notify(), gather up all the notifications this
transaction has emitted, and write them into a WAL log message.
Remember the LSN of this message. (I think this part should be
parallelizable, because of work that's previously been done to
allow parallel writes to WAL.)

2. When writing the transaction's commit WAL log entry, include
the LSN of the previous notify-data entry.

3. Concurrently with writing the commit entry, send a message
to the notify SLRU queue. This would be a small fixed-size
message with the transaction's XID, database ID, and the LSN
of the notify-data WAL entry. (The DBID is there to let
listeners quickly ignore traffic from senders in other DBs.)

4. Signal listening backends to check the queue, as we do now.

5. Listeners read the SLRU queue and then, if in same database,
pull the notify data out of the WAL log. (I believe we already
have enough infrastructure to make that cheap, because 2-phase
commit does it too.)

In the simplest implementation of this idea, step 3 would still
require a global lock, to ensure that SLRU entries are made in
commit order. However, that lock only needs to be held for the
duration of step 3, which is much shorter than what happens now.
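
The data flow in steps 1 to 5 can be sketched as a small standalone C
simulation. It is only a toy under stated assumptions: the "WAL" is an
in-memory buffer, an LSN is a byte offset into it, there is no locking or
signalling, and every sketch_* name is hypothetical rather than a
PostgreSQL function.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Every name here is a hypothetical stand-in, not a PostgreSQL API. */
typedef uint64_t SketchLsn;

#define SKETCH_WAL_SIZE   4096
#define SKETCH_QUEUE_SIZE 64

/* Toy append-only "WAL": an LSN is just a byte offset into this buffer. */
static char   sketch_wal[SKETCH_WAL_SIZE];
static size_t sketch_wal_end = 0;

/* Step 3's small fixed-size message: database ID, XID, LSN of the data. */
typedef struct SketchQueueEntry
{
    uint32_t  dbid;
    uint32_t  xid;
    SketchLsn notify_lsn;
} SketchQueueEntry;

static SketchQueueEntry sketch_queue[SKETCH_QUEUE_SIZE];
static size_t sketch_queue_len = 0;

/* Step 1: append the payload to the toy WAL and report where it landed. */
int
sketch_log_notify(const char *payload, SketchLsn *lsn)
{
    size_t len;

    assert(payload != NULL && lsn != NULL);
    len = strlen(payload) + 1;      /* keep the terminating NUL */
    if (len > SKETCH_WAL_SIZE - sketch_wal_end)
        return -1;                  /* toy WAL is full */
    memcpy(sketch_wal + sketch_wal_end, payload, len);
    *lsn = (SketchLsn) sketch_wal_end;
    sketch_wal_end += len;
    return 0;
}

/*
 * Steps 2-3: at commit, append a compact entry that points at the WAL
 * data. Calls happen in commit order; a real system would need a short
 * lock here, which this single-threaded toy omits.
 */
int
sketch_commit(uint32_t dbid, uint32_t xid, SketchLsn notify_lsn)
{
    SketchQueueEntry *e;

    if (sketch_queue_len >= SKETCH_QUEUE_SIZE)
        return -1;                  /* toy queue is full */
    e = &sketch_queue[sketch_queue_len++];
    e->dbid = dbid;
    e->xid = xid;
    e->notify_lsn = notify_lsn;
    return 0;
}

/*
 * Step 5: a listener scans forward from *pos, skips entries from other
 * databases, and fetches the payload from the toy WAL by LSN. Returns
 * NULL once the listener has caught up.
 */
const char *
sketch_listen_next(uint32_t my_dbid, size_t *pos)
{
    assert(pos != NULL);
    while (*pos < sketch_queue_len)
    {
        const SketchQueueEntry *e = &sketch_queue[(*pos)++];

        if (e->dbid == my_dbid)
            return sketch_wal + e->notify_lsn;
    }
    return NULL;
}
```

In this toy, the bulky payload is written in sketch_log_notify(), which in
the real proposal can proceed in parallel, while the ordered step in
sketch_commit() only copies a few fixed-size fields.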

Attached is an initial patch that implements this idea.

There is still some work to be done around how to handle truncation /
vacuum for the new approach, and testing replication of notifications
onto a reader instance.

That being said, I ran some basic benchmarking to stress concurrent
notifications.

With the following sql script, I ran
pgbench -T 100 -c 100 -j 8 -f pgbench_transaction_notify.sql -d postgres

BEGIN;
INSERT INTO test VALUES(1);
NOTIFY benchmark_channel, 'transaction_completed';
COMMIT;

With the patch 3 runs showed the following TPS:

tps = 66372.705917
tps = 63445.909465
tps = 64412.544339

Without the patch, we got the following TPS:

tps = 30212.390982
tps = 30908.865812
tps = 29191.388601

So, there is about a 2x increase in TPS at 100 connections, which suggests
that the approach is promising.
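
As a quick check of that ratio, the six TPS figures quoted above can be
averaged and divided. This is just arithmetic on the reported numbers; the
function and array names are made up for the example.

```c
#include <assert.h>
#include <stddef.h>

/* TPS figures copied from the three runs reported for each configuration. */
static const double sketch_tps_patched[] = {66372.705917, 63445.909465, 64412.544339};
static const double sketch_tps_master[]  = {30212.390982, 30908.865812, 29191.388601};

/* Arithmetic mean of n values. */
double
sketch_mean(const double *v, size_t n)
{
    double sum = 0.0;

    assert(v != NULL && n > 0);
    for (size_t i = 0; i < n; i++)
        sum += v[i];
    return sum / (double) n;
}

/* Ratio of the mean patched TPS to the mean unpatched TPS. */
double
sketch_speedup(void)
{
    return sketch_mean(sketch_tps_patched, 3) / sketch_mean(sketch_tps_master, 3);
}
```

The means come out to roughly 64,744 TPS with the patch and 30,104 TPS
without it, a ratio of about 2.15.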

This looks like a promising improvement.

Additionally, this would help solve the issue being discussed in a
separate thread [1], where listeners currently rely on the transaction
log to verify that a transaction they read has indeed committed, but it
is possible that the relevant portion of the transaction log has been
truncated by vacuum.

With your patch, backends get notifications by reading WAL records, so do
we need to prevent WAL records that may contain unconsumed notifications
from being removed by the checkpointer? Or could we save unconsumed
notifications from WAL records somewhere else during the checkpoint, as
we do for 2PC transactions?
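
A rough sketch of the first option, assuming each listener tracks the LSN
of the oldest notify record it has not yet consumed: the checkpointer
would keep any WAL at or after the minimum of those LSNs. The names and
the per-listener array are hypothetical; nothing like this exists in the
patch as posted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins; 0 plays the role of an invalid LSN here. */
typedef uint64_t SketchLsn;
#define SKETCH_INVALID_LSN ((SketchLsn) 0)

/*
 * Return the oldest notify LSN that any listener still has to read, or
 * SKETCH_INVALID_LSN if every listener has caught up. next_unread[i] is
 * listener i's oldest unconsumed notify record (invalid if none).
 */
SketchLsn
sketch_oldest_needed_lsn(const SketchLsn *next_unread, size_t n)
{
    SketchLsn oldest = SKETCH_INVALID_LSN;

    assert(next_unread != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
    {
        if (next_unread[i] == SKETCH_INVALID_LSN)
            continue;           /* this listener has nothing pending */
        if (oldest == SKETCH_INVALID_LSN || next_unread[i] < oldest)
            oldest = next_unread[i];
    }
    return oldest;
}

/*
 * A WAL segment covering [start, seg_end) may be removed only if no
 * pending notify record lies before seg_end.
 */
int
sketch_segment_removable(SketchLsn seg_end, SketchLsn oldest_needed)
{
    return oldest_needed == SKETCH_INVALID_LSN || seg_end <= oldest_needed;
}
```

The obvious downside of this option is that one slow listener would hold
back WAL removal, which is part of why copying the unconsumed data
elsewhere at checkpoint time is worth considering.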

Also, could you add this patch to the next commitfest [1], if you haven't yet?

Regards,

[1]: https://commitfest.postgresql.org/56/

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#25Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Arseniy Mukhin (#23)
Re: Proposal: Out-of-Order NOTIFY via GUC to Improve LISTEN/NOTIFY Throughput

On Wed, Sep 10, 2025 at 12:00 PM Arseniy Mukhin
<arseniy.mukhin.dev@gmail.com> wrote:

6) We have fixed size queue entries, so I think we don't need this
"padding" logic at the end of the page anymore, because we know how
many entries we can have on each page.

+1

We probably also no longer need a page number and offset pair to
represent a position in the queue, since AsyncQueueEntry is now a
fixed size.
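
A minimal sketch of what that could look like, assuming an 8192-byte page
(PostgreSQL's default BLCKSZ) and a 16-byte entry laid out like the
patch's AsyncQueueEntry: a position becomes a single entry counter, and
the page and byte offset are derived from it when needed. The sketch_*
names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed page size; PostgreSQL's default BLCKSZ is 8192. */
#define SKETCH_PAGE_SIZE 8192u

/* Same field layout as the patch's AsyncQueueEntry, with plain C types. */
typedef struct SketchEntry
{
    uint32_t dbid;
    uint32_t xid;
    uint64_t notify_lsn;
} SketchEntry;

/* With fixed-size entries this is a compile-time constant. */
#define SKETCH_ENTRIES_PER_PAGE (SKETCH_PAGE_SIZE / sizeof(SketchEntry))

/* Page that holds the entry with the given linear index. */
uint64_t
sketch_page_of(uint64_t index)
{
    return index / SKETCH_ENTRIES_PER_PAGE;
}

/* Byte offset of that entry within its page. */
size_t
sketch_byte_offset_of(uint64_t index)
{
    return (size_t) (index % SKETCH_ENTRIES_PER_PAGE) * sizeof(SketchEntry);
}

/* Backlog between two positions is a plain subtraction. */
uint64_t
sketch_entries_between(uint64_t from, uint64_t to)
{
    assert(from <= to);
    return to - from;
}
```

Because the slot boundaries are known in advance, no dummy entry is needed
to pad out the end of a page, and comparing or advancing positions reduces
to integer arithmetic.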

BTW, what do you think about creating a separate thread for the patch?
The current thread's subject seems a bit irrelevant.

+1

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com