Parallel Apply
Hi,
Background and Motivation
-------------------------------------
In high-throughput systems, where hundreds of sessions generate data
on the publisher, the subscriber's apply process often becomes a
bottleneck due to the single apply worker model. While users can
mitigate this by creating multiple publication-subscription pairs,
this approach has scalability and usability limitations.
Currently, PostgreSQL supports parallel apply only for large streaming
transactions (streaming=parallel). This proposal aims to extend
parallelism to non-streaming transactions, thereby improving
replication performance in workloads dominated by smaller, frequent
transactions.
Design Overview
------------------------
To safely parallelize non-streaming transactions, we must ensure that
transaction dependencies are respected to avoid failures and
deadlocks. Consider the following scenarios to understand it better:
(a) Transaction failures: Say, if we insert a row in the first
transaction and update it in the second transaction on the publisher,
then allowing the subscriber to apply both in parallel can lead to
failure in the update; (b) Deadlocks - allowing transactions that
update the same set of rows in a table in the opposite order in
parallel can lead to deadlocks.
The core idea is that the leader apply worker ensures the following:
a. Identifies dependencies between transactions.
b. Coordinates parallel workers to apply independent transactions concurrently.
c. Ensures correct ordering for dependent transactions.
Dependency Detection
--------------------------------
1. Basic Dependency Tracking: Maintain a hash table keyed by
(RelationId, ReplicaIdentity) with the value as the transaction XID.
Before dispatching a change to a parallel worker, the leader checks
for existing entries: (a) If no match: add the entry and proceed; (b)
If match: instruct the worker to wait until the dependent transaction
completes.
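As a rough sketch (illustrative Python with invented names, not PostgreSQL internals), the leader's check amounts to a lookup-then-record on the shared table:

```python
# Toy model of the leader's basic dependency check. The real structure
# would be a shared hash table keyed by (RelationId, ReplicaIdentity);
# a plain dict stands in here.

dep_table = {}

def check_dependency(relation_id, replica_identity, xid):
    """Return the XID this change must wait for, or None if independent.
    In both cases, record xid as the latest writer of the key."""
    key = (relation_id, replica_identity)
    prior = dep_table.get(key)
    dep_table[key] = xid
    if prior is not None and prior != xid:
        return prior  # leader instructs the worker to wait for this XID
    return None
```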
2. Unique Keys
In addition to RI, track unique keys to detect conflicts. Example:
CREATE TABLE tab1(a INT PRIMARY KEY, b INT UNIQUE);
Transactions on publisher:
Txn1: INSERT (1,1)
Txn2: INSERT (2,2)
Txn3: DELETE (2,2)
Txn4: UPDATE (1,1) → (1,2)
If Txn4 is applied before Txn2 and Txn3, it will fail due to a unique
constraint violation. To prevent this, track both RI and unique keys
in the hash table and compare the keys of both the old and new tuples
against it. Specifically, the old tuple's RI, the new tuple's unique
keys, and the new tuple's RI (the last is required to detect a prior
insertion with the same key) all need to be compared with existing hash
table entries to identify transaction dependencies.
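As an illustrative sketch (plain Python with an invented record() helper, not PostgreSQL code), the comparison can be modeled by recording every key value a change writes and probing the table before dispatch:

```python
# Toy model of dependency detection with unique keys, replaying the
# tab1 example above (column a is the PK/RI, column b is UNIQUE).
# A dict stands in for the shared hash table.

dep = {}  # (table, key_column, key_value) -> last writer XID

def record(xid, table, keys):
    """keys: the (column, value) pairs a change touches -- the old
    tuple's RI plus the new tuple's RI and unique keys. Returns the
    set of XIDs this change depends on (the latest writer per key)."""
    found = {dep[(table, c, v)] for (c, v) in keys if (table, c, v) in dep}
    for (c, v) in keys:
        dep[(table, c, v)] = xid
    return found - {xid}
```

Replaying Txn1-Txn4: Txn4's UPDATE (1,1) -> (1,2) probes a=1 (old and new RI) and b=2 (new unique key), so it is flagged as dependent on the latest writers of those keys, which transitively enforces ordering after Txn2 and Txn3.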
3. Foreign Keys
Consider FK constraints between tables. Example:
TABLE owner(user_id INT PRIMARY KEY);
TABLE car(car_name TEXT, user_id INT REFERENCES owner);
Transactions:
Txn1: INSERT INTO owner VALUES(1)
Txn2: INSERT INTO car VALUES('bz', 1)
Applying Txn2 before Txn1 will fail. To avoid this, check if FK values
in new tuples match any RI or unique key in the hash table. If
matched, treat the transaction as dependent.
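The FK check can be sketched the same way (hypothetical names, a dict standing in for the shared hash table):

```python
# Toy model of the FK rule above: a new tuple is dependent if any of
# its FK values matches an RI/unique key that some in-flight
# transaction has written.

dep = {}  # (table, key_column, key_value) -> last writer XID

def fk_dependencies(fk_refs):
    """fk_refs: (referenced_table, column, value) triples taken from
    the new tuple's foreign key columns."""
    return {dep[r] for r in fk_refs if r in dep}

# Txn1 inserts owner(user_id=1); the leader records its key:
dep[('owner', 'user_id', 1)] = 1
```

With that entry present, Txn2's insert into car with user_id=1 is flagged as dependent on Txn1, while a reference to an unrelated user_id is not.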
4. Triggers and Constraints
For the initial version, exclude tables with user-defined triggers or
constraints from parallel apply due to complexity in dependency
detection. We may need some parallel-apply-safe marking to allow this.
Replication Progress Tracking
-----------------------------------------
Parallel apply introduces out-of-order commit application,
complicating replication progress tracking. To handle restarts and
ensure consistency:
Track Three Key Metrics:
lowest_remote_lsn: Starting point for applying transactions.
highest_remote_lsn: Highest LSN that has been applied.
list_remote_lsn: List of commit LSNs applied between the lowest and highest.
Mechanism:
Store these in ReplicationState: lowest_remote_lsn,
highest_remote_lsn, list_remote_lsn. Flush these to disk during
checkpoints similar to CheckPointReplicationOrigin.
After restart, start from lowest_remote_lsn and, for each transaction,
if its commit LSN is in list_remote_lsn, skip it; otherwise, apply it.
Once the commit LSN exceeds highest_remote_lsn, apply without checking the list.
During apply, the leader maintains list_in_progress_xacts in the
increasing commit order. On commit, update highest_remote_lsn. If
commit LSN matches the first in-progress xact of
list_in_progress_xacts, update lowest_remote_lsn, otherwise, add to
list_remote_lsn. After commit, also remove it from the
list_in_progress_xacts. We need to clean up entries below
lowest_remote_lsn in list_remote_lsn while updating its value.
To illustrate how this mechanism works, consider the following four
transactions:
Transaction ID | Commit LSN
           501 | 1000
           502 | 1100
           503 | 1200
           504 | 1300
Assume:
Transactions 501 and 502 take longer to apply whereas transactions 503
and 504 finish earlier. Parallel apply workers are assigned as
follows:
pa-1 → 501
pa-2 → 502
pa-3 → 503
pa-4 → 504
Initial state: list_in_progress_xacts = [501, 502, 503, 504]
Step 1: Transaction 503 commits first and in RecordTransactionCommit,
it updates highest_remote_lsn to 1200. In apply_handle_commit, since
503 is not the first in list_in_progress_xacts, add 1200 to
list_remote_lsn. Remove 503 from list_in_progress_xacts.
Step 2: Transaction 504 commits: update highest_remote_lsn to 1300.
Add 1300 to list_remote_lsn. Remove 504 from list_in_progress_xacts.
ReplicationState now:
lowest_remote_lsn = 0
list_remote_lsn = [1200, 1300]
highest_remote_lsn = 1300
list_in_progress_xacts = [501, 502]
Step 3: Transaction 501 commits. Since 501 is now the first in
list_in_progress_xacts, update lowest_remote_lsn to 1000. Remove 501
from list_in_progress_xacts. Clean up list_remote_lsn to remove
entries < lowest_remote_lsn (none in this case).
ReplicationState now:
lowest_remote_lsn = 1000
list_remote_lsn = [1200, 1300]
highest_remote_lsn = 1300
list_in_progress_xacts = [502]
Step 4: System crash and restart
Upon restart, start replication from lowest_remote_lsn = 1000. The
first transaction encountered is 502 with commit LSN 1100; since it is
not present in list_remote_lsn, apply it. As transactions 503 and 504's
respective commit LSNs [1200, 1300] are present in list_remote_lsn, we
skip them. Note that each transaction's end_lsn/commit_lsn has to be
compared, which the apply worker receives along with the transaction's
first command, BEGIN. This ensures correctness and avoids duplicate
application of already committed transactions.
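The bookkeeping above can be sketched as a small Python model (a hypothetical ProgressState class, not the actual ReplicationState struct) that replays the four-transaction example and the restart decision:

```python
# Minimal model of the progress-tracking rules: lowest/highest remote
# LSN plus the list of commit LSNs applied in between.

class ProgressState:
    def __init__(self, in_progress):
        self.lowest = 0                  # lowest_remote_lsn
        self.highest = 0                 # highest_remote_lsn
        self.applied = []                # list_remote_lsn
        self.in_progress = in_progress   # [(xid, commit_lsn)] in commit order

    def commit(self, xid, lsn):
        self.highest = max(self.highest, lsn)
        if self.in_progress[0][0] == xid:
            self.lowest = lsn
            # clean up entries now below the new lowest_remote_lsn
            self.applied = [l for l in self.applied if l >= self.lowest]
        else:
            self.applied.append(lsn)
        self.in_progress = [e for e in self.in_progress if e[0] != xid]

    def must_apply(self, commit_lsn):
        """Restart rule: skip only commit LSNs recorded as applied."""
        return commit_lsn > self.highest or commit_lsn not in self.applied
```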
Now, it is possible that some users may want to parallelize
transactions but still maintain commit order, for example because they
do not declare PK/FK constraints on columns and instead maintain
integrity in the application. In such cases we won't be able to detect
transaction dependencies, so it would be better to make out-of-order
commits optional.
Thoughts?
--
With Regards,
Amit Kapila.
Hi!
On Mon, 11 Aug 2025 at 09:46, Amit Kapila <amit.kapila16@gmail.com> wrote:
Currently, PostgreSQL supports parallel apply only for large streaming
transactions (streaming=parallel). This proposal aims to extend
parallelism to non-streaming transactions, thereby improving
replication performance in workloads dominated by smaller, frequent
transactions.
Sure.
Design Overview
------------------------
To safely parallelize non-streaming transactions, we must ensure that
transaction dependencies are respected to avoid failures and
deadlocks.
A built-in subsystem for transaction dependency tracking would be
highly beneficial for physical replication speedup projects like [0].
Thoughts?
Surely we need to give it a try.
[0]: https://github.com/koichi-szk/postgres
--
Best regards,
Kirill Reshke
On Mon, Aug 11, 2025 at 1:39 PM Kirill Reshke <reshkekirill@gmail.com> wrote:
A built-in subsystem for transaction dependency tracking would be
highly beneficial for physical replication speedup projects like [0].
I am not sure if that is directly applicable because this work
proposes to track dependencies based on logical WAL contents. However,
if you can point me to README on the overall design of the work you
are pointing to then I can check it once.
--
With Regards,
Amit Kapila.
On Mon, 11 Aug 2025 at 13:45, Amit Kapila <amit.kapila16@gmail.com> wrote:
I am not sure if that is directly applicable because this work
proposes to track dependencies based on logical WAL contents. However,
if you can point me to README on the overall design of the work you
are pointing to then I can check it once.
The only doc on this that I am aware of is [0]. The project is however
more dead than alive, but I hope this is just a temporary stop of
development, not permanent.
[0]: https://wiki.postgresql.org/wiki/Parallel_Recovery
--
Best regards,
Kirill Reshke
On 11/8/2025 06:45, Amit Kapila wrote:
The core idea is that the leader apply worker ensures the following:
a. Identifies dependencies between transactions. b. Coordinates
parallel workers to apply independent transactions concurrently. c.
Ensures correct ordering for dependent transactions.
Dependency detection may be quite an expensive operation. What about a
'positive' approach - deadlock detection on the replica, and restarting
the apply of a record that should be applied later? Have you thought
about this approach? What are the pros and cons here? Do you envision
common cases where such a deadlock would be frequent?
--
regards, Andrei Lepikhov
On Tue, Aug 12, 2025 at 12:04 PM Andrei Lepikhov <lepihov@gmail.com> wrote:
Dependency detection may be quite an expensive operation. What about a
'positive' approach - deadlock detection on the replica, and restarting
the apply of a record that should be applied later?
It is not only deadlocks: we could also incorrectly apply some
transactions which should otherwise fail. For example, consider the
following case:
Pub: t1(c1 int unique key, c2 int)
Sub: t1(c1 int unique key, c2 int)
On Pub:
TXN-1
insert(1,11)
TXN-2
update (1,11) --> update (2,12)
On Sub:
table contains (1,11) before replication.
Now, if we allow dependent transactions to go in parallel, instead of
getting an ERROR while doing the insert, the update will succeed and
the subsequent insert will also succeed. This will create inconsistency
on the subscriber side.
Similarly consider another set of transactions:
On Pub:
TXN-1
insert(1,11)
TXN-2
Delete (1,11)
On the subscriber, if we allow TXN-2 to be applied before TXN-1, both
transactions will apply successfully but the subscriber will become
inconsistent w.r.t. the publisher.
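The first scenario can be checked with a small toy model (ordinary Python sets standing in for table rows; nothing here is PostgreSQL code):

```python
# Toy model of the c1-unique table: subscriber already contains (1,11).

def apply_insert(table, row):
    if any(r[0] == row[0] for r in table):
        raise ValueError("duplicate key on c1")  # what in-order apply raises
    table.add(row)

def apply_update(table, old, new):
    if old in table:
        table.discard(old)
        table.add(new)

table = {(1, 11)}                        # subscriber state before apply
apply_update(table, (1, 11), (2, 12))    # TXN-2 applied first (out of order)
apply_insert(table, (1, 11))             # TXN-1 now succeeds silently
```

Applied in commit order, TXN-1's insert raises the duplicate-key error; applied out of order, both changes succeed and the subscriber silently diverges from the publisher.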
My colleague had already built a POC based on this idea and we did
check some initial numbers for non-dependent transactions and the
apply speed has improved drastically. We will share the POC patch and
numbers in the next few days.
For a dependent-transactions workload, if we choose to go with the
deadlock detection approach, there will be a lot of retries, which may
not lead to good apply improvements. Also, we may choose to enable this
form of parallel apply optionally due to the reasons mentioned in my
first email, so if there is overhead due to dependency tracking, one
can disable parallel apply for those particular subscriptions.
--
With Regards,
Amit Kapila.
On Mon, Aug 11, 2025 at 3:00 PM Kirill Reshke <reshkekirill@gmail.com> wrote:
The only doc on this that I am aware of is [0]. The project is however
more dead than alive, but I hope this is just a temporary stop of
development, not permanent.
Thanks for sharing the wiki page. After reading, it seems we can't use
the exact dependency tracking mechanism as both the projects have
different dependency requirements. However, it could be an example to
refer to and maybe some parts of the infrastructure could be reused.
--
With Regards,
Amit Kapila.
On 11.08.2025 7:45 AM, Amit Kapila wrote:
Dependency Detection
--------------------------------
1. Basic Dependency Tracking: Maintain a hash table keyed by
(RelationId, ReplicaIdentity) with the value as the transaction XID.
Before dispatching a change to a parallel worker, the leader checks
for existing entries: (a) If no match: add the entry and proceed; (b)
If match: instruct the worker to wait until the dependent transaction
completes.
Hi,
This is something similar to what I had in mind when starting my
experiments with LR apply speed improvements. I think that maintaining
a full (RelationId, ReplicaIdentity) hash may be too expensive - there
can be hundreds of active transactions updating millions of rows.
I thought about something like a Bloom filter, but frankly speaking I
didn't go far in thinking through all the implementation details. Your
proposal is much more concrete.
But I decided first to implement an approach based on prefetching,
which is much simpler, similar to the prefetching currently used for
physical replication, and still provides quite a significant
improvement:
/messages/by-id/84ed36b8-7d06-4945-9a6b-3826b3f999a6@garret.ru
There is one thing which I do not completely understand about your
proposal: do you assume that the LR walsender at the publisher will use
the reorder buffer to "serialize" transactions, or do you assume that
streaming mode will be used (it is now possible to enforce parallel
apply of short transactions using
`debug_logical_replication_streaming`)?
It seems senseless to spend time and memory serializing transactions at
the publisher if we in any case want to apply them in parallel at the
subscriber.
But then there is another problem: at the publisher there can be
hundreds of concurrent active transactions (limited only by
`max_connections`) whose records are intermixed in the WAL.
If we try to apply them concurrently at the subscriber, we need a
corresponding number of parallel apply workers. But usually the number
of such workers is less than 10 (and the default is 2).
So it looks like we need to serialize transactions at the subscriber
side.
Assume that there are 100 concurrent transactions T1..T100, i.e. before
the first COMMIT record there are intermixed records of 100
transactions.
And there are just two parallel apply workers, W1 and W2. The main LR
apply worker will send T1's records to W1, T2's records to W2, and ...
there are no more vacant workers.
It has either to spawn additional ones, which is not always possible
because the total number of background workers is limited,
or to serialize all other transactions in memory or on disk until it
reaches the COMMIT of T1 or T2.
I am afraid that such serialization will eliminate any advantages of
parallel apply.
Certainly if we do the reordering of transactions at the publisher
side, then there is no such problem. The subscriber receives all
records for T1, then all records for T2, ... If there are no more
vacant workers, it can just wait until any of these transactions is
completed. But I am afraid that in this case the reorder buffer at the
publisher will be a bottleneck.
On Mon, Aug 11, 2025 at 10:15:41AM +0530, Amit Kapila wrote:
Currently, PostgreSQL supports parallel apply only for large streaming
transactions (streaming=parallel). This proposal aims to extend
parallelism to non-streaming transactions, thereby improving
replication performance in workloads dominated by smaller, frequent
transactions.
I thought the approach for improving WAL apply speed, for both binary
and logical, was pipelining:
https://en.wikipedia.org/wiki/Instruction_pipelining
rather than trying to do all the steps in parallel.
--
Bruce Momjian <bruce@momjian.us> https://momjian.us
EDB https://enterprisedb.com
Do not let urgent matters crowd out time for investment in the future.
On Tue, Aug 12, 2025 at 10:40 PM Bruce Momjian <bruce@momjian.us> wrote:
I thought the approach for improving WAL apply speed, for both binary
and logical, was pipelining:
https://en.wikipedia.org/wiki/Instruction_pipelining
rather than trying to do all the steps in parallel.
It is not clear to me how the speed for a mix of dependent and
independent transactions can be improved using the technique you
shared as we still need to follow the commit order for dependent
transactions. Can you please elaborate more on the high-level idea of
how this technique can be used to improve speed for applying logical
WAL records?
--
With Regards,
Amit Kapila.
On Tue, Aug 12, 2025 at 9:22 PM Константин Книжник <knizhnik@garret.ru> wrote:
Hi,
This is something similar to what I have in mind when starting my experiments with LR apply speed improvements. I think that maintaining a full (RelationId, ReplicaIdentity) hash may be too expensive - there can be hundreds of active transactions updating millions of rows.
I thought about something like a bloom filter. But frankly speaking I didn't go far in thinking about all implementation details. Your proposal is much more concrete.
We can surely investigate a different hash_key if that works for all cases.
There is one thing which I do not completely understand with your
proposal: do you assume that LR walsender at publisher will use reorder
buffer to "serialize" transactions or you assume that streaming mode
will be used (now it is possible to enforce parallel apply of short
transactions using `debug_logical_replication_streaming`)?
The current proposal is based on reorderbuffer serializing
transactions as we are doing now.
Either serialize all other transactions in memory or on disk, until it
reaches COMMIT of T1 or T2. I am afraid that such serialization will
eliminate any advantages of parallel apply.
Right, I also think so, and we will probably end up doing something
similar to what we are doing now in the publisher.
Certainly if we do reordering of transactions at publisher side, then there is no such problem. Subscriber receives all records for T1, then all records for T2, ... If there are no more vacant workers, it can just wait until any of this transactions is completed. But I am afraid that in this case the reorder buffer at the publisher will be a bottleneck.
This is a point to investigate if we observe that. But till now, in our
internal testing, parallel apply gives a good improvement in
pgbench-style workloads.
--
With Regards,
Amit Kapila.
On Monday, August 11, 2025 12:46 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
Background and Motivation
-------------------------------------
In high-throughput systems, where hundreds of sessions generate data
on the publisher, the subscriber's apply process often becomes a
bottleneck due to the single apply worker model. While users can
mitigate this by creating multiple publication-subscription pairs,
this approach has scalability and usability limitations.Currently, PostgreSQL supports parallel apply only for large streaming
transactions (streaming=parallel). This proposal aims to extend
parallelism to non-streaming transactions, thereby improving
replication performance in workloads dominated by smaller, frequent
transactions.Design Overview
------------------------
To safely parallelize non-streaming transactions, we must ensure that
transaction dependencies are respected to avoid failures and
deadlocks. Consider the following scenarios to understand it better:
(a) Transaction failures: Say, if we insert a row in the first
transaction and update it in the second transaction on the publisher,
then allowing the subscriber to apply both in parallel can lead to
failure in the update; (b) Deadlocks - allowing transactions that
update the same set of rows in a table in the opposite order in
parallel can lead to deadlocks.The core idea is that the leader apply worker ensures the following:
a. Identifies dependencies between transactions. b. Coordinates
parallel workers to apply independent transactions concurrently. c.
Ensures correct ordering for dependent transactions.Dependency Detection
--------------------------------
1. Basic Dependency Tracking: Maintain a hash table keyed by
(RelationId, ReplicaIdentity) with the value as the transaction XID.
Before dispatching a change to a parallel worker, the leader checks
for existing entries: (a) If no match: add the entry and proceed; (b)
If match: instruct the worker to wait until the dependent transaction
completes.2. Unique Keys
In addition to RI, track unique keys to detect conflicts. Example:
CREATE TABLE tab1(a INT PRIMARY KEY, b INT UNIQUE);
Transactions on publisher:
Txn1: INSERT (1,1)
Txn2: INSERT (2,2)
Txn3: DELETE (2,2)
Txn4: UPDATE (1,1) → (1,2)If Txn4 is applied before Txn2 and Txn3, it will fail due to a unique
constraint violation. To prevent this, track both RI and unique keys
in the hash table. Compare the keys of both the old and new tuples to
detect dependencies: the old tuple's RI, and the new tuple's unique
keys and RI (the new tuple's RI is required to detect a prior
insertion with the same key), need to be compared against existing
hash table entries to identify transaction dependencies.

3. Foreign Keys
Consider FK constraints between tables. Example:

TABLE owner(user_id INT PRIMARY KEY);
TABLE car(car_name TEXT, user_id INT REFERENCES owner);

Transactions:
Txn1: INSERT INTO owner(1)
Txn2: INSERT INTO car('bz', 1)

Applying Txn2 before Txn1 will fail. To avoid this, check if FK values
in new tuples match any RI or unique key in the hash table. If
matched, treat the transaction as dependent.

4. Triggers and Constraints
For the initial version, exclude tables with user-defined triggers or
constraints from parallel apply due to complexity in dependency
detection. We may need some parallel-apply-safe marking to allow this.

Replication Progress Tracking
-----------------------------------------
Parallel apply introduces out-of-order commit application,
complicating replication progress tracking. To handle restarts and
ensure consistency:

Track Three Key Metrics:
lowest_remote_lsn: Starting point for applying transactions.
highest_remote_lsn: Highest LSN that has been applied.
list_remote_lsn: List of commit LSNs applied between the lowest and highest.

Mechanism:
Store these in ReplicationState: lowest_remote_lsn,
highest_remote_lsn, list_remote_lsn. Flush these to disk during
checkpoints, similar to CheckPointReplicationOrigin.

After Restart: start from lowest_remote_lsn and, for each transaction,
if its commit LSN is in list_remote_lsn, skip it, otherwise, apply it.
Once commit LSN > highest_remote_lsn, apply without checking the list.

During apply, the leader maintains list_in_progress_xacts in the
increasing commit order. On commit, update highest_remote_lsn. If
commit LSN matches the first in-progress xact of
list_in_progress_xacts, update lowest_remote_lsn, otherwise, add to
list_remote_lsn. After commit, also remove it from the
list_in_progress_xacts. We need to clean up entries below
lowest_remote_lsn in list_remote_lsn while updating its value.

To illustrate how this mechanism works, consider the following four
transactions:

Transaction ID    Commit LSN
501               1000
502               1100
503               1200
504               1300

Assume:
Transactions 501 and 502 take longer to apply whereas transactions 503
and 504 finish earlier. Parallel apply workers are assigned as
follows:
pa-1 → 501
pa-2 → 502
pa-3 → 503
pa-4 → 504

Initial state: list_in_progress_xacts = [501, 502, 503, 504]
Step 1: Transaction 503 commits first and in RecordTransactionCommit,
it updates highest_remote_lsn to 1200. In apply_handle_commit, since
503 is not the first in list_in_progress_xacts, add 1200 to
list_remote_lsn. Remove 503 from list_in_progress_xacts.
Step 2: Transaction 504 commits, Update highest_remote_lsn to 1300.
Add 1300 to list_remote_lsn. Remove 504 from list_in_progress_xacts.
ReplicationState now:
lowest_remote_lsn = 0
list_remote_lsn = [1200, 1300]
highest_remote_lsn = 1300
list_in_progress_xacts = [501, 502]

Step 3: Transaction 501 commits. Since 501 is now the first in
list_in_progress_xacts, update lowest_remote_lsn to 1000. Remove 501
from list_in_progress_xacts. Clean up list_remote_lsn to remove
entries < lowest_remote_lsn (none in this case).
ReplicationState now:
lowest_remote_lsn = 1000
list_remote_lsn = [1200, 1300]
highest_remote_lsn = 1300
list_in_progress_xacts = [502]

Step 4: System crash and restart
Upon restart, start replication from lowest_remote_lsn = 1000. The
first transaction encountered is 502 with commit LSN 1100; since it is
not present in list_remote_lsn, apply it. As transactions 503 and
504's respective commit LSNs [1200, 1300] are present in
list_remote_lsn, we skip them. Note that each transaction's
end_lsn/commit_lsn has to be compared, which the apply worker receives
along with the transaction's first command, BEGIN. This ensures
correctness and avoids duplicate application of already committed
transactions.

Now, it is possible that some users may want to parallelize the
transaction but still want to maintain commit order because they don't
explicitly annotate FK, PK for columns but maintain the integrity via
application. So, in such cases, since we won't be able to detect
transaction dependencies, it would be better to make out-of-order
commits optional.

Thoughts?
Here is the initial POC patch for this idea.
The basic implementation is outlined below. Please note that there are several
TODO items remaining, which we are actively working on; these are also detailed
further down.
The leader worker assigns each non-streaming transaction to a parallel apply
worker. Before dispatching changes to a parallel worker, the leader verifies if
the current modification affects the same row (identified by replica identity
key) as another ongoing transaction. If so, the leader sends a list of dependent
transaction IDs to the parallel worker, indicating that the parallel apply
worker must wait for these transactions to commit before proceeding. Parallel
apply workers do not maintain commit order; transactions can be committed at any
time provided there are no dependencies.
Each parallel apply worker records the local end LSN of the transaction it
applies in shared memory. Subsequently, the leader gathers these local end LSNs
and logs them in the local 'lsn_mapping' for verifying whether they have been
flushed to disk (following the logic in get_flush_position()).
If no parallel apply worker is available, the leader will apply the transaction
independently.
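In outline, the dispatch just described amounts to the following (a Python sketch of the control flow only; every name here is illustrative, not taken from the patch):

```python
def dispatch_transaction(xid, changes, free_workers,
                         find_dependencies, send_to_worker, apply_locally):
    # Leader-side dispatch: compute the xids this transaction must wait
    # for, then hand it to a free parallel apply worker, or fall back to
    # applying it in the leader itself when no worker is available.
    deps = find_dependencies(changes)
    if free_workers:
        worker = free_workers.pop()
        send_to_worker(worker, xid, changes, deps)
    else:
        apply_locally(xid, changes, deps)

# Tiny usage example with stub callbacks: one free worker, then none.
log = []
workers = ["pa-1"]
for xid in (501, 502):
    dispatch_transaction(xid, [], workers,
                         find_dependencies=lambda ch: [],
                         send_to_worker=lambda w, x, c, d: log.append((w, x)),
                         apply_locally=lambda x, c, d: log.append(("leader", x)))
assert log == [("pa-1", 501), ("leader", 502)]
```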
For further details, please refer to the following:
The leader maintains a local hash table, using the remote change's replica
identity column values and relid as keys, with remote transaction IDs as values.
Before sending changes to the parallel apply worker, the leader computes a hash
using RI key values and the relid of the current change to search the hash
table. If an existing entry is found, the leader tells the parallel worker
to wait for the remote xid in the hash entry, after which the leader updates the
hash entry with the current xid.
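As a toy model of that hash-table protocol (illustrative Python, not the patch's C code; `check_dependency` is a made-up name):

```python
# (relid, replica-identity key values) -> xid of the latest remote
# transaction that modified that row.
dep_table = {}

def check_dependency(relid, ri_key, xid):
    # Look up the current change's row; if another transaction already
    # touched it, the caller must tell the worker to wait for that xid.
    # Either way, the current xid becomes the entry's new value.
    key = (relid, ri_key)
    wait_for = dep_table.get(key)
    dep_table[key] = xid
    return wait_for if wait_for != xid else None

# Txn 100 touches row (pk=1) of relation 16384: no dependency yet.
assert check_dependency(16384, (1,), 100) is None
# Txn 101 touches the same row: it must wait for txn 100.
assert check_dependency(16384, (1,), 101) == 100
# A different row of the same table: independent.
assert check_dependency(16384, (2,), 102) is None
```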
If the remote relation lacks a replica identity (RI), it indicates that only
INSERT can be replicated for this table. In such cases, the leader skips
dependency checks, allowing the parallel apply worker to proceed with applying
changes without delay. This is because the only potential conflicts
would involve a local unique key or foreign key, handling for which is
yet to be implemented (see TODO - dependency on local unique key,
foreign key).
In cases of TRUNCATE or remote schema changes affecting the entire table, the
leader retrieves all remote xids touching the same table (via sequential scans
of the hash table) and tells the parallel worker to wait for those transactions
to commit.
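For these whole-table cases, a sequential scan over the same kind of hash table could look like this (again only an illustrative sketch, not the patch's code):

```python
# Toy dependency table: (relid, replica-identity key) -> last writer xid.
dep_table = {(16384, (1,)): 100,
             (16384, (2,)): 101,
             (16385, (7,)): 102}

def xids_touching_table(relid):
    # Sequentially scan all entries and collect every remote xid that
    # modified the given relation, e.g. before applying a TRUNCATE.
    return sorted({xid for (rel, _key), xid in dep_table.items()
                   if rel == relid})

assert xids_touching_table(16384) == [100, 101]  # wait for both writers
assert xids_touching_table(16385) == [102]
```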
Hash entries are cleaned up once the transaction corresponding to the remote xid
in the entry has been committed. Clean-up typically occurs when collecting the
flush position of each transaction, but is forced if the hash table exceeds a
set threshold.
If a transaction is relied upon by others, the leader adds its xid to a shared
hash table. The shared hash table entry is cleared by the parallel apply worker
upon completing the transaction. Workers needing to wait for a transaction check
the shared hash table entry; if present, they lock the transaction ID (using
pa_lock_transaction). If absent, it indicates the transaction has been
committed, negating the need to wait.
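That wait protocol can be modelled with a shared set guarded by a condition variable (a sketch only; the patch uses PostgreSQL's lock manager via pa_lock_transaction, not anything like this Python code):

```python
import threading

class InProgressXacts:
    """Toy model of the shared hash table of remote xids that other
    transactions depend on."""
    def __init__(self):
        self._xids = set()
        self._cond = threading.Condition()

    def add(self, xid):
        # Leader: record that this transaction has dependents.
        with self._cond:
            self._xids.add(xid)

    def finish(self, xid):
        # Worker: transaction committed; wake up any waiters.
        with self._cond:
            self._xids.discard(xid)
            self._cond.notify_all()

    def wait_for(self, xid):
        # Worker: block until the xid is gone. If it was never (or is no
        # longer) present, the transaction already committed, so this
        # returns immediately.
        with self._cond:
            self._cond.wait_for(lambda: xid not in self._xids)

shared = InProgressXacts()
shared.add(501)
waiter = threading.Thread(target=shared.wait_for, args=(501,))
waiter.start()
shared.finish(501)          # commit of 501 releases the waiter
waiter.join(timeout=5)
assert not waiter.is_alive()
```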
--
TODO - replication progress tracking for out of order commit.
TODO - dependency on local unique key, foreign key.
TODO - restrict user defined trigger and constraints.
TODO - enable the parallel apply optionally
TODO - potential improvement to use shared hash table for tracking dependencies.
--
The above TODO items are also included in the initial email[1].
[1]: /messages/by-id/CAA4eK1+SEus_6vQay9TF_r4ow+E-Q7LYNLfsD78HaOsLSgppxQ@mail.gmail.com
Best Regards,
Hou zj
Attachments:
v1-0001-Parallel-apply-non-streaming-transactions.patch
On Wed, Aug 13, 2025 at 09:50:27AM +0530, Amit Kapila wrote:
On Tue, Aug 12, 2025 at 10:40 PM Bruce Momjian <bruce@momjian.us> wrote:
Currently, PostgreSQL supports parallel apply only for large streaming
transactions (streaming=parallel). This proposal aims to extend
parallelism to non-streaming transactions, thereby improving
replication performance in workloads dominated by smaller, frequent
transactions.

I thought the approach for improving WAL apply speed, for both binary
and logical, was pipelining:
https://en.wikipedia.org/wiki/Instruction_pipelining
rather than trying to do all the steps in parallel.
It is not clear to me how the speed for a mix of dependent and
independent transactions can be improved using the technique you
shared as we still need to follow the commit order for dependent
transactions. Can you please elaborate more on the high-level idea of
how this technique can be used to improve speed for applying logical
WAL records?
This blog post from February I think has some good ideas for binary
replication pipelining:
https://www.cybertec-postgresql.com/en/end-of-the-road-for-postgresql-streaming-replication/
Surprisingly, what could be considered the actual replay work
seems to be a minority of the total workload. The largest parts
involve reading WAL and decoding page references from it, followed
by looking up those pages in the cache, and pinning them so they
are not evicted while in use. All of this work could be performed
concurrently with the replay loop. For example, a separate
read-ahead process could handle these tasks, ensuring that the
replay process receives a queue of transaction log records with
associated cache references already pinned, ready for application.
The beauty of the approach is that there is no need for dependency
tracking. I have CC'ed the author, Ants Aasma.
--
Bruce Momjian <bruce@momjian.us> https://momjian.us
EDB https://enterprisedb.com
Do not let urgent matters crowd out time for investment in the future.
On Wed, Aug 13, 2025 at 8:57 PM Bruce Momjian <bruce@momjian.us> wrote:
This blog post from February I think has some good ideas for binary
replication pipelining:
https://www.cybertec-postgresql.com/en/end-of-the-road-for-postgresql-streaming-replication/
Surprisingly, what could be considered the actual replay work
seems to be a minority of the total workload.
This is the biggest difference between physical and logical WAL apply.
In the case of logical WAL, the actual replay is the majority of the
work. We don't need to read WAL or decode it or find/pin the
appropriate pages to apply. Here, you can consider it almost
equivalent to how the primary receives insert/update/delete operations
from users.
Firstly, the idea shared in the blog is not applicable to logical
replication, and even if we try to somehow map it to logical apply, I
don't see how or why it will be able to match up the speed of applying
with multiple workers in case of logical replication. Also, note that
dependency calculation is not as tricky for logical replication as we
can easily retrieve such information from logical WAL records in most
cases.
--
With Regards,
Amit Kapila.
On Wed, Aug 13, 2025 at 4:17 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
Here is the initial POC patch for this idea.
Thank you Hou-san for the patch.
I did some performance benchmarking for the patch and overall, the
results show substantial performance improvements.
Please find the details as follows:
Source code:
----------------
pgHead (572c0f1b0e) and v1-0001 patch
Setup:
---------
Pub --> Sub
- Two nodes created in pub-sub logical replication setup.
- Both nodes have the same set of pgbench tables created with scale=300.
- The sub node is subscribed to all the changes from the pub node's
pgbench tables.
Workload Run:
--------------------
- Disable the subscription on Sub node
- Run default pgbench(read-write) only on Pub node with #clients=40
and run duration=10 minutes
- Enable the subscription on Sub once pgbench completes and then
measure time taken in replication.
~~~
Test-01: Measure Replication lag
----------------------------------------
Observations:
---------------
- Replication time improved as the number of parallel workers
increased with the patch.
- On pgHead, replicating a 10-minute publisher workload took ~46 minutes.
- With just 2 parallel workers (default), replication time was cut in
half, and with 8 workers it completed in ~13 minutes(3.5x faster).
- With 16 parallel workers, achieved ~3.7x speedup over pgHead.
- With 32 workers, performance gains plateaued slightly, likely
because, with more workers running on the machine, the remaining
parallelizable work is not large enough to show further improvements.
Detailed Result:
-----------------
Case                   Time_taken_in_replication(sec)  rep_time_in_minutes  faster_than_head
1. pgHead              2760.791                        46.01318333          -
2. patched_#worker=2   1463.853                        24.3975              1.88 times
3. patched_#worker=4   1031.376                        17.1896              2.68 times
4. patched_#worker=8   781.007                         13.0168              3.54 times
5. patched_#worker=16  741.108                         12.3518              3.73 times
6. patched_#worker=32  787.203                         13.1201              3.51 times
~~~~
Test-02: Measure number of transactions parallelized
-----------------------------------------------------
- Used a top-up patch to LOG the number of transactions applied by
parallel workers, applied by the leader, and those that were dependent.
- The LOG output e.g. -
```
LOG: parallelized_nxact: 11497254 dependent_nxact: 0 leader_applied_nxact: 600
```
- parallelized_nxact: gives the number of parallelized transactions
- dependent_nxact: gives the dependent transactions
- leader_applied_nxact: gives the transactions applied by leader worker
(the required top-up v1-0002 patch is attached.)
Observations:
----------------
- With 4 to 8 parallel workers, ~80%-98% transactions are parallelized
- As the number of workers increased, the parallelized percentage
increased and reached 99.99% with 32 workers.
Detailed Result:
-----------------
case1: #parallel_workers = 2(default)
#total_pgbench_txns = 24745648
parallelized_nxact = 14439480 (58.35%)
dependent_nxact = 16 (0.00006%)
leader_applied_nxact = 10306153 (41.64%)
case2: #parallel_workers = 4
#total_pgbench_txns = 24776108
parallelized_nxact = 19666593 (79.37%)
dependent_nxact = 212 (0.0008%)
leader_applied_nxact = 5109304 (20.62%)
case3: #parallel_workers = 8
#total_pgbench_txns = 24821333
parallelized_nxact = 24397431 (98.29%)
dependent_nxact = 282 (0.001%)
leader_applied_nxact = 423621 (1.71%)
case4: #parallel_workers = 16
#total_pgbench_txns = 24938255
parallelized_nxact = 24937754 (99.99%)
dependent_nxact = 142 (0.0005%)
leader_applied_nxact = 360 (0.0014%)
case5: #parallel_workers = 32
#total_pgbench_txns = 24769474
parallelized_nxact = 24769135 (99.99%)
dependent_nxact = 312 (0.0013%)
leader_applied_nxact = 28 (0.0001%)
~~~~~
The scripts used for above tests are attached.
Next, I plan to extend the testing to larger workloads by running
pgbench for 20–30 minutes.
We will also benchmark performance across different workload types to
evaluate the improvements once the patch has matured further.
--
Thanks,
Nisha
Attachments:
v1-0002-Add-some-simple-statistics.txt
v1_pa_pub-sub_setup.sh
v1_pa_pub-sub_measure.sh
On 18/08/2025 9:56 AM, Nisha Moond wrote:
On Wed, Aug 13, 2025 at 4:17 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
Here is the initial POC patch for this idea.

I did some performance benchmarking for the patch and overall, the
results show substantial performance improvements.
I also did some benchmarking of the proposed parallel apply patch and
compared it with my prewarming approach.
Parallel apply is significantly more efficient than prefetch (as
expected).
I had two tests (more details here):
/messages/by-id/84ed36b8-7d06-4945-9a6b-3826b3f999a6@garret.ru
One performs random updates and the other inserts rows with random keys.
I stopped the subscriber, applied the workload at the publisher for 100
seconds, and then measured how long it took the subscriber to catch up.
update test (with 8 parallel apply workers):
master: 8:30 min
prefetch: 2:05 min
parallel apply: 1:30 min
insert test (with 8 parallel apply workers):
master: 9:20 min
prefetch: 3:08 min
parallel apply: 1:54 min
On Mon, Aug 18, 2025 at 8:20 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
I also did some benchmarking of the proposed parallel apply patch and
compare it with my prewarming approach.
And parallel apply is significantly more efficient than prefetch (it is
expected).
Thanks to you and Nisha for doing some preliminary performance
testing; the results are really encouraging (more than a 3 to 4 times
improvement in multiple workloads). I hope we keep making progress on
this patch and make it ready for the next release.
--
With Regards,
Amit Kapila.
Hi,
I ran tests to compare the performance of logical synchronous
replication with parallel-apply against physical synchronous
replication.
Highlights
===============
On pgHead (current behavior):
- With synchronous physical replication set to remote_apply, the
Primary’s TPS drops by ~60% (≈2.5x slower than asynchronous).
- With synchronous logical replication set to remote_apply, the
Publisher’s TPS drops drastically by ~94% (≈16x slower than
asynchronous).
With the proposed Parallel-Apply Patch (v1):
- Parallel apply significantly improves logical synchronous
replication performance by 5-6×.
- With 40 parallel workers on the subscriber, the Publisher achieves
30045.82 TPS, which is 5.5× faster than the no-patch case (5435.46
TPS).
- With the patch, the Publisher’s performance is only ~3x slower than
asynchronous, bringing it much closer to the physical replication
case.
Machine details
===============
Intel(R) Xeon(R) CPU E7-4890 v2 @ 2.80GHz, 88 cores, 503 GiB RAM
Source code:
===============
- pgHead(e9a31c0cc60) and v1 patch
Test-01: Physical replication:
======================
- To measure the physical synchronous replication performance on pgHead.
Setup & Workload:
-----------------
Primary --> Standby
- Two nodes created in physical (primary-standby) replication setup.
- Default pgbench (read-write) was run on the Primary with scale=300,
#clients=40, run duration=20 minutes.
- The TPS is measured with the synchronous_commit set as "off" vs
"remote_apply" on pgHead.
Results:
---------
synchronous_commit    Primary_TPS    regression
OFF                   90466.57743    -
remote_apply(run1)    35848.6558     -60%
remote_apply(run2)    35306.25479    -61%
- On pgHead, when synchronous_commit is set to "remote_apply" during
physical replication, the Primary experiences a 60–61% reduction in
TPS, which is ~2.5 times slower.
~~~
Test-02: Logical replication:
=====================
- To measure the logical synchronous replication performance on
pgHead and with parallel-apply patch.
Setup & Workload:
-----------------
Publisher --> Subscriber
- Two nodes created in logical (publisher-subscriber) replication setup.
- Default pgbench (read-write) was run on the Pub with scale=300,
#clients=40, run duration=20 minutes.
- The TPS is measured on pgHead and with the parallel-apply v1 patch.
- The number of parallel workers was varied as 2, 4, 8, 16, 32, 40.
case-01: pgHead
-------------------
Results:
synchronous_commit      Primary_TPS    regression
pgHead(OFF)             89138.14626    --
pgHead(remote_apply)    5435.464525    -94%
- By default (pgHead), synchronous logical replication sees a 94%
drop in TPS, which is -
a) 16.4 times slower than the logical async case and,
b) 6.6 times slower than physical sync replication case.
case-02: patched
---------------------
- synchronous_commit = 'remote_apply'
- measured the performance by varying #parallel workers as 2, 4, 8, 16, 32, 40
Results:
#workers Primary_TPS Improvement_with_patch faster_than_no-patch
2 9679.077736 78% 1.78x
4 14329.64073 164% 2.64x
8 21832.04285 302% 4.02x
16 27676.47085 409% 5.09x
32 29718.40090 447% 5.47x
40 30045.82365 453% 5.53x
- The TPS on the publisher improves significantly as the number of
parallel workers increases.
- At 40 workers, the TPS reaches 30045.82, which is about 5.5x higher
than the no-patch case.
- With 40 parallel workers, logical sync replication is only about
1.2x slower than physical sync replication.
~~~
The scripts used for the tests are attached. We'll do tests with
larger data sets later and share results.
--
Thanks,
Nisha
On Mon, Aug 11, 2025 at 10:16 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
Hi,
Background and Motivation
-------------------------------------
In high-throughput systems, where hundreds of sessions generate data
on the publisher, the subscriber's apply process often becomes a
bottleneck due to the single apply worker model. While users can
mitigate this by creating multiple publication-subscription pairs,
this approach has scalability and usability limitations.Currently, PostgreSQL supports parallel apply only for large streaming
transactions (streaming=parallel). This proposal aims to extend
parallelism to non-streaming transactions, thereby improving
replication performance in workloads dominated by smaller, frequent
transactions.Design Overview
------------------------
To safely parallelize non-streaming transactions, we must ensure that
transaction dependencies are respected to avoid failures and
deadlocks. Consider the following scenarios to understand it better:
(a) Transaction failures: Say, if we insert a row in the first
transaction and update it in the second transaction on the publisher,
then allowing the subscriber to apply both in parallel can lead to
failure in the update; (b) Deadlocks - allowing transactions that
update the same set of rows in a table in the opposite order in
parallel can lead to deadlocks.The core idea is that the leader apply worker ensures the following:
a. Identifies dependencies between transactions. b. Coordinates
parallel workers to apply independent transactions concurrently. c.
Ensures correct ordering for dependent transactions.Dependency Detection
--------------------------------
1. Basic Dependency Tracking: Maintain a hash table keyed by
(RelationId, ReplicaIdentity) with the value as the transaction XID.
Before dispatching a change to a parallel worker, the leader checks
for existing entries: (a) If no match: add the entry and proceed; (b)
If match: instruct the worker to wait until the dependent transaction
completes.2. Unique Keys
In addition to RI, track unique keys to detect conflicts. Example:
CREATE TABLE tab1(a INT PRIMARY KEY, b INT UNIQUE);
Transactions on publisher:
Txn1: INSERT (1,1)
Txn2: INSERT (2,2)
Txn3: DELETE (2,2)
Txn4: UPDATE (1,1) → (1,2)

If Txn4 is applied before Txn2 and Txn3, it will fail with a unique
constraint violation on b. To prevent this, track both RI and unique
keys in the hash table: compare the old tuple's RI, and the new
tuple's RI and unique keys, against existing hash table entries to
identify transaction dependencies (the new tuple's RI is needed to
detect a prior insertion with the same key).

3. Foreign Keys
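The RI and unique-key tracking in points (1) and (2) could be sketched
roughly as below. This is illustrative Python only — the actual
implementation would be C in the leader apply worker, and all names
here (DepTracker, check_and_record, and so on) are invented for the
example, not taken from the POC patch:

```python
class DepTracker:
    """Leader-side bookkeeping from points (1) and (2): a hash keyed by
    (relation, key kind, key value), holding the XID of the latest
    in-flight transaction that wrote that key."""

    def __init__(self):
        self.last_writer = {}

    def check_and_record(self, xid, relid, old_key=None, new_ri=None,
                         new_unique=()):
        """Return the set of XIDs this change must wait for (empty if the
        transaction is independent), and record xid as the new writer."""
        keys = []
        if old_key is not None:                 # old tuple: replica identity
            keys.append((relid, "ri", old_key))
        if new_ri is not None:                  # new tuple: RI ...
            keys.append((relid, "ri", new_ri))
        for uk in new_unique:                   # ... and any unique keys
            keys.append((relid, "uniq", uk))

        deps = set()
        for key in keys:
            prev = self.last_writer.get(key)
            if prev is not None and prev != xid:
                deps.add(prev)                  # must apply after prev
            self.last_writer[key] = xid
        return deps

    def transaction_done(self, xid):
        # On commit/abort the leader drops this xid's entries so that
        # later transactions no longer wait on it.
        self.last_writer = {k: v for k, v in self.last_writer.items()
                            if v != xid}
```

Replaying part of the tab1 example: after Txn1 (INSERT (1,1)) and Txn2
(INSERT (2,2)) are dispatched, Txn4's UPDATE (1,1) → (1,2) reports
dependencies on both — its old RI (a=1) matches Txn1's entry and its
new unique key (b=2) matches Txn2's — so the leader would hold it back
until those transactions finish.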
Consider FK constraints between tables. Example:

TABLE owner(user_id INT PRIMARY KEY);
TABLE car(car_name TEXT, user_id INT REFERENCES owner);

Transactions:
Txn1: INSERT INTO owner(1)
Txn2: INSERT INTO car('bz', 1)

Applying Txn2 before Txn1 will fail. To avoid this, check whether FK
values in new tuples match any RI or unique key in the hash table. If
matched, treat the transaction as dependent.

4. Triggers and Constraints
For the initial version, exclude tables with user-defined triggers or
constraints from parallel apply due to complexity in dependency
detection. We may need some parallel-apply-safe marking to allow this.

Replication Progress Tracking
-----------------------------------------
Parallel apply introduces out-of-order commit application,
complicating replication progress tracking. To handle restarts and
ensure consistency, track three key metrics:
lowest_remote_lsn: starting point for applying transactions.
highest_remote_lsn: highest commit LSN that has been applied.
list_remote_lsn: list of commit LSNs applied between the lowest and
highest.

Mechanism:
Store these in ReplicationState: lowest_remote_lsn,
highest_remote_lsn, list_remote_lsn. Flush these to disk during
checkpoints, similar to CheckPointReplicationOrigin.

After a restart, start from lowest_remote_lsn and, for each
transaction, skip it if its commit LSN is in list_remote_lsn;
otherwise, apply it. Once the commit LSN exceeds highest_remote_lsn,
apply without checking the list.

During apply, the leader maintains list_in_progress_xacts in the
increasing commit order. On commit, update highest_remote_lsn. If
commit LSN matches the first in-progress xact of
list_in_progress_xacts, update lowest_remote_lsn, otherwise, add to
list_remote_lsn. After commit, also remove it from the
list_in_progress_xacts. We need to clean up entries below
lowest_remote_lsn in list_remote_lsn while updating its value.

To illustrate how this mechanism works, consider the following four
transactions:

Transaction ID    Commit LSN
501               1000
502               1100
503               1200
504               1300

Assume:
Transactions 501 and 502 take longer to apply whereas transactions 503
and 504 finish earlier. Parallel apply workers are assigned as
follows:
pa-1 → 501
pa-2 → 502
pa-3 → 503
pa-4 → 504

Initial state: list_in_progress_xacts = [501, 502, 503, 504]
Step 1: Transaction 503 commits first and in RecordTransactionCommit,
it updates highest_remote_lsn to 1200. In apply_handle_commit, since
503 is not the first in list_in_progress_xacts, add 1200 to
list_remote_lsn. Remove 503 from list_in_progress_xacts.
Step 2: Transaction 504 commits, Update highest_remote_lsn to 1300.
Add 1300 to list_remote_lsn. Remove 504 from list_in_progress_xacts.
ReplicationState now:
lowest_remote_lsn = 0
list_remote_lsn = [1200, 1300]
highest_remote_lsn = 1300
list_in_progress_xacts = [501, 502]

Step 3: Transaction 501 commits. Since 501 is now the first in
list_in_progress_xacts, update lowest_remote_lsn to 1000. Remove 501
from list_in_progress_xacts. Clean up list_remote_lsn to remove
entries < lowest_remote_lsn (none in this case).
ReplicationState now:
lowest_remote_lsn = 1000
list_remote_lsn = [1200, 1300]
highest_remote_lsn = 1300
list_in_progress_xacts = [502]

Step 4: System crash and restart
Upon restart, start replication from lowest_remote_lsn = 1000. The
first transaction encountered is 502 with commit LSN 1100; since that
LSN is not present in list_remote_lsn, apply it. As the commit LSNs of
transactions 503 and 504 [1200, 1300] are present in list_remote_lsn,
skip them. Note that each transaction's end_lsn/commit_lsn has to be
compared, which the apply worker receives along with the first
transaction command BEGIN. This ensures correctness and avoids
duplicate application of already committed transactions.

Now, it is possible that some users may want to parallelize
transactions but still maintain commit order, because they don't
explicitly declare FKs or PKs for columns and instead maintain
integrity via the application. In such cases we won't be able to detect
transaction dependencies, so it would be better to make out-of-order
commits optional.

Thoughts?
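As a postscript, the four-transaction walkthrough above can be replayed
with a small Python simulation of this bookkeeping. The field names
follow the proposal (lowest_remote_lsn, highest_remote_lsn,
list_remote_lsn, list_in_progress_xacts); the class and its methods are
invented for illustration and are not the POC code:

```python
class ReplProgress:
    """Sketch of the ReplicationState progress fields described above.
    Only the field names come from the design; the methods are invented
    for this illustration."""

    def __init__(self):
        self.lowest_remote_lsn = 0
        self.highest_remote_lsn = 0
        self.list_remote_lsn = []         # commit LSNs applied out of order
        self.list_in_progress_xacts = []  # xids, in increasing commit order

    def begin(self, xid):
        self.list_in_progress_xacts.append(xid)

    def commit(self, xid, commit_lsn):
        self.highest_remote_lsn = max(self.highest_remote_lsn, commit_lsn)
        if self.list_in_progress_xacts[0] == xid:
            # Oldest in-progress xact finished: advance the low watermark
            # and drop list entries below it.
            self.lowest_remote_lsn = commit_lsn
            self.list_remote_lsn = [l for l in self.list_remote_lsn
                                    if l >= commit_lsn]
        else:
            self.list_remote_lsn.append(commit_lsn)
        self.list_in_progress_xacts.remove(xid)

    def should_apply_after_restart(self, commit_lsn):
        # Replay starts from lowest_remote_lsn; above highest_remote_lsn
        # everything is new, in between consult list_remote_lsn.
        if commit_lsn > self.highest_remote_lsn:
            return True
        return commit_lsn not in self.list_remote_lsn
```

Running Steps 1-3 (503 commits at 1200, then 504 at 1300, then 501 at
1000) leaves lowest_remote_lsn = 1000, list_remote_lsn = [1200, 1300]
and highest_remote_lsn = 1300, matching the states shown above; after
a simulated restart, 502 (commit LSN 1100) is applied while 1200 and
1300 are skipped.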
+1 for the idea. We already have parallel apply workers for large
streaming transactions, so I am trying to understand what additional
problems we need to solve here. IIUC, today we apply in parallel only
transactions that were actually running in parallel on the publisher,
and their commits are still applied in serial order. Now we want to
apply small transactions in parallel without the leader worker
controlling the commit apply order, so we need extra handling of
dependencies, and we also need to track which transactions to apply
and which to skip after a restart. Is that right?
I am reading the proposal and POC patch in more detail to get the
fundamentals of the design and will share my thoughts.
--
Regards,
Dilip Kumar
Google
On Fri, Sep 5, 2025 at 2:59 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Mon, Aug 11, 2025 at 10:16 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
+1 for the idea. We already have parallel apply workers for large
streaming transactions, so I am trying to understand what additional
problems we need to solve here. IIUC, today we apply in parallel only
transactions that were actually running in parallel on the publisher,
and their commits are still applied in serial order. Now we want to
apply small transactions in parallel without the leader worker
controlling the commit apply order, so we need extra handling of
dependencies, and we also need to track which transactions to apply
and which to skip after a restart. Is that right?
Right.
I am reading the proposal and POC patch in more detail to get the
fundamentals of the design and will share my thoughts.
Thanks.
--
With Regards,
Amit Kapila.