Conflict detection for multiple_unique_conflicts in logical replication
Hi Hackers,
(CCing people involved in related discussions)
I am starting this thread to propose a new conflict detection type
"multiple_unique_conflicts" that identifies when an incoming row
during logical replication violates more than one UNIQUE constraint.
If multiple constraints (such as the primary key/replica identity
along with additional unique indexes) are violated for the same
incoming tuple, the apply worker will raise a dedicated
multiple_unique_conflicts conflict.
----
ISSUE:
----
Currently, when the apply_worker encounters an 'insert_exists'
conflict (where the incoming row conflicts with an existing row based
on the replica identity), it logs the conflict and errors out
immediately. If the user tries to resolve this manually by deleting
the conflicting row, the apply worker may fail again due to another
unique constraint violation on a different column. This forces users
to fix conflicts one by one, making resolution tedious and
inefficient.
Example:
Schema:
CREATE TABLE tab1 (col1 integer PRIMARY KEY, col2 integer UNIQUE, col3
integer UNIQUE);
- col1 is Replica Identity.
Data:
- on pub: (1, 11, 111)
- on sub: 3 additional local Inserts: (2, 22, 222); (3, 33, 333); (4, 44, 444)
- Concurrently on pub, new insert: (2, 33, 444)
When the new incoming tuple (2, 33, 444) is applied on the subscriber:
- The apply worker first detects an 'insert_exists' conflict on col1
(primary key) and errors out.
- If the user deletes the conflicting row (2, 22, 222), i.e. the row
with col1=2, the apply worker fails again because col2=33 violates
another unique constraint, this time against tuple (3, 33, 333).
- If the user then deletes the row with col2=33, the apply worker
fails yet again because col3=444 conflicts with tuple (4, 44, 444).
Conflicts on both col2 and col3 (which are independent of each other)
are an example of a 'Multiple Unique Constraints' violation.
In such cases, users are forced to resolve conflicts one by one,
making the process slow and error-prone.
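The one-by-one behaviour in the example above can be simulated with a small standalone C sketch. It is not PostgreSQL code: the Row struct and both function names are made up for illustration, and the "user" is assumed to delete exactly the one row reported after each failure.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define NCOLS 3                 /* col1 (primary key), col2, col3 (UNIQUE) */

/* Hypothetical stand-in for a row of tab1 on the subscriber. */
typedef struct Row
{
    int  col[NCOLS];
    bool live;                  /* false once the user has deleted the row */
} Row;

/*
 * Return the position of the first live row that shares a value with
 * 'incoming' in any unique column, or -1 if there is none.  This mimics
 * reporting only the first conflict that is found.
 */
static int
first_conflicting_row(const Row *rows, size_t nrows, const int *incoming)
{
    for (int c = 0; c < NCOLS; c++)
        for (size_t r = 0; r < nrows; r++)
            if (rows[r].live && rows[r].col[c] == incoming[c])
                return (int) r;
    return -1;
}

/*
 * Count how often the apply fails when the user deletes only the single
 * reported row after each failure and lets the worker retry.
 */
static int
failed_attempts_one_by_one(Row *rows, size_t nrows, const int *incoming)
{
    int failures = 0;
    int r;

    assert(rows != NULL && incoming != NULL);

    while ((r = first_conflicting_row(rows, nrows, incoming)) >= 0)
    {
        failures++;
        rows[r].live = false;   /* manual fix: delete the reported row */
    }
    return failures;
}
```

With the subscriber rows (1, 11, 111), (2, 22, 222), (3, 33, 333), (4, 44, 444) and the incoming tuple (2, 33, 444), failed_attempts_one_by_one() counts three failed attempts before the insert can succeed.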
---
SOLUTION:
---
During an INSERT or UPDATE conflict check, instead of stopping at the
first encountered conflict, the apply_worker will now check all unique
indexes before reporting a conflict. If multiple unique key violations
are found, it will report a 'multiple_unique_conflicts' conflict,
listing all conflicting tuples in the logs. If only a single key
conflict is detected, the existing 'insert_exists' conflict will be
raised as it is now.
This helps users resolve conflicts in one go, by deleting all
conflicting tuples at once, instead of handling them one at a time,
unless they choose to skip the transaction.
Without an automatic resolution strategy in place, the apply worker
will keep retrying until manual intervention resolves the conflict.
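As a rough illustration of the check described above, the following standalone C sketch scans every unique column first and only then decides which conflict to report. It is a simplified model, not the patch itself: the Row struct, the Outcome enum and check_all_unique() are hypothetical names, and plain integer columns stand in for unique indexes.

```c
#include <assert.h>
#include <stddef.h>

#define NUNIQUE 3               /* col1 (PK), col2, col3 in the tab1 example */

/* Hypothetical stand-in for a local row; every column is unique. */
typedef struct Row
{
    int col[NUNIQUE];
} Row;

typedef enum
{
    NO_CONFLICT,
    SINGLE_KEY_CONFLICT,        /* reported as insert_exists/update_exists */
    MULTIPLE_UNIQUE_CONFLICTS
} Outcome;

/*
 * Check all unique columns before reporting anything.  The positions of the
 * violated columns are stored in 'conflicting_cols' (room for NUNIQUE
 * entries) and their number in '*nconflicts'.
 */
static Outcome
check_all_unique(const Row *rows, size_t nrows, const int *incoming,
                 int *conflicting_cols, int *nconflicts)
{
    int n = 0;

    assert(conflicting_cols != NULL && nconflicts != NULL);

    for (int c = 0; c < NUNIQUE; c++)
    {
        for (size_t r = 0; r < nrows; r++)
        {
            if (rows[r].col[c] == incoming[c])
            {
                conflicting_cols[n++] = c;
                break;          /* a unique column has at most one match */
            }
        }
    }

    *nconflicts = n;
    if (n == 0)
        return NO_CONFLICT;
    return (n == 1) ? SINGLE_KEY_CONFLICT : MULTIPLE_UNIQUE_CONFLICTS;
}
```

In this model a single violated column still maps to the existing single-key conflict, while two or more map to the new conflict type, so all conflicting keys are known after one pass.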
Like other existing conflict types, statistics for
multiple_unique_conflicts will be collected and can be viewed in the
pg_stat_subscription_stats view for monitoring.
~~~
Note:
Beyond its immediate benefits, this conflict type also helps the
future automatic resolution [1]. Handling multiple unique conflicts
under the 'insert_exists' or 'update_exists' conflict resolutions can
cause unexpected behavior since their resolution strategies are
designed for single-tuple conflicts. For example, 'last_update_wins'
works well for a single key conflict but fails when the remote tuple
is newer than some conflicting local rows and older than others,
making resolution unclear. Introducing 'multiple_unique_conflicts'
ensures such cases are handled separately and correctly with dedicated
resolution strategies. The possible resolution strategies for this
conflict type could be:
apply: Apply the remote change by deleting all conflicting rows and
then inserting or updating the remote row.
skip: Skip applying the change.
error: Error out on conflict (default behavior).
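To make the 'last_update_wins' problem mentioned above concrete, here is a minimal standalone C sketch. It is only an assumption about how such a comparison could be modelled: the enum, the function name and the use of plain integers as commit timestamps are all hypothetical, and ties are arbitrarily treated as "local wins".

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef enum
{
    REMOTE_WINS,    /* remote tuple is newer than every conflicting local row */
    LOCAL_WINS,     /* remote tuple is not newer than any conflicting local row */
    AMBIGUOUS       /* newer than some local rows, older than others */
} LastUpdateVerdict;

/*
 * Compare the remote commit timestamp with the timestamps of all local rows
 * that conflict with the remote tuple.
 */
static LastUpdateVerdict
last_update_verdict(int64_t remote_ts, const int64_t *local_ts, size_t n)
{
    size_t newer = 0;

    assert(local_ts != NULL && n > 0);

    for (size_t i = 0; i < n; i++)
        if (remote_ts > local_ts[i])
            newer++;

    if (newer == n)
        return REMOTE_WINS;
    if (newer == 0)
        return LOCAL_WINS;
    return AMBIGUOUS;
}
```

With a single conflicting row the verdict is always REMOTE_WINS or LOCAL_WINS; the AMBIGUOUS case can only arise when several local rows conflict, which is the situation a dedicated conflict type would isolate.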
~~~
The proposed patch is attached. Suggestions and feedback are highly appreciated!
[1]: /messages/by-id/CAJpy0uD0-DpYVMtsxK5R=zszXauZBayQMAYET9sWr_w0CNWXxQ@mail.gmail.com
--
Thanks,
Nisha
Attachments:
v1-0001-Implement-the-conflict-detection-for-multiple_uni.patch (application/octet-stream)
From 78cc3deacfd686d843f77f3c2fdf7dca70b16c8f Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 22 Oct 2024 11:01:41 +0530
Subject: [PATCH v1] Implement the conflict detection for
multiple_unique_conflicts in logical replication
Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.
Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures as each constraint violation needs to be handled one by one, making
the process slow and error-prone.
Now, the apply worker checks all unique constraints upfront and reports a
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.
Also, the patch adds a new column in view pg_stat_subscription_stats
to support stats collection for this conflict type.
---
doc/src/sgml/logical-replication.sgml | 12 ++
doc/src/sgml/monitoring.sgml | 12 ++
src/backend/catalog/system_views.sql | 1 +
src/backend/executor/execReplication.c | 50 ++++++--
src/backend/replication/logical/conflict.c | 94 +++++++++++++-
src/backend/utils/adt/pgstatfuncs.c | 6 +-
src/include/catalog/pg_proc.dat | 6 +-
src/include/replication/conflict.h | 11 +-
src/test/regress/expected/rules.out | 3 +-
src/test/subscription/meson.build | 1 +
.../t/035_multiple_unique_conflicts.pl | 119 ++++++++++++++++++
11 files changed, 293 insertions(+), 22 deletions(-)
create mode 100644 src/test/subscription/t/035_multiple_unique_conflicts.pl
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3d18e507bb..b0c8a0ff48 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1877,6 +1877,18 @@ test_sub=# SELECT * from tab_gen_to_gen;
</para>
</listitem>
</varlistentry>
+ <varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
+ <term><literal>multiple_unique_conflicts</literal></term>
+ <listitem>
+ <para>
+ Inserting a row or updating values of a row violates more than one <literal>NOT DEFERRABLE</literal>
+ unique constraints. Note that to log the origin and commit timestamp details of the conflicting key,
+ <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ should be enabled on the subscriber. In this case, an error will be
+ raised until the conflict is resolved manually.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
Note that there are other conflict scenarios, such as exclusion constraint
violations. Currently, we do not provide additional details for them in the
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 928a6eb64b..8ccc92fddf 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2244,6 +2244,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_multiple_unique_conflicts</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times a row insertion or an updated row values violated multiple
+ <literal>NOT DEFERRABLE</literal> unique constraints during the
+ application of changes. See <xref linkend="conflict-multiple-unique-conflicts"/>
+ for details about this conflict.
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index eff0990957..16951272b4 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1388,6 +1388,7 @@ CREATE VIEW pg_stat_subscription_stats AS
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription as s,
pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 5f7613cc83..82f3664e88 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -493,25 +493,55 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
ConflictType type, List *recheckIndexes,
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
+ int conflicts = 0;
+ List *conflictSlots = NIL;
+ List *conflictIndexes = NIL;
+ TupleTableSlot *conflictslot;
+
/* Check all the unique indexes for a conflict */
foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
{
- TupleTableSlot *conflictslot;
-
if (list_member_oid(recheckIndexes, uniqueidx) &&
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
&conflictslot))
{
- RepOriginId origin;
- TimestampTz committs;
- TransactionId xmin;
-
- GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
- ReportApplyConflict(estate, resultRelInfo, ERROR, type,
- searchslot, conflictslot, remoteslot,
- uniqueidx, xmin, origin, committs);
+ conflicts++;
+
+ /* Add the conflict slot and index to their respective lists */
+ conflictSlots = lappend(conflictSlots, conflictslot);
+ conflictIndexes = lappend_oid(conflictIndexes, uniqueidx);
}
}
+
+ /*
+ * Report an UPDATE_EXISTS conflict when only one unique constraint is
+ * violated.
+ */
+ if (conflicts == 1)
+ {
+ Oid uniqueidx;
+ RepOriginId origin;
+ TimestampTz committs;
+ TransactionId xmin;
+
+ uniqueidx = linitial_oid(conflictIndexes);
+ conflictslot = linitial(conflictSlots);
+
+ GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+ searchslot, conflictslot, remoteslot,
+ uniqueidx, xmin, origin, committs);
+ }
+
+ /*
+ * Report a MULTIPLE_UNIQUE_CONFLICTS when two or more unique constraints
+ * are violated.
+ */
+ else if (conflicts > 1)
+ ReportMultipleUniqueConflict(estate, resultRelInfo, ERROR,
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+ searchslot, remoteslot,
+ conflictSlots, conflictIndexes);
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 772fc83e88..2f71fbb287 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,7 +29,8 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
- [CT_DELETE_MISSING] = "delete_missing"
+ [CT_DELETE_MISSING] = "delete_missing",
+ [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
static int errcode_apply_conflict(ConflictType type);
@@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate,
TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
RepOriginId localorigin,
- TimestampTz localts);
+ TimestampTz localts, StringInfo err_msg);
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -125,7 +126,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictTypeNames[type]),
errdetail_apply_conflict(estate, relinfo, type, searchslot,
localslot, remoteslot, indexoid,
- localxmin, localorigin, localts));
+ localxmin, localorigin, localts, NULL));
}
/*
@@ -169,6 +170,7 @@ errcode_apply_conflict(ConflictType type)
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
return errcode(ERRCODE_UNIQUE_VIOLATION);
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
@@ -196,7 +198,8 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ RepOriginId localorigin, TimestampTz localts,
+ StringInfo err_msg)
{
StringInfoData err_detail;
char *val_desc;
@@ -209,6 +212,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
Assert(OidIsValid(indexoid));
if (localts)
@@ -291,6 +295,17 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
if (val_desc)
appendStringInfo(&err_detail, "\n%s", val_desc);
+ /*
+ * If the caller provides a non-null 'err_msg' pointer, only the
+ * err_detail.data is requested. Append the constructed err_detail message
+ * to 'err_msg' and return.
+ */
+ if (err_msg)
+ {
+ appendStringInfo(err_msg, "\n%s", err_detail.data);
+ return 0;
+ }
+
return errdetail_internal("%s", err_detail.data);
}
@@ -323,7 +338,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
* Report the conflicting key values in the case of a unique constraint
* violation.
*/
- if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+ if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+ type == CT_MULTIPLE_UNIQUE_CONFLICTS)
{
Assert(OidIsValid(indexoid) && localslot);
@@ -489,3 +505,71 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
return index_value;
}
+
+/*
+ * Report a multiple_unique_conflicts while applying replication changes.
+ *
+ * 'searchslot' holds the tuple used to search the corresponding local
+ * tuple for update or deletion.
+ *
+ * 'conflictslots_list' is a list of slots containing the local tuples
+ * that conflict with the remote tuple.
+ *
+ * 'remoteslot' contains the new tuple from the remote side, if available.
+ *
+ * The 'conflictIndexes' list stores the OIDs of the unique indexes that
+ * triggered the constraint violation. These indexes help identify the key
+ * values of the conflicting tuple.
+ *
+ * The caller must ensure that all indexes in 'conflictIndexes' are locked,
+ * allowing us to fetch and display the conflicting key values.
+ */
+void
+ReportMultipleUniqueConflict(EState *estate, ResultRelInfo *relinfo,
+ int elevel, ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *remoteslot,
+ List *conflictSlots, List *conflictIndexes)
+{
+ int conflictNum = 0;
+ Oid indexoid = linitial_oid(conflictIndexes);
+ Relation localrel = relinfo->ri_RelationDesc;
+ RepOriginId localorigin;
+ TimestampTz localts;
+ TransactionId localxmin;
+ StringInfoData err_detail;
+
+ initStringInfo(&err_detail);
+ appendStringInfo(&err_detail, _("The remote tuple violates multiple unique constraints on the local table."));
+
+ foreach_ptr(TupleTableSlot, slot, conflictSlots)
+ {
+ indexoid = lfirst_oid(list_nth_cell(conflictIndexes, conflictNum));
+
+ Assert(!OidIsValid(indexoid) ||
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+ GetTupleTransactionInfo(slot, &localxmin, &localorigin, &localts);
+
+ /*
+ * Build the error detail message containing the conflicting key and
+ * tuple information. The details for each conflict will be appended
+ * to err_detail.
+ */
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ slot, remoteslot, indexoid,
+ localxmin, localorigin, localts, &err_detail);
+
+ conflictNum++;
+ }
+
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
+
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail_internal("%s", err_detail.data));
+}
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index e9096a8849..550e0e8557 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2159,7 +2159,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2191,7 +2191,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9e803d610d..25ab24b6f6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5625,9 +5625,9 @@
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 37454dc951..29c3c08528 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -41,6 +41,9 @@ typedef enum
/* The row to be deleted is missing */
CT_DELETE_MISSING,
+ /* The row to be inserted/updated violates multiple unique constraint */
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+
/*
* Other conflicts, such as exclusion constraint violations, involve more
* complex rules than simple equality checks. These conflicts are left for
@@ -48,7 +51,7 @@ typedef enum
*/
} ConflictType;
-#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+#define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
@@ -63,4 +66,10 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
RepOriginId localorigin, TimestampTz localts);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern void ReportMultipleUniqueConflict(EState *estate, ResultRelInfo *relinfo,
+ int elevel, ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *remoteslot,
+ List *conflictslots_list,
+ List *conflictIndexes);
#endif
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5baba8d39f..2c95e2d197 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2157,9 +2157,10 @@ pg_stat_subscription_stats| SELECT ss.subid,
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index d40b49714f..05fcdd08f5 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -41,6 +41,7 @@ tests += {
't/032_subscribe_use_index.pl',
't/033_run_as_table_owner.pl',
't/034_temporal.pl',
+ 't/035_multiple_unique_conflicts.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/035_multiple_unique_conflicts.pl b/src/test/subscription/t/035_multiple_unique_conflicts.pl
new file mode 100644
index 0000000000..50e64c936a
--- /dev/null
+++ b/src/test/subscription/t/035_multiple_unique_conflicts.pl
@@ -0,0 +1,119 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_publisher->start;
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+ qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+# Create table on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab(a int PRIMARY key, b int unique, c int unique);");
+
+# Create similar table on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE conf_tab(a int PRIMARY key, b int unique, c int unique);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION tap_sub
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION tap_pub;");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT some data on Pub and Sub
+##################################################
+
+# Insert data in the publisher
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber
+$node_subscriber->safe_psql(
+ 'postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2);
+ INSERT INTO conf_tab VALUES (3,3,3);
+ INSERT INTO conf_tab VALUES (4,4,4);");
+
+my $log_location = -s $node_subscriber->logfile;
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,3,4);");
+
+my $logfile = slurp_file($node_subscriber->logfile(), $log_location);
+my $log_offset = -s $node_subscriber->logfile;
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $logfile =~
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts*\n.*DETAIL:.* The remote tuple violates multiple unique constraints on the local table./,
+ 'multiple_unique_conflicts detected during insertion');
+
+# Truncate table to get rid of the error
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+$node_publisher->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+##################################################
+# Test multiple_unique_conflicts due to UPDATE
+##################################################
+
+# Insert data in the publisher
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber
+$node_subscriber->safe_psql(
+ 'postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2);
+ INSERT INTO conf_tab VALUES (3,3,3);
+ INSERT INTO conf_tab VALUES (4,4,4);");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab set a=2,b=3,c=4 where a=1;");
+
+$log_offset = -s $node_subscriber->logfile;
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $logfile =~
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts*\n.*DETAIL:.* The remote tuple violates multiple unique constraints on the local table./,
+ 'multiple_unique_conflicts detected during update');
+
+done_testing();
--
2.34.1
Hi Nisha.
Some review comments for patch v1-0001
======
GENERAL
1.
This may be a basic/naive question, but it is unclear to me why we
care about the stats of confl_multiple_unique_conflicts?
I can understand it would be useful to get multiple conflicts logged
at the same time so the user doesn't have to stumble across them one
by one when fixing them, but as far as the stats go, why do we care
about this stat? Also, since you don't distinguish between multiple
insert conflicts versus multiple update conflicts the stat usefulness
seemed even more dubious.
(because of this question, I skipped reviewing some parts of this
patch related to stats)
======
Commit message
2.
/reports a multiple_unique_conflicts/reports multiple_unique_conflicts/
~~~
3.
Also, the patch adds a new column in view pg_stat_subscription_stats
to support stats collection for this conflict type.
~
Maybe say the name of the added column.
======
doc/src/sgml/logical-replication.sgml
4.
+ <para>
+ Inserting a row or updating values of a row violates more than
one <literal>NOT DEFERRABLE</literal>
+ unique constraints. Note that to log the origin and commit
timestamp details of the conflicting key,
+ <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ should be enabled on the subscriber. In this case, an error will be
+ raised until the conflict is resolved manually.
+ </para>
/unique constraints./unique constraint./
======
doc/src/sgml/monitoring.sgml
5.
See the GENERAL question. Why is this useful?
======
src/backend/executor/execReplication.c
6.
+
+ /*
+ * Report an UPDATE_EXISTS conflict when only one unique constraint is
+ * violated.
+ */
The comment says UPDATE_EXISTS, but I thought that could be something
else like INSERT_EXISTS too.
~~~
7.
+ /*
+ * Report a MULTIPLE_UNIQUE_CONFLICTS when two or more unique constraints
+ * are violated.
+ */
/Report a MULTIPLE_UNIQUE_CONFLICTS/Report MULTIPLE_UNIQUE_CONFLICTS/
======
src/backend/replication/logical/conflict.c
ReportMultipleUniqueConflict:
8.
+ * 'conflictslots_list' is a list of slots containing the local tuples
+ * that conflict with the remote tuple.
There is no parameter called conflictslots_list
======
.../t/035_multiple_unique_conflicts.pl
9.
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
/2024/2025/
~~~
10.
+my $log_location = -s $node_subscriber->logfile;
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
Add blank line above that comment.
~~~
11.
+ok( $logfile =~
+ qr/conflict detected on relation \"public.conf_tab\":
conflict=multiple_unique_conflicts*\n.*DETAIL:.* The remote tuple
violates multiple unique constraints on the local table./,
+ 'multiple_unique_conflicts detected during insertion');
+
Won't it be more useful to also log the column name causing the
conflict? Otherwise, if there are 100s of unique columns and just 2 of
them are conflicts how will the user know what to look for when fixing
it?
~~~
12.
+ok( $logfile =~
+ qr/conflict detected on relation \"public.conf_tab\":
conflict=multiple_unique_conflicts*\n.*DETAIL:.* The remote tuple
violates multiple unique constraints on the local table./,
+ 'multiple_unique_conflicts detected during update');
(same as #11)
Won't it be more useful to also log the column name causing the
conflict? Otherwise, if there are 100s of unique columns and just 2 of
them are conflicts how will the user know what to look for when fixing
it?
======
Kind Regards,
Peter Smith.
Fujitsu Australia
On Mon, Feb 24, 2025 at 7:39 AM Peter Smith <smithpb2250@gmail.com> wrote:
> Hi Nisha.
> Some review comments for patch v1-0001
> ======
> GENERAL
> 1.
> This may be a basic/naive question, but it is unclear to me why we
> care about the stats of confl_multiple_unique_conflicts?
> I can understand it would be useful to get multiple conflicts logged
> at the same time so the user doesn't have to stumble across them one
> by one when fixing them, but as far as the stats go, why do we care
> about this stat? Also, since you don't distinguish between multiple
> insert conflicts versus multiple update conflicts the stat usefulness
> seemed even more dubious.
> (because of this question, I skipped reviewing some parts of this
> patch related to stats)
IMO, tracking multiple_unique_conflicts, like other conflict stats,
helps users understand their workload better, especially since in
these cases, stats aren't gathered under either insert_exists or
update_exists.
I'll wait for others' opinions too on the need for the stats in this case.
~~~
> 12.
> +ok( $logfile =~
> + qr/conflict detected on relation \"public.conf_tab\":
> conflict=multiple_unique_conflicts*\n.*DETAIL:.* The remote tuple
> violates multiple unique constraints on the local table./,
> + 'multiple_unique_conflicts detected during update');
> (same as #11)
> Won't it be more useful to also log the column name causing the
> conflict? Otherwise, if there are 100s of unique columns and just 2 of
> them are conflicts how will the user know what to look for when fixing
> it?
The conflicting column details are logged. In the test case, only the
header line of the DETAIL message is compared to keep it simple.
For example, the full log message looks like this:
ERROR: conflict detected on relation "public.conf_tab": conflict=multiple_unique_conflicts
DETAIL: The remote tuple violates multiple unique constraints on the local table.
Key already exists in unique index "conf_tab_pkey", modified locally in transaction 757 at 2025-02-25 14:00:56.955403+05:30.
Key (a)=(2); existing local tuple (2, 2, 2); remote tuple (2, 3, 4).
Key already exists in unique index "conf_tab_b_key", modified locally in transaction 758 at 2025-02-25 14:00:56.957092+05:30.
Key (b)=(3); existing local tuple (3, 3, 3); remote tuple (2, 3, 4).
Key already exists in unique index "conf_tab_c_key", modified locally in transaction 759 at 2025-02-25 14:00:56.957337+05:30.
Key (c)=(4); existing local tuple (4, 4, 4); remote tuple (2, 3, 4).
~~~~
Addressed comments and attached the v2 patch.
--
Thanks,
Nisha
Attachments:
v2-0001-Implement-the-conflict-detection-for-multiple_uni.patch (application/octet-stream)
From d4e81a284aabe5f3fbbaf5741dd3f5e2a0fea924 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 22 Oct 2024 11:01:41 +0530
Subject: [PATCH v2] Implement the conflict detection for
multiple_unique_conflicts in logical replication
Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.
Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures as each constraint violation needs to be handled one by one, making
the process slow and error-prone.
Now, the apply worker checks all unique constraints upfront and reports
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.
Also, the patch adds a new column 'confl_multiple_unique_conflicts' in view
pg_stat_subscription_stats to support stats collection for this conflict type.
---
doc/src/sgml/logical-replication.sgml | 13 +++
doc/src/sgml/monitoring.sgml | 12 ++
src/backend/catalog/system_views.sql | 1 +
src/backend/executor/execReplication.c | 50 +++++++--
src/backend/replication/logical/conflict.c | 94 +++++++++++++++-
src/backend/utils/adt/pgstatfuncs.c | 6 +-
src/include/catalog/pg_proc.dat | 6 +-
src/include/replication/conflict.h | 11 +-
src/test/regress/expected/rules.out | 3 +-
src/test/subscription/meson.build | 1 +
.../t/035_multiple_unique_conflicts.pl | 121 +++++++++++++++++++++
11 files changed, 296 insertions(+), 22 deletions(-)
create mode 100644 src/test/subscription/t/035_multiple_unique_conflicts.pl
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3d18e50..4817206 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen;
</para>
</listitem>
</varlistentry>
+ <varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
+ <term><literal>multiple_unique_conflicts</literal></term>
+ <listitem>
+ <para>
+ Inserting a row or updating values of a row violates more than one
+ <literal>NOT DEFERRABLE</literal> unique constraint. Note that to log
+ the origin and commit timestamp details of the conflicting key,
+ <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ should be enabled on the subscriber. In this case, an error will be
+ raised until the conflict is resolved manually.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
Note that there are other conflict scenarios, such as exclusion constraint
violations. Currently, we do not provide additional details for them in the
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3dfd059..1359265 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2241,6 +2241,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
<row>
<entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_multiple_unique_conflicts</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times a row insertion or an updated row values violated multiple
+ <literal>NOT DEFERRABLE</literal> unique constraints during the
+ application of changes. See <xref linkend="conflict-multiple-unique-conflicts"/>
+ for details about this conflict.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
</para>
<para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a4d2cfd..31d269b7e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1384,6 +1384,7 @@ CREATE VIEW pg_stat_subscription_stats AS
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription as s,
pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 5cef54f..b2309cb 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -493,25 +493,55 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
ConflictType type, List *recheckIndexes,
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
+ int conflicts = 0;
+ List *conflictSlots = NIL;
+ List *conflictIndexes = NIL;
+ TupleTableSlot *conflictslot;
+
/* Check all the unique indexes for a conflict */
foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
{
- TupleTableSlot *conflictslot;
-
if (list_member_oid(recheckIndexes, uniqueidx) &&
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
&conflictslot))
{
- RepOriginId origin;
- TimestampTz committs;
- TransactionId xmin;
-
- GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
- ReportApplyConflict(estate, resultRelInfo, ERROR, type,
- searchslot, conflictslot, remoteslot,
- uniqueidx, xmin, origin, committs);
+ conflicts++;
+
+ /* Add the conflict slot and index to their respective lists */
+ conflictSlots = lappend(conflictSlots, conflictslot);
+ conflictIndexes = lappend_oid(conflictIndexes, uniqueidx);
}
}
+
+ /*
+ * Report an INSERT_EXISTS or UPDATE_EXISTS conflict when only one unique
+ * constraint is violated.
+ */
+ if (conflicts == 1)
+ {
+ Oid uniqueidx;
+ RepOriginId origin;
+ TimestampTz committs;
+ TransactionId xmin;
+
+ uniqueidx = linitial_oid(conflictIndexes);
+ conflictslot = linitial(conflictSlots);
+
+ GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+ searchslot, conflictslot, remoteslot,
+ uniqueidx, xmin, origin, committs);
+ }
+
+ /*
+ * Report MULTIPLE_UNIQUE_CONFLICTS when two or more unique constraints
+ * are violated.
+ */
+ else if (conflicts > 1)
+ ReportMultipleUniqueConflict(estate, resultRelInfo, ERROR,
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+ searchslot, remoteslot,
+ conflictSlots, conflictIndexes);
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 772fc83..ae297d8 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,7 +29,8 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
- [CT_DELETE_MISSING] = "delete_missing"
+ [CT_DELETE_MISSING] = "delete_missing",
+ [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
static int errcode_apply_conflict(ConflictType type);
@@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate,
TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
RepOriginId localorigin,
- TimestampTz localts);
+ TimestampTz localts, StringInfo err_msg);
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -125,7 +126,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictTypeNames[type]),
errdetail_apply_conflict(estate, relinfo, type, searchslot,
localslot, remoteslot, indexoid,
- localxmin, localorigin, localts));
+ localxmin, localorigin, localts, NULL));
}
/*
@@ -169,6 +170,7 @@ errcode_apply_conflict(ConflictType type)
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
return errcode(ERRCODE_UNIQUE_VIOLATION);
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
@@ -196,7 +198,8 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ RepOriginId localorigin, TimestampTz localts,
+ StringInfo err_msg)
{
StringInfoData err_detail;
char *val_desc;
@@ -209,6 +212,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
Assert(OidIsValid(indexoid));
if (localts)
@@ -291,6 +295,17 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
if (val_desc)
appendStringInfo(&err_detail, "\n%s", val_desc);
+ /*
+ * If the caller provides a non-null 'err_msg' pointer, only the
+ * err_detail.data is requested. Append the constructed err_detail message
+ * to 'err_msg' and return.
+ */
+ if (err_msg)
+ {
+ appendStringInfo(err_msg, "\n%s", err_detail.data);
+ return 0;
+ }
+
return errdetail_internal("%s", err_detail.data);
}
@@ -323,7 +338,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
* Report the conflicting key values in the case of a unique constraint
* violation.
*/
- if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+ if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+ type == CT_MULTIPLE_UNIQUE_CONFLICTS)
{
Assert(OidIsValid(indexoid) && localslot);
@@ -489,3 +505,71 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
return index_value;
}
+
+/*
+ * Report a multiple_unique_conflicts while applying replication changes.
+ *
+ * 'searchslot' holds the tuple used to search the corresponding local
+ * tuple for update or deletion.
+ *
+ * 'remoteslot' contains the new tuple from the remote side, if available.
+ *
+ * 'conflictSlots' is a list of slots containing the local tuples
+ * that conflict with the remote tuple.
+ *
+ * The 'conflictIndexes' list stores the OIDs of the unique indexes that
+ * triggered the constraint violation. These indexes help identify the key
+ * values of the conflicting tuple.
+ *
+ * The caller must ensure that all indexes in 'conflictIndexes' are locked,
+ * allowing us to fetch and display the conflicting key values.
+ */
+void
+ReportMultipleUniqueConflict(EState *estate, ResultRelInfo *relinfo,
+ int elevel, ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *remoteslot,
+ List *conflictSlots, List *conflictIndexes)
+{
+ int conflictNum = 0;
+ Oid indexoid = linitial_oid(conflictIndexes);
+ Relation localrel = relinfo->ri_RelationDesc;
+ RepOriginId localorigin;
+ TimestampTz localts;
+ TransactionId localxmin;
+ StringInfoData err_detail;
+
+ initStringInfo(&err_detail);
+ appendStringInfo(&err_detail, _("The remote tuple violates multiple unique constraints on the local table."));
+
+ foreach_ptr(TupleTableSlot, slot, conflictSlots)
+ {
+ indexoid = lfirst_oid(list_nth_cell(conflictIndexes, conflictNum));
+
+ Assert(!OidIsValid(indexoid) ||
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+ GetTupleTransactionInfo(slot, &localxmin, &localorigin, &localts);
+
+ /*
+ * Build the error detail message containing the conflicting key and
+ * tuple information. The details for each conflict will be appended
+ * to err_detail.
+ */
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ slot, remoteslot, indexoid,
+ localxmin, localorigin, localts, &err_detail);
+
+ conflictNum++;
+ }
+
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
+
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail_internal("%s", err_detail.data));
+}
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 68e16e5..bcebf79 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2145,7 +2145,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2177,7 +2177,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index af9546d..dfa667c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5625,9 +5625,9 @@
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 37454dc..29c3c08 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -41,6 +41,9 @@ typedef enum
/* The row to be deleted is missing */
CT_DELETE_MISSING,
+ /* The row to be inserted/updated violates multiple unique constraints */
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+
/*
* Other conflicts, such as exclusion constraint violations, involve more
* complex rules than simple equality checks. These conflicts are left for
@@ -48,7 +51,7 @@ typedef enum
*/
} ConflictType;
-#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+#define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
@@ -63,4 +66,10 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
RepOriginId localorigin, TimestampTz localts);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern void ReportMultipleUniqueConflict(EState *estate, ResultRelInfo *relinfo,
+ int elevel, ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *remoteslot,
+ List *conflictslots_list,
+ List *conflictIndexes);
#endif
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 62f69ac..4747896 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2157,9 +2157,10 @@ pg_stat_subscription_stats| SELECT ss.subid,
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index d40b497..05fcdd0 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -41,6 +41,7 @@ tests += {
't/032_subscribe_use_index.pl',
't/033_run_as_table_owner.pl',
't/034_temporal.pl',
+ 't/035_multiple_unique_conflicts.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/035_multiple_unique_conflicts.pl b/src/test/subscription/t/035_multiple_unique_conflicts.pl
new file mode 100644
index 0000000..774e000
--- /dev/null
+++ b/src/test/subscription/t/035_multiple_unique_conflicts.pl
@@ -0,0 +1,121 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_publisher->start;
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+ qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+# Create table on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab(a int PRIMARY key, b int unique, c int unique);");
+
+# Create similar table on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE conf_tab(a int PRIMARY key, b int unique, c int unique);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION tap_sub
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION tap_pub;");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT some data on Pub and Sub
+##################################################
+
+# Insert data in the publisher
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber
+$node_subscriber->safe_psql(
+ 'postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2);
+ INSERT INTO conf_tab VALUES (3,3,3);
+ INSERT INTO conf_tab VALUES (4,4,4);");
+
+my $log_location = -s $node_subscriber->logfile;
+
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,3,4);");
+
+my $logfile = slurp_file($node_subscriber->logfile(), $log_location);
+my $log_offset = -s $node_subscriber->logfile;
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $logfile =~
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts*\n.*DETAIL:.* The remote tuple violates multiple unique constraints on the local table./,
+ 'multiple_unique_conflicts detected during insertion');
+
+# Truncate table to get rid of the error
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+$node_publisher->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+##################################################
+# Test multiple_unique_conflicts due to UPDATE
+##################################################
+
+# Insert data in the publisher
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber
+$node_subscriber->safe_psql(
+ 'postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2);
+ INSERT INTO conf_tab VALUES (3,3,3);
+ INSERT INTO conf_tab VALUES (4,4,4);");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab set a=2,b=3,c=4 where a=1;");
+
+$log_offset = -s $node_subscriber->logfile;
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $logfile =~
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts*\n.*DETAIL:.* The remote tuple violates multiple unique constraints on the local table./,
+ 'multiple_unique_conflicts detected during update');
+
+done_testing();
--
1.8.3.1
On Thu, Feb 20, 2025 at 5:01 PM Nisha Moond <nisha.moond412@gmail.com> wrote:
Hi Hackers,
(CCing people involved in related discussions)
I am starting this thread to propose a new conflict detection type
"multiple_unique_conflicts" that identifies when an incoming row
during logical replication violates more than one UNIQUE constraint.
If multiple constraints (such as the primary key/replica identity
along with additional unique indexes) are violated for the same
incoming tuple, the apply_worker will trigger a dedicated
multiple_unique_conflicts conflict.
----
ISSUE:
----
Currently, when the apply_worker encounters an 'insert_exists'
conflict (where the incoming row conflicts with an existing row based
on the replica identity), it logs the conflict and errors out
immediately. If the user tries to resolve this manually by deleting
the conflicting row, the apply worker may fail again due to another
unique constraint violation on a different column. This forces users
to fix conflicts one by one, making resolution tedious and
inefficient.
Example:
Schema:
CREATE TABLE tab1 (col1 integer PRIMARY KEY, col2 integer UNIQUE, col3
integer UNIQUE);
- col1 is Replica Identity.
Data:
- on pub: (1, 11, 111)
- on sub: 3 additional local Inserts: (2, 22, 222); (3, 33, 333); (4, 44, 444)
- Concurrently on pub, new insert: (2, 33, 444)
When the new incoming tuple (2, 33, 444) is applied on the subscriber:
- The apply worker first detects an 'insert_exists' conflict on col1
(primary key) and errors out.
- If the user deletes the conflicting row (key col1=2) : (2, 22,
222), the apply worker fails again because col2=33 violates another
unique constraint for tuple (3, 33, 333);
- If the user deletes col2=33, the apply worker fails yet again due
to tuple (4, 44, 444) because col3=444 is also unique.
Conflicts on both col2 and col3 (which are independent of each other)
are an example of a 'Multiple Unique Constraints' violation.
In such cases, users are forced to resolve conflicts one by one,
making the process slow and error-prone.
---
SOLUTION:
---
During an INSERT or UPDATE conflict check, instead of stopping at the
first encountered conflict, the apply_worker will now check all unique
indexes before reporting a conflict. If multiple unique key violations
are found, it will report a 'multiple_unique_conflicts' conflict,
listing all conflicting tuples in the logs. If only a single key
conflict is detected, the existing 'insert_exists' conflict will be
raised as it is now.
I think it makes sense to report all the unique key conflicts for a
single row at once, rather than stopping after the first one. However,
I don't understand the need to create a new conflict type. Can't we
report multiple conflicts for the row using the existing conflict
type, if different columns in the incoming row conflict with different
existing rows?
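For readers following the thread, the classification rule described in the quoted SOLUTION text can be sketched in standalone C. This is only an illustration, not code from the patch: `SketchClass` and `classify_unique_conflicts()` are hypothetical names, and the per-index results are assumed to arrive as a plain array of flags instead of being found through index lookups.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the conflict types discussed in this thread. */
typedef enum
{
    CLASS_NONE,             /* no unique index is violated */
    CLASS_SINGLE_KEY,       /* reported as insert_exists or update_exists */
    CLASS_MULTIPLE_UNIQUE   /* reported as multiple_unique_conflicts */
} SketchClass;

/*
 * index_conflicts[i] is true when the remote tuple collides with a local
 * tuple in the i-th unique index.  Every index is examined before a
 * conflict type is chosen, instead of stopping at the first hit.
 */
SketchClass
classify_unique_conflicts(const bool *index_conflicts, size_t nindexes)
{
    size_t nconflicts = 0;

    for (size_t i = 0; i < nindexes; i++)
    {
        if (index_conflicts[i])
            nconflicts++;
    }

    if (nconflicts == 0)
        return CLASS_NONE;
    if (nconflicts == 1)
        return CLASS_SINGLE_KEY;
    return CLASS_MULTIPLE_UNIQUE;
}
```

With the tab1 example from the quoted text, all three flags would be set for the incoming tuple (2, 33, 444), so the sketch would return CLASS_MULTIPLE_UNIQUE. Reporting several conflicts under the existing type instead would amount to collapsing the last two return values into one.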
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On Tue, Mar 11, 2025 at 11:10 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
I think it makes sense to report all the unique key conflicts for a
single row at once, rather than stopping after the first one. However,
I don't understand the need to create a new conflict type. Can't we
report multiple conflicts for the row using the existing conflict
type, if different columns in the incoming row conflict with different
existing rows?
The goal of introducing a new conflict type is to handle multiple-key
conflicts separately. It will not only allow users to apply different
resolution methods for single-key vs multi-key conflicts but will also
help them identify such cases directly from stats instead of filtering
through the LOG file.
For example, with future resolution options, 'last_update_wins' will
work well for single-key conflicts (insert_exists/update_exists) since
only one local tuple will be replaced if the remote wins. However, for
multi-key conflicts, this resolution may not be feasible. Even if
supported, resolving the conflict could result in deleting multiple
tuples, which users may want to control separately, like opting to
skip or error out in case of multi-key conflicts.
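To make the point about separate control concrete, here is a minimal standalone C sketch of a per-conflict-type policy table. It is purely hypothetical: no resolver configuration exists in this patch, and the names `SketchKind`, `SketchResolver`, `sketch_policy` and `sketch_resolver_for()` are invented for illustration. It only shows that a distinct conflict type gives a distinct slot to attach a policy to.

```c
/* Hypothetical conflict kinds; only the unique-key ones are modelled. */
typedef enum
{
    KIND_INSERT_EXISTS,
    KIND_UPDATE_EXISTS,
    KIND_MULTIPLE_UNIQUE_CONFLICTS,
    KIND_COUNT
} SketchKind;

/* Hypothetical resolution choices mentioned in the discussion. */
typedef enum
{
    RES_ERROR,              /* stop the apply worker, resolve manually */
    RES_SKIP,               /* drop the remote change */
    RES_LAST_UPDATE_WINS    /* replace the local tuple if the remote is newer */
} SketchResolver;

/*
 * One policy slot per conflict kind.  Single-key conflicts replace at most
 * one local tuple, so an automatic resolver is plausible there; a multi-key
 * conflict could delete several local tuples, so it is kept on ERROR here.
 */
static const SketchResolver sketch_policy[KIND_COUNT] = {
    [KIND_INSERT_EXISTS] = RES_LAST_UPDATE_WINS,
    [KIND_UPDATE_EXISTS] = RES_LAST_UPDATE_WINS,
    [KIND_MULTIPLE_UNIQUE_CONFLICTS] = RES_ERROR
};

/* Look up the policy for a kind; unknown kinds fall back to ERROR. */
SketchResolver
sketch_resolver_for(SketchKind kind)
{
    if ((int) kind < 0 || (int) kind >= KIND_COUNT)
        return RES_ERROR;
    return sketch_policy[kind];
}
```

If multi-key conflicts were reported as insert_exists/update_exists, the table would have no separate entry for them, and whatever resolver was chosen for the single-key case would apply to both.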
For reference, databases like EDB-PGD(BDR)[1] support a separate
conflict type for multi-key cases. OTOH, Oracle[2] and DB2[3] do not
have a separate conflict_type and always ERROR out during conflict
resolution if multiple unique constraint violations happen. However,
none of these databases use the single-key conflict_type and
resolution for the multi-key conflict cases.
We’d like to know your preference—should we always ERROR out like
Oracle/DB2, or keep it as a separate conflict_type for better control
during resolution?
[1]: https://www.enterprisedb.com/docs/pgd/4/bdr/conflicts/#insert-operations-that-violate-multiple-unique-constraints
[2]: https://docs.oracle.com/goldengate/c1230/gg-winux/GWUAD/configuring-conflict-detection-and-resolution.htm#GWUAD316
[3]: https://www.ibm.com/docs/en/idr/11.4.0?topic=console-setting-conflict-detection-resolution
--
Thanks,
Nisha Moond
Hi Nisha,
Thanks for addressing some of my v1 comments. I confirmed they are all
ok. But, I haven't reviewed the v2 again because I still had doubts
about the "stats" question and am waiting to see how that pans out.
Meanwhile, I had a couple more replies below.
On Tue, Feb 25, 2025 at 8:37 PM Nisha Moond <nisha.moond412@gmail.com> wrote:
On Mon, Feb 24, 2025 at 7:39 AM Peter Smith <smithpb2250@gmail.com> wrote:
Hi Nisha.
Some review comments for patch v1-0001
======
GENERAL
1.
This may be a basic/naive question, but it is unclear to me why we
care about the stats of confl_multiple_unique_conflicts?
I can understand it would be useful to get multiple conflicts logged
at the same time so the user doesn't have to stumble across them one
by one when fixing them, but as far as the stats go, why do we care
about this stat? Also, since you don't distinguish between multiple
insert conflicts versus multiple update conflicts the stat usefulness
seemed even more dubious.
(because of this question, I skipped reviewing some parts of this
patch related to stats)
IMO, tracking multiple_unique_conflicts, like other conflict stats,
helps users understand their workload better, especially since in
these cases, stats aren't gathered under either insert_exists or
update_exists.
When you say "stats aren't gathered" isn't that just your
implementation choice? e.g. If an INSERT fails because it
simultaneously violates multiple (say 5) different constraints I
assumed that could be implemented as confl_insert_exists += 1 or
confl_insert_exists += 5. Isn't it just a matter of documenting the
behaviour?
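The counting alternatives being compared here can be written down as a small standalone C sketch. It is not taken from the patch: `SketchStats`, `SketchCounting` and `record_insert_conflict()` are hypothetical names, and the two struct fields merely stand in for the confl_insert_exists and confl_multiple_unique_conflicts counters.

```c
#include <stdint.h>

/* Hypothetical stand-ins for two pg_stat_subscription_stats counters. */
typedef struct
{
    int64_t insert_exists;
    int64_t multiple_unique_conflicts;
} SketchStats;

/* The three counting choices raised in this exchange. */
typedef enum
{
    COUNT_SEPARATE_TYPE,    /* the patch: multi-key gets its own counter */
    COUNT_ONCE_PER_ROW,     /* insert_exists += 1 per failed row */
    COUNT_ONCE_PER_INDEX    /* insert_exists += number of violated indexes */
} SketchCounting;

/* Record one failed INSERT that violated 'nviolated' unique indexes. */
void
record_insert_conflict(SketchStats *stats, SketchCounting mode, int nviolated)
{
    if (nviolated <= 0)
        return;             /* nothing to record */

    switch (mode)
    {
        case COUNT_SEPARATE_TYPE:
            if (nviolated > 1)
                stats->multiple_unique_conflicts += 1;
            else
                stats->insert_exists += 1;
            break;
        case COUNT_ONCE_PER_ROW:
            stats->insert_exists += 1;
            break;
        case COUNT_ONCE_PER_INDEX:
            stats->insert_exists += nviolated;
            break;
    }
}
```

For an INSERT that violates 5 constraints, the first mode leaves insert_exists untouched and bumps the dedicated counter once, while the other two modes add 1 or 5 to insert_exists. Only the first mode lets a user tell single-key and multi-key failures apart from the stats alone.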
I'll wait for others' opinions too on the need for the stats in this case.
OK. I'll revisit this thread after I learn the outcome.
~~~
12.
+ok( $logfile =~
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts*\n.*DETAIL:.* The remote tuple violates multiple unique constraints on the local table./,
+ 'multiple_unique_conflicts detected during update');
(same as #12)
Won't it be more useful to also log the column name causing the
conflict? Otherwise, if there are 100s of unique columns and just 2 of
them are conflicts how will the user know what to look for when fixing
it?
The conflicting column details are logged. In the test case, only the
header line of the DETAIL message is compared to keep it simple.
For example, the full LOG message will look like -
ERROR: conflict detected on relation "public.conf_tab":
conflict=multiple_unique_conflicts
DETAIL: The remote tuple violates multiple unique constraints on the
local table.
Key already exists in unique index "conf_tab_pkey", modified
locally in transaction 757 at 2025-02-25 14:00:56.955403+05:30.
Key (a)=(2); existing local tuple (2, 2, 2); remote tuple (2, 3, 4).
Key already exists in unique index "conf_tab_b_key", modified
locally in transaction 758 at 2025-02-25 14:00:56.957092+05:30.
Key (b)=(3); existing local tuple (3, 3, 3); remote tuple (2, 3, 4).
Key already exists in unique index "conf_tab_c_key", modified
locally in transaction 759 at 2025-02-25 14:00:56.957337+05:30.
Key (c)=(4); existing local tuple (4, 4, 4); remote tuple (2, 3, 4).
OK, but I think the "keep it simple" approach hides all the
interesting conflict details, which means you can't even tell if the
log emits all the expected conflicts or not. Changing the test regex
to match all those local/remote tuple details in the log would be more
useful.
BTW, when I looked at the
'tmp_check/log/035_multiple_unique_conflicts_subscriber.log' I didn't
understand why there are 3 separate
"conflict=multiple_unique_conflicts" ERRORs in the log. Since, AFAICT
in the TAP file, there are only 2 failure test cases (e.g. due to
INSERT, and due to UPDATE). Can you explain the difference?
======
Kind Regards,
Peter Smith.
Fujitsu Australia
On Tue, Mar 11, 2025 at 2:28 PM Nisha Moond <nisha.moond412@gmail.com> wrote:
On Tue, Mar 11, 2025 at 11:10 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
The goal of introducing a new conflict type is to handle multiple-key
conflicts separately. It will not only allow users to apply different
resolution methods for single-key vs multi-key conflicts but will also
help them identify such cases directly from stats instead of filtering
through the LOG file.
Thanks for the explanation, that makes sense.
For example, with future resolution options, 'last_update_wins' will
work well for single-key conflicts (insert_exists/update_exists) since
only one local tuple will be replaced if the remote wins. However, for
multi-key conflicts, this resolution may not be feasible. Even if
supported, resolving the conflict could result in deleting multiple
tuples, which users may want to control separately, like opting to
skip or error out in case of multi-key conflicts.
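To make the single-key versus multi-key distinction concrete, here is a minimal, self-contained C sketch. It is not part of the patch: resolution options are future work, and every name below (DemoConflict, DemoResolver, DemoPolicy, demo_pick_resolver, demo_rows_replaced) is hypothetical and chosen only for illustration.

```c
#include <assert.h>

/* Hypothetical conflict kinds; only the spelling follows the thread. */
typedef enum
{
    DEMO_INSERT_EXISTS,
    DEMO_UPDATE_EXISTS,
    DEMO_MULTIPLE_UNIQUE_CONFLICTS
} DemoConflict;

/* Hypothetical resolution options (not existing PostgreSQL settings). */
typedef enum
{
    DEMO_RESOLVE_ERROR,
    DEMO_RESOLVE_SKIP,
    DEMO_RESOLVE_LAST_UPDATE_WINS
} DemoResolver;

/* One resolver for single-key conflicts, another for multi-key ones. */
typedef struct
{
    DemoResolver single_key;
    DemoResolver multi_key;
} DemoPolicy;

/* Pick the resolver configured for the given conflict kind. */
DemoResolver
demo_pick_resolver(const DemoPolicy *policy, DemoConflict conflict)
{
    if (conflict == DEMO_MULTIPLE_UNIQUE_CONFLICTS)
        return policy->multi_key;
    return policy->single_key;
}

/* Number of local rows removed if the remote tuple is applied. */
int
demo_rows_replaced(DemoResolver resolver, int conflicting_rows)
{
    assert(conflicting_rows >= 0);
    return (resolver == DEMO_RESOLVE_LAST_UPDATE_WINS) ? conflicting_rows : 0;
}
```

With a policy of {last_update_wins, error}, a single-key conflict would replace one local row, while the (2, 3, 4) example from this thread would stop with an error rather than remove three local rows.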
For reference, databases like EDB-PGD(BDR)[1] support a separate
conflict type for multi-key cases. OTOH, Oracle[2] and DB2[3] do not
have a separate conflict_type and always ERROR out during conflict
resolution if multiple unique constraint violations happen. However,
none of these databases use the single-key conflict_type and
resolution for the multi-key conflict cases.
We'd like to know your preference: should we always ERROR out like
Oracle/DB2, or keep it as a separate conflict_type for better control
during resolution?
I agree that supporting this new conflict type will provide users with
better control, particularly when doing automatic conflict resolution.
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On Wed, Mar 12, 2025 at 6:33 AM Peter Smith <smithpb2250@gmail.com> wrote:
Hi Nisha,
Thanks for addressing some of my v1 comments. I confirmed they are all
ok. But, I haven't reviewed the v2 again because I still had doubts
about the "stats" question and am waiting to see how that pans out.
Meanwhile, I had a couple more replies below.
On Tue, Feb 25, 2025 at 8:37 PM Nisha Moond <nisha.moond412@gmail.com> wrote:
On Mon, Feb 24, 2025 at 7:39 AM Peter Smith <smithpb2250@gmail.com> wrote:
Hi Nisha.
Some review comments for patch v1-0001
======
GENERAL
1.
This may be a basic/naive question, but it is unclear to me why we
care about the stats of confl_multiple_unique_conflicts?
I can understand it would be useful to get multiple conflicts logged
at the same time so the user doesn't have to stumble across them one
by one when fixing them, but as far as the stats go, why do we care
about this stat? Also, since you don't distinguish between multiple
insert conflicts versus multiple update conflicts the stat usefulness
seemed even more dubious.
(because of this question, I skipped reviewing some parts of this
patch related to stats)
IMO, tracking multiple_unique_conflicts, like other conflict stats,
helps users understand their workload better, especially since in
these cases, stats aren't gathered under either insert_exists or
update_exists.
When you say "stats aren't gathered" isn't that just your
implementation choice? e.g. If an INSERT fails because it
simultaneously violates multiple (say 5) different constraints I
assumed that could be implemented as conf_insert_exists += 1 or
conf_insert_exists += 5. Isn't it just a matter of documenting the
behaviour?
Yes, if we handle multiple-key conflicts under existing single-key
conflicts, we can align the implementation and documentation
accordingly.
However, as I explained in [1], we are providing a separate conflict
type for better control of auto-resolution as well. IMO, if we provide
separate handling for multi-key cases, their stats shouldn't be
included under insert_exists or update_exists. Since these cases won't
follow single-key conflict resolution methods, merging the stats could
be confusing for users.
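The counting rule argued for above can be sketched in a few lines of standalone C. This is only an illustration of the idea, not the patch's code: the real counters live in the subscription stats and are bumped via pgstat_report_subscription_conflict(); the names StatConflictType, StatCounters and stat_count_conflict are made up here.

```c
#include <assert.h>

/* Illustrative subset of conflict types; the names are hypothetical. */
typedef enum
{
    STAT_INSERT_EXISTS,
    STAT_UPDATE_EXISTS,
    STAT_MULTIPLE_UNIQUE_CONFLICTS,
    STAT_NUM_TYPES
} StatConflictType;

typedef struct
{
    long counts[STAT_NUM_TYPES];
} StatCounters;

/*
 * Count one failed apply attempt. A single violated unique index is
 * counted under the operation's own type; two or more are counted only
 * under the multi-key type, never under insert_exists/update_exists.
 */
void
stat_count_conflict(StatCounters *stats, StatConflictType single_key_type,
                    int violated_indexes)
{
    assert(single_key_type == STAT_INSERT_EXISTS ||
           single_key_type == STAT_UPDATE_EXISTS);

    if (violated_indexes <= 0)
        return;                 /* no conflict, nothing to count */

    if (violated_indexes > 1)
        stats->counts[STAT_MULTIPLE_UNIQUE_CONFLICTS]++;
    else
        stats->counts[single_key_type]++;
}
```

Under this rule an INSERT that violates three unique indexes adds one to the multi-key counter and leaves the insert_exists counter untouched, which is the "+= 1 versus += 5" question from the review answered with a separate column.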
I'll wait for others' opinions too on the need for the stats in this case.
OK. I'll revisit this thread after I learn the outcome.
~~~
12.
+ok( $logfile =~
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts*\n.*DETAIL:.* The remote tuple violates multiple unique constraints on the local table./,
+ 'multiple_unique_conflicts detected during update');
(same as #12)
Won't it be more useful to also log the column name causing the
conflict? Otherwise, if there are 100s of unique columns and just 2 of
them are conflicts how will the user know what to look for when fixing
it?
The conflicting column details are logged. In the test case, only the
header line of the DETAIL message is compared to keep it simple.
For example, the full LOG message will look like -
ERROR: conflict detected on relation "public.conf_tab":
conflict=multiple_unique_conflicts
DETAIL: The remote tuple violates multiple unique constraints on the
local table.
Key already exists in unique index "conf_tab_pkey", modified
locally in transaction 757 at 2025-02-25 14:00:56.955403+05:30.
Key (a)=(2); existing local tuple (2, 2, 2); remote tuple (2, 3, 4).
Key already exists in unique index "conf_tab_b_key", modified
locally in transaction 758 at 2025-02-25 14:00:56.957092+05:30.
Key (b)=(3); existing local tuple (3, 3, 3); remote tuple (2, 3, 4).
Key already exists in unique index "conf_tab_c_key", modified
locally in transaction 759 at 2025-02-25 14:00:56.957337+05:30.
Key (c)=(4); existing local tuple (4, 4, 4); remote tuple (2, 3, 4).
OK, but I think the "keep it simple" approach hides all the
interesting conflict details, which means you can't even tell if the
log emits all the expected conflicts or not. Changing the test regex
to match all those local/remote tuple details in the log would be more
useful.
Fixed. Added regex to match the conflicting index, key, local tuple
and remote tuple.
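For readers who want to try this kind of check outside the TAP framework, here is a rough C equivalent using POSIX regular expressions. It is a sketch only: the actual test is Perl, the helper name log_matches and the two constants are hypothetical, and the sample text is the DETAIL excerpt from the example log in this thread (line-wrapped as in the email).

```c
#include <assert.h>
#include <regex.h>
#include <stdbool.h>
#include <stddef.h>

/* Return true if the POSIX extended regex 'pattern' matches within 'log'. */
bool
log_matches(const char *log, const char *pattern)
{
    regex_t     re;
    bool        matched;

    assert(log != NULL && pattern != NULL);

    if (regcomp(&re, pattern, REG_EXTENDED | REG_NOSUB) != 0)
        return false;           /* treat an invalid pattern as "no match" */

    matched = (regexec(&re, log, 0, NULL, 0) == 0);
    regfree(&re);
    return matched;
}

/* DETAIL excerpt for one conflicting index, copied from the thread. */
const char *const sample_detail =
    "Key already exists in unique index \"conf_tab_b_key\", modified\n"
    "locally in transaction 758 at 2025-02-25 14:00:56.957092+05:30.\n"
    "Key (b)=(3); existing local tuple (3, 3, 3); remote tuple (2, 3, 4).\n";

/*
 * Match the index name, then the key, local tuple and remote tuple.
 * Without REG_NEWLINE, ".*" also spans the line breaks in between.
 */
const char *const b_key_pattern =
    "unique index \"conf_tab_b_key\".*"
    "Key \\(b\\)=\\(3\\); existing local tuple \\(3, 3, 3\\); "
    "remote tuple \\(2, 3, 4\\)\\.";
```

Calling log_matches(sample_detail, b_key_pattern) checks all four pieces at once; a pattern naming a different index or key value does not match the same excerpt.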
BTW, when I looked at the
'tmp_check/log/035_multiple_unique_conflicts_subscriber.log' I didn't
understand why there are 3 separate
"conflict=multiple_unique_conflicts" ERRORs in the log. Since, AFAICT
in the TAP file, there are only 2 failure test cases (e.g. due to
INSERT, and due to UPDATE). Can you explain the difference?
Thanks for pointing it out. There was a log offset issue—the test
waited for the LOG entry after the apply worker errored out and
retried the conflict. I've fixed it now.
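One way to picture the log offset issue, as far as it can be read from the explanation above: the apply worker retries after each failure, so the same ERROR line can appear more than once, and which entries a check sees depends on the byte offset it starts scanning from. The C sketch below uses a hypothetical helper, count_from, and is not the TAP test code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Count non-overlapping occurrences of 'needle' in 'log', looking only
 * at the part of the log that starts at byte 'offset'.
 */
size_t
count_from(const char *log, size_t offset, const char *needle)
{
    size_t      count = 0;
    size_t      loglen;
    size_t      needlelen;
    const char *p;

    assert(log != NULL && needle != NULL);

    loglen = strlen(log);
    needlelen = strlen(needle);
    if (needlelen == 0 || offset >= loglen)
        return 0;

    p = log + offset;
    while ((p = strstr(p, needle)) != NULL)
    {
        count++;
        p += needlelen;         /* continue after this match */
    }
    return count;
}
```

If the first failure and its retry are both logged before the offset is taken, scanning from byte 0 finds every entry, whereas scanning from the recorded offset finds only the entries written afterwards.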
Attached the v3 patch with the above comments addressed. I've also
optimized the test case and removed unnecessary code.
[1]: /messages/by-id/CABdArM42thrKrgvERxyw9hhc56tM4P+CmjKFpU07hR_9UMM8Zg@mail.gmail.com
--
Thanks,
Nisha Moond
Attachments:
v3-0001-Implement-the-conflict-detection-for-multiple_uni.patch (application/octet-stream)
From a43c389ee92a1ded5158176a6a29026e3cbd1e3d Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 22 Oct 2024 11:01:41 +0530
Subject: [PATCH v3] Implement the conflict detection for
multiple_unique_conflicts in logical replication
Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.
Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures as each constraint violation needs to be handled one by one, making
the process slow and error-prone.
Now, the apply worker checks all unique constraints upfront and reports
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.
Also, the patch adds a new column 'confl_multiple_unique_conflicts' in view
pg_stat_subscription_stats to support stats collection for this conflict type.
---
doc/src/sgml/logical-replication.sgml | 13 ++
doc/src/sgml/monitoring.sgml | 12 ++
src/backend/catalog/system_views.sql | 1 +
src/backend/executor/execReplication.c | 50 ++++++--
src/backend/replication/logical/conflict.c | 94 ++++++++++++++-
src/backend/utils/adt/pgstatfuncs.c | 6 +-
src/include/catalog/pg_proc.dat | 6 +-
src/include/replication/conflict.h | 11 +-
src/test/regress/expected/rules.out | 3 +-
src/test/subscription/meson.build | 1 +
.../t/035_multiple_unique_conflicts.pl | 114 ++++++++++++++++++
11 files changed, 289 insertions(+), 22 deletions(-)
create mode 100644 src/test/subscription/t/035_multiple_unique_conflicts.pl
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3d18e507bbc..4817206af7d 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen;
</para>
</listitem>
</varlistentry>
+ <varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
+ <term><literal>multiple_unique_conflicts</literal></term>
+ <listitem>
+ <para>
+ Inserting a row or updating values of a row violates more than one
+ <literal>NOT DEFERRABLE</literal> unique constraint. Note that to log
+ the origin and commit timestamp details of the conflicting key,
+ <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ should be enabled on the subscriber. In this case, an error will be
+ raised until the conflict is resolved manually.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
Note that there are other conflict scenarios, such as exclusion constraint
violations. Currently, we do not provide additional details for them in the
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index aaa6586d3a4..0960f5ba94a 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2250,6 +2250,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_multiple_unique_conflicts</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times a row insertion or an updated row value violated multiple
+ <literal>NOT DEFERRABLE</literal> unique constraints during the
+ application of changes. See <xref linkend="conflict-multiple-unique-conflicts"/>
+ for details about this conflict.
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a4d2cfdcaf5..31d269b7ee0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1384,6 +1384,7 @@ CREATE VIEW pg_stat_subscription_stats AS
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription as s,
pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 0a9b880d250..2432db3a5f4 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -493,25 +493,55 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
ConflictType type, List *recheckIndexes,
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
+ int conflicts = 0;
+ List *conflictSlots = NIL;
+ List *conflictIndexes = NIL;
+ TupleTableSlot *conflictslot;
+
/* Check all the unique indexes for a conflict */
foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
{
- TupleTableSlot *conflictslot;
-
if (list_member_oid(recheckIndexes, uniqueidx) &&
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
&conflictslot))
{
- RepOriginId origin;
- TimestampTz committs;
- TransactionId xmin;
-
- GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
- ReportApplyConflict(estate, resultRelInfo, ERROR, type,
- searchslot, conflictslot, remoteslot,
- uniqueidx, xmin, origin, committs);
+ conflicts++;
+
+ /* Add the conflict slot and index to their respective lists */
+ conflictSlots = lappend(conflictSlots, conflictslot);
+ conflictIndexes = lappend_oid(conflictIndexes, uniqueidx);
}
}
+
+ /*
+ * Report an INSERT_EXISTS or UPDATE_EXISTS conflict when only one unique
+ * constraint is violated.
+ */
+ if (conflicts == 1)
+ {
+ Oid uniqueidx;
+ RepOriginId origin;
+ TimestampTz committs;
+ TransactionId xmin;
+
+ uniqueidx = linitial_oid(conflictIndexes);
+ conflictslot = linitial(conflictSlots);
+
+ GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+ searchslot, conflictslot, remoteslot,
+ uniqueidx, xmin, origin, committs);
+ }
+
+ /*
+ * Report MULTIPLE_UNIQUE_CONFLICTS when two or more unique constraints
+ * are violated.
+ */
+ else if (conflicts > 1)
+ ReportMultipleUniqueConflict(estate, resultRelInfo, ERROR,
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+ searchslot, remoteslot,
+ conflictSlots, conflictIndexes);
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 772fc83e88b..ae297d86374 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,7 +29,8 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
- [CT_DELETE_MISSING] = "delete_missing"
+ [CT_DELETE_MISSING] = "delete_missing",
+ [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
static int errcode_apply_conflict(ConflictType type);
@@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate,
TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
RepOriginId localorigin,
- TimestampTz localts);
+ TimestampTz localts, StringInfo err_msg);
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -125,7 +126,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictTypeNames[type]),
errdetail_apply_conflict(estate, relinfo, type, searchslot,
localslot, remoteslot, indexoid,
- localxmin, localorigin, localts));
+ localxmin, localorigin, localts, NULL));
}
/*
@@ -169,6 +170,7 @@ errcode_apply_conflict(ConflictType type)
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
return errcode(ERRCODE_UNIQUE_VIOLATION);
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
@@ -196,7 +198,8 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ RepOriginId localorigin, TimestampTz localts,
+ StringInfo err_msg)
{
StringInfoData err_detail;
char *val_desc;
@@ -209,6 +212,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
Assert(OidIsValid(indexoid));
if (localts)
@@ -291,6 +295,17 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
if (val_desc)
appendStringInfo(&err_detail, "\n%s", val_desc);
+ /*
+ * If the caller provides a non-null 'err_msg' pointer, only the
+ * err_detail.data is requested. Append the constructed err_detail message
+ * to 'err_msg' and return.
+ */
+ if (err_msg)
+ {
+ appendStringInfo(err_msg, "\n%s", err_detail.data);
+ return 0;
+ }
+
return errdetail_internal("%s", err_detail.data);
}
@@ -323,7 +338,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
* Report the conflicting key values in the case of a unique constraint
* violation.
*/
- if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+ if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+ type == CT_MULTIPLE_UNIQUE_CONFLICTS)
{
Assert(OidIsValid(indexoid) && localslot);
@@ -489,3 +505,71 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
return index_value;
}
+
+/*
+ * Report a multiple_unique_conflicts while applying replication changes.
+ *
+ * 'searchslot' holds the tuple used to search the corresponding local
+ * tuple for update or deletion.
+ *
+ * 'remoteslot' contains the new tuple from the remote side, if available.
+ *
+ * 'conflictSlots' is a list of slots containing the local tuples
+ * that conflict with the remote tuple.
+ *
+ * The 'conflictIndexes' list stores the OIDs of the unique indexes that
+ * triggered the constraint violation. These indexes help identify the key
+ * values of the conflicting tuple.
+ *
+ * The caller must ensure that all indexes in 'conflictIndexes' are locked,
+ * allowing us to fetch and display the conflicting key values.
+ */
+void
+ReportMultipleUniqueConflict(EState *estate, ResultRelInfo *relinfo,
+ int elevel, ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *remoteslot,
+ List *conflictSlots, List *conflictIndexes)
+{
+ int conflictNum = 0;
+ Oid indexoid = linitial_oid(conflictIndexes);
+ Relation localrel = relinfo->ri_RelationDesc;
+ RepOriginId localorigin;
+ TimestampTz localts;
+ TransactionId localxmin;
+ StringInfoData err_detail;
+
+ initStringInfo(&err_detail);
+ appendStringInfo(&err_detail, _("The remote tuple violates multiple unique constraints on the local table."));
+
+ foreach_ptr(TupleTableSlot, slot, conflictSlots)
+ {
+ indexoid = lfirst_oid(list_nth_cell(conflictIndexes, conflictNum));
+
+ Assert(!OidIsValid(indexoid) ||
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+ GetTupleTransactionInfo(slot, &localxmin, &localorigin, &localts);
+
+ /*
+ * Build the error detail message containing the conflicting key and
+ * tuple information. The details for each conflict will be appended
+ * to err_detail.
+ */
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ slot, remoteslot, indexoid,
+ localxmin, localorigin, localts, &err_detail);
+
+ conflictNum++;
+ }
+
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
+
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail_internal("%s", err_detail.data));
+}
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 662ce46cbc2..97af7c6554f 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2203,7 +2203,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 42e427f8fe8..68bb2f92024 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5644,9 +5644,9 @@
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 37454dc9513..29c3c085283 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -41,6 +41,9 @@ typedef enum
/* The row to be deleted is missing */
CT_DELETE_MISSING,
+ /* The row to be inserted/updated violates multiple unique constraints */
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+
/*
* Other conflicts, such as exclusion constraint violations, involve more
* complex rules than simple equality checks. These conflicts are left for
@@ -48,7 +51,7 @@ typedef enum
*/
} ConflictType;
-#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+#define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
@@ -63,4 +66,10 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
RepOriginId localorigin, TimestampTz localts);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern void ReportMultipleUniqueConflict(EState *estate, ResultRelInfo *relinfo,
+ int elevel, ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *remoteslot,
+ List *conflictslots_list,
+ List *conflictIndexes);
#endif
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 62f69ac20b2..47478969135 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2157,9 +2157,10 @@ pg_stat_subscription_stats| SELECT ss.subid,
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index d40b49714f6..05fcdd08f57 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -41,6 +41,7 @@ tests += {
't/032_subscribe_use_index.pl',
't/033_run_as_table_owner.pl',
't/034_temporal.pl',
+ 't/035_multiple_unique_conflicts.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/035_multiple_unique_conflicts.pl b/src/test/subscription/t/035_multiple_unique_conflicts.pl
new file mode 100644
index 00000000000..2788bbc4628
--- /dev/null
+++ b/src/test/subscription/t/035_multiple_unique_conflicts.pl
@@ -0,0 +1,114 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create a table on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+# Create same table on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY key, b int unique, c int unique);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_tab FOR TABLE conf_tab");
+
+# Create the subscription
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION sub_tab
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION pub_tab;");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
+my $log_offset = -s $node_subscriber->logfile;
+my $logfile = slurp_file($node_subscriber->logfile, $log_offset);
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,3,4);");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $logfile =~
+ qr/DETAIL: The remote tuple violates multiple unique constraints on the local table.\n/
+ . qr/.*Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\).\n/
+ . qr/.*Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\).\n/
+ . qr/.*Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./m,
+ 'multiple_unique_conflicts detected during insertion');
+
+# Truncate table to get rid of the error
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+$node_publisher->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+##################################################
+# Test multiple_unique_conflicts due to UPDATE
+##################################################
+$log_offset = -s $node_subscriber->logfile;
+$logfile = slurp_file($node_subscriber->logfile, $log_offset);
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab set a=2,b=3,c=4 where a=1;");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $logfile =~
+ qr/DETAIL: The remote tuple violates multiple unique constraints on the local table.\n/
+ . qr/.*Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\).\n/
+ . qr/.*Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\).\n/
+ . qr/.*Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./m,
+ 'multiple_unique_conflicts detected during update');
+
+done_testing();
--
2.34.1
Hi,
For v3, CFbot failed for the patch test case due to unstable test
steps. Truncating the subscriber table resolved the apply worker error
but sometimes caused out-of-order inserts, leading to unexpected
failures instead of a conflict error. Fixed this and updated the LOG
regex check to avoid false positives.
Note: I have broken down the full error detail message check into
multiple checks to avoid very long lines in the file. I'll see if
there's a better way to compare the full error detail in a single
check.
Attached is the v4 patch (test case changes only).
--
Thanks,
Nisha Moond
Attachments:
v4-0001-Implement-the-conflict-detection-for-multiple_uni.patch (application/octet-stream)
From 152ece6a4540981fb9e9e584af404709dc47e3fd Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 22 Oct 2024 11:01:41 +0530
Subject: [PATCH v4] Implement the conflict detection for
multiple_unique_conflicts in logical replication
Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.
Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures as each constraint violation needs to be handled one by one, making
the process slow and error-prone.
Now, the apply worker checks all unique constraints upfront and reports
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.
Also, the patch adds a new column 'confl_multiple_unique_conflicts' in view
pg_stat_subscription_stats to support stats collection for this conflict type.
---
doc/src/sgml/logical-replication.sgml | 13 ++
doc/src/sgml/monitoring.sgml | 12 ++
src/backend/catalog/system_views.sql | 1 +
src/backend/executor/execReplication.c | 50 ++++--
src/backend/replication/logical/conflict.c | 94 +++++++++++-
src/backend/utils/adt/pgstatfuncs.c | 6 +-
src/include/catalog/pg_proc.dat | 6 +-
src/include/replication/conflict.h | 11 +-
src/test/regress/expected/rules.out | 3 +-
src/test/subscription/meson.build | 1 +
.../t/035_multiple_unique_conflicts.pl | 143 ++++++++++++++++++
11 files changed, 318 insertions(+), 22 deletions(-)
create mode 100644 src/test/subscription/t/035_multiple_unique_conflicts.pl
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3d18e507bbc..4817206af7d 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen;
</para>
</listitem>
</varlistentry>
+ <varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
+ <term><literal>multiple_unique_conflicts</literal></term>
+ <listitem>
+ <para>
+ Inserting a row or updating values of a row violates more than one
+ <literal>NOT DEFERRABLE</literal> unique constraint. Note that to log
+ the origin and commit timestamp details of the conflicting key,
+ <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ should be enabled on the subscriber. In this case, an error will be
+ raised until the conflict is resolved manually.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
Note that there are other conflict scenarios, such as exclusion constraint
violations. Currently, we do not provide additional details for them in the
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index aaa6586d3a4..0960f5ba94a 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2250,6 +2250,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_multiple_unique_conflicts</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times a row insertion or a row update violated multiple
+ <literal>NOT DEFERRABLE</literal> unique constraints during the
+ application of changes. See <xref linkend="conflict-multiple-unique-conflicts"/>
+ for details about this conflict.
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a4d2cfdcaf5..31d269b7ee0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1384,6 +1384,7 @@ CREATE VIEW pg_stat_subscription_stats AS
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription as s,
pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 0a9b880d250..2432db3a5f4 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -493,25 +493,55 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
ConflictType type, List *recheckIndexes,
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
+ int conflicts = 0;
+ List *conflictSlots = NIL;
+ List *conflictIndexes = NIL;
+ TupleTableSlot *conflictslot;
+
/* Check all the unique indexes for a conflict */
foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
{
- TupleTableSlot *conflictslot;
-
if (list_member_oid(recheckIndexes, uniqueidx) &&
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
&conflictslot))
{
- RepOriginId origin;
- TimestampTz committs;
- TransactionId xmin;
-
- GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
- ReportApplyConflict(estate, resultRelInfo, ERROR, type,
- searchslot, conflictslot, remoteslot,
- uniqueidx, xmin, origin, committs);
+ conflicts++;
+
+ /* Add the conflict slot and index to their respective lists */
+ conflictSlots = lappend(conflictSlots, conflictslot);
+ conflictIndexes = lappend_oid(conflictIndexes, uniqueidx);
}
}
+
+ /*
+ * Report an INSERT_EXISTS or UPDATE_EXISTS conflict when only one unique
+ * constraint is violated.
+ */
+ if (conflicts == 1)
+ {
+ Oid uniqueidx;
+ RepOriginId origin;
+ TimestampTz committs;
+ TransactionId xmin;
+
+ uniqueidx = linitial_oid(conflictIndexes);
+ conflictslot = linitial(conflictSlots);
+
+ GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+ searchslot, conflictslot, remoteslot,
+ uniqueidx, xmin, origin, committs);
+ }
+
+ /*
+ * Report MULTIPLE_UNIQUE_CONFLICTS when two or more unique constraints
+ * are violated.
+ */
+ else if (conflicts > 1)
+ ReportMultipleUniqueConflict(estate, resultRelInfo, ERROR,
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+ searchslot, remoteslot,
+ conflictSlots, conflictIndexes);
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 772fc83e88b..ae297d86374 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,7 +29,8 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
- [CT_DELETE_MISSING] = "delete_missing"
+ [CT_DELETE_MISSING] = "delete_missing",
+ [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
static int errcode_apply_conflict(ConflictType type);
@@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate,
TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
RepOriginId localorigin,
- TimestampTz localts);
+ TimestampTz localts, StringInfo err_msg);
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -125,7 +126,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictTypeNames[type]),
errdetail_apply_conflict(estate, relinfo, type, searchslot,
localslot, remoteslot, indexoid,
- localxmin, localorigin, localts));
+ localxmin, localorigin, localts, NULL));
}
/*
@@ -169,6 +170,7 @@ errcode_apply_conflict(ConflictType type)
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
return errcode(ERRCODE_UNIQUE_VIOLATION);
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
@@ -196,7 +198,8 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ RepOriginId localorigin, TimestampTz localts,
+ StringInfo err_msg)
{
StringInfoData err_detail;
char *val_desc;
@@ -209,6 +212,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
Assert(OidIsValid(indexoid));
if (localts)
@@ -291,6 +295,17 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
if (val_desc)
appendStringInfo(&err_detail, "\n%s", val_desc);
+ /*
+ * If the caller provides a non-null 'err_msg' pointer, only the
+ * err_detail.data is requested. Append the constructed err_detail message
+ * to 'err_msg' and return.
+ */
+ if (err_msg)
+ {
+ appendStringInfo(err_msg, "\n%s", err_detail.data);
+ return 0;
+ }
+
return errdetail_internal("%s", err_detail.data);
}
@@ -323,7 +338,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
* Report the conflicting key values in the case of a unique constraint
* violation.
*/
- if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+ if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+ type == CT_MULTIPLE_UNIQUE_CONFLICTS)
{
Assert(OidIsValid(indexoid) && localslot);
@@ -489,3 +505,71 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
return index_value;
}
+
+/*
+ * Report a multiple_unique_conflicts conflict while applying replication changes.
+ *
+ * 'searchslot' holds the tuple used to search the corresponding local
+ * tuple for update or deletion.
+ *
+ * 'remoteslot' contains the new tuple from the remote side, if available.
+ *
+ * 'conflictSlots' is a list of slots containing the local tuples
+ * that conflict with the remote tuple.
+ *
+ * The 'conflictIndexes' list stores the OIDs of the unique indexes that
+ * triggered the constraint violation. These indexes help identify the key
+ * values of the conflicting tuple.
+ *
+ * The caller must ensure that all indexes in 'conflictIndexes' are locked,
+ * allowing us to fetch and display the conflicting key values.
+ */
+void
+ReportMultipleUniqueConflict(EState *estate, ResultRelInfo *relinfo,
+ int elevel, ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *remoteslot,
+ List *conflictSlots, List *conflictIndexes)
+{
+ int conflictNum = 0;
+ Oid indexoid = linitial_oid(conflictIndexes);
+ Relation localrel = relinfo->ri_RelationDesc;
+ RepOriginId localorigin;
+ TimestampTz localts;
+ TransactionId localxmin;
+ StringInfoData err_detail;
+
+ initStringInfo(&err_detail);
+ appendStringInfo(&err_detail, _("The remote tuple violates multiple unique constraints on the local table."));
+
+ foreach_ptr(TupleTableSlot, slot, conflictSlots)
+ {
+ indexoid = lfirst_oid(list_nth_cell(conflictIndexes, conflictNum));
+
+ Assert(!OidIsValid(indexoid) ||
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+ GetTupleTransactionInfo(slot, &localxmin, &localorigin, &localts);
+
+ /*
+ * Build the error detail message containing the conflicting key and
+ * tuple information. The details for each conflict will be appended
+ * to err_detail.
+ */
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ slot, remoteslot, indexoid,
+ localxmin, localorigin, localts, &err_detail);
+
+ conflictNum++;
+ }
+
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
+
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail_internal("%s", err_detail.data));
+}
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 662ce46cbc2..97af7c6554f 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2203,7 +2203,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 42e427f8fe8..68bb2f92024 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5644,9 +5644,9 @@
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 37454dc9513..29c3c085283 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -41,6 +41,9 @@ typedef enum
/* The row to be deleted is missing */
CT_DELETE_MISSING,
+ /* The row to be inserted/updated violates multiple unique constraints */
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+
/*
* Other conflicts, such as exclusion constraint violations, involve more
* complex rules than simple equality checks. These conflicts are left for
@@ -48,7 +51,7 @@ typedef enum
*/
} ConflictType;
-#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+#define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
@@ -63,4 +66,10 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
RepOriginId localorigin, TimestampTz localts);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern void ReportMultipleUniqueConflict(EState *estate, ResultRelInfo *relinfo,
+ int elevel, ConflictType type,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *remoteslot,
+ List *conflictslots_list,
+ List *conflictIndexes);
#endif
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 62f69ac20b2..47478969135 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2157,9 +2157,10 @@ pg_stat_subscription_stats| SELECT ss.subid,
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index d40b49714f6..05fcdd08f57 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -41,6 +41,7 @@ tests += {
't/032_subscribe_use_index.pl',
't/033_run_as_table_owner.pl',
't/034_temporal.pl',
+ 't/035_multiple_unique_conflicts.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/035_multiple_unique_conflicts.pl b/src/test/subscription/t/035_multiple_unique_conflicts.pl
new file mode 100644
index 00000000000..1eb0bd022a1
--- /dev/null
+++ b/src/test/subscription/t/035_multiple_unique_conflicts.pl
@@ -0,0 +1,143 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create a table on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+# Create same table on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY key, b int unique, c int unique);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_tab FOR TABLE conf_tab");
+
+# Create the subscription
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION sub_tab
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION pub_tab;");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
+my $log_offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,3,4);");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $node_subscriber->log_contains(
+ qr/DETAIL: The remote tuple violates multiple unique constraints on the local table./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion');
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_pkey (a) = (2)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_b_key (b) = (3)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_c_key (c) = (4)'
+);
+
+# Truncate table to get rid of the error
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+##################################################
+# Test multiple_unique_conflicts due to UPDATE
+##################################################
+$log_offset = -s $node_subscriber->logfile;
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (5,5,5);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (6,6,6), (7,7,7), (8,8,8);");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab set a=6, b=7, c=8 where a=5;");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $node_subscriber->log_contains(
+ qr/DETAIL: The remote tuple violates multiple unique constraints on the local table./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update');
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_pkey (a) = (6)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_b_key (b) = (7)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_c_key (c) = (8)'
+);
+
+done_testing();
--
2.34.1
On Thu, Mar 13, 2025 at 4:30 PM Nisha Moond <nisha.moond412@gmail.com> wrote:
Attached is the v4 patch (test case changes only).
Comments:
=========
1.
+ /*
+ * Report an INSERT_EXISTS or UPDATE_EXISTS conflict when only one unique
+ * constraint is violated.
+ */
+ if (conflicts == 1)
+ {
+ Oid uniqueidx;
+ RepOriginId origin;
+ TimestampTz committs;
+ TransactionId xmin;
+
+ uniqueidx = linitial_oid(conflictIndexes);
+ conflictslot = linitial(conflictSlots);
+
+ GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+ searchslot, conflictslot, remoteslot,
+ uniqueidx, xmin, origin, committs);
+ }
+
+ /*
+ * Report MULTIPLE_UNIQUE_CONFLICTS when two or more unique constraints
+ * are violated.
+ */
+ else if (conflicts > 1)
+ ReportMultipleUniqueConflict(estate, resultRelInfo, ERROR,
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+ searchslot, remoteslot,
+ conflictSlots, conflictIndexes);
It looks a bit odd to have different functions for one or multiple
conflicts. We can improve this coding pattern by extending the current
function ReportApplyConflict to report one or multiple conflicts
depending on the length of conflictSlots.
2. From the commit message: "Also, the patch adds a new column
'confl_multiple_unique_conflicts' in view pg_stat_subscription_stats
to support stats collection for this conflict type.". This part can be
split into a second patch. Let's try to get the core patch first.
--
With Regards,
Amit Kapila.
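
To illustrate comment 1 above, here is a minimal standalone sketch of choosing the conflict type from the number of violated unique indexes, so that a single reporting path can serve both cases. It is a toy model, not PostgreSQL code: DemoConflictType, DemoIndexCheck, demo_collect_conflicts and demo_choose_type are hypothetical names, and the index names used with it are only illustrative.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins; these are NOT the PostgreSQL definitions. */
typedef enum
{
	DEMO_INSERT_EXISTS,
	DEMO_UPDATE_EXISTS,
	DEMO_MULTIPLE_UNIQUE_CONFLICTS
} DemoConflictType;

typedef struct
{
	const char *index_name;		/* unique index being checked */
	int			has_conflict;	/* nonzero if a local row already holds the key */
} DemoIndexCheck;

/*
 * Scan every unique index instead of stopping at the first hit, remember
 * the ones that conflict, and return how many were found.  'found' must
 * have room for 'nchecks' entries.
 */
static size_t
demo_collect_conflicts(const DemoIndexCheck *checks, size_t nchecks,
					   const char **found)
{
	size_t		nfound = 0;

	assert(checks != NULL && found != NULL);

	for (size_t i = 0; i < nchecks; i++)
	{
		if (checks[i].has_conflict)
			found[nfound++] = checks[i].index_name;
	}
	return nfound;
}

/* One violation keeps the caller's type; two or more use the new type. */
static DemoConflictType
demo_choose_type(DemoConflictType single_type, size_t nfound)
{
	return nfound > 1 ? DEMO_MULTIPLE_UNIQUE_CONFLICTS : single_type;
}
```

With three conflicting indexes the sketch selects DEMO_MULTIPLE_UNIQUE_CONFLICTS; with exactly one it keeps whichever type the caller passed in, so the caller needs only one reporting call.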
On Mon, Mar 17, 2025 at 3:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Thu, Mar 13, 2025 at 4:30 PM Nisha Moond <nisha.moond412@gmail.com> wrote:
Attached is the v4 patch (test case changes only).
Comments:
=========
1.
+ /*
+ * Report an INSERT_EXISTS or UPDATE_EXISTS conflict when only one unique
+ * constraint is violated.
+ */
+ if (conflicts == 1)
+ {
+ Oid uniqueidx;
+ RepOriginId origin;
+ TimestampTz committs;
+ TransactionId xmin;
+
+ uniqueidx = linitial_oid(conflictIndexes);
+ conflictslot = linitial(conflictSlots);
+
+ GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+ searchslot, conflictslot, remoteslot,
+ uniqueidx, xmin, origin, committs);
+ }
+
+ /*
+ * Report MULTIPLE_UNIQUE_CONFLICTS when two or more unique constraints
+ * are violated.
+ */
+ else if (conflicts > 1)
+ ReportMultipleUniqueConflict(estate, resultRelInfo, ERROR,
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+ searchslot, remoteslot,
+ conflictSlots, conflictIndexes);
It looks a bit odd to have different functions for one or multiple
conflicts. We can improve this coding pattern by extending the current
function ReportApplyConflict to report one or multiple conflicts
depending on the length of conflictSlots.
Modified the code to use the existing ReportApplyConflict function.
2. From the commit message: "Also, the patch adds a new column
'confl_multiple_unique_conflicts' in view pg_stat_subscription_stats
to support stats collection for this conflict type.". This part can be
split into a second patch. Let's try to get the core patch first.
I have separated the "stats" part from the core patch and will post it
as a separate patch in the next version.
Please find the attached v5-0001 patch without the stats part.
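
As a rough illustration of reporting every violated index in a single error, the sketch below appends one detail line per conflict into one buffer, which the caller would then emit once. This is a simplified standalone model: DemoConflict and demo_build_detail are hypothetical names, a fixed-size char buffer stands in for a growable string, and the message wording is a placeholder rather than the exact text produced by the patch.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical record for one violated unique index; not a PostgreSQL type. */
typedef struct
{
	const char *index_name;		/* violated unique index */
	const char *key_desc;		/* for example "(a)=(2)" */
} DemoConflict;

/*
 * Append one detail line per conflict to 'buf' (capacity 'buflen').
 * Returns the number of conflicts written in full.  The output is always
 * NUL-terminated, and a line that does not fit is dropped entirely.
 */
static size_t
demo_build_detail(char *buf, size_t buflen,
				  const DemoConflict *conflicts, size_t nconflicts)
{
	size_t		used = 0;
	size_t		written = 0;

	assert(buf != NULL && buflen > 0);
	buf[0] = '\0';

	for (size_t i = 0; i < nconflicts; i++)
	{
		/* Separate consecutive conflicts with a newline. */
		int			n = snprintf(buf + used, buflen - used,
								 "%sKey already exists in unique index \"%s\": Key %s.",
								 i > 0 ? "\n" : "",
								 conflicts[i].index_name,
								 conflicts[i].key_desc);

		if (n < 0 || (size_t) n >= buflen - used)
		{
			buf[used] = '\0';	/* discard the partial line */
			break;
		}
		used += (size_t) n;
		written++;
	}
	return written;
}
```

Building the whole detail first and raising the error once is what lets a user see all conflicting keys from a single failure instead of discovering them one restart at a time.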
--
Thanks,
Nisha
Attachments:
v5-0001-Implement-the-conflict-detection-for-multiple_uni.patch
From 8d8c9208193cb93170b5b0423d52c5459b45890c Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 18 Mar 2025 12:27:48 +0530
Subject: [PATCH v5] Implement the conflict detection for
multiple_unique_conflicts in logical replication
Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.
Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures as each constraint violation needs to be handled one by one, making
the process slow and error-prone.
Now, the apply worker checks all unique constraints upfront and reports
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.
---
doc/src/sgml/logical-replication.sgml | 13 ++
src/backend/executor/execReplication.c | 29 ++--
src/backend/replication/logical/conflict.c | 76 +++++++---
src/backend/replication/logical/worker.c | 18 +--
src/include/replication/conflict.h | 13 +-
src/test/subscription/meson.build | 1 +
.../t/035_multiple_unique_conflicts.pl | 133 ++++++++++++++++++
7 files changed, 239 insertions(+), 44 deletions(-)
create mode 100644 src/test/subscription/t/035_multiple_unique_conflicts.pl
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3d18e507bbc..4817206af7d 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen;
</para>
</listitem>
</varlistentry>
+ <varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
+ <term><literal>multiple_unique_conflicts</literal></term>
+ <listitem>
+ <para>
+ Inserting a row or updating values of a row violates more than one
+ <literal>NOT DEFERRABLE</literal> unique constraint. Note that to log
+ the origin and commit timestamp details of the conflicting key,
+ <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ should be enabled on the subscriber. In this case, an error will be
+ raised until the conflict is resolved manually.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
Note that there are other conflict scenarios, such as exclusion constraint
violations. Currently, we do not provide additional details for them in the
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 0a9b880d250..c5b31cd728b 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -493,25 +493,32 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
ConflictType type, List *recheckIndexes,
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
- /* Check all the unique indexes for a conflict */
+ int conflicts = 0;
+ List *conflictSlots = NIL;
+ List *conflictIndexes = NIL;
+ TupleTableSlot *conflictslot;
+
+ /* Check all the unique indexes for conflicts */
foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
{
- TupleTableSlot *conflictslot;
-
if (list_member_oid(recheckIndexes, uniqueidx) &&
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
&conflictslot))
{
- RepOriginId origin;
- TimestampTz committs;
- TransactionId xmin;
-
- GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
- ReportApplyConflict(estate, resultRelInfo, ERROR, type,
- searchslot, conflictslot, remoteslot,
- uniqueidx, xmin, origin, committs);
+ conflicts++;
+
+ /* Add the conflict slot and index to their respective lists */
+ conflictSlots = lappend(conflictSlots, conflictslot);
+ conflictIndexes = lappend_oid(conflictIndexes, uniqueidx);
}
}
+
+ /* Report the conflict if found */
+ if (conflicts)
+ ReportApplyConflict(estate, resultRelInfo, ERROR,
+ conflicts > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
+ searchslot, conflictSlots, remoteslot,
+ conflictIndexes, InvalidTransactionId, 0, 0);
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 772fc83e88b..cba85c16888 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
- [CT_DELETE_MISSING] = "delete_missing"
+ [CT_DELETE_MISSING] = "delete_missing",
+ [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
static int errcode_apply_conflict(ConflictType type);
-static int errdetail_apply_conflict(EState *estate,
+static void errdetail_apply_conflict(EState *estate,
ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate,
TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
RepOriginId localorigin,
- TimestampTz localts);
+ TimestampTz localts, StringInfo err_msg);
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -90,15 +91,15 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
* 'searchslot' should contain the tuple used to search the local tuple to be
* updated or deleted.
*
- * 'localslot' should contain the existing local tuple, if any, that conflicts
- * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
- * transaction information related to this existing local tuple.
+ * 'conflictSlots' is a list containing the existing local tuples, if any,
+ * that conflict with the remote tuple. 'localxmin', 'localorigin', and 'localts'
+ * provide the transaction information related to this existing local tuple.
*
* 'remoteslot' should contain the remote new tuple, if any.
*
- * The 'indexoid' represents the OID of the unique index that triggered the
- * constraint violation error. We use this to report the key values for
- * conflicting tuple.
+ * The 'conflictIndexes' list holds the OIDs of the unique indexes that
+ * triggered the constraint violation error. We use this to report the key
+ * values for the conflicting tuples.
*
* The caller must ensure that the index with the OID 'indexoid' is locked so
* that we can fetch and display the conflicting key value.
@@ -106,16 +107,47 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
void
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
- TupleTableSlot *localslot, TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
+ List *conflictSlots, TupleTableSlot *remoteslot,
+ List *conflictIndexes, TransactionId localxmin,
RepOriginId localorigin, TimestampTz localts)
{
+ int conflictNum = 0;
+ Oid indexoid;
Relation localrel = relinfo->ri_RelationDesc;
+ StringInfoData err_detail;
+
+ initStringInfo(&err_detail);
- Assert(!OidIsValid(indexoid) ||
- CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+ if (!conflictSlots)
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ NULL, remoteslot, InvalidOid,
+ localxmin, localorigin, localts, &err_detail);
- pgstat_report_subscription_conflict(MySubscription->oid, type);
+ foreach_ptr(TupleTableSlot, slot, conflictSlots)
+ {
+ indexoid = lfirst_oid(list_nth_cell(conflictIndexes, conflictNum));
+
+ Assert(!OidIsValid(indexoid) ||
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+ if (!localorigin)
+ GetTupleTransactionInfo(slot, &localxmin, &localorigin, &localts);
+
+ /*
+ * Build the error detail message containing the conflicting key and
+ * tuple information. The details for each conflict will be appended
+ * to err_detail.
+ */
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ slot, remoteslot, indexoid,
+ localxmin, localorigin, localts, &err_detail);
+
+ conflictNum++;
+ }
+
+ /* XXX: stats not supported for multiple_unique_conflict in this patch */
+ if (type != CT_MULTIPLE_UNIQUE_CONFLICTS)
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
ereport(elevel,
errcode_apply_conflict(type),
@@ -123,9 +155,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
get_namespace_name(RelationGetNamespace(localrel)),
RelationGetRelationName(localrel),
ConflictTypeNames[type]),
- errdetail_apply_conflict(estate, relinfo, type, searchslot,
- localslot, remoteslot, indexoid,
- localxmin, localorigin, localts));
+ errdetail_internal("%s", err_detail.data));
}
/*
@@ -169,6 +199,7 @@ errcode_apply_conflict(ConflictType type)
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
return errcode(ERRCODE_UNIQUE_VIOLATION);
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
@@ -191,12 +222,13 @@ errcode_apply_conflict(ConflictType type)
* replica identity columns, if any. The remote old tuple is excluded as its
* information is covered in the replica identity columns.
*/
-static int
+static void
errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ RepOriginId localorigin, TimestampTz localts,
+ StringInfo err_msg)
{
StringInfoData err_detail;
char *val_desc;
@@ -209,6 +241,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
Assert(OidIsValid(indexoid));
if (localts)
@@ -291,7 +324,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
if (val_desc)
appendStringInfo(&err_detail, "\n%s", val_desc);
- return errdetail_internal("%s", err_detail.data);
+ appendStringInfo(err_msg, "%s", err_detail.data);
}
/*
@@ -323,7 +356,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
* Report the conflicting key values in the case of a unique constraint
* violation.
*/
- if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+ if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+ type == CT_MULTIPLE_UNIQUE_CONFLICTS)
{
Assert(OidIsValid(indexoid) && localslot);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 31ab69ea13a..6a8d8843b61 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2711,8 +2711,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
slot_store_data(newslot, relmapentry, newtup);
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot, localslot, newslot,
- InvalidOid, localxmin, localorigin, localts);
+ remoteslot, list_make1(localslot), newslot,
+ list_make1(InvalidOid), localxmin, localorigin, localts);
}
/* Process and store remote tuple in the slot */
@@ -2742,7 +2742,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
*/
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
remoteslot, NULL, newslot,
- InvalidOid, InvalidTransactionId,
+ NULL, InvalidTransactionId,
InvalidRepOriginId, 0);
}
@@ -2887,8 +2887,8 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
localorigin != replorigin_session_origin)
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
- remoteslot, localslot, NULL,
- InvalidOid, localxmin, localorigin, localts);
+ remoteslot, list_make1(localslot), NULL,
+ list_make1(InvalidOid), localxmin, localorigin, localts);
EvalPlanQualSetSlot(&epqstate, localslot);
@@ -2904,7 +2904,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
*/
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
remoteslot, NULL, NULL,
- InvalidOid, InvalidTransactionId,
+ NULL, InvalidTransactionId,
InvalidRepOriginId, 0);
}
@@ -3096,7 +3096,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
ReportApplyConflict(estate, partrelinfo,
LOG, CT_UPDATE_MISSING,
remoteslot_part, NULL, newslot,
- InvalidOid, InvalidTransactionId,
+ NULL, InvalidTransactionId,
InvalidRepOriginId, 0);
return;
@@ -3116,8 +3116,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
slot_store_data(newslot, part_entry, newtup);
ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot_part, localslot, newslot,
- InvalidOid, localxmin, localorigin,
+ remoteslot_part, list_make1(localslot), newslot,
+ list_make1(InvalidOid), localxmin, localorigin,
localts);
}
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 37454dc9513..2c5a129959d 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -41,6 +41,9 @@ typedef enum
/* The row to be deleted is missing */
CT_DELETE_MISSING,
+ /* The row to be inserted/updated violates multiple unique constraints */
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+
/*
* Other conflicts, such as exclusion constraint violations, involve more
* complex rules than simple equality checks. These conflicts are left for
@@ -48,6 +51,11 @@ typedef enum
*/
} ConflictType;
+/*
+ * XXX: Don't increment the CONFLICT_NUM_TYPES, as it is used by subscription
+ * stats module and this patch does not support stats for
+ * multiple_unique_conflicts.
+ */
#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
@@ -57,10 +65,9 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
int elevel, ConflictType type,
TupleTableSlot *searchslot,
- TupleTableSlot *localslot,
+ List *conflictSlots,
TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
+ List *conflictIndexes, TransactionId localxmin,
RepOriginId localorigin, TimestampTz localts);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
-
#endif
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index d40b49714f6..05fcdd08f57 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -41,6 +41,7 @@ tests += {
't/032_subscribe_use_index.pl',
't/033_run_as_table_owner.pl',
't/034_temporal.pl',
+ 't/035_multiple_unique_conflicts.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/035_multiple_unique_conflicts.pl b/src/test/subscription/t/035_multiple_unique_conflicts.pl
new file mode 100644
index 00000000000..f1417e313db
--- /dev/null
+++ b/src/test/subscription/t/035_multiple_unique_conflicts.pl
@@ -0,0 +1,133 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create a table on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+# Create same table on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY key, b int unique, c int unique);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_tab FOR TABLE conf_tab");
+
+# Create the subscription
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION sub_tab
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION pub_tab;");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
+my $log_offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,3,4);");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_pkey (a) = (2)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_b_key (b) = (3)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_c_key (c) = (4)'
+);
+
+# Truncate table to get rid of the error
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+##################################################
+# Test multiple_unique_conflicts due to UPDATE
+##################################################
+$log_offset = -s $node_subscriber->logfile;
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (5,5,5);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (6,6,6), (7,7,7), (8,8,8);");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab set a=6, b=7, c=8 where a=5;");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_pkey (a) = (6)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_b_key (b) = (7)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_c_key (c) = (8)'
+);
+
+done_testing();
--
2.34.1
On Wed, Mar 19, 2025 at 11:11 AM Nisha Moond <nisha.moond412@gmail.com> wrote:
Please find the attached v5-0001 patch without the stats part.
Review:
=======
1.
+ foreach_ptr(TupleTableSlot, slot, conflictSlots)
+ {
+ indexoid = lfirst_oid(list_nth_cell(conflictIndexes, conflictNum));
+
+ Assert(!OidIsValid(indexoid) ||
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+ if (!localorigin)
+ GetTupleTransactionInfo(slot, &localxmin, &localorigin, &localts);
+
+ /*
+ * Build the error detail message containing the conflicting key and
+ * tuple information. The details for each conflict will be appended
+ * to err_detail.
+ */
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ slot, remoteslot, indexoid,
+ localxmin, localorigin, localts, &err_detail);
+
+ conflictNum++;
+ }
Won't this get TupleTransactionInfo only for the first conflicting
tuple instead of getting it separately for each tuple?
2. We can build an array/list of each local tuple's origin info in the
caller and pass it to ReportApplyConflict(). Additionally, we can use a
do...while loop to avoid calling errdetail_apply_conflict() in multiple
places in ReportApplyConflict(). We break out of the loop if there are
no conflictSlots after generating the first detail message.
3. Related to the new test file 't/035_multiple_unique_conflicts.pl',
is it worth adding a separate test file only for
multiple_unique_conflicts? I see that for all other conflicts, we have
tests spread over existing test files where such conflict-related tests
were already present. Shall we name it t/035_conflicts.pl? That will
allow us to add more tests related to other conflicts, like
update_delete, in the same file.
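To make the concern in point 1 concrete, here is a minimal standalone sketch (not PostgreSQL code). It assumes a hypothetical `TupleInfo` struct standing in for the xmin/origin/timestamp triple, and two hypothetical helpers: `collect_guarded()` mirrors the shape of the v5 loop, where the info is fetched only while the origin is still zero, and `collect_per_tuple()` fetches it for every conflicting tuple.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the (xmin, origin, commit timestamp) triple. */
typedef struct TupleInfo
{
    unsigned    xmin;
    unsigned    origin;
    long        ts;
} TupleInfo;

/*
 * Guarded fetch, mirroring the v5 loop shape: the info is fetched only
 * while 'origin' is still zero, so once the first tuple sets a nonzero
 * origin, every later tuple is reported with the first tuple's info.
 */
void
collect_guarded(const TupleInfo *tuples, size_t n, TupleInfo *out)
{
    TupleInfo   cur = {0, 0, 0};

    for (size_t i = 0; i < n; i++)
    {
        if (!cur.origin)
            cur = tuples[i];
        out[i] = cur;
    }
}

/* Unconditional fetch: each conflicting tuple gets its own info. */
void
collect_per_tuple(const TupleInfo *tuples, size_t n, TupleInfo *out)
{
    for (size_t i = 0; i < n; i++)
        out[i] = tuples[i];
}
```

With three tuples whose origins are all nonzero, the guarded variant reports the first tuple's info three times, while the per-tuple variant reports each tuple's own values.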
--
With Regards,
Amit Kapila.
On Wed, Mar 19, 2025 at 4:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Wed, Mar 19, 2025 at 11:11 AM Nisha Moond <nisha.moond412@gmail.com> wrote:
Please find the attached v5-0001 patch without the stats part.
Review:
=======
1.
+ foreach_ptr(TupleTableSlot, slot, conflictSlots)
+ {
+ indexoid = lfirst_oid(list_nth_cell(conflictIndexes, conflictNum));
+
+ Assert(!OidIsValid(indexoid) ||
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+ if (!localorigin)
+ GetTupleTransactionInfo(slot, &localxmin, &localorigin, &localts);
+
+ /*
+ * Build the error detail message containing the conflicting key and
+ * tuple information. The details for each conflict will be appended
+ * to err_detail.
+ */
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ slot, remoteslot, indexoid,
+ localxmin, localorigin, localts, &err_detail);
+
+ conflictNum++;
+ }
Won't this get TupleTransactionInfo only for the first conflicting
tuple instead of getting it separately for each tuple?
Fixed.
2. We can build an array/list of each local tuple's origin info in the
caller and pass it to ReportApplyConflict(). Additionally, we can use a
do...while loop to avoid calling errdetail_apply_conflict() in multiple
places in ReportApplyConflict(). We break out of the loop if there are
no conflictSlots after generating the first detail message.
Implemented the suggestions.
3. Related to the new test file 't/035_multiple_unique_conflicts.pl',
is it worth adding a separate test file only for
multiple_unique_conflicts? I see that for all other conflicts, we have
tests spread over existing test files where such conflict-related tests
were already present. Shall we name it t/035_conflicts.pl? That will
allow us to add more tests related to other conflicts, like
update_delete, in the same file.
+1, renamed the test file as t/035_conflicts.pl.
Attached is v6 patch with above comments addressed.
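For readers following along, the do...while shape from point 2 can be sketched in isolation roughly as below. This is only an illustration, not the patch code: `ConflictInfo`, `build_detail()`, and the message wording are hypothetical, and a fixed-size buffer stands in for StringInfo. The loop body runs once even when there are no conflicting slots, and the trailing newline left by the last appended entry is removed at the end.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical per-conflict record; the patch uses parallel Lists instead. */
typedef struct ConflictInfo
{
    const char *index_name;
    unsigned    xmin;
} ConflictInfo;

/*
 * Append one detail line per conflict into 'buf' (buflen must be > 0).
 * The do...while guarantees a single detail line even when n == 0, which
 * is the case for conflicts that have no local tuple to report.
 * Returns the length of the resulting string.
 */
size_t
build_detail(const ConflictInfo *conflicts, size_t n, char *buf, size_t buflen)
{
    size_t      i = 0;
    size_t      len = 0;

    buf[0] = '\0';

    do
    {
        int         written;

        if (i < n)
            written = snprintf(buf + len, buflen - len,
                               "conflict on index %s (xmin %u)\n",
                               conflicts[i].index_name, conflicts[i].xmin);
        else
            written = snprintf(buf + len, buflen - len,
                               "no conflicting local tuple\n");

        /* Stop on error or truncation, dropping the partial entry. */
        if (written < 0 || (size_t) written >= buflen - len)
        {
            buf[len] = '\0';
            break;
        }

        len += (size_t) written;
        i++;
    } while (i < n);

    /* Remove the extra newline left by the last appended entry. */
    if (len > 0 && buf[len - 1] == '\n')
        buf[--len] = '\0';

    return len;
}
```

Checking the length before trimming avoids writing before the start of the buffer if nothing was appended.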
--
Thanks,
Nisha
Attachments:
v6-0001-Implement-the-conflict-detection-for-multiple_uni.patch (application/octet-stream)
From 88239bd809783173832bc98227007fb7ebb2b902 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 18 Mar 2025 12:27:48 +0530
Subject: [PATCH v6] Implement the conflict detection for
multiple_unique_conflicts in logical replication
Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.
Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures as each constraint violation needed to be handled one by one,
making the process slow and error-prone.
Now, the apply worker checks all unique constraints upfront and reports
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.
The CONFLICT_NUM_TYPES is not incremented since subscription stats do not
support multiple_unique_conflicts in this patch.
---
doc/src/sgml/logical-replication.sgml | 13 ++
src/backend/executor/execReplication.c | 34 +++++-
src/backend/replication/logical/conflict.c | 93 ++++++++++----
src/backend/replication/logical/worker.c | 36 +++---
src/include/replication/conflict.h | 10 +-
src/test/subscription/meson.build | 1 +
src/test/subscription/t/035_conflicts.pl | 133 +++++++++++++++++++++
7 files changed, 270 insertions(+), 50 deletions(-)
create mode 100644 src/test/subscription/t/035_conflicts.pl
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3d18e507bbc..4817206af7d 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen;
</para>
</listitem>
</varlistentry>
+ <varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
+ <term><literal>multiple_unique_conflicts</literal></term>
+ <listitem>
+ <para>
+ Inserting a row or updating values of a row violates more than one
+ <literal>NOT DEFERRABLE</literal> unique constraint. Note that to log
+ the origin and commit timestamp details of the conflicting key,
+ <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ should be enabled on the subscriber. In this case, an error will be
+ raised until the conflict is resolved manually.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
Note that there are other conflict scenarios, such as exclusion constraint
violations. Currently, we do not provide additional details for them in the
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 0a9b880d250..efd93cda69a 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -493,11 +493,17 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
ConflictType type, List *recheckIndexes,
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
- /* Check all the unique indexes for a conflict */
+ int conflicts = 0;
+ List *conflictSlots = NIL;
+ List *conflictIndexes = NIL;
+ List *localxmins = NIL;
+ List *localorigins = NIL;
+ List *localts = NIL;
+ TupleTableSlot *conflictslot;
+
+ /* Check all the unique indexes for conflicts */
foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
{
- TupleTableSlot *conflictslot;
-
if (list_member_oid(recheckIndexes, uniqueidx) &&
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
&conflictslot))
@@ -507,11 +513,27 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
TransactionId xmin;
GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
- ReportApplyConflict(estate, resultRelInfo, ERROR, type,
- searchslot, conflictslot, remoteslot,
- uniqueidx, xmin, origin, committs);
+
+ /*
+ * Add the conflict slot, index, and the transaction info to the
+ * respective lists.
+ */
+ conflictSlots = lappend(conflictSlots, conflictslot);
+ conflictIndexes = lappend_oid(conflictIndexes, uniqueidx);
+ localxmins = lappend_xid(localxmins, xmin);
+ localorigins = lappend_int(localorigins, origin);
+ localts = lappend(localts, DatumGetPointer(Int64GetDatum(committs)));
+
+ conflicts++;
}
}
+
+ /* Report the conflict if found */
+ if (conflicts)
+ ReportApplyConflict(estate, resultRelInfo, ERROR,
+ conflicts > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
+ searchslot, conflictSlots, remoteslot,
+ conflictIndexes, localxmins, localorigins, localts);
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 772fc83e88b..2074ef17321 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
- [CT_DELETE_MISSING] = "delete_missing"
+ [CT_DELETE_MISSING] = "delete_missing",
+ [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
static int errcode_apply_conflict(ConflictType type);
-static int errdetail_apply_conflict(EState *estate,
+static void errdetail_apply_conflict(EState *estate,
ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate,
TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
RepOriginId localorigin,
- TimestampTz localts);
+ TimestampTz localts, StringInfo err_msg);
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -90,15 +91,15 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
* 'searchslot' should contain the tuple used to search the local tuple to be
* updated or deleted.
*
- * 'localslot' should contain the existing local tuple, if any, that conflicts
- * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
- * transaction information related to this existing local tuple.
+ * 'conflictSlots' list contain the existing local tuples, if any, that
+ * conflicts with the remote tuple. 'localxmins', 'localorigins', and 'localts'
+ * provide the transaction information related to the existing local tuples.
*
* 'remoteslot' should contain the remote new tuple, if any.
*
- * The 'indexoid' represents the OID of the unique index that triggered the
- * constraint violation error. We use this to report the key values for
- * conflicting tuple.
+ * The 'conflictIndexes' list represents the OIDs of the unique index that
+ * triggered the constraint violation error. We use this to report the key
+ * values for conflicting tuple.
*
* The caller must ensure that the index with the OID 'indexoid' is locked so
* that we can fetch and display the conflicting key value.
@@ -106,16 +107,62 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
void
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
- TupleTableSlot *localslot, TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ List *conflictSlots, TupleTableSlot *remoteslot,
+ List *conflictIndexes, List *localxmins,
+ List *localorigins, List *localts)
{
+ int conflictNum = 0;
Relation localrel = relinfo->ri_RelationDesc;
+ ListCell *slotlc = list_head(conflictSlots);
+ StringInfoData err_detail;
+
+ initStringInfo(&err_detail);
+
+ do
+ {
+ Oid indexoid = InvalidOid;
+ TimestampTz committs = 0;
+ RepOriginId origin = InvalidRepOriginId;
+ TransactionId xmin = InvalidTransactionId;
+ TupleTableSlot *slot = NULL;
+
+ if (slotlc)
+ {
+ Assert(localxmins && localorigins && localts);
+
+ slot = lfirst(slotlc);
+ origin = list_nth_int(localorigins, conflictNum);
+ xmin = lfirst_xid(list_nth_cell(localxmins, conflictNum));
+ committs = (TimestampTz) lfirst(list_nth_cell(localts, conflictNum));
+
+ slotlc = lnext(conflictSlots, slotlc);
+ }
+
+ if (conflictIndexes)
+ indexoid = list_nth_oid(conflictIndexes, conflictNum);
+
+ Assert(!OidIsValid(indexoid) ||
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
- Assert(!OidIsValid(indexoid) ||
- CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+ /*
+ * Build the error detail message containing the conflicting key and
+ * tuple information. The details for each conflict will be appended
+ * to err_detail.
+ */
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ slot, remoteslot, indexoid,
+ xmin, origin, committs, &err_detail);
- pgstat_report_subscription_conflict(MySubscription->oid, type);
+ conflictNum++;
+
+ } while (slotlc);
+
+ /* Conflict stats are not gathered for multiple_unique_conflicts */
+ if (type != CT_MULTIPLE_UNIQUE_CONFLICTS)
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
+
+ /* Remove the extra newline at the end of err_detail */
+ err_detail.data[err_detail.len - 1] = '\0';
ereport(elevel,
errcode_apply_conflict(type),
@@ -123,9 +170,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
get_namespace_name(RelationGetNamespace(localrel)),
RelationGetRelationName(localrel),
ConflictTypeNames[type]),
- errdetail_apply_conflict(estate, relinfo, type, searchslot,
- localslot, remoteslot, indexoid,
- localxmin, localorigin, localts));
+ errdetail_internal("%s", err_detail.data));
}
/*
@@ -169,6 +214,7 @@ errcode_apply_conflict(ConflictType type)
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
return errcode(ERRCODE_UNIQUE_VIOLATION);
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
@@ -191,12 +237,13 @@ errcode_apply_conflict(ConflictType type)
* replica identity columns, if any. The remote old tuple is excluded as its
* information is covered in the replica identity columns.
*/
-static int
+static void
errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ RepOriginId localorigin, TimestampTz localts,
+ StringInfo err_msg)
{
StringInfoData err_detail;
char *val_desc;
@@ -209,6 +256,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
Assert(OidIsValid(indexoid));
if (localts)
@@ -291,7 +339,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
if (val_desc)
appendStringInfo(&err_detail, "\n%s", val_desc);
- return errdetail_internal("%s", err_detail.data);
+ appendStringInfo(err_msg, "%s\n", err_detail.data);
}
/*
@@ -323,7 +371,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
* Report the conflicting key values in the case of a unique constraint
* violation.
*/
- if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+ if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+ type == CT_MULTIPLE_UNIQUE_CONFLICTS)
{
Assert(OidIsValid(indexoid) && localslot);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 31ab69ea13a..4a595e7906f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2711,8 +2711,10 @@ apply_handle_update_internal(ApplyExecutionData *edata,
slot_store_data(newslot, relmapentry, newtup);
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot, localslot, newslot,
- InvalidOid, localxmin, localorigin, localts);
+ remoteslot, list_make1(localslot), newslot,
+ NULL, list_make1_xid(localxmin),
+ list_make1_int(localorigin),
+ list_make1(DatumGetPointer(Int64GetDatum(localts))));
}
/* Process and store remote tuple in the slot */
@@ -2741,9 +2743,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
- remoteslot, NULL, newslot,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ remoteslot, NULL, newslot, NULL,
+ NULL, NULL, NULL);
}
/* Cleanup. */
@@ -2887,8 +2888,10 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
localorigin != replorigin_session_origin)
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
- remoteslot, localslot, NULL,
- InvalidOid, localxmin, localorigin, localts);
+ remoteslot, list_make1(localslot), NULL,
+ NULL, list_make1_xid(localxmin),
+ list_make1_int(localorigin),
+ list_make1(DatumGetPointer(Int64GetDatum(localts))));
EvalPlanQualSetSlot(&epqstate, localslot);
@@ -2903,9 +2906,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
- remoteslot, NULL, NULL,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ remoteslot, NULL, NULL, NULL, NULL, NULL, NULL);
}
/* Cleanup. */
@@ -3093,11 +3094,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* The tuple to be updated could not be found. Do nothing
* except for emitting a log message.
*/
- ReportApplyConflict(estate, partrelinfo,
- LOG, CT_UPDATE_MISSING,
- remoteslot_part, NULL, newslot,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ ReportApplyConflict(estate, partrelinfo, LOG,
+ CT_UPDATE_MISSING, remoteslot_part,
+ NULL, newslot, NULL, NULL, NULL, NULL);
return;
}
@@ -3116,9 +3115,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
slot_store_data(newslot, part_entry, newtup);
ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot_part, localslot, newslot,
- InvalidOid, localxmin, localorigin,
- localts);
+ remoteslot_part, list_make1(localslot),
+ newslot, NULL, list_make1_xid(localxmin),
+ list_make1_int(localorigin),
+ list_make1(DatumGetPointer(Int64GetDatum(localts))));
}
/*
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 37454dc9513..bba7f7156b5 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -41,6 +41,9 @@ typedef enum
/* The row to be deleted is missing */
CT_DELETE_MISSING,
+ /* The row to be inserted/updated violates multiple unique constraints */
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+
/*
* Other conflicts, such as exclusion constraint violations, involve more
* complex rules than simple equality checks. These conflicts are left for
@@ -57,10 +60,9 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
int elevel, ConflictType type,
TupleTableSlot *searchslot,
- TupleTableSlot *localslot,
+ List *conflictSlots,
TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts);
+ List *conflictIndexes, List *localxmin,
+ List *localorigin, List *localts);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
-
#endif
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index d40b49714f6..586ffba434e 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -41,6 +41,7 @@ tests += {
't/032_subscribe_use_index.pl',
't/033_run_as_table_owner.pl',
't/034_temporal.pl',
+ 't/035_conflicts.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
new file mode 100644
index 00000000000..f1417e313db
--- /dev/null
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -0,0 +1,133 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create a table on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+# Create same table on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY key, b int unique, c int unique);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_tab FOR TABLE conf_tab");
+
+# Create the subscription
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION sub_tab
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION pub_tab;");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
+my $log_offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,3,4);");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_pkey (a) = (2)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_b_key (b) = (3)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_c_key (c) = (4)'
+);
+
+# Truncate table to get rid of the error
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+##################################################
+# Test multiple_unique_conflicts due to UPDATE
+##################################################
+$log_offset = -s $node_subscriber->logfile;
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (5,5,5);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (6,6,6), (7,7,7), (8,8,8);");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab set a=6, b=7, c=8 where a=5;");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_pkey (a) = (6)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_b_key (b) = (7)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_c_key (c) = (8)'
+);
+
+done_testing();
--
2.34.1
On Thu, Mar 20, 2025 at 3:06 PM Nisha Moond wrote:
Attached is v6 patch with above comments addressed.
Thanks for updating the patch. I have some comments:
1.
The naming style of the variables changed in this function seems a bit inconsistent
with the existing ones. I feel we'd better use a similar style, e.g., conflictSlots => conflictslots
I included the suggested changes in 0001.
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
- TupleTableSlot *localslot, TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ List *conflictSlots, TupleTableSlot *remoteslot,
+ List *conflictIndexes, List *localxmins,
+ List *localorigins, List *localts)
2.
I modified the documentation a bit for consistency. Please see the 0001
attachment.
3.
I have been thinking about whether the code in ReportApplyConflict() can be improved
further, e.g., by avoiding the extra checks in the do-while loop.
One idea is that each caller of
ReportApplyConflict() always passes a valid list for every
list parameter (e.g., conflictIndexes, localxmins ...). In the cases where the
caller cannot provide a valid item, it would save an invalid value
in the list and pass it to the function.
With this approach, ReportApplyConflict() seems cleaner. I am sharing a POC diff
(0002) for reference. It passes the regression tests, but I have not confirmed
every case yet.
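To make the idea concrete outside the PostgreSQL tree, here is a minimal standalone sketch. It assumes plain C arrays in place of List, and the typedefs, the build_conflict_detail() name, and the output format are hypothetical stand-ins, not the code in 0002. Each caller supplies one entry per conflict and stores an invalid sentinel when it has nothing to report, so the loop can walk the arrays in lockstep without per-list NULL checks. The separator is added before every line except the first, so no trailing newline has to be stripped afterwards.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins for the PostgreSQL typedefs; not the real headers. */
typedef uint32_t Oid;
typedef uint32_t TransactionId;

#define InvalidOid ((Oid) 0)
#define InvalidTransactionId ((TransactionId) 0)

/*
 * Build one detail line per conflict into buf. Every array must hold exactly
 * nconflicts entries; a caller with nothing to report for a field stores the
 * invalid sentinel instead of passing NULL, so the loop needs no per-array
 * NULL checks. Returns the number of bytes written (excluding the
 * terminator), or -1 if buf is too small.
 */
static int
build_conflict_detail(char *buf, size_t buflen, int nconflicts,
                      const Oid *indexes, const TransactionId *xmins)
{
    size_t      used = 0;

    assert(buf != NULL && buflen > 0);
    buf[0] = '\0';

    for (int i = 0; i < nconflicts; i++)
    {
        char        line[64];
        size_t      len;

        if (indexes[i] != InvalidOid)
            snprintf(line, sizeof(line), "index=%u xmin=%u",
                     (unsigned) indexes[i], (unsigned) xmins[i]);
        else
            snprintf(line, sizeof(line), "index=none xmin=%u",
                     (unsigned) xmins[i]);

        len = strlen(line);

        /* Room for an optional separator, the line, and the terminator. */
        if (used + (used > 0 ? 1 : 0) + len + 1 > buflen)
            return -1;

        /* Separator goes before each line but the first. */
        if (used > 0)
            buf[used++] = '\n';

        memcpy(buf + used, line, len + 1);
        used += len;
    }

    return (int) used;
}
```

In this sketch, a caller with no index to report would store InvalidOid in its entry rather than pass a NULL array.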
4.
+ origin = list_nth_int(localorigins, conflictNum);
...
+ localts = lappend(localts, DatumGetPointer(Int64GetDatum(committs)));
I personally feel this could be improved, because
1) RepOriginId is a 16-bit type, narrower than an int. Assigning the 32-bit
result of list_nth_int() to it might not cause issues, but it looks somewhat odd;
2) Storing 'committs' in a pointer list via DatumGetPointer(Int64GetDatum(...))
seems inelegant (and I didn't find any precedent).
An alternative approach is to introduce a new structure, ConflictTupleInfo,
containing fields such as slot, origin, committs, and xmin, so that a single
list of these structures can be passed instead.
This way, the code could be simpler, and we can avoid the conversions above. Please
see 0003 for reference. (Note that some comments in this patch could be
improved)
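As a rough standalone illustration of why one struct per conflict reads more simply than five parallel lists, consider the sketch below. It mirrors the field names of ConflictTupleInfo in 0003, but the typedefs, the fixed-size array, and the add_conflict() and effective_conflict_type() helpers are assumptions made so that the example compiles on its own; they are not the patch code. Each field keeps its native width, so the 16-bit origin and the 64-bit timestamp need no casts, and a zero-initialized struct can stand for "no local tuple".

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the PostgreSQL typedefs; not the real headers. */
typedef uint32_t Oid;
typedef uint32_t TransactionId;
typedef uint16_t RepOriginId;
typedef int64_t TimestampTz;

/* Only the conflict types needed for this sketch. */
typedef enum
{
    CT_INSERT_EXISTS,
    CT_UPDATE_EXISTS,
    CT_MULTIPLE_UNIQUE_CONFLICTS
} ConflictType;

/* One entry per conflicting local tuple, instead of five parallel lists. */
typedef struct ConflictTupleInfo
{
    const void *slot;           /* stand-in for TupleTableSlot * */
    Oid         indexoid;       /* conflicting index */
    TransactionId xmin;         /* transaction that modified the tuple */
    RepOriginId origin;         /* stays 16-bit, no widening to int */
    TimestampTz ts;             /* stays 64-bit, no pointer cast */
} ConflictTupleInfo;

/*
 * Record one conflicting tuple in a caller-provided array. Returns the new
 * number of entries, or -1 if the array is already full.
 */
static int
add_conflict(ConflictTupleInfo *tuples, int ntuples, int capacity,
             const void *slot, Oid indexoid, TransactionId xmin,
             RepOriginId origin, TimestampTz ts)
{
    assert(tuples != NULL && ntuples >= 0);

    if (ntuples >= capacity)
        return -1;

    tuples[ntuples].slot = slot;
    tuples[ntuples].indexoid = indexoid;
    tuples[ntuples].xmin = xmin;
    tuples[ntuples].origin = origin;
    tuples[ntuples].ts = ts;

    return ntuples + 1;
}

/* More than one conflicting tuple upgrades the reported conflict type. */
static ConflictType
effective_conflict_type(ConflictType type, int nconflicts)
{
    return nconflicts > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type;
}
```

With this layout the number of conflicts is simply the number of entries, so a separate counter is not needed.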
Best Regards,
Hou zj
Attachments:
0001-cosmetic-changes.patch.txt (text/plain)
From 66dab4059c010435d41fa7b548e7d4cf43e6e1be Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 20 Mar 2025 18:52:59 +0800
Subject: [PATCH] cosmetic changes
---
doc/src/sgml/logical-replication.sgml | 12 ++++++------
src/backend/executor/execReplication.c | 12 ++++++------
src/backend/replication/logical/conflict.c | 8 ++++----
src/include/replication/conflict.h | 6 +++---
4 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 4817206af7d..518520d3ff4 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1881,12 +1881,12 @@ test_sub=# SELECT * from tab_gen_to_gen;
<term><literal>multiple_unique_conflicts</literal></term>
<listitem>
<para>
- Inserting a row or updating values of a row violates more than one
- <literal>NOT DEFERRABLE</literal> unique constraint. Note that to log
- the origin and commit timestamp details of the conflicting key,
- <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
- should be enabled on the subscriber. In this case, an error will be
- raised until the conflict is resolved manually.
+ Inserting or updating a row violates multiple
+ <literal>NOT DEFERRABLE</literal> unique constraints. Note that to log
+ the origin and commit timestamp details of conflicting keys, ensure
+ that <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ is enabled on the subscriber. In this case, an error will be raised until
+ the conflict is resolved manually.
</para>
</listitem>
</varlistentry>
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index efd93cda69a..a2a0ab90ab0 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -494,8 +494,8 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
int conflicts = 0;
- List *conflictSlots = NIL;
- List *conflictIndexes = NIL;
+ List *conflictslots = NIL;
+ List *conflictindexes = NIL;
List *localxmins = NIL;
List *localorigins = NIL;
List *localts = NIL;
@@ -518,8 +518,8 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
* Add the conflict slot, index, and the transaction info to the
* respective lists.
*/
- conflictSlots = lappend(conflictSlots, conflictslot);
- conflictIndexes = lappend_oid(conflictIndexes, uniqueidx);
+ conflictslots = lappend(conflictslots, conflictslot);
+ conflictindexes = lappend_oid(conflictindexes, uniqueidx);
localxmins = lappend_xid(localxmins, xmin);
localorigins = lappend_int(localorigins, origin);
localts = lappend(localts, DatumGetPointer(Int64GetDatum(committs)));
@@ -532,8 +532,8 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
if (conflicts)
ReportApplyConflict(estate, resultRelInfo, ERROR,
conflicts > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
- searchslot, conflictSlots, remoteslot,
- conflictIndexes, localxmins, localorigins, localts);
+ searchslot, conflictslots, remoteslot,
+ conflictindexes, localxmins, localorigins, localts);
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 2074ef17321..4b50467373d 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -91,13 +91,13 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
* 'searchslot' should contain the tuple used to search the local tuple to be
* updated or deleted.
*
- * 'conflictSlots' list contain the existing local tuples, if any, that
+ * 'conflictslots' list contains the existing local tuples, if any, that
* conflicts with the remote tuple. 'localxmins', 'localorigins', and 'localts'
* provide the transaction information related to the existing local tuples.
*
* 'remoteslot' should contain the remote new tuple, if any.
*
- * The 'conflictIndexes' list represents the OIDs of the unique index that
+ * The 'conflictindexes' list represents the OIDs of the unique index that
* triggered the constraint violation error. We use this to report the key
* values for conflicting tuple.
*
@@ -107,8 +107,8 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
void
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
- List *conflictSlots, TupleTableSlot *remoteslot,
- List *conflictIndexes, List *localxmins,
+ List *conflictslots, TupleTableSlot *remoteslot,
+ List *conflictindexes, List *localxmins,
List *localorigins, List *localts)
{
int conflictNum = 0;
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index bba7f7156b5..f32c3266719 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -60,9 +60,9 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
int elevel, ConflictType type,
TupleTableSlot *searchslot,
- List *conflictSlots,
+ List *conflictslots,
TupleTableSlot *remoteslot,
- List *conflictIndexes, List *localxmin,
- List *localorigin, List *localts);
+ List *conflictindexes, List *localxmins,
+ List *localorigins, List *localts);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
#endif
--
2.30.0.windows.2
0002-refactor-code.patch.txt (text/plain)
From 1b809742f1de7a87e159ac8b4f120e66ed7172ce Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 20 Mar 2025 18:55:11 +0800
Subject: [PATCH 2/3] refactor code
---
src/backend/replication/logical/conflict.c | 71 +++++++++-------------
src/backend/replication/logical/worker.c | 55 ++++++++++++-----
2 files changed, 68 insertions(+), 58 deletions(-)
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 84ef4648747..9beded483d4 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -111,59 +111,36 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
List *conflictindexes, List *localxmins,
List *localorigins, List *localts)
{
- int conflictNum = 0;
Relation localrel = relinfo->ri_RelationDesc;
- ListCell *slotlc = list_head(conflictSlots);
StringInfoData err_detail;
+ ListCell *lslot;
+ ListCell *lindex;
+ ListCell *lxmin;
+ ListCell *lorigin;
+ ListCell *lts;
initStringInfo(&err_detail);
- do
+ /*
+ * Iterate over conflicting tuples, along with their commit timestamps,
+ * origins, and the conflicting indexes to assemble an errdetail() line.
+ */
+ forfive(lslot, conflictslots, lindex, conflictindexes, lxmin, localxmins,
+ lorigin, localorigins, lts, localts)
{
- Oid indexoid = InvalidOid;
- TimestampTz committs = 0;
- RepOriginId origin = InvalidRepOriginId;
- TransactionId xmin = InvalidTransactionId;
- TupleTableSlot *slot = NULL;
-
- if (slotlc)
- {
- Assert(localxmins && localorigins && localts);
-
- slot = lfirst(slotlc);
- origin = list_nth_int(localorigins, conflictNum);
- xmin = lfirst_xid(list_nth_cell(localxmins, conflictNum));
- committs = (TimestampTz) lfirst(list_nth_cell(localts, conflictNum));
-
- slotlc = lnext(conflictSlots, slotlc);
- }
-
- if (conflictIndexes)
- indexoid = list_nth_oid(conflictIndexes, conflictNum);
-
- Assert(!OidIsValid(indexoid) ||
- CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
-
- /*
- * Build the error detail message containing the conflicting key and
- * tuple information. The details for each conflict will be appended
- * to err_detail.
- */
errdetail_apply_conflict(estate, relinfo, type, searchslot,
- slot, remoteslot, indexoid,
- xmin, origin, committs, &err_detail);
-
- conflictNum++;
-
- } while (slotlc);
+ lfirst(lslot), remoteslot,
+ lfirst_oid(lindex),
+ lfirst_xid(lxmin),
+ lfirst_int(lorigin),
+ (TimestampTz) lfirst(lts),
+ &err_detail);
+ }
/* Conflict stats are not gathered for multiple_unique_conflicts */
if (type != CT_MULTIPLE_UNIQUE_CONFLICTS)
pgstat_report_subscription_conflict(MySubscription->oid, type);
- /* Remove the extra newline at the end of err_detail */
- err_detail.data[err_detail.len - 1] = '\0';
-
ereport(elevel,
errcode_apply_conflict(type),
errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
@@ -257,7 +234,8 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
case CT_MULTIPLE_UNIQUE_CONFLICTS:
- Assert(OidIsValid(indexoid));
+ Assert(OidIsValid(indexoid) &&
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
if (localts)
{
@@ -339,7 +317,14 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
if (val_desc)
appendStringInfo(&err_detail, "\n%s", val_desc);
- appendStringInfo(err_msg, "%s\n", err_detail.data);
+ /*
+ * Insert a blank line to visually separate the new detail line from the
+ * existing ones.
+ */
+ if (err_msg->len > 0)
+ appendStringInfoChar(err_msg, '\n');
+
+ appendStringInfo(err_msg, "%s", err_detail.data);
}
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4a595e7906f..c19c4f938db 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2674,7 +2674,11 @@ apply_handle_update_internal(ApplyExecutionData *edata,
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
Relation localrel = relinfo->ri_RelationDesc;
EPQState epqstate;
- TupleTableSlot *localslot;
+ TupleTableSlot *localslot = NULL;
+ Oid conflictindex = InvalidOid;
+ RepOriginId localorigin = InvalidRepOriginId;
+ TransactionId localxmin = InvalidTransactionId;
+ TimestampTz localts = 0;
bool found;
MemoryContext oldctx;
@@ -2693,10 +2697,6 @@ apply_handle_update_internal(ApplyExecutionData *edata,
*/
if (found)
{
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
-
/*
* Report the conflict if the tuple was modified by a different
* origin.
@@ -2712,7 +2712,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
remoteslot, list_make1(localslot), newslot,
- NULL, list_make1_xid(localxmin),
+ list_make1_oid(conflictindex),
+ list_make1_xid(localxmin),
list_make1_int(localorigin),
list_make1(DatumGetPointer(Int64GetDatum(localts))));
}
@@ -2735,6 +2736,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
{
TupleTableSlot *newslot = localslot;
+ localslot = NULL;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, relmapentry, newtup);
@@ -2743,8 +2746,11 @@ apply_handle_update_internal(ApplyExecutionData *edata,
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
- remoteslot, NULL, newslot, NULL,
- NULL, NULL, NULL);
+ remoteslot, list_make1(localslot), newslot,
+ list_make1_oid(conflictindex),
+ list_make1_xid(localxmin),
+ list_make1_int(localorigin),
+ list_make1(DatumGetPointer(Int64GetDatum(localts))));
}
/* Cleanup. */
@@ -2862,6 +2868,10 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
EPQState epqstate;
TupleTableSlot *localslot;
+ Oid conflictindex = InvalidOid;
+ RepOriginId localorigin = InvalidRepOriginId;
+ TransactionId localxmin = InvalidTransactionId;
+ TimestampTz localts = 0;
bool found;
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
@@ -2889,7 +2899,8 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
localorigin != replorigin_session_origin)
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
remoteslot, list_make1(localslot), NULL,
- NULL, list_make1_xid(localxmin),
+ list_make1_oid(conflictindex),
+ list_make1_xid(localxmin),
list_make1_int(localorigin),
list_make1(DatumGetPointer(Int64GetDatum(localts))));
@@ -2901,12 +2912,18 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
}
else
{
+ localslot = NULL;
+
/*
* The tuple to be deleted could not be found. Do nothing except for
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
- remoteslot, NULL, NULL, NULL, NULL, NULL, NULL);
+ remoteslot, list_make1(localslot), NULL,
+ list_make1_oid(conflictindex),
+ list_make1_xid(localxmin),
+ list_make1_int(localorigin),
+ list_make1(DatumGetPointer(Int64GetDatum(localts))));
}
/* Cleanup. */
@@ -3074,9 +3091,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
Relation partrel_new;
bool found;
EPQState epqstate;
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
+ RepOriginId localorigin = InvalidRepOriginId;
+ TransactionId localxmin = InvalidTransactionId;
+ TimestampTz localts = 0;
+ Oid conflictindex = InvalidOid;
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(edata, partrel,
@@ -3087,6 +3105,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
{
TupleTableSlot *newslot = localslot;
+ localslot = NULL;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, part_entry, newtup);
@@ -3096,7 +3116,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
*/
ReportApplyConflict(estate, partrelinfo, LOG,
CT_UPDATE_MISSING, remoteslot_part,
- NULL, newslot, NULL, NULL, NULL, NULL);
+ list_make1(localslot), newslot,
+ list_make1_oid(conflictindex),
+ list_make1_xid(localxmin),
+ list_make1_int(localorigin),
+ list_make1(DatumGetPointer(Int64GetDatum(localts))));
return;
}
@@ -3116,7 +3140,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
remoteslot_part, list_make1(localslot),
- newslot, NULL, list_make1_xid(localxmin),
+ newslot, list_make1_oid(conflictindex),
+ list_make1_xid(localxmin),
list_make1_int(localorigin),
list_make1(DatumGetPointer(Int64GetDatum(localts))));
}
--
2.30.0.windows.2
0003-add-a-struct.patch.txt (text/plain)
From cf93ea5272c3ea225af1a52b979a21a1bcb00c95 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 20 Mar 2025 19:20:17 +0800
Subject: [PATCH 3/3] add a struct
---
src/backend/executor/execReplication.c | 33 +++-----
src/backend/replication/logical/conflict.c | 17 ++---
src/backend/replication/logical/worker.c | 87 ++++++++--------------
src/include/replication/conflict.h | 17 ++++-
4 files changed, 59 insertions(+), 95 deletions(-)
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index a2a0ab90ab0..bf352f69e2f 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -493,12 +493,7 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
ConflictType type, List *recheckIndexes,
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
- int conflicts = 0;
- List *conflictslots = NIL;
- List *conflictindexes = NIL;
- List *localxmins = NIL;
- List *localorigins = NIL;
- List *localts = NIL;
+ List *conflicttuples = NIL;
TupleTableSlot *conflictslot;
/* Check all the unique indexes for conflicts */
@@ -508,32 +503,22 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
&conflictslot))
{
- RepOriginId origin;
- TimestampTz committs;
- TransactionId xmin;
+ ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
- GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ conflicttuple->slot = conflictslot;
- /*
- * Add the conflict slot, index, and the transaction info to the
- * respective lists.
- */
- conflictslots = lappend(conflictslots, conflictslot);
- conflictindexes = lappend_oid(conflictindexes, uniqueidx);
- localxmins = lappend_xid(localxmins, xmin);
- localorigins = lappend_int(localorigins, origin);
- localts = lappend(localts, DatumGetPointer(Int64GetDatum(committs)));
+ GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
+ &conflicttuple->origin, &conflicttuple->ts);
- conflicts++;
+ conflicttuples = lappend(conflicttuples, conflicttuple);
}
}
/* Report the conflict if found */
- if (conflicts)
+ if (conflicttuples)
ReportApplyConflict(estate, resultRelInfo, ERROR,
- conflicts > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
- searchslot, conflictslots, remoteslot,
- conflictindexes, localxmins, localorigins, localts);
+ list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
+ searchslot, remoteslot, conflicttuples);
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 9beded483d4..10f07e18a4b 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -107,9 +107,7 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
void
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
- List *conflictslots, TupleTableSlot *remoteslot,
- List *conflictindexes, List *localxmins,
- List *localorigins, List *localts)
+ TupleTableSlot *remoteslot, List *conflicttuples)
{
Relation localrel = relinfo->ri_RelationDesc;
StringInfoData err_detail;
@@ -125,15 +123,14 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
* Iterate over conflicting tuples, along with their commit timestamps,
* origins, and the conflicting indexes to assemble an errdetail() line.
*/
- forfive(lslot, conflictslots, lindex, conflictindexes, lxmin, localxmins,
- lorigin, localorigins, lts, localts)
+ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
{
errdetail_apply_conflict(estate, relinfo, type, searchslot,
- lfirst(lslot), remoteslot,
- lfirst_oid(lindex),
- lfirst_xid(lxmin),
- lfirst_int(lorigin),
- (TimestampTz) lfirst(lts),
+ conflicttuple->slot, remoteslot,
+ conflicttuple->indexoid,
+ conflicttuple->xmin,
+ conflicttuple->origin,
+ conflicttuple->ts,
&err_detail);
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c19c4f938db..058c353f992 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2675,10 +2675,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
Relation localrel = relinfo->ri_RelationDesc;
EPQState epqstate;
TupleTableSlot *localslot = NULL;
- Oid conflictindex = InvalidOid;
- RepOriginId localorigin = InvalidRepOriginId;
- TransactionId localxmin = InvalidTransactionId;
- TimestampTz localts = 0;
+ ConflictTupleInfo conflicttuple = {0};
bool found;
MemoryContext oldctx;
@@ -2701,8 +2698,9 @@ apply_handle_update_internal(ApplyExecutionData *edata,
* Report the conflict if the tuple was modified by a different
* origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin, &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
{
TupleTableSlot *newslot;
@@ -2710,12 +2708,11 @@ apply_handle_update_internal(ApplyExecutionData *edata,
newslot = table_slot_create(localrel, &estate->es_tupleTable);
slot_store_data(newslot, relmapentry, newtup);
+ conflicttuple.slot = localslot;
+
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot, list_make1(localslot), newslot,
- list_make1_oid(conflictindex),
- list_make1_xid(localxmin),
- list_make1_int(localorigin),
- list_make1(DatumGetPointer(Int64GetDatum(localts))));
+ remoteslot, newslot,
+ list_make1(&conflicttuple));
}
/* Process and store remote tuple in the slot */
@@ -2736,8 +2733,6 @@ apply_handle_update_internal(ApplyExecutionData *edata,
{
TupleTableSlot *newslot = localslot;
- localslot = NULL;
-
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, relmapentry, newtup);
@@ -2746,11 +2741,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
- remoteslot, list_make1(localslot), newslot,
- list_make1_oid(conflictindex),
- list_make1_xid(localxmin),
- list_make1_int(localorigin),
- list_make1(DatumGetPointer(Int64GetDatum(localts))));
+ remoteslot, newslot, list_make1(&conflicttuple));
}
/* Cleanup. */
@@ -2868,10 +2859,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
EPQState epqstate;
TupleTableSlot *localslot;
- Oid conflictindex = InvalidOid;
- RepOriginId localorigin = InvalidRepOriginId;
- TransactionId localxmin = InvalidTransactionId;
- TimestampTz localts = 0;
+ ConflictTupleInfo conflicttuple = {0};
bool found;
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
@@ -2887,22 +2875,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/* If found delete it. */
if (found)
{
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
-
/*
* Report the conflict if the tuple was modified by a different
* origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin, &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
+ {
+ conflicttuple.slot = localslot;
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
- remoteslot, list_make1(localslot), NULL,
- list_make1_oid(conflictindex),
- list_make1_xid(localxmin),
- list_make1_int(localorigin),
- list_make1(DatumGetPointer(Int64GetDatum(localts))));
+ remoteslot, NULL,
+ list_make1(&conflicttuple));
+ }
EvalPlanQualSetSlot(&epqstate, localslot);
@@ -2912,18 +2897,12 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
}
else
{
- localslot = NULL;
-
/*
* The tuple to be deleted could not be found. Do nothing except for
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
- remoteslot, list_make1(localslot), NULL,
- list_make1_oid(conflictindex),
- list_make1_xid(localxmin),
- list_make1_int(localorigin),
- list_make1(DatumGetPointer(Int64GetDatum(localts))));
+ remoteslot, NULL, list_make1(&conflicttuple));
}
/* Cleanup. */
@@ -3091,10 +3070,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
Relation partrel_new;
bool found;
EPQState epqstate;
- RepOriginId localorigin = InvalidRepOriginId;
- TransactionId localxmin = InvalidTransactionId;
- TimestampTz localts = 0;
- Oid conflictindex = InvalidOid;
+ ConflictTupleInfo conflicttuple = {0};
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(edata, partrel,
@@ -3105,8 +3081,6 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
{
TupleTableSlot *newslot = localslot;
- localslot = NULL;
-
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, part_entry, newtup);
@@ -3116,11 +3090,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
*/
ReportApplyConflict(estate, partrelinfo, LOG,
CT_UPDATE_MISSING, remoteslot_part,
- list_make1(localslot), newslot,
- list_make1_oid(conflictindex),
- list_make1_xid(localxmin),
- list_make1_int(localorigin),
- list_make1(DatumGetPointer(Int64GetDatum(localts))));
+ newslot, list_make1(&conflicttuple));
return;
}
@@ -3129,8 +3099,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* Report the conflict if the tuple was modified by a
* different origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin,
+ &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
{
TupleTableSlot *newslot;
@@ -3138,12 +3110,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
newslot = table_slot_create(partrel, &estate->es_tupleTable);
slot_store_data(newslot, part_entry, newtup);
+ conflicttuple.slot = localslot;
+
ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot_part, list_make1(localslot),
- newslot, list_make1_oid(conflictindex),
- list_make1_xid(localxmin),
- list_make1_int(localorigin),
- list_make1(DatumGetPointer(Int64GetDatum(localts))));
+ remoteslot_part, newslot,
+ list_make1(&conflicttuple));
}
/*
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index f32c3266719..976f1708095 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -53,6 +53,19 @@ typedef enum
#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
+/*
+ * Information for the existing local tuple that caused the conflict.
+ */
+typedef struct ConflictTupleInfo
+{
+ TupleTableSlot *slot;
+ Oid indexoid; /* conflicting index */
+ TransactionId xmin; /* transaction ID that modified the existing local tuple */
+ RepOriginId origin; /* which origin modified the existing local tuple */
+ TimestampTz ts; /* when the existing local tuple was modified by the origin */
+} ConflictTupleInfo;
+
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
RepOriginId *localorigin,
@@ -60,9 +73,7 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
int elevel, ConflictType type,
TupleTableSlot *searchslot,
- List *conflictslots,
TupleTableSlot *remoteslot,
- List *conflictindexes, List *localxmins,
- List *localorigins, List *localts);
+ List *conflicttuples);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
#endif
--
2.30.0.windows.2
On Thu, Mar 20, 2025 at 5:23 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
On Thu, Mar 20, 2025 at 3:06 PM Nisha Moond wrote:
Attached is v6 patch with above comments addressed.
Thanks for updating the patch. I have some comments:
1.
The naming style of variables changed in this function seems a bit inconsistent
with existing ones; I feel we'd better use a similar style, e.g., conflictSlots => conflictslots
I included the suggested changes in 0001.
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
- TupleTableSlot *localslot, TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ List *conflictSlots, TupleTableSlot *remoteslot,
+ List *conflictIndexes, List *localxmins,
+ List *localorigins, List *localts)
2.
I modified the documents a bit for consistency. Please see 0001
attachment.
3.
I have been thinking whether the code in ReportApplyConflict() can be improved
further, e.g., avoid the extra checks in do while().
One idea could be that each caller of ReportApplyConflict() always passes a
valid list for all list parameters (e.g., conflictIndexes, localxmins ...). And
for the cases when the caller could not provide a valid item, it would save an
invalid value in the list and pass it to the function.
In this approach, ReportApplyConflict() seems cleaner. I am sharing a POC diff
(0002) for reference; it passes the regression tests, but I have not confirmed
every case yet.
4.
+ origin = list_nth_int(localorigins, conflictNum);
...
+ localts = lappend(localts, DatumGetPointer(Int64GetDatum(committs)));
I personally feel this could be improved, because
1) RepOriginId, being a 16-bit value, is smaller than an int, which might not
cause issues but appears somewhat odd when storing a 32-bit value within it;
2) The approach used to store 'committs' seems inelegant (and I didn't find
precedents).
An alternative approach is to introduce a new structure, ConflictTupleInfo,
containing items like slot, origin, committs, and xmin for list integration.
This way, the code could be simpler, and we can avoid the above coding. Please
see 0003 for reference. (Note that some comments in this patch could be
improved)
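To make point 4 concrete, here is a minimal standalone sketch of the idea, not the code from 0003. It keeps the per-tuple details in one struct and walks a single collection, instead of indexing several parallel lists. The typedefs are simplified stand-ins for the PostgreSQL types (their widths are assumptions for illustration), and TupleInfo and describe_conflicts() are hypothetical names; the patch itself uses ConflictTupleInfo, which also carries the tuple slot, and a List.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Simplified stand-ins for PostgreSQL typedefs (assumed, illustration only). */
typedef uint32_t Oid;
typedef uint32_t TransactionId;
typedef uint16_t RepOriginId;	/* 16-bit, as noted in the review */
typedef int64_t TimestampTz;

/* Hypothetical mirror of ConflictTupleInfo without the slot pointer. */
typedef struct TupleInfo
{
	Oid			indexoid;	/* conflicting index */
	TransactionId xmin;		/* transaction that modified the local tuple */
	RepOriginId origin;		/* origin that modified the local tuple */
	TimestampTz ts;			/* when the local tuple was modified */
} TupleInfo;

/*
 * Write one line per conflicting tuple into buf and return the number of
 * tuples described. A tuple whose line does not fit is dropped entirely.
 */
int
describe_conflicts(const TupleInfo *infos, int ninfos, char *buf, size_t buflen)
{
	size_t		used = 0;
	int			described = 0;

	assert(buf != NULL && buflen > 0);
	buf[0] = '\0';

	for (int i = 0; i < ninfos; i++)
	{
		int			n = snprintf(buf + used, buflen - used,
								 "%sindex=%u xmin=%u origin=%u ts=%lld",
								 used > 0 ? "\n" : "",
								 (unsigned) infos[i].indexoid,
								 (unsigned) infos[i].xmin,
								 (unsigned) infos[i].origin,
								 (long long) infos[i].ts);

		if (n < 0 || (size_t) n >= buflen - used)
		{
			buf[used] = '\0';	/* discard the partially written line */
			break;
		}
		used += (size_t) n;
		described++;
	}
	return described;
}
```

Each field keeps its own type here, so neither the 16-bit origin nor the 64-bit timestamp has to be squeezed through an int or pointer list cell.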
Thanks, Hou-san, for the review and fix patches. I’ve incorporated
your suggestions.
Attached are the v7 patches, including patch 002, which implements
stats collection for 'multiple_unique_conflicts' in
pg_stat_subscription_stats.
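As a rough illustration of how the two patches fit together, the sketch below checks every unique column before deciding on the conflict type, then bumps a per-type counter. It is a simplified model under stated assumptions, not the patch code: rows are plain int arrays where every column is unique, the enum is reduced to two members, and Row, count_unique_violations(), check_and_count() and conflict_count are hypothetical names. The real code looks up each unique index through FindConflictTuple() and reports stats through pgstat_report_subscription_conflict().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Reduced conflict-type enum; member names follow the patch. */
typedef enum
{
	CT_INSERT_EXISTS,
	CT_MULTIPLE_UNIQUE_CONFLICTS
} ConflictType;

#define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
#define NCOLS 3					/* every column is treated as UNIQUE */

typedef struct Row
{
	int			col[NCOLS];
} Row;

/* Hypothetical per-type counters, sized by CONFLICT_NUM_TYPES. */
long		conflict_count[CONFLICT_NUM_TYPES];

/* Count how many unique columns of 'incoming' collide with a local row. */
int
count_unique_violations(const Row *local, int nlocal, const Row *incoming)
{
	int			violated = 0;

	for (int c = 0; c < NCOLS; c++)
	{
		for (int r = 0; r < nlocal; r++)
		{
			if (local[r].col[c] == incoming->col[c])
			{
				violated++;
				break;			/* a unique column matches at most one row */
			}
		}
	}
	return violated;
}

/*
 * Return false if 'incoming' can be applied. Otherwise set *type, count the
 * conflict once, and return true. More than one violated column upgrades
 * the type to multiple_unique_conflicts.
 */
bool
check_and_count(const Row *local, int nlocal, const Row *incoming,
				ConflictType *type)
{
	int			violated = count_unique_violations(local, nlocal, incoming);

	assert(type != NULL);
	if (violated == 0)
		return false;

	*type = violated > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : CT_INSERT_EXISTS;
	conflict_count[*type]++;
	return true;
}
```

With the local rows (2, 22, 222), (3, 33, 333), (4, 44, 444) from the first mail, the incoming row (2, 33, 444) violates all three columns and is counted once under CT_MULTIPLE_UNIQUE_CONFLICTS. The counter array only has a slot for the new type because CONFLICT_NUM_TYPES is derived from the last enum member, which is the change 0002 makes.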
--
Thanks,
Nisha
Attachments:
v7-0001-Implement-the-conflict-detection-for-multiple_uni.patch (application/octet-stream)
From fe30f45aa3690eee85e7b71c085840ede5ec4d92 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 18 Mar 2025 12:27:48 +0530
Subject: [PATCH v7 1/2] Implement the conflict detection for
multiple_unique_conflicts in logical replication
Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.
Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures as each constraint violation needs to be handled one by one, making
the process slow and error-prone.
Now, the apply worker checks all unique constraints upfront and reports
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.
The CONFLICT_NUM_TYPES is not incremented since subscription stats do not
support multiple_unique_conflicts in this patch.
---
doc/src/sgml/logical-replication.sgml | 13 ++
src/backend/executor/execReplication.c | 30 +++--
src/backend/replication/logical/conflict.c | 72 +++++++----
src/backend/replication/logical/worker.c | 68 +++++------
src/include/replication/conflict.h | 24 +++-
src/test/subscription/meson.build | 1 +
src/test/subscription/t/035_conflicts.pl | 133 +++++++++++++++++++++
7 files changed, 267 insertions(+), 74 deletions(-)
create mode 100644 src/test/subscription/t/035_conflicts.pl
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3d18e507bbc..4637e898b9f 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen;
</para>
</listitem>
</varlistentry>
+ <varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
+ <term><literal>multiple_unique_conflicts</literal></term>
+ <listitem>
+ <para>
+ Inserting or updating a row violates multiple
+ <literal>NOT DEFERRABLE</literal> unique constraints. Note that to log
+ the origin and commit timestamp details of conflicting keys, ensure
+ that <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ is enabled on the subscriber. In this case, an error will be raised until
+ the conflict is resolved manually.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
Note that there are other conflict scenarios, such as exclusion constraint
violations. Currently, we do not provide additional details for them in the
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 0a9b880d250..a07f7f09f32 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -493,25 +493,33 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
ConflictType type, List *recheckIndexes,
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
- /* Check all the unique indexes for a conflict */
+ List *conflicttuples = NIL;
+ TupleTableSlot *conflictslot;
+
+ /* Check all the unique indexes for conflicts */
foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
{
- TupleTableSlot *conflictslot;
-
if (list_member_oid(recheckIndexes, uniqueidx) &&
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
&conflictslot))
{
- RepOriginId origin;
- TimestampTz committs;
- TransactionId xmin;
-
- GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
- ReportApplyConflict(estate, resultRelInfo, ERROR, type,
- searchslot, conflictslot, remoteslot,
- uniqueidx, xmin, origin, committs);
+ ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
+
+ conflicttuple->slot = conflictslot;
+ conflicttuple->indexoid = uniqueidx;
+
+ GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
+ &conflicttuple->origin, &conflicttuple->ts);
+
+ conflicttuples = lappend(conflicttuples, conflicttuple);
}
}
+
+ /* Report the conflict if found */
+ if (conflicttuples)
+ ReportApplyConflict(estate, resultRelInfo, ERROR,
+ list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
+ searchslot, remoteslot, conflicttuples);
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 772fc83e88b..b1614d3aaf6 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
- [CT_DELETE_MISSING] = "delete_missing"
+ [CT_DELETE_MISSING] = "delete_missing",
+ [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
static int errcode_apply_conflict(ConflictType type);
-static int errdetail_apply_conflict(EState *estate,
+static void errdetail_apply_conflict(EState *estate,
ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate,
TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
RepOriginId localorigin,
- TimestampTz localts);
+ TimestampTz localts, StringInfo err_msg);
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -90,15 +91,15 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
* 'searchslot' should contain the tuple used to search the local tuple to be
* updated or deleted.
*
- * 'localslot' should contain the existing local tuple, if any, that conflicts
- * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
- * transaction information related to this existing local tuple.
+ * 'conflicttuples' is a list of ConflictTupleInfo entries, one for each
+ * existing local tuple that conflicts with the remote tuple. Each entry holds
+ * the tuple's slot and its transaction information (xmin, origin, timestamp).
*
* 'remoteslot' should contain the remote new tuple, if any.
*
- * The 'indexoid' represents the OID of the unique index that triggered the
- * constraint violation error. We use this to report the key values for
- * conflicting tuple.
+ * The 'indexoid' of each entry is the OID of the unique index that
+ * triggered the constraint violation error. We use this to report the key
+ * values for the conflicting tuple.
*
* The caller must ensure that the index with the OID 'indexoid' is locked so
* that we can fetch and display the conflicting key value.
@@ -106,16 +107,31 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
void
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
- TupleTableSlot *localslot, TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ TupleTableSlot *remoteslot, List *conflicttuples)
{
Relation localrel = relinfo->ri_RelationDesc;
+ StringInfoData err_detail;
- Assert(!OidIsValid(indexoid) ||
- CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+ initStringInfo(&err_detail);
- pgstat_report_subscription_conflict(MySubscription->oid, type);
+ /*
+ * Iterate over conflicting tuples, along with their commit timestamps,
+ * origins, and the conflicting indexes to assemble an errdetail() line.
+ */
+ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+ {
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ conflicttuple->slot, remoteslot,
+ conflicttuple->indexoid,
+ conflicttuple->xmin,
+ conflicttuple->origin,
+ conflicttuple->ts,
+ &err_detail);
+ }
+
+ /* Conflict stats are not gathered for multiple_unique_conflicts */
+ if (type != CT_MULTIPLE_UNIQUE_CONFLICTS)
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
ereport(elevel,
errcode_apply_conflict(type),
@@ -123,9 +139,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
get_namespace_name(RelationGetNamespace(localrel)),
RelationGetRelationName(localrel),
ConflictTypeNames[type]),
- errdetail_apply_conflict(estate, relinfo, type, searchslot,
- localslot, remoteslot, indexoid,
- localxmin, localorigin, localts));
+ errdetail_internal("%s", err_detail.data));
}
/*
@@ -169,6 +183,7 @@ errcode_apply_conflict(ConflictType type)
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
return errcode(ERRCODE_UNIQUE_VIOLATION);
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
@@ -191,12 +206,13 @@ errcode_apply_conflict(ConflictType type)
* replica identity columns, if any. The remote old tuple is excluded as its
* information is covered in the replica identity columns.
*/
-static int
+static void
errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ RepOriginId localorigin, TimestampTz localts,
+ StringInfo err_msg)
{
StringInfoData err_detail;
char *val_desc;
@@ -209,7 +225,9 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
- Assert(OidIsValid(indexoid));
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
+ Assert(OidIsValid(indexoid) &&
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
if (localts)
{
@@ -291,7 +309,14 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
if (val_desc)
appendStringInfo(&err_detail, "\n%s", val_desc);
- return errdetail_internal("%s", err_detail.data);
+ /*
+ * Insert a blank line to visually separate the new detail line from the
+ * existing ones.
+ */
+ if (err_msg->len > 0)
+ appendStringInfoChar(err_msg, '\n');
+
+ appendStringInfo(err_msg, "%s", err_detail.data);
}
/*
@@ -323,7 +348,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
* Report the conflicting key values in the case of a unique constraint
* violation.
*/
- if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+ if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+ type == CT_MULTIPLE_UNIQUE_CONFLICTS)
{
Assert(OidIsValid(indexoid) && localslot);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 31ab69ea13a..e3b2b144942 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2674,7 +2674,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
Relation localrel = relinfo->ri_RelationDesc;
EPQState epqstate;
- TupleTableSlot *localslot;
+ TupleTableSlot *localslot = NULL;
+ ConflictTupleInfo conflicttuple = {0};
bool found;
MemoryContext oldctx;
@@ -2693,16 +2694,13 @@ apply_handle_update_internal(ApplyExecutionData *edata,
*/
if (found)
{
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
-
/*
* Report the conflict if the tuple was modified by a different
* origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin, &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
{
TupleTableSlot *newslot;
@@ -2710,9 +2708,11 @@ apply_handle_update_internal(ApplyExecutionData *edata,
newslot = table_slot_create(localrel, &estate->es_tupleTable);
slot_store_data(newslot, relmapentry, newtup);
+ conflicttuple.slot = localslot;
+
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot, localslot, newslot,
- InvalidOid, localxmin, localorigin, localts);
+ remoteslot, newslot,
+ list_make1(&conflicttuple));
}
/* Process and store remote tuple in the slot */
@@ -2741,9 +2741,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
- remoteslot, NULL, newslot,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ remoteslot, newslot, list_make1(&conflicttuple));
}
/* Cleanup. */
@@ -2861,6 +2859,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
EPQState epqstate;
TupleTableSlot *localslot;
+ ConflictTupleInfo conflicttuple = {0};
bool found;
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
@@ -2876,19 +2875,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/* If found delete it. */
if (found)
{
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
-
/*
* Report the conflict if the tuple was modified by a different
* origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin, &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
+ {
+ conflicttuple.slot = localslot;
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
- remoteslot, localslot, NULL,
- InvalidOid, localxmin, localorigin, localts);
+ remoteslot, NULL,
+ list_make1(&conflicttuple));
+ }
EvalPlanQualSetSlot(&epqstate, localslot);
@@ -2903,9 +2902,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
- remoteslot, NULL, NULL,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ remoteslot, NULL, list_make1(&conflicttuple));
}
/* Cleanup. */
@@ -3073,9 +3070,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
Relation partrel_new;
bool found;
EPQState epqstate;
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
+ ConflictTupleInfo conflicttuple = {0};
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(edata, partrel,
@@ -3093,11 +3088,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* The tuple to be updated could not be found. Do nothing
* except for emitting a log message.
*/
- ReportApplyConflict(estate, partrelinfo,
- LOG, CT_UPDATE_MISSING,
- remoteslot_part, NULL, newslot,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ ReportApplyConflict(estate, partrelinfo, LOG,
+ CT_UPDATE_MISSING, remoteslot_part,
+ newslot, list_make1(&conflicttuple));
return;
}
@@ -3106,8 +3099,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* Report the conflict if the tuple was modified by a
* different origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin,
+ &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
{
TupleTableSlot *newslot;
@@ -3115,10 +3110,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
newslot = table_slot_create(partrel, &estate->es_tupleTable);
slot_store_data(newslot, part_entry, newtup);
+ conflicttuple.slot = localslot;
+
ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot_part, localslot, newslot,
- InvalidOid, localxmin, localorigin,
- localts);
+ remoteslot_part, newslot,
+ list_make1(&conflicttuple));
}
/*
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 37454dc9513..06d5d05c560 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -41,6 +41,9 @@ typedef enum
/* The row to be deleted is missing */
CT_DELETE_MISSING,
+ /* The row to be inserted/updated violates multiple unique constraints */
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+
/*
* Other conflicts, such as exclusion constraint violations, involve more
* complex rules than simple equality checks. These conflicts are left for
@@ -50,6 +53,22 @@ typedef enum
#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
+/*
+ * Information for the existing local tuple that caused the conflict.
+ */
+typedef struct ConflictTupleInfo
+{
+ TupleTableSlot *slot;
+ Oid indexoid; /* conflicting index */
+ TransactionId xmin; /* transaction ID that modified the existing
+ * local tuple */
+ RepOriginId origin; /* which origin modified the existing local
+ * tuple */
+ TimestampTz ts; /* when the existing local tuple was modified
+ * by the origin */
+} ConflictTupleInfo;
+
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
RepOriginId *localorigin,
@@ -57,10 +76,7 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
int elevel, ConflictType type,
TupleTableSlot *searchslot,
- TupleTableSlot *localslot,
TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts);
+ List *conflicttuples);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
-
#endif
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index d40b49714f6..586ffba434e 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -41,6 +41,7 @@ tests += {
't/032_subscribe_use_index.pl',
't/033_run_as_table_owner.pl',
't/034_temporal.pl',
+ 't/035_conflicts.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
new file mode 100644
index 00000000000..f1417e313db
--- /dev/null
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -0,0 +1,133 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create a table on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+# Create same table on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY key, b int unique, c int unique);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_tab FOR TABLE conf_tab");
+
+# Create the subscription
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION sub_tab
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION pub_tab;");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
+my $log_offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,3,4);");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_pkey (a) = (2)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_b_key (b) = (3)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during insertion for conf_tab_c_key (c) = (4)'
+);
+
+# Truncate table to get rid of the error
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+##################################################
+# Test multiple_unique_conflicts due to UPDATE
+##################################################
+$log_offset = -s $node_subscriber->logfile;
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (5,5,5);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (6,6,6), (7,7,7), (8,8,8);");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab set a=6, b=7, c=8 where a=5;");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ $log_offset);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_pkey (a) = (6)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_b_key (b) = (7)'
+);
+
+ok( $node_subscriber->log_contains(
+ qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
+ $log_offset),
+ 'multiple_unique_conflicts detected during update for conf_tab_c_key (c) = (8)'
+);
+
+done_testing();
--
2.34.1
v7-0002-Stats-collection-for-the-multiple_unique_conflict.patch (application/octet-stream)
From cd731a5ae3424f238ba99b56a8b645d7acb6fbe0 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Fri, 21 Mar 2025 09:36:30 +0530
Subject: [PATCH v7 2/2] Stats collection for the multiple_unique_conflicts
The patch adds a new column 'confl_multiple_unique_conflicts' in view
pg_stat_subscription_stats to support stats collection for this conflict type.
---
doc/src/sgml/monitoring.sgml | 12 ++++++++++++
src/backend/catalog/system_views.sql | 1 +
src/backend/replication/logical/conflict.c | 4 +---
src/backend/utils/adt/pgstatfuncs.c | 6 ++++--
src/include/catalog/pg_proc.dat | 6 +++---
src/include/replication/conflict.h | 2 +-
src/test/regress/expected/rules.out | 3 ++-
src/test/subscription/t/035_conflicts.pl | 18 ++++++++++++++++++
8 files changed, 42 insertions(+), 10 deletions(-)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index aaa6586d3a4..0960f5ba94a 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2250,6 +2250,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_multiple_unique_conflicts</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times a row insertion or an updated row value violated multiple
+ <literal>NOT DEFERRABLE</literal> unique constraints during the
+ application of changes. See <xref linkend="conflict-multiple-unique-conflicts"/>
+ for details about this conflict.
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a4d2cfdcaf5..31d269b7ee0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1384,6 +1384,7 @@ CREATE VIEW pg_stat_subscription_stats AS
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription as s,
pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index b1614d3aaf6..63d28900a7f 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -129,9 +129,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
&err_detail);
}
- /* Conflict stats are not gathered for multiple_unique_conflicts */
- if (type != CT_MULTIPLE_UNIQUE_CONFLICTS)
- pgstat_report_subscription_conflict(MySubscription->oid, type);
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
ereport(elevel,
errcode_apply_conflict(type),
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 662ce46cbc2..97af7c6554f 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2203,7 +2203,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 890822eaf79..0d29ef50ff2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5647,9 +5647,9 @@
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 06d5d05c560..a467bd86c69 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -51,7 +51,7 @@ typedef enum
*/
} ConflictType;
-#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+#define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
/*
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 62f69ac20b2..47478969135 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2157,9 +2157,10 @@ pg_stat_subscription_stats| SELECT ss.subid,
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index f1417e313db..e0607e849cb 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -91,6 +91,15 @@ ok( $node_subscriber->log_contains(
# Truncate table to get rid of the error
$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+# Confirm multiple_unique_conflicts in pg_stat_subscription_stats
+is( $node_subscriber->safe_psql(
+ 'postgres',
+ qq(SELECT confl_multiple_unique_conflicts = 1
+ FROM pg_stat_subscription_stats
+ WHERE subname = 'sub_tab')),
+ qq(t),
+ qq(Confirm that multiple_unique_conflicts has been encountered 1 time.));
+
##################################################
# Test multiple_unique_conflicts due to UPDATE
##################################################
@@ -130,4 +139,13 @@ ok( $node_subscriber->log_contains(
'multiple_unique_conflicts detected during update for conf_tab_c_key (c) = (8)'
);
+# Confirm multiple_unique_conflicts in pg_stat_subscription_stats
+is( $node_subscriber->safe_psql(
+ 'postgres',
+ qq(SELECT confl_multiple_unique_conflicts = 2
+ FROM pg_stat_subscription_stats
+ WHERE subname = 'sub_tab')),
+ qq(t),
+ qq(Confirm that multiple_unique_conflicts has been encountered 2 times.));
+
done_testing();
--
2.34.1
On Fri, Mar 21, 2025 at 12:50 PM Nisha Moond wrote:
Thanks, Hou-san, for the review and fix patches. I’ve incorporated
your suggestions.
Attached are the v7 patches, including patch 002, which implements
stats collection for 'multiple_unique_conflicts' in pg_stat_subscription_stats.
Thanks for updating the patches.
Here are some more comments:
1.
The comments atop of ReportApplyConflict() should be updated to reflect the
updates made to the input parameters.
2.
Add ConflictTupleInfo in typedefs.list.
3.
A few typos: exiting => existing
4.
$node_subscriber->wait_for_log(
qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
We should avoid including the elevel prefix ('ERROR') in the pattern here, as
the log line format can change depending on log_error_verbosity. Please refer
to d13ff82 for details.
Please see attachment 0001 for the proposed changes.
5.
ok( $node_subscriber->log_contains(
qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\)./,
$log_offset),
'multiple_unique_conflicts detected during insertion for conf_tab_pkey (a) = (2)'
);
...
ok( $node_subscriber->log_contains(
qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\)./,
$log_offset),
'multiple_unique_conflicts detected during insertion for conf_tab_b_key (b) = (3)'
);
Currently, different detail lines are checked in separate test cases. It would
be clearer to merge these checks, ensuring comprehensive validation
including line breaks. See attachment 0002 for the proposed changes.
6.
The documentation on the conflict log format should be updated. E.g., all the
places that mention insert_exists or update_exists might need to mention the
new multiple_unique_conflicts conflict as well. It would also be better to
mention that the new conflict produces multiple detail lines. Please see
attachment 0003 for the proposed changes.
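
As a rough illustration of comment 4 above, the sketch below checks a pattern
with and without the 'ERROR:' prefix against two sample log lines. It is
written in C with POSIX regex rather than the Perl of the TAP test, and the
two sample lines are assumptions about what the default and verbose
log_error_verbosity formats look like (verbose is assumed to add the SQLSTATE
after the severity); they were not captured from a real server. The helper
name log_line_matches() is hypothetical.

```c
#include <assert.h>
#include <regex.h>
#include <stdbool.h>
#include <stddef.h>

/* Pattern that hard-codes the severity prefix (the form comment 4 advises against). */
const char *const PATTERN_WITH_PREFIX =
	"ERROR: +conflict detected on relation \"public\\.conf_tab\": "
	"conflict=multiple_unique_conflicts";

/* Pattern that matches only the message body. */
const char *const PATTERN_WITHOUT_PREFIX =
	"conflict detected on relation \"public\\.conf_tab\": "
	"conflict=multiple_unique_conflicts";

/* Assumed shape of the log line with the default verbosity. */
const char *const SAMPLE_DEFAULT_LINE =
	"ERROR:  conflict detected on relation \"public.conf_tab\": "
	"conflict=multiple_unique_conflicts";

/* Assumed shape with verbose output: SQLSTATE follows the severity. */
const char *const SAMPLE_VERBOSE_LINE =
	"ERROR:  23505: conflict detected on relation \"public.conf_tab\": "
	"conflict=multiple_unique_conflicts";

/* Return true if the POSIX extended regex 'pattern' matches anywhere in 'line'. */
bool
log_line_matches(const char *pattern, const char *line)
{
	regex_t		re;
	bool		matched;

	assert(pattern != NULL && line != NULL);

	if (regcomp(&re, pattern, REG_EXTENDED | REG_NOSUB) != 0)
		return false;			/* treat an invalid pattern as "no match" */

	matched = (regexec(&re, line, 0, NULL, 0) == 0);
	regfree(&re);
	return matched;
}
```

Under these assumptions, the prefixed pattern matches only the default-format
line, while the prefix-free pattern matches both, which is why the test is
more robust without the elevel.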
Best Regards,
Hou zj
Attachments:
0001-fix-comments-and-tests.patch.txt
From 20e2d08ba853a61523b6356b12cd600cc31217ad Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 21 Mar 2025 13:50:27 +0800
Subject: [PATCH v2 1/3] fix comments and tests
---
src/backend/replication/logical/conflict.c | 9 ++-------
src/include/replication/conflict.h | 15 ++++++---------
src/test/subscription/t/035_conflicts.pl | 2 +-
src/tools/pgindent/typedefs.list | 1 +
4 files changed, 10 insertions(+), 17 deletions(-)
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index b1614d3aaf6..9295c8ef6c1 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -91,15 +91,10 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
* 'searchslot' should contain the tuple used to search the local tuple to be
* updated or deleted.
*
- * 'conflictslots' list contains the existing local tuples, if any, that
- * conflicts with the remote tuple. 'localxmins', 'localorigins', and 'localts'
- * provide the transaction information related to the existing local tuples.
- *
* 'remoteslot' should contain the remote new tuple, if any.
*
- * The 'conflictindexes' list represents the OIDs of the unique index that
- * triggered the constraint violation error. We use this to report the key
- * values for conflicting tuple.
+ * 'conflicttuples' should be a list of ConflictTupleInfo pointers.
+ * Refer to the ConflictTupleInfo structure comments for details.
*
* The caller must ensure that the index with the OID 'indexoid' is locked so
* that we can fetch and display the conflicting key value.
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 06d5d05c560..4d0bab0dd28 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -55,18 +55,15 @@ typedef enum
/*
- * Information for the exiting local tuple that caused the conflict.
+ * Information for the existing local tuple that caused the conflict.
*/
typedef struct ConflictTupleInfo
{
- TupleTableSlot *slot;
- Oid indexoid; /* conflicting index */
- TransactionId xmin; /* transaction ID that modified the existing
- * local tuple */
- RepOriginId origin; /* which origin modified the exiting local
- * tuple */
- TimestampTz ts; /* when the exiting local tuple was modified
- * by the origin */
+ TupleTableSlot *slot; /* tuple slot holding the conflicting local tuple */
+ Oid indexoid; /* OID of the index where the conflict occurred */
+ TransactionId xmin; /* transaction ID of the modification causing the conflict */
+ RepOriginId origin; /* origin identifier of the modification */
+ TimestampTz ts; /* timestamp of when the modification on the conflicting local tuple occurred */
} ConflictTupleInfo;
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index f1417e313db..37f8af07995 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -109,7 +109,7 @@ $node_publisher->safe_psql('postgres',
# Confirm that this causes an error on the subscriber
$node_subscriber->wait_for_log(
- qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
$log_offset);
ok( $node_subscriber->log_contains(
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index bfa276d2d35..3fbf5a4c212 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -480,6 +480,7 @@ ConditionVariableMinimallyPadded
ConditionalStack
ConfigData
ConfigVariable
+ConflictTupleInfo
ConflictType
ConnCacheEntry
ConnCacheKey
--
2.30.0.windows.2
0002-merge-tests.patch.txt
From 12bd20f5d0deb1b31520a641df884034f8adf044 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 21 Mar 2025 14:09:01 +0800
Subject: [PATCH v2 2/3] merge tests
---
src/test/subscription/t/035_conflicts.pl | 52 ++++++++----------------
1 file changed, 16 insertions(+), 36 deletions(-)
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index 37f8af07995..d06e64747cb 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -67,26 +67,16 @@ $node_publisher->safe_psql('postgres',
# Confirm that this causes an error on the subscriber
$node_subscriber->wait_for_log(
- qr/ERROR: conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts.*
+.*Key already exists in unique index \"conf_tab_pkey\".*
+.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\).*
+.*Key already exists in unique index \"conf_tab_b_key\".*
+.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\).*
+.*Key already exists in unique index \"conf_tab_c_key\".*
+.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
$log_offset);
-ok( $node_subscriber->log_contains(
- qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\)./,
- $log_offset),
- 'multiple_unique_conflicts detected during insertion for conf_tab_pkey (a) = (2)'
-);
-
-ok( $node_subscriber->log_contains(
- qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\)./,
- $log_offset),
- 'multiple_unique_conflicts detected during insertion for conf_tab_b_key (b) = (3)'
-);
-
-ok( $node_subscriber->log_contains(
- qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
- $log_offset),
- 'multiple_unique_conflicts detected during insertion for conf_tab_c_key (c) = (4)'
-);
+pass('multiple_unique_conflicts detected during insert');
# Truncate table to get rid of the error
$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
@@ -109,25 +99,15 @@ $node_publisher->safe_psql('postgres',
# Confirm that this causes an error on the subscriber
$node_subscriber->wait_for_log(
- qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts.*
+.*Key already exists in unique index \"conf_tab_pkey\".*
+.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\).*
+.*Key already exists in unique index \"conf_tab_b_key\".*
+.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\).*
+.*Key already exists in unique index \"conf_tab_c_key\".*
+.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
$log_offset);
-ok( $node_subscriber->log_contains(
- qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\)./,
- $log_offset),
- 'multiple_unique_conflicts detected during update for conf_tab_pkey (a) = (6)'
-);
-
-ok( $node_subscriber->log_contains(
- qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\)./,
- $log_offset),
- 'multiple_unique_conflicts detected during update for conf_tab_b_key (b) = (7)'
-);
-
-ok( $node_subscriber->log_contains(
- qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
- $log_offset),
- 'multiple_unique_conflicts detected during update for conf_tab_c_key (c) = (8)'
-);
+pass('multiple_unique_conflicts detected during update');
done_testing();
--
2.30.0.windows.2
0003-add-documents.patch.txt
From d4b7282365b7112953bcf3769f705bb4994ddd8d Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 21 Mar 2025 14:27:23 +0800
Subject: [PATCH v2 3/3] add documents
---
doc/src/sgml/logical-replication.sgml | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 4637e898b9f..abade232473 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1948,8 +1948,8 @@ DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
<para>
The <literal>Key</literal> section includes the key values of the local
tuple that violated a unique constraint for
- <literal>insert_exists</literal> or <literal>update_exists</literal>
- conflicts.
+ <literal>insert_exists</literal>, <literal>update_exists</literal> or
+ <literal>multiple_unique_conflicts</literal> conflicts.
</para>
</listitem>
<listitem>
@@ -1958,8 +1958,8 @@ DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
tuple if its origin differs from the remote tuple for
<literal>update_origin_differs</literal> or <literal>delete_origin_differs</literal>
conflicts, or if the key value conflicts with the remote tuple for
- <literal>insert_exists</literal> or <literal>update_exists</literal>
- conflicts.
+ <literal>insert_exists</literal>, <literal>update_exists</literal> or
+ <literal>multiple_unique_conflicts</literal> conflicts.
</para>
</listitem>
<listitem>
@@ -1995,6 +1995,16 @@ DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
The large column values are truncated to 64 bytes.
</para>
</listitem>
+ <listitem>
+ <para>
+ Note that in case of <literal>multiple_unique_conflicts</literal> conflict,
+ multiple <replaceable class="parameter">detailed_explanation</replaceable>
+ and <replaceable class="parameter">detail_values</replaceable> lines
+ will be generated, each detailing the conflict information associated
+ with distinct unique
+ constraints.
+ </para>
+ </listitem>
</itemizedlist>
</listitem>
</varlistentry>
--
2.30.0.windows.2
On Fri, Mar 21, 2025 at 1:48 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
On Fri, Mar 21, 2025 at 12:50 PM Nisha Moond wrote:
Thanks, Hou-san, for the review and fix patches. I’ve incorporated
your suggestions.
Attached are the v7 patches, including patch 002, which implements
stats collection for 'multiple_unique_conflicts' in pg_stat_subscription_stats.
Thanks for updating the patches.
Here are some more comments:
1.
The comments atop of ReportApplyConflict() should be updated to reflect the
updates made to the input parameters.
Right, I also noticed this and updated it in the attached. Apart from
this, I have made a number of cosmetic changes in the attached. Please
review and include it in the next version unless you see any problem
with it.
--
With Regards,
Amit Kapila.
Attachments:
v7_0001_amit_1.patch.txt
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index a07f7f09f32..ede89ea3cf9 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -515,7 +515,7 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
}
}
- /* Report the conflict if found */
+ /* Report the conflict, if found */
if (conflicttuples)
ReportApplyConflict(estate, resultRelInfo, ERROR,
list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index b1614d3aaf6..257a60b9391 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -91,18 +91,13 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
* 'searchslot' should contain the tuple used to search the local tuple to be
* updated or deleted.
*
- * 'conflictslots' list contains the existing local tuples, if any, that
- * conflicts with the remote tuple. 'localxmins', 'localorigins', and 'localts'
- * provide the transaction information related to the existing local tuples.
- *
* 'remoteslot' should contain the remote new tuple, if any.
*
- * The 'conflictindexes' list represents the OIDs of the unique index that
- * triggered the constraint violation error. We use this to report the key
- * values for conflicting tuple.
+ * conflicttuples is a list of local tuples that caused the conflict and the
+ * conflict related information. See ConflictTupleInfo.
*
- * The caller must ensure that the index with the OID 'indexoid' is locked so
- * that we can fetch and display the conflicting key value.
+ * The caller must ensure that all the indexes passed in ConflictTupleInfo are
+ * locked so that we can fetch and display the conflicting key values.
*/
void
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
@@ -114,12 +109,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
initStringInfo(&err_detail);
- /*
- * Iterate over conflicting tuples, along with their commit timestamps,
- * origins, and the conflicting indexes to assemble an errdetail() line.
- */
+ /* Form errdetail message by combining conflicting tuples information. */
foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
- {
errdetail_apply_conflict(estate, relinfo, type, searchslot,
conflicttuple->slot, remoteslot,
conflicttuple->indexoid,
@@ -127,7 +118,6 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
conflicttuple->origin,
conflicttuple->ts,
&err_detail);
- }
/* Conflict stats are not gathered for multiple_unique_conflicts */
if (type != CT_MULTIPLE_UNIQUE_CONFLICTS)
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 06d5d05c560..da7845744cd 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -53,7 +53,6 @@ typedef enum
#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
-
/*
* Information for the exiting local tuple that caused the conflict.
*/
@@ -63,7 +62,7 @@ typedef struct ConflictTupleInfo
Oid indexoid; /* conflicting index */
TransactionId xmin; /* transaction ID that modified the existing
* local tuple */
- RepOriginId origin; /* which origin modified the exiting local
+ RepOriginId origin; /* origin that modified the exiting local
* tuple */
TimestampTz ts; /* when the exiting local tuple was modified
* by the origin */
On Fri, Mar 21, 2025 at 3:38 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Mar 21, 2025 at 1:48 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
On Fri, Mar 21, 2025 at 12:50 PM Nisha Moond wrote:
Thanks, Hou-san, for the review and fix patches. I’ve incorporated
your suggestions.
Attached are the v7 patches, including patch 002, which implements
stats collection for 'multiple_unique_conflicts' in pg_stat_subscription_stats.
Thanks for updating the patches.
Here are some more comments:
1.
The comments atop of ReportApplyConflict() should be updated to reflect the
updates made to the input parameters.
Right, I also noticed this and updated it in the attached. Apart from
this, I have made a number of cosmetic changes in the attached. Please
review and include it in the next version unless you see any problem
with it.
Thanks for the review.
I’ve incorporated the changes and also addressed Hou-san’s comments
from [1] with the suggested changes.
Attached are the v8 patches.
[1]: /messages/by-id/OS0PR01MB57169BA3CB76A79B30223E3894DB2@OS0PR01MB5716.jpnprd01.prod.outlook.com
--
Thanks,
Nisha
Attachments:
v8-0002-Stats-collection-for-the-multiple_unique_conflict.patch
From 918860468ab0b101aeceed68668c41ccdca06142 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Fri, 21 Mar 2025 16:04:09 +0530
Subject: [PATCH v8 2/2] Stats collection for the multiple_unique_conflicts
The patch adds a new column 'confl_multiple_unique_conflicts' in view
pg_stat_subscription_stats to support stats collection for this conflict type.
---
doc/src/sgml/monitoring.sgml | 12 ++++++++++++
src/backend/catalog/system_views.sql | 1 +
src/backend/replication/logical/conflict.c | 4 +---
src/backend/utils/adt/pgstatfuncs.c | 6 ++++--
src/include/catalog/pg_proc.dat | 6 +++---
src/include/replication/conflict.h | 2 +-
src/test/regress/expected/rules.out | 3 ++-
7 files changed, 24 insertions(+), 10 deletions(-)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index aaa6586d3a4..0960f5ba94a 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2250,6 +2250,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_multiple_unique_conflicts</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times a row insertion or updated row values violated multiple
+ <literal>NOT DEFERRABLE</literal> unique constraints during the
+ application of changes. See <xref linkend="conflict-multiple-unique-conflicts"/>
+ for details about this conflict.
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a4d2cfdcaf5..31d269b7ee0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1384,6 +1384,7 @@ CREATE VIEW pg_stat_subscription_stats AS
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription as s,
pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 257a60b9391..f1e92f2fc1a 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -119,9 +119,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
conflicttuple->ts,
&err_detail);
- /* Conflict stats are not gathered for multiple_unique_conflicts */
- if (type != CT_MULTIPLE_UNIQUE_CONFLICTS)
- pgstat_report_subscription_conflict(MySubscription->oid, type);
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
ereport(elevel,
errcode_apply_conflict(type),
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 662ce46cbc2..97af7c6554f 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2203,7 +2203,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 890822eaf79..0d29ef50ff2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5647,9 +5647,9 @@
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 78b2d9fecc5..6c59125f256 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -51,7 +51,7 @@ typedef enum
*/
} ConflictType;
-#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+#define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
/*
* Information for the existing local tuple that caused the conflict.
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 62f69ac20b2..47478969135 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2157,9 +2157,10 @@ pg_stat_subscription_stats| SELECT ss.subid,
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
+ ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
--
2.34.1
v8-0001-Implement-the-conflict-detection-for-multiple_uni.patch
From 008eef82c2fca1976883e8e2b50758ce6623f33b Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 18 Mar 2025 12:27:48 +0530
Subject: [PATCH v8 1/2] Implement the conflict detection for
multiple_unique_conflicts in logical replication
Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.
Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures, as each constraint violation needed to be handled one by one, making
the process slow and error-prone.
Now, the apply worker checks all unique constraints upfront and reports
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.
The CONFLICT_NUM_TYPES is not incremented since subscription stats do not
support multiple_unique_conflicts in this patch.
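
The "check everything first, then report once" behaviour described above can
be sketched in standalone C as follows. This is only an illustration under
stated assumptions, not the patch's code: the Row type and the functions
collect_unique_conflicts() and conflict_type_name() are hypothetical, plain
arrays stand in for the unique indexes, and the index names are borrowed from
the conf_tab test table used in 035_conflicts.pl.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define NUM_KEYS 3				/* columns a, b, c: one unique index each */

/* Hypothetical stand-in for a table row with three unique columns. */
typedef struct Row
{
	int			key[NUM_KEYS];
} Row;

/*
 * Check every unique key of 'remote' against the local rows. For each key
 * that already exists locally, append one detail line to 'detail'. Returns
 * the number of violated unique keys. Nothing is reported until all keys
 * have been checked.
 */
int
collect_unique_conflicts(const Row *local, size_t nlocal, const Row *remote,
						 char *detail, size_t detail_size)
{
	static const char *const index_names[NUM_KEYS] = {
		"conf_tab_pkey", "conf_tab_b_key", "conf_tab_c_key"
	};
	int			nconflicts = 0;
	size_t		used = 0;

	assert(local != NULL && remote != NULL);
	assert(detail != NULL && detail_size > 0);
	detail[0] = '\0';

	for (int k = 0; k < NUM_KEYS; k++)
	{
		for (size_t i = 0; i < nlocal; i++)
		{
			if (local[i].key[k] != remote->key[k])
				continue;

			nconflicts++;
			if (used < detail_size)
			{
				int			n = snprintf(detail + used, detail_size - used,
										 "Key already exists in unique index \"%s\".\n",
										 index_names[k]);

				if (n > 0)
					used += (size_t) n;
			}
			break;				/* a unique key matches at most one row */
		}
	}

	return nconflicts;
}

/*
 * Pick the conflict type to report: the caller's original type for a single
 * violation, multiple_unique_conflicts for more than one, NULL for none.
 */
const char *
conflict_type_name(int nconflicts, const char *single_type)
{
	if (nconflicts == 0)
		return NULL;
	return nconflicts > 1 ? "multiple_unique_conflicts" : single_type;
}
```

With local rows (2, 2, 2), (3, 3, 3) and (4, 4, 4) and a remote row (2, 3, 4),
the first function counts three violations and the second selects
multiple_unique_conflicts, so all three conflicting rows are visible from a
single report.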
---
doc/src/sgml/logical-replication.sgml | 31 +++++-
src/backend/executor/execReplication.c | 30 ++++--
src/backend/replication/logical/conflict.c | 68 ++++++++-----
src/backend/replication/logical/worker.c | 68 ++++++-------
src/include/replication/conflict.h | 24 ++++-
src/test/subscription/meson.build | 1 +
src/test/subscription/t/035_conflicts.pl | 113 +++++++++++++++++++++
src/tools/pgindent/typedefs.list | 1 +
8 files changed, 255 insertions(+), 81 deletions(-)
create mode 100644 src/test/subscription/t/035_conflicts.pl
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3d18e507bbc..abade232473 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen;
</para>
</listitem>
</varlistentry>
+ <varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
+ <term><literal>multiple_unique_conflicts</literal></term>
+ <listitem>
+ <para>
+ Inserting or updating a row violates multiple
+ <literal>NOT DEFERRABLE</literal> unique constraints. Note that to log
+ the origin and commit timestamp details of conflicting keys, ensure
+ that <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ is enabled on the subscriber. In this case, an error will be raised until
+ the conflict is resolved manually.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
Note that there are other conflict scenarios, such as exclusion constraint
violations. Currently, we do not provide additional details for them in the
@@ -1935,8 +1948,8 @@ DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
<para>
The <literal>Key</literal> section includes the key values of the local
tuple that violated a unique constraint for
- <literal>insert_exists</literal> or <literal>update_exists</literal>
- conflicts.
+ <literal>insert_exists</literal>, <literal>update_exists</literal> or
+ <literal>multiple_unique_conflicts</literal> conflicts.
</para>
</listitem>
<listitem>
@@ -1945,8 +1958,8 @@ DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
tuple if its origin differs from the remote tuple for
<literal>update_origin_differs</literal> or <literal>delete_origin_differs</literal>
conflicts, or if the key value conflicts with the remote tuple for
- <literal>insert_exists</literal> or <literal>update_exists</literal>
- conflicts.
+ <literal>insert_exists</literal>, <literal>update_exists</literal> or
+ <literal>multiple_unique_conflicts</literal> conflicts.
</para>
</listitem>
<listitem>
@@ -1982,6 +1995,16 @@ DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
The large column values are truncated to 64 bytes.
</para>
</listitem>
+ <listitem>
+ <para>
+ Note that in case of <literal>multiple_unique_conflicts</literal> conflict,
+ multiple <replaceable class="parameter">detailed_explanation</replaceable>
+ and <replaceable class="parameter">detail_values</replaceable> lines
+ will be generated, each detailing the conflict information associated
+ with distinct unique
+ constraints.
+ </para>
+ </listitem>
</itemizedlist>
</listitem>
</varlistentry>
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 0a9b880d250..ede89ea3cf9 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -493,25 +493,33 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
ConflictType type, List *recheckIndexes,
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
- /* Check all the unique indexes for a conflict */
+ List *conflicttuples = NIL;
+ TupleTableSlot *conflictslot;
+
+ /* Check all the unique indexes for conflicts */
foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
{
- TupleTableSlot *conflictslot;
-
if (list_member_oid(recheckIndexes, uniqueidx) &&
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
&conflictslot))
{
- RepOriginId origin;
- TimestampTz committs;
- TransactionId xmin;
-
- GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
- ReportApplyConflict(estate, resultRelInfo, ERROR, type,
- searchslot, conflictslot, remoteslot,
- uniqueidx, xmin, origin, committs);
+ ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
+
+ conflicttuple->slot = conflictslot;
+ conflicttuple->indexoid = uniqueidx;
+
+ GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
+ &conflicttuple->origin, &conflicttuple->ts);
+
+ conflicttuples = lappend(conflicttuples, conflicttuple);
}
}
+
+ /* Report the conflict, if found */
+ if (conflicttuples)
+ ReportApplyConflict(estate, resultRelInfo, ERROR,
+ list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
+ searchslot, remoteslot, conflicttuples);
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 772fc83e88b..257a60b9391 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
- [CT_DELETE_MISSING] = "delete_missing"
+ [CT_DELETE_MISSING] = "delete_missing",
+ [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
static int errcode_apply_conflict(ConflictType type);
-static int errdetail_apply_conflict(EState *estate,
+static void errdetail_apply_conflict(EState *estate,
ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate,
TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
RepOriginId localorigin,
- TimestampTz localts);
+ TimestampTz localts, StringInfo err_msg);
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -90,32 +91,37 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
* 'searchslot' should contain the tuple used to search the local tuple to be
* updated or deleted.
*
- * 'localslot' should contain the existing local tuple, if any, that conflicts
- * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
- * transaction information related to this existing local tuple.
- *
* 'remoteslot' should contain the remote new tuple, if any.
*
- * The 'indexoid' represents the OID of the unique index that triggered the
- * constraint violation error. We use this to report the key values for
- * conflicting tuple.
+ * conflicttuples is a list of local tuples that caused the conflict and the
+ * conflict related information. See ConflictTupleInfo.
*
- * The caller must ensure that the index with the OID 'indexoid' is locked so
- * that we can fetch and display the conflicting key value.
+ * The caller must ensure that all the indexes passed in ConflictTupleInfo are
+ * locked so that we can fetch and display the conflicting key values.
*/
void
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
- TupleTableSlot *localslot, TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ TupleTableSlot *remoteslot, List *conflicttuples)
{
Relation localrel = relinfo->ri_RelationDesc;
+ StringInfoData err_detail;
+
+ initStringInfo(&err_detail);
- Assert(!OidIsValid(indexoid) ||
- CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+ /* Form errdetail message by combining conflicting tuples information. */
+ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ conflicttuple->slot, remoteslot,
+ conflicttuple->indexoid,
+ conflicttuple->xmin,
+ conflicttuple->origin,
+ conflicttuple->ts,
+ &err_detail);
- pgstat_report_subscription_conflict(MySubscription->oid, type);
+ /* Conflict stats are not gathered for multiple_unique_conflicts */
+ if (type != CT_MULTIPLE_UNIQUE_CONFLICTS)
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
ereport(elevel,
errcode_apply_conflict(type),
@@ -123,9 +129,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
get_namespace_name(RelationGetNamespace(localrel)),
RelationGetRelationName(localrel),
ConflictTypeNames[type]),
- errdetail_apply_conflict(estate, relinfo, type, searchslot,
- localslot, remoteslot, indexoid,
- localxmin, localorigin, localts));
+ errdetail_internal("%s", err_detail.data));
}
/*
@@ -169,6 +173,7 @@ errcode_apply_conflict(ConflictType type)
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
return errcode(ERRCODE_UNIQUE_VIOLATION);
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
@@ -191,12 +196,13 @@ errcode_apply_conflict(ConflictType type)
* replica identity columns, if any. The remote old tuple is excluded as its
* information is covered in the replica identity columns.
*/
-static int
+static void
errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ RepOriginId localorigin, TimestampTz localts,
+ StringInfo err_msg)
{
StringInfoData err_detail;
char *val_desc;
@@ -209,7 +215,9 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
- Assert(OidIsValid(indexoid));
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
+ Assert(OidIsValid(indexoid) &&
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
if (localts)
{
@@ -291,7 +299,14 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
if (val_desc)
appendStringInfo(&err_detail, "\n%s", val_desc);
- return errdetail_internal("%s", err_detail.data);
+ /*
+ * Insert a blank line to visually separate the new detail line from the
+ * existing ones.
+ */
+ if (err_msg->len > 0)
+ appendStringInfoChar(err_msg, '\n');
+
+ appendStringInfo(err_msg, "%s", err_detail.data);
}
/*
@@ -323,7 +338,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
* Report the conflicting key values in the case of a unique constraint
* violation.
*/
- if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+ if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+ type == CT_MULTIPLE_UNIQUE_CONFLICTS)
{
Assert(OidIsValid(indexoid) && localslot);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 31ab69ea13a..e3b2b144942 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2674,7 +2674,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
Relation localrel = relinfo->ri_RelationDesc;
EPQState epqstate;
- TupleTableSlot *localslot;
+ TupleTableSlot *localslot = NULL;
+ ConflictTupleInfo conflicttuple = {0};
bool found;
MemoryContext oldctx;
@@ -2693,16 +2694,13 @@ apply_handle_update_internal(ApplyExecutionData *edata,
*/
if (found)
{
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
-
/*
* Report the conflict if the tuple was modified by a different
* origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin, &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
{
TupleTableSlot *newslot;
@@ -2710,9 +2708,11 @@ apply_handle_update_internal(ApplyExecutionData *edata,
newslot = table_slot_create(localrel, &estate->es_tupleTable);
slot_store_data(newslot, relmapentry, newtup);
+ conflicttuple.slot = localslot;
+
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot, localslot, newslot,
- InvalidOid, localxmin, localorigin, localts);
+ remoteslot, newslot,
+ list_make1(&conflicttuple));
}
/* Process and store remote tuple in the slot */
@@ -2741,9 +2741,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
- remoteslot, NULL, newslot,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ remoteslot, newslot, list_make1(&conflicttuple));
}
/* Cleanup. */
@@ -2861,6 +2859,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
EPQState epqstate;
TupleTableSlot *localslot;
+ ConflictTupleInfo conflicttuple = {0};
bool found;
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
@@ -2876,19 +2875,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/* If found delete it. */
if (found)
{
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
-
/*
* Report the conflict if the tuple was modified by a different
* origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin, &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
+ {
+ conflicttuple.slot = localslot;
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
- remoteslot, localslot, NULL,
- InvalidOid, localxmin, localorigin, localts);
+ remoteslot, NULL,
+ list_make1(&conflicttuple));
+ }
EvalPlanQualSetSlot(&epqstate, localslot);
@@ -2903,9 +2902,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
- remoteslot, NULL, NULL,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ remoteslot, NULL, list_make1(&conflicttuple));
}
/* Cleanup. */
@@ -3073,9 +3070,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
Relation partrel_new;
bool found;
EPQState epqstate;
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
+ ConflictTupleInfo conflicttuple = {0};
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(edata, partrel,
@@ -3093,11 +3088,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* The tuple to be updated could not be found. Do nothing
* except for emitting a log message.
*/
- ReportApplyConflict(estate, partrelinfo,
- LOG, CT_UPDATE_MISSING,
- remoteslot_part, NULL, newslot,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ ReportApplyConflict(estate, partrelinfo, LOG,
+ CT_UPDATE_MISSING, remoteslot_part,
+ newslot, list_make1(&conflicttuple));
return;
}
@@ -3106,8 +3099,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* Report the conflict if the tuple was modified by a
* different origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin,
+ &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
{
TupleTableSlot *newslot;
@@ -3115,10 +3110,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
newslot = table_slot_create(partrel, &estate->es_tupleTable);
slot_store_data(newslot, part_entry, newtup);
+ conflicttuple.slot = localslot;
+
ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot_part, localslot, newslot,
- InvalidOid, localxmin, localorigin,
- localts);
+ remoteslot_part, newslot,
+ list_make1(&conflicttuple));
}
/*
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 37454dc9513..78b2d9fecc5 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -41,6 +41,9 @@ typedef enum
/* The row to be deleted is missing */
CT_DELETE_MISSING,
+ /* The row to be inserted/updated violates multiple unique constraint */
+ CT_MULTIPLE_UNIQUE_CONFLICTS,
+
/*
* Other conflicts, such as exclusion constraint violations, involve more
* complex rules than simple equality checks. These conflicts are left for
@@ -50,6 +53,22 @@ typedef enum
#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+/*
+ * Information for the existing local tuple that caused the conflict.
+ */
+typedef struct ConflictTupleInfo
+{
+ TupleTableSlot *slot; /* tuple slot holding the conflicting local
+ * tuple */
+ Oid indexoid; /* OID of the index where the conflict
+ * occurred */
+ TransactionId xmin; /* transaction ID of the modification causing
+ * the conflict */
+ RepOriginId origin; /* origin identifier of the modification */
+ TimestampTz ts; /* timestamp of when the modification on the
+ * conflicting local tuple occurred */
+} ConflictTupleInfo;
+
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
RepOriginId *localorigin,
@@ -57,10 +76,7 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
int elevel, ConflictType type,
TupleTableSlot *searchslot,
- TupleTableSlot *localslot,
TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts);
+ List *conflicttuples);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
-
#endif
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index d40b49714f6..586ffba434e 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -41,6 +41,7 @@ tests += {
't/032_subscribe_use_index.pl',
't/033_run_as_table_owner.pl',
't/034_temporal.pl',
+ 't/035_conflicts.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
new file mode 100644
index 00000000000..d06e64747cb
--- /dev/null
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -0,0 +1,113 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create a table on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+# Create same table on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY key, b int unique, c int unique);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_tab FOR TABLE conf_tab");
+
+# Create the subscription
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION sub_tab
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION pub_tab;");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
+my $log_offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,3,4);");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts.*
+.*Key already exists in unique index \"conf_tab_pkey\".*
+.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\).*
+.*Key already exists in unique index \"conf_tab_b_key\".*
+.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\).*
+.*Key already exists in unique index \"conf_tab_c_key\".*
+.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
+ $log_offset);
+
+pass('multiple_unique_conflicts detected during update');
+
+# Truncate table to get rid of the error
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+##################################################
+# Test multiple_unique_conflicts due to UPDATE
+##################################################
+$log_offset = -s $node_subscriber->logfile;
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (5,5,5);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (6,6,6), (7,7,7), (8,8,8);");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab set a=6, b=7, c=8 where a=5;");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+ qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts.*
+.*Key already exists in unique index \"conf_tab_pkey\".*
+.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\).*
+.*Key already exists in unique index \"conf_tab_b_key\".*
+.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\).*
+.*Key already exists in unique index \"conf_tab_c_key\".*
+.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
+ $log_offset);
+
+pass('multiple_unique_conflicts detected during insert');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index bfa276d2d35..3fbf5a4c212 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -480,6 +480,7 @@ ConditionVariableMinimallyPadded
ConditionalStack
ConfigData
ConfigVariable
+ConflictTupleInfo
ConflictType
ConnCacheEntry
ConnCacheKey
--
2.34.1
On Fri, Mar 21, 2025 at 4:43 PM Nisha Moond <nisha.moond412@gmail.com> wrote:
Attached are the v8 patches.
Thanks, the patches look good to me, so pushed.
--
With Regards,
Amit Kapila.
On Fri, 21 Mar 2025 at 16:44, Nisha Moond <nisha.moond412@gmail.com> wrote:
On Fri, Mar 21, 2025 at 3:38 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Mar 21, 2025 at 1:48 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
On Fri, Mar 21, 2025 at 12:50 PM Nisha Moond wrote:
Thanks, Hou-san, for the review and fix patches. I’ve incorporated
your suggestions.
Attached are the v7 patches, including patch 002, which implements
stats collection for 'multiple_unique_conflicts' in pg_stat_subscription_stats.
Thanks for updating the patches.
Here are some more comments:
1.
The comments atop of ReportApplyConflict() should be updated to reflect the
updates made to the input parameters.
Right, I also noticed this and updated it in the attached. Apart from
this, I have made a number of cosmetic changes in the attached. Please
review and include it in the next version unless you see any problem
with it.
Thanks for the review.
I’ve incorporated the changes and also addressed Hou-san’s comments
from [1] with suggested changes.
Attached are the v8 patches.
Shouldn't these statements be the other way around?
pass('multiple_unique_conflicts detected during update');
should have been:
pass('multiple_unique_conflicts detected during insert');
and
pass('multiple_unique_conflicts detected during insert');
should have been:
pass('multiple_unique_conflicts detected during update');
If you are OK with the above change, the attached patch makes it.
Regards,
Vignesh
Attachments:
Update_expect_test_message.patch (text/x-patch; charset=US-ASCII)
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index f9778db7cc9..3a4d44e1d0e 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -76,7 +76,7 @@ $node_subscriber->wait_for_log(
.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
$log_offset);
-pass('multiple_unique_conflicts detected during update');
+pass('multiple_unique_conflicts detected during insert');
# Truncate table to get rid of the error
$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
@@ -108,6 +108,6 @@ $node_subscriber->wait_for_log(
.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
$log_offset);
-pass('multiple_unique_conflicts detected during insert');
+pass('multiple_unique_conflicts detected during update');
done_testing();
On Tue, Mar 25, 2025 at 8:51 AM vignesh C <vignesh21@gmail.com> wrote:
Shouldn't these statements be the other way around?
pass('multiple_unique_conflicts detected during update');
should have been:
pass('multiple_unique_conflicts detected during insert');
and
pass('multiple_unique_conflicts detected during insert');
should have been:
pass('multiple_unique_conflicts detected during update');
If you are OK with the above change, the attached patch makes it.
LGTM and thanks for the report. I'll push this change.
--
With Regards,
Amit Kapila.
Hi Nisha,
I saw this patch was already pushed [1], but there was one thing I
never quite understood about this feature, and I didn't find the
answer in the thread posts above.
My question: Why is there only a single new conflict type being added here?
e.g.
Conflict due to INSERT
- single conflict ==> 'insert_exists'
- multiple conflicts ==> 'multiple_unique_conflicts'
Conflict due to UPDATE
- single conflict ==> 'update_exists'
- multiple conflicts ==> 'multiple_unique_conflicts'
My point is, if it is deemed useful for a user to know if a *single*
conflict was caused by an INSERT or by an UPDATE, then why is it not
equally useful to know if *multiple* conflicts were caused by an
INSERT or by an UPDATE?
In other words, instead of just 'multiple_unique_conflicts', why
wasn't this new conflict type split into two, something like
'insert_multiple_conflicts' and 'update_multiple_conflicts'?
======
[1]: https://github.com/postgres/postgres/commit/73eba5004a06a744b6b8570e42432b9e9f75997b
Kind Regards,
Peter Smith.
Fujitsu Australia
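For reference, the mapping listed in the message above follows from the
type selection in CheckAndReportConflict() in the pushed patch, where the
reported type switches to CT_MULTIPLE_UNIQUE_CONFLICTS once more than one
unique index yields a conflicting tuple. The following is only a minimal
standalone sketch of that rule: the reduced enum and the helper names
choose_conflict_type() and conflict_type_name() are hypothetical and do
not exist in the PostgreSQL tree.

```c
/*
 * Reduced stand-in for the ConflictType enum in conflict.h; only the
 * unique-violation types are kept for this illustration.
 */
typedef enum
{
	CT_INSERT_EXISTS,
	CT_UPDATE_EXISTS,
	CT_MULTIPLE_UNIQUE_CONFLICTS
} ConflictType;

/* Names as they appear in the "conflict=" part of the log message. */
static const char *const ConflictTypeNames[] = {
	[CT_INSERT_EXISTS] = "insert_exists",
	[CT_UPDATE_EXISTS] = "update_exists",
	[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};

/*
 * Hypothetical helper: 'base_type' is the type implied by the operation
 * (INSERT or UPDATE) and 'nconflicts' is the number of unique indexes on
 * which a conflicting local tuple was found.  This mirrors the
 * list_length(conflicttuples) > 1 test in the patch.
 */
ConflictType
choose_conflict_type(ConflictType base_type, int nconflicts)
{
	return nconflicts > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : base_type;
}

/* Hypothetical helper: look up the user-visible name of a conflict type. */
const char *
conflict_type_name(ConflictType type)
{
	return ConflictTypeNames[type];
}
```

With this rule, an INSERT or an UPDATE that collides on a single index
keeps its own type, while either operation colliding on two or more
indexes is reported under the same name, so the originating operation is
no longer visible in the conflict type itself.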
On Fri, Apr 4, 2025 at 10:41 AM Peter Smith <smithpb2250@gmail.com> wrote:
My point is, if it is deemed useful for a user to know if a *single*
conflict was caused by an INSERT or by an UPDATE, then why is it not
equally useful to know if *multiple* conflicts were caused by an
INSERT or by an UPDATE?
The reason is that users can resolve a single insert_exists or
update_exists conflict by using a last_update_wins kind of resolution
strategy, either manually or automatically (in the future, after we get
that patch committed). The same may not be true for the multiple-rows
case, or at least it won't be as simple as the single-row case: one may
need to consider removing multiple rows, which can lead to data
inconsistency, so we are planning an ERROR as the default resolution
strategy. This could be the reason that even BDR has a single
conflict_type for this case [1][2]. I don't deny the possibility that
in the future, one comes up with a case where separate conflict types
for these make sense, and at that time, we can consider adding more
conflict types, but for now, there is no solid case for it, which is why
we kept a single conflict type.
[1]: https://www.enterprisedb.com/docs/pgd/4/bdr/conflicts/#insert-operations-that-violate-multiple-unique-constraints
[2]: https://www.enterprisedb.com/docs/pgd/4/bdr/conflicts/#update-operations-that-violate-multiple-unique-constraints
--
With Regards,
Amit Kapila.