[Patch] add new parameter to pg_replication_origin_session_setup
Hello all,
While working on our internal tools that utilise replication, we
realised that a new parameter was added to the internal C function
corresponding to pg_replication_origin_session_setup.
However this parameter wasn't included in the user-facing API [1].
In 'src/backend/replication/logical/origin.c' at line 1359, the
pg_replication_origin_session_setup function calls
replorigin_session_setup(origin, 0);
which always passes 0 as the acquired_by parameter of
replorigin_session_setup.
I made this patch to the master which adds a way to control this
parameter by adding a new version of the
pg_replication_origin_session_setup function with user facing
parameters 'text int4' in place of the current 'text' while keeping
the existing variant
(ensuring backwards compatibility). Could someone take a look at it?
[1]: https://www.postgresql.org/docs/current/functions-admin.html#PG-REPLICATION-ORIGIN-SESSION-SETUP
---
Thanks for the help,
Doruk Yılmaz
Attachments:
0001-pg_replication_origin_session_setup-new-parameter.patch (text/x-patch; charset=US-ASCII)
From e78865ea41444999f1c8879be0f64928f21e43c6 Mon Sep 17 00:00:00 2001
From: Doruk <doruk@mixrank.com>
Date: Mon, 12 Aug 2024 21:21:10 +0300
Subject: [PATCH] pg_replication_origin_session_setup new parameter
---
src/backend/catalog/system_functions.sql | 2 ++
src/backend/replication/logical/origin.c | 26 +++++++++++++++++++++++-
src/include/catalog/pg_proc.dat | 6 ++++++
3 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 623b9539b1..7dca2350dc 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -738,6 +738,8 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
+
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_setup(pg_lsn, timestamp with time zone) FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 419e4814f0..208eed9e37 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1347,7 +1347,7 @@ pg_replication_origin_oid(PG_FUNCTION_ARGS)
* Setup a replication origin for this session.
*/
Datum
-pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
+pg_replication_origin_session_setup_nopid(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
@@ -1364,6 +1364,30 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+Datum
+pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
+{
+ char *name;
+ RepOriginId origin;
+ int pid;
+
+ replorigin_check_prerequisites(true, false);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+ origin = replorigin_by_name(name, false);
+ if (PG_ARGISNULL(1)){
+ replorigin_session_setup(origin, 0);
+ } else {
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
+ }
+
+ replorigin_session_origin = origin;
+
+ pfree(name);
+
+ PG_RETURN_VOID();
+}
/*
* Reset previously setup origin in this session
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4abc6d9526..c68a0f895f 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11949,6 +11949,12 @@
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ prosrc => 'pg_replication_origin_session_setup_nopid' },
+
+{ oid => '6015',
+ descr => 'configure session to maintain replication progress tracking for the passed in origin',
+ proname => 'pg_replication_origin_session_setup', provolatile => 'v',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
--
2.39.2
On Mon, Aug 12, 2024, at 3:43 PM, Doruk Yilmaz wrote:
> Hello all,
Hi!
> While working on our internal tools that utilise replication, we
> realised that a new parameter was added to the internal C function
> corresponding to pg_replication_origin_session_setup.
> However this parameter wasn't included in the user-facing API [1].
I'm curious about your use case. Is it just because the internal function has a
different signature, or is your tool capable of applying logical replication
changes in parallel using the SQL API?
> I made this patch to the master which adds a way to control this
> parameter by adding a new version of the
> pg_replication_origin_session_setup function with user facing
> parameters 'text int4' in place of the current 'text' while keeping
> the existing variant
> (ensuring backwards compatibility). Could someone take a look at it?
I took a quick look at your patch and have a few suggestions.
* no documentation changes. Since the function you are changing has a new
signature, this change should be reflected in the documentation.
* no need for a new internal function. The second parameter (PID) can be
optional and default to 0 in this case. See how
pg_create_logical_replication_slot changed over the years, gaining IN
parameters like twophase and failover in recent versions.
* add a CF entry [1] for this patch so we don't forget it. Another advantage is
that this patch is covered by CI [2][3].
[1]: https://commitfest.postgresql.org/49/
[2]: https://wiki.postgresql.org/wiki/Cfbot
[3]: http://cfbot.cputube.org/index.html
--
Euler Taveira
EDB https://www.enterprisedb.com/
Hello again,
On Tue, Aug 13, 2024 at 12:48 AM Euler Taveira <euler@eulerto.com> wrote:
> I'm curious about your use case. Is it just because the internal function has a
> different signature, or is your tool capable of applying logical replication
> changes in parallel using the SQL API?
The latter is correct: it applies logical replication changes in parallel.
Since multiple connections may commit, we need all of them to be able
to advance the replication origin.
> * no documentation changes. Since the function you are changing has a new
> signature, this change should be reflected in the documentation.
> * no need for a new internal function. The second parameter (PID) can be
> optional and default to 0 in this case. See how
> pg_create_logical_replication_slot changed over the years, gaining IN
> parameters like twophase and failover in recent versions.
I updated/rewrote the patch to reflect these suggestions.
It now has the same DEFAULT 0 style used in pg_create_logical_replication_slot.
I also updated the documentation.
> * add a CF entry [1] for this patch so we don't forget it. Another advantage is
> that this patch is covered by CI [2][3].
Sadly I still can't log in to the Commitfest due to the cool-off
period. I will create an entry as soon as this period ends.
Thanks for all the feedback,
Doruk Yılmaz
Attachments:
v2-0001-pg_replication_origin_session_setup-new-parameter.patch (text/x-patch; charset=US-ASCII)
From b9c54f3d217f67c24ce74ffa7c1f2812d784333e Mon Sep 17 00:00:00 2001
From: Doruk <doruk@mixrank.com>
Date: Thu, 15 Aug 2024 23:34:26 +0300
Subject: [PATCH] add new parameter to pg_replication_origin_session_setup
---
doc/src/sgml/func.sgml | 2 +-
src/backend/catalog/system_functions.sql | 9 ++++++++-
src/backend/replication/logical/origin.c | 8 +++++---
src/include/catalog/pg_proc.dat | 2 +-
4 files changed, 15 insertions(+), 6 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 5dd95d73a1..7db5a8ed52 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29486,7 +29486,7 @@ DETAIL: Make sure pg_wal_replay_wait() isn't called within a transaction with a
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
- <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+ <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type>, <parameter>acquired_by</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal> )
<returnvalue>void</returnvalue>
</para>
<para>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 623b9539b1..4aae06e06d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -639,6 +639,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, acquired_by integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -736,7 +743,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text,integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 419e4814f0..e50bcc8466 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1351,13 +1351,15 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
-
+ int pid;
+
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
-
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
+
replorigin_session_origin = origin;
pfree(name);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4abc6d9526..a490d4fc6e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11948,7 +11948,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
--
2.39.2
On Thu, Aug 15, 2024, at 5:53 PM, Doruk Yilmaz wrote:
> Hello again,
> On Tue, Aug 13, 2024 at 12:48 AM Euler Taveira <euler@eulerto.com> wrote:
>> I'm curious about your use case. Is it just because the internal function has a
>> different signature, or is your tool capable of applying logical replication
>> changes in parallel using the SQL API?
> The latter is correct: it applies logical replication changes in parallel.
> Since multiple connections may commit, we need all of them to be able
> to advance the replication origin.
>> * no documentation changes. Since the function you are changing has a new
>> signature, this change should be reflected in the documentation.
>> * no need for a new internal function. The second parameter (PID) can be
>> optional and default to 0 in this case. See how
>> pg_create_logical_replication_slot changed over the years, gaining IN
>> parameters like twophase and failover in recent versions.
> I updated/rewrote the patch to reflect these suggestions.
> It now has the same DEFAULT 0 style used in pg_create_logical_replication_slot.
> I also updated the documentation.
[after a long hiatus...]
I tested your patch again and it does what is advertised. I changed your patch
a bit. The main change was the documentation. You didn't explain what this new
parameter is for. I tried to explain it but didn't want to add lots of details.
(There is a section that explains how parallel apply processes work behind the
scenes.) I also renamed it from acquired_by to pid to be more descriptive. I
fixed some white space issues too. I noticed that there are no tests. This
doesn't appear to be a shortcoming of this patch, but we need to cover some of
these replication functions with an additional test file in another patch.
Finally, I wrote a commit message and it is RfC.
session 1:
postgres=# select * from pg_replication_origin;
roident | roname
---------+--------
(0 rows)
postgres=# SELECT pg_backend_pid();
pg_backend_pid
----------------
260732
(1 row)
postgres=# SELECT pg_replication_origin_create('test');
pg_replication_origin_create
------------------------------
1
(1 row)
postgres=# SELECT pg_replication_origin_session_setup('test', 0);
pg_replication_origin_session_setup
-------------------------------------
(1 row)
postgres=# select * from pg_replication_origin;
roident | roname
---------+--------
1 | test
(1 row)
session 2:
postgres=# SELECT pg_replication_origin_session_setup('test', 260732);
pg_replication_origin_session_setup
-------------------------------------
(1 row)
session 3:
postgres=# SELECT pg_replication_origin_session_setup('test', 12345);
ERROR: could not find replication state slot for replication origin with OID 1 which was acquired by 12345
--
Euler Taveira
EDB https://www.enterprisedb.com/
Attachments:
v3-0001-pg_replication_origin_session_setup-pid-parameter.patch (text/x-patch)
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 17 00:00:00 2001
From: Doruk <doruk@mixrank.com>
Date: Thu, 15 Aug 2024 23:34:26 +0300
Subject: [PATCH v3] pg_replication_origin_session_setup: pid parameter
Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup. This commit adds an
optional parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
doc/src/sgml/func.sgml | 8 +++++++-
src/backend/catalog/system_functions.sql | 9 ++++++++-
src/backend/replication/logical/origin.c | 4 +++-
src/include/catalog/pg_proc.dat | 2 +-
4 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 47370e581a..e50e689fb6 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29475,7 +29475,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
- <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+ <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
<returnvalue>void</returnvalue>
</para>
<para>
@@ -29483,6 +29483,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
origin, allowing replay progress to be tracked.
Can only be used if no origin is currently selected.
Use <function>pg_replication_origin_session_reset</function> to undo.
+ If multiple processes can safely use the same replication origin (for
+ example, parallel apply processes), the optional <parameter>pid</parameter>
+ parameter can be used to specify the process ID of the first process.
+ The first process must provide <parameter>pid</parameter> equals to
+ <literal>0</literal> and the other processes that share the same
+ replication origin should provide the process ID of the first process.
</para></entry>
</row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 591157b1d1..26151e0f1c 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -668,6 +668,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE
AS 'pg_set_attribute_stats';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -769,7 +776,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 1b586cb1cf..9cbe1eec45 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1355,12 +1355,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b37e8a6f88..ea118a0563 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12063,7 +12063,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
--
2.39.5
I noticed that the patch needs rebasing, so here is the rebased version.
Hopefully it makes it into the commitfest.
Doruk Yılmaz
Attachments:
v4-0001-pg_replication_origin_session_setup-pid-parameter.patch (application/x-patch)
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 17 00:00:00 2001
From: Doruk <doruk@mixrank.com>
Date: Thu, 15 Aug 2024 23:34:26 +0300
Subject: [PATCH v4] pg_replication_origin_session_setup: pid parameter
Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup. This commit adds an
optional parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
doc/src/sgml/func.sgml | 8 +++++++-
src/backend/catalog/system_functions.sql | 9 ++++++++-
src/backend/replication/logical/origin.c | 4 +++-
src/include/catalog/pg_proc.dat | 2 +-
4 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 47370e581a..e50e689fb6 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29475,7 +29475,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
- <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+ <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
<returnvalue>void</returnvalue>
</para>
<para>
@@ -29483,6 +29483,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
origin, allowing replay progress to be tracked.
Can only be used if no origin is currently selected.
Use <function>pg_replication_origin_session_reset</function> to undo.
+ If multiple processes can safely use the same replication origin (for
+ example, parallel apply processes), the optional <parameter>pid</parameter>
+ parameter can be used to specify the process ID of the first process.
+ The first process must provide <parameter>pid</parameter> equals to
+ <literal>0</literal> and the other processes that share the same
+ replication origin should provide the process ID of the first process.
</para></entry>
</row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 86888cd..ebc4005 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -636,6 +636,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -737,7 +744,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 1b586cb1cf..9cbe1eec45 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1355,12 +1355,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b37e8a6f88..ea118a0563 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12063,7 +12063,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
--
2.39.5
On Thu, Jan 9, 2025 at 3:26 AM Euler Taveira <euler@eulerto.com> wrote:
> On Thu, Aug 15, 2024, at 5:53 PM, Doruk Yilmaz wrote:
>> Hello again,
>> On Tue, Aug 13, 2024 at 12:48 AM Euler Taveira <euler@eulerto.com> wrote:
>>> I'm curious about your use case. Is it just because the internal function has a
>>> different signature, or is your tool capable of applying logical replication
>>> changes in parallel using the SQL API?
>> The latter is correct: it applies logical replication changes in parallel.
>> Since multiple connections may commit, we need all of them to be able
>> to advance the replication origin.
To use replication_origin by multiple processes, one must maintain the
commit order as we do internally by allowing the leader process to
wait for the parallel worker to finish the commit. See comments atop
replorigin_session_setup(). Now, we could expose the pid parameter as
proposed by the patch after documenting the additional requirements,
but I am afraid that users may directly start using the API without
following the commit order principle, which can lead to incorrect
replication. So, isn't it better to do something to avoid the misuse
of this feature before exposing it?
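To make the commit-order concern concrete, here is a toy simulation in C. It is only a sketch under stated assumptions: the function name count_skipped_after_crash is hypothetical, the recorded origin progress is modelled as a single LSN that only moves forward on each local commit, and after a crash the client is assumed to resume right after that LSN.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Count remote transactions that would be skipped after a crash.
 *
 * remote_lsn[]    commit LSNs of the remote transactions, in remote order
 * commit_order[]  indexes into remote_lsn[], in local commit order
 * committed       how many local commits finished before the crash
 */
size_t
count_skipped_after_crash(const uint64_t *remote_lsn, size_t ntxns,
                          const size_t *commit_order, size_t committed)
{
    uint64_t origin_lsn = 0;   /* progress recorded for the origin */
    size_t   skipped = 0;

    assert(committed <= ntxns);

    /* Each local commit moves the recorded progress forward. */
    for (size_t i = 0; i < committed; i++)
    {
        uint64_t lsn = remote_lsn[commit_order[i]];

        if (lsn > origin_lsn)
            origin_lsn = lsn;
    }

    /*
     * After restart the client resumes after origin_lsn, so a transaction
     * at or before origin_lsn that never committed locally is lost.
     */
    for (size_t t = 0; t < ntxns; t++)
    {
        int done = 0;

        for (size_t i = 0; i < committed; i++)
            if (commit_order[i] == t)
                done = 1;
        if (!done && remote_lsn[t] <= origin_lsn)
            skipped++;
    }
    return skipped;
}
```

With remote LSNs {100, 200, 300}, committing in remote order and crashing after the first commit skips nothing. Committing the second transaction first and crashing before the first one commits leaves the origin at 200, so the transaction at LSN 100 is never replayed.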
--
With Regards,
Amit Kapila.
On Mon, Mar 3, 2025 at 6:39 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> To use replication_origin by multiple processes, one must maintain the
> commit order as we do internally by allowing the leader process to
> wait for the parallel worker to finish the commit. See comments atop
> replorigin_session_setup(). Now, we could expose the pid parameter as
> proposed by the patch after documenting the additional requirements,
> but I am afraid that users may directly start using the API without
> following the commit order principle, which can lead to incorrect
> replication. So, isn't it better to do something to avoid the misuse
> of this feature before exposing it?
Wouldn't describing the need to follow the commit order principle in the
documentation be enough for this?
It is quite an advanced feature, and I don't believe anyone intending
to use it would skip reading the documentation first.
Are there any updates on the commit? I see that the intended commitfest window has ended.
Thanks,
Doruk Yılmaz
On Tue, Jul 29, 2025 at 2:43 AM Doruk Yilmaz <doruk@mixrank.com> wrote:
> On Mon, Mar 3, 2025 at 6:39 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
>> To use replication_origin by multiple processes, one must maintain the
>> commit order as we do internally by allowing the leader process to
>> wait for the parallel worker to finish the commit. See comments atop
>> replorigin_session_setup(). Now, we could expose the pid parameter as
>> proposed by the patch after documenting the additional requirements,
>> but I am afraid that users may directly start using the API without
>> following the commit order principle, which can lead to incorrect
>> replication. So, isn't it better to do something to avoid the misuse
>> of this feature before exposing it?
> Wouldn't describing the need to follow the commit order principle in the
> documentation be enough for this?
> It is quite an advanced feature, and I don't believe anyone intending
> to use it would skip reading the documentation first.
That is true but I still feel there has to be some mechanism where we
can catch and give an ERROR to the user, if it doesn't follow the
same. For example, pg_replication_origin_advance() always allows going
backwards in terms of LSN which means if one doesn't follow commit
order, it can lead to breaking the replication as after restart the
client can ask to start replication from some prior point.
> Are there any updates on the commit?
I think we are still under discussion about the requirements and
design for this API. Can you tell us the use case? Did you also intend
to use it for parallel apply, if so, can you also tell at a high
level, how you are planning to manage origin? It will help us to
extend the API(s) in a meaningful way.
--
With Regards,
Amit Kapila.
On Mon, Jul 29, 2025 at 8:13 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> That is true but I still feel there has to be some mechanism where we
> can catch and give an ERROR to the user, if it doesn't follow the
> same. For example, pg_replication_origin_advance() always allows going
> backwards in terms of LSN which means if one doesn't follow commit
> order, it can lead to breaking the replication as after restart the
> client can ask to start replication from some prior point.
If you have any ideas for safeguards or API changes, I'd be happy to
help implement them or discuss them.
> Can you tell us the use case? Did you also intend to use it for parallel
> apply, if so, can you also tell at a high level, how you are planning to
> manage origin?
Yes, we use it for parallel apply. We have a custom logical
replication system that applies changes using multiple worker
processes, each with their own database connection.
Our use case requires multiple connections to be able to advance the
same replication origin. We handle this by having a master process
coordinate the workers, where each worker process calls
pg_replication_origin_session_setup with the master's PID as the
second parameter.
We may send operations out of order but we always commit in order, so
there's no chance of creating inconsistencies. There's the chance of
deadlocks, but these can be detected. It's really similar to the
existing parallel apply implementation - the main difference is that
we're applying from jsonl files instead of directly from another
database.
Currently we use a local patch to expose the PID parameter, but having
this upstream would be great. It causes a lot of headaches for us to
use a patched PostgreSQL.
Thanks,
Doruk Yılmaz
On Wed, Jul 30, 2025 at 12:00 AM Doruk Yilmaz <doruk@mixrank.com> wrote:
> On Mon, Jul 29, 2025 at 8:13 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
>> That is true but I still feel there has to be some mechanism where we
>> can catch and give an ERROR to the user, if it doesn't follow the
>> same. For example, pg_replication_origin_advance() always allows going
>> backwards in terms of LSN which means if one doesn't follow commit
>> order, it can lead to breaking the replication as after restart the
>> client can ask to start replication from some prior point.
> If you have any ideas for safeguards or API changes, I'd be happy to
> help implement them or discuss them.
>> Can you tell us the use case? Did you also intend to use it for parallel
>> apply, if so, can you also tell at a high level, how you are planning to
>> manage origin?
> Yes, we use it for parallel apply. We have a custom logical
> replication system that applies changes using multiple worker
> processes, each with their own database connection.
> Our use case requires multiple connections to be able to advance the
> same replication origin.
How do you advance the origin? Did you use
pg_replication_origin_advance()? If so, you should be aware that it
can be used for initial setup; see comment in that API code: "Can't
sensibly pass a local commit to be flushed at checkpoint - this xact
hasn't committed yet. This is why this function should be used to set
up the initial replication state, but not for replay." I wonder if you
are using pg_replication_origin_advance(), won't its current
implementation has the potential to cause a problem for your usecase?
I think the problem it can cause is it may miss a transaction to apply
after restart because we can use remote_lsn without a corresponding
transaction (local_lsn) flushed on the subscriber. This can happen
because ideally we want the transaction that is not successfully
flushed to be replayed after restart.
In general, I was thinking of adding a restriction to
pg_replication_origin_advance() such that it gives an ERROR when a
user tries to move remote_lsn backward unless requested explicitly.
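The restriction described above could be modelled roughly as follows. This is only an illustrative sketch in Python, not PostgreSQL code: the function names and the allow_backward flag are hypothetical, and the real check would live inside pg_replication_origin_advance().

```python
def parse_lsn(text: str) -> int:
    """Convert a pg_lsn string such as '0/16B3748' to a 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def check_advance(current_lsn: str, new_lsn: str, allow_backward: bool = False) -> str:
    """Return new_lsn if the move is acceptable, otherwise raise.

    allow_backward is a hypothetical opt-in standing in for
    "unless requested explicitly".
    """
    if parse_lsn(new_lsn) < parse_lsn(current_lsn) and not allow_backward:
        raise ValueError(
            f"remote_lsn would move backward from {current_lsn} to {new_lsn}"
        )
    return new_lsn
```

With such a guard, an apply client that does not follow commit order would get an error instead of silently rewinding the origin.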
It would be good to know the opinion of others involved in the
original change of maintaining commit order for parallel apply of
large transactions.
--
With Regards,
Amit Kapila.
On Mon, Aug 11, 2025 at 9:44 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
How do you advance the origin? Did you use pg_replication_origin_advance()? If so, you should be aware that it
can be used for initial setup; see comment in that API code...
No, we don't use pg_replication_origin_advance(). We use
pg_replication_origin_xact_setup() instead as I mentioned before.
Each worker does the following:
1. Sets up its own replication-origin session with
pg_replication_origin_session_setup() (using the master process PID).
2. Applies changes inside transactions.
3. Right before commit, calls pg_replication_origin_xact_setup(lsn,
commit_timestamp).
4. Commits only if everything succeeded, so the origin only advances
on a real commit.
That way, the origin LSN moves forward only when the transaction is
actually committed. If something fails or the process crashes, the
origin stays at the last successful commit, and on restart we replay
from the correct spot. It's safer than advancing the origin without
knowing the transaction made it to disk.
So the issue you described is not relevant for our implementation.
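The four steps above can be sketched as the SQL each worker would send. This is a minimal illustration, not the actual tool: the helper names are hypothetical, the two-argument form of pg_replication_origin_session_setup() assumes the patch in this thread is applied, and the leader PID is assumed to be the backend PID of the first process's session (the TAP test in this thread obtains it with pg_backend_pid()).

```python
from typing import List, Optional


def sql_literal(value: str) -> str:
    """Quote a string as a SQL literal by doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def session_setup_sql(origin: str, leader_pid: Optional[int] = None) -> str:
    """Step 1: the first process omits pid; the others pass the first one's PID."""
    if leader_pid is None:
        return f"SELECT pg_replication_origin_session_setup({sql_literal(origin)});"
    return (
        "SELECT pg_replication_origin_session_setup("
        f"{sql_literal(origin)}, {int(leader_pid)});"
    )


def apply_transaction_sql(changes: List[str], origin_lsn: str, origin_ts: str) -> List[str]:
    """Steps 2-4: apply changes, record origin LSN/timestamp, then commit."""
    statements = ["BEGIN;"]
    statements.extend(changes)
    # Called right before COMMIT so the origin only advances on a real commit.
    statements.append(
        "SELECT pg_replication_origin_xact_setup("
        f"{sql_literal(origin_lsn)}::pg_lsn, {sql_literal(origin_ts)}::timestamptz);"
    )
    statements.append("COMMIT;")
    return statements
```

A worker would send these statements over its own connection and issue ROLLBACK instead of COMMIT if any statement fails, which leaves the origin at the last committed position.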
On Mon, Aug 11, 2025 at 10:41 PM Doruk Yilmaz <doruk@mixrank.com> wrote:
On Mon, Aug 11, 2025 at 9:44 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
How do you advance the origin? Did you use pg_replication_origin_advance()? If so, you should be aware that it
can be used for initial setup; see comment in that API code...
No, we don't use pg_replication_origin_advance(). We use
pg_replication_origin_xact_setup() instead as I mentioned before.
Each worker does the following:
1. Sets up its own replication-origin session with
pg_replication_origin_session_setup() (using the master process PID).
2. Applies changes inside transactions.
3. Right before commit, calls pg_replication_origin_xact_setup(lsn,
commit_timestamp).
4. Commits only if everything succeeded, so the origin only advances
on a real commit.
That way, the origin LSN moves forward only when the transaction is
actually committed. If something fails or the process crashes, the
origin stays at the last successful commit, and on restart we replay
from the correct spot. It's safer than advancing the origin without
knowing the transaction made it to disk.
Your use looks good to me. So, maybe it is sufficient to update the docs
with the danger that, if users of the API don't follow commit order, it
may lead to data inconsistency. Additionally, we may
want to give an example as to how to use this API for parallel apply.
Thoughts?
--
With Regards,
Amit Kapila.
Your use looks good to me. So, maybe it is sufficient to update the docs
with the danger that, if users of the API don't follow commit order, it
may lead to data inconsistency. Additionally, we may
want to give an example as to how to use this API for parallel apply.
That sounds reasonable. I’ve updated the patch and added more
information to the documentation covering the topics you mentioned.
I also added a Caution block so potential users won’t miss it. I hope
this patch meets your expectations.
Attachments:
v5-0001-pg_replication_origin_session_setup-pid-parameter.patch (text/x-patch; charset=US-ASCII)
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 17 00:00:00 2001
From: Doruk <doruk@mixrank.com>
Date: Fri, 15 Aug 2025 21:37:18 +0300
Subject: [PATCH v5] pg_replication_origin_session_setup: pid parameter
Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup. This commit adds an
optional parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
doc/src/sgml/func/func-admin.sgml | 22 ++++++++++++++++++++--
src/backend/catalog/system_functions.sql | 9 ++++++++-
src/backend/replication/logical/origin.c | 4 +++-
src/include/catalog/pg_proc.dat | 2 +-
4 files changed, 32 insertions(+), 5 deletions(-)
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe..4b86676 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
- <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+ <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
<returnvalue>void</returnvalue>
</para>
<para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
origin, allowing replay progress to be tracked.
Can only be used if no origin is currently selected.
Use <function>pg_replication_origin_session_reset</function> to undo.
- </para></entry>
+ If multiple processes can safely use the same replication origin (for
+ example, parallel apply processes), the optional <parameter>pid</parameter>
+ parameter can be used to specify the process ID of the first process.
+ The first process must provide <parameter>pid</parameter> equals to
+ <literal>0</literal> and the other processes that share the same
+ replication origin should provide the process ID of the first process.
+ </para>
+ <caution>
+ <para>
+ When multiple processes share the same replication origin, it is critical
+ to maintain commit order to prevent data inconsistency. While processes
+ may send operations out of order, they must commit transactions in the
+ correct sequence to ensure proper replication consistency. The recommended workflow
+ for each worker is: set up the replication origin session with the first process's PID,
+ apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+ with the LSN and commit timestamp before committing, then commit the
+ transaction only if everything succeeded.
+ </para>
+ </caution>
+ </entry>
</row>
<row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308..f60287d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e5..98d47e1 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 118d6da..dd2d938 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12223,7 +12223,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
Dear Doruk,
That sounds reasonable. I’ve updated the patch and added more
information to the documentation covering the topics you mentioned.
I also added a Caution block so potential users won’t miss it. I hope
this patch meets your expectations.
Can you explain more why we must extend the SQL interface? I read your use
case [1], and it looks like a new type of background worker is introduced in
your system. If so, why doesn't the worker directly call the C interface
replorigin_session_setup()?
Personally, I think SQL functions can be called by users unfamiliar with the
internals, so this change may be dangerous. It would be better if developers used the C API instead.
[1]: /messages/by-id/CAMPB6wckvkKrXVPH5j8Ske2cVedkb-TRLdnOb5e74zYM1CynGw@mail.gmail.com
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Dear Hayato,
Can you explain more why we must extend the SQL interface?
In our system the workers aren't background workers and we don't ship
a server-side extension; they're plain external processes (Python in
our case) talking over standard database connections. In many
deployments -especially managed Postgres- we can't load custom C code
even if we wanted to. That's why we want to expose the existing pid
knob via SQL: it lets ordinary client sessions participate in the
same, already-implemented origin coordination without maintaining a
fork or an extension.
This patch doesn't invent a new capability, it just makes the internal
behavior reachable from SQL. The new argument is optional and defaults
to the current behavior, so nothing changes for existing users. It
also keeps the feature usable from any language/runtime that
coordinates parallel apply at the application layer. And I don't
believe it is that dangerous or risky. The actual code we use in
Python is not complex, and I believe anyone already using replication
should be able to set it up. I don't understand why parallel
replication is not already achievable via SQL.
I am happy to make changes to the patch if you think there should be
more guardrails.
Thanks,
Doruk Yılmaz
Dear Doruk,
In our system the workers aren't background workers and we don't ship
a server-side extension; they're plain external processes (Python in
our case) talking over standard database connections. In many
deployments -especially managed Postgres- we can't load custom C code
even if we wanted to. That's why we want to expose the existing pid
knob via SQL: it lets ordinary client sessions participate in the
same, already-implemented origin coordination without maintaining a
fork or an extension.
So, your python process establishes two connections, for publisher (replication connection)
and subscriber (normal connection). It receives changes from the publisher,
constructs SQL statements from the received results, and sends to subscriber's
backend, is it right?
I'm not sure it is the common approach, but I see your point that you cannot
install your extensions on managed postgres.
Anyway, I still feel it is a bit dangerous, but it is OK if others can accept it.
Regarding the patch, I want to ask one point.
```
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
...
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
```
Is there a rule for which attributes are specified and which are not?
For example, VOLATILE is specified on both sides, STRICT is written only in
system_functions.sql, and PARALLEL UNSAFE is set only in pg_proc.dat.
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Dear Hayato,
So, your python process establishes two connections, for publisher (replication connection)
and subscriber (normal connection). It receives changes from the publisher,
constructs SQL statements from the received results, and sends to subscriber's
backend, is it right?
Actually, it's a bit simpler than that - there aren't two connections.
Our program reads changes from JSONL files rather than directly from a
publisher connection.
We have multiple Python processes, each with a single database
connection to the subscriber,
reading from these files and applying changes in parallel.
Is there a rule for which attributes are specified and which are not?
For example, VOLATILE is specified on both sides, STRICT is written only in
system_functions.sql, and PARALLEL UNSAFE is set only in pg_proc.dat.
In pg_proc.dat, I believe STRICT, IMMUTABLE, and PARALLEL SAFE are
the defaults (check out pg_proc.h).
So in pg_proc.dat, the attributes that are specified are the ones that
aren't defaults:
there is provolatile => 'v' (for VOLATILE) and proparallel => 'u' (for
UNSAFE), but no proisstrict since it's already true by default.
In system_functions.sql, I went with being explicit about all the
attributes for clarity as it is the code declaration.
If you want, I can also make the pg_proc.dat explicit.
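To illustrate the point about defaults, here is a small Python sketch of how omitted attributes in a pg_proc.dat entry resolve. It is only a model of the idea, not the real catalog tooling: the helper name is hypothetical, and only the three attributes discussed here are covered.

```python
# Defaults for the three attributes discussed above, as declared in pg_proc.h.
PG_PROC_DEFAULTS = {
    "proisstrict": "t",  # STRICT
    "provolatile": "i",  # IMMUTABLE
    "proparallel": "s",  # PARALLEL SAFE
}


def effective_attributes(entry: dict) -> dict:
    """Fill in defaults for any of the three attributes the entry omits."""
    merged = dict(PG_PROC_DEFAULTS)
    merged.update({k: v for k, v in entry.items() if k in PG_PROC_DEFAULTS})
    return merged


# The entry from the patch: only the non-default attributes are spelled out.
entry_6006 = {
    "oid": "6006",
    "proname": "pg_replication_origin_session_setup",
    "provolatile": "v",
    "proparallel": "u",
}
resolved = effective_attributes(entry_6006)
```

Here resolved ends up strict, volatile, and parallel unsafe, which is what a matching system_functions.sql declaration has to state explicitly.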
Thanks,
Doruk Yılmaz
On Wed, Sep 3, 2025 at 6:13 PM Doruk Yilmaz <doruk@mixrank.com> wrote:
Dear Hayato,
So, your python process establishes two connections, for publisher (replication connection)
and subscriber (normal connection). It receives changes from the publisher,
constructs SQL statements from the received results, and sends to subscriber's
backend, is it right?
Actually, it's a bit simpler than that - there aren't two connections.
Our program reads changes from JSONL files rather than directly from a
publisher connection.
We have multiple Python processes, each with a single database
connection to the subscriber,
reading from these files and applying changes in parallel.
Is there a rule for which attributes are specified and which are not?
For example, VOLATILE is specified on both sides, STRICT is written only in
system_functions.sql, and PARALLEL UNSAFE is set only in pg_proc.dat.
In pg_proc.dat, I believe STRICT, IMMUTABLE, and PARALLEL SAFE are
the defaults (check out pg_proc.h).
So in pg_proc.dat, the attributes that are specified are the ones that
aren't defaults:
there is provolatile => 'v' (for VOLATILE) and proparallel => 'u' (for
UNSAFE), but no proisstrict since it's already true by default.
In system_functions.sql, I went with being explicit about all the
attributes for clarity as it is the code declaration.
Then why didn't you specify PARALLEL UNSAFE as well?
BTW, yesterday a new thread started with the same requirement [1]. It
uses a slightly different way to define the new function. Do you have
any opinion on it?
[1]: /messages/by-id/CAE2gYzyTSNvHY1+iWUwykaLETSuAZsCWyryokjP6rG46ZvRgQA@mail.gmail.com
--
With Regards,
Amit Kapila.
Then why didn't you specify PARALLEL UNSAFE as well?
You are correct, I missed marking the function as PARALLEL UNSAFE.
I’ve attached a revised patch with the correct annotation.
BTW, yesterday a new thread started with the same requirement [1]. It
uses a slightly different way to define the new function. do you have
any opinion on it?
I don’t think introducing a separate function is a good idea. It’s
effectively the same behavior, and it adds technical debt and maintenance
overhead without a clear benefit.
Our patch keeps a single function with a default parameter, so it’s
not a breaking change. So I believe our approach is preferable.
That said, the fact that another patch is proposing the
same capability indicates there’s broader demand for this change.
Attachments:
v6-0001-pg_replication_origin_session_setup-pid-parameter.patch (text/x-patch; charset=US-ASCII)
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 8 00:00:00 2025
From: Doruk <doruk@mixrank.com>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v6] pg_replication_origin_session_setup: pid parameter
Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup. This commit adds an
optional parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
doc/src/sgml/func/func-admin.sgml | 22 ++++++++++++++++++++--
src/backend/catalog/system_functions.sql | 9 ++++++++-
src/backend/replication/logical/origin.c | 4 +++-
src/include/catalog/pg_proc.dat | 2 +-
4 files changed, 32 insertions(+), 5 deletions(-)
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe..4b86676 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
- <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+ <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
<returnvalue>void</returnvalue>
</para>
<para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
origin, allowing replay progress to be tracked.
Can only be used if no origin is currently selected.
Use <function>pg_replication_origin_session_reset</function> to undo.
- </para></entry>
+ If multiple processes can safely use the same replication origin (for
+ example, parallel apply processes), the optional <parameter>pid</parameter>
+ parameter can be used to specify the process ID of the first process.
+ The first process must provide <parameter>pid</parameter> equals to
+ <literal>0</literal> and the other processes that share the same
+ replication origin should provide the process ID of the first process.
+ </para>
+ <caution>
+ <para>
+ When multiple processes share the same replication origin, it is critical
+ to maintain commit order to prevent data inconsistency. While processes
+ may send operations out of order, they must commit transactions in the
+ correct sequence to ensure proper replication consistency. The recommended workflow
+ for each worker is: set up the replication origin session with the first process's PID,
+ apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+ with the LSN and commit timestamp before committing, then commit the
+ transaction only if everything succeeded.
+ </para>
+ </caution>
+ </entry>
</row>
<row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308..f60287d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e5..98d47e1 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 118d6da..dd2d938 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12223,7 +12223,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
Dear Doruk,
Thanks for updating the patch and sorry for being late.
The new patch looks good to me.
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Dear Hayato,
Thanks for the feedback on the patch, I'm glad the latest version looks good.
I was wondering if there is anything else I need to do on my end, or
any other process I should be aware of, for this patch to move
forward? I'm happy to make any further adjustments or provide more
information if needed.
Thanks,
Doruk Yılmaz
On Tue, Sep 16, 2025 at 10:07 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Doruk,
Thanks for updating the patch and sorry for being late.
The new patch looks good to me.
Can we think of writing a few tests for this newly exposed functionality?
--
With Regards,
Amit Kapila.
Dear Amit,
Can we think of writing a few tests for this newly exposed functionality?
I wrote a test; please see the attached files. 0001 is unchanged from v6 and
0002 contains the tests. It opens two sessions and confirms that both can
set up the same origin.
BTW, while testing I found an existing issue in this function. Since
session_replication_state is set before the pid check, the session origin
can remain set after a failure. Here is a quick reproducer:
```
postgres=# SELECT pg_replication_origin_create('origin');
pg_replication_origin_create
------------------------------
1
(1 row)
postgres=# -- run origin_session_setup with incorrect parameter
postgres=# SELECT pg_replication_origin_session_setup('origin', -1);
ERROR: could not find replication state slot for replication origin with OID 1 which was acquired by -1
postgres=# -- run origin_session_setup again with correct parameter
postgres=# SELECT pg_replication_origin_session_setup('origin');
ERROR: cannot setup replication origin when one is already setup
```
The issue has existed since we introduced parallel apply, but it was not
found until now because parallel apply workers never specify an invalid
pid. It becomes more likely to happen now, so it is time to fix it as well.
The idea for the fix is to use a local replication state and assign it to the
process-level variable only at the end. 0003 implements that.
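The bug pattern and the proposed fix can be modelled with a toy Python class. This is only a sketch of the idea as described above, not the code in origin.c: the class, its fields, and the simplified ownership check are all hypothetical stand-ins.

```python
class OriginError(Exception):
    """Stand-in for a PostgreSQL ERROR."""


class Session:
    def __init__(self, owners):
        # owners: origin id -> PID currently holding the origin (0 = nobody)
        self.owners = owners
        self.state = None  # models the process-level session state

    def _check_not_setup(self):
        if self.state is not None:
            raise OriginError("cannot setup replication origin when one is already setup")

    def setup_buggy(self, origin_id, pid=0):
        self._check_not_setup()
        self.state = origin_id  # process-level state assigned before the pid check
        if self.owners[origin_id] != pid:
            # The error leaves self.state set, so later calls fail too.
            raise OriginError(f"origin {origin_id} was not acquired by {pid}")

    def setup_fixed(self, origin_id, pid=0):
        self._check_not_setup()
        candidate = origin_id  # local variable only
        if self.owners[candidate] != pid:
            raise OriginError(f"origin {candidate} was not acquired by {pid}")
        self.state = candidate  # assigned only after every check has passed
```

With setup_buggy, a failed call with pid -1 leaves the session marked as set up, matching the reproducer; with setup_fixed the same failed call leaves the session clean, so a retry with the default pid succeeds.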
How do you feel?
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
v7-0001-pg_replication_origin_session_setup-pid-parameter.patch (application/octet-stream)
From 566b20a80856c1fe49efb63930cbb695903a8235 Mon Sep 17 00:00:00 2001
From: Doruk <doruk@mixrank.com>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v7 1/3] pg_replication_origin_session_setup: pid parameter
Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup. This commit adds an
optional parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
doc/src/sgml/func/func-admin.sgml | 23 +++++++++++++++++++++--
src/backend/catalog/system_functions.sql | 9 ++++++++-
src/backend/replication/logical/origin.c | 4 +++-
src/include/catalog/pg_proc.dat | 2 +-
4 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 57ff333159f..1b465bc8ba7 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
- <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+ <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
<returnvalue>void</returnvalue>
</para>
<para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
origin, allowing replay progress to be tracked.
Can only be used if no origin is currently selected.
Use <function>pg_replication_origin_session_reset</function> to undo.
- </para></entry>
+ If multiple processes can safely use the same replication origin (for
+ example, parallel apply processes), the optional <parameter>pid</parameter>
+ parameter can be used to specify the process ID of the first process.
+ The first process must provide <parameter>pid</parameter> equals to
+ <literal>0</literal> and the other processes that share the same
+ replication origin should provide the process ID of the first process.
+ </para>
+ <caution>
+ <para>
+ When multiple processes share the same replication origin, it is critical
+ to maintain commit order to prevent data inconsistency. While processes
+ may send operations out of order, they must commit transactions in the
+ correct sequence to ensure proper replication consistency. The recommended workflow
+ for each worker is: set up the replication origin session with the first process's PID,
+ apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+ with the LSN and commit timestamp before committing, then commit the
+ transaction only if everything succeeded.
+ </para>
+ </caution>
+ </entry>
</row>
<row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..2d946d6d9e9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..98d47e1beb8 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c87..01eba3b5a19 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12235,7 +12235,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
--
2.47.3
v7-0002-add-test.patch (application/octet-stream)
From 3a5018870916f0be07797127c628e152b3c920bd Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 17 Sep 2025 15:20:45 +0900
Subject: [PATCH v7 2/3] add test
---
contrib/test_decoding/meson.build | 1 +
contrib/test_decoding/t/002_repl_origin.pl | 73 ++++++++++++++++++++++
2 files changed, 74 insertions(+)
create mode 100644 contrib/test_decoding/t/002_repl_origin.pl
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 25f6b8a9082..e00e03ba08d 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -74,6 +74,7 @@ tests += {
'tap': {
'tests': [
't/001_repl_stats.pl',
+ 't/002_repl_origin.pl',
],
},
}
diff --git a/contrib/test_decoding/t/002_repl_origin.pl b/contrib/test_decoding/t/002_repl_origin.pl
new file mode 100644
index 00000000000..e1aa57b0995
--- /dev/null
+++ b/contrib/test_decoding/t/002_repl_origin.pl
@@ -0,0 +1,73 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test session replication origin setup, especially by the multiple sessions
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Test set-up
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', 'autovacuum = off');
+$node->start;
+
+# Create a replication origin
+$node->safe_psql('postgres',
+ "SELECT pg_replication_origin_create('origin');");
+
+# Bump the query timeout to avoid false negatives on slow test systems.
+my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
+
+# Start a background session
+my $session1 = $node->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $psql_timeout_secs);
+
+# Setup the replication origin to the session
+my $pid = $session1->query_safe(
+ qq(SELECT pg_replication_origin_session_setup('origin');
+ SELECT pg_backend_pid();));
+
+is( $session1->query_safe(
+ qq(SELECT pg_replication_origin_session_is_setup();)),
+ 't',
+ "A replication origin is assigned to the session");
+
+# Start another session
+my $session2 = $node->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $psql_timeout_secs);
+
+# Attach to the same replication origin
+$session2->query_safe(
+ qq(SELECT pg_replication_origin_session_setup('origin', $pid);));
+
+is( $session2->query_safe(
+ qq(SELECT pg_replication_origin_session_is_setup();)),
+ 't',
+ "Replication origin can accept multiple assignment");
+
+# Emit a transactional message to update the local_lsn and store the current
+# value.
+$session1->query_safe(
+ qq(SELECT pg_logical_emit_message(true, 'prefix', 'message on session1');)
+);
+my $old_lsn = $session1->query_safe(
+ qq(SELECT local_lsn FROM pg_replication_origin_status;));
+
+# Emit a transactional message from another session
+$session2->query_safe(
+ qq(SELECT pg_logical_emit_message(true, 'prefix', 'message on session2');)
+);
+
+# Confirm local_lsn can be updated by concurrent processes
+my $new_lsn = $session1->query_safe(
+ qq(SELECT local_lsn FROM pg_replication_origin_status;));
+is($session1->query_safe(qq(SELECT '$old_lsn' < '$new_lsn')),
+ 't', "Replication origin can be advanced by both sessions");
+
+done_testing();
--
2.47.3
v7-0003-Avoid-setting-ReplicationState-in-case-of-ERROR.patch (application/octet-stream)
From aef02a788bacc87400872a6a8d42d9aeb0301001 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 17 Sep 2025 18:15:33 +0900
Subject: [PATCH v7 3/3] Avoid setting ReplicationState in case of ERROR
---
contrib/test_decoding/expected/replorigin.out | 3 ++
contrib/test_decoding/sql/replorigin.sql | 3 ++
src/backend/replication/logical/origin.c | 31 +++++++++++++------
3 files changed, 27 insertions(+), 10 deletions(-)
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..4f64ea8942f 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR: could not find replication state slot for replication origin with OID 1 which was acquired by -1
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
pg_replication_origin_create
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..d899d5cdc18 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
-- ensure duplicate creations fail
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 98d47e1beb8..0bbc96bcee5 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1122,6 +1122,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
static bool registered_cleanup;
int i;
int free_slot = -1;
+ ReplicationState *candidate_state = NULL;
+ bool initialized = false;
if (!registered_cleanup)
{
@@ -1168,34 +1170,43 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
}
/* ok, found slot */
- session_replication_state = curstate;
+ candidate_state = curstate;
break;
}
- if (session_replication_state == NULL && free_slot == -1)
+ if (candidate_state == NULL && free_slot == -1)
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("could not find free replication state slot for replication origin with ID %d",
node),
errhint("Increase \"max_active_replication_origins\" and try again.")));
- else if (session_replication_state == NULL)
+ else if (candidate_state == NULL)
{
/* initialize new slot */
- session_replication_state = &replication_states[free_slot];
- Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
- Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
- session_replication_state->roident = node;
+ candidate_state = &replication_states[free_slot];
+ Assert(candidate_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(candidate_state->local_lsn == InvalidXLogRecPtr);
+ candidate_state->roident = node;
+ initialized = true;
}
- Assert(session_replication_state->roident != InvalidRepOriginId);
+ Assert(candidate_state->roident != InvalidRepOriginId);
if (acquired_by == 0)
- session_replication_state->acquired_by = MyProcPid;
- else if (session_replication_state->acquired_by != acquired_by)
+ candidate_state->acquired_by = MyProcPid;
+ else if (candidate_state->acquired_by != acquired_by)
+ {
+ if (initialized)
+ candidate_state->roident = InvalidRepOriginId;
+
elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
node, acquired_by);
+ }
+
+ /* Candidate slot looks ok, use it */
+ session_replication_state = candidate_state;
LWLockRelease(ReplicationOriginLock);
--
2.47.3
Dear hackers,
I considered a test; please see the attached files. 0001 was not changed from v6, and
0002 contained tests. There, two sessions were opened and it was confirmed that both
can set the same origin.
After considering and verifying further, I found it more efficient to test via the
isolation tester. The attached patch set does that.
In my environment the test ran about 10x faster because it does not start an instance
within the test.
In the test file, two sessions, s0 and s1, are launched and set the same session
origin. Each inserts local_lsn into a table, and the test confirms that the later
insertion has the larger value.
One hacky point is obtaining the pid of s0 from s1. My analysis follows.
application_name is controlled by isolation_main.c and isolationtester.c.
When an isolation test runs, isolation_main starts first and launches one
isolationtester process per spec file.
In isolation_main.c, application_name is set to "isolation/${testname}" in the
starter function. Then, after isolationtester parses the spec file, it appends the
given name of each session. This is done at line 193.
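As a rough illustration of the naming scheme described above, the following standalone C sketch composes the name that s1 looks up in pg_stat_activity. It is not the isolationtester code: the helper name build_session_app_name is made up for this example, and the only thing taken from the thread is the resulting format, "isolation/<testname>/<session>".

```c
#include <assert.h>
#include <stdio.h>

/*
 * Hypothetical helper (not part of isolationtester): compose the
 * application_name that a session of an isolation test ends up with,
 * i.e. "isolation/<testname>/<session>".
 *
 * Returns 0 on success, or -1 if the result did not fit into buf.
 */
static int
build_session_app_name(char *buf, size_t buflen,
					   const char *testname, const char *session)
{
	int			n;

	assert(buf != NULL && testname != NULL && session != NULL);

	n = snprintf(buf, buflen, "isolation/%s/%s", testname, session);

	/* snprintf reports the length it wanted; treat truncation as failure */
	return (n >= 0 && (size_t) n < buflen) ? 0 : -1;
}
```

For the spec file in this patch, the test name "repl_origin" and the session name "s0" give "isolation/repl_origin/s0", which is the string used in the WHERE clause of step s1_setup.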
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
v8-0001-pg_replication_origin_session_setup-pid-parameter.patch (application/octet-stream)
From 6315206a177be337669883376218cef362d8412b Mon Sep 17 00:00:00 2001
From: Doruk <doruk@mixrank.com>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v8 1/3] pg_replication_origin_session_setup: pid parameter
Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup. This commit adds an
optional parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
doc/src/sgml/func/func-admin.sgml | 23 +++++++++++++++++++++--
src/backend/catalog/system_functions.sql | 9 ++++++++-
src/backend/replication/logical/origin.c | 4 +++-
src/include/catalog/pg_proc.dat | 2 +-
4 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 57ff333159f..1b465bc8ba7 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
- <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+ <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
<returnvalue>void</returnvalue>
</para>
<para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
origin, allowing replay progress to be tracked.
Can only be used if no origin is currently selected.
Use <function>pg_replication_origin_session_reset</function> to undo.
- </para></entry>
+ If multiple processes can safely use the same replication origin (for
+ example, parallel apply processes), the optional <parameter>pid</parameter>
+ parameter can be used to specify the process ID of the first process.
+ The first process must provide <parameter>pid</parameter> equals to
+ <literal>0</literal> and the other processes that share the same
+ replication origin should provide the process ID of the first process.
+ </para>
+ <caution>
+ <para>
+ When multiple processes share the same replication origin, it is critical
+ to maintain commit order to prevent data inconsistency. While processes
+ may send operations out of order, they must commit transactions in the
+ correct sequence to ensure proper replication consistency. The recommended workflow
+ for each worker is: set up the replication origin session with the first process's PID,
+ apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+ with the LSN and commit timestamp before committing, then commit the
+ transaction only if everything succeeded.
+ </para>
+ </caution>
+ </entry>
</row>
<row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..2d946d6d9e9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..98d47e1beb8 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c87..01eba3b5a19 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12235,7 +12235,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
--
2.47.3
v8-0002-add-test.patch (application/octet-stream)
From ee0f1539ddb8d68cc80af229fa0afb5d58e55e92 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 17 Sep 2025 15:20:45 +0900
Subject: [PATCH v8 2/3] add test
---
contrib/test_decoding/Makefile | 2 +-
.../test_decoding/expected/repl_origin.out | 79 +++++++++++++++++++
contrib/test_decoding/meson.build | 1 +
contrib/test_decoding/specs/repl_origin.spec | 56 +++++++++++++
4 files changed, 137 insertions(+), 1 deletion(-)
create mode 100644 contrib/test_decoding/expected/repl_origin.out
create mode 100644 contrib/test_decoding/specs/repl_origin.spec
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 02e961f4d31..8aa80054944 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error catalog_change_snapshot \
- skip_snapshot_restore invalidation_distribution
+ skip_snapshot_restore invalidation_distribution repl_origin
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/repl_origin.out b/contrib/test_decoding/expected/repl_origin.out
new file mode 100644
index 00000000000..9ef80217b9d
--- /dev/null
+++ b/contrib/test_decoding/expected/repl_origin.out
@@ -0,0 +1,79 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s1_setup:
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/repl_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s0_add_message:
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+
+?column?
+--------
+ 1
+(1 row)
+
+step s0_store_lsn:
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+
+step s1_add_message:
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+
+?column?
+--------
+ 1
+(1 row)
+
+step s1_store_lsn:
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+
+step s0_compare:
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+
+?column?
+--------
+t
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 25f6b8a9082..6d687eeb2d7 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -64,6 +64,7 @@ tests += {
'slot_creation_error',
'skip_snapshot_restore',
'invalidation_distribution',
+ 'repl_origin',
],
'regress_args': [
'--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/repl_origin.spec b/contrib/test_decoding/specs/repl_origin.spec
new file mode 100644
index 00000000000..266ce553444
--- /dev/null
+++ b/contrib/test_decoding/specs/repl_origin.spec
@@ -0,0 +1,56 @@
+# Test multi-session replication origin manipulations; ensure local_lsn can be
+# updated by all attached sessions.
+
+setup
+{
+ SELECT pg_replication_origin_create('origin');
+ CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
+}
+
+teardown
+{
+ SELECT pg_replication_origin_drop('origin');
+ DROP TABLE local_lsn_store;
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
+step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s0_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+}
+step "s0_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+}
+step "s0_compare" {
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+}
+step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_setup" {
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/repl_origin/s0';
+}
+step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s1_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+}
+step "s1_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+}
+step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+
+# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
+# commits a transaction and store the local_lsn of the replication origin.
+# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
--
2.47.3
v8-0003-Avoid-setting-ReplicationState-in-case-of-ERROR.patch (application/octet-stream)
From 601cbb02265a5373d298e6803715d30c2370a111 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 17 Sep 2025 18:15:33 +0900
Subject: [PATCH v8 3/3] Avoid setting ReplicationState in case of ERROR
---
contrib/test_decoding/expected/replorigin.out | 3 ++
contrib/test_decoding/sql/replorigin.sql | 3 ++
src/backend/replication/logical/origin.c | 31 +++++++++++++------
3 files changed, 27 insertions(+), 10 deletions(-)
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..4f64ea8942f 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR: could not find replication state slot for replication origin with OID 1 which was acquired by -1
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
pg_replication_origin_create
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..d899d5cdc18 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
-- ensure duplicate creations fail
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 98d47e1beb8..0bbc96bcee5 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1122,6 +1122,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
static bool registered_cleanup;
int i;
int free_slot = -1;
+ ReplicationState *candidate_state = NULL;
+ bool initialized = false;
if (!registered_cleanup)
{
@@ -1168,34 +1170,43 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
}
/* ok, found slot */
- session_replication_state = curstate;
+ candidate_state = curstate;
break;
}
- if (session_replication_state == NULL && free_slot == -1)
+ if (candidate_state == NULL && free_slot == -1)
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("could not find free replication state slot for replication origin with ID %d",
node),
errhint("Increase \"max_active_replication_origins\" and try again.")));
- else if (session_replication_state == NULL)
+ else if (candidate_state == NULL)
{
/* initialize new slot */
- session_replication_state = &replication_states[free_slot];
- Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
- Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
- session_replication_state->roident = node;
+ candidate_state = &replication_states[free_slot];
+ Assert(candidate_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(candidate_state->local_lsn == InvalidXLogRecPtr);
+ candidate_state->roident = node;
+ initialized = true;
}
- Assert(session_replication_state->roident != InvalidRepOriginId);
+ Assert(candidate_state->roident != InvalidRepOriginId);
if (acquired_by == 0)
- session_replication_state->acquired_by = MyProcPid;
- else if (session_replication_state->acquired_by != acquired_by)
+ candidate_state->acquired_by = MyProcPid;
+ else if (candidate_state->acquired_by != acquired_by)
+ {
+ if (initialized)
+ candidate_state->roident = InvalidRepOriginId;
+
elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
node, acquired_by);
+ }
+
+ /* Candidate slot looks ok, use it */
+ session_replication_state = candidate_state;
LWLockRelease(ReplicationOriginLock);
--
2.47.3
On Thu, Sep 18, 2025 at 1:07 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear hackers,
I considered a test, please see attached files.
Few comments:
1. +step "s0_compare" {
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+}
This appears to be a bit tricky to compare the values. Doing a
sequential scan won't guarantee the order of rows' appearance. Can't
we somehow get the two rows ordered by session_id and compare their
values?
2.
+ else if (candidate_state->acquired_by != acquired_by)
+ {
+ if (initialized)
+ candidate_state->roident = InvalidRepOriginId;
+
elog(ERROR, "could not find replication state slot for replication
origin with OID %u which was acquired by %d",
node, acquired_by);
+ }
This doesn't look neat. Instead, how about checking this case before
setting candidate_state, as shown in the attached? If we do that, we
shouldn't even need new variables like candidate_state and initialized.
Additionally, as shown in the attached, it is better to make this a
user-facing error by using ereport.
3. Merge all patches as I don't see the need to do any backpatch here.
--
With Regards,
Amit Kapila.
Attachments:
v8_amit_1.txt (text/plain; charset=US-ASCII)
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 0bbc96bcee5..c93b6eb1798 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1169,6 +1169,15 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
curstate->roident, curstate->acquired_by)));
}
+ else if (curstate->acquired_by != 0 &&
+ curstate->acquired_by != acquired_by)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
+ node, acquired_by)));
+ }
+
/* ok, found slot */
candidate_state = curstate;
break;
@@ -1196,14 +1205,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
if (acquired_by == 0)
candidate_state->acquired_by = MyProcPid;
- else if (candidate_state->acquired_by != acquired_by)
- {
- if (initialized)
- candidate_state->roident = InvalidRepOriginId;
-
- elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
- node, acquired_by);
- }
+ else
+ Assert(candidate_state->acquired_by == acquired_by);
/* Candidate slot looks ok, use it */
session_replication_state = candidate_state;
Dear Amit,
Few comments:
1. +step "s0_compare" {
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+}
This appears to be a bit tricky to compare the values. Doing a
sequential scan won't guarantee the order of rows' appearance. Can't
we somehow get the two rows ordered by session_id and compare their
values?
I considered another way, using a CTE for session 0. What do you think?
2.
+ else if (candidate_state->acquired_by != acquired_by)
+ {
+ if (initialized)
+ candidate_state->roident = InvalidRepOriginId;
+
elog(ERROR, "could not find replication state slot for replication
origin with OID %u which was acquired by %d",
node, acquired_by);
+ }
This doesn't look neat. Instead, how about checking this case before
setting candidate_state, as shown in the attached? If we do that, we
shouldn't even need new variables like candidate_state and initialized.
Your approach does not work when the specified origin has not been used since the
instance started. In that case the origin does not yet exist in replication_states,
and a new slot is initialized.
As I currently understand it, two ERRORs are needed to avoid adding new variables:
the first in the loop, and the second in the session_replication_state == NULL
case. The latter covers the case where the origin is inactive but a PID is specified,
so a different error message can be used.
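To make the two error paths concrete, here is a small standalone C model of the slot selection. It is only a sketch under stated assumptions, not the code in origin.c: ModelSlot, ModelResult and model_session_setup are names invented for this example, errors are returned as enum values instead of being raised with ereport(), and locking is left out entirely. The point it illustrates is that both checks happen before any slot is modified, so no extra variables are needed to undo a partial initialization.

```c
#include <assert.h>
#include <stddef.h>

#define MODEL_INVALID_ORIGIN 0	/* stands in for InvalidRepOriginId */

/* Simplified stand-in for ReplicationState; only the fields discussed here. */
typedef struct ModelSlot
{
	unsigned	roident;		/* origin ID, or MODEL_INVALID_ORIGIN if unused */
	int			acquired_by;	/* PID of the first acquirer, or 0 */
} ModelSlot;

typedef enum ModelResult
{
	MODEL_OK,
	MODEL_ERR_ALREADY_ACTIVE,	/* pid 0 given, but the origin is held */
	MODEL_ERR_PID_MISMATCH,		/* first ERROR: detected inside the loop */
	MODEL_ERR_ORIGIN_INACTIVE,	/* second ERROR: no active slot, pid given */
	MODEL_ERR_NO_FREE_SLOT
} ModelResult;

static ModelResult
model_session_setup(ModelSlot *slots, int nslots, unsigned node,
					int acquired_by, int my_pid, int *chosen)
{
	int			free_slot = -1;
	int			found = -1;
	int			i;

	assert(slots != NULL && chosen != NULL && node != MODEL_INVALID_ORIGIN);

	for (i = 0; i < nslots; i++)
	{
		/* remember the first unused slot in case we must initialize one */
		if (slots[i].roident == MODEL_INVALID_ORIGIN)
		{
			if (free_slot == -1)
				free_slot = i;
			continue;
		}
		if (slots[i].roident != node)
			continue;

		/* slot for this origin exists; validate before touching anything */
		if (acquired_by == 0 && slots[i].acquired_by != 0)
			return MODEL_ERR_ALREADY_ACTIVE;
		if (acquired_by != 0 && slots[i].acquired_by != acquired_by)
			return MODEL_ERR_PID_MISMATCH;

		found = i;
		break;
	}

	if (found == -1)
	{
		/* origin is not active: a non-zero pid cannot match anything */
		if (acquired_by != 0)
			return MODEL_ERR_ORIGIN_INACTIVE;
		if (free_slot == -1)
			return MODEL_ERR_NO_FREE_SLOT;

		/* initialize a new slot only after all checks have passed */
		found = free_slot;
		slots[found].roident = node;
	}

	if (acquired_by == 0)
		slots[found].acquired_by = my_pid;

	*chosen = found;
	return MODEL_OK;
}
```

In this model, calling the function with pid -1 on an empty array returns MODEL_ERR_ORIGIN_INACTIVE and leaves every slot untouched, which corresponds to the regression test added in replorigin.sql.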
Additionally, as shown in attached, it is better to make this a
user-facing error by using ereport.
Indeed; elog() was replaced with ereport().
3. Merge all patches as I don't see the need to do any backpatch here.
Sure.
The attached patch includes all changes. Thoughts?
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
v9-0001-pg_replication_origin_session_setup-pid-parameter.patch (application/octet-stream)
From 7a41bc12c7464cc9dca9f3bfd6d0548538f64ed9 Mon Sep 17 00:00:00 2001
From: Doruk <doruk@mixrank.com>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v9] pg_replication_origin_session_setup: pid parameter
Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup. This commit adds an
optional parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
contrib/test_decoding/Makefile | 2 +-
.../test_decoding/expected/repl_origin.out | 79 +++++++++++++++++++
contrib/test_decoding/expected/replorigin.out | 3 +
contrib/test_decoding/meson.build | 1 +
contrib/test_decoding/specs/repl_origin.spec | 56 +++++++++++++
contrib/test_decoding/sql/replorigin.sql | 3 +
doc/src/sgml/func/func-admin.sgml | 23 +++++-
src/backend/catalog/system_functions.sql | 9 ++-
src/backend/replication/logical/origin.c | 24 +++++-
src/include/catalog/pg_proc.dat | 2 +-
10 files changed, 193 insertions(+), 9 deletions(-)
create mode 100644 contrib/test_decoding/expected/repl_origin.out
create mode 100644 contrib/test_decoding/specs/repl_origin.spec
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 02e961f4d31..8aa80054944 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error catalog_change_snapshot \
- skip_snapshot_restore invalidation_distribution
+ skip_snapshot_restore invalidation_distribution repl_origin
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/repl_origin.out b/contrib/test_decoding/expected/repl_origin.out
new file mode 100644
index 00000000000..9ef80217b9d
--- /dev/null
+++ b/contrib/test_decoding/expected/repl_origin.out
@@ -0,0 +1,79 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s1_setup:
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/repl_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s0_add_message:
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+
+?column?
+--------
+ 1
+(1 row)
+
+step s0_store_lsn:
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+
+step s1_add_message:
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+
+?column?
+--------
+ 1
+(1 row)
+
+step s1_store_lsn:
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+
+step s0_compare:
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+
+?column?
+--------
+t
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..cb9d63e20c1 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure inactive origin cannot be set as session one if pid is specified
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR: replication origin with ID 1 is inactive but PID -1 was specified
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
pg_replication_origin_create
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 25f6b8a9082..6d687eeb2d7 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -64,6 +64,7 @@ tests += {
'slot_creation_error',
'skip_snapshot_restore',
'invalidation_distribution',
+ 'repl_origin',
],
'regress_args': [
'--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/repl_origin.spec b/contrib/test_decoding/specs/repl_origin.spec
new file mode 100644
index 00000000000..266ce553444
--- /dev/null
+++ b/contrib/test_decoding/specs/repl_origin.spec
@@ -0,0 +1,56 @@
+# Test multi-session replication origin manipulations; ensure local_lsn can be
+# updated by all attached sessions.
+
+setup
+{
+ SELECT pg_replication_origin_create('origin');
+ CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
+}
+
+teardown
+{
+ SELECT pg_replication_origin_drop('origin');
+ DROP TABLE local_lsn_store;
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
+step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s0_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+}
+step "s0_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+}
+step "s0_compare" {
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+}
+step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_setup" {
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/repl_origin/s0';
+}
+step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s1_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+}
+step "s1_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+}
+step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+
+# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
+# commits a transaction and store the local_lsn of the replication origin.
+# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..17f2b888238 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
-- ensure duplicate creations fail
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+-- ensure inactive origin cannot be set as session one if pid is specified
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 57ff333159f..1b465bc8ba7 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
- <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+ <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
<returnvalue>void</returnvalue>
</para>
<para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
origin, allowing replay progress to be tracked.
Can only be used if no origin is currently selected.
Use <function>pg_replication_origin_session_reset</function> to undo.
- </para></entry>
+ If multiple processes can safely use the same replication origin (for
+ example, parallel apply processes), the optional <parameter>pid</parameter>
+ parameter can be used to specify the process ID of the first process.
+ The first process must provide <parameter>pid</parameter> equal to
+ <literal>0</literal> and the other processes that share the same
+ replication origin should provide the process ID of the first process.
+ </para>
+ <caution>
+ <para>
+ When multiple processes share the same replication origin, it is critical
+ to maintain commit order to prevent data inconsistency. While processes
+ may send operations out of order, they must commit transactions in the
+ correct sequence to ensure proper replication consistency. The recommended workflow
+ for each worker is: set up the replication origin session with the first process's PID,
+ apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+ with the LSN and commit timestamp before committing, then commit the
+ transaction only if everything succeeded.
+ </para>
+ </caution>
+ </entry>
</row>
<row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..2d946d6d9e9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..a88b0e6d1ea 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1167,6 +1167,14 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
curstate->roident, curstate->acquired_by)));
}
+ else if (curstate->acquired_by != acquired_by)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
+ node, acquired_by)));
+ }
+
/* ok, found slot */
session_replication_state = curstate;
break;
@@ -1181,6 +1189,13 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
errhint("Increase \"max_active_replication_origins\" and try again.")));
else if (session_replication_state == NULL)
{
+ /* The origin is not used but PID is specified */
+ if(acquired_by)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication origin with ID %d is inactive but PID %d was specified",
+ node, acquired_by)));
+
/* initialize new slot */
session_replication_state = &replication_states[free_slot];
Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
@@ -1193,9 +1208,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
if (acquired_by == 0)
session_replication_state->acquired_by = MyProcPid;
- else if (session_replication_state->acquired_by != acquired_by)
- elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
- node, acquired_by);
+ else
+ Assert(session_replication_state->acquired_by == acquired_by);
LWLockRelease(ReplicationOriginLock);
@@ -1374,12 +1388,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c87..01eba3b5a19 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12235,7 +12235,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
--
2.47.3
Hi,
When testing the new parameter in pg_replication_origin_session_setup(), I
noticed a bug allowing the origin in use to be dropped. The issue arises when
two backends set up the same origin; if the second backend resets the origin
first, it resets the acquired_by flag regardless of whether the first backend is
using it. This allows the origin to be dropped, enabling the slot in shared
memory to be reused, which is unintended.
As for the fix, simply adding a check on the acquired_by field does not work:
if the first backend resets the origin first, the origin can still be
dropped while the second backend is using it.
To fully resolve this, I tried to add a reference count (refcount) for the
origin. The count is incremented when a backend sets up the origin and
decremented upon a reset. As a result, the replication origin is only dropped
when the reference count reaches zero.
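The reference-count idea described above can be sketched in isolation. The following is a minimal model, not the patch itself: the OriginModel struct and the origin_model_* functions are hypothetical names, locking and condition variables are left out, and PIDs are plain integers supplied by the caller.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified stand-in for one shared-memory origin slot. */
typedef struct OriginModel
{
    int acquired_by;    /* PID of the first backend to set up the origin, else 0 */
    int refcount;       /* number of backends currently attached */
} OriginModel;

/* Attach a backend. leader_pid == 0 means "I am the first backend". */
void
origin_model_setup(OriginModel *o, int my_pid, int leader_pid)
{
    if (leader_pid == 0)
        o->acquired_by = my_pid;
    o->refcount++;
}

/* Detach a backend. Only the first backend clears acquired_by. */
void
origin_model_reset(OriginModel *o, int my_pid)
{
    assert(o->refcount > 0);
    if (o->acquired_by == my_pid)
        o->acquired_by = 0;
    o->refcount--;
}

/* The drop path tests refcount instead of acquired_by. */
bool
origin_model_can_drop(const OriginModel *o)
{
    return o->refcount == 0;
}
```

In this model, if backend 100 sets up the origin, backend 200 attaches with leader_pid 100, and backend 100 then resets, acquired_by becomes 0 but origin_model_can_drop() still returns false until backend 200 resets as well.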
Thanks to Kuroda-San for discussing and reviewing this patch off-list.
Best Regards,
Hou zj
Attachments:
v1-0001-Fix-unintended-drop-of-active-replication-origins.patch (application/octet-stream)
From 8b39ed57f03d0a2f90f9b89572db2e1242a11dd0 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Mon, 22 Sep 2025 11:22:55 +0800
Subject: [PATCH v1] Fix unintended drop of active replication origins
Currently, if two backends configure the same replication origin and one backend
resets it first, the acquired_by flag is cleared without recognizing the active
usage by the first backend. This can result in the unintended dropping of the
origin, potentially leading to issues if the shared memory of the dropped origin
is reused for a newly created origin. Such reuse could cause unpredictable
advancement of a different slot by the remaining backend holding the memory of
the dropped origin.
This commit addresses the issue by introducing a reference count for replication
origins. The count is incremented when a backend sets up the origin and
decremented upon a reset. As a result, the replication origin is only dropped
when the reference count reaches zero.
---
.../expected/parallel_session_origin.out | 44 +++++++++++
.../specs/parallel_session_origin.spec | 4 +
src/backend/replication/logical/origin.c | 79 ++++++++++++-------
3 files changed, 97 insertions(+), 30 deletions(-)
diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
index e515b39f7ce..546d8933954 100644
--- a/contrib/test_decoding/expected/parallel_session_origin.out
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -77,3 +77,47 @@ pg_replication_origin_session_reset
(1 row)
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_drop s1_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s1_setup:
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
+step s1_drop: SELECT pg_replication_origin_drop('origin');
+ERROR: could not drop replication origin with ID 1, in use by another process
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
index c0e5fda0723..8e9c81e4419 100644
--- a/contrib/test_decoding/specs/parallel_session_origin.spec
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -49,8 +49,12 @@ step "s1_store_lsn" {
SELECT 1, local_lsn FROM pg_replication_origin_status;
}
step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+step "s1_drop" { SELECT pg_replication_origin_drop('origin'); }
# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
# commits a transaction and store the local_lsn of the replication origin.
# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+
+# Test that the origin cannot be dropped if any session is actively using it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_drop" "s1_reset"
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 2380f369578..536e524f4d5 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -130,6 +130,9 @@ typedef struct ReplicationState
*/
int acquired_by;
+ /* Number of backends currently using this origin. */
+ int refcount;
+
/*
* Condition variable that's signaled when acquired_by changes.
*/
@@ -383,16 +386,19 @@ restart:
if (state->roident == roident)
{
/* found our slot, is it busy? */
- if (state->acquired_by != 0)
+ if (state->refcount > 0)
{
ConditionVariable *cv;
if (nowait)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("could not drop replication origin with ID %d, in use by PID %d",
- state->roident,
- state->acquired_by)));
+ (state->acquired_by != 0)
+ ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
+ state->roident,
+ state->acquired_by)
+ : errmsg("could not drop replication origin with ID %d, in use by another process",
+ state->roident)));
/*
* We must wait and then retry. Since we don't know which CV
@@ -1069,32 +1075,47 @@ replorigin_get_progress(RepOriginId node, bool flush)
return remote_lsn;
}
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helper function to reset the session replication origin */
static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
{
- ConditionVariable *cv = NULL;
+ ConditionVariable *cv;
- if (session_replication_state == NULL)
- return;
+ Assert(session_replication_state != NULL);
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
- if (session_replication_state->acquired_by == MyProcPid)
- {
- cv = &session_replication_state->origin_cv;
+ Assert(session_replication_state->refcount > 0);
+ /*
+ * Reset the PID only if the current backend is the first to set up this
+ * origin. This prevents resetting the PID when other backends are still
+ * using this origin.
+ */
+ if (session_replication_state->acquired_by == MyProcPid)
session_replication_state->acquired_by = 0;
- session_replication_state = NULL;
- }
+
+ session_replication_state->refcount--;
+
+ cv = &session_replication_state->origin_cv;
+ session_replication_state = NULL;
LWLockRelease(ReplicationOriginLock);
- if (cv)
- ConditionVariableBroadcast(cv);
+ ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+ if (session_replication_state == NULL)
+ return;
+
+ replorigin_session_reset_internal();
}
/*
@@ -1205,9 +1226,17 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
Assert(session_replication_state->roident != InvalidRepOriginId);
if (acquired_by == 0)
+ {
session_replication_state->acquired_by = MyProcPid;
+ Assert(session_replication_state->refcount == 0);
+ }
else
+ {
Assert(session_replication_state->acquired_by == acquired_by);
+ Assert(session_replication_state->refcount > 0);
+ }
+
+ session_replication_state->refcount++;
LWLockRelease(ReplicationOriginLock);
@@ -1224,8 +1253,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
void
replorigin_session_reset(void)
{
- ConditionVariable *cv;
-
Assert(max_active_replication_origins != 0);
if (session_replication_state == NULL)
@@ -1233,15 +1260,7 @@ replorigin_session_reset(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("no replication origin is configured")));
- LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
- session_replication_state->acquired_by = 0;
- cv = &session_replication_state->origin_cv;
- session_replication_state = NULL;
-
- LWLockRelease(ReplicationOriginLock);
-
- ConditionVariableBroadcast(cv);
+ replorigin_session_reset_internal();
}
/*
--
2.51.1.windows.1
On Tue, Dec 23, 2025 at 2:24 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
When testing the new parameter in pg_replication_origin_session_setup(), I
noticed a bug allowing the origin in use to be dropped. The issue arises when
two backends set up the same origin; if the second backend resets the origin
first, it resets the acquired_by flag regardless of whether the first backend is
using it. This allows the origin to be dropped, enabling the slot in shared
memory to be reused, which is unintended.
About the fix, simply adding a check for acquired_by field does not work,
because if the first backend resets the origin first, it still risks being
dropped while second backend uses it.
To fully resolve this, I tried to add a reference count (refcount) for the
origin. The count is incremented when a backend sets up the origin and
decremented upon a reset. As a result, the replication origin is only dropped
when the reference count reaches zero.
Thanks to Kuroda-San for discussing and reviewing this patch off-list.
Thanks to both of you for the report and patch. I'll look into it
sometime during the next CF.
--
With Regards,
Amit Kapila.
On Tue, Dec 23, 2025 at 2:24 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
Hi,
When testing the new parameter in pg_replication_origin_session_setup(), I
noticed a bug allowing the origin in use to be dropped. The issue arises when
two backends set up the same origin; if the second backend resets the origin
first, it resets the acquired_by flag regardless of whether the first backend is
using it. This allows the origin to be dropped, enabling the slot in shared
memory to be reused, which is unintended.
About the fix, simply adding a check for acquired_by field does not work,
because if the first backend resets the origin first, it still risks being
dropped while second backend uses it.
To fully resolve this, I tried to add a reference count (refcount) for the
origin. The count is incremented when a backend sets up the origin and
decremented upon a reset. As a result, the replication origin is only dropped
when the reference count reaches zero.
Thanks to Kuroda-San for discussing and reviewing this patch off-list.
Thanks Hou-San and Kuroda-San.
What should be the expected behavior when Session1 resets the origin
(changing acquired_pid from its own PID to 0), while Session2 is
already connected to the origin and Session3 also attempts to reuse
the same origin?
Currently it asserts:
Session1:
select pg_replication_origin_create('origin');
SELECT pg_replication_origin_session_setup('origin');
Session2:
SELECT pg_replication_origin_session_setup('origin',48028);
Session1:
SELECT pg_replication_origin_session_reset();
Session3:
SELECT pg_replication_origin_session_setup('origin');
This asserts at:
TRAP: failed Assert("session_replication_state->refcount == 0"), File:
"origin.c", Line: 1231, PID: 48037
thanks
Shveta
On Mon, Jan 5, 2026 at 3:15 PM shveta malik <shveta.malik@gmail.com> wrote:
Thanks Hou-San and Kuroda-San.
What should be the expected behavior when Session1 resets the origin
(changing acquired_pid from its own PID to 0), while Session2 is
already connected to the origin and Session3 also attempts to reuse
the same origin?
Currently it asserts:
Session1:
select pg_replication_origin_create('origin');
SELECT pg_replication_origin_session_setup('origin');
Session2:
SELECT pg_replication_origin_session_setup('origin',48028);
Session1:
SELECT pg_replication_origin_session_reset();
Session3:
SELECT pg_replication_origin_session_setup('origin');
This asserts at:
TRAP: failed Assert("session_replication_state->refcount == 0"), File:
"origin.c", Line: 1231, PID: 48037
I checked the behavior on HEAD. Session3 is able to set up the origin
and sets its own PID in acquired_pid. But it is unclear to me which
PID should be recorded in acquired_pid - Session2’s PID, since it set
up the origin earlier, or Session3’s PID. Or does this even make any
difference?
I found one more related issue on HEAD, sharing it here:
When the first backend creates and sets up the origin, followed by a
second backend setting it up, and then the first backend resets it
while the second backend attempts to drop it, an assertion is
triggered:
TRAP: failed Assert("session_replication_state->roident !=
InvalidRepOriginId"), File: "origin.c", Line: 1257, PID: 48438
thanks
Shveta
On Mon, Jan 5, 2026 at 4:00 PM shveta malik <shveta.malik@gmail.com> wrote:
On Mon, Jan 5, 2026 at 3:15 PM shveta malik <shveta.malik@gmail.com> wrote:
Thanks Hou-San and Kuroda-San.
What should be the expected behavior when Session1 resets the origin
(changing acquired_pid from its own PID to 0), while Session2 is
already connected to the origin and Session3 also attempts to reuse
the same origin?
Currently it asserts:
Session1:
select pg_replication_origin_create('origin');
SELECT pg_replication_origin_session_setup('origin');
Session2:
SELECT pg_replication_origin_session_setup('origin',48028);
Session1:
SELECT pg_replication_origin_session_reset();
Session3:
SELECT pg_replication_origin_session_setup('origin');
This asserts at:
TRAP: failed Assert("session_replication_state->refcount == 0"), File:
"origin.c", Line: 1231, PID: 48037
I checked the behavior on HEAD. Session3 is able to set up the origin
and sets its own PID in acquired_pid. But it is unclear to me which
PID should be recorded in acquired_pid - Session2’s PID, since it set
up the origin earlier, or Session3’s PID. Or does this even make any
difference?
I found one more related issue on HEAD, sharing it here:
When the first backend creates and sets up the origin, followed by a
second backend setting it up, and then the first backend resets it
while the second backend attempts to drop it, an assertion is
triggered:
TRAP: failed Assert("session_replication_state->roident !=
InvalidRepOriginId"), File: "origin.c", Line: 1257, PID: 48438
Can we address these problems by prohibiting the leader worker from
resetting the origin while parallel apply (pa) workers are still
associated with it? The leader can tell whether pa workers are
associated with the origin by checking the following condition:
acquired_by == MyProcPid AND refcount > 1.
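To make the proposed condition concrete, here is a small standalone sketch. It assumes a simplified slot with only acquired_by and refcount; OriginSlot, leader_must_wait and guarded_reset are hypothetical names, and the real code would raise an error rather than return false.

```c
#include <stdbool.h>

/* Hypothetical reduced view of a replication origin slot. */
typedef struct OriginSlot
{
    int acquired_by;    /* PID of the first (leader) backend, else 0 */
    int refcount;       /* number of attached backends, leader included */
} OriginSlot;

/* True when the caller is the leader and other backends are still attached. */
bool
leader_must_wait(const OriginSlot *slot, int my_pid)
{
    return slot->acquired_by == my_pid && slot->refcount > 1;
}

/*
 * Reset that refuses to let the leader detach first.
 * Returns false where an error would be reported to the caller.
 */
bool
guarded_reset(OriginSlot *slot, int my_pid)
{
    if (leader_must_wait(slot, my_pid))
        return false;

    if (slot->acquired_by == my_pid)
        slot->acquired_by = 0;
    slot->refcount--;
    return true;
}
```

With this guard, acquired_by stays valid for as long as any non-leader backend is attached, because the leader can only detach once it is the last one holding the origin.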
--
With Regards,
Amit Kapila.
Dear Amit, Shveta,
Thanks Hou-San and Kuroda-San.
What should be the expected behavior when Session1 resets the origin
(changing acquired_pid from its own PID to 0), while Session2 is
already connected to the origin and Session3 also attempts to reuse
the same origin?
Currently it asserts:
Session1:
select pg_replication_origin_create('origin');
SELECT pg_replication_origin_session_setup('origin');
Session2:
SELECT pg_replication_origin_session_setup('origin',48028);
Session1:
SELECT pg_replication_origin_session_reset();
Session3:
SELECT pg_replication_origin_session_setup('origin');
This asserts at:
TRAP: failed Assert("session_replication_state->refcount == 0"), File:
"origin.c", Line: 1231, PID: 48037
FYI, this happened because v1 assumed that refcount is 0 whenever
acquired_by is 0, and your scenario violates that assumption.
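The violated assumption can be reproduced with a small model of the v1 logic. This is only a sketch: OriginV1, v1_setup and v1_reset are hypothetical names, the checks are reduced to the acquired_by and refcount fields, and a return code stands in for both ereport(ERROR) and a failed Assert.

```c
#include <stdbool.h>

typedef enum SetupResult
{
    SETUP_OK,
    SETUP_ERROR,        /* a regular ERROR would be raised */
    SETUP_ASSERT_FAIL   /* v1 would trip Assert(refcount == 0) */
} SetupResult;

/* Hypothetical reduced view of the v1 shared state. */
typedef struct OriginV1
{
    int acquired_by;
    int refcount;
} OriginV1;

SetupResult
v1_setup(OriginV1 *o, int my_pid, int requested_pid)
{
    /* Origin has an owner, but the caller did not name it. */
    if (o->acquired_by != 0 && requested_pid == 0)
        return SETUP_ERROR;
    /* Caller named a PID that does not match the recorded owner. */
    if (o->acquired_by != requested_pid)
        return SETUP_ERROR;

    if (requested_pid == 0)
    {
        /* v1 assumes: no owner implies no users. */
        if (o->refcount != 0)
            return SETUP_ASSERT_FAIL;
        o->acquired_by = my_pid;
    }
    o->refcount++;
    return SETUP_OK;
}

void
v1_reset(OriginV1 *o, int my_pid)
{
    if (o->acquired_by == my_pid)
        o->acquired_by = 0;
    o->refcount--;
}
```

Replaying the reported steps (Session1 with PID 48028 sets up, Session2 attaches, Session1 resets) leaves acquired_by at 0 and refcount at 1, so Session3's plain setup reaches the SETUP_ASSERT_FAIL branch. Session2's PID is not given in the report; any value works in the model.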
I checked the behavior on HEAD. Session3 is able to set up the origin
and sets its own PID in acquired_pid. But it is unclear to me which
PID should be recorded in acquired_pid - Session2’s PID, since it set
up the origin earlier, or Session3’s PID. Or does this even make any
difference?
To clarify, I think the behavior on HEAD is not correct. A backend should
be able to acquire an active origin only if it explicitly specifies the PID.
Otherwise, users may acquire the origin unintentionally and advance it.
Can we address these problems by prohibiting the leader worker from
resetting the origin while parallel apply (pa) workers are still
associated with it? The leader can tell whether pa workers are
associated with the origin by checking the following condition:
acquired_by == MyProcPid AND refcount > 1.
I think it's okay. IIUC, the idea is to ensure that an active origin never
has an invalid acquired_by value. The replication origin was extended to
support parallel apply in logical replication, so it is reasonable to enforce
the same approach here. The attached 0001 implements that.
One concern with the implementation is that acquired_by can still become zero
if a process exits without releasing the origin; this can happen if the
process that first acquired the origin exits while a second process is still
using it.
This is not a problem for logical replication itself, because the leader
worker ensures all parallel workers finish before it exits.
To address the issue, I propose that another process should not be able to
acquire the origin if the acquired_by of the active origin is 0. The problem
should be resolved within the SQL interface, since it occurs there.
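As a rough sketch of that proposal (again a toy model, not the patch itself): OrphanOrigin, AttachResult, orphan_aware_setup and orphan_exit_cleanup are hypothetical names, the order of the checks is my simplification, and return codes stand in for the errors the real code would raise.

```c
#include <assert.h>

/* Toy stand-in for one shared origin entry; NOT the real ReplicationState. */
typedef struct OrphanOrigin
{
    int acquired_by;    /* PID of the first acquirer, 0 if none */
    int refcount;       /* number of attached sessions */
} OrphanOrigin;

typedef enum AttachResult
{
    ATTACH_OK,
    ATTACH_BUSY,        /* active for another PID, caller passed 0 */
    ATTACH_MISMATCH,    /* caller named a PID that does not own the origin */
    ATTACH_ORPHANED     /* PID is unset but sessions are still attached */
} AttachResult;

static AttachResult
orphan_aware_setup(OrphanOrigin *o, int my_pid, int acquired_by)
{
    if (o->acquired_by != 0 && acquired_by == 0)
        return ATTACH_BUSY;
    if (o->acquired_by != acquired_by)
        return ATTACH_MISMATCH;
    /* Proposed check: nobody may attach until all remaining users detach. */
    if (o->acquired_by == 0 && o->refcount > 0)
        return ATTACH_ORPHANED;

    if (acquired_by == 0)
        o->acquired_by = my_pid;
    o->refcount++;
    return ATTACH_OK;
}

/* Process-exit path: no guard, so the first acquirer can leave early. */
static void
orphan_exit_cleanup(OrphanOrigin *o, int my_pid)
{
    assert(o->refcount > 0);
    if (o->acquired_by == my_pid)
        o->acquired_by = 0;
    o->refcount--;
}
```

In this model, once the first acquirer exits, new attach attempts are rejected until the last remaining session detaches; after that the origin can be acquired normally again.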
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
v3-0001-Fix-unintended-drop-of-active-replication-origins.patch (application/octet-stream)
From 5383f9d8c801e48192d59d0b1f07c88c2b595de2 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Mon, 22 Sep 2025 11:22:55 +0800
Subject: [PATCH v3 1/2] Fix unintended drop of active replication origins
Currently, if two backends configure the same replication origin and one backend
resets it first, the acquired_by flag is cleared without recognizing the active
usage by the other backend. This can result in the unintended dropping of the
origin, potentially leading to issues if the shared memory of the dropped origin
is reused for a newly created origin. Such reuse could cause unpredictable
advancement of a different slot by the remaining backend holding the memory of
the dropped origin.
This commit addresses the issue by introducing a reference count for replication
origins. The count is incremented when a backend sets up the origin and
decremented upon a reset. Also, the backend process that first acquired the
origin cannot release it until all other acquiring processes have released it.
This ensures that the acquired_by flag cannot be zero while processes are
actively using the origin.
---
.../expected/parallel_session_origin.out | 46 ++++++++++-
.../specs/parallel_session_origin.spec | 6 +-
src/backend/replication/logical/origin.c | 80 +++++++++++++------
3 files changed, 105 insertions(+), 27 deletions(-)
diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
index e515b39f7ce..07c1e3622ce 100644
--- a/contrib/test_decoding/expected/parallel_session_origin.out
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -1,6 +1,6 @@
Parsed test spec with 2 sessions
-starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset
step s0_setup: SELECT pg_replication_origin_session_setup('origin');
pg_replication_origin_session_setup
-----------------------------------
@@ -65,15 +65,59 @@ step s0_compare:
t
(1 row)
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
step s0_reset: SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-----------------------------------
(1 row)
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s1_setup:
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+ERROR: another process is acquiring the replication origin
step s1_reset: SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-----------------------------------
(1 row)
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
index c0e5fda0723..2253a7a14eb 100644
--- a/contrib/test_decoding/specs/parallel_session_origin.spec
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -53,4 +53,8 @@ step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
# commits a transaction and store the local_lsn of the replication origin.
# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
-permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s1_reset" "s0_reset"
+
+# Test that the origin cannot be released if another session is actively using
+# it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_reset" "s0_reset"
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 04bc704a332..dff2c073def 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -130,6 +130,9 @@ typedef struct ReplicationState
*/
int acquired_by;
+ /* Number of backends currently using this origin. */
+ int refcount;
+
/*
* Condition variable that's signaled when acquired_by changes.
*/
@@ -1069,32 +1072,47 @@ replorigin_get_progress(RepOriginId node, bool flush)
return remote_lsn;
}
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helper function to reset the session replication origin */
static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
{
- ConditionVariable *cv = NULL;
+ ConditionVariable *cv;
- if (session_replication_state == NULL)
- return;
+ Assert(session_replication_state != NULL);
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
- if (session_replication_state->acquired_by == MyProcPid)
- {
- cv = &session_replication_state->origin_cv;
+ Assert(session_replication_state->refcount > 0);
+ /*
+ * Reset the PID only if the current backend is the first to set up this
+ * origin. This prevents resetting the PID when other backends are still
+ * using this origin.
+ */
+ if (session_replication_state->acquired_by == MyProcPid)
session_replication_state->acquired_by = 0;
- session_replication_state = NULL;
- }
+
+ session_replication_state->refcount--;
+
+ cv = &session_replication_state->origin_cv;
+ session_replication_state = NULL;
LWLockRelease(ReplicationOriginLock);
- if (cv)
- ConditionVariableBroadcast(cv);
+ ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+ if (session_replication_state == NULL)
+ return;
+
+ replorigin_session_reset_internal();
}
/*
@@ -1205,9 +1223,17 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
Assert(session_replication_state->roident != InvalidRepOriginId);
if (acquired_by == 0)
+ {
session_replication_state->acquired_by = MyProcPid;
+ Assert(session_replication_state->refcount == 0);
+ }
else
+ {
Assert(session_replication_state->acquired_by == acquired_by);
+ Assert(session_replication_state->refcount > 0);
+ }
+
+ session_replication_state->refcount++;
LWLockRelease(ReplicationOriginLock);
@@ -1224,8 +1250,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
void
replorigin_session_reset(void)
{
- ConditionVariable *cv;
-
Assert(max_active_replication_origins != 0);
if (session_replication_state == NULL)
@@ -1233,15 +1257,21 @@ replorigin_session_reset(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("no replication origin is configured")));
- LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
- session_replication_state->acquired_by = 0;
- cv = &session_replication_state->origin_cv;
- session_replication_state = NULL;
-
- LWLockRelease(ReplicationOriginLock);
+ /*
+ * The replication origin cannot be reset if the replication origin is
+ * firstly acquired by this backend and other processes are actively using
+ * now. This can cause acquired_by to be zero and active replication origin
+ * might be dropped.
+ */
+ if (session_replication_state->acquired_by == MyProcPid &&
+ session_replication_state->refcount > 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_plural("another process is acquiring the replication origin",
+ "other processes are acquiring the replication origin",
+ session_replication_state->refcount - 1)));
- ConditionVariableBroadcast(cv);
+ replorigin_session_reset_internal();
}
/*
--
2.47.3
v3-0002-Disallow-setting-the-replication-origin-if-it-is-.patch (application/octet-stream)
From be56cec12647052135a35a78c0d5a87dd5eebf68 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 9 Jan 2026 19:31:57 +0900
Subject: [PATCH v3 2/2] Disallow setting the replication origin if it is being
used PID is not stored
---
src/backend/replication/logical/origin.c | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index dff2c073def..f373e3df2f9 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1192,6 +1192,19 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
node, acquired_by)));
}
+ /*
+ * PID was not noted in the origin. This can happen if the process that
+ * originally acquired the origin exits without releasing it. To make the
+ * status clean again, other processes cannot acquire the origin until
+ * all processes using it have released it.
+ */
+ else if (curstate->acquired_by == 0 && curstate->refcount > 0)
+ {
+ elog(ERROR,
+ "replication origin with ID %d is already active for another process",
+ node);
+ }
+
/* ok, found slot */
session_replication_state = curstate;
break;
--
2.47.3