Allow using replication origins in SQL level parallel sessions

Started by Emre Hasegeli 4 months ago, 2 messages
#1 Emre Hasegeli
emre@hasegeli.com
1 attachment(s)

Replication origins are a useful feature that external replication
systems can use for conflict resolution in bi-directional replication.
External systems can make use of this feature through the SQL
functions pg_replication_origin_*().

pg_replication_origin_session_setup() configures the current session.
Currently, only a single variant of this function is exposed, and it
allows a replication origin to be used by only one session at a time.
This limits the usefulness of the feature: an external system cannot
apply changes from one origin through several parallel sessions.

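The attached patch adds another variant of this function,
pg_replication_origin_session_setup(text, int). Its second argument,
acquired_by, is the PID of the session that has already acquired the
origin, and it is passed straight to replorigin_session_setup(). This
allows the same replication origin to be used by parallel sessions, the
same way logical replication parallel apply workers share the origin
acquired by their leader.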

I'll add this to the next commitfest.

Attachments:

v00-pg_replication_origin_session_setup_acquired_by.patch (application/octet-stream)
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 57ff333159f..64effa61d77 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1308,21 +1308,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         no such replication origin is found, <literal>NULL</literal> is
         returned.
        </para></entry>
       </row>
 
       <row>
        <entry id="pg-replication-origin-session-setup" role="func_table_entry"><para role="func_signature">
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>acquired_by</parameter> <type>integer</type></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
         Marks the current session as replaying from the given
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
        </para></entry>
       </row>
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..7022422988a 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1381,20 +1381,44 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 	origin = replorigin_by_name(name, false);
 	replorigin_session_setup(origin, 0);
 
 	replorigin_session_origin = origin;
 
 	pfree(name);
 
 	PG_RETURN_VOID();
 }
 
+/*
+ * Set up a replication origin for this session that is already acquired by another process
+ */
+Datum
+pg_replication_origin_session_setup_acquired_by(PG_FUNCTION_ARGS)
+{
+	char	   *name;
+	RepOriginId origin;
+	int			acquired_by;
+
+	replorigin_check_prerequisites(true, false);
+
+	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	acquired_by = PG_GETARG_INT32(1);
+	origin = replorigin_by_name(name, false);
+	replorigin_session_setup(origin, acquired_by);
+
+	replorigin_session_origin = origin;
+
+	pfree(name);
+
+	PG_RETURN_VOID();
+}
+
 /*
  * Reset previously setup origin in this session
  */
 Datum
 pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
 {
 	replorigin_check_prerequisites(true, false);
 
 	replorigin_session_reset();
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 118d6da1ace..7cd580164cb 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12218,20 +12218,25 @@
   descr => 'translate the replication origin\'s name to its id',
   proname => 'pg_replication_origin_oid', provolatile => 's',
   prorettype => 'oid', proargtypes => 'text',
   prosrc => 'pg_replication_origin_oid' },
 
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
   proparallel => 'u', prorettype => 'void', proargtypes => 'text',
   prosrc => 'pg_replication_origin_session_setup' },
+{ oid => '6015',
+  descr => 'configure session to maintain replication progress tracking for the passed in origin',
+  proname => 'pg_replication_origin_session_setup', provolatile => 'v',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
+  prosrc => 'pg_replication_origin_session_setup_acquired_by' },
 
 { oid => '6007', descr => 'teardown configured replication progress tracking',
   proname => 'pg_replication_origin_session_reset', provolatile => 'v',
   proparallel => 'u', prorettype => 'void', proargtypes => '',
   prosrc => 'pg_replication_origin_session_reset' },
 
 { oid => '6008',
   descr => 'is a replication origin configured in this session',
   proname => 'pg_replication_origin_session_is_setup', provolatile => 'v',
   proparallel => 'r', prorettype => 'bool', proargtypes => '',
#2 Amit Kapila
amit.kapila16@gmail.com
In reply to: Emre Hasegeli (#1)
Re: Allow using replication origins in SQL level parallel sessions

On Thu, Sep 4, 2025 at 11:32 PM Emre Hasegeli <emre@hasegeli.com> wrote:

> Replication origins are a useful feature that external replication
> systems can use for conflict resolution in bi-directional replication.
> External systems can make use of this feature through the SQL
> functions pg_replication_origin_*().
>
> pg_replication_origin_session_setup() configures the current session.
> Currently, only a single variant of this function is exposed, and it
> allows a replication origin to be used by only one session at a time.
> This limits the usefulness of the feature.
>
> The attached patch adds another variant of this function,
> pg_replication_origin_session_setup(text, int). This allows the same
> replication origin to be used by parallel sessions, the same way
> logical replication apply workers are using it.

We are already discussing the same feature in an email thread [1]. Can
you check that and share your input there?

[1]: /messages/by-id/CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com

--
With Regards,
Amit Kapila.