Allow logical replication in the same cluster
Hi Hackers,
A few days ago I was testing logical replication and tried to set it up
on my MacBook, with the publisher and the subscriber in the same cluster.
The basic workflow is simple:
```
Step 1: edit postgresql.conf and set:
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
Step 2: create two databases for pub and sub
% createdb pubdb
% createdb subdb
Step 3: create a table in pubdb, and create a publication
pubdb=# CREATE TABLE t (id int primary key, data text);
CREATE TABLE
pubdb=# INSERT INTO t VALUES (1, 'hello from pub');
INSERT 0 1
pubdb=# CREATE PUBLICATION mypub FOR TABLE t;
CREATE PUBLICATION
Step 4: create the same table in subdb
subdb=# CREATE TABLE t (id int primary key, data text);
CREATE TABLE
Step 5: create subscription in subdb
subdb=# CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost dbname=pubdb'
PUBLICATION mypub; <==== stuck here
```
In step 5, "CREATE SUBSCRIPTION" got stuck. I then found that the issue had
been discussed in [1] in 2017, but no further effort was spent on resolving
it.
Then I investigated the root cause. It looks like an architectural
problem: because pubdb and subdb run in the same cluster, they share the
same transaction ID sequence.
In step 5, when subdb runs "CREATE SUBSCRIPTION" (say its transaction ID is
100), the backend process does roughly the following:
1) start a xact (100)
2) insert a tuple into pg_subscription
3) request pub side to create a sub slot and wait for the result
4) commit
When the pub side receives the request to create a replication slot, it
has to wait until there are no running transactions. However, xact 100 is
still running and is itself waiting for the replication slot creation to
finish. This is a deadlock, and it exists only when pub and sub are in the
same cluster.
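To make the cycle concrete, here is a minimal standalone sketch in C. It is not PostgreSQL code: `RunningXacts`, `no_xacts_running` and `would_deadlock` are hypothetical names, the struct only mirrors the two `xl_running_xacts` fields that the patch touches, and xid wraparound is ignored.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Simplified stand-in for the xl_running_xacts fields used here. */
typedef struct RunningXacts
{
    TransactionId oldestRunningXid; /* oldest xid still in progress */
    TransactionId nextXid;          /* next xid to be assigned */
} RunningXacts;

/* True when nothing is in progress, so slot creation need not wait. */
static bool
no_xacts_running(const RunningXacts *running)
{
    assert(running != NULL);
    return running->oldestRunningXid == running->nextXid;
}

/*
 * Hypothetical model of the cycle: slot creation waits for every xact in
 * the running range, and subscriber_xid (assumed to be still in progress)
 * waits for slot creation. Wraparound is ignored for simplicity.
 */
static bool
would_deadlock(const RunningXacts *running, TransactionId subscriber_xid)
{
    if (no_xacts_running(running))
        return false;
    return subscriber_xid >= running->oldestRunningXid &&
           subscriber_xid < running->nextXid;
}
```

With `oldestRunningXid = 100` and `nextXid = 101`, `would_deadlock(&r, 100)` returns true: the only xact the publisher waits for is the one that is waiting for the publisher.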
To resolve the problem, the key is to let the pub side know that the sub
side is local, within the same cluster.
On the sub side, the "CREATE SUBSCRIPTION" statement uses a connection
string to specify where the publisher is. It is impossible to tell from the
connection string whether the pub is within the same cluster.
On the pub side, the sub side uses libpq to send a "CREATE_REPLICATION_SLOT"
command. Without modifying libpq, there is no way to let the command carry
more information, so the pub side cannot tell from the command whether the
sub is in the same cluster.
So I settled on an opt-in solution. We can add a "local=true" option, so
the sub side runs:
```
subdb=# CREATE SUBSCRIPTION mysub2 CONNECTION 'host=localhost dbname=pubdb'
PUBLICATION mypub WITH (local=true);
```
When "local=true" is set, subdb's backend process stores its current
transaction ID and its process ID in a shared memory area before sending
the "CREATE_REPLICATION_SLOT" command to the pub side.
On the pub side, if the shared memory area exists and the xact ID stored in
it is the only ongoing xact, the pub side can skip waiting for that xact,
so the deadlock is avoided.
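The handshake described above can be sketched as a small standalone C model. This is an illustration under assumptions, not the patch itself: `LocalSubMarker`, `marker_set`, `marker_clear` and `can_skip_wait` are hypothetical names, and an ordinary struct stands in for the shared memory area.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define INVALID_XID ((TransactionId) 0)

/* Hypothetical stand-in for the shared-memory marker. */
typedef struct LocalSubMarker
{
    TransactionId xid;  /* xid of the CREATE SUBSCRIPTION transaction */
    int           pid;  /* process id of the backend running it */
} LocalSubMarker;

/* Subscriber side: publish the marker before requesting the slot. */
static void
marker_set(LocalSubMarker *m, TransactionId xid, int pid)
{
    assert(m != NULL);
    m->xid = xid;
    m->pid = pid;
}

/* Subscriber side: clear the marker once the slot has been created. */
static void
marker_clear(LocalSubMarker *m)
{
    assert(m != NULL);
    m->xid = INVALID_XID;
    m->pid = 0;
}

/*
 * Publisher side: the wait may be skipped only if exactly one xid is in
 * the running range and it is the one recorded in the marker.
 */
static bool
can_skip_wait(const LocalSubMarker *m,
              TransactionId oldest_running, TransactionId next_xid)
{
    assert(m != NULL);
    if (m->xid == INVALID_XID)
        return false;           /* no local creator announced */
    if (oldest_running + 1 != next_xid)
        return false;           /* more than one xid in the range */
    return m->xid == oldest_running;
}
```

In this model the subscriber calls `marker_set` before asking for the slot and `marker_clear` afterwards. Any other running xact, or a marker that does not match the oldest running xid, makes `can_skip_wait` return false, so the publisher falls back to the normal wait.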
```
subdb=# CREATE SUBSCRIPTION mysub2 CONNECTION 'host=localhost dbname=pubdb'
PUBLICATION mypub WITH (local=true);
NOTICE: created replication slot "mysub2" on publisher
CREATE SUBSCRIPTION
# data are properly replicated
subdb=# select * from t;
 id |      data
----+----------------
  1 | hello from pub
  2 | 2
  3 | 3
(3 rows)
```
I think this solution is relatively safe, because the patch has no effect
unless "local=true" is specified, and local logical replication is not
something that should be done in a production environment.
This patch has a limitation. On the sub side, you cannot run multiple
"CREATE SUBSCRIPTION ... WITH (local=true)" commands concurrently;
otherwise they may still deadlock. I don't think this is a big issue,
because "local=true" is supposed to be used only in dev environments, and
we can clearly state the limitation in the docs.
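A small standalone C sketch of why a single marker is not enough for concurrent creators. The names `LocalSubMarker` and `xacts_still_waited_for` are hypothetical, and the model assumes the second creator simply overwrites the marker written by the first.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical single-entry marker: it can describe only one creator. */
typedef struct LocalSubMarker
{
    TransactionId xid;
} LocalSubMarker;

/*
 * Count the running xacts the publisher would still have to wait for,
 * that is, those not covered by the marker.
 */
static size_t
xacts_still_waited_for(const TransactionId *running, size_t nrunning,
                       const LocalSubMarker *marker)
{
    size_t waited = 0;

    assert(running != NULL && marker != NULL);
    for (size_t i = 0; i < nrunning; i++)
    {
        if (running[i] != marker->xid)
            waited++;
    }
    return waited;
}
```

With creators at xids 100 and 101 and the marker holding 101, one xact (100) is still waited for. Since xact 100 is itself waiting for its own slot, the cycle from the original problem reappears.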
The attached patch is unpolished. You may download and test it, but please
focus mainly on the design for now. Once the community agrees on the
design, I will polish the change, add tests, and update the docs.
[1] https://www.postgresql.org/message-id/20170426165954.GK14000%40momjian.us
Chao Li (Evan)
---------------------
HighGo Software Co., Ltd.
https://www.highgo.com/
Attachments:
v1-0001-Allow-logical-replication-in-the-same-cluster.patch (application/octet-stream)
From 684b35a9a4c62f60a405ff32f1b14fb90b4f025e Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <lic@highgo.com>
Date: Fri, 5 Sep 2025 10:42:24 +0800
Subject: [PATCH v1] Allow logical replication in the same cluster
There is a known issue, discussed in [1] in 2017, where creating
logical replication within the same cluster fails.
This patch provides a solution by adding a new option "local=true"
to "CREATE SUBSCRIPTION".
[1] https://www.postgresql.org/message-id/20170426165954.GK14000%40momjian.us
Author: Chao Li <lic@highgo.com>
---
src/backend/commands/subscriptioncmds.c | 51 ++++++++++++++++++-
src/backend/replication/logical/snapbuild.c | 54 ++++++++++++++++++++-
src/include/replication/logical.h | 5 ++
3 files changed, 106 insertions(+), 4 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 82cf65fae73..28fa1b068a3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -37,6 +37,7 @@
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
+#include "replication/logical.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
@@ -75,6 +76,7 @@
#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
#define SUBOPT_LSN 0x00010000
#define SUBOPT_ORIGIN 0x00020000
+#define SUBOPT_LOCAL 0x00040000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -101,6 +103,7 @@ typedef struct SubOpts
bool runasowner;
bool failover;
bool retaindeadtuples;
+ bool local;
int32 maxretention;
char *origin;
XLogRecPtr lsn;
@@ -118,7 +121,7 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
static void CheckAlterSubOption(Subscription *sub, const char *option,
bool slot_needs_update, bool isTopLevel);
-
+static void mark_local_subscription_creation(bool set);
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -170,6 +173,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->failover = false;
if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
opts->retaindeadtuples = false;
+ if (IsSet(supported_opts, SUBOPT_LOCAL))
+ opts->local = false;
if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
opts->maxretention = 0;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
@@ -385,6 +390,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_LSN;
opts->lsn = lsn;
}
+ else if (IsSet(supported_opts, SUBOPT_LOCAL) &&
+ strcmp(defel->defname, "local") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_LOCAL))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_LOCAL;
+ opts->local = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -593,7 +607,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_RETAIN_DEAD_TUPLES |
- SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
+ SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN |
+ SUBOPT_LOCAL);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -828,9 +843,15 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
if (opts.twophase && !opts.copy_data && tables != NIL)
twophase_enabled = true;
+ if (opts.local)
+ mark_local_subscription_creation(true);
+
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
+ if (opts.local)
+ mark_local_subscription_creation(false);
+
if (twophase_enabled)
UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
@@ -2893,3 +2914,29 @@ defGetStreamingMode(DefElem *def)
def->defname)));
return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
}
+
+static void
+mark_local_subscription_creation(bool set)
+{
+ bool found;
+ LogicalLocalCreateSubscriptioinXactInfo *plx;
+ plx = ShmemInitStruct("pg_command_subscription_local",
+ sizeof(LogicalLocalCreateSubscriptioinXactInfo),
+ &found);
+
+ /* when unset, the memory must be found */
+ Assert(set || found);
+
+ if (set)
+ {
+ plx->backend_proc = MyProcNumber;
+ plx->xid = GetCurrentTransactionId();
+ ereport(LOG,
+ (errmsg("marked local subscription creation xid %u", plx->xid)));
+ }
+ else
+ {
+ plx->backend_proc = INVALID_PROC_NUMBER;
+ plx->xid = InvalidTransactionId;
+ }
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 98ddee20929..e2afca2beb3 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -175,6 +175,9 @@ static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
+static bool is_xact_local_subscription_creation(TransactionId xid);
+static bool is_xact_local_subscription_creation_only_running(TransactionId oldestRunningXid, TransactionId nextXid);
+
/*
* Allocate a new snapshot builder.
*
@@ -1291,7 +1294,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* NB: We might have already started to incrementally assemble a snapshot,
* so we need to be careful to deal with that.
*/
- if (running->oldestRunningXid == running->nextXid)
+ if (running->oldestRunningXid == running->nextXid ||
+ is_xact_local_subscription_creation_only_running(running->oldestRunningXid, running->nextXid))
{
if (builder->start_decoding_at == InvalidXLogRecPtr ||
builder->start_decoding_at <= lsn)
@@ -1299,7 +1303,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->start_decoding_at = lsn + 1;
/* As no transactions were running xmin/xmax can be trivially set. */
- builder->xmin = running->nextXid; /* < are finished */
+ builder->xmin = running->oldestRunningXid; /* < are finished */
builder->xmax = running->nextXid; /* >= are running */
/* so we can safely use the faster comparisons */
@@ -1448,6 +1452,9 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
if (TransactionIdIsCurrentTransactionId(xid))
elog(ERROR, "waiting for ourselves");
+ if (is_xact_local_subscription_creation(xid))
+ continue;
+
if (TransactionIdFollows(xid, cutoff))
continue;
@@ -2074,3 +2081,46 @@ SnapBuildSnapshotExists(XLogRecPtr lsn)
return ret == 0;
}
+
+/*
+ * Check if the transaction with id 'xid' is a local subscription creation
+ * transaction.
+ */
+static bool
+is_xact_local_subscription_creation(TransactionId xid)
+{
+ bool found;
+ PgBackendStatus *backendState;
+ LogicalLocalCreateSubscriptioinXactInfo *plx;
+
+ plx = ShmemInitStruct("pg_command_subscription_local",
+ sizeof(LogicalLocalCreateSubscriptioinXactInfo),
+ &found);
+ if (!found) {
+ return false;
+ }
+
+ if (plx->xid != xid) {
+ return false;
+ }
+
+ backendState = pgstat_get_beentry_by_proc_number(plx->backend_proc);
+ if (backendState != NULL && backendState->st_state == STATE_RUNNING) {
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Check if the only running transaction is a local subscription creation
+ * transaction.
+ */
+static bool
+is_xact_local_subscription_creation_only_running(TransactionId oldestRunningXid, TransactionId nextXid)
+{
+ if (oldestRunningXid + 1 != nextXid) {
+ return false;
+ }
+ return is_xact_local_subscription_creation(oldestRunningXid);
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 2e562bee5a9..980c236bc27 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -114,6 +114,11 @@ typedef struct LogicalDecodingContext
bool processing_required;
} LogicalDecodingContext;
+typedef struct LogicalLocalCreateSubscriptioinXactInfo
+{
+ TransactionId xid;
+ ProcNumber backend_proc;
+} LogicalLocalCreateSubscriptioinXactInfo;
extern void CheckLogicalDecodingRequirements(void);
--
2.39.5 (Apple Git-154)
On Fri, Sep 5, 2025 at 8:21 AM Chao Li <li.evan.chao@gmail.com> wrote:
When the pub side receives the request to create a replication slot, it needs to check no running transactions. However, xact 100 is running and waiting for replication slot creation to finish. This is a deadlock, and the deadlock exists only when pub and sub are in the same cluster.
You can avoid this problem by creating a slot first on publisher with
something like:
postgres=# select pg_create_logical_replication_slot('s1', 'pgoutput',
false, true);
pg_create_logical_replication_slot
------------------------------------
(s1,0/01BFF178)
(1 row)
Then while creating subscription you can use the above created slot as follows:
db1=# create subscription sub1 connection 'dbname=postgres'
publication pub1 WITH(create_slot=false, slot_name='s1');
CREATE SUBSCRIPTION
--
With Regards,
Amit Kapila.
Hi Amit,
Thanks for your info.
I am aware of this workaround. When I ran into the stuck "CREATE SUBSCRIPTION", I searched on Google and found the old discussion thread, which mentioned a workaround; after some further searching I eventually figured it out.
I think it's worth resolving this issue, because it would avoid confusing people who want to try or test logical replication locally, and it would allow the same script to run in both a purely local environment and a real non-local environment. WDYT?
Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
On Mon, Sep 8, 2025 at 8:20 AM Chao Li <li.evan.chao@gmail.com> wrote:
I think it's worth resolving this issue, because it would avoid confusing people who want to try or test logical replication locally, and it would allow the same script to run in both a purely local environment and a real non-local environment. WDYT?
I don't see why someone can't use the workaround provided. The
proposed solution is more like adding hacks to the code, making it a
maintenance burden.
--
With Regards,
Amit Kapila.