[PATCH] Fix replica identity mismatch for partitioned tables with publish_via_partition_root
Hi hackers,
An inconsistency was observed when using logical replication on partitioned
tables with the option `publish_via_partition_root = true`: if REPLICA IDENTITY
FULL is set only on the parent table, but not on all partitions, logical
decoding emits UPDATE and DELETE messages with tag 'O' (old tuple) even for
partitions that do not have full replica identity. In those cases, only the
primary key columns are included in the message, which contradicts the expected
meaning of 'O' and violates the logical replication message protocol:
https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html
This can cause issues in downstream consumers, which interpret
the 'O' tag as implying that a full tuple is present.
The attached patch resolves the inconsistency by selecting the correct tuple
type ('O' vs 'K') based on the replica identity of the actual leaf relation
being published, rather than using the setting of the root relation alone.
As a result, the format of logical replication messages aligns with the
semantics defined by the protocol.
Steps to reproduce:
1. Create a partitioned table with REPLICA IDENTITY FULL on the parent
and only one of the partitions.
2. Create a publication with `publish_via_partition_root = true`.
3. Perform INSERT, UPDATE, DELETE operations through the root table.
4. Observe via `pg_recvlogical` that for a partition without full replica
identity, the logical replication stream contains 'O' records with
only key fields.
After applying the patch, 'O' is used only when the full row is available,
and 'K' is used otherwise - as expected.
This patch is based on the current `master` branch as of commit: b3754dcc9ff
Best regards,
Mikhail Kharitonov
Attachments:
0001-Fix-replica-identity-flags-for-partitioned-tables.patch (application/octet-stream)
From 48e288241b4c79581ecdd473cceac15985c8c681 Mon Sep 17 00:00:00 2001
From: Mikhail Kharitonov <mikhail.kharitonov.dev@gmail.com>
Date: Mon, 5 May 2025 10:14:37 +0300
Subject: [PATCH] Fix replica identity flags for partitioned tables with
publish_via_partition_root
When using publish_via_partition_root and REPLICA IDENTITY FULL is set only
on the root partitioned table (not all leaf partitions), UPDATE and DELETE
messages may incorrectly use the 'O' (old tuple) tag, even though only the
primary key fields are sent.
This leads to protocol inconsistencies, since a consumer of the stream may
interpret the message as containing a full old tuple, while it does not.
This patch ensures that the correct replica identity tag ('O' or 'K') is used
based on the actual leaf partition's REPLICA IDENTITY setting, rather than
the root.
---
src/backend/replication/logical/proto.c | 32 +++++++++++----------
src/backend/replication/pgoutput/pgoutput.c | 4 +--
src/include/replication/logicalproto.h | 6 ++--
3 files changed, 23 insertions(+), 19 deletions(-)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1a352b542dc..935a39faf20 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -447,36 +447,37 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
* Write UPDATE to the output stream.
*/
void
-logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+logicalrep_write_update(StringInfo out, TransactionId xid,
+ Relation real_relation, Relation send_as,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
bool binary, Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
- Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ Assert(real_relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
- pq_sendint32(out, RelationGetRelid(rel));
+ pq_sendint32(out, RelationGetRelid(send_as));
if (oldslot != NULL)
{
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ if (real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+ logicalrep_write_tuple(out, send_as, oldslot, binary, columns,
include_gencols_type);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newslot, binary, columns,
+ logicalrep_write_tuple(out, send_as, newslot, binary, columns,
include_gencols_type);
}
@@ -525,14 +526,15 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
* Write DELETE to the output stream.
*/
void
-logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+logicalrep_write_delete(StringInfo out, TransactionId xid,
+ Relation real_relation, Relation send_as,
TupleTableSlot *oldslot, bool binary,
Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
- Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ Assert(real_relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
@@ -541,14 +543,14 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, xid);
/* use Oid as relation identifier */
- pq_sendint32(out, RelationGetRelid(rel));
+ pq_sendint32(out, RelationGetRelid(send_as));
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ if (real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+ logicalrep_write_tuple(out, send_as, oldslot, binary, columns,
include_gencols_type);
}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 693a766e6d7..7c15be3894b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1587,12 +1587,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
relentry->include_gencols_type);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
- logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+ logicalrep_write_update(ctx->out, xid, relation, targetrel, old_slot,
new_slot, data->binary, relentry->columns,
relentry->include_gencols_type);
break;
case REORDER_BUFFER_CHANGE_DELETE:
- logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+ logicalrep_write_delete(ctx->out, xid, relation, targetrel, old_slot,
data->binary, relentry->columns,
relentry->include_gencols_type);
break;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..8940c31b4f1 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -228,7 +228,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
PublishGencolsType include_gencols_type);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
- Relation rel, TupleTableSlot *oldslot,
+ Relation real_relation, Relation send_as,
+ TupleTableSlot *oldslot,
TupleTableSlot *newslot, bool binary,
Bitmapset *columns,
PublishGencolsType include_gencols_type);
@@ -236,7 +237,8 @@ extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
- Relation rel, TupleTableSlot *oldslot,
+ Relation real_relation, Relation send_as,
+ TupleTableSlot *oldslot,
bool binary, Bitmapset *columns,
PublishGencolsType include_gencols_type);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
--
2.34.1
Hi!
This is probably not the most familiar part of Postgres to me, but does it
break anything? Or is it just inconsistency in the replication protocol?
A test for the described scenario would be a great addition. And, if it is
feasible, provide an example of what would be broken with the way
partitioned tables are replicated now.
There is a chance that the replication protocol for partitioned tables
needs to be rewritten, and I sincerely hope that I am wrong about this. It
seems Alvaro Herrera tried this here [0].
[0]: /messages/by-id/201902041630.gpadougzab7v@alvherre.pgsql
/messages/by-id/201902041630.gpadougzab7v@alvherre.pgsql
--
Best regards,
Maxim Orlov.
Hi,
Thank you for the feedback.
I would like to clarify that the current behavior does not break replication
between PostgreSQL instances. The logical replication stream is still accepted
by the subscriber, and the data is applied correctly. However, the protocol
semantics are violated, which may cause issues for external systems that rely
on interpreting this stream.
When using publish_via_partition_root = true and setting REPLICA IDENTITY FULL
only on the parent table (but not on all partitions), logical replication
generates messages with the tag 'O' (old tuple) for updates and deletes even
for partitions that do not have full identity configured.
In those cases, only key columns are sent, and the rest of the tuple is omitted.
This contradicts the meaning of tag 'O', which, according
to the documentation [1], indicates that the full old tuple is included.
This behavior is safe for the standard PostgreSQL subscriber, which does not
rely on the tag when applying changes. However, third-party tools that consume
the logical replication stream and follow the protocol strictly can be misled.
For example, one of our clients uses a custom CDC mechanism that extracts
changes and sends them to Oracle. Their handler interprets the 'O' tag as a
signal that the full old row is available; when only the key columns are
present, the data is processed incorrectly.
The attached patch changes the behavior so that the 'O' or 'K' tag is chosen
based on the REPLICA IDENTITY setting of the actual partition where the row
resides, not only on that of the parent.
- If the partition has REPLICA IDENTITY FULL, the full tuple is
sent and tagged 'O'.
- Otherwise, only the key columns are sent, and the tag 'K' is used.
This aligns the behavior with the protocol documentation.
I have also included a TAP test: 036_partition_replica_identity.pl,
located in src/test/subscription/t/
It demonstrates two cases:
- An update/delete on a partition with REPLICA IDENTITY FULL correctly
emits an 'O' tag with the full old row.
- An update/delete on a partition without REPLICA IDENTITY FULL currently
also emits an 'O' tag, but only with key fields - this is the problem.
After applying the patch, the second case correctly uses the 'K' tag.
This patch is a minimal change: it does not alter the protocol structure
or introduce new behavior. It only ensures the implementation matches
the documentation. In the future, we might consider a broader redesign
of logical replication for partitioned tables (see [2]), but this is
a narrow fix that solves a real inconsistency.
Looking forward to your comments.
Best regards,
Mikhail Kharitonov
[1]: https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html
[2]: /messages/by-id/201902041630.gpadougzab7v@alvherre.pgsql
Show quoted text
On Mon, May 12, 2025 at 5:25 PM Maxim Orlov <orlovmg@gmail.com> wrote:
Hi!
This is probably not the most familiar part of Postgres to me, but does it break anything? Or is it just inconsistency in the replication protocol?
A test for the described scenario would be a great addition. And, if it is feasible, provide an example of what would be broken with the way partitioned tables are replicated now.
There is a chance that the replication protocol for partitioned tables needs to be rewritten, and I sincerely hope that I am wrong about this. It seems Alvaro Herrera tried this here [0].
[0] /messages/by-id/201902041630.gpadougzab7v@alvherre.pgsql
--
Best regards,
Maxim Orlov.
Attachments:
0001-Fix-replica-identity-flags-for-partitioned-tables (2).patch (application/octet-stream)
From 48e288241b4c79581ecdd473cceac15985c8c681 Mon Sep 17 00:00:00 2001
From: Mikhail Kharitonov <mikhail.kharitonov.dev@gmail.com>
Date: Mon, 5 May 2025 10:14:37 +0300
Subject: [PATCH] Fix replica identity flags for partitioned tables with
publish_via_partition_root
When using publish_via_partition_root and REPLICA IDENTITY FULL is set only
on the root partitioned table (not all leaf partitions), UPDATE and DELETE
messages may incorrectly use the 'O' (old tuple) tag, even though only the
primary key fields are sent.
This leads to protocol inconsistencies, since a consumer of the stream may
interpret the message as containing a full old tuple, while it does not.
This patch ensures that the correct replica identity tag ('O' or 'K') is used
based on the actual leaf partition's REPLICA IDENTITY setting, rather than
the root.
---
src/backend/replication/logical/proto.c | 32 +++++++++++----------
src/backend/replication/pgoutput/pgoutput.c | 4 +--
src/include/replication/logicalproto.h | 6 ++--
3 files changed, 23 insertions(+), 19 deletions(-)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1a352b542dc..935a39faf20 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -447,36 +447,37 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
* Write UPDATE to the output stream.
*/
void
-logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+logicalrep_write_update(StringInfo out, TransactionId xid,
+ Relation real_relation, Relation send_as,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
bool binary, Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
- Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ Assert(real_relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
- pq_sendint32(out, RelationGetRelid(rel));
+ pq_sendint32(out, RelationGetRelid(send_as));
if (oldslot != NULL)
{
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ if (real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+ logicalrep_write_tuple(out, send_as, oldslot, binary, columns,
include_gencols_type);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newslot, binary, columns,
+ logicalrep_write_tuple(out, send_as, newslot, binary, columns,
include_gencols_type);
}
@@ -525,14 +526,15 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
* Write DELETE to the output stream.
*/
void
-logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+logicalrep_write_delete(StringInfo out, TransactionId xid,
+ Relation real_relation, Relation send_as,
TupleTableSlot *oldslot, bool binary,
Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
- Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ Assert(real_relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
@@ -541,14 +543,14 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, xid);
/* use Oid as relation identifier */
- pq_sendint32(out, RelationGetRelid(rel));
+ pq_sendint32(out, RelationGetRelid(send_as));
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ if (real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+ logicalrep_write_tuple(out, send_as, oldslot, binary, columns,
include_gencols_type);
}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 693a766e6d7..7c15be3894b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1587,12 +1587,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
relentry->include_gencols_type);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
- logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+ logicalrep_write_update(ctx->out, xid, relation, targetrel, old_slot,
new_slot, data->binary, relentry->columns,
relentry->include_gencols_type);
break;
case REORDER_BUFFER_CHANGE_DELETE:
- logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+ logicalrep_write_delete(ctx->out, xid, relation, targetrel, old_slot,
data->binary, relentry->columns,
relentry->include_gencols_type);
break;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..8940c31b4f1 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -228,7 +228,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
PublishGencolsType include_gencols_type);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
- Relation rel, TupleTableSlot *oldslot,
+ Relation real_relation, Relation send_as,
+ TupleTableSlot *oldslot,
TupleTableSlot *newslot, bool binary,
Bitmapset *columns,
PublishGencolsType include_gencols_type);
@@ -236,7 +237,8 @@ extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
- Relation rel, TupleTableSlot *oldslot,
+ Relation real_relation, Relation send_as,
+ TupleTableSlot *oldslot,
bool binary, Bitmapset *columns,
PublishGencolsType include_gencols_type);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
--
2.34.1
Hi all,
I’m sending v2 of the patch. This is a clean rebase onto current master
(commit a27893df45e) and a squash of the fix together with the TAP
test into a single patch file.
I would appreciate your thoughts and comments on the current problem.
Thank you!
--
Best regards,
Mikhail Kharitonov
On Thu, May 29, 2025 at 9:30 AM Mikhail Kharitonov
<mikhail.kharitonov.dev@gmail.com> wrote:
Show quoted text
Hi,
Thank you for the feedback.
I would like to clarify that the current behavior does not break replication
between PostgreSQL instances. The logical replication stream is still accepted
by the subscriber, and the data is applied correctly. However, the protocol
semantics are violated, which may cause issues for external systems that rely
on interpreting this stream.
When using publish_via_partition_root = true and setting REPLICA IDENTITY FULL
only on the parent table (but not on all partitions), logical replication
generates messages with the tag 'O' (old tuple) for updates and deletes even
for partitions that do not have full identity configured.
In those cases, only key columns are sent, and the rest of the tuple is omitted.
This contradicts the meaning of tag 'O', which, according
to the documentation [1], indicates that the full old tuple is included.
This behavior is safe for the standard PostgreSQL subscriber, which does not
rely on the tag when applying changes. However, third-party tools that consume
the logical replication stream and follow the protocol strictly can be misled.
For example, one of our clients uses a custom CDC mechanism that extracts
changes and sends them to Oracle. Their handler interprets the 'O' tag as a
signal that the full old row is available. When it is not - the data is
processed incorrectly.
The attached patch changes the behavior so that the 'O' or 'K' tag is chosen
based on the REPLICA IDENTITY setting of the actual partition where the row
resides, not only on that of the parent.
- If the partition has REPLICA IDENTITY FULL, the full tuple is
sent and tagged 'O'.
- Otherwise, only the key columns are sent, and the tag 'K' is used.
This aligns the behavior with the protocol documentation.
I have also included a TAP test: 036_partition_replica_identity.pl,
located in src/test/subscription/t/.
It demonstrates two cases:
- An update/delete on a partition with REPLICA IDENTITY FULL correctly
emits an 'O' tag with the full old row.
- An update/delete on a partition without REPLICA IDENTITY FULL currently
also emits an 'O' tag, but only with key fields - this is the problem.
After applying the patch, the second case correctly uses the 'K' tag.
This patch is a minimal change: it does not alter protocol structure
or introduce new behavior. It only ensures the implementation matches
the documentation. In the future, we might consider a broader redesign
of logical replication for partitioned tables (see [2]), but this is
a narrow fix that solves a real inconsistency.
Looking forward to your comments.
Best regards,
Mikhail Kharitonov
[1] https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html
[2] /messages/by-id/201902041630.gpadougzab7v@alvherre.pgsql
On Mon, May 12, 2025 at 5:25 PM Maxim Orlov <orlovmg@gmail.com> wrote:
Hi!
This is probably not the most familiar part of Postgres to me, but does it break anything? Or is it just inconsistency in the replication protocol?
A test for the described scenario would be a great addition. And, if it is feasible, provide an example of what would be broken with the way partitioned tables are replicated now.
There is a chance that the replication protocol for partitioned tables needs to be rewritten, and I sincerely hope that I am wrong about this. It seems Alvaro Herrera tried this here [0].
[0] /messages/by-id/201902041630.gpadougzab7v@alvherre.pgsql
--
Best regards,
Maxim Orlov.
On Tue, Jul 8, 2025 at 11:53 AM Mikhail Kharitonov
<mikhail.kharitonov.dev@gmail.com> wrote:
Hi all,
I’m sending v2 of the patch. This is a clean rebase onto current master
(commit a27893df45e) and a squash of the fix together with the TAP
test into a single patch file.
I would appreciate your thoughts and comments on the current problem.
Thank you!
--
Best regards,
Mikhail Kharitonov
Sorry, I forgot the attachment in my previous message.
Please find the v2 patch attached.
Attachments:
0002-replica_identity.patch (application/octet-stream)
From 2ea131eb052c0e4c1c78f011d156e795b5640a58 Mon Sep 17 00:00:00 2001
From: Mikhail Kharitonov <m.kharitonov@postgrespro.ru>
Date: Tue, 8 Jul 2025 09:27:56 +0300
Subject: [PATCH v2] Logical replication: fix replica identity flag for
partitioned tables
With publish_via_partition_root = true, UPDATE and DELETE changes on rows stored
in a partition without REPLICA IDENTITY FULL were sent with tuple flag 'O' (old
tuple) even though only primary-key columns were included. This violates the
logical-replication protocol and can confuse external CDC tools that rely
on the flag.
The flag is now chosen according to the replica identity of the partition
that actually stores the row: partitions with REPLICA IDENTITY FULL still use
'O', all others use 'K'.
A TAP test (src/test/subscription/t/036_partition_replica_identity.pl)
covers both cases.
Author: Mikhail Kharitonov <mikhail.kharitonov.dev@gmail.com>
Discussion: https://www.postgresql.org/message-id/CAKkoVatDVuH%3DAcnecrZSOQ0_Md6RfW4ExvCKJzJJ4JTrX1wbxQ%40mail.gmail.com
---
src/backend/replication/logical/proto.c | 32 +++--
src/backend/replication/pgoutput/pgoutput.c | 4 +-
src/include/replication/logicalproto.h | 6 +-
.../t/036_partition_replica_identity.pl | 135 ++++++++++++++++++
4 files changed, 158 insertions(+), 19 deletions(-)
create mode 100755 src/test/subscription/t/036_partition_replica_identity.pl
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1a352b542dc..935a39faf20 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -447,36 +447,37 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
* Write UPDATE to the output stream.
*/
void
-logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+logicalrep_write_update(StringInfo out, TransactionId xid,
+ Relation real_relation, Relation send_as,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
bool binary, Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
- Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ Assert(real_relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
- pq_sendint32(out, RelationGetRelid(rel));
+ pq_sendint32(out, RelationGetRelid(send_as));
if (oldslot != NULL)
{
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ if (real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+ logicalrep_write_tuple(out, send_as, oldslot, binary, columns,
include_gencols_type);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newslot, binary, columns,
+ logicalrep_write_tuple(out, send_as, newslot, binary, columns,
include_gencols_type);
}
@@ -525,14 +526,15 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
* Write DELETE to the output stream.
*/
void
-logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+logicalrep_write_delete(StringInfo out, TransactionId xid,
+ Relation real_relation, Relation send_as,
TupleTableSlot *oldslot, bool binary,
Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
- Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ Assert(real_relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ real_relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
@@ -541,14 +543,14 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, xid);
/* use Oid as relation identifier */
- pq_sendint32(out, RelationGetRelid(rel));
+ pq_sendint32(out, RelationGetRelid(send_as));
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ if (real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+ logicalrep_write_tuple(out, send_as, oldslot, binary, columns,
include_gencols_type);
}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 082b4d9d327..261be60b8f4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1587,12 +1587,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
relentry->include_gencols_type);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
- logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+ logicalrep_write_update(ctx->out, xid, relation, targetrel, old_slot,
new_slot, data->binary, relentry->columns,
relentry->include_gencols_type);
break;
case REORDER_BUFFER_CHANGE_DELETE:
- logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+ logicalrep_write_delete(ctx->out, xid, relation, targetrel, old_slot,
data->binary, relentry->columns,
relentry->include_gencols_type);
break;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..8940c31b4f1 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -228,7 +228,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
PublishGencolsType include_gencols_type);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
- Relation rel, TupleTableSlot *oldslot,
+ Relation real_relation, Relation send_as,
+ TupleTableSlot *oldslot,
TupleTableSlot *newslot, bool binary,
Bitmapset *columns,
PublishGencolsType include_gencols_type);
@@ -236,7 +237,8 @@ extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
- Relation rel, TupleTableSlot *oldslot,
+ Relation real_relation, Relation send_as,
+ TupleTableSlot *oldslot,
bool binary, Bitmapset *columns,
PublishGencolsType include_gencols_type);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
diff --git a/src/test/subscription/t/036_partition_replica_identity.pl b/src/test/subscription/t/036_partition_replica_identity.pl
new file mode 100755
index 00000000000..f69bca5ebe2
--- /dev/null
+++ b/src/test/subscription/t/036_partition_replica_identity.pl
@@ -0,0 +1,135 @@
+# Test logical replication with publish_via_partition_root,
+# where the parent has REPLICA IDENTITY FULL, but one partition does not.
+#
+# Expected behavior:
+# - For partitions with REPLICA IDENTITY FULL, old tuple must be marked as 'O' and contain full row.
+# - For partitions with REPLICA IDENTITY DEFAULT, old tuple should be marked as 'K' and contain only key columns.
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Log part_table's rows, with the owning partition, as TAP notes (shown in verbose mode).
+sub log_state
+{
+ my ($node, $label) = @_;
+
+ my $rows = $node->safe_psql('postgres', q{
+ SELECT tableoid::regclass AS part,
+ id,
+ to_char(ts,'YYYY-MM-DD') AS ts,
+ load
+ FROM part_table
+ ORDER BY 1,2;
+ });
+ note "----- $label: rows -----\n$rows\n";
+}
+
+my $pub = PostgreSQL::Test::Cluster->new('publisher');
+$pub->init(allows_streaming => 'logical');
+$pub->start;
+
+my $sub = PostgreSQL::Test::Cluster->new('subscriber');
+$sub->init;
+$sub->start;
+
+$pub->safe_psql('postgres', q{
+create table part_table(
+ id int generated always as identity,
+ ts timestamp,
+ load text,
+ constraint part_table_pk primary key(id, ts)
+) partition by range(ts);
+
+create table part_table_sect_1 partition of part_table
+ for values from ('2000-01-01') to ('2024-01-01');
+create table part_table_sect_2 partition of part_table
+ for values from ('2024-01-01') to (maxvalue);
+
+alter table part_table replica identity full;
+alter table part_table_sect_1 replica identity full;
+
+create publication pub_part_table
+ for table part_table
+ with (publish_via_partition_root = true);
+});
+
+$pub->safe_psql('postgres',
+ q{select pg_create_logical_replication_slot('slot_test', 'pgoutput');});
+
+$sub->safe_psql('postgres', q{
+create table part_table(
+ id int,
+ ts timestamp,
+ load text,
+ constraint part_table_pk primary key(id, ts)
+) partition by range(ts);
+
+create table part_table_sect_1 partition of part_table
+ for values from ('2000-01-01') to ('2024-01-01');
+create table part_table_sect_2 partition of part_table
+ for values from ('2024-01-01') to (maxvalue);
+});
+
+my $connstr = $pub->connstr . ' dbname=postgres';
+$sub->safe_psql('postgres', qq{
+create subscription sub_part
+ connection '$connstr application_name=sub_part'
+ publication pub_part_table;});
+
+$sub->wait_for_subscription_sync($pub, 'sub_part');
+
+$pub->safe_psql('postgres', q{
+insert into part_table values (default, '2020-01-01 00:00', 'first');
+insert into part_table values (default, '2025-01-01 00:00', 'second');
+});
+$pub->wait_for_catchup('sub_part');
+log_state($pub, 'publisher after insert');
+log_state($sub, 'subscriber after insert');
+
+$pub->safe_psql('postgres',
+ q{update part_table set ts = ts + interval '1 day';});
+$pub->wait_for_catchup('sub_part');
+
+log_state($pub, 'publisher after update');
+log_state($sub, 'subscriber after update');
+
+$pub->safe_psql('postgres', q{delete from part_table;});
+$pub->wait_for_catchup('sub_part');
+
+log_state($pub, 'publisher after delete');
+log_state($sub, 'subscriber after delete');
+
+my $wal = $pub->safe_psql('postgres', q{
+select string_agg(encode(data, 'escape'),'')
+ from pg_logical_slot_get_binary_changes(
+ 'slot_test', null, null,
+ 'proto_version','1',
+ 'publication_names','pub_part_table');
+});
+
+note("---- WAL stream ----\n$wal\n");
+
+# 1: first partition has REPLICA IDENTITY FULL - full old tuple
+# (see - https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-UPDATE)
+like(
+ $wal,
+ qr/U.*O.*first.*first/s,
+ 'partition WITH REPLICA IDENTITY FULL contains full old tuple'
+);
+
+# 2: second partition has REPLICA IDENTITY DEFAULT - expect 'K'; otherwise the 'O' tuple must hold the full row.
+if ($wal =~ /U.*K.*second/s)
+{
+ pass("Tag K correctly used for partition with REPLICA IDENTITY DEFAULT");
+}
+else
+{
+ my ($blk) = $wal =~ /(U.*O.*second)/s;
+ my $count = defined $blk ? scalar(() = $blk =~ /second/g) : 0;
+ is($count, 2, "old tuple for REPLICA IDENTITY DEFAULT partition is 'K' or a full 'O' row");
+}
+
+done_testing();
--
2.34.1
Hi all,
I've rebased this series onto the latest master.
Changes in v3:
Patch 1/2 adds two new functions, logicalrep_write_update_extended and
logicalrep_write_delete_extended, to the logical replication protocol code
(proto.c / logicalproto.h).
These are now used in pgoutput and allow correct old-tuple flag handling
when publish_via_partition_root = true.
The old functions remain as wrappers to preserve compatibility.
A short documentation note was added to explain the new behaviour.
Patch 2/2 moves the TAP test into a separate commit,
so the code change and test are isolated.
--
Best regards,
Mikhail Kharitonov
Attachments:
v3-0002-replica-identity-test.patchapplication/octet-stream; name=v3-0002-replica-identity-test.patchDownload
From 91e46dd7c05adf3bef68d5f20a624e3e4a8ce545 Mon Sep 17 00:00:00 2001
From: Mikhail Kharitonov <mikhail.kharitonov.dev@gmail.com>
Date: Tue, 12 Aug 2025 14:16:10 +0300
Subject: [PATCH v3 2/2] tests: TAP for leaf based old tuple flag with
publish_via_partition_root
---
.../t/036_partition_replica_identity.pl | 135 ++++++++++++++++++
1 file changed, 135 insertions(+)
create mode 100755 src/test/subscription/t/036_partition_replica_identity.pl
diff --git a/src/test/subscription/t/036_partition_replica_identity.pl b/src/test/subscription/t/036_partition_replica_identity.pl
new file mode 100755
index 00000000000..f69bca5ebe2
--- /dev/null
+++ b/src/test/subscription/t/036_partition_replica_identity.pl
@@ -0,0 +1,135 @@
+# Test logical replication with publish_via_partition_root,
+# where the parent has REPLICA IDENTITY FULL, but one partition does not.
+#
+# Expected behavior:
+# - For partitions with REPLICA IDENTITY FULL, old tuple must be marked as 'O' and contain full row.
+# - For partitions with REPLICA IDENTITY DEFAULT, old tuple should be marked as 'K' and contain only key columns.
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Log part_table's rows, with the owning partition, as TAP notes (shown in verbose mode).
+sub log_state
+{
+ my ($node, $label) = @_;
+
+ my $rows = $node->safe_psql('postgres', q{
+ SELECT tableoid::regclass AS part,
+ id,
+ to_char(ts,'YYYY-MM-DD') AS ts,
+ load
+ FROM part_table
+ ORDER BY 1,2;
+ });
+ note "----- $label: rows -----\n$rows\n";
+}
+
+my $pub = PostgreSQL::Test::Cluster->new('publisher');
+$pub->init(allows_streaming => 'logical');
+$pub->start;
+
+my $sub = PostgreSQL::Test::Cluster->new('subscriber');
+$sub->init;
+$sub->start;
+
+$pub->safe_psql('postgres', q{
+create table part_table(
+ id int generated always as identity,
+ ts timestamp,
+ load text,
+ constraint part_table_pk primary key(id, ts)
+) partition by range(ts);
+
+create table part_table_sect_1 partition of part_table
+ for values from ('2000-01-01') to ('2024-01-01');
+create table part_table_sect_2 partition of part_table
+ for values from ('2024-01-01') to (maxvalue);
+
+alter table part_table replica identity full;
+alter table part_table_sect_1 replica identity full;
+
+create publication pub_part_table
+ for table part_table
+ with (publish_via_partition_root = true);
+});
+
+$pub->safe_psql('postgres',
+ q{select pg_create_logical_replication_slot('slot_test', 'pgoutput');});
+
+$sub->safe_psql('postgres', q{
+create table part_table(
+ id int,
+ ts timestamp,
+ load text,
+ constraint part_table_pk primary key(id, ts)
+) partition by range(ts);
+
+create table part_table_sect_1 partition of part_table
+ for values from ('2000-01-01') to ('2024-01-01');
+create table part_table_sect_2 partition of part_table
+ for values from ('2024-01-01') to (maxvalue);
+});
+
+my $connstr = $pub->connstr . ' dbname=postgres';
+$sub->safe_psql('postgres', qq{
+create subscription sub_part
+ connection '$connstr application_name=sub_part'
+ publication pub_part_table;});
+
+$sub->wait_for_subscription_sync($pub, 'sub_part');
+
+$pub->safe_psql('postgres', q{
+insert into part_table values (default, '2020-01-01 00:00', 'first');
+insert into part_table values (default, '2025-01-01 00:00', 'second');
+});
+$pub->wait_for_catchup('sub_part');
+log_state($pub, 'publisher after insert');
+log_state($sub, 'subscriber after insert');
+
+$pub->safe_psql('postgres',
+ q{update part_table set ts = ts + interval '1 day';});
+$pub->wait_for_catchup('sub_part');
+
+log_state($pub, 'publisher after update');
+log_state($sub, 'subscriber after update');
+
+$pub->safe_psql('postgres', q{delete from part_table;});
+$pub->wait_for_catchup('sub_part');
+
+log_state($pub, 'publisher after delete');
+log_state($sub, 'subscriber after delete');
+
+my $wal = $pub->safe_psql('postgres', q{
+select string_agg(encode(data, 'escape'),'')
+ from pg_logical_slot_get_binary_changes(
+ 'slot_test', null, null,
+ 'proto_version','1',
+ 'publication_names','pub_part_table');
+});
+
+note("---- WAL stream ----\n$wal\n");
+
+# 1: first partition has REPLICA IDENTITY FULL - full old tuple
+# (see - https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-UPDATE)
+like(
+ $wal,
+ qr/U.*O.*first.*first/s,
+ 'partition WITH REPLICA IDENTITY FULL contains full old tuple'
+);
+
+# 2: second partition has REPLICA IDENTITY DEFAULT - expect 'K'; otherwise the 'O' tuple must hold the full row.
+if ($wal =~ /U.*K.*second/s)
+{
+ pass("Tag K correctly used for partition with REPLICA IDENTITY DEFAULT");
+}
+else
+{
+ my ($blk) = $wal =~ /(U.*O.*second)/s;
+ my $count = defined $blk ? scalar(() = $blk =~ /second/g) : 0;
+ is($count, 2, "old tuple for REPLICA IDENTITY DEFAULT partition is 'K' or a full 'O' row");
+}
+
+done_testing();
--
2.34.1
v3-0001-Fix-replica-identity-flags-for-partitioned-tables.patchapplication/octet-stream; name=v3-0001-Fix-replica-identity-flags-for-partitioned-tables.patchDownload
From 3498e773879b3b87cd7ad46a1430f32b9c628ac7 Mon Sep 17 00:00:00 2001
From: Mikhail Kharitonov <mikhail.kharitonov.dev@gmail.com>
Date: Tue, 12 Aug 2025 14:15:54 +0300
Subject: [PATCH v3 1/2] logical replication: add *_extended API; pgoutput uses
leaf based O/K; doc note
---
doc/src/sgml/logical-replication.sgml | 8 +++
src/backend/replication/logical/proto.c | 79 +++++++++++++--------
src/backend/replication/pgoutput/pgoutput.c | 4 +-
src/include/replication/logicalproto.h | 23 ++++--
4 files changed, 76 insertions(+), 38 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index a0761cfee3f..9c34423204f 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -946,6 +946,14 @@ HINT: To initiate replication, you must manually create the replication slot, e
row filter is used.
</para>
+ <para>
+ When <literal>publish_via_partition_root</literal> is <literal>true</literal>, the relation OID and
+ the tuple layout in logical replication messages correspond to the <emphasis>root</emphasis>
+ partitioned table. However, for <literal>UPDATE</literal> and <literal>DELETE</literal>, the
+ old-tuple flag (<literal>O</literal> vs <literal>K</literal>) is determined by the replica identity of the
+ <emphasis>leaf</emphasis> partition that actually stored the old row.
+ </para>
+
</sect2>
<sect2 id="logical-replication-row-filter-initial-data-sync">
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1b3d9eb49dd..bec198dc162 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -447,39 +447,51 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
* Write UPDATE to the output stream.
*/
void
-logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
- TupleTableSlot *oldslot, TupleTableSlot *newslot,
- bool binary, Bitmapset *columns,
- PublishGencolsType include_gencols_type)
+logicalrep_write_update_extended(StringInfo out, TransactionId xid,
+ Relation leafrel, Relation pubrel,
+ TupleTableSlot *oldslot, TupleTableSlot *newslot,
+ bool binary, Bitmapset *columns,
+ PublishGencolsType include_gencols_type)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
- Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
-
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
- pq_sendint32(out, RelationGetRelid(rel));
+ pq_sendint32(out, RelationGetRelid(pubrel));
if (oldslot != NULL)
{
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ Assert(leafrel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+ leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ leafrel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ /* 'O'/'K' follows leafrel's replica identity; the OID and tuple layout use pubrel. */
+ if (leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+ logicalrep_write_tuple(out, pubrel, oldslot, binary, columns,
include_gencols_type);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newslot, binary, columns,
+ logicalrep_write_tuple(out, pubrel, newslot, binary, columns,
include_gencols_type);
}
+/* Compatibility wrapper: rel is used as both leafrel and pubrel. */
+void
+logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+ TupleTableSlot *oldslot, TupleTableSlot *newslot,
+ bool binary, Bitmapset *columns,
+ PublishGencolsType include_gencols_type)
+{
+ logicalrep_write_update_extended(out, xid, rel, rel, oldslot, newslot,
+ binary, columns, include_gencols_type);
+}
+
/*
* Read UPDATE from stream.
*/
@@ -521,19 +533,13 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
return relid;
}
-/*
- * Write DELETE to the output stream.
- */
void
-logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
- TupleTableSlot *oldslot, bool binary,
- Bitmapset *columns,
- PublishGencolsType include_gencols_type)
+logicalrep_write_delete_extended(StringInfo out, TransactionId xid,
+ Relation leafrel, Relation pubrel,
+ TupleTableSlot *oldslot, bool binary,
+ Bitmapset *columns,
+ PublishGencolsType include_gencols_type)
{
- Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
- rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
-
pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
/* transaction ID (if not valid, we're not streaming) */
@@ -541,15 +547,26 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, xid);
/* use Oid as relation identifier */
- pq_sendint32(out, RelationGetRelid(rel));
-
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- pq_sendbyte(out, 'O'); /* old tuple follows */
- else
- pq_sendbyte(out, 'K'); /* old key follows */
+ pq_sendint32(out, RelationGetRelid(pubrel));
+ Assert(oldslot != NULL); /* DELETE always writes an old tuple below */
+ Assert(leafrel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+ leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ leafrel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ pq_sendbyte(out, (leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) ? 'O' : 'K');
+ logicalrep_write_tuple(out, pubrel, oldslot, binary, columns, include_gencols_type);
+}
- logicalrep_write_tuple(out, rel, oldslot, binary, columns,
- include_gencols_type);
+/*
+ * Write DELETE to the output stream (compatibility wrapper: rel is both leafrel and pubrel).
+ */
+void
+logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+ TupleTableSlot *oldslot, bool binary,
+ Bitmapset *columns,
+ PublishGencolsType include_gencols_type)
+{
+ logicalrep_write_delete_extended(out, xid, rel, rel, oldslot, binary,
+ columns, include_gencols_type);
}
/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..81056768587 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1589,12 +1589,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
relentry->include_gencols_type);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
- logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+ logicalrep_write_update_extended(ctx->out, xid, relation, targetrel, old_slot,
new_slot, data->binary, relentry->columns,
relentry->include_gencols_type);
break;
case REORDER_BUFFER_CHANGE_DELETE:
- logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+ logicalrep_write_delete_extended(ctx->out, xid, relation, targetrel, old_slot,
data->binary, relentry->columns,
relentry->include_gencols_type);
break;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..81932c065c5 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -228,17 +228,30 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
PublishGencolsType include_gencols_type);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
- Relation rel, TupleTableSlot *oldslot,
- TupleTableSlot *newslot, bool binary,
- Bitmapset *columns,
+ Relation rel,
+ TupleTableSlot *oldslot, TupleTableSlot *newslot,
+ bool binary, Bitmapset *columns,
PublishGencolsType include_gencols_type);
+
+extern void logicalrep_write_update_extended(StringInfo out, TransactionId xid,
+ Relation leafrel, Relation pubrel,
+ TupleTableSlot *oldslot, TupleTableSlot *newslot,
+ bool binary, Bitmapset *columns,
+ PublishGencolsType include_gencols_type);
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
- Relation rel, TupleTableSlot *oldslot,
- bool binary, Bitmapset *columns,
+ Relation rel,
+ TupleTableSlot *oldslot, bool binary,
+ Bitmapset *columns,
PublishGencolsType include_gencols_type);
+
+extern void logicalrep_write_delete_extended(StringInfo out, TransactionId xid,
+ Relation leafrel, Relation pubrel,
+ TupleTableSlot *oldslot, bool binary,
+ Bitmapset *columns,
+ PublishGencolsType include_gencols_type);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
LogicalRepTupleData *oldtup);
extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
base-commit: b227b0bb4e032e19b3679bedac820eba3ac0d1cf
--
2.34.1
On Mon, May 5, 2025 at 1:56 AM Mikhail Kharitonov
<mikhail.kharitonov.dev@gmail.com> wrote:
Hi hackers,
An inconsistency was observed when using logical replication on partitioned
tables with the option `publish_via_partition_root = true`: if REPLICA IDENTITY
FULL is set only on the parent table, but not on all partitions, logical
decoding emits UPDATE and DELETE messages with tag 'O' (old tuple) even for
partitions that do not have full replica identity. In those cases, only the
primary key columns are included in the message, which contradicts the expected
meaning of 'O' and violates the logical replication message protocol:
https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html
This can cause issues in downstream consumers, which interpret
the 'O' tag as implying that a full tuple is present.
The attached patch resolves the inconsistency by selecting the correct tuple
type ('O' vs 'K') based on the replica identity of the actual leaf relation
being published, rather than using the setting of the root relation alone.
As a result, the format of logical replication messages aligns with
the semantics defined by the protocol.
Steps to reproduce:
1. Create a partitioned table with REPLICA IDENTITY FULL on the parent
and only one of the partitions.
2. Create a publication with `publish_via_partition_root = true`.
3. Perform INSERT, UPDATE, DELETE operations through the root table.
4. Observe via `pg_recvlogical` that for a partition without full replica
identity, the logical replication stream contains 'O' records with
only key fields.
I tested this scenario but what I've seen in my env is somewhat
different from the above analysis; pgoutput plugin writes 'O' records
as you mentioned, but it doesn't omit non-key fields, but writes NULL
as non-key fields. Here are my reproducible steps:
create table p (a int not null, b int) partition by list (a);
create table c1 partition of p for values in (1);
create table c2 partition of p for values in (2);
create unique index on c2 (a);
alter table p replica identity full;
alter table c1 replica identity full;
alter table c2 replica identity using INDEX c2_a_idx ;
insert into p values (1, 10), (2, 20);
create publication pub for all tables with (publish_via_partition_root
= 'true');
select pg_create_logical_replication_slot('sub', 'pgoutput');
delete from p where a = 1;
delete from p where a = 2;
select encode(data, 'escape') from
pg_logical_slot_peek_binary_changes('sub', null, null,
'proto_version', '1', 'publication_names', 'pub');
The last pg_logical_slot_peek_binary_changes() writes the two 'D'
(delete) messages:
1. D\000\000@\000O\000\x02t\000\000\000\x011t\000\000\000\x0210
2. D\000\000@\000O\000\x02t\000\000\000\x012n
What we can know from these messages are:
- Both messages have 'O'.
- Both messages have two columns ('\000\x02').
- The first message has: the first column '1' (length is 1
('\000\000\000\x01')), and the second column '10' (length is 2
('\000\000\000\x02')).
- The second message has: the first column '2', and the second column
NULL ('n').
From these facts, I guess there could be problematic cases even in the
native logical replication. Here are reproducible steps:
-- Publisher
create table p (a int not null, b int) partition by list (a);
create table c1 partition of p for values in (1);
create table c2 partition of p for values in (2);
create unique index on c2 (a);
alter table p replica identity full;
alter table c1 replica identity full;
alter table c2 replica identity using INDEX c2_a_idx ;
insert into p values (1, 10), (2, 20);
create publication pub for all tables with (publish_via_partition_root
= 'true');
-- Subscriber
create table p (a int, b int, c int);
create subscription sub connection 'dbname=postgres port=5551' publication pub;
-- Publisher
delete from p where a = 1; -- generate a message 'DELETE (1, 10)'
delete from p where a = 2; -- generate a message 'DELETE (2, NULL)'
The second delete message cannot find the tuple on the subscriber, so
the table contents are now inconsistent between the publisher and the
subscriber. I need more investigation to verify that it's a problem,
but this behavior doesn't change even with the proposed change.
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com