Make default subscription streaming option as Parallel
Hi,
By default, streaming of in-progress transactions is currently
disabled for subscriptions: every transaction is fully decoded on the
publisher before being sent to the subscriber. This can increase
latency and reduce performance, particularly under heavy load. I
propose enabling the parallel streaming option for subscriptions by
default. Incoming changes would then be applied directly by one of the
available parallel apply workers, which significantly improves the
performance of commit operations.
I ran a series of logical replication tests comparing SQL execution
times with streaming set to parallel and to off. The tests varied the
logical_decoding_work_mem setting and covered the following scenarios:
a) insert, b) delete, c) update, d) rollback of 5% of records,
e) rollback of 10% of records, f) rollback of 20% of records,
g) rollback of 50% of records. I have written TAP tests for these
scenarios. The attached files can be copied to src/test/subscription/t;
before running them, change the logical_decoding_work_mem configuration
and the streaming option in the CREATE SUBSCRIPTION command as needed.
Each test was executed 5 times and the results were averaged.
The execution time is in seconds.
Insert 5 million records
logical_decoding_work_mem | Parallel | Off     | % Improvement
--------------------------|----------|---------|--------------
64 KB                     | 37.304   | 69.465  | 46.298
256 KB                    | 36.327   | 70.671  | 48.597
64 MB                     | 41.173   | 69.228  | 40.526
Delete 5 million records
logical_decoding_work_mem | Parallel | Off     | % Improvement
--------------------------|----------|---------|--------------
64 KB                     | 42.322   | 69.404  | 39.021
256 KB                    | 43.250   | 66.973  | 35.422
64 MB                     | 44.183   | 67.873  | 34.903
Update 5 million records
logical_decoding_work_mem | Parallel | Off     | % Improvement
--------------------------|----------|---------|--------------
64 KB                     | 93.953   | 127.691 | 26.422
256 KB                    | 94.166   | 128.541 | 26.743
64 MB                     | 93.367   | 134.275 | 30.465
Rollback 5% of records
logical_decoding_work_mem | Parallel | Off     | % Improvement
--------------------------|----------|---------|--------------
64 KB                     | 36.968   | 67.161  | 44.957
256 KB                    | 38.059   | 68.021  | 44.047
64 MB                     | 39.431   | 66.878  | 41.041
Rollback 10% of records
logical_decoding_work_mem | Parallel | Off     | % Improvement
--------------------------|----------|---------|--------------
64 KB                     | 35.966   | 63.968  | 43.775
256 KB                    | 36.597   | 64.836  | 43.554
64 MB                     | 39.069   | 64.357  | 39.292
Rollback 20% of records
logical_decoding_work_mem | Parallel | Off     | % Improvement
--------------------------|----------|---------|--------------
64 KB                     | 37.616   | 58.903  | 36.139
256 KB                    | 37.330   | 58.606  | 36.303
64 MB                     | 38.720   | 60.236  | 35.720
Rollback 50% of records
logical_decoding_work_mem | Parallel | Off     | % Improvement
--------------------------|----------|---------|--------------
64 KB                     | 38.999   | 44.776  | 12.902
256 KB                    | 36.567   | 44.530  | 17.882
64 MB                     | 38.592   | 45.346  | 14.893
The machine configuration that was used is also attached.
The tests show a significant performance improvement with the parallel
streaming option: inserts improve by 40-48%, deletes by 34-39%, and
updates by 26-30%. For rollbacks the improvement ranges from 12% to
44% and shrinks as more data is rolled back. If a very large share of
the data is rolled back, parallel streaming may perform comparably to,
or in some instances slightly worse than, streaming off. However, this
is acceptable since commit operations are generally more frequent than
rollback operations.
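For readers checking the tables, the "% Improvement" figures are consistent with (off - parallel) / off * 100. The following is only a minimal C sketch of that calculation; the helper name pct_improvement is hypothetical and is not part of the attached test files.

```c
/*
 * Hypothetical helper (not from the patch or the TAP tests): percentage
 * reduction in execution time when moving from streaming = off to
 * streaming = parallel, both given in seconds.
 */
static double
pct_improvement(double off_secs, double parallel_secs)
{
    return (off_secs - parallel_secs) / off_secs * 100.0;
}
```

For example, pct_improvement(69.465, 37.304) corresponds to the 64 KB insert row and yields roughly 46.3.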
One key point to consider is that the lock on transaction objects will
be held for a longer duration when using streaming in parallel. This
occurs because the parallel apply worker initiates the transaction as
soon as streaming begins, maintaining the lock until the transaction
is fully completed. As a result, for long-running transactions, this
extended lock can hinder concurrent access that requires a lock.
Given the significant improvement, I think we should make parallel the
default subscription streaming option. The attached patch makes this
change.
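To make the scope of the change concrete: only subscriptions created without an explicit streaming option would behave differently. The sketch below is a simplified, standalone illustration and not the code in parse_subscription_options(); the enum names, the one-character codes, and the function resolve_streaming are assumptions introduced here, and only three spellings of the option value are handled.

```c
#include <stddef.h>
#include <string.h>

/* Stand-ins for the streaming modes; names and values are assumptions. */
enum
{
    STREAM_OFF = 'f',
    STREAM_ON = 't',
    STREAM_PARALLEL = 'p'
};

/*
 * Hypothetical helper: returns the streaming mode for a new subscription.
 * "value" is NULL when the user did not specify the streaming option.
 */
static char
resolve_streaming(const char *value)
{
    if (value == NULL)
        return STREAM_PARALLEL;     /* proposed default; previously off */
    if (strcmp(value, "parallel") == 0)
        return STREAM_PARALLEL;
    if (strcmp(value, "on") == 0)
        return STREAM_ON;
    if (strcmp(value, "off") == 0)
        return STREAM_OFF;
    return 0;                       /* unrecognized in this sketch */
}
```

An explicit streaming = off or streaming = on keeps its meaning; only the NULL (unspecified) case changes.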
Thoughts?
All of these tests were conducted with both the publisher and
subscriber on the same host. I will perform additional tests with one
of the logical replication nodes on a different host and share the
results later.
Regards,
Vignesh
Attachments:
v1-0001-Make-default-value-for-susbcription-streaming-opt.patch (application/octet-stream)
From e40ad2f41391add02e605979a600f8459956ee86 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Wed, 25 Sep 2024 14:42:20 +0530
Subject: [PATCH v1] Make default value for subscription streaming option to
parallel.
Make default value for subscription streaming option to parallel.
---
doc/src/sgml/ref/create_subscription.sgml | 19 ++++++++---------
src/backend/commands/subscriptioncmds.c | 2 +-
src/bin/pg_dump/pg_dump.c | 2 ++
src/bin/pg_dump/t/002_pg_dump.pl | 6 +++---
src/test/regress/expected/subscription.out | 24 +++++++++++-----------
5 files changed, 27 insertions(+), 26 deletions(-)
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 740b7d9421..2f9b769de2 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -271,9 +271,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<listitem>
<para>
Specifies whether to enable streaming of in-progress transactions
- for this subscription. The default value is <literal>off</literal>,
- meaning all transactions are fully decoded on the publisher and only
- then sent to the subscriber as a whole.
+ for this subscription. The default value is <literal>parallel</literal>,
+ meaning incoming changes are directly applied via one of the parallel
+ apply workers, if available. If no parallel apply worker is free to
+ handle streaming transactions then the changes are written to
+ temporary files and applied after the transaction is committed. Note
+ that if an error happens in a parallel apply worker, the finish LSN
+ of the remote transaction might not be reported in the server log.
</para>
<para>
@@ -283,13 +287,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
<para>
- If set to <literal>parallel</literal>, incoming changes are directly
- applied via one of the parallel apply workers, if available. If no
- parallel apply worker is free to handle streaming transactions then
- the changes are written to temporary files and applied after the
- transaction is committed. Note that if an error happens in a
- parallel apply worker, the finish LSN of the remote transaction
- might not be reported in the server log.
+ If set to <literal>off</literal>, all transactions are fully decoded
+ on the publisher and only then sent to the subscriber as a whole.
</para>
</listitem>
</varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 02ccc636b8..0a7a618855 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -151,7 +151,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
- opts->streaming = LOGICALREP_STREAM_OFF;
+ opts->streaming = LOGICALREP_STREAM_PARALLEL;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 130b80775d..047b914a25 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5232,6 +5232,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ", streaming = on");
else if (strcmp(subinfo->substream, "p") == 0)
appendPQExpBufferStr(query, ", streaming = parallel");
+ else
+ appendPQExpBufferStr(query, ", streaming = off");
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
appendPQExpBufferStr(query, ", two_phase = on");
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index ab6c830491..7aec965cc8 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -2992,7 +2992,7 @@ my %tests = (
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1');\E
+ \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1', streaming = parallel);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -3003,7 +3003,7 @@ my %tests = (
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false, origin = none);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E
+ \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', streaming = parallel, origin = none);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -3014,7 +3014,7 @@ my %tests = (
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false, origin = any);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E
+ \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = parallel);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 17d48b1685..1443e1d929 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -119,7 +119,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@@ -127,7 +127,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -148,7 +148,7 @@ ERROR: invalid connection string syntax: missing "=" after "foobar" in connecti
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -160,7 +160,7 @@ ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -179,7 +179,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
@@ -191,7 +191,7 @@ ERROR: invalid WAL location (LSN): 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@@ -226,7 +226,7 @@ HINT: Available values: local, remote_write, remote_apply, on, off.
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@@ -258,7 +258,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@@ -267,7 +267,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -374,7 +374,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- we can alter streaming when two_phase enabled
@@ -412,7 +412,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@@ -420,7 +420,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
--
2.34.1
Dear Vignesh,
> One key point to consider is that the lock on transaction objects will
> be held for a longer duration when using streaming in parallel. This
> occurs because the parallel apply worker initiates the transaction as
> soon as streaming begins, maintaining the lock until the transaction
> is fully completed. As a result, for long-running transactions, this
> extended lock can hinder concurrent access that requires a lock.
So, the current default is the most conservative and can run anywhere; your
proposal is a bit optimistic, right? Since long transactions should be avoided
in any case, and everyone knows it, I agree with your point.
One comment on your patch:
Shouldn't we add a streaming=off case to the pg_dump test? You added lines, but no
test exercises them.
Best regards,
Hayato Kuroda
FUJITSU LIMITED
On Mon, 7 Oct 2024 at 12:26, Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
> Dear Vignesh,
>> One key point to consider is that the lock on transaction objects will
>> be held for a longer duration when using streaming in parallel. This
>> occurs because the parallel apply worker initiates the transaction as
>> soon as streaming begins, maintaining the lock until the transaction
>> is fully completed. As a result, for long-running transactions, this
>> extended lock can hinder concurrent access that requires a lock.
> So, the current default is the most conservative and can run anywhere; your
> proposal is a bit optimistic, right?
Yes, that is correct.
> One comment on your patch:
> Shouldn't we add a streaming=off case to the pg_dump test? You added lines, but no
> test exercises them.
I have updated the existing pg_dump tests to cover the streaming
options; the attached patch contains these changes.
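For context on why the tests need all three values: with the patch, pg_dump emits an explicit streaming clause for every subscription instead of relying on the server default. The following standalone C sketch mirrors that branch in dumpSubscription(). The function name streaming_clause is hypothetical, and the "t" code for streaming = on is an assumption, since that condition lies outside the patch hunk.

```c
#include <string.h>

/*
 * Hypothetical helper mirroring the patched branch in dumpSubscription():
 * maps the stored substream code to the clause written into the dump.
 * Assumption: "t" encodes streaming = on; "p" (parallel) is from the patch.
 */
static const char *
streaming_clause(const char *substream)
{
    if (strcmp(substream, "t") == 0)
        return ", streaming = on";
    else if (strcmp(substream, "p") == 0)
        return ", streaming = parallel";
    else
        return ", streaming = off";     /* branch added by the patch */
}
```

Emitting the off case explicitly matters because a dump restored on a server with the new default would otherwise turn an off subscription into a parallel one.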
Regards,
Vignesh
Attachments:
v2-0001-Make-default-value-for-susbcription-streaming-opt.patch (application/octet-stream)
From 5ee49e587ad8783c78fcbb6715cd9170838ebb39 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Wed, 25 Sep 2024 14:42:20 +0530
Subject: [PATCH v2] Make default value for subscription streaming option to
parallel.
Currently the default value of the streaming option is off. All
transactions are fully decoded on the publisher before being sent
to the subscriber. This approach can lead to reduced performance,
particularly under heavy load.
Change the default streaming option to parallel so that incoming
changes are applied directly by one of the available parallel apply
workers. This significantly improves the performance of commit
operations.
---
doc/src/sgml/ref/create_subscription.sgml | 19 ++++++++---------
src/backend/commands/subscriptioncmds.c | 2 +-
src/bin/pg_dump/pg_dump.c | 2 ++
src/bin/pg_dump/t/002_pg_dump.pl | 10 ++++-----
src/test/regress/expected/subscription.out | 24 +++++++++++-----------
5 files changed, 29 insertions(+), 28 deletions(-)
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 8a3096e62b..43f1b1f881 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -271,9 +271,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<listitem>
<para>
Specifies whether to enable streaming of in-progress transactions
- for this subscription. The default value is <literal>off</literal>,
- meaning all transactions are fully decoded on the publisher and only
- then sent to the subscriber as a whole.
+ for this subscription. The default value is <literal>parallel</literal>,
+ meaning incoming changes are directly applied via one of the parallel
+ apply workers, if available. If no parallel apply worker is free to
+ handle streaming transactions then the changes are written to
+ temporary files and applied after the transaction is committed. Note
+ that if an error happens in a parallel apply worker, the finish LSN
+ of the remote transaction might not be reported in the server log.
</para>
<para>
@@ -283,13 +287,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
<para>
- If set to <literal>parallel</literal>, incoming changes are directly
- applied via one of the parallel apply workers, if available. If no
- parallel apply worker is free to handle streaming transactions then
- the changes are written to temporary files and applied after the
- transaction is committed. Note that if an error happens in a
- parallel apply worker, the finish LSN of the remote transaction
- might not be reported in the server log.
+ If set to <literal>off</literal>, all transactions are fully decoded
+ on the publisher and only then sent to the subscriber as a whole.
</para>
</listitem>
</varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 02ccc636b8..0a7a618855 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -151,7 +151,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
- opts->streaming = LOGICALREP_STREAM_OFF;
+ opts->streaming = LOGICALREP_STREAM_PARALLEL;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 1b47c388ce..d8c6330732 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5235,6 +5235,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ", streaming = on");
else if (strcmp(subinfo->substream, "p") == 0)
appendPQExpBufferStr(query, ", streaming = parallel");
+ else
+ appendPQExpBufferStr(query, ", streaming = off");
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
appendPQExpBufferStr(query, ", two_phase = on");
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index ab6c830491..ac60829d68 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -2992,7 +2992,7 @@ my %tests = (
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1');\E
+ \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1', streaming = parallel);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -3001,9 +3001,9 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub2
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = none);',
+ WITH (connect = false, origin = none, streaming = off);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E
+ \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', streaming = off, origin = none);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -3012,9 +3012,9 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub3
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = any);',
+ WITH (connect = false, origin = any, streaming = on);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E
+ \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 17d48b1685..1443e1d929 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -119,7 +119,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@@ -127,7 +127,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -148,7 +148,7 @@ ERROR: invalid connection string syntax: missing "=" after "foobar" in connecti
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -160,7 +160,7 @@ ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -179,7 +179,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
@@ -191,7 +191,7 @@ ERROR: invalid WAL location (LSN): 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@@ -226,7 +226,7 @@ HINT: Available values: local, remote_write, remote_apply, on, off.
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@@ -258,7 +258,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@@ -267,7 +267,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -374,7 +374,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- we can alter streaming when two_phase enabled
@@ -412,7 +412,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@@ -420,7 +420,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
--
2.34.1
On Mon, Oct 7, 2024 at 4:03 PM vignesh C <vignesh21@gmail.com> wrote:
With parallel streaming as default, do you think there is a need to
increase the default for 'max_logical_replication_workers' as IIUC
parallel workers are taken from the same pool.
thanks
Shveta
On Tue, Oct 8, 2024 at 2:25 PM shveta malik <shveta.malik@gmail.com> wrote:
On Mon, Oct 7, 2024 at 4:03 PM vignesh C <vignesh21@gmail.com> wrote:
With parallel streaming as default, do you think there is a need to
increase the default for 'max_logical_replication_workers' as IIUC
parallel workers are taken from the same pool.
Good question. But then one may say that we should proportionately
increase max_worker_processes as well. I don't know what should be
reasonable new defaults. I think we should make parallel streaming as
default and then wait for some user feedback before changing other
defaults.
--
With Regards,
Amit Kapila.
On Mon, 7 Oct 2024 at 11:05, vignesh C <vignesh21@gmail.com> wrote:
Hi,
By default, currently streaming of in-progress transactions for
subscriptions is disabled. All transactions are fully decoded on the
publisher before being sent to the subscriber. This approach can lead
to increased latency and reduced performance, particularly under heavy
load. By default, we could enable the parallel streaming option for
subscriptions. By doing this, incoming changes will be directly
applied by one of the available parallel apply workers. This method
significantly improves the performance of commit operations.
I conducted a series of tests using logical replication, comparing SQL
execution times with streaming set to both parallel and off. The tests
varied the logical_decoding_work_mem setting and included the
following scenarios: a) Insert, b) Delete, c) Update, d) rollback 5%
records, e) rollback 10% records, f) rollback 20% records, g) rollback
50% records. I have written tap tests for the same, the attached files
can be copied to src/test/subscription/t and the
logical_decoding_work_mem configuration and streaming option in create
subscription command should be changed accordingly before running the
tests. The tests were executed 5 times and the average of them was
taken.
The execution time is in seconds.
Insert 5kk records
Logical Decoding mem | Parallel | off | % Improvement
-------------------------------|-------------|---------------|------------------------
64 KB | 37.304 | 69.465 | 46.298
256 KB | 36.327 | 70.671 | 48.597
64 MB | 41.173 | 69.228 | 40.526
Delete 5kk records
Logical Decoding mem | Parallel | off | % Improvement
-------------------------------|-------------|---------------|------------------------
64 KB | 42.322 | 69.404 | 39.021
256 KB | 43.250 | 66.973 | 35.422
64 MB | 44.183 | 67.873 | 34.903
Update 5kk records
Logical Decoding mem | Parallel | off | % Improvement
-------------------------------|-------------|---------------|------------------------
64 KB | 93.953 | 127.691 | 26.422
256 KB | 94.166 | 128.541 | 26.743
64 MB | 93.367 | 134.275 | 30.465
Rollback 05% records
Logical Decoding mem | Parallel | off | % Improvement
-------------------------------|-------------|---------------|------------------------
64 KB | 36.968 | 67.161 | 44.957
256 KB | 38.059 | 68.021 | 44.047
64 MB | 39.431 | 66.878 | 41.041
Rollback 10% records
Logical Decoding mem | Parallel | off | % Improvement
-------------------------------|-------------|---------------|------------------------
64 KB | 35.966 | 63.968 | 43.775
256 KB | 36.597 | 64.836 | 43.554
64 MB | 39.069 | 64.357 | 39.292
Rollback 20% records
Logical Decoding mem | Parallel | off | % Improvement
-------------------------------|-------------|---------------|------------------------
64 KB | 37.616 | 58.903 | 36.139
256 KB | 37.330 | 58.606 | 36.303
64 MB | 38.720 | 60.236 | 35.720
Rollback 50% records
Logical Decoding mem | Parallel | off | % Improvement
-------------------------------|-------------|---------------|------------------------
64 KB | 38.999 | 44.776 | 12.902
256 KB | 36.567 | 44.530 | 17.882
64 MB | 38.592 | 45.346 | 14.893
The machine configuration that was used is also attached.
The tests demonstrate a significant performance improvement when using
the parallel streaming option, insert shows 40-48% improvement, delete
shows 34-39% improvement, update shows 26-30% improvement. In the case
of rollback the improvement is between 12-44%, the improvement
slightly reduces with larger amounts of data being rolled back in this
case. If there's a significant amount of data to roll back, the
performance of streaming in parallel may be comparable to or slightly
lower in some instances. However, this is acceptable since commit
operations are generally more frequent than rollback operations.
One key point to consider is that the lock on transaction objects will
be held for a longer duration when using streaming in parallel. This
occurs because the parallel apply worker initiates the transaction as
soon as streaming begins, maintaining the lock until the transaction
is fully completed. As a result, for long-running transactions, this
extended lock can hinder concurrent access that requires a lock.
Since there is a significant percentage improvement, we should make
the default subscription streaming option parallel. Attached patch has
the change for the same.
Thoughts?
All of these tests were conducted with both the publisher and
subscriber on the same host. I will perform additional tests with one
of the logical replication nodes on a different host and share the
results later.
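The mail does not define the "% Improvement" column, but the same-host figures above are consistent with (off - parallel) / off * 100. Assuming that formula, two of the 64 KB rows can be checked with a quick query (the column aliases are illustrative only):

```sql
-- Recompute "% Improvement" as (off - parallel) / off * 100.
-- The formula is inferred from the reported numbers, not stated in the mail.
SELECT scenario,
       round((off_secs - parallel_secs) / off_secs * 100, 3) AS pct_improvement
FROM (VALUES
        ('Insert 5kk, 64 KB', 37.304, 69.465),
        ('Delete 5kk, 64 KB', 42.322, 69.404)
     ) AS t(scenario, parallel_secs, off_secs);
```

This should return 46.298 and 39.021, matching the corresponding rows in the insert and delete tables.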
I have run the tests with publisher and subscriber running on
different hosts in synchronous replication mode. The tests were
executed 5 times and the average of them was taken. The scripts that
were used for the tests are attached.
The test results for the same are:
Operation/Streaming | Parallel | Off | % improvement
-----------------------------|------------------|--------------|-------------------------
Insert | 30.44 | 50.28 | 39.45
Delete | 27.66 | 46.80 | 40.89
Update | 57.37 | 90.16 | 36.37
Rollback 5% | 28.57 | 47.49 | 39.82
Rollback 10% | 28.63 | 46.25 | 38.09
Rollback 20% | 28.16 | 42.66 | 33.99
Rollback 50% | 28.33 | 34.78 | 18.53
The tests indicate a notable performance boost with the parallel
streaming option: inserts improved by 39%, deletes by 41%, and updates
by 36%. For rollback operations, the improvement ranges from 18% to
39%. Similar to the results observed in logical replication when both
publisher and subscriber are on the same host (as mentioned in [1]),
performance results show that running the publisher and subscriber on
different hosts with the parallel streaming option outperforms the off
option.
[1]: /messages/by-id/CALDaNm1=MedhW23NuoePJTmonwsMSp80ddsw+sEJs0GUMC_kqQ@mail.gmail.com
Regards,
Vignesh
On Tue, Oct 8, 2024 at 3:38 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Tue, Oct 8, 2024 at 2:25 PM shveta malik <shveta.malik@gmail.com> wrote:
On Mon, Oct 7, 2024 at 4:03 PM vignesh C <vignesh21@gmail.com> wrote:
With parallel streaming as default, do you think there is a need to
increase the default for 'max_logical_replication_workers' as IIUC
parallel workers are taken from the same pool.
Good question. But then one may say that we should proportionately
increase max_worker_processes as well. I don't know what should be
reasonable new defaults.
Agreed; the same question applies one level up, to max_worker_processes.
I think we should make parallel streaming as
default and then wait for some user feedback before changing other
defaults.
Okay, sounds good to me. It is not a blocking factor anyway; users can
always raise it if their workload requires it.
thanks
Shveta
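For reference, a sketch of how a subscriber could raise the limits discussed above if parallel apply workers turn out to be starved. The values are arbitrary placeholders, not recommendations from this thread. max_parallel_apply_workers_per_subscription is the per-subscription cap on parallel apply workers, which are drawn from the max_logical_replication_workers pool.

```sql
-- Illustrative values only; size them for the actual workload.
ALTER SYSTEM SET max_worker_processes = 16;
ALTER SYSTEM SET max_logical_replication_workers = 8;
ALTER SYSTEM SET max_parallel_apply_workers_per_subscription = 4;

-- max_worker_processes and max_logical_replication_workers only take
-- effect after a server restart; the per-subscription limit is picked
-- up on a configuration reload.
SELECT pg_reload_conf();
```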
On Tue, Oct 8, 2024 at 3:38 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Tue, Oct 8, 2024 at 2:25 PM shveta malik <shveta.malik@gmail.com> wrote:
On Mon, Oct 7, 2024 at 4:03 PM vignesh C <vignesh21@gmail.com> wrote:
With parallel streaming as default, do you think there is a need to
increase the default for 'max_logical_replication_workers' as IIUC
parallel workers are taken from the same pool.
Good question. But then one may say that we should proportionately
increase max_worker_processes as well. I don't know what should be
reasonable new defaults. I think we should make parallel streaming as
default and then wait for some user feedback before changing other
defaults.
I agree, actually streaming of in progress transactions is a useful feature
for performance in case of large transactions, so it makes sense to make it
"on" by default. So +1 from my side.
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On Fri, Oct 18, 2024 at 9:48 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Tue, Oct 8, 2024 at 3:38 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Tue, Oct 8, 2024 at 2:25 PM shveta malik <shveta.malik@gmail.com> wrote:
On Mon, Oct 7, 2024 at 4:03 PM vignesh C <vignesh21@gmail.com> wrote:
With parallel streaming as default, do you think there is a need to
increase the default for 'max_logical_replication_workers' as IIUC
parallel workers are taken from the same pool.
Good question. But then one may say that we should proportionately
increase max_worker_processes as well. I don't know what should be
reasonable new defaults. I think we should make parallel streaming as
default and then wait for some user feedback before changing other
defaults.
I agree, actually streaming of in progress transactions is a useful feature for performance in case of large transactions, so it makes sense to make it "on" by default. So +1 from my side.
Your response is confusing. AFAIU, this proposal is to change the
default value of the streaming option to 'parallel' but you are
suggesting to make 'on' as default which is different from the
proposed default but OTOH you are saying +1 as well. So, both can't be
true.
--
With Regards,
Amit Kapila.
On Fri, 18 Oct 2024 at 5:24 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Oct 18, 2024 at 9:48 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Tue, Oct 8, 2024 at 3:38 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Tue, Oct 8, 2024 at 2:25 PM shveta malik <shveta.malik@gmail.com> wrote:
On Mon, Oct 7, 2024 at 4:03 PM vignesh C <vignesh21@gmail.com> wrote:
With parallel streaming as default, do you think there is a need to
increase the default for 'max_logical_replication_workers' as IIUC
parallel workers are taken from the same pool.
Good question. But then one may say that we should proportionately
increase max_worker_processes as well. I don't know what should be
reasonable new defaults. I think we should make parallel streaming as
default and then wait for some user feedback before changing other
defaults.
I agree, actually streaming of in progress transactions is a useful
feature for performance in case of large transactions, so it makes sense to
make it "on" by default. So +1 from my side.
Your response is confusing. AFAIU, this proposal is to change the
default value of the streaming option to 'parallel' but you are
suggesting to make 'on' as default which is different from the
proposed default but OTOH you are saying +1 as well. So, both can't be
true.
Sorry for the confusion; I meant to say change the default to 'parallel'.
--
Dilip
On Mon, Oct 7, 2024 at 4:03 PM vignesh C <vignesh21@gmail.com> wrote:
On Mon, 7 Oct 2024 at 12:26, Hayato Kuroda (Fujitsu) <kuroda.hayato@fujitsu.com> wrote:
One comment for your patch;
Shouldn't we add a streaming=off case for pg_dump test? You added lines but no one
passes it.
Update existing pg_dump tests to cover the streaming options, the
attached patch has the changes for the same.
@@ -5235,6 +5235,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ", streaming = on");
else if (strcmp(subinfo->substream, "p") == 0)
appendPQExpBufferStr(query, ", streaming = parallel");
+ else
+ appendPQExpBufferStr(query, ", streaming = off");
For newer versions (>=18), we shouldn't need to specify "streaming =
parallel" while creating a subscription as that will be the default.
However, with the above code pg_dump statements will still have that.
There is nothing wrong with that but ideally, it won't be required.
Now, OTOH, doing that would require some version-handling code, which
is nothing new for pg_dump but not sure it makes sense for this
particular case. Another related point is that normally we don't
recommend people to use dump generated with pg_dump to use with lower
server versions than pg_dump itself, but the current proposed patch
will allow that. However, if we change it as I am saying that won't be
possible. So, I am okay with the current code but want to see if
anyone else thinks otherwise or if I am missing something.
--
With Regards,
Amit Kapila.
On Mon, Oct 7, 2024 at 11:05 AM vignesh C <vignesh21@gmail.com> wrote:
The tests demonstrate a significant performance improvement when using
the parallel streaming option, insert shows 40-48% improvement, delete
shows 34-39% improvement, update shows 26-30% improvement. In the case
of rollback the improvement is between 12-44%, the improvement
slightly reduces with larger amounts of data being rolled back in this
case. If there's a significant amount of data to roll back, the
performance of streaming in parallel may be comparable to or slightly
lower in some instances. However, this is acceptable since commit
operations are generally more frequent than rollback operations.
One key point to consider is that the lock on transaction objects will
be held for a longer duration when using streaming in parallel. This
occurs because the parallel apply worker initiates the transaction as
soon as streaming begins, maintaining the lock until the transaction
is fully completed. As a result, for long-running transactions, this
extended lock can hinder concurrent access that requires a lock.
The longer-running transactions will anyway have a risk of deadlocks
or longer waits if there are concurrent operations on the subscribers.
However, with parallel apply, there is a risk of deadlock among the
leader and parallel workers when the schema in publisher and
subscriber is different. Say the subscriber has a unique constraint
that the publisher doesn't have. See the comments in this regard atop
applyparallelworker.c in the "Locking Considerations" section. Having
said that, the apply workers will detect deadlock in such cases and
will retry to apply the errored-out transaction. So, there is a
self-healing in-built mechanism and in such cases, we anyway have a
risk of UNIQUE_KEY conflict ERRORs which in most cases would require
manual intervention.
Since there is a significant percentage improvement, we should make
the default subscription streaming option parallel.
This makes sense to me. I think it would be better to add a Note or
Warning in the docs for the risk of deadlock when the schema of
publisher and subscriber is not the same even though such cases should
be less.
--
With Regards,
Amit Kapila.
On Mon, Oct 21, 2024 at 5:09 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Oct 7, 2024 at 4:03 PM vignesh C <vignesh21@gmail.com> wrote:
On Mon, 7 Oct 2024 at 12:26, Hayato Kuroda (Fujitsu) <kuroda.hayato@fujitsu.com> wrote:
One comment for your patch;
Shouldn't we add a streaming=off case for pg_dump test? You added lines but no one
passes it.
Update existing pg_dump tests to cover the streaming options, the
attached patch has the changes for the same.
@@ -5235,6 +5235,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ", streaming = on");
else if (strcmp(subinfo->substream, "p") == 0)
appendPQExpBufferStr(query, ", streaming = parallel");
+ else
+ appendPQExpBufferStr(query, ", streaming = off");
For newer versions (>=18), we shouldn't need to specify "streaming =
parallel" while creating a subscription as that will be the default.
However, with the above code pg_dump statements will still have that.
There is nothing wrong with that but ideally, it won't be required.
Now, OTOH, doing that would require some version-handling code, which
is nothing new for pg_dump but not sure it makes sense for this
particular case. Another related point is that normally we don't
recommend people to use dump generated with pg_dump to use with lower
server versions than pg_dump itself, but the current proposed patch
will allow that. However, if we change it as I am saying that won't be
possible.
Leaving the patch as-is seems better to me.
PROS
- The simple code explicitly setting all parameter values is easy to
understand as-is.
- AFAICT it works for all that the pg_dump docs [1] guarantee.
- No version handling code will be needed...
- Therefore, no risk of accidentally introducing any versioning bugs.
- Yields a more portable dump file (even though not guaranteed by pg_dump docs)
CONS
- A few more chars in the dump file?
- What else?
======
[1]: https://www.postgresql.org/docs/devel/app-pgdump.html
Kind Regards,
Peter Smith.
Fujitsu Australia
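To illustrate the "explicitly setting all parameter values" point: with the patch as-is, the dump carries a streaming clause whatever the value is. The two statements below are the ones the patch's 002_pg_dump.pl expectations match against, shown here only as a sketch of what the dump output would look like.

```sql
-- Created without a streaming option: the new default is dumped explicitly.
CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1', streaming = parallel);

-- Created with streaming = off: also dumped explicitly, via the new else branch.
CREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', streaming = off, origin = none);
```

Because neither statement relies on the server's default, restoring them should yield the same streaming mode regardless of which default the target server uses.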
On Mon, 21 Oct 2024 at 14:36, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Oct 7, 2024 at 11:05 AM vignesh C <vignesh21@gmail.com> wrote:
The tests demonstrate a significant performance improvement when using
the parallel streaming option, insert shows 40-48% improvement, delete
shows 34-39% improvement, update shows 26-30% improvement. In the case
of rollback the improvement is between 12-44%, the improvement
slightly reduces with larger amounts of data being rolled back in this
case. If there's a significant amount of data to roll back, the
performance of streaming in parallel may be comparable to or slightly
lower in some instances. However, this is acceptable since commit
operations are generally more frequent than rollback operations.
One key point to consider is that the lock on transaction objects will
be held for a longer duration when using streaming in parallel. This
occurs because the parallel apply worker initiates the transaction as
soon as streaming begins, maintaining the lock until the transaction
is fully completed. As a result, for long-running transactions, this
extended lock can hinder concurrent access that requires a lock.
The longer-running transactions will anyway have a risk of deadlocks
or longer waits if there are concurrent operations on the subscribers.
However, with parallel apply, there is a risk of deadlock among the
leader and parallel workers when the schema in publisher and
subscriber is different. Say the subscriber has a unique constraint
that the publisher doesn't have. See the comments in this regard atop
applyparallelworker.c in the "Locking Considerations" section. Having
said that, the apply workers will detect deadlock in such cases and
will retry to apply the errored-out transaction. So, there is a
self-healing in-built mechanism and in such cases, we anyway have a
risk of UNIQUE_KEY conflict ERRORs which in most cases would require
manual intervention.
Since there is a significant percentage improvement, we should make
the default subscription streaming option parallel.
This makes sense to me. I think it would be better to add a Note or
Warning in the docs for the risk of deadlock when the schema of
publisher and subscriber is not the same even though such cases should
be less.
Yes, this can happen in scenarios like the one below (with deadlock_timeout = 10ms):
Publisher:
CREATE TABLE t1(c1 int);
create publication pub1 for table t1;
Subscriber has an additional index on the table:
CREATE TABLE t1(c1 int);
CREATE UNIQUE INDEX idx1 on t1(c1);
Create subscription ...;
Publisher:
Session1:
Begin;
INSERT INTO t1 SELECT i FROM generate_series(1, 5000) s(i);
Session2:
-- Insert the record that is already inserted in session1
INSERT INTO t1 VALUES (1);
Session1:
Commit;
In this case a deadlock will occur.
Attached v3 version patch has a caution added for the same.
Regards,
Vignesh
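For setups where the subscriber schema intentionally differs from the publisher (as with idx1 above), one way to sidestep the parallel-apply deadlock would be to opt out of the new default per subscription. A sketch; the subscription name sub1 and the connection string are placeholders, not taken from this thread:

```sql
-- Pick a non-parallel mode at creation time ...
CREATE SUBSCRIPTION sub1
    CONNECTION 'host=publisher dbname=postgres'
    PUBLICATION pub1
    WITH (streaming = off);

-- ... or switch an existing subscription. With streaming = on, changes are
-- spooled to temporary files and applied only after commit, so no parallel
-- apply worker is involved.
ALTER SUBSCRIPTION sub1 SET (streaming = on);

-- Inspect the current mode: 'f' = off, 't' = on, 'p' = parallel.
SELECT subname, substream FROM pg_subscription;
```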
Attachments:
v3-0001-Make-default-value-for-susbcription-streaming-opt.patch (text/x-patch)
From a56f565689b8b18573acd873a097a213af6c6722 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Wed, 25 Sep 2024 14:42:20 +0530
Subject: [PATCH v3] Make default value for susbcription streaming option to
parallel.
Currently the default value of the streaming option is off. All
transactions are fully decoded on the publisher before being sent
to the subscriber. This approach can lead to reduced performance,
particularly under heavy load.
Change the default streaming option to parallel. With this,
incoming changes are applied directly by one of the available
parallel apply workers, which significantly improves the
performance of commit operations.
---
doc/src/sgml/ref/create_subscription.sgml | 27 ++++++++++++++--------
src/backend/commands/subscriptioncmds.c | 2 +-
src/bin/pg_dump/pg_dump.c | 2 ++
src/bin/pg_dump/t/002_pg_dump.pl | 10 ++++----
src/test/regress/expected/subscription.out | 24 +++++++++----------
5 files changed, 37 insertions(+), 28 deletions(-)
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 8a3096e62b..3eab06bd2d 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -271,11 +271,23 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<listitem>
<para>
Specifies whether to enable streaming of in-progress transactions
- for this subscription. The default value is <literal>off</literal>,
- meaning all transactions are fully decoded on the publisher and only
- then sent to the subscriber as a whole.
+ for this subscription. The default value is <literal>parallel</literal>,
+ meaning incoming changes are directly applied via one of the parallel
+ apply workers, if available. If no parallel apply worker is free to
+ handle streaming transactions then the changes are written to
+ temporary files and applied after the transaction is committed. Note
+ that if an error happens in a parallel apply worker, the finish LSN
+ of the remote transaction might not be reported in the server log.
</para>
+ <caution>
+ <para>
+ There is a risk of deadlock when the schemas of the publisher and
+ subscriber differ, although such cases are rare. The apply worker
+ is equipped to automatically retry these transactions.
+ </para>
+ </caution>
+
<para>
If set to <literal>on</literal>, the incoming changes are written to
temporary files and then applied only after the transaction is
@@ -283,13 +295,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
<para>
- If set to <literal>parallel</literal>, incoming changes are directly
- applied via one of the parallel apply workers, if available. If no
- parallel apply worker is free to handle streaming transactions then
- the changes are written to temporary files and applied after the
- transaction is committed. Note that if an error happens in a
- parallel apply worker, the finish LSN of the remote transaction
- might not be reported in the server log.
+ If set to <literal>off</literal>, all transactions are fully decoded
+ on the publisher and only then sent to the subscriber as a whole.
</para>
</listitem>
</varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 02ccc636b8..0a7a618855 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -151,7 +151,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
- opts->streaming = LOGICALREP_STREAM_OFF;
+ opts->streaming = LOGICALREP_STREAM_PARALLEL;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 1d79865058..19e3d24326 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5248,6 +5248,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ", streaming = on");
else if (strcmp(subinfo->substream, "p") == 0)
appendPQExpBufferStr(query, ", streaming = parallel");
+ else
+ appendPQExpBufferStr(query, ", streaming = off");
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
appendPQExpBufferStr(query, ", two_phase = on");
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 91a4c63744..213904440f 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -3002,7 +3002,7 @@ my %tests = (
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1');\E
+ \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1', streaming = parallel);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -3011,9 +3011,9 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub2
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = none);',
+ WITH (connect = false, origin = none, streaming = off);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E
+ \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', streaming = off, origin = none);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -3022,9 +3022,9 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub3
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = any);',
+ WITH (connect = false, origin = any, streaming = on);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E
+ \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 17d48b1685..1443e1d929 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -119,7 +119,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@@ -127,7 +127,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -148,7 +148,7 @@ ERROR: invalid connection string syntax: missing "=" after "foobar" in connecti
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -160,7 +160,7 @@ ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -179,7 +179,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
@@ -191,7 +191,7 @@ ERROR: invalid WAL location (LSN): 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@@ -226,7 +226,7 @@ HINT: Available values: local, remote_write, remote_apply, on, off.
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@@ -258,7 +258,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@@ -267,7 +267,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -374,7 +374,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- we can alter streaming when two_phase enabled
@@ -412,7 +412,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@@ -420,7 +420,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
--
2.34.1
On Mon, Oct 21, 2024 at 8:40 PM vignesh C <vignesh21@gmail.com> wrote:
Attached v3 version patch has a caution added for the same.
Thanks, the patch looks good to me and I am planning to commit this
early next week unless there are objections or any major problems. I
have slightly updated the docs and commit message. A few more points to
consider:
1. Please ensure that none of the existing tests that use
subscriptions with large changes will be impacted due to this change.
2. The pg_createsubscriber utility uses the CREATE SUBSCRIPTION statement,
and after this change it will enable parallel mode by default, which I
think is a good idea as users won't need to do that manually after
running the tool. Do you see any problem with this?
--
With Regards,
Amit Kapila.
Attachments:
v4-0001-Change-the-default-value-of-the-streaming-option-.patch (application/octet-stream)
From 1659ee78ce6acba3e1dbc7516807fa6f3427a3e8 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Wed, 25 Sep 2024 14:42:20 +0530
Subject: [PATCH v5] Change the default value of the streaming option to
'parallel'.
Previously the default value of this option was 'off'. The parallel option
indicates that the changes in large transactions (greater than
logical_decoding_work_mem) are to be applied directly via one of the
parallel apply workers, if available.
The parallel mode was introduced in 16, but we refrained from enabling it by
default to avoid seeing any unpleasant behavior in the existing
applications. However, we haven't found any such report yet, so this is a
good time to enable it by default.
Reported-by: Vignesh C
Author: Hayato Kuroda, Amit Kapila
Discussion: https://postgr.es/m/CALDaNm1=MedhW23NuoePJTmonwsMSp80ddsw+sEJs0GUMC_kqQ@mail.gmail.com
---
doc/src/sgml/ref/create_subscription.sgml | 27 ++++++++++++++--------
src/backend/commands/subscriptioncmds.c | 2 +-
src/bin/pg_dump/pg_dump.c | 2 ++
src/bin/pg_dump/t/002_pg_dump.pl | 10 ++++----
src/test/regress/expected/subscription.out | 24 +++++++++----------
5 files changed, 37 insertions(+), 28 deletions(-)
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 8a3096e62b..6cf7d4f9a1 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -271,11 +271,23 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<listitem>
<para>
Specifies whether to enable streaming of in-progress transactions
- for this subscription. The default value is <literal>off</literal>,
- meaning all transactions are fully decoded on the publisher and only
- then sent to the subscriber as a whole.
+ for this subscription. The default value is <literal>parallel</literal>,
+ meaning incoming changes are directly applied via one of the parallel
+ apply workers, if available. If no parallel apply worker is free to
+ handle streaming transactions then the changes are written to
+ temporary files and applied after the transaction is committed. Note
+ that if an error happens in a parallel apply worker, the finish LSN
+ of the remote transaction might not be reported in the server log.
</para>
+ <caution>
+ <para>
+ There is a risk of deadlock when the schemas of the publisher and
+ subscriber differ, although such cases are rare. The apply worker
+ is equipped to retry these transactions automatically.
+ </para>
+ </caution>
+
<para>
If set to <literal>on</literal>, the incoming changes are written to
temporary files and then applied only after the transaction is
@@ -283,13 +295,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
<para>
- If set to <literal>parallel</literal>, incoming changes are directly
- applied via one of the parallel apply workers, if available. If no
- parallel apply worker is free to handle streaming transactions then
- the changes are written to temporary files and applied after the
- transaction is committed. Note that if an error happens in a
- parallel apply worker, the finish LSN of the remote transaction
- might not be reported in the server log.
+ If set to <literal>off</literal>, all transactions are fully decoded
+ on the publisher and only then sent to the subscriber as a whole.
</para>
</listitem>
</varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 02ccc636b8..0a7a618855 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -151,7 +151,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
- opts->streaming = LOGICALREP_STREAM_OFF;
+ opts->streaming = LOGICALREP_STREAM_PARALLEL;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 1b47c388ce..d8c6330732 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5235,6 +5235,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ", streaming = on");
else if (strcmp(subinfo->substream, "p") == 0)
appendPQExpBufferStr(query, ", streaming = parallel");
+ else
+ appendPQExpBufferStr(query, ", streaming = off");
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
appendPQExpBufferStr(query, ", two_phase = on");
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index ab6c830491..ac60829d68 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -2992,7 +2992,7 @@ my %tests = (
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1');\E
+ \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1', streaming = parallel);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -3001,9 +3001,9 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub2
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = none);',
+ WITH (connect = false, origin = none, streaming = off);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E
+ \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', streaming = off, origin = none);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -3012,9 +3012,9 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub3
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = any);',
+ WITH (connect = false, origin = any, streaming = on);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E
+ \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 17d48b1685..1443e1d929 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -119,7 +119,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@@ -127,7 +127,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -148,7 +148,7 @@ ERROR: invalid connection string syntax: missing "=" after "foobar" in connecti
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -160,7 +160,7 @@ ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -179,7 +179,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
@@ -191,7 +191,7 @@ ERROR: invalid WAL location (LSN): 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@@ -226,7 +226,7 @@ HINT: Available values: local, remote_write, remote_apply, on, off.
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@@ -258,7 +258,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@@ -267,7 +267,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -374,7 +374,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- we can alter streaming when two_phase enabled
@@ -412,7 +412,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@@ -420,7 +420,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
--
2.28.0.windows.1
Dear Amit, Vignesh,
1. Please ensure that none of the existing tests that use
subscriptions with large changes will be impacted due to this change.
I found that at least 022_twophase_cascade.pl should be fixed.
The file has a part that tests the non-streaming case:
```
# -----------------------
# 2PC NON-STREAMING TESTS
# -----------------------
...
$node_B->safe_psql(
'postgres', "
CREATE SUBSCRIPTION tap_sub_B
CONNECTION '$node_A_connstr application_name=$appname_B'
PUBLICATION tap_pub_A
WITH (two_phase = on)");
...
```
I know streaming does not actually happen there because only a few tuples are
inserted later, but creating the subscription with streaming=parallel is a bit misleading.
I checked the other files as well but could not find anything else that needs fixing.
2. The pg_createsubscriber utility uses CREATE SUBSCRIPTION statement
and after this change, it will enable parallel mode by default which I
think is a good idea as users won't need to do that manually after
running the tool. Do you see any problem with this?
I also think it is okay. IIUC, there was no specific reason to create
subscriptions with streaming=off; it was chosen only because it was the default.
I cannot find a strong reason to keep the current behavior.
Best regards,
Hayato Kuroda
FUJITSU LIMITED
On Tue, 22 Oct 2024 at 16:24, Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Amit, Vignesh,
1. Please ensure that none of the existing tests that use
subscriptions with large changes will be impacted due to this change.
I found that at least 022_twophase_cascade.pl should be fixed.
The file has a part that tests the non-streaming case:
```
# -----------------------
# 2PC NON-STREAMING TESTS
# -----------------------
...
$node_B->safe_psql(
'postgres', "
CREATE SUBSCRIPTION tap_sub_B
CONNECTION '$node_A_connstr application_name=$appname_B'
PUBLICATION tap_pub_A
WITH (two_phase = on)");
...
```
I know streaming does not actually happen there because only a few tuples are
inserted later, but creating the subscription with streaming=parallel is a bit misleading.
The attached v5 version has the change to create subscriptions in
streaming off mode. I also did not find any other TAP test which
required further changes.
Regards,
Vignesh
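As a rough illustration of why the test now has to spell out streaming = off, the following C sketch models how an unspecified option falls back to the new default. resolve_streaming() is a hypothetical helper, not the server's option parser, and the single-character codes are assumed to mirror the catalog encoding (only 'p' is visible in the pg_dump hunk of the patch).

```c
#include <stddef.h>
#include <string.h>

/* Assumed single-character encodings; only 'p' appears in the patch itself. */
#define STREAM_OFF      'f'
#define STREAM_ON       't'
#define STREAM_PARALLEL 'p'

/*
 * Hypothetical helper: resolve the streaming mode for a new subscription.
 * value == NULL models "option not given", which yields parallel after
 * this patch (it yielded off before). Returns 0 for an unknown value.
 * The set of accepted spellings here is simplified.
 */
char
resolve_streaming(const char *value)
{
    if (value == NULL)
        return STREAM_PARALLEL;     /* new default */
    if (strcmp(value, "off") == 0)
        return STREAM_OFF;
    if (strcmp(value, "on") == 0)
        return STREAM_ON;
    if (strcmp(value, "parallel") == 0)
        return STREAM_PARALLEL;
    return 0;                       /* unrecognized */
}
```

Under this model, a test that relies on non-streaming behavior has to pass "off" explicitly, because leaving the option out no longer selects it.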
Attachments:
v5-0001-Change-the-default-value-of-the-streaming-option-.patch (text/x-patch)
From 479006dc72c2b101884349e3185128ffaf25b5a0 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Wed, 25 Sep 2024 14:42:20 +0530
Subject: [PATCH v5] Change the default value of the streaming option to
'parallel'.
Previously the default value of this option was 'off'. The parallel option
indicates that the changes in large transactions (greater than
logical_decoding_work_mem) are to be applied directly via one of the
parallel apply workers, if available.
The parallel mode was introduced in 16, but we refrain from enabling it by
default to avoid seeing any unpleasant behavior in the existing
applications. However we haven't found any such report yet, so this is a
good time to enable it by default.
Reported-by: Vignesh C
Author: Hayato Kuroda, Amit Kapila
Discussion: https://postgr.es/m/CALDaNm1=MedhW23NuoePJTmonwsMSp80ddsw+sEJs0GUMC_kqQ@mail.gmail.com
---
doc/src/sgml/ref/create_subscription.sgml | 27 ++++++++++++-------
src/backend/commands/subscriptioncmds.c | 2 +-
src/bin/pg_dump/pg_dump.c | 2 ++
src/bin/pg_dump/t/002_pg_dump.pl | 10 +++----
src/test/regress/expected/subscription.out | 24 ++++++++---------
.../subscription/t/022_twophase_cascade.pl | 4 +--
6 files changed, 39 insertions(+), 30 deletions(-)
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 8a3096e62b..6cf7d4f9a1 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -271,11 +271,23 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<listitem>
<para>
Specifies whether to enable streaming of in-progress transactions
- for this subscription. The default value is <literal>off</literal>,
- meaning all transactions are fully decoded on the publisher and only
- then sent to the subscriber as a whole.
+ for this subscription. The default value is <literal>parallel</literal>,
+ meaning incoming changes are directly applied via one of the parallel
+ apply workers, if available. If no parallel apply worker is free to
+ handle streaming transactions then the changes are written to
+ temporary files and applied after the transaction is committed. Note
+ that if an error happens in a parallel apply worker, the finish LSN
+ of the remote transaction might not be reported in the server log.
</para>
+ <caution>
+ <para>
+ There is a risk of deadlock when the schemas of the publisher and
+ subscriber differ, although such cases are rare. The apply worker
+ is equipped to retry these transactions automatically.
+ </para>
+ </caution>
+
<para>
If set to <literal>on</literal>, the incoming changes are written to
temporary files and then applied only after the transaction is
@@ -283,13 +295,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
<para>
- If set to <literal>parallel</literal>, incoming changes are directly
- applied via one of the parallel apply workers, if available. If no
- parallel apply worker is free to handle streaming transactions then
- the changes are written to temporary files and applied after the
- transaction is committed. Note that if an error happens in a
- parallel apply worker, the finish LSN of the remote transaction
- might not be reported in the server log.
+ If set to <literal>off</literal>, all transactions are fully decoded
+ on the publisher and only then sent to the subscriber as a whole.
</para>
</listitem>
</varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 02ccc636b8..0a7a618855 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -151,7 +151,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
- opts->streaming = LOGICALREP_STREAM_OFF;
+ opts->streaming = LOGICALREP_STREAM_PARALLEL;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 1b47c388ce..d8c6330732 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5235,6 +5235,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ", streaming = on");
else if (strcmp(subinfo->substream, "p") == 0)
appendPQExpBufferStr(query, ", streaming = parallel");
+ else
+ appendPQExpBufferStr(query, ", streaming = off");
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
appendPQExpBufferStr(query, ", two_phase = on");
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index ab6c830491..ac60829d68 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -2992,7 +2992,7 @@ my %tests = (
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1');\E
+ \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1', streaming = parallel);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -3001,9 +3001,9 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub2
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = none);',
+ WITH (connect = false, origin = none, streaming = off);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E
+ \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', streaming = off, origin = none);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -3012,9 +3012,9 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub3
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = any);',
+ WITH (connect = false, origin = any, streaming = on);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E
+ \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 17d48b1685..1443e1d929 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -119,7 +119,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@@ -127,7 +127,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -148,7 +148,7 @@ ERROR: invalid connection string syntax: missing "=" after "foobar" in connecti
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -160,7 +160,7 @@ ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -179,7 +179,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
@@ -191,7 +191,7 @@ ERROR: invalid WAL location (LSN): 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@@ -226,7 +226,7 @@ HINT: Available values: local, remote_write, remote_apply, on, off.
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@@ -258,7 +258,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@@ -267,7 +267,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -374,7 +374,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- we can alter streaming when two_phase enabled
@@ -412,7 +412,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@@ -420,7 +420,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl
index 1acc79f17e..7fe7c7c175 100644
--- a/src/test/subscription/t/022_twophase_cascade.pl
+++ b/src/test/subscription/t/022_twophase_cascade.pl
@@ -88,7 +88,7 @@ $node_B->safe_psql(
CREATE SUBSCRIPTION tap_sub_B
CONNECTION '$node_A_connstr application_name=$appname_B'
PUBLICATION tap_pub_A
- WITH (two_phase = on)");
+ WITH (two_phase = on, streaming = off)");
# node_B (pub) -> node_C (sub)
my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
@@ -100,7 +100,7 @@ $node_C->safe_psql(
CREATE SUBSCRIPTION tap_sub_C
CONNECTION '$node_B_connstr application_name=$appname_C'
PUBLICATION tap_pub_B
- WITH (two_phase = on)");
+ WITH (two_phase = on, streaming = off)");
# Wait for subscribers to finish initialization
$node_A->wait_for_catchup($appname_B);
--
2.34.1
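The pg_dump hunk in the patch above can be read as the following small C sketch. Because the server default is now parallel, a dump that omitted the clause for an "off" subscription would be restored as parallel, so every mode has to be written out. streaming_clause() is a hypothetical stand-in for the branch inside dumpSubscription(), and the "t" code for on is an assumption (the hunk only shows "p").

```c
#include <string.h>

/*
 * Hypothetical stand-in for the streaming branch in dumpSubscription().
 * substream is the catalog value as a string; "p" is taken from the patch,
 * "t" for 'on' is assumed. Anything else is dumped as off, matching the
 * new else branch.
 */
const char *
streaming_clause(const char *substream)
{
    if (strcmp(substream, "t") == 0)
        return ", streaming = on";
    else if (strcmp(substream, "p") == 0)
        return ", streaming = parallel";
    else
        return ", streaming = off";   /* must be explicit now */
}
```

This also explains the 002_pg_dump.pl changes: the expected output for sub1, created without a streaming option, now contains streaming = parallel.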
On Tue, Oct 22, 2024 at 2:17 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Oct 21, 2024 at 8:40 PM vignesh C <vignesh21@gmail.com> wrote:
Attached v3 version patch has a caution added for the same.
Thanks, the patch looks good to me and I am planning to commit this
early next week unless there are objections or any major problems. I
have slightly updated the docs and commit message. Few more points to
consider:
Another point is that streaming = 'on' will be used by default if a
v18 subscriber connects to a v15 or older publisher. I think it would
not be a problem but worth considering the side effects.
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
Hi Vignesh, here are some review comments for the patch v5-0001 (docs only).
======
Commit message
1.
The commit message only refers to this as the "streaming option", but
an option of what? Somewhere we should mention this is an option of
CREATE SUBSCRIPTION.
~
2.
Previously the default value of this option was 'off'. The parallel option
indicates that the changes in large transactions (greater than
logical_decoding_work_mem) are to be applied directly via one of the
parallel apply workers, if available.
~
typo - /parallel option/'parallel' mode/
typo - /parallel mode/'parallel' mode/
typo - /we refrain from enabling it/we refrained from enabling it/
======
doc/src/sgml/ref/create_subscription.sgml
3.
<para>
Specifies whether to enable streaming of in-progress transactions
- for this subscription. The default value is <literal>off</literal>,
- meaning all transactions are fully decoded on the publisher and only
- then sent to the subscriber as a whole.
+ for this subscription. The default value is <literal>parallel</literal>,
+ meaning incoming changes are directly applied via one of the parallel
+ apply workers, if available. If no parallel apply worker is free to
+ handle streaming transactions then the changes are written to
+ temporary files and applied after the transaction is committed. Note
+ that if an error happens in a parallel apply worker, the finish LSN
+ of the remote transaction might not be reported in the server log.
</para>
The other enum values have separate paragraphs:
- "If set to 'on'" and
- "If set to 'off'"
I felt the 'parallel' value description should have this same style --
e.g. a separate paragraph saying:
- "If set to 'parallel' (the default value)...".
IMO, doing this makes the 3 available enum values much clearer.
Please take a look at the attachment where I've made this suggested change.
======
Kind Regards,
Peter Smith.
Fujitsu Australia
Attachments:
PS_V5_DOCS_DIFF.txt (text/plain)
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 6cf7d4f..b4763c4 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -271,8 +271,12 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<listitem>
<para>
Specifies whether to enable streaming of in-progress transactions
- for this subscription. The default value is <literal>parallel</literal>,
- meaning incoming changes are directly applied via one of the parallel
+ for this subscription.
+ </para>
+
+ <para>
+ If set to <literal>parallel</literal> (the default value),
+ incoming changes are directly applied via one of the parallel
apply workers, if available. If no parallel apply worker is free to
handle streaming transactions then the changes are written to
temporary files and applied after the transaction is committed. Note
On Wed, Oct 23, 2024 at 5:58 AM Peter Smith <smithpb2250@gmail.com> wrote:
Hi Vignesh, here are some review comments for the patch v5-0001 (docs only).
======
Commit message
1.
The commit message only refers to this as the "streaming option", but
an option of what? Somewhere we should mention this is an option of
CREATE SUBSCRIPTION.
I think we can change the first line to: "Previously the default value
of streaming option for a subscription was 'off'...."
======
doc/src/sgml/ref/create_subscription.sgml
3.
<para>
Specifies whether to enable streaming of in-progress transactions
- for this subscription. The default value is <literal>off</literal>,
- meaning all transactions are fully decoded on the publisher and only
- then sent to the subscriber as a whole.
+ for this subscription. The default value is <literal>parallel</literal>,
+ meaning incoming changes are directly applied via one of the parallel
+ apply workers, if available. If no parallel apply worker is free to
+ handle streaming transactions then the changes are written to
+ temporary files and applied after the transaction is committed. Note
+ that if an error happens in a parallel apply worker, the finish LSN
+ of the remote transaction might not be reported in the server log.
</para>
The other enum values have separate paragraphs:
- "If set to 'on'" and
- "If set to 'off'"
I felt the 'parallel' value description should have this same style --
e.g. a separate paragraph saying:
- "If set to 'parallel' (the default value)...".
IMO, doing this makes the 3 available enum values much clearer.
The currently proposed way is better as it maintains the description
flow. With your proposal, there is some repetition and it is not
making things significantly better. This is a matter of opinion, so I
leave it to others to see if they have any opinions.
--
With Regards,
Amit Kapila.
On Wed, Oct 23, 2024 at 1:21 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Tue, Oct 22, 2024 at 2:17 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Oct 21, 2024 at 8:40 PM vignesh C <vignesh21@gmail.com> wrote:
Attached v3 version patch has a caution added for the same.
Thanks, the patch looks good to me and I am planning to commit this
early next week unless there are objections or any major problems. I
have slightly updated the docs and commit message. Few more points to
consider:
Another point is that streaming = 'on' will be used by default if a
v18 subscriber connects to a v15 or older publisher.
Right, but older publishers shouldn't be less than 14.
I think it would
not be a problem but worth considering the side effects.
I couldn't think of any side effects but if you or someone else sees
any problems then we can discuss those.
--
With Regards,
Amit Kapila.
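The fallback discussed in this exchange can be sketched in C as follows. This is only a model built from the statements in the thread (parallel was introduced in 16, a 15-or-older publisher gets 'on', and streaming needs a publisher of at least 14). effective_streaming() is a hypothetical name, not the apply worker's actual code, and returning "off" for publishers older than 14 is an assumption.

```c
/*
 * Hypothetical model of which streaming mode a subscriber ends up using.
 * publisher_version_num follows the PG_VERSION_NUM convention
 * (e.g. 150000 for v15). requested is the subscription's setting:
 * 'f' = off, 't' = on, 'p' = parallel (assumed encodings).
 */
const char *
effective_streaming(int publisher_version_num, char requested)
{
    if (requested == 'f')
        return "off";
    if (publisher_version_num >= 160000 && requested == 'p')
        return "parallel";
    if (publisher_version_num >= 140000)
        return "on";        /* parallel downgraded for v14/v15 publishers */
    return "off";           /* assumption: no streaming before v14 */
}
```

With the new default, a subscription created without an explicit option against a v15 publisher would therefore behave like streaming = on under this model, which is the side effect raised above.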
On Tue, Oct 22, 2024 at 9:26 PM vignesh C <vignesh21@gmail.com> wrote:
The attached v5 version has the change to create subscriptions in
streaming off mode. I also did not find any other TAP test which
required further changes.
Pushed.
--
With Regards,
Amit Kapila.