Collect statistics about conflicts in logical replication

Started by Zhijie Hou (Fujitsu) over 1 year ago, 27 messages
#1 Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
1 attachment(s)

Hi hackers,
CC'ing people involved in the related work.

In the original conflict resolution thread [1], we decided to split the
conflict resolution work into multiple patches to facilitate incremental
progress towards supporting conflict resolution in logical replication. One
of these pieces of work is collecting statistics about conflicts.

Following the discussions in the conflict resolution thread, collecting
statistics about logical replication conflicts is valuable on its own: it can
help users understand conflict stats (e.g., conflict rates) and potentially
identify portions of the application and other parts of the system to optimize.
So, I am starting a new thread for this feature.

The idea is to add columns (insert_exists_count, update_differ_count,
update_exists_count, update_missing_count, delete_differ_count,
delete_missing_count) to the view pg_stat_subscription_stats to show
information about the conflicts that occur during the application of logical
replication changes. The conflict types come from the already committed work
that reports additional information for each conflict in logical replication.
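The counting scheme can be illustrated with a minimal standalone sketch: one counter per conflict type, incremented in a backend-local pending entry and later folded into a shared entry. All names here (DemoConflictType, DemoPendingEntry, DemoSharedEntry, demo_report_conflict, demo_flush) are hypothetical stand-ins for ConflictType, PgStat_BackendSubEntry, PgStat_StatSubEntry, pgstat_report_subscription_conflict() and the SUB_ACC loop in pgstat_subscription_flush_cb(). The sketch assumes that the enum order matches the order of the new view columns, as the patch relies on when it fills those columns in a loop over the enum.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical stand-in for ConflictType. The order is assumed to match the
 * order of the new columns in pg_stat_subscription_stats.
 */
typedef enum
{
	DEMO_INSERT_EXISTS,			/* insert_exists_count */
	DEMO_UPDATE_DIFFER,			/* update_differ_count */
	DEMO_UPDATE_EXISTS,			/* update_exists_count */
	DEMO_UPDATE_MISSING,		/* update_missing_count */
	DEMO_DELETE_DIFFER,			/* delete_differ_count */
	DEMO_DELETE_MISSING			/* delete_missing_count */
} DemoConflictType;

/* Count derived from the last enumerator, as in the v1 patch. */
#define DEMO_NUM_CONFLICT_TYPES (DEMO_DELETE_MISSING + 1)

/* Per-backend pending counters (plays the role of PgStat_BackendSubEntry). */
typedef struct DemoPendingEntry
{
	int64_t		conflict_count[DEMO_NUM_CONFLICT_TYPES];
} DemoPendingEntry;

/* Shared cumulative counters (plays the role of PgStat_StatSubEntry). */
typedef struct DemoSharedEntry
{
	int64_t		conflict_count[DEMO_NUM_CONFLICT_TYPES];
} DemoSharedEntry;

/* Record one conflict locally; the conflict type is the array index. */
static void
demo_report_conflict(DemoPendingEntry *pending, DemoConflictType type)
{
	assert((int) type >= 0 && (int) type < DEMO_NUM_CONFLICT_TYPES);
	pending->conflict_count[type]++;
}

/*
 * Add the pending counts to the shared entry, then clear them. This is a
 * simplification: the real flush callback also locks the shared entry.
 */
static void
demo_flush(DemoSharedEntry *shared, DemoPendingEntry *pending)
{
	for (int i = 0; i < DEMO_NUM_CONFLICT_TYPES; i++)
	{
		shared->conflict_count[i] += pending->conflict_count[i];
		pending->conflict_count[i] = 0;
	}
}
```

Indexing by the enum keeps the report path to a single increment. Adding a conflict type then only requires a new enumerator plus the matching view column, not a new struct field.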

The patch for this feature is attached.

Suggestions and comments are highly appreciated.

[1]: /messages/by-id/CAA4eK1LgPyzPr_Vrvvr4syrde4hyT=QQnGjdRUNP-tz3eYa=GQ@mail.gmail.com

Best Regards,
Hou Zhijie

Attachments:

v1-0001-Collect-statistics-about-conflicts-in-logical-rep.patch (application/octet-stream)
From 708ae67718f9f919f9f8dec4f455830bfb629e96 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 22 Aug 2024 17:52:50 +0800
Subject: [PATCH v1] Collect statistics about conflicts in logical replication

This commit adds columns to the view pg_stat_subscription_stats to show
information about the conflicts that occur during the application of
logical replication changes. Currently, the following columns are added.

insert_exists_count:
	Number of times a row insertion violated a NOT DEFERRABLE unique constraint.
update_differ_count:
	Number of times an update was performed on a row that was previously modified by another origin.
update_exists_count:
	Number of times that the updated value of a row violates a NOT DEFERRABLE unique constraint.
update_missing_count:
	Number of times that the tuple to be updated is missing.
delete_differ_count:
	Number of times a delete was performed on a row that was previously modified by another origin.
delete_missing_count:
	Number of times that the tuple to be deleted is missing.

The update_differ and delete_differ conflicts can be detected only when
track_commit_timestamp is enabled.
---
 doc/src/sgml/logical-replication.sgml         |   5 +-
 doc/src/sgml/monitoring.sgml                  |  74 ++++++++-
 src/backend/catalog/system_views.sql          |   6 +
 src/backend/replication/logical/conflict.c    |   5 +-
 .../utils/activity/pgstat_subscription.c      |  17 ++
 src/backend/utils/adt/pgstatfuncs.c           |  33 +++-
 src/include/catalog/pg_proc.dat               |   6 +-
 src/include/pgstat.h                          |   4 +
 src/include/replication/conflict.h            |   7 +
 src/test/regress/expected/rules.out           |   8 +-
 src/test/subscription/t/026_stats.pl          | 145 ++++++++++++++++--
 11 files changed, 283 insertions(+), 27 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index bee7e02983..f3e3641d37 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1585,8 +1585,9 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   Additional logging is triggered in the following <firstterm>conflict</firstterm>
-   cases:
+   Additional logging is triggered and the conflict statistics are collected (displayed in the
+   <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
+   in the following <firstterm>conflict</firstterm> cases:
    <variablelist>
     <varlistentry>
      <term><literal>insert_exists</literal></term>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 55417a6fa9..ea36d46253 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -507,7 +507,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
-      <entry>One row per subscription, showing statistics about errors.
+      <entry>One row per subscription, showing statistics about errors and conflicts.
       See <link linkend="monitoring-pg-stat-subscription-stats">
       <structname>pg_stat_subscription_stats</structname></link> for details.
       </entry>
@@ -2157,7 +2157,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>apply_error_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times an error occurred while applying changes
+       Number of times an error occurred while applying changes. Note that any
+       conflict resulting in an apply error will be counted in both
+       apply_error_count and the corresponding conflict count.
       </para></entry>
      </row>
 
@@ -2171,6 +2173,74 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>insert_exists_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a row insertion violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint while applying
+       changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_differ_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times an update was performed on a row that was previously
+       modified by another source while applying changes. This conflict is
+       counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_exists_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that the updated value of a row violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint while applying
+       changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_missing_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that the tuple to be updated was not found while applying
+       changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>delete_differ_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a delete was performed on a row that was previously
+       modified by another source while applying changes. This conflict is
+       counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>delete_missing_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that the tuple to be deleted was not found while applying
+       changes
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 19cabc9a47..fcdd199117 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.insert_exists_count,
+        ss.update_differ_count,
+        ss.update_exists_count,
+        ss.update_missing_count,
+        ss.delete_differ_count,
+        ss.delete_missing_count,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 0bc7959980..02f7892cb2 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -17,8 +17,9 @@
 #include "access/commit_ts.h"
 #include "access/tableam.h"
 #include "executor/executor.h"
+#include "pgstat.h"
 #include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
 
@@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	Assert(!OidIsValid(indexoid) ||
 		   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
+	pgstat_report_subscription_conflict(MySubscription->oid, type);
+
 	ereport(elevel,
 			errcode_apply_conflict(type),
 			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index d9af8de658..e06c92727e 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
 		pending->sync_error_count++;
 }
 
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+	PgStat_EntryRef *entry_ref;
+	PgStat_BackendSubEntry *pending;
+
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+										  InvalidOid, subid, NULL);
+	pending = entry_ref->pending;
+	pending->conflict_count[type]++;
+}
+
 /*
  * Report creating the subscription.
  */
@@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
 	SUB_ACC(apply_error_count);
 	SUB_ACC(sync_error_count);
+	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+		SUB_ACC(conflict_count[i]);
 #undef SUB_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3221137123..870aee8e7b 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	10
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	bool		nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	PgStat_StatSubEntry *subentry;
 	PgStat_StatSubEntry allzero;
+	int			i = 0;
 
 	/* Get subscription stats */
 	subentry = pgstat_fetch_stat_subscription(subid);
@@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "insert_exists_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "update_differ_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "update_exists_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "update_missing_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "delete_differ_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "delete_missing_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	}
 
 	/* subid */
-	values[0] = ObjectIdGetDatum(subid);
+	values[i++] = ObjectIdGetDatum(subid);
 
 	/* apply_error_count */
-	values[1] = Int64GetDatum(subentry->apply_error_count);
+	values[i++] = Int64GetDatum(subentry->apply_error_count);
 
 	/* sync_error_count */
-	values[2] = Int64GetDatum(subentry->sync_error_count);
+	values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+	/* conflict count */
+	for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+		values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
 
 	/* stats_reset */
 	if (subentry->stat_reset_timestamp == 0)
-		nulls[3] = true;
+		nulls[i] = true;
 	else
-		values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+		values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+	Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4abc6d9526..3d5c2957c9 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5538,9 +5538,9 @@
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,insert_exists_count,update_differ_count,update_exists_count,update_missing_count,delete_differ_count,delete_missing_count,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f63159c55c..adb91f5ab2 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -15,6 +15,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/relcache.h"
@@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 } PgStat_BackendSubEntry;
 
 /* ----------
@@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
  */
 
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType conflict);
 extern void pgstat_create_subscription(Oid subid);
 extern void pgstat_drop_subscription(Oid subid);
 extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 02cb84da7e..7232c8889b 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -14,6 +14,11 @@
 
 /*
  * Conflict types that could occur while applying remote changes.
+ *
+ * This enum is used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count) as well, therefore, when adding new
+ * values or reordering existing ones, ensure to review and potentially adjust
+ * the corresponding statistics collection codes.
  */
 typedef enum
 {
@@ -42,6 +47,8 @@ typedef enum
 	 */
 } ConflictType;
 
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 									TransactionId *xmin,
 									RepOriginId *localorigin,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 862433ee52..1985d2ffad 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.insert_exists_count,
+    ss.update_differ_count,
+    ss.update_exists_count,
+    ss.update_missing_count,
+    ss.delete_differ_count,
+    ss.delete_missing_count,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, insert_exists_count, update_differ_count, update_exists_count, update_missing_count, delete_differ_count, delete_missing_count, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index fb3e5629b3..47735282a9 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,15 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+
+# Enable track_commit_timestamp to detect origin-differ conflicts in logical
+# replication. Reduce wal_retrieve_retry_interval to 1ms to accelerate the
+# restart of the logical replication worker after encountering a conflict.
+$node_subscriber->append_conf(
+	'postgresql.conf', q{
+track_commit_timestamp = on
+wal_retrieve_retry_interval = 1ms
+});
 $node_subscriber->start;
 
 
@@ -30,6 +39,7 @@ sub create_sub_pub_w_errors
 		qq[
 	BEGIN;
 	CREATE TABLE $table_name(a int);
+	ALTER TABLE $table_name REPLICA IDENTITY FULL;
 	INSERT INTO $table_name VALUES (1);
 	COMMIT;
 	]);
@@ -95,7 +105,7 @@ sub create_sub_pub_w_errors
 	$node_subscriber->poll_query_until(
 		$db,
 		qq[
-	SELECT apply_error_count > 0
+	SELECT apply_error_count > 0 AND insert_exists_count > 0
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub_name'
 	])
@@ -105,6 +115,89 @@ sub create_sub_pub_w_errors
 	# Truncate test table so that apply worker can continue.
 	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
 
+	# Insert a row on the subscriber.
+	$node_subscriber->safe_psql($db, qq(INSERT INTO $table_name VALUES (2)));
+
+   # Update the test table on the publisher. This operation will raise an
+   # error on the subscriber due to a violation of the unique constraint on
+   # the test table.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_exists conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_exists_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_exists conflict for subscription '$sub_name');
+
+	# Truncate test table to ensure the upcoming update operation is skipped
+	# and the test can continue.
+	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
+
+	# Delete data from the test table on the publisher. This delete operation
+	# should be skipped on the subscriber since the table is already empty.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	# Wait for the subscriber to report tuple missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_missing_count > 0 AND delete_missing_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for tuple missing conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
+	$node_publisher->wait_for_catchup($sub_name);
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (1);
+	));
+
+	# Update the data in the test table on the publisher. This should generate
+	# a conflict because it attempts to update a row on the subscriber that has
+	# been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_differ conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_differ_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_differ conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (2);
+	));
+
+	# Delete data from the test table on the publisher. This should generate a
+	# conflict because it attempts to delete a row on the subscriber that has
+	# been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT delete_differ_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_differ conflict for subscription '$sub_name');
+
 	return ($pub_name, $sub_name);
 }
 
@@ -128,12 +221,18 @@ is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	insert_exists_count > 0,
+	update_differ_count > 0,
+	update_exists_count > 0,
+	update_missing_count > 0,
+	delete_differ_count > 0,
+	delete_missing_count > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Check that apply errors, sync errors, and conflicts are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
 );
 
 # Reset a single subscription
@@ -146,12 +245,18 @@ is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
 );
 
 # Get reset timestamp
@@ -186,12 +291,18 @@ is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	insert_exists_count > 0,
+	update_differ_count > 0,
+	update_exists_count > 0,
+	update_missing_count > 0,
+	delete_differ_count > 0,
+	delete_missing_count > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
 );
 
 # Reset all subscriptions
@@ -203,24 +314,36 @@ is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
 );
 
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
 );
 
 $reset_time1 = $node_subscriber->safe_psql($db,
-- 
2.30.0.windows.2

#2 Peter Smith
smithpb2250@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#1)
1 attachment(s)
Re: Collect statistics about conflicts in logical replication

Hi Hou-san. Here are some review comments for your patch v1-0001.

======
doc/src/sgml/logical-replication.sgml

nit - added a comma.

======
doc/src/sgml/monitoring.sgml

nit - use <literal> for 'apply_error_count'.
nit - added a period when there are multiple sentences.
nit - adjusted field descriptions using ChatGPT clarification suggestions

======
src/include/pgstat.h

nit - change the param to 'type', i.e., the same name the implementation uses

======
src/include/replication/conflict.h

nit - defined 'NUM_CONFLICT_TYPES' inside the enum (I think this style
is often used by other enums in the PG source)
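The two styles being compared can be shown side by side in a standalone sketch. The enumerator names are hypothetical stand-ins, not the ones in conflict.h. It uses C11 static_assert to check that both styles give the same count and to keep a name table in sync with the enum. PostgreSQL code itself would use StaticAssertDecl for such checks.

```c
#include <assert.h>
#include <stddef.h>

/* v1 style: the count is derived from the last real enumerator. */
typedef enum
{
	V1_INSERT_EXISTS,
	V1_UPDATE_DIFFER,
	V1_UPDATE_EXISTS,
	V1_UPDATE_MISSING,
	V1_DELETE_DIFFER,
	V1_DELETE_MISSING
} V1ConflictType;

#define V1_CONFLICT_NUM_TYPES (V1_DELETE_MISSING + 1)

/* Suggested style: a trailing sentinel that is not a real conflict type. */
typedef enum
{
	SK_INSERT_EXISTS,
	SK_UPDATE_DIFFER,
	SK_UPDATE_EXISTS,
	SK_UPDATE_MISSING,
	SK_DELETE_DIFFER,
	SK_DELETE_MISSING,
	SK_NUM_CONFLICT_TYPES		/* must remain last */
} SentinelConflictType;

/* Both styles yield the same count for the same list of conflict types. */
static_assert(SK_NUM_CONFLICT_TYPES == V1_CONFLICT_NUM_TYPES,
			  "conflict type counts differ");

/* Names indexed by conflict type. */
static const char *const conflict_names[] = {
	"insert_exists",
	"update_differ",
	"update_exists",
	"update_missing",
	"delete_differ",
	"delete_missing",
};

/* Fails to compile if a type is added without a matching name. */
static_assert(sizeof(conflict_names) / sizeof(conflict_names[0]) ==
			  SK_NUM_CONFLICT_TYPES,
			  "conflict_names is out of sync with SentinelConflictType");

/* Return the name of a conflict type, or NULL for the sentinel/out of range. */
static const char *
conflict_type_name(SentinelConflictType type)
{
	if ((int) type < 0 || (int) type >= SK_NUM_CONFLICT_TYPES)
		return NULL;
	return conflict_names[type];
}
```

The two styles have different trade-offs. With the sentinel inside the enum, a switch over the type that has no default case may draw a -Wswitch warning for the sentinel. With the #define, the macro has to be updated if a new type is ever appended after the current last enumerator.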

======
src/test/subscription/t/026_stats.pl

1.
+	# Delete data from the test table on the publisher. This delete operation
+	# should be skipped on the subscriber since the table is already empty.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	# Wait for the subscriber to report tuple missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_missing_count > 0 AND delete_missing_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for tuple missing conflict for subscription '$sub_name');

Can you write a comment to explain why the replicated DELETE is
expected to increment both the 'update_missing_count' and the
'delete_missing_count'?

~
nit - update several "Apply and Sync errors..." comments that did not
mention conflicts
nit - tweak comments wording for update_differ and delete_differ
nit - /both > 0/> 0/
nit - /both 0/0/

======
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

PS_NITPICKS_20240826_CDR_STATS_v1.txt (text/plain; charset=US-ASCII)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index f3e3641..f682369 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1585,7 +1585,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   Additional logging is triggered and the conflict statistics are collected (displayed in the
+   Additional logging is triggered, and the conflict statistics are collected (displayed in the
    <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
    in the following <firstterm>conflict</firstterm> cases:
    <variablelist>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index ea36d46..ac3c773 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2159,7 +2159,7 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       <para>
        Number of times an error occurred while applying changes. Note that any
        conflict resulting in an apply error will be counted in both
-       apply_error_count and the corresponding conflict count.
+       <literal>apply_error_count</literal> and the corresponding conflict count.
       </para></entry>
      </row>
 
@@ -2179,8 +2179,8 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para>
       <para>
        Number of times a row insertion violated a
-       <literal>NOT DEFERRABLE</literal> unique constraint while applying
-       changes
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes
       </para></entry>
      </row>
 
@@ -2189,11 +2189,11 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>update_differ_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times an update was performed on a row that was previously
-       modified by another source while applying changes. This conflict is
+       Number of times an update was applied to a row that had been previously
+       modified by another source during the application of changes. This conflict is
        counted only when the
        <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
-       option is enabled on the subscriber
+       option is enabled on the subscriber.
       </para></entry>
      </row>
 
@@ -2202,9 +2202,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>update_exists_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times that the updated value of a row violated a
-       <literal>NOT DEFERRABLE</literal> unique constraint while applying
-       changes
+       Number of times that an updated row value violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes
       </para></entry>
      </row>
 
@@ -2213,8 +2213,8 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>update_missing_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times that the tuple to be updated was not found while applying
-       changes
+       Number of times the tuple to be updated was not found during the
+       application of changes
       </para></entry>
      </row>
 
@@ -2223,11 +2223,11 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>delete_differ_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times a delete was performed on a row that was previously
-       modified by another source while applying changes. This conflict is
-       counted only when the
+       Number of times a delete operation was applied to a row that had been
+       previously modified by another source during the application of changes.
+       This conflict is counted only when the
        <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
-       option is enabled on the subscriber
+       option is enabled on the subscriber.
       </para></entry>
      </row>
 
@@ -2236,8 +2236,8 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>delete_missing_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times that the tuple to be deleted was not found while applying
-       changes
+       Number of times the tuple to be deleted was not found during the application
+       of changes
       </para></entry>
      </row>
 
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index e06c927..ebb0135 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -116,7 +116,7 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
 	SUB_ACC(apply_error_count);
 	SUB_ACC(sync_error_count);
-	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+	for (int i = 0; i < NUM_CONFLICT_TYPES; i++)
 		SUB_ACC(conflict_count[i]);
 #undef SUB_ACC
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 870aee8..7dd4b1e 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2019,7 +2019,7 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	values[i++] = Int64GetDatum(subentry->sync_error_count);
 
 	/* conflict count */
-	for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+	for (int nconflict = 0; nconflict < NUM_CONFLICT_TYPES; nconflict++)
 		values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
 
 	/* stats_reset */
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index adb91f5..91b54aa 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -166,7 +166,7 @@ typedef struct PgStat_BackendSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
-	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
+	PgStat_Counter conflict_count[NUM_CONFLICT_TYPES];
 } PgStat_BackendSubEntry;
 
 /* ----------
@@ -425,7 +425,7 @@ typedef struct PgStat_StatSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
-	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
+	PgStat_Counter conflict_count[NUM_CONFLICT_TYPES];
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -728,7 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
  */
 
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
-extern void pgstat_report_subscription_conflict(Oid subid, ConflictType conflict);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
 extern void pgstat_create_subscription(Oid subid);
 extern void pgstat_drop_subscription(Oid subid);
 extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 7232c88..37c9100 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -45,9 +45,9 @@ typedef enum
 	 * complex rules than simple equality checks. These conflicts are left for
 	 * future improvements.
 	 */
-} ConflictType;
 
-#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+	NUM_CONFLICT_TYPES	/* must be last */
+} ConflictType;
 
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 									TransactionId *xmin,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index 4773528..e22b0b1 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -162,7 +162,7 @@ sub create_sub_pub_w_errors
 	));
 
 	# Update the data in the test table on the publisher. This should generate
-	# a conflict because it attempts to update a row on the subscriber that has
+	# a conflict because it causes the subscriber to attempt to update a row that has
 	# been modified by a different origin.
 	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
 
@@ -184,7 +184,7 @@ sub create_sub_pub_w_errors
 	));
 
 	# Delete data from the test table on the publisher. This should generate a
-	# conflict because it attempts to delete a row on the subscriber that has
+	# conflict because it causes the subscriber to attempt to delete a row that has
 	# been modified by a different origin.
 	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
 
@@ -216,7 +216,7 @@ my ($pub1_name, $sub1_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table1_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
@@ -232,7 +232,7 @@ is( $node_subscriber->safe_psql(
 	WHERE subname = '$sub1_name')
 	),
 	qq(t|t|t|t|t|t|t|t|t),
-	qq(Check that apply errors, sync errors, and conflicts are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+	qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
 );
 
 # Reset a single subscription
@@ -240,7 +240,7 @@ $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
 );
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats reset is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
@@ -256,7 +256,7 @@ is( $node_subscriber->safe_psql(
 	WHERE subname = '$sub1_name')
 	),
 	qq(t|t|t|t|t|t|t|t|t),
-	qq(Confirm that apply errors, sync errors, and conflicts are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
 );
 
 # Get reset timestamp
@@ -286,7 +286,7 @@ my ($pub2_name, $sub2_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table2_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
@@ -302,14 +302,14 @@ is( $node_subscriber->safe_psql(
 	WHERE subname = '$sub2_name')
 	),
 	qq(t|t|t|t|t|t|t|t|t),
-	qq(Confirm that apply errors, sync errors, and conflicts are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+	qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
 );
 
 # Reset all subscriptions
 $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats(NULL)));
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats reset is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
@@ -325,7 +325,7 @@ is( $node_subscriber->safe_psql(
 	WHERE subname = '$sub1_name')
 	),
 	qq(t|t|t|t|t|t|t|t|t),
-	qq(Confirm that apply errors, sync errors, and conflicts are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
 );
 
 is( $node_subscriber->safe_psql(
@@ -343,7 +343,7 @@ is( $node_subscriber->safe_psql(
 	WHERE subname = '$sub2_name')
 	),
 	qq(t|t|t|t|t|t|t|t|t),
-	qq(Confirm that apply errors, sync errors, and conflicts are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
 );
 
 $reset_time1 = $node_subscriber->safe_psql($db,
#3Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Peter Smith (#2)
1 attachment(s)
RE: Collect statistics about conflicts in logical replication

On Monday, August 26, 2024 3:30 PM Peter Smith <smithpb2250@gmail.com> wrote:

======
src/include/replication/conflict.h

nit - defined 'NUM_CONFLICT_TYPES' inside the enum (I think this way is
often used in other PG source enums)

I think we have recently tended to avoid doing that, because it has been pointed
out that this style is somewhat deceptive and can cause confusion. See a previous
similar comment[1]. The current style follows other existing examples, such as:

#define IOOBJECT_NUM_TYPES (IOOBJECT_TEMP_RELATION + 1)
#define IOCONTEXT_NUM_TYPES (IOCONTEXT_VACUUM + 1)
#define IOOP_NUM_TYPES (IOOP_WRITEBACK + 1)
#define BACKEND_NUM_TYPES (B_LOGGER + 1)
...
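A minimal, self-contained C sketch of the convention described above (illustrative only, not code from the patch): the count is kept in a separate #define derived from the last enum member, so no pseudo-member ever appears as a valid ConflictType. Only CT_DELETE_MISSING is confirmed by the patch; the other member names are assumed from the view's column names, and SketchSubStats, sketch_report_conflict, sketch_conflict_column_name and conflict_column_names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical stand-in for ConflictType. Only CT_DELETE_MISSING is taken
 * from the patch; the other names are assumed from the view's columns.
 */
typedef enum
{
	CT_INSERT_EXISTS,
	CT_UPDATE_DIFFER,
	CT_UPDATE_EXISTS,
	CT_UPDATE_MISSING,
	CT_DELETE_DIFFER,
	CT_DELETE_MISSING
} ConflictType;

/* Count kept outside the enum, derived from the last real member. */
#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)

typedef int64_t SketchCounter;	/* stand-in for PgStat_Counter */

/* One counter slot per conflict type, sized by the macro. */
typedef struct
{
	SketchCounter conflict_count[CONFLICT_NUM_TYPES];
} SketchSubStats;

/* Column names, in the same order as the enum members. */
static const char *const conflict_column_names[] = {
	"insert_exists_count",
	"update_differ_count",
	"update_exists_count",
	"update_missing_count",
	"delete_differ_count",
	"delete_missing_count"
};

/* Build fails if the name table and the enum-derived count drift apart. */
_Static_assert(sizeof(conflict_column_names) / sizeof(conflict_column_names[0]) ==
			   CONFLICT_NUM_TYPES,
			   "one column name per conflict type");

/* Bump the counter for one conflict type (bounds-checked). */
static void
sketch_report_conflict(SketchSubStats *stats, ConflictType type)
{
	assert((int) type >= 0 && (int) type < CONFLICT_NUM_TYPES);
	stats->conflict_count[type]++;
}

/* Map a conflict type to its (assumed) view column name. */
static const char *
sketch_conflict_column_name(ConflictType type)
{
	assert((int) type >= 0 && (int) type < CONFLICT_NUM_TYPES);
	return conflict_column_names[type];
}
```

The trade-off is that the macro must be updated whenever a new member is appended after CT_DELETE_MISSING; the static assertion only catches the case where a table indexed by the count (here, the column-name table) and the macro disagree.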

======
src/test/subscription/t/026_stats.pl

1.
+ # Delete data from the test table on the publisher. This delete operation
+ # should be skipped on the subscriber since the table is already empty.
+ $node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+ # Wait for the subscriber to report tuple missing conflict.
+ $node_subscriber->poll_query_until(
+ $db,
+ qq[
+ SELECT update_missing_count > 0 AND delete_missing_count > 0 FROM
+ pg_stat_subscription_stats WHERE subname = '$sub_name'
+ ])
+   or die
+   qq(Timed out while waiting for tuple missing conflict for subscription '$sub_name');

Can you write a comment to explain why the replicated DELETE is
expected to increment both the 'update_missing_count' and the
'delete_missing_count'?

I think the comments several lines above the wait explained the reason[2]. I
slightly modified the comments to make it clear.
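The reasoning can be modelled with a toy C sketch (hypothetical code, not the apply worker): the publisher's UPDATE first fails with update_exists and is retried; after the TRUNCATE on the subscriber, the retried UPDATE finds no matching row and is skipped as update_missing, and the replicated DELETE that follows is skipped as delete_missing. ToySub, find_row, apply_update and apply_delete are invented names, and the single unique int column matched on the full row is a simplifying assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy subscriber table: one unique int column plus conflict counters. */
typedef struct
{
	int			rows[8];
	int			nrows;
	int64_t		update_exists;
	int64_t		update_missing;
	int64_t		delete_missing;
} ToySub;

/* Return the index of the row equal to value, or -1 if absent. */
static int
find_row(const ToySub *t, int value)
{
	for (int i = 0; i < t->nrows; i++)
		if (t->rows[i] == value)
			return i;
	return -1;
}

/*
 * Apply a replicated UPDATE oldval -> newval. Returns false when the change
 * errors out (unique violation) and the transaction would be retried.
 */
static bool
apply_update(ToySub *t, int oldval, int newval)
{
	int			pos = find_row(t, oldval);

	if (pos < 0)
	{
		t->update_missing++;	/* row not found: count it and skip */
		return true;
	}
	if (find_row(t, newval) >= 0)
	{
		t->update_exists++;		/* unique violation: error, retried later */
		return false;
	}
	t->rows[pos] = newval;
	return true;
}

/* Apply a replicated DELETE; a missing row is counted and skipped. */
static void
apply_delete(ToySub *t, int oldval)
{
	int			pos = find_row(t, oldval);

	if (pos < 0)
	{
		t->delete_missing++;
		return;
	}
	t->nrows--;
	t->rows[pos] = t->rows[t->nrows];	/* move last row into the hole */
}
```

For example, starting from subscriber rows {1, 2}, apply_update(&t, 1, 2) returns false (update_exists); after setting t.nrows = 0 to mimic the TRUNCATE, the retried apply_update(&t, 1, 2) and the following apply_delete(&t, 2) each bump their missing-row counter once.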

Other changes look good to me and have been merged, thanks!

Here is the V2 patch.

[1]: /messages/by-id/202201130922.izanq4hkkqnx@alvherre.pgsql

[2]:
..
# Truncate test table to ensure the upcoming update operation is skipped
# and the test can continue.
$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));

Best Regards,
Hou zj

Attachments:

v2-0001-Collect-statistics-about-conflicts-in-logical-rep.patch (application/octet-stream)
From 09ce142dccb18d27459aa3042432a4870cb63599 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 22 Aug 2024 17:52:50 +0800
Subject: [PATCH v2] Collect statistics about conflicts in logical replication

This commit adds columns to the view pg_stat_subscription_stats to show
information about the conflicts that occur during the application of
logical replication changes. Currently, the following columns are added.

insert_exists_count:
	Number of times a row insertion violated a NOT DEFERRABLE unique constraint.
update_differ_count:
	Number of times an update was performed on a row that was previously modified by another origin.
update_exists_count:
	Number of times that the updated value of a row violates a NOT DEFERRABLE unique constraint.
update_missing_count:
	Number of times that the tuple to be updated is missing.
delete_differ_count:
	Number of times a delete was performed on a row that was previously modified by another origin.
delete_missing_count:
	Number of times that the tuple to be deleted is missing.

The update_differ and delete_differ conflicts can be detected only when
track_commit_timestamp is enabled.
---
 doc/src/sgml/logical-replication.sgml         |   5 +-
 doc/src/sgml/monitoring.sgml                  |  74 ++++++++-
 src/backend/catalog/system_views.sql          |   6 +
 src/backend/replication/logical/conflict.c    |   5 +-
 .../utils/activity/pgstat_subscription.c      |  17 ++
 src/backend/utils/adt/pgstatfuncs.c           |  33 +++-
 src/include/catalog/pg_proc.dat               |   6 +-
 src/include/pgstat.h                          |   4 +
 src/include/replication/conflict.h            |   7 +
 src/test/regress/expected/rules.out           |   8 +-
 src/test/subscription/t/026_stats.pl          | 153 ++++++++++++++++--
 11 files changed, 287 insertions(+), 31 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index bee7e02983..f6823694c9 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1585,8 +1585,9 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   Additional logging is triggered in the following <firstterm>conflict</firstterm>
-   cases:
+   Additional logging is triggered, and the conflict statistics are collected (displayed in the
+   <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
+   in the following <firstterm>conflict</firstterm> cases:
    <variablelist>
     <varlistentry>
      <term><literal>insert_exists</literal></term>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 55417a6fa9..ac3c773ea1 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -507,7 +507,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
-      <entry>One row per subscription, showing statistics about errors.
+      <entry>One row per subscription, showing statistics about errors and conflicts.
       See <link linkend="monitoring-pg-stat-subscription-stats">
       <structname>pg_stat_subscription_stats</structname></link> for details.
       </entry>
@@ -2157,7 +2157,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>apply_error_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times an error occurred while applying changes
+       Number of times an error occurred while applying changes. Note that any
+       conflict resulting in an apply error will be counted in both
+       <literal>apply_error_count</literal> and the corresponding conflict count.
       </para></entry>
      </row>
 
@@ -2171,6 +2173,74 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>insert_exists_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a row insertion violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_differ_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times an update was applied to a row that had been previously
+       modified by another source during the application of changes. This conflict is
+       counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_exists_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that an updated row value violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_missing_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was not found during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>delete_differ_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a delete operation was applied to a row that had been
+       previously modified by another source during the application of changes.
+       This conflict is counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>delete_missing_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be deleted was not found during the application
+       of changes
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 19cabc9a47..fcdd199117 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.insert_exists_count,
+        ss.update_differ_count,
+        ss.update_exists_count,
+        ss.update_missing_count,
+        ss.delete_differ_count,
+        ss.delete_missing_count,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 0bc7959980..02f7892cb2 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -17,8 +17,9 @@
 #include "access/commit_ts.h"
 #include "access/tableam.h"
 #include "executor/executor.h"
+#include "pgstat.h"
 #include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
 
@@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	Assert(!OidIsValid(indexoid) ||
 		   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
+	pgstat_report_subscription_conflict(MySubscription->oid, type);
+
 	ereport(elevel,
 			errcode_apply_conflict(type),
 			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index d9af8de658..e06c92727e 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
 		pending->sync_error_count++;
 }
 
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+	PgStat_EntryRef *entry_ref;
+	PgStat_BackendSubEntry *pending;
+
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+										  InvalidOid, subid, NULL);
+	pending = entry_ref->pending;
+	pending->conflict_count[type]++;
+}
+
 /*
  * Report creating the subscription.
  */
@@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
 	SUB_ACC(apply_error_count);
 	SUB_ACC(sync_error_count);
+	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+		SUB_ACC(conflict_count[i]);
 #undef SUB_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3221137123..870aee8e7b 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	10
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	bool		nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	PgStat_StatSubEntry *subentry;
 	PgStat_StatSubEntry allzero;
+	int			i = 0;
 
 	/* Get subscription stats */
 	subentry = pgstat_fetch_stat_subscription(subid);
@@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "insert_exists_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "update_differ_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "update_exists_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "update_missing_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "delete_differ_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "delete_missing_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	}
 
 	/* subid */
-	values[0] = ObjectIdGetDatum(subid);
+	values[i++] = ObjectIdGetDatum(subid);
 
 	/* apply_error_count */
-	values[1] = Int64GetDatum(subentry->apply_error_count);
+	values[i++] = Int64GetDatum(subentry->apply_error_count);
 
 	/* sync_error_count */
-	values[2] = Int64GetDatum(subentry->sync_error_count);
+	values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+	/* conflict count */
+	for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+		values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
 
 	/* stats_reset */
 	if (subentry->stat_reset_timestamp == 0)
-		nulls[3] = true;
+		nulls[i] = true;
 	else
-		values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+		values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+	Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4abc6d9526..3d5c2957c9 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5538,9 +5538,9 @@
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,insert_exists_count,update_differ_count,update_exists_count,update_missing_count,delete_differ_count,delete_missing_count,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f63159c55c..be2c91168a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -15,6 +15,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/relcache.h"
@@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 } PgStat_BackendSubEntry;
 
 /* ----------
@@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
  */
 
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
 extern void pgstat_create_subscription(Oid subid);
 extern void pgstat_drop_subscription(Oid subid);
 extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 02cb84da7e..7232c8889b 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -14,6 +14,11 @@
 
 /*
  * Conflict types that could occur while applying remote changes.
+ *
+ * This enum is used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count) as well, therefore, when adding new
+ * values or reordering existing ones, ensure to review and potentially adjust
+ * the corresponding statistics collection codes.
  */
 typedef enum
 {
@@ -42,6 +47,8 @@ typedef enum
 	 */
 } ConflictType;
 
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 									TransactionId *xmin,
 									RepOriginId *localorigin,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 862433ee52..1985d2ffad 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.insert_exists_count,
+    ss.update_differ_count,
+    ss.update_exists_count,
+    ss.update_missing_count,
+    ss.delete_differ_count,
+    ss.delete_missing_count,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, insert_exists_count, update_differ_count, update_exists_count, update_missing_count, delete_differ_count, delete_missing_count, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index fb3e5629b3..0df31a60b1 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,15 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+
+# Enable track_commit_timestamp to detect origin-differ conflicts in logical
+# replication. Reduce wal_retrieve_retry_interval to 1ms to accelerate the
+# restart of the logical replication worker after encountering a conflict.
+$node_subscriber->append_conf(
+	'postgresql.conf', q{
+track_commit_timestamp = on
+wal_retrieve_retry_interval = 1ms
+});
 $node_subscriber->start;
 
 
@@ -30,6 +39,7 @@ sub create_sub_pub_w_errors
 		qq[
 	BEGIN;
 	CREATE TABLE $table_name(a int);
+	ALTER TABLE $table_name REPLICA IDENTITY FULL;
 	INSERT INTO $table_name VALUES (1);
 	COMMIT;
 	]);
@@ -95,7 +105,7 @@ sub create_sub_pub_w_errors
 	$node_subscriber->poll_query_until(
 		$db,
 		qq[
-	SELECT apply_error_count > 0
+	SELECT apply_error_count > 0 AND insert_exists_count > 0
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub_name'
 	])
@@ -105,6 +115,89 @@ sub create_sub_pub_w_errors
 	# Truncate test table so that apply worker can continue.
 	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
 
+	# Insert a row on the subscriber.
+	$node_subscriber->safe_psql($db, qq(INSERT INTO $table_name VALUES (2)));
+
+	# Update the test table on the publisher. This operation will raise an
+	# error on the subscriber due to a violation of the unique constraint on
+	# the test table.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_exists conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_exists_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_exists conflict for subscription '$sub_name');
+
+	# Truncate the test table to ensure that the conflicting update operations
+	# are skipped, allowing the test to continue.
+	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
+
+	# Delete data from the test table on the publisher. This delete operation
+	# should be skipped on the subscriber since the table is already empty.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	# Wait for the subscriber to report tuple missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_missing_count > 0 AND delete_missing_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for tuple missing conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
+	$node_publisher->wait_for_catchup($sub_name);
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (1);
+	));
+
+	# Update the data in the test table on the publisher. This should generate
+	# a conflict because it causes the subscriber to attempt to update a row that has
+	# been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_differ conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_differ_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_differ conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (2);
+	));
+
+	# Delete data from the test table on the publisher. This should generate a
+	# conflict because it causes the subscriber to attempt to delete a row that has
+	# been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT delete_differ_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_differ conflict for subscription '$sub_name');
+
 	return ($pub_name, $sub_name);
 }
 
@@ -123,17 +216,23 @@ my ($pub1_name, $sub1_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table1_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	insert_exists_count > 0,
+	update_differ_count > 0,
+	update_exists_count > 0,
+	update_missing_count > 0,
+	delete_differ_count > 0,
+	delete_missing_count > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
 );
 
 # Reset a single subscription
@@ -141,17 +240,23 @@ $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
 );
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats reset is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
 );
 
 # Get reset timestamp
@@ -181,46 +286,64 @@ my ($pub2_name, $sub2_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table2_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	insert_exists_count > 0,
+	update_differ_count > 0,
+	update_exists_count > 0,
+	update_missing_count > 0,
+	delete_differ_count > 0,
+	delete_missing_count > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
 );
 
 # Reset all subscriptions
 $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats(NULL)));
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats reset is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
 );
 
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
 );
 
 $reset_time1 = $node_subscriber->safe_psql($db,
-- 
2.30.0.windows.2

#4Peter Smith
smithpb2250@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#3)
1 attachment(s)
Re: Collect statistics about conflicts in logical replication

On Mon, Aug 26, 2024 at 10:13 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

On Monday, August 26, 2024 3:30 PM Peter Smith <smithpb2250@gmail.com> wrote:

======
src/include/replication/conflict.h

nit - defined 'NUM_CONFLICT_TYPES' inside the enum (I think this way is
often used in other PG source enums)

I think we have recently tended to avoid doing that, as others have commented
that this style is somewhat deceptive and can cause confusion. See a previous
similar comment[1]. The current style follows other existing examples such as:

#define IOOBJECT_NUM_TYPES (IOOBJECT_TEMP_RELATION + 1)
#define IOCONTEXT_NUM_TYPES (IOCONTEXT_VACUUM + 1)
#define IOOP_NUM_TYPES (IOOP_WRITEBACK + 1)
#define BACKEND_NUM_TYPES (B_LOGGER + 1)

OK.
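To make the trade-off concrete, here is a minimal standalone sketch (not code from the patch) of the style the patch follows. The count is defined after the enum from its last real member, and it sizes a per-type counter array in the same way as `conflict_count[]`. The enumerator names mirror `conflict.h`, but their order here is an assumption (it must match the order of the `*_count` columns). `SubConflictStats`, `report_conflict`, `flush_conflicts` and `conflict_name` are hypothetical names. One commonly cited reason to keep the sentinel out of the enum is shown in the `switch`: with no fake member, `-Wswitch` can still flag any real member that is left unhandled.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Conflict types; names mirror conflict.h, and the order is assumed to match
 * the order of the *_count columns in pg_stat_subscription_stats.
 */
typedef enum
{
	CT_INSERT_EXISTS,
	CT_UPDATE_DIFFER,
	CT_UPDATE_EXISTS,
	CT_UPDATE_MISSING,
	CT_DELETE_DIFFER,
	CT_DELETE_MISSING
} ConflictType;

/* Count derived from the last real member, kept outside the enum. */
#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)

/* Hypothetical stand-in for the conflict counters added to pgstat.h. */
typedef struct SubConflictStats
{
	int64_t		conflict_count[CONFLICT_NUM_TYPES];
} SubConflictStats;

/* Bump the pending (backend-local) counter for one conflict type. */
void
report_conflict(SubConflictStats *pending, ConflictType type)
{
	assert((int) type >= 0 && (int) type < CONFLICT_NUM_TYPES);
	pending->conflict_count[type]++;
}

/* Add the pending counts into the shared totals, one slot per type. */
void
flush_conflicts(SubConflictStats *shared, const SubConflictStats *pending)
{
	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
		shared->conflict_count[i] += pending->conflict_count[i];
}

/*
 * No default label: with -Wswitch, the compiler warns if a new real member
 * is added but not handled here. A sentinel inside the enum would need
 * either its own case or a default label, and a default label would hide
 * that warning for future members.
 */
const char *
conflict_name(ConflictType type)
{
	switch (type)
	{
		case CT_INSERT_EXISTS:
			return "insert_exists";
		case CT_UPDATE_DIFFER:
			return "update_differ";
		case CT_UPDATE_EXISTS:
			return "update_exists";
		case CT_UPDATE_MISSING:
			return "update_missing";
		case CT_DELETE_DIFFER:
			return "delete_differ";
		case CT_DELETE_MISSING:
			return "delete_missing";
	}
	return "unknown";
}
```

The downside of this style is that `CONFLICT_NUM_TYPES` must be updated by hand if a new member is appended after `CT_DELETE_MISSING`. The comment the patch adds to `conflict.h` covers this.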

======
src/test/subscription/t/026_stats.pl

1.
+ # Delete data from the test table on the publisher. This delete operation
+ # should be skipped on the subscriber since the table is already empty.
+ $node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+ # Wait for the subscriber to report tuple missing conflict.
+ $node_subscriber->poll_query_until(
+ $db,
+ qq[
+ SELECT update_missing_count > 0 AND delete_missing_count > 0 FROM
+ pg_stat_subscription_stats WHERE subname = '$sub_name'
+ ])
+   or die
+   qq(Timed out while waiting for tuple missing conflict for subscription '$sub_name');

Can you write a comment to explain why the replicated DELETE is
expected to increment both the 'update_missing_count' and the
'delete_missing_count'?

I think the comments several lines above the wait explained the reason[2]. I
slightly modified the comments to make it clear.

1.
Right, but it still was not obvious to me what caused the
'update_missing_count' increment. On further study, I see it was a hangover
from the earlier UPDATE test case, which was still stuck in an ERROR loop
retrying the update operation. That is, before the subscriber table
TRUNCATE the retried update was giving the expected 'update_exists'
conflicts, but after the TRUNCATE it gives an 'update_missing'
conflict instead. I've updated the comment to reflect my
understanding. Please have a look to see if you agree.
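The sequence described above can be modelled with a toy simulation. It is not PostgreSQL code, and `ToyTable`, `toy_apply_update` and `toy_retry_update` are hypothetical names. The model rests on these assumptions:

- The subscriber table has a unique constraint on `a`, as the test comments state.
- The target row is located by its old value (REPLICA IDENTITY FULL).
- An `update_exists` conflict raises an ERROR, so the worker restarts and retries the same change.
- An `update_missing` conflict is reported once and the change is skipped.

Following the documentation change in this patch, a conflict that ends in an apply error is counted in both `apply_error_count` and its own conflict counter.

```c
#include <assert.h>

#define MAX_ROWS 8

/* Single-column subscriber table with unique values in rows[0..nrows-1]. */
typedef struct ToyTable
{
	int			rows[MAX_ROWS];
	int			nrows;
} ToyTable;

typedef struct ToyStats
{
	long		apply_error_count;
	long		update_exists_count;
	long		update_missing_count;
} ToyStats;

typedef enum
{
	APPLY_OK,
	APPLY_ERROR,				/* conflict raised an ERROR; worker retries */
	APPLY_SKIPPED				/* conflict reported; change skipped */
} ApplyResult;

static int
find_row(const ToyTable *t, int value)
{
	for (int i = 0; i < t->nrows; i++)
		if (t->rows[i] == value)
			return i;
	return -1;
}

/* Apply a replicated "UPDATE ... SET a = newval WHERE a = oldval". */
ApplyResult
toy_apply_update(ToyTable *t, int oldval, int newval, ToyStats *st)
{
	int			idx;

	assert(t->nrows >= 0 && t->nrows <= MAX_ROWS);
	idx = find_row(t, oldval);
	if (idx < 0)
	{
		/* Target row is gone: update_missing is reported, change skipped. */
		st->update_missing_count++;
		return APPLY_SKIPPED;
	}
	if (find_row(t, newval) >= 0)
	{
		/* New value collides with a unique value: update_exists ERROR. */
		st->update_exists_count++;
		st->apply_error_count++;
		return APPLY_ERROR;
	}
	t->rows[idx] = newval;
	return APPLY_OK;
}

/*
 * Retry the same change until it stops erroring. Before each attempt once
 * truncate_after attempts have been made, the test TRUNCATEs the subscriber
 * table. Returns the total number of attempts.
 */
int
toy_retry_update(ToyTable *t, int oldval, int newval, ToyStats *st,
				 int truncate_after)
{
	int			attempts = 0;

	for (;;)
	{
		if (attempts >= truncate_after)
			t->nrows = 0;		/* subscriber-side TRUNCATE */
		attempts++;
		if (toy_apply_update(t, oldval, newval, st) != APPLY_ERROR)
			return attempts;
	}
}
```

For example, consider a subscriber table holding 1 and 2 when the publisher runs `UPDATE ... SET a = 2`. With the table truncated after three failed attempts, `update_exists_count` and `apply_error_count` each end at 3, and `update_missing_count` ends at 1. This is why `update_missing_count` changes only after the TRUNCATE.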

~~~~

2.
I separated the tests for 'update_missing' and 'delete_missing',
putting the update_missing test *before* the DELETE. I felt the
expected results were much clearer when each test did just one thing.
Please have a look to see if you agree.

~~~

3.
+# Enable track_commit_timestamp to detect origin-differ conflicts in logical
+# replication. Reduce wal_retrieve_retry_interval to 1ms to accelerate the
+# restart of the logical replication worker after encountering a conflict.
+$node_subscriber->append_conf(
+ 'postgresql.conf', q{
+track_commit_timestamp = on
+wal_retrieve_retry_interval = 1ms
+});

Later, after CDR resolvers are implemented, it might be good to
revisit these conflict test cases and re-write them to use some
conflict resolvers like 'skip'. Then the subscriber won't give ERRORs
and restart apply workers all the time behind the scenes, so you won't
need the above configuration for accelerating the worker restarts. In
other words, running these tests might be more efficient if you can
avoid restarting workers all the time.

I suggest putting an XXX comment here as a reminder that these tests
should be revisited to make use of conflict resolvers in the future.

~~~

nit - not caused by this patch, but other comment inconsistencies
about "stats_reset timestamp" can be fixed in passing too.

======
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

PS_NITPICKS_20240827_CDR_STATS_v2.txt (text/plain; charset=US-ASCII)
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index 0df31a6..d9589f0 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -134,24 +134,37 @@ sub create_sub_pub_w_errors
 	  or die
 	  qq(Timed out while waiting for update_exists conflict for subscription '$sub_name');
 
-	# Truncate the test table to ensure that the conflicting update operations
-	# are skipped, allowing the test to continue.
-	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
+	# Truncate the subscriber side test table. Now that the table is empty,
+	# the update conflict ('update_exists') ERRORs will stop happening. A
+	# single update conflict 'update_missing' will be reported, but the update
+	# will be skipped on the subscriber, allowing the test to continue.
+	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
+
+	# Wait for the subscriber to report update_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_missing_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_missing conflict for subscription '$sub_name');
 
 	# Delete data from the test table on the publisher. This delete operation
 	# should be skipped on the subscriber since the table is already empty.
 	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
 
-	# Wait for the subscriber to report tuple missing conflict.
+	# Wait for the subscriber to report delete_missing conflict.
 	$node_subscriber->poll_query_until(
 		$db,
 		qq[
-	SELECT update_missing_count > 0 AND delete_missing_count > 0
+	SELECT delete_missing_count > 0
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub_name'
 	])
 	  or die
-	  qq(Timed out while waiting for tuple missing conflict for subscription '$sub_name');
+	  qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
 
 	# Prepare data for further tests.
 	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
@@ -216,7 +229,7 @@ my ($pub1_name, $sub1_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table1_name);
 
-# Apply errors, sync errors, and conflicts are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
@@ -240,7 +253,7 @@ $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
 );
 
-# Apply errors, sync errors, and conflicts are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
@@ -286,7 +299,7 @@ my ($pub2_name, $sub2_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table2_name);
 
-# Apply errors, sync errors, and conflicts are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
@@ -309,7 +322,7 @@ is( $node_subscriber->safe_psql(
 $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats(NULL)));
 
-# Apply errors, sync errors, and conflicts are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
#5Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Peter Smith (#4)
1 attachment(s)
RE: Collect statistics about conflicts in logical replication

On Tuesday, August 27, 2024 10:59 AM Peter Smith <smithpb2250@gmail.com> wrote:

~~~

3.
+# Enable track_commit_timestamp to detect origin-differ conflicts in logical
+# replication. Reduce wal_retrieve_retry_interval to 1ms to accelerate the
+# restart of the logical replication worker after encountering a conflict.
+$node_subscriber->append_conf(
+ 'postgresql.conf', q{
+track_commit_timestamp = on
+wal_retrieve_retry_interval = 1ms
+});

Later, after CDR resolvers are implemented, it might be good to revisit these
conflict test cases and re-write them to use some conflict resolvers like 'skip'.
Then the subscriber won't give ERRORs and restart apply workers all the time
behind the scenes, so you won't need the above configuration for accelerating
the worker restarts. In other words, running these tests might be more efficient
if you can avoid restarting workers all the time.

I suggest putting an XXX comment here as a reminder that these tests should
be revisited to make use of conflict resolvers in the future.

I think it would be too early to mention resolution implementation details
in the comments, considering that the resolution work is still not Ready for
Committer (RFC). Also, I think reducing wal_retrieve_retry_interval is a
reasonable way to speed up the test in this case, because the test does not
keep the worker restarting all the time: the error that causes the restart is
resolved immediately after the stats check. So, I think adding an XXX comment
is not appropriate.

Other comments look good to me.
I slightly adjusted a few words and merged the changes. Thanks!

Here is V3 patch.

Best Regards,
Hou zj

Attachments:

v3-0001-Collect-statistics-about-conflicts-in-logical-rep.patch (application/octet-stream)
From 53e17680f404bc65b1fb63e6244903c6ac1bf4bc Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 22 Aug 2024 17:52:50 +0800
Subject: [PATCH v3] Collect statistics about conflicts in logical replication

This commit adds columns to the view pg_stat_subscription_stats to show
information about the conflicts that occur during the application of
logical replication changes. Currently, the following columns are added.

insert_exists_count:
	Number of times a row insertion violated a NOT DEFERRABLE unique constraint.
update_differ_count:
	Number of times an update was performed on a row that was previously modified by another origin.
update_exists_count:
	Number of times the updated value of a row violated a NOT DEFERRABLE unique constraint.
update_missing_count:
	Number of times the tuple to be updated was missing.
delete_differ_count:
	Number of times a delete was performed on a row that was previously modified by another origin.
delete_missing_count:
	Number of times the tuple to be deleted was missing.

The update_differ and delete_differ conflicts can be detected only when
track_commit_timestamp is enabled.
---
 doc/src/sgml/logical-replication.sgml         |   5 +-
 doc/src/sgml/monitoring.sgml                  |  74 +++++++-
 src/backend/catalog/system_views.sql          |   6 +
 src/backend/replication/logical/conflict.c    |   5 +-
 .../utils/activity/pgstat_subscription.c      |  17 ++
 src/backend/utils/adt/pgstatfuncs.c           |  33 +++-
 src/include/catalog/pg_proc.dat               |   6 +-
 src/include/pgstat.h                          |   4 +
 src/include/replication/conflict.h            |   7 +
 src/test/regress/expected/rules.out           |   8 +-
 src/test/subscription/t/026_stats.pl          | 166 ++++++++++++++++--
 11 files changed, 300 insertions(+), 31 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index bee7e02983..f6823694c9 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1585,8 +1585,9 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   Additional logging is triggered in the following <firstterm>conflict</firstterm>
-   cases:
+   Additional logging is triggered, and the conflict statistics are collected (displayed in the
+   <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
+   in the following <firstterm>conflict</firstterm> cases:
    <variablelist>
     <varlistentry>
      <term><literal>insert_exists</literal></term>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 55417a6fa9..ac3c773ea1 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -507,7 +507,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
-      <entry>One row per subscription, showing statistics about errors.
+      <entry>One row per subscription, showing statistics about errors and conflicts.
       See <link linkend="monitoring-pg-stat-subscription-stats">
       <structname>pg_stat_subscription_stats</structname></link> for details.
       </entry>
@@ -2157,7 +2157,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>apply_error_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times an error occurred while applying changes
+       Number of times an error occurred while applying changes. Note that any
+       conflict resulting in an apply error will be counted in both
+       <literal>apply_error_count</literal> and the corresponding conflict count.
       </para></entry>
      </row>
 
@@ -2171,6 +2173,74 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>insert_exists_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a row insertion violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_differ_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times an update was applied to a row that had been previously
+       modified by another source during the application of changes. This conflict is
+       counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_exists_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that an updated row value violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_missing_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was not found during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>delete_differ_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a delete operation was applied to a row that had been
+       previously modified by another source during the application of changes.
+       This conflict is counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>delete_missing_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be deleted was not found during the application
+       of changes
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 19cabc9a47..fcdd199117 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.insert_exists_count,
+        ss.update_differ_count,
+        ss.update_exists_count,
+        ss.update_missing_count,
+        ss.delete_differ_count,
+        ss.delete_missing_count,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 0bc7959980..02f7892cb2 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -17,8 +17,9 @@
 #include "access/commit_ts.h"
 #include "access/tableam.h"
 #include "executor/executor.h"
+#include "pgstat.h"
 #include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
 
@@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	Assert(!OidIsValid(indexoid) ||
 		   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
+	pgstat_report_subscription_conflict(MySubscription->oid, type);
+
 	ereport(elevel,
 			errcode_apply_conflict(type),
 			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index d9af8de658..e06c92727e 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
 		pending->sync_error_count++;
 }
 
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+	PgStat_EntryRef *entry_ref;
+	PgStat_BackendSubEntry *pending;
+
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+										  InvalidOid, subid, NULL);
+	pending = entry_ref->pending;
+	pending->conflict_count[type]++;
+}
+
 /*
  * Report creating the subscription.
  */
@@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
 	SUB_ACC(apply_error_count);
 	SUB_ACC(sync_error_count);
+	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+		SUB_ACC(conflict_count[i]);
 #undef SUB_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3221137123..870aee8e7b 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	10
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	bool		nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	PgStat_StatSubEntry *subentry;
 	PgStat_StatSubEntry allzero;
+	int			i = 0;
 
 	/* Get subscription stats */
 	subentry = pgstat_fetch_stat_subscription(subid);
@@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "insert_exists_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "update_differ_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "update_exists_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "update_missing_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "delete_differ_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "delete_missing_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	}
 
 	/* subid */
-	values[0] = ObjectIdGetDatum(subid);
+	values[i++] = ObjectIdGetDatum(subid);
 
 	/* apply_error_count */
-	values[1] = Int64GetDatum(subentry->apply_error_count);
+	values[i++] = Int64GetDatum(subentry->apply_error_count);
 
 	/* sync_error_count */
-	values[2] = Int64GetDatum(subentry->sync_error_count);
+	values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+	/* conflict count */
+	for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+		values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
 
 	/* stats_reset */
 	if (subentry->stat_reset_timestamp == 0)
-		nulls[3] = true;
+		nulls[i] = true;
 	else
-		values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+		values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+	Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4abc6d9526..3d5c2957c9 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5538,9 +5538,9 @@
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,insert_exists_count,update_differ_count,update_exists_count,update_missing_count,delete_differ_count,delete_missing_count,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f63159c55c..be2c91168a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -15,6 +15,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/relcache.h"
@@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 } PgStat_BackendSubEntry;
 
 /* ----------
@@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
  */
 
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
 extern void pgstat_create_subscription(Oid subid);
 extern void pgstat_drop_subscription(Oid subid);
 extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 02cb84da7e..7232c8889b 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -14,6 +14,11 @@
 
 /*
  * Conflict types that could occur while applying remote changes.
+ *
+ * This enum is also used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count); therefore, when adding new values
+ * or reordering existing ones, be sure to review and, if needed, adjust the
+ * corresponding statistics collection code.
  */
 typedef enum
 {
@@ -42,6 +47,8 @@ typedef enum
 	 */
 } ConflictType;
 
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 									TransactionId *xmin,
 									RepOriginId *localorigin,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 862433ee52..1985d2ffad 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.insert_exists_count,
+    ss.update_differ_count,
+    ss.update_exists_count,
+    ss.update_missing_count,
+    ss.delete_differ_count,
+    ss.delete_missing_count,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, insert_exists_count, update_differ_count, update_exists_count, update_missing_count, delete_differ_count, delete_missing_count, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index fb3e5629b3..44a7f7fec1 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,15 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+
+# Enable track_commit_timestamp to detect origin-differ conflicts in logical
+# replication. Reduce wal_retrieve_retry_interval to 1ms to accelerate the
+# restart of the logical replication worker after encountering a conflict.
+$node_subscriber->append_conf(
+	'postgresql.conf', q{
+track_commit_timestamp = on
+wal_retrieve_retry_interval = 1ms
+});
 $node_subscriber->start;
 
 
@@ -30,6 +39,7 @@ sub create_sub_pub_w_errors
 		qq[
 	BEGIN;
 	CREATE TABLE $table_name(a int);
+	ALTER TABLE $table_name REPLICA IDENTITY FULL;
 	INSERT INTO $table_name VALUES (1);
 	COMMIT;
 	]);
@@ -95,7 +105,7 @@ sub create_sub_pub_w_errors
 	$node_subscriber->poll_query_until(
 		$db,
 		qq[
-	SELECT apply_error_count > 0
+	SELECT apply_error_count > 0 AND insert_exists_count > 0
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub_name'
 	])
@@ -105,6 +115,102 @@ sub create_sub_pub_w_errors
 	# Truncate test table so that apply worker can continue.
 	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
 
+	# Insert a row on the subscriber.
+	$node_subscriber->safe_psql($db, qq(INSERT INTO $table_name VALUES (2)));
+
+	# Update the test table on the publisher. This operation will raise an
+	# error on the subscriber due to a violation of the unique constraint on
+	# the test table.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_exists conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_exists_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_exists conflict for subscription '$sub_name');
+
+	# Truncate the subscriber side test table. Now that the table is empty, the
+	# update conflict (update_exists) ERRORs will stop happening. A single
+	# update_missing conflict will be reported, but the update will be skipped
+	# on the subscriber, allowing the test to continue.
+	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
+
+	# Wait for the subscriber to report update_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_missing_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_missing conflict for subscription '$sub_name');
+
+	# Delete data from the test table on the publisher. This delete operation
+	# should be skipped on the subscriber since the table is already empty.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	# Wait for the subscriber to report delete_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT delete_missing_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
+	$node_publisher->wait_for_catchup($sub_name);
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (1);
+	));
+
+	# Update the data in the test table on the publisher. This should generate
+	# a conflict because it causes the subscriber to attempt to update a row
+	# that has been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_differ conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_differ_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_differ conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (2);
+	));
+
+	# Delete data from the test table on the publisher. This should generate a
+	# conflict because it causes the subscriber to attempt to delete a row that
+	# has been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT delete_differ_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_differ conflict for subscription '$sub_name');
+
 	return ($pub_name, $sub_name);
 }
 
@@ -123,17 +229,23 @@ my ($pub1_name, $sub1_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table1_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	insert_exists_count > 0,
+	update_differ_count > 0,
+	update_exists_count > 0,
+	update_missing_count > 0,
+	delete_differ_count > 0,
+	delete_missing_count > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
 );
 
 # Reset a single subscription
@@ -141,17 +253,23 @@ $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
 );
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
 );
 
 # Get reset timestamp
@@ -181,46 +299,64 @@ my ($pub2_name, $sub2_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table2_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	insert_exists_count > 0,
+	update_differ_count > 0,
+	update_exists_count > 0,
+	update_missing_count > 0,
+	delete_differ_count > 0,
+	delete_missing_count > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
 );
 
 # Reset all subscriptions
 $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats(NULL)));
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
 );
 
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
 );
 
 $reset_time1 = $node_subscriber->safe_psql($db,
-- 
2.30.0.windows.2

#6shveta malik
shveta.malik@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#5)
Re: Collect statistics about conflicts in logical replication

On Tue, Aug 27, 2024 at 3:21 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

On Tuesday, August 27, 2024 10:59 AM Peter Smith <smithpb2250@gmail.com> wrote:

~~~

3.
+# Enable track_commit_timestamp to detect origin-differ conflicts in logical
+# replication. Reduce wal_retrieve_retry_interval to 1ms to accelerate the
+# restart of the logical replication worker after encountering a conflict.
+$node_subscriber->append_conf(
+ 'postgresql.conf', q{
+track_commit_timestamp = on
+wal_retrieve_retry_interval = 1ms
+});

Later, after CDR resolvers are implemented, it might be good to revisit these
conflict test cases and re-write them to use some conflict resolvers like 'skip'.
Then the subscriber won't give ERRORs and restart apply workers all the time
behind the scenes, so you won't need the above configuration for accelerating
the worker restarts. In other words, running these tests might be more efficient
if you can avoid restarting workers all the time.

I suggest putting an XXX comment here as a reminder that these tests should
be revisited to make use of conflict resolvers in the future.

I think it would be too early to mention the resolution implementation detail
in the comments, considering that the resolution is still not RFC. Also, I think
reducing wal_retrieve_retry_interval is a reasonable way to speed up the test
in this case because the test does not make the worker restart all the time: the
error that causes the restart is resolved immediately after the stats check. So, I
think adding XXX is not very appropriate.

Other comments look good to me.
I slightly adjusted a few words and merged the changes. Thanks!

Here is V3 patch.

Thanks for the patch. Just thinking out loud, since we have names like
'apply_error_count' and 'sync_error_count', which tell that they are
actually error counts, would it be better to have something similar in
the conflict-count cases, like 'insert_exists_conflict_count',
'delete_missing_conflict_count' and so on? Thoughts?

I noticed that now we do mention this (as I suggested earlier):
+ Note that any conflict resulting in an apply error will be counted
in both apply_error_count and the corresponding conflict count.

But we do not mention clearly which ones are conflict-counts. As an
example, we have this:

+ insert_exists_count bigint:
+ Number of times a row insertion violated a NOT DEFERRABLE unique
constraint during the application of changes

It does not mention that it is a conflict count. So we need to either
change names or mention clearly against each that it is a conflict
count.
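
To illustrate the overlap that the documentation note describes, here is a rough C model (not the patch's code) of how one conflict can move two counters. The enum, struct, and function names are hypothetical. Only two behaviours are taken from the patch: the conflict is counted before it is reported, and, per the test comments, insert_exists and update_exists raise an ERROR while the other conflict types are only logged.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for ConflictType; member names are illustrative. */
typedef enum SketchConflictType
{
    SKETCH_INSERT_EXISTS,
    SKETCH_UPDATE_DIFFER,
    SKETCH_UPDATE_EXISTS,
    SKETCH_UPDATE_MISSING,
    SKETCH_DELETE_DIFFER,
    SKETCH_DELETE_MISSING,
    SKETCH_NUM_CONFLICT_TYPES
} SketchConflictType;

/* Hypothetical per-subscription counters backing one view row. */
typedef struct SketchSubStats
{
    int64_t apply_error_count;
    int64_t sync_error_count;
    int64_t conflict_count[SKETCH_NUM_CONFLICT_TYPES];
} SketchSubStats;

/*
 * Per the comments in 026_stats.pl, insert_exists and update_exists raise
 * an ERROR; the other conflict types are only logged.
 */
static bool
sketch_conflict_raises_error(SketchConflictType type)
{
    return type == SKETCH_INSERT_EXISTS || type == SKETCH_UPDATE_EXISTS;
}

/* Simulates the apply worker hitting one conflict of the given type. */
static void
sketch_apply_conflicting_change(SketchSubStats *stats, SketchConflictType type)
{
    /* Counted first, as ReportApplyConflict() does before ereport(). */
    stats->conflict_count[type]++;

    /*
     * An ERROR aborts the apply worker, and its error path separately
     * increments apply_error_count, so this conflict shows up in both.
     */
    if (sketch_conflict_raises_error(type))
        stats->apply_error_count++;
}
```

In this model an insert_exists conflict adds 1 to both apply_error_count and the insert_exists counter, while an update_missing conflict adds 1 only to its own counter. Without a naming or wording hint, a reader of the view cannot tell which columns overlap in this way.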

thanks
sHveta

#7Amit Kapila
amit.kapila16@gmail.com
In reply to: shveta malik (#6)
Re: Collect statistics about conflicts in logical replication

On Wed, Aug 28, 2024 at 11:43 AM shveta malik <shveta.malik@gmail.com> wrote:

Thanks for the patch. Just thinking out loud, since we have names like
'apply_error_count' and 'sync_error_count', which tell that they are
actually error counts, would it be better to have something similar in
the conflict-count cases, like 'insert_exists_conflict_count',
'delete_missing_conflict_count' and so on? Thoughts?

It would be better to have conflict in the names but OTOH it will make
the names a bit longer. The other alternatives could be (a)
insert_exists_confl_count, etc. (b) confl_insert_exists_count, etc.
(c) confl_insert_exists, etc. These are based on the column names in
the existing view pg_stat_database_conflicts [1]. The (c) looks better
than other options but it will make the conflict-related columns
different from error-related columns.

Yet another option is to have a different view like
pg_stat_subscription_conflicts but that sounds like going too far.

[1]: https://www.postgresql.org/docs/devel/monitoring-stats.html#MONITORING-PG-STAT-DATABASE-CONFLICTS-VIEW

--
With Regards,
Amit Kapila.

#8Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#7)
Re: Collect statistics about conflicts in logical replication

On Wed, Aug 28, 2024 at 9:19 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Aug 28, 2024 at 11:43 AM shveta malik <shveta.malik@gmail.com> wrote:

Thanks for the patch. Just thinking out loud, since we have names like
'apply_error_count' and 'sync_error_count', which tell that they are
actually error counts, would it be better to have something similar in
the conflict-count cases, like 'insert_exists_conflict_count',
'delete_missing_conflict_count' and so on? Thoughts?

It would be better to have conflict in the names but OTOH it will make
the names a bit longer. The other alternatives could be (a)
insert_exists_confl_count, etc. (b) confl_insert_exists_count, etc.
(c) confl_insert_exists, etc. These are based on the column names in
the existing view pg_stat_database_conflicts [1]. The (c) looks better
than other options but it will make the conflict-related columns
different from error-related columns.

Yet another option is to have a different view like
pg_stat_subscription_conflicts but that sounds like going too far.

[1] - https://www.postgresql.org/docs/devel/monitoring-stats.html#MONITORING-PG-STAT-DATABASE-CONFLICTS-VIEW

Option (c) looked good to me.

Removing the suffix "_count" is OK. For example, try searching all of
Chapter 27 ("The Cumulative Statistics System") [1] for columns
described as "Number of ..." and you will find that a "_count" suffix
is used only rarely.

Adding the prefix "confl_" is OK. As mentioned, there is a precedent
for this. See "pg_stat_database_conflicts" [2].

Mixing column names where some have and some do not have "_count"
suffixes may not be ideal, but I see no problem because there are
precedents for that too. E.g. see "pg_stat_replication_slots" [3], and
"pg_stat_all_tables" [4].

======
[1]: https://www.postgresql.org/docs/devel/monitoring-stats.html
[2]: https://www.postgresql.org/docs/devel/monitoring-stats.html#MONITORING-PG-STAT-DATABASE-CONFLICTS-VIEW
[3]: https://www.postgresql.org/docs/devel/monitoring-stats.html#MONITORING-PG-STAT-REPLICATION-SLOTS-VIEW
[4]: https://www.postgresql.org/docs/devel/monitoring-stats.html#MONITORING-PG-STAT-ALL-TABLES-VIEW

Kind Regards,
Peter Smith.
Fujitsu Australia

#9Peter Smith
smithpb2250@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#3)
Re: Collect statistics about conflicts in logical replication

Hi Hou-San.

I tried an experiment where I deliberately violated a primary key
during initial table synchronization.

For example:

test_sub=# create table t1(a int primary key);
CREATE TABLE

test_sub=# insert into t1 values(1);
INSERT 0 1

test_sub=# create subscription sub1 connection 'dbname=test_pub' publication pub1 with (enabled=false);
2024-08-29 09:53:21.172 AEST [24186] WARNING: subscriptions created by regression test cases should have names starting with "regress_"
WARNING: subscriptions created by regression test cases should have names starting with "regress_"
NOTICE: created replication slot "sub1" on publisher
CREATE SUBSCRIPTION

test_sub=# select * from pg_stat_subscription_stats;
 subid | subname | apply_error_count | sync_error_count | insert_exists_count | update_differ_count | update_exists_count | update_missing_count | delete_differ_count | delete_missing_count | stats_reset
-------+---------+-------------------+------------------+---------------------+---------------------+---------------------+----------------------+---------------------+----------------------+-------------
 16390 | sub1    |                 0 |                0 |                   0 |                   0 |                   0 |                    0 |                   0 |                    0 |
(1 row)

test_sub=# alter subscription sub1 enable;
ALTER SUBSCRIPTION

test_sub=# 2024-08-29 09:53:57.245 AEST [4345] LOG: logical replication apply worker for subscription "sub1" has started
2024-08-29 09:53:57.258 AEST [4347] LOG: logical replication table synchronization worker for subscription "sub1", table "t1" has started
2024-08-29 09:53:57.311 AEST [4347] ERROR: duplicate key value violates unique constraint "t1_pkey"
2024-08-29 09:53:57.311 AEST [4347] DETAIL: Key (a)=(1) already exists.
2024-08-29 09:53:57.311 AEST [4347] CONTEXT: COPY t1, line 1
2024-08-29 09:53:57.312 AEST [23501] LOG: background worker "logical replication tablesync worker" (PID 4347) exited with exit code 1
2024-08-29 09:54:02.385 AEST [4501] LOG: logical replication table synchronization worker for subscription "sub1", table "t1" has started
2024-08-29 09:54:02.462 AEST [4501] ERROR: duplicate key value violates unique constraint "t1_pkey"
2024-08-29 09:54:02.462 AEST [4501] DETAIL: Key (a)=(1) already exists.
2024-08-29 09:54:02.462 AEST [4501] CONTEXT: COPY t1, line 1
2024-08-29 09:54:02.463 AEST [23501] LOG: background worker "logical replication tablesync worker" (PID 4501) exited with exit code 1
2024-08-29 09:54:07.512 AEST [4654] LOG: logical replication table synchronization worker for subscription "sub1", table "t1" has started
2024-08-29 09:54:07.580 AEST [4654] ERROR: duplicate key value violates unique constraint "t1_pkey"
2024-08-29 09:54:07.580 AEST [4654] DETAIL: Key (a)=(1) already exists.
2024-08-29 09:54:07.580 AEST [4654] CONTEXT: COPY t1, line 1
...

test_sub=# alter subscription sub1 disable;
ALTER SUBSCRIPTION
2024-08-29 09:55:10.329 AEST [4345] LOG: logical replication worker for subscription "sub1" will stop because the subscription was disabled

test_sub=# select * from pg_stat_subscription_stats;
 subid | subname | apply_error_count | sync_error_count | insert_exists_count | update_differ_count | update_exists_count | update_missing_count | delete_differ_count | delete_missing_count | stats_reset
-------+---------+-------------------+------------------+---------------------+---------------------+---------------------+----------------------+---------------------+----------------------+-------------
 16390 | sub1    |                 0 |               15 |                   0 |                   0 |                   0 |                    0 |                   0 |                    0 |
(1 row)

~~~

Notice how after a while there were multiple (15) 'sync_error_count' recorded.

According to the docs: 'insert_exists' happens when "Inserting a row
that violates a NOT DEFERRABLE unique constraint.". So why are there
not the same number of 'insert_exists_count' recorded in
pg_stat_subscription_stats?

The 'insert_exists' is either not happening or is not being counted
during table synchronization. Either way, it was not what I was
expecting. If it is not a bug, maybe the docs need to explain more
about the rules for 'insert_exists' during the initial table sync.

======
Kind Regards,
Peter Smith.
Fujitsu Australia

#10Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Peter Smith (#9)
RE: Collect statistics about conflicts in logical replication

On Thursday, August 29, 2024 8:31 AM Peter Smith <smithpb2250@gmail.com> wrote:

Hi,

I tried an experiment where I deliberately violated a primary key during initial
table synchronization.

For example:

...

test_sub=# 2024-08-29 09:53:57.245 AEST [4345] LOG: logical replication apply worker for subscription "sub1" has started
2024-08-29 09:53:57.258 AEST [4347] LOG: logical replication table synchronization worker for subscription "sub1", table "t1" has started
2024-08-29 09:53:57.311 AEST [4347] ERROR: duplicate key value violates unique constraint "t1_pkey"
2024-08-29 09:53:57.311 AEST [4347] DETAIL: Key (a)=(1) already exists.
2024-08-29 09:53:57.311 AEST [4347] CONTEXT: COPY t1, line 1
~~~

Notice how after a while there were multiple (15) 'sync_error_count' recorded.

According to the docs: 'insert_exists' happens when "Inserting a row that
violates a NOT DEFERRABLE unique constraint.". So why are there not the
same number of 'insert_exists_count' recorded in pg_stat_subscription_stats?

This is expected, because the error was caused by COPY rather than by an INSERT
(see "CONTEXT: COPY t1, line 1"). The documentation of the conflict counts
(insert_exists_count) already says that they count conflicts only *during the
application of changes*, which to me makes it clear that conflicts during the
initial table synchronization are not counted. The existing apply_error_count
uses similar wording (e.g. "an error occurred while applying changes").
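
To make the distinction concrete, here is a rough C model (not PostgreSQL code) of which counter moves in each of the two cases discussed above. The struct, enum, and function names are hypothetical. The model also ignores changes that a tablesync worker may apply while catching up.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical subset of the per-subscription counters. */
typedef struct SketchSubStats
{
    int64_t apply_error_count;
    int64_t sync_error_count;
    int64_t insert_exists_count;
} SketchSubStats;

/* Hypothetical labels for where a duplicate-key failure occurs. */
typedef enum SketchPhase
{
    SKETCH_TABLESYNC_COPY,  /* initial COPY run by a tablesync worker */
    SKETCH_APPLY_INSERT     /* replicated INSERT applied by the apply worker */
} SketchPhase;

/* Records one duplicate-key failure as described in this thread. */
static void
sketch_duplicate_key(SketchSubStats *stats, SketchPhase phase)
{
    if (phase == SKETCH_APPLY_INSERT)
    {
        /* Conflict detection runs while applying changes: both counters move. */
        stats->insert_exists_count++;
        stats->apply_error_count++;
    }
    else
    {
        /* A unique violation inside COPY is a plain tablesync error. */
        stats->sync_error_count++;
    }
}
```

Replaying Peter's experiment in this model, 15 failed COPY attempts leave sync_error_count at 15 and insert_exists_count at 0. This matches the view output above.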

Best Regards,
Hou zj

#11shveta malik
shveta.malik@gmail.com
In reply to: Peter Smith (#8)
Re: Collect statistics about conflicts in logical replication

On Thu, Aug 29, 2024 at 4:59 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Wed, Aug 28, 2024 at 9:19 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Aug 28, 2024 at 11:43 AM shveta malik <shveta.malik@gmail.com> wrote:

Thanks for the patch. Just thinking out loud, since we have names like
'apply_error_count' and 'sync_error_count', which tell that they are
actually error counts, would it be better to have something similar in
the conflict-count cases, like 'insert_exists_conflict_count',
'delete_missing_conflict_count' and so on? Thoughts?

It would be better to have conflict in the names but OTOH it will make
the names a bit longer. The other alternatives could be (a)
insert_exists_confl_count, etc. (b) confl_insert_exists_count, etc.
(c) confl_insert_exists, etc. These are based on the column names in
the existing view pg_stat_database_conflicts [1]. The (c) looks better
than other options but it will make the conflict-related columns
different from error-related columns.

Yet another option is to have a different view like
pg_stat_subscription_conflicts but that sounds like going too far.

Yes, I think we are good with pg_stat_subscription_stats for the time being.

[1] - https://www.postgresql.org/docs/devel/monitoring-stats.html#MONITORING-PG-STAT-DATABASE-CONFLICTS-VIEW

Option (c) looked good to me.

+1 for option c. It should be okay to not have '_count' in the name.

Removing the suffix "_count" is OK. For example, try searching all of
Chapter 27 ("The Cumulative Statistics System") [1] for columns
described as "Number of ..." and you will find that a "_count" suffix
is used only rarely.

Adding the prefix "confl_" is OK. As mentioned, there is a precedent
for this. See "pg_stat_database_conflicts" [2].

Mixing column names where some have and some do not have "_count"
suffixes may not be ideal, but I see no problem because there are
precedents for that too. E.g. see "pg_stat_replication_slots" [3], and
"pg_stat_all_tables" [4].

======
[1] https://www.postgresql.org/docs/devel/monitoring-stats.html
[2] https://www.postgresql.org/docs/devel/monitoring-stats.html#MONITORING-PG-STAT-DATABASE-CONFLICTS-VIEW
[3] https://www.postgresql.org/docs/devel/monitoring-stats.html#MONITORING-PG-STAT-REPLICATION-SLOTS-VIEW
[4] https://www.postgresql.org/docs/devel/monitoring-stats.html#MONITORING-PG-STAT-ALL-TABLES-VIEW

Kind Regards,
Peter Smith.
Fujitsu Australia

#12Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: shveta malik (#11)
1 attachment(s)
RE: Collect statistics about conflicts in logical replication

On Thursday, August 29, 2024 11:18 AM shveta malik <shveta.malik@gmail.com> wrote:

On Thu, Aug 29, 2024 at 4:59 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Wed, Aug 28, 2024 at 9:19 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Aug 28, 2024 at 11:43 AM shveta malik <shveta.malik@gmail.com> wrote:

Thanks for the patch. Just thinking out loud, since we have names like
'apply_error_count' and 'sync_error_count', which tell that they are
actually error counts, would it be better to have something similar in
the conflict-count cases, like 'insert_exists_conflict_count',
'delete_missing_conflict_count' and so on? Thoughts?

It would be better to have conflict in the names but OTOH it will
make the names a bit longer. The other alternatives could be (a)
insert_exists_confl_count, etc. (b) confl_insert_exists_count, etc.
(c) confl_insert_exists, etc. These are based on the column names in
the existing view pg_stat_database_conflicts [1]. The (c) looks
better than other options but it will make the conflict-related
columns different from error-related columns.

Yet another option is to have a different view like
pg_stat_subscription_conflicts but that sounds like going too far.

Yes, I think we are good with pg_stat_subscription_stats for the time being.

[1] - https://www.postgresql.org/docs/devel/monitoring-stats.html#MONITORING-PG-STAT-DATABASE-CONFLICTS-VIEW

Option (c) looked good to me.

+1 for option c. it should be okay to not have '_count' in the name.

Agreed. Here is a new version of the patch, which changes the names as
suggested. I also rebased the patch on top of another renaming commit 640178c9.
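
Because each new column name is just a conflict type name with a "confl_" prefix, a loose C sketch of that mapping may help when checking the column order. The enum and array below are hypothetical stand-ins. The real ConflictType in src/include/replication/conflict.h may use different member names, and the conflict names are assumed to match the column suffixes. The order follows the columns this patch adds to system_views.sql.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for ConflictType, in the view's column order. */
typedef enum SketchConflictType
{
    SKETCH_INSERT_EXISTS,
    SKETCH_UPDATE_ORIGIN_DIFFERS,
    SKETCH_UPDATE_EXISTS,
    SKETCH_UPDATE_MISSING,
    SKETCH_DELETE_ORIGIN_DIFFERS,
    SKETCH_DELETE_MISSING,
    SKETCH_NUM_CONFLICT_TYPES
} SketchConflictType;

/* Assumed conflict names, matching the suffixes of the new columns. */
static const char *const sketch_conflict_names[] = {
    [SKETCH_INSERT_EXISTS] = "insert_exists",
    [SKETCH_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
    [SKETCH_UPDATE_EXISTS] = "update_exists",
    [SKETCH_UPDATE_MISSING] = "update_missing",
    [SKETCH_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
    [SKETCH_DELETE_MISSING] = "delete_missing",
};

/* Compile-time check that the name array has one slot per conflict type. */
_Static_assert(sizeof(sketch_conflict_names) / sizeof(sketch_conflict_names[0])
                   == SKETCH_NUM_CONFLICT_TYPES,
               "every conflict type needs a name");

/* Writes the option (c) column name, e.g. "confl_insert_exists", into buf. */
static int
sketch_conflict_column_name(SketchConflictType type, char *buf, size_t buflen)
{
    return snprintf(buf, buflen, "confl_%s", sketch_conflict_names[type]);
}
```

The order matters in practice. The C function that fills a result row has to emit its values in the same order as the OUT columns declared for it in pg_proc.dat. Keeping one enum-indexed table is one way to avoid a mismatch.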

Best Regards,
Hou zj

Attachments:

v4-0001-Collect-statistics-about-conflicts-in-logical-rep.patchapplication/octet-stream; name=v4-0001-Collect-statistics-about-conflicts-in-logical-rep.patchDownload
From 5ddbe6bf22aa6e0269a492cf9a42479ac43cee1d Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 29 Aug 2024 10:31:32 +0800
Subject: [PATCH v4] Collect statistics about conflicts in logical replication

This commit adds columns to the view pg_stat_subscription_stats to show
information about the conflicts that occur during the application of
logical replication changes. Currently, the following columns are added.

confl_insert_exists:
	Number of times a row insertion violated a NOT DEFERRABLE unique constraint.
confl_update_origin_differs:
	Number of times an update was performed on a row that was previously modified by another origin.
confl_update_exists:
	Number of times that the updated value of a row violates a NOT DEFERRABLE unique constraint.
confl_update_missing:
	Number of times that the tuple to be updated is missing.
confl_delete_origin_differs:
	Number of times a delete was performed on a row that was previously modified by another origin.
confl_delete_missing:
	Number of times that the tuple to be deleted is missing.

The update_origin_differs and delete_origin_differs conflicts can be detected
only when track_commit_timestamp is enabled.
---
 doc/src/sgml/logical-replication.sgml         |   5 +-
 doc/src/sgml/monitoring.sgml                  |  75 +++++++-
 src/backend/catalog/system_views.sql          |   6 +
 src/backend/replication/logical/conflict.c    |   5 +-
 .../utils/activity/pgstat_subscription.c      |  17 ++
 src/backend/utils/adt/pgstatfuncs.c           |  33 +++-
 src/include/catalog/pg_proc.dat               |   6 +-
 src/include/pgstat.h                          |   4 +
 src/include/replication/conflict.h            |   7 +
 src/test/regress/expected/rules.out           |   8 +-
 src/test/subscription/t/026_stats.pl          | 166 ++++++++++++++++--
 11 files changed, 301 insertions(+), 31 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 89b41011dd..fa0ff54bd6 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1585,8 +1585,9 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   Additional logging is triggered in the following <firstterm>conflict</firstterm>
-   cases:
+   Additional logging is triggered, and the conflict statistics are collected (displayed in the
+   <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
+   in the following <firstterm>conflict</firstterm> cases:
    <variablelist>
     <varlistentry>
      <term><literal>insert_exists</literal></term>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 55417a6fa9..cac2e98a72 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -507,7 +507,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
-      <entry>One row per subscription, showing statistics about errors.
+      <entry>One row per subscription, showing statistics about errors and conflicts.
       See <link linkend="monitoring-pg-stat-subscription-stats">
       <structname>pg_stat_subscription_stats</structname></link> for details.
       </entry>
@@ -2157,7 +2157,10 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>apply_error_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times an error occurred while applying changes
+       Number of times an error occurred while applying changes. Note that any
+       conflict resulting in an apply error will be counted in both
+       <literal>apply_error_count</literal> and the corresponding conflict
+       count (e.g., <literal>confl_*</literal>).
       </para></entry>
      </row>
 
@@ -2171,6 +2174,74 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_insert_exists</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a row insertion violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_origin_differs</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times an update was applied to a row that had been previously
+       modified by another source during the application of changes. This conflict is
+       counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_exists</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that an updated row value violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_missing</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was not found during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_delete_origin_differs</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a delete operation was applied to a row that had been
+       previously modified by another source during the application of changes.
+       This conflict is counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_delete_missing</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be deleted was not found during the application
+       of changes
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 19cabc9a47..7fd5d256a1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.confl_insert_exists,
+        ss.confl_update_origin_differs,
+        ss.confl_update_exists,
+        ss.confl_update_missing,
+        ss.confl_delete_origin_differs,
+        ss.confl_delete_missing,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index dde3b4d9c8..3a88c226d5 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -17,8 +17,9 @@
 #include "access/commit_ts.h"
 #include "access/tableam.h"
 #include "executor/executor.h"
+#include "pgstat.h"
 #include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
 
@@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	Assert(!OidIsValid(indexoid) ||
 		   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
+	pgstat_report_subscription_conflict(MySubscription->oid, type);
+
 	ereport(elevel,
 			errcode_apply_conflict(type),
 			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index d9af8de658..e06c92727e 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
 		pending->sync_error_count++;
 }
 
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+	PgStat_EntryRef *entry_ref;
+	PgStat_BackendSubEntry *pending;
+
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+										  InvalidOid, subid, NULL);
+	pending = entry_ref->pending;
+	pending->conflict_count[type]++;
+}
+
 /*
  * Report creating the subscription.
  */
@@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
 	SUB_ACC(apply_error_count);
 	SUB_ACC(sync_error_count);
+	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+		SUB_ACC(conflict_count[i]);
 #undef SUB_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3221137123..97dc09ac0d 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	10
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	bool		nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	PgStat_StatSubEntry *subentry;
 	PgStat_StatSubEntry allzero;
+	int			i = 0;
 
 	/* Get subscription stats */
 	subentry = pgstat_fetch_stat_subscription(subid);
@@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	}
 
 	/* subid */
-	values[0] = ObjectIdGetDatum(subid);
+	values[i++] = ObjectIdGetDatum(subid);
 
 	/* apply_error_count */
-	values[1] = Int64GetDatum(subentry->apply_error_count);
+	values[i++] = Int64GetDatum(subentry->apply_error_count);
 
 	/* sync_error_count */
-	values[2] = Int64GetDatum(subentry->sync_error_count);
+	values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+	/* conflict count */
+	for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+		values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
 
 	/* stats_reset */
 	if (subentry->stat_reset_timestamp == 0)
-		nulls[3] = true;
+		nulls[i] = true;
 	else
-		values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+		values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+	Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4abc6d9526..01bf72d967 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5538,9 +5538,9 @@
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f63159c55c..be2c91168a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -15,6 +15,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/relcache.h"
@@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 } PgStat_BackendSubEntry;
 
 /* ----------
@@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
  */
 
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
 extern void pgstat_create_subscription(Oid subid);
 extern void pgstat_drop_subscription(Oid subid);
 extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index b37c7d2ca0..43430f808c 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -14,6 +14,11 @@
 
 /*
  * Conflict types that could occur while applying remote changes.
+ *
+ * This enum is used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count) as well, therefore, when adding new
+ * values or reordering existing ones, ensure to review and potentially adjust
+ * the corresponding statistics collection codes.
  */
 typedef enum
 {
@@ -42,6 +47,8 @@ typedef enum
 	 */
 } ConflictType;
 
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 									TransactionId *xmin,
 									RepOriginId *localorigin,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 862433ee52..a1626f3fae 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.confl_insert_exists,
+    ss.confl_update_origin_differs,
+    ss.confl_update_exists,
+    ss.confl_update_missing,
+    ss.confl_delete_origin_differs,
+    ss.confl_delete_missing,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index fb3e5629b3..cead46666d 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,15 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+
+# Enable track_commit_timestamp to detect origin-differ conflicts in logical
+# replication. Reduce wal_retrieve_retry_interval to 1ms to accelerate the
+# restart of the logical replication worker after encountering a conflict.
+$node_subscriber->append_conf(
+	'postgresql.conf', q{
+track_commit_timestamp = on
+wal_retrieve_retry_interval = 1ms
+});
 $node_subscriber->start;
 
 
@@ -30,6 +39,7 @@ sub create_sub_pub_w_errors
 		qq[
 	BEGIN;
 	CREATE TABLE $table_name(a int);
+	ALTER TABLE $table_name REPLICA IDENTITY FULL;
 	INSERT INTO $table_name VALUES (1);
 	COMMIT;
 	]);
@@ -95,7 +105,7 @@ sub create_sub_pub_w_errors
 	$node_subscriber->poll_query_until(
 		$db,
 		qq[
-	SELECT apply_error_count > 0
+	SELECT apply_error_count > 0 AND confl_insert_exists > 0
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub_name'
 	])
@@ -105,6 +115,102 @@ sub create_sub_pub_w_errors
 	# Truncate test table so that apply worker can continue.
 	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
 
+	# Insert a row on the subscriber.
+	$node_subscriber->safe_psql($db, qq(INSERT INTO $table_name VALUES (2)));
+
+	# Update the test table on the publisher. This operation will raise an
+	# error on the subscriber due to a violation of the unique constraint on
+	# the test table.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_exists conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_update_exists > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_exists conflict for subscription '$sub_name');
+
+	# Truncate the subscriber side test table. Now that the table is empty, the
+	# update conflict (update_existing) ERRORs will stop happening. A single
+	# update_missing conflict will be reported, but the update will be skipped
+	# on the subscriber, allowing the test to continue.
+	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
+
+	# Wait for the subscriber to report update_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_update_missing > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_missing conflict for subscription '$sub_name');
+
+	# Delete data from the test table on the publisher. This delete operation
+	# should be skipped on the subscriber since the table is already empty.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	# Wait for the subscriber to report delete_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_delete_missing > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
+	$node_publisher->wait_for_catchup($sub_name);
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (1);
+	));
+
+	# Update the data in the test table on the publisher. This should generate
+	# a conflict because it causes the subscriber to attempt to update a row that
+	# has been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_origin_differs conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_update_origin_differs > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_origin_differs conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (2);
+	));
+
+	# Delete data from the test table on the publisher. This should generate a
+	# conflict because it causes the subscriber to attempt to delete a row that
+	# has been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_delete_origin_differs > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_origin_differs conflict for subscription '$sub_name');
+
 	return ($pub_name, $sub_name);
 }
 
@@ -123,17 +229,23 @@ my ($pub1_name, $sub1_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table1_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	confl_insert_exists > 0,
+	confl_update_origin_differs > 0,
+	confl_update_exists > 0,
+	confl_update_missing > 0,
+	confl_delete_origin_differs > 0,
+	confl_delete_missing > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
 );
 
 # Reset a single subscription
@@ -141,17 +253,23 @@ $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
 );
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_update_origin_differs = 0,
+	confl_update_exists = 0,
+	confl_update_missing = 0,
+	confl_delete_origin_differs = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
 );
 
 # Get reset timestamp
@@ -181,46 +299,64 @@ my ($pub2_name, $sub2_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table2_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	confl_insert_exists > 0,
+	confl_update_origin_differs > 0,
+	confl_update_exists > 0,
+	confl_update_missing > 0,
+	confl_delete_origin_differs > 0,
+	confl_delete_missing > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
 );
 
 # Reset all subscriptions
 $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats(NULL)));
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_update_origin_differs = 0,
+	confl_update_exists = 0,
+	confl_update_missing = 0,
+	confl_delete_origin_differs = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
 );
 
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_update_origin_differs = 0,
+	confl_update_exists = 0,
+	confl_update_missing = 0,
+	confl_delete_origin_differs = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
 );
 
 $reset_time1 = $node_subscriber->safe_psql($db,
-- 
2.30.0.windows.2
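For readers following the patch: the counting is split into two steps. pgstat_report_subscription_conflict() only bumps a backend-local pending counter indexed by ConflictType, and pgstat_subscription_flush_cb() later adds the pending values into the shared entry through the SUB_ACC() loop. The standalone C sketch below models that pattern with hypothetical stand-in types and functions (Counter, PendingSubStats, SharedSubStats, report_conflict(), flush_pending()). It is not the pgstat API: it omits locking and pending-entry management, and it simply zeroes the local counters after a flush.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Conflict types as listed in the patch (order assumed to match the confl_* columns). */
typedef enum
{
	CT_INSERT_EXISTS,
	CT_UPDATE_ORIGIN_DIFFERS,
	CT_UPDATE_EXISTS,
	CT_UPDATE_MISSING,
	CT_DELETE_ORIGIN_DIFFERS,
	CT_DELETE_MISSING,
} ConflictType;

#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)

typedef int64_t Counter;		/* stand-in for PgStat_Counter */

/* Hypothetical backend-local pending counters (cf. PgStat_BackendSubEntry). */
typedef struct
{
	Counter		apply_error_count;
	Counter		sync_error_count;
	Counter		conflict_count[CONFLICT_NUM_TYPES];
} PendingSubStats;

/* Hypothetical shared counters (cf. PgStat_StatSubEntry, without the reset timestamp). */
typedef struct
{
	Counter		apply_error_count;
	Counter		sync_error_count;
	Counter		conflict_count[CONFLICT_NUM_TYPES];
} SharedSubStats;

/* Reporting touches only local memory: a single increment, no locking. */
static void
report_conflict(PendingSubStats *pending, ConflictType type)
{
	assert((int) type < CONFLICT_NUM_TYPES);
	pending->conflict_count[type]++;
}

/* Flushing adds every pending counter into the shared entry, then resets the local copy. */
static void
flush_pending(SharedSubStats *shared, PendingSubStats *pending)
{
	shared->apply_error_count += pending->apply_error_count;
	shared->sync_error_count += pending->sync_error_count;
	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
		shared->conflict_count[i] += pending->conflict_count[i];
	memset(pending, 0, sizeof(*pending));
}
```

In this model, readers of the shared counters see a conflict only after the next flush; the sketch does not try to model when PostgreSQL actually flushes pending subscription stats.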

#13 shveta malik
shveta.malik@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#12)
Re: Collect statistics about conflicts in logical replication

On Thu, Aug 29, 2024 at 11:06 AM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

Agreed. Here is a new version of the patch, which changes the names as suggested. I also
rebased the patch on top of another renaming commit, 640178c9.

Thanks for the patch. A few minor things:

1)
conflict.h:
* This enum is used in statistics collection (see
* PgStat_StatSubEntry::conflict_count) as well, therefore, when adding new
* values or reordering existing ones, ensure to review and potentially adjust
* the corresponding statistics collection codes.

--We should mention PgStat_BackendSubEntry as well.

026_stats.pl:
2)
# Now that the table is empty, the
# update conflict (update_existing) ERRORs will stop happening.

--Should it be update_exists instead of update_existing here?

3)
This is an existing comment above insert_exists conflict capture:
# Wait for the apply error to be reported.

--Shall we change to:
# Wait for the subscriber to report apply error and insert_exists conflict.
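The enum comment matters because the per-type counter arrays in both PgStat_BackendSubEntry and PgStat_StatSubEntry are sized and indexed by ConflictType. One possible way to make part of that coupling mechanical rather than comment-based is a name table built with designated initializers plus a compile-time length check. The C11 sketch below is hypothetical (conflict_column_names, conflict_type_from_column() and the *Sketch structs are not part of the patch). Note its limits: the _Static_assert only catches a name table whose length has drifted from CONFLICT_NUM_TYPES; it cannot detect a value appended to the enum without touching either the macro or the table.

```c
#include <assert.h>
#include <string.h>

/* Conflict types as listed in the patch, in the same order as the confl_* columns. */
typedef enum
{
	CT_INSERT_EXISTS,
	CT_UPDATE_ORIGIN_DIFFERS,
	CT_UPDATE_EXISTS,
	CT_UPDATE_MISSING,
	CT_DELETE_ORIGIN_DIFFERS,
	CT_DELETE_MISSING,
} ConflictType;

#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)

/* Both hypothetical counter structs are sized by the same macro. */
typedef struct
{
	long long	conflict_count[CONFLICT_NUM_TYPES];
} BackendSubEntrySketch;

typedef struct
{
	long long	conflict_count[CONFLICT_NUM_TYPES];
	long long	stat_reset_timestamp;
} StatSubEntrySketch;

/* Designated initializers bind each name to its enum value, not to its position. */
static const char *const conflict_column_names[] = {
	[CT_INSERT_EXISTS] = "confl_insert_exists",
	[CT_UPDATE_ORIGIN_DIFFERS] = "confl_update_origin_differs",
	[CT_UPDATE_EXISTS] = "confl_update_exists",
	[CT_UPDATE_MISSING] = "confl_update_missing",
	[CT_DELETE_ORIGIN_DIFFERS] = "confl_delete_origin_differs",
	[CT_DELETE_MISSING] = "confl_delete_missing",
};

/* Compilation fails if the name table and CONFLICT_NUM_TYPES disagree in length. */
_Static_assert(sizeof(conflict_column_names) / sizeof(conflict_column_names[0])
			   == CONFLICT_NUM_TYPES,
			   "conflict_column_names needs one entry per ConflictType");

/* Map a column name back to its ConflictType; returns -1 if the name is unknown. */
static int
conflict_type_from_column(const char *name)
{
	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
	{
		/* A skipped enum value would leave a NULL hole in the table. */
		if (conflict_column_names[i] != NULL &&
			strcmp(conflict_column_names[i], name) == 0)
			return i;
	}
	return -1;
}
```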

thanks
Shveta

#14 Peter Smith
smithpb2250@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#12)
Re: Collect statistics about conflicts in logical replication

Hi Hou-San. Here are my review comments for v4-0001.

======

1. Add links in the docs

IMO it would be good for all these confl_* descriptions (in
doc/src/sgml/monitoring.sgml) to include links back to where each of
those conflict types was defined [1].

Indeed, once links to the original conflict type information are
included, I think you should remove the mention of
"track_commit_timestamp":
+       counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.

It should be obvious that you cannot count a conflict if the conflict
does not happen, but I don't think we should scatter/duplicate those
rules in different places saying when certain conflicts can/can't
happen -- we should just link everywhere back to the original
description for those rules.

~~~

2. Arrange all the counts into an intuitive/natural order

There is an intuitive/natural ordering for these counts. For example,
the 'confl_*' count fields are in the order insert -> update ->
delete, which LGTM.

Meanwhile, the 'apply_error_count' and the 'sync_error_count' are not
in a good order.

IMO it makes more sense if everything is ordered as:
'sync_error_count', then 'apply_error_count', then all the 'confl_*'
counts.

This comment applies to lots of places, e.g.:
- docs (doc/src/sgml/monitoring.sgml)
- function pg_stat_get_subscription_stats (pg_proc.dat)
- view pg_stat_subscription_stats (src/backend/catalog/system_views.sql)
- TAP test SELECTs (test/subscription/t/026_stats.pl)

As all those places are already impacted by this patch, I think it
would be good to also swap the sync/apply counts in passing (if
possible), so that everything is ordered intuitively top-to-bottom
or left-to-right.
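On the ordering question: in the patch, pg_stat_get_subscription_stats() fills its output with a running index (values[i++]) and finishes with Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS), the + 1 accounting for stats_reset being written at values[i] without a post-increment. The simplified C sketch below mirrors that pattern with hypothetical types (SubStats, fill_row()) and int64_t in place of Datum. With this style, swapping the two error counts would mean moving two lines in the fill sequence, but the tuple descriptor and pg_proc.dat would still have to be reordered by hand; the final check catches a wrong number of columns, not a wrong order.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NUM_CONFLICT_TYPES 6	/* the six confl_* columns in this patch */
/* subid, apply_error_count, sync_error_count, conflicts, stats_reset */
#define STATS_COLS (3 + NUM_CONFLICT_TYPES + 1)

typedef struct
{
	uint32_t	subid;
	int64_t		apply_error_count;
	int64_t		sync_error_count;
	int64_t		conflict_count[NUM_CONFLICT_TYPES];
	int64_t		stat_reset_timestamp;	/* 0 means "never reset" */
} SubStats;

/*
 * Fill one output row in column order using a running index. The last
 * column is written at values[i] without a post-increment, so the final
 * check is i + 1 == STATS_COLS. Returns the number of columns produced.
 */
static int
fill_row(const SubStats *s, int64_t values[STATS_COLS], bool nulls[STATS_COLS])
{
	int			i = 0;

	for (int c = 0; c < STATS_COLS; c++)
	{
		values[c] = 0;
		nulls[c] = false;
	}

	values[i++] = s->subid;
	values[i++] = s->apply_error_count;
	values[i++] = s->sync_error_count;
	for (int n = 0; n < NUM_CONFLICT_TYPES; n++)
		values[i++] = s->conflict_count[n];

	/* stats_reset is NULL until the stats have been reset at least once */
	if (s->stat_reset_timestamp == 0)
		nulls[i] = true;
	else
		values[i] = s->stat_reset_timestamp;

	assert(i + 1 == STATS_COLS);
	return i + 1;
}
```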

======
[1]: https://www.postgresql.org/docs/devel/logical-replication-conflicts.html#LOGICAL-REPLICATION-CONFLICTS

Kind Regards,
Peter Smith.
Fujitsu Australia

#15 shveta malik
shveta.malik@gmail.com
In reply to: shveta malik (#13)
Re: Collect statistics about conflicts in logical replication

On Fri, Aug 30, 2024 at 9:40 AM shveta malik <shveta.malik@gmail.com> wrote:

On Thu, Aug 29, 2024 at 11:06 AM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

Agreed. Here is a new version of the patch, which changes the names as suggested. I also
rebased the patch on top of another renaming commit, 640178c9.

Thanks for the patch. A few minor things:

1)
conflict.h:
* This enum is used in statistics collection (see
* PgStat_StatSubEntry::conflict_count) as well, therefore, when adding new
* values or reordering existing ones, ensure to review and potentially adjust
* the corresponding statistics collection codes.

--We should mention PgStat_BackendSubEntry as well.

026_stats.pl:
2)
# Now that the table is empty, the
# update conflict (update_existing) ERRORs will stop happening.

--Should it be update_exists instead of update_existing here?

3)
This is an existing comment above insert_exists conflict capture:
# Wait for the apply error to be reported.

--Shall we change to:
# Wait for the subscriber to report apply error and insert_exists conflict.

1) I have tested the patch; it works well.
2) Verified the header inclusions; all good.
3) All my comments (very old ones when the patch was initially posted)
are now addressed.

So apart from the comments I posted in [1], I have no more comments.

[1]: /messages/by-id/CAJpy0uAZpzustNOMBhxBctHHWbBA=snTAYsLpoWZg+cqegmD-A@mail.gmail.com

thanks
Shveta

#16 shveta malik
shveta.malik@gmail.com
In reply to: Peter Smith (#14)
Re: Collect statistics about conflicts in logical replication

On Fri, Aug 30, 2024 at 10:53 AM Peter Smith <smithpb2250@gmail.com> wrote:

Hi Hou-San. Here are my review comments for v4-0001.

======

1. Add links in the docs

IMO it would be good for all these confl_* descriptions (in
doc/src/sgml/monitoring.sgml) to include links back to where each of
those conflict types was defined [1].

Indeed, once links to the original conflict type information are
included, I think you should remove the mention of
"track_commit_timestamp":
+       counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.

It should be obvious that you cannot count a conflict if the conflict
does not happen, but I don't think we should scatter/duplicate those
rules in different places saying when certain conflicts can/can't
happen -- we should just link everywhere back to the original
description for those rules.

+1, will make the doc better.

~~~

2. Arrange all the counts into an intuitive/natural order

There is an intuitive/natural ordering for these counts. For example,
the 'confl_*' count fields are in the order insert -> update ->
delete, which LGTM.

Meanwhile, the 'apply_error_count' and the 'sync_error_count' are not
in a good order.

IMO it makes more sense if everything is ordered as:
'sync_error_count', then 'apply_error_count', then all the 'confl_*'
counts.

This comment applies to lots of places, e.g.:
- docs (doc/src/sgml/monitoring.sgml)
- function pg_stat_get_subscription_stats (pg_proc.dat)
- view pg_stat_subscription_stats (src/backend/catalog/system_views.sql)
- TAP test SELECTs (test/subscription/t/026_stats.pl)

As all those places are already impacted by this patch, I think it
would be good to also swap the sync/apply counts in passing (if
possible), so that everything is ordered intuitively top-to-bottom
or left-to-right.

Not sure about this though. It does not seem to belong to the current patch.

thanks
Shveta

#17 Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: shveta malik (#16)
1 attachment(s)
RE: Collect statistics about conflicts in logical replication

On Friday, August 30, 2024 2:24 PM shveta malik <shveta.malik@gmail.com> wrote:

On Fri, Aug 30, 2024 at 10:53 AM Peter Smith <smithpb2250@gmail.com>
wrote:

Hi Hou-San. Here are my review comments for v4-0001.

Thanks, Shveta and Peter, for the comments!

======

1. Add links in the docs

IMO it would be good for all these confl_* descriptions (in
doc/src/sgml/monitoring.sgml) to include links back to where each of
those conflict types was defined [1].

Indeed, once links to the original conflict type information are
included, I think you should remove the mention of
"track_commit_timestamp":
+       counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.

It should be obvious that you cannot count a conflict if the conflict
does not happen, but I don't think we should scatter/duplicate those
rules in different places saying when certain conflicts can/can't
happen -- we should just link everywhere back to the original
description for those rules.

+1, will make the doc better.

Changed. To add a link to each conflict type, I added <varlistentry
id="conflict-xx" xreflabel="xx"> to each conflict entry in logical-replication.sgml.

~~~

2. Arrange all the counts into an intuitive/natural order

There is an intuitive/natural ordering for these counts. For example,
the 'confl_*' count fields are in the order insert -> update ->
delete, which LGTM.

Meanwhile, the 'apply_error_count' and the 'sync_error_count' are not
in a good order.

IMO it makes more sense if everything is ordered as:
'sync_error_count', then 'apply_error_count', then all the 'confl_*'
counts.

This comment applies to lots of places, e.g.:
- docs (doc/src/sgml/monitoring.sgml)
- function pg_stat_get_subscription_stats (pg_proc.dat)
- view pg_stat_subscription_stats
(src/backend/catalog/system_views.sql)
- TAP test SELECTs (test/subscription/t/026_stats.pl)

As all those places are already impacted by this patch, I think it
would be good to also swap the sync/apply counts in passing (if
possible), so that everything is ordered intuitively top-to-bottom
or left-to-right.

Not sure about this though. It does not seem to belong to the current patch.

I also don't think we should handle that in this patch.

Here is the v5 patch, which addresses the above comments and Shveta's comments [1].

[1]: /messages/by-id/CAJpy0uAZpzustNOMBhxBctHHWbBA=snTAYsLpoWZg+cqegmD-A@mail.gmail.com

Best Regards,
Hou zj

Attachments:

v5-0001-Collect-statistics-about-conflicts-in-logical-rep.patch (application/octet-stream)
From d5b4046e876d0e6f59589964b203ea8f48f418b2 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 29 Aug 2024 10:31:32 +0800
Subject: [PATCH v5] Collect statistics about conflicts in logical replication

This commit adds columns to the view pg_stat_subscription_stats to show
information about the conflicts that occur during the application of
logical replication changes. The following columns are added:

confl_insert_exists:
	Number of times a row insertion violated a NOT DEFERRABLE unique constraint.
confl_update_origin_differs:
	Number of times an update was performed on a row that was previously modified by another origin.
confl_update_exists:
	Number of times that the updated value of a row violates a NOT DEFERRABLE unique constraint.
confl_update_missing:
	Number of times that the tuple to be updated is missing.
confl_delete_origin_differs:
	Number of times a delete was performed on a row that was previously modified by another origin.
confl_delete_missing:
	Number of times that the tuple to be deleted is missing.

The update_origin_differs and delete_origin_differs conflicts can be detected
only when track_commit_timestamp is enabled.
---
 doc/src/sgml/logical-replication.sgml         |  17 +-
 doc/src/sgml/monitoring.sgml                  |  77 +++++++-
 src/backend/catalog/system_views.sql          |   6 +
 src/backend/replication/logical/conflict.c    |   5 +-
 .../utils/activity/pgstat_subscription.c      |  17 ++
 src/backend/utils/adt/pgstatfuncs.c           |  33 +++-
 src/include/catalog/pg_proc.dat               |   6 +-
 src/include/pgstat.h                          |   4 +
 src/include/replication/conflict.h            |   8 +
 src/test/regress/expected/rules.out           |   8 +-
 src/test/subscription/t/026_stats.pl          | 171 ++++++++++++++++--
 11 files changed, 313 insertions(+), 39 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 46917f9f94..df62eb45ff 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1582,10 +1582,11 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   Additional logging is triggered in the following <firstterm>conflict</firstterm>
-   cases:
+   Additional logging is triggered, and the conflict statistics are collected (displayed in the
+   <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
+   in the following <firstterm>conflict</firstterm> cases:
    <variablelist>
-    <varlistentry>
+    <varlistentry id="conflict-insert-exists" xreflabel="insert_exists">
      <term><literal>insert_exists</literal></term>
      <listitem>
       <para>
@@ -1598,7 +1599,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-origin-differs" xreflabel="update_origin_differs">
      <term><literal>update_origin_differs</literal></term>
      <listitem>
       <para>
@@ -1610,7 +1611,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-exists" xreflabel="update_exists">
      <term><literal>update_exists</literal></term>
      <listitem>
       <para>
@@ -1627,7 +1628,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-missing" xreflabel="update_missing">
      <term><literal>update_missing</literal></term>
      <listitem>
       <para>
@@ -1636,7 +1637,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-delete-origin-differs" xreflabel="delete_origin_differs">
      <term><literal>delete_origin_differs</literal></term>
      <listitem>
       <para>
@@ -1648,7 +1649,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-delete-missing" xreflabel="delete_missing">
      <term><literal>delete_missing</literal></term>
      <listitem>
       <para>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 55417a6fa9..933de6fe07 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -507,7 +507,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
-      <entry>One row per subscription, showing statistics about errors.
+      <entry>One row per subscription, showing statistics about errors and conflicts.
       See <link linkend="monitoring-pg-stat-subscription-stats">
       <structname>pg_stat_subscription_stats</structname></link> for details.
       </entry>
@@ -2157,7 +2157,10 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>apply_error_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times an error occurred while applying changes
+       Number of times an error occurred while applying changes. Note that any
+       conflict resulting in an apply error will be counted in both
+       <literal>apply_error_count</literal> and the corresponding conflict
+       count (e.g., <literal>confl_*</literal>).
       </para></entry>
      </row>
 
@@ -2171,6 +2174,76 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_insert_exists</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a row insertion violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes. See <xref linkend="conflict-insert-exists"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_origin_differs</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times an update was applied to a row that had been previously
+       modified by another source during the application of changes. See
+       <xref linkend="conflict-update-origin-differs"/> for details about this
+       conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_exists</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that an updated row value violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes. See <xref linkend="conflict-update-exists"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_missing</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was not found during the
+       application of changes. See <xref linkend="conflict-update-missing"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_delete_origin_differs</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a delete operation was applied to a row that had been
+       previously modified by another source during the application of changes.
+       See <xref linkend="conflict-delete-origin-differs"/> for details about
+       this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_delete_missing</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be deleted was not found during the application
+       of changes. See <xref linkend="conflict-delete-missing"/> for details
+       about this conflict.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 19cabc9a47..7fd5d256a1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.confl_insert_exists,
+        ss.confl_update_origin_differs,
+        ss.confl_update_exists,
+        ss.confl_update_missing,
+        ss.confl_delete_origin_differs,
+        ss.confl_delete_missing,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index a1437d4f77..5d9ff626bd 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -17,8 +17,9 @@
 #include "access/commit_ts.h"
 #include "access/tableam.h"
 #include "executor/executor.h"
+#include "pgstat.h"
 #include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
 
@@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	Assert(!OidIsValid(indexoid) ||
 		   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
+	pgstat_report_subscription_conflict(MySubscription->oid, type);
+
 	ereport(elevel,
 			errcode_apply_conflict(type),
 			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
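The doc note added to apply_error_count and the placement of pgstat_report_subscription_conflict() before ereport() in ReportApplyConflict() can be modelled in a few lines. This is a minimal sketch, not PostgreSQL code. setjmp()/longjmp() stand in for the PG_TRY/ereport(ERROR) error machinery, and the two counters are plain statics rather than pgstat entries. The model shows why the conflict has to be counted before the ereport() call (an ERROR-level ereport() does not return). It also shows why such a conflict ends up in both apply_error_count and its confl_* column.

```c
#include <assert.h>
#include <setjmp.h>

/*
 * Hypothetical stand-ins for two pg_stat_subscription_stats counters.
 * This models the control flow only; it is not PostgreSQL code.
 */
static long confl_insert_exists = 0;
static long apply_error_count = 0;

/* Plays the role of the error-recovery point established by PG_TRY(). */
static jmp_buf error_recovery;

/*
 * Models ReportApplyConflict() with elevel = ERROR: the conflict is counted
 * first, then the "ereport(ERROR)" jumps away and never returns, so any
 * counting placed after it would be skipped.
 */
static void
report_conflict_and_error(void)
{
	confl_insert_exists++;		/* pgstat_report_subscription_conflict() */
	longjmp(error_recovery, 1);	/* ereport(ERROR, ...) */
}

/* Models one apply attempt plus the worker's error handling. */
static void
apply_one_change(void)
{
	if (setjmp(error_recovery) == 0)
		report_conflict_and_error();
	else
		apply_error_count++;	/* pgstat_report_subscription_error() */

	/* In this model every apply error stems from an already-counted conflict. */
	assert(apply_error_count <= confl_insert_exists);
}
```

After one call to apply_one_change(), both counters are 1. For a conflict reported below ERROR level (the 026_stats.pl changes describe update_missing as being skipped rather than raising an error), only the confl_* counter would move.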
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index d9af8de658..e06c92727e 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
 		pending->sync_error_count++;
 }
 
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+	PgStat_EntryRef *entry_ref;
+	PgStat_BackendSubEntry *pending;
+
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+										  InvalidOid, subid, NULL);
+	pending = entry_ref->pending;
+	pending->conflict_count[type]++;
+}
+
 /*
  * Report creating the subscription.
  */
@@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
 	SUB_ACC(apply_error_count);
 	SUB_ACC(sync_error_count);
+	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+		SUB_ACC(conflict_count[i]);
 #undef SUB_ACC
 
 	pgstat_unlock_entry(entry_ref);
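pgstat_report_subscription_conflict() only bumps a counter in the backend-local pending entry. The counts reach the shared entry later, when pgstat_subscription_flush_cb() adds them with SUB_ACC while holding the entry lock. The sketch below models that two-step scheme. It uses simplified, hypothetical structs (PendingSubStats, SharedSubStats) in place of PgStat_BackendSubEntry and PgStat_StatSubEntry. Locking is omitted, and the pending entry is simply zeroed after the flush; both are simplifying assumptions.

```c
#include <assert.h>
#include <string.h>

/* Number of conflict counters in the patch (CONFLICT_NUM_TYPES). */
#define CONFLICT_NUM_TYPES 6

typedef long long Counter;		/* stand-in for PgStat_Counter (int64) */

/* Hypothetical backend-local pending counts. */
typedef struct PendingSubStats
{
	Counter		apply_error_count;
	Counter		sync_error_count;
	Counter		conflict_count[CONFLICT_NUM_TYPES];
} PendingSubStats;

/* Hypothetical shared-memory counts visible to pg_stat_subscription_stats. */
typedef struct SharedSubStats
{
	Counter		apply_error_count;
	Counter		sync_error_count;
	Counter		conflict_count[CONFLICT_NUM_TYPES];
} SharedSubStats;

/* Hot path: a plain increment in local memory, no lock needed. */
static void
report_conflict(PendingSubStats *pending, int type)
{
	assert(type >= 0 && type < CONFLICT_NUM_TYPES);
	pending->conflict_count[type]++;
}

/* Flush: fold pending counts into the shared entry, as SUB_ACC does. */
static void
flush_pending(SharedSubStats *shared, PendingSubStats *pending)
{
#define SUB_ACC(fld) shared->fld += pending->fld
	SUB_ACC(apply_error_count);
	SUB_ACC(sync_error_count);
	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
		SUB_ACC(conflict_count[i]);
#undef SUB_ACC

	/* Start the next interval from zero so nothing is added twice. */
	memset(pending, 0, sizeof(*pending));
}
```

Keeping the per-conflict path to a local increment means a burst of conflicts costs only one shared-memory update per flush.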
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3221137123..97dc09ac0d 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	10
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	bool		nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	PgStat_StatSubEntry *subentry;
 	PgStat_StatSubEntry allzero;
+	int			i = 0;
 
 	/* Get subscription stats */
 	subentry = pgstat_fetch_stat_subscription(subid);
@@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	}
 
 	/* subid */
-	values[0] = ObjectIdGetDatum(subid);
+	values[i++] = ObjectIdGetDatum(subid);
 
 	/* apply_error_count */
-	values[1] = Int64GetDatum(subentry->apply_error_count);
+	values[i++] = Int64GetDatum(subentry->apply_error_count);
 
 	/* sync_error_count */
-	values[2] = Int64GetDatum(subentry->sync_error_count);
+	values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+	/* conflict count */
+	for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+		values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
 
 	/* stats_reset */
 	if (subentry->stat_reset_timestamp == 0)
-		nulls[3] = true;
+		nulls[i] = true;
 	else
-		values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+		values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+	Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
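pg_stat_get_subscription_stats() now fills values[] through a running index instead of fixed positions, which lets the six conflict columns be emitted in a loop. The standalone sketch below reproduces that pattern. It uses a hypothetical StatsRow type that holds int64 values instead of Datums. The stats_reset slot is written without advancing i, which is why the closing check is i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS. The loop also assumes that the ConflictType order matches columns 4 to 9 as declared by TupleDescInitEntry() and pg_proc.dat.

```c
#include <assert.h>
#include <stdbool.h>

#define CONFLICT_NUM_TYPES 6
#define SUB_STATS_COLS 10		/* subid, 2 error counts, 6 conflicts, stats_reset */

/* Hypothetical flattened result row; int64 values instead of Datums. */
typedef struct StatsRow
{
	long long	values[SUB_STATS_COLS];
	bool		nulls[SUB_STATS_COLS];
} StatsRow;

static void
fill_row(StatsRow *row, long long subid, long long apply_errors,
		 long long sync_errors, const long long conflicts[CONFLICT_NUM_TYPES],
		 long long reset_ts)
{
	int			i = 0;

	for (int c = 0; c < SUB_STATS_COLS; c++)
	{
		row->values[c] = 0;
		row->nulls[c] = false;
	}

	row->values[i++] = subid;
	row->values[i++] = apply_errors;
	row->values[i++] = sync_errors;

	/* One column per conflict type, in enum order. */
	for (int n = 0; n < CONFLICT_NUM_TYPES; n++)
		row->values[i++] = conflicts[n];

	/* stats_reset: 0 means "never reset" and is returned as NULL. */
	if (reset_ts == 0)
		row->nulls[i] = true;
	else
		row->values[i] = reset_ts;

	/* i was not advanced for the last column, hence the + 1. */
	assert(i + 1 == SUB_STATS_COLS);
}
```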
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 85f42be1b3..ff5436acac 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5538,9 +5538,9 @@
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f63159c55c..be2c91168a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -15,6 +15,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/relcache.h"
@@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 } PgStat_BackendSubEntry;
 
 /* ----------
@@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
  */
 
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
 extern void pgstat_create_subscription(Oid subid);
 extern void pgstat_drop_subscription(Oid subid);
 extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index ca797fb41c..f58a76fe79 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -14,6 +14,12 @@
 
 /*
  * Conflict types that could occur while applying remote changes.
+ *
+ * This enum is used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count and PgStat_StatSubEntry::conflict_count)
+ * as well; therefore, when adding new values or reordering existing ones,
+ * be sure to review and, if needed, adjust the corresponding statistics
+ * collection code.
  */
 typedef enum
 {
@@ -42,6 +48,8 @@ typedef enum
 	 */
 } ConflictType;
 
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 									TransactionId *xmin,
 									RepOriginId *localorigin,
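CONFLICT_NUM_TYPES is derived from the last enumerator. Every array sized by it, including the conflict_count[] members and the loop in pg_stat_get_subscription_stats(), therefore depends on CT_DELETE_MISSING staying last. One optional way to make such drift visible at compile time, which is not part of the patch, is to keep a table of column names indexed by the enum and check its length with a C11 _Static_assert. In this sketch the enumerator names other than CT_DELETE_MISSING are assumed to follow the order of the view columns. conflict_column_names and the two helper functions are hypothetical.

```c
#include <assert.h>
#include <string.h>

/* Assumed to mirror ConflictType in replication/conflict.h; order matters. */
typedef enum
{
	CT_INSERT_EXISTS,
	CT_UPDATE_ORIGIN_DIFFERS,
	CT_UPDATE_EXISTS,
	CT_UPDATE_MISSING,
	CT_DELETE_ORIGIN_DIFFERS,
	CT_DELETE_MISSING,
} ConflictType;

/* As in the patch: correct only while CT_DELETE_MISSING is the last value. */
#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)

/* Hypothetical table mapping each conflict type to its view column. */
static const char *const conflict_column_names[] = {
	[CT_INSERT_EXISTS] = "confl_insert_exists",
	[CT_UPDATE_ORIGIN_DIFFERS] = "confl_update_origin_differs",
	[CT_UPDATE_EXISTS] = "confl_update_exists",
	[CT_UPDATE_MISSING] = "confl_update_missing",
	[CT_DELETE_ORIGIN_DIFFERS] = "confl_delete_origin_differs",
	[CT_DELETE_MISSING] = "confl_delete_missing",
};

/*
 * Compilation fails if a name is added for a new enum value without
 * updating CONFLICT_NUM_TYPES, or the macro is updated without a name.
 */
_Static_assert(sizeof(conflict_column_names) / sizeof(conflict_column_names[0])
			   == CONFLICT_NUM_TYPES,
			   "conflict_column_names must have one entry per ConflictType");

static const char *
conflict_column_name(ConflictType type)
{
	assert((int) type >= 0 && (int) type < CONFLICT_NUM_TYPES);
	return conflict_column_names[type];
}

/* Reverse lookup from a view column name; returns -1 if unknown. */
static int
conflict_type_from_column(const char *column)
{
	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
		if (strcmp(conflict_column_names[i], column) == 0)
			return i;
	return -1;
}
```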
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 862433ee52..a1626f3fae 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.confl_insert_exists,
+    ss.confl_update_origin_differs,
+    ss.confl_update_exists,
+    ss.confl_update_missing,
+    ss.confl_delete_origin_differs,
+    ss.confl_delete_missing,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index fb3e5629b3..70de5e5ac3 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,15 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+
+# Enable track_commit_timestamp to detect origin-differ conflicts in logical
+# replication. Reduce wal_retrieve_retry_interval to 1ms to accelerate the
+# restart of the logical replication worker after encountering a conflict.
+$node_subscriber->append_conf(
+	'postgresql.conf', q{
+track_commit_timestamp = on
+wal_retrieve_retry_interval = 1ms
+});
 $node_subscriber->start;
 
 
@@ -30,6 +39,7 @@ sub create_sub_pub_w_errors
 		qq[
 	BEGIN;
 	CREATE TABLE $table_name(a int);
+	ALTER TABLE $table_name REPLICA IDENTITY FULL;
 	INSERT INTO $table_name VALUES (1);
 	COMMIT;
 	]);
@@ -91,20 +101,117 @@ sub create_sub_pub_w_errors
 	# subscriber due to violation of the unique constraint on test table.
 	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
 
-	# Wait for the apply error to be reported.
+	# Wait for the subscriber to report both an apply error and an
+	# insert_exists conflict.
 	$node_subscriber->poll_query_until(
 		$db,
 		qq[
-	SELECT apply_error_count > 0
+	SELECT apply_error_count > 0 AND confl_insert_exists > 0
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub_name'
 	])
 	  or die
-	  qq(Timed out while waiting for apply error for subscription '$sub_name');
+	  qq(Timed out while waiting for apply error and insert_exists conflict for subscription '$sub_name');
 
 	# Truncate test table so that apply worker can continue.
 	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
 
+	# Insert a row on the subscriber.
+	$node_subscriber->safe_psql($db, qq(INSERT INTO $table_name VALUES (2)));
+
+	# Update the test table on the publisher. This operation will raise an
+	# error on the subscriber due to a violation of the unique constraint on
+	# the test table.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_exists conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_update_exists > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_exists conflict for subscription '$sub_name');
+
+	# Truncate the subscriber side test table. Now that the table is empty, the
+	# update conflict (update_exists) ERRORs will stop happening. A single
+	# update_missing conflict will be reported, but the update will be skipped
+	# on the subscriber, allowing the test to continue.
+	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
+
+	# Wait for the subscriber to report update_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_update_missing > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_missing conflict for subscription '$sub_name');
+
+	# Delete data from the test table on the publisher. This delete operation
+	# should be skipped on the subscriber since the table is already empty.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	# Wait for the subscriber to report a delete_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_delete_missing > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
+	$node_publisher->wait_for_catchup($sub_name);
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (1);
+	));
+
+	# Update the data in the test table on the publisher. This should generate
+	# a conflict because it causes the subscriber to attempt to update a row
+	# that has been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_origin_differs conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_update_origin_differs > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_origin_differs conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (2);
+	));
+
+	# Delete data from the test table on the publisher. This should generate a
+	# conflict because it causes the subscriber to attempt to delete a row that
+	# has been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_delete_origin_differs > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_origin_differs conflict for subscription '$sub_name');
+
 	return ($pub_name, $sub_name);
 }
 
@@ -123,17 +230,23 @@ my ($pub1_name, $sub1_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table1_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	confl_insert_exists > 0,
+	confl_update_origin_differs > 0,
+	confl_update_exists > 0,
+	confl_update_missing > 0,
+	confl_delete_origin_differs > 0,
+	confl_delete_missing > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
 );
 
 # Reset a single subscription
@@ -141,17 +254,23 @@ $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
 );
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_update_origin_differs = 0,
+	confl_update_exists = 0,
+	confl_update_missing = 0,
+	confl_delete_origin_differs = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
 );
 
 # Get reset timestamp
@@ -181,46 +300,64 @@ my ($pub2_name, $sub2_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table2_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	confl_insert_exists > 0,
+	confl_update_origin_differs > 0,
+	confl_update_exists > 0,
+	confl_update_missing > 0,
+	confl_delete_origin_differs > 0,
+	confl_delete_missing > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
 );
 
 # Reset all subscriptions
 $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats(NULL)));
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_update_origin_differs = 0,
+	confl_update_exists = 0,
+	confl_update_missing = 0,
+	confl_delete_origin_differs = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
 );
 
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_update_origin_differs = 0,
+	confl_update_exists = 0,
+	confl_update_missing = 0,
+	confl_delete_origin_differs = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
 );
 
 $reset_time1 = $node_subscriber->safe_psql($db,
-- 
2.30.0.windows.2

#18shveta malik
shveta.malik@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#17)
Re: Collect statistics about conflicts in logical replication

On Fri, Aug 30, 2024 at 12:15 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

Here is V5 patch which addressed above and Shveta's[1] comments.

The patch looks good to me.

thanks
Shveta

#19Peter Smith
smithpb2250@gmail.com
In reply to: shveta malik (#18)
Re: Collect statistics about conflicts in logical replication

On Fri, Aug 30, 2024 at 6:36 PM shveta malik <shveta.malik@gmail.com> wrote:

The patch looks good to me.

Patch v5 LGTM.

======
Kind Regards,
Peter Smith.
Fujitsu Australia

#20Peter Smith
smithpb2250@gmail.com
In reply to: shveta malik (#16)
Re: Collect statistics about conflicts in logical replication

On Fri, Aug 30, 2024 at 4:24 PM shveta malik <shveta.malik@gmail.com> wrote:

On Fri, Aug 30, 2024 at 10:53 AM Peter Smith <smithpb2250@gmail.com> wrote:

...

2. Arrange all the counts into an intuitive/natural order

There is an intuitive/natural ordering for these counts. For example,
the 'confl_*' count fields are in the order insert -> update ->
delete, which LGTM.

Meanwhile, the 'apply_error_count' and the 'sync_error_count' are not
in a good order.

IMO it makes more sense if everything is ordered as:
'sync_error_count', then 'apply_error_count', then all the 'confl_*'
counts.

This comment applies to lots of places, e.g.:
- docs (doc/src/sgml/monitoring.sgml)
- function pg_stat_get_subscription_stats (pg_proc.dat)
- view pg_stat_subscription_stats (src/backend/catalog/system_views.sql)
- TAP test SELECTs (test/subscription/t/026_stats.pl)

As all those places are already impacted by this patch, I think it
would be good if (in passing) we (if possible) also swapped the
sync/apply counts so everything is ordered intuitively top-to-bottom
or left-to-right.

Not sure about this though. It does not seem to belong to the current patch.

Fair enough. But, besides being inappropriate to include in the
current patch, do you think the suggestion to reorder them made sense?
If it has some merit, then I will propose it again as a separate
thread.

======
Kind Regards,
Peter Smith.
Fujitsu Australia

#21shveta malik
shveta.malik@gmail.com
In reply to: Peter Smith (#20)
Re: Collect statistics about conflicts in logical replication

On Mon, Sep 2, 2024 at 4:20 AM Peter Smith <smithpb2250@gmail.com> wrote:

Fair enough. But, besides being inappropriate to include in the
current patch, do you think the suggestion to reorder them made sense?
If it has some merit, then I will propose it again as a separate
thread.

Yes, I think it makes sense. With respect to internal code, it might
be still okay as is, but when it comes to pg_stat_subscription_stats,
I think it is better if user finds it in below order:
subid | subname | sync_error_count | apply_error_count | confl_*

rather than the existing one:
subid | subname | apply_error_count | sync_error_count | confl_*

thanks
Shveta

#22Peter Smith
smithpb2250@gmail.com
In reply to: shveta malik (#21)
Re: Collect statistics about conflicts in logical replication

Hi Shveta, Thanks. FYI - I created a new thread for this here [1].

======
[1]: /messages/by-id/CAHut+PvbOw90wgGF4aV1HyYtX=6pjWc+pn8_fep7L=aLXwXkqg@mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia

#23Amit Kapila
amit.kapila16@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#17)
Re: Collect statistics about conflicts in logical replication

On Fri, Aug 30, 2024 at 12:15 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

Here is V5 patch which addressed above and Shveta's[1] comments.

Testing the stats for all types of conflicts is not required for this
patch especially because they increase the timings by 3-4s. We can add
tests for one or two types of conflicts.

*
(see
+ * PgStat_StatSubEntry::conflict_count and PgStat_StatSubEntry::conflict_count)

There is a typo in the above comment.

--
With Regards,
Amit Kapila.

#24Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#23)
1 attachment(s)
RE: Collect statistics about conflicts in logical replication

On Tuesday, September 3, 2024 7:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Aug 30, 2024 at 12:15 PM Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com>
wrote:

Here is V5 patch which addressed above and Shveta's[1] comments.

Testing the stats for all types of conflicts is not required for this patch
especially because they increase the timings by 3-4s. We can add tests for one
or two types of conflicts.

*
(see
+ * PgStat_StatSubEntry::conflict_count and
+ PgStat_StatSubEntry::conflict_count)

There is a typo in the above comment.

Thanks for the comments. I have addressed them and adjusted the tests.
In the V6 patch, only insert_exists and delete_missing are tested.

I confirmed that it only increased the testing time by 1 second on my machine.

Best Regards,
Hou zj

Attachments:

v6-0001-Collect-statistics-about-conflicts-in-logical-rep.patch (application/octet-stream)
From b60cf8760ec53500e6001ae09c1ad25422481802 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 29 Aug 2024 10:31:32 +0800
Subject: [PATCH v6] Collect statistics about conflicts in logical replication

This commit adds columns to the view pg_stat_subscription_stats to show
information about conflicts that occur during the application of
logical replication changes. Currently, the following columns are added.

confl_insert_exists:
	Number of times a row insertion violated a NOT DEFERRABLE unique constraint.
confl_update_origin_differs:
	Number of times an update was performed on a row that was previously modified by another origin.
confl_update_exists:
	Number of times that the updated value of a row violates a NOT DEFERRABLE unique constraint.
confl_update_missing:
	Number of times that the tuple to be updated is missing.
confl_delete_origin_differs:
	Number of times a delete was performed on a row that was previously modified by another origin.
confl_delete_missing:
	Number of times that the tuple to be deleted is missing.

The update_origin_differs and delete_origin_differs conflicts can be detected
only when track_commit_timestamp is enabled.
---
 doc/src/sgml/logical-replication.sgml         | 17 ++--
 doc/src/sgml/monitoring.sgml                  | 77 ++++++++++++++++++-
 src/backend/catalog/system_views.sql          |  6 ++
 src/backend/replication/logical/conflict.c    |  5 +-
 .../utils/activity/pgstat_subscription.c      | 17 ++++
 src/backend/utils/adt/pgstatfuncs.c           | 33 ++++++--
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/pgstat.h                          |  4 +
 src/include/replication/conflict.h            |  8 ++
 src/test/regress/expected/rules.out           |  8 +-
 src/test/subscription/t/026_stats.pl          | 67 +++++++++++-----
 11 files changed, 208 insertions(+), 40 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 46917f9f94..df62eb45ff 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1582,10 +1582,11 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   Additional logging is triggered in the following <firstterm>conflict</firstterm>
-   cases:
+   Additional logging is triggered, and the conflict statistics are collected (displayed in the
+   <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
+   in the following <firstterm>conflict</firstterm> cases:
    <variablelist>
-    <varlistentry>
+    <varlistentry id="conflict-insert-exists" xreflabel="insert_exists">
      <term><literal>insert_exists</literal></term>
      <listitem>
       <para>
@@ -1598,7 +1599,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-origin-differs" xreflabel="update_origin_differs">
      <term><literal>update_origin_differs</literal></term>
      <listitem>
       <para>
@@ -1610,7 +1611,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-exists" xreflabel="update_exists">
      <term><literal>update_exists</literal></term>
      <listitem>
       <para>
@@ -1627,7 +1628,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-missing" xreflabel="update_missing">
      <term><literal>update_missing</literal></term>
      <listitem>
       <para>
@@ -1636,7 +1637,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-delete-origin-differs" xreflabel="delete_origin_differs">
      <term><literal>delete_origin_differs</literal></term>
      <listitem>
       <para>
@@ -1648,7 +1649,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-delete-missing" xreflabel="delete_missing">
      <term><literal>delete_missing</literal></term>
      <listitem>
       <para>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 55417a6fa9..933de6fe07 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -507,7 +507,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
-      <entry>One row per subscription, showing statistics about errors.
+      <entry>One row per subscription, showing statistics about errors and conflicts.
       See <link linkend="monitoring-pg-stat-subscription-stats">
       <structname>pg_stat_subscription_stats</structname></link> for details.
       </entry>
@@ -2157,7 +2157,10 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>apply_error_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times an error occurred while applying changes
+       Number of times an error occurred while applying changes. Note that any
+       conflict resulting in an apply error will be counted in both
+       <literal>apply_error_count</literal> and the corresponding conflict
+       count (e.g., <literal>confl_*</literal>).
       </para></entry>
      </row>
 
@@ -2171,6 +2174,76 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_insert_exists</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a row insertion violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes. See <xref linkend="conflict-insert-exists"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_origin_differs</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times an update was applied to a row that had been previously
+       modified by another source during the application of changes. See
+       <xref linkend="conflict-update-origin-differs"/> for details about this
+       conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_exists</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that an updated row value violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes. See <xref linkend="conflict-update-exists"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_missing</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was not found during the
+       application of changes. See <xref linkend="conflict-update-missing"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_delete_origin_differs</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a delete operation was applied to row that had been
+       previously modified by another source during the application of changes.
+       See <xref linkend="conflict-delete-origin-differs"/> for details about
+       this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_delete_missing</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be deleted was not found during the application
+       of changes. See <xref linkend="conflict-delete-missing"/> for details
+       about this conflict.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 19cabc9a47..7fd5d256a1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.confl_insert_exists,
+        ss.confl_update_origin_differs,
+        ss.confl_update_exists,
+        ss.confl_update_missing,
+        ss.confl_delete_origin_differs,
+        ss.confl_delete_missing,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index a1437d4f77..5d9ff626bd 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -17,8 +17,9 @@
 #include "access/commit_ts.h"
 #include "access/tableam.h"
 #include "executor/executor.h"
+#include "pgstat.h"
 #include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
 
@@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	Assert(!OidIsValid(indexoid) ||
 		   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
+	pgstat_report_subscription_conflict(MySubscription->oid, type);
+
 	ereport(elevel,
 			errcode_apply_conflict(type),
 			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index d9af8de658..e06c92727e 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
 		pending->sync_error_count++;
 }
 
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+	PgStat_EntryRef *entry_ref;
+	PgStat_BackendSubEntry *pending;
+
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+										  InvalidOid, subid, NULL);
+	pending = entry_ref->pending;
+	pending->conflict_count[type]++;
+}
+
 /*
  * Report creating the subscription.
  */
@@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
 	SUB_ACC(apply_error_count);
 	SUB_ACC(sync_error_count);
+	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+		SUB_ACC(conflict_count[i]);
 #undef SUB_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3221137123..97dc09ac0d 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	10
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	bool		nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	PgStat_StatSubEntry *subentry;
 	PgStat_StatSubEntry allzero;
+	int			i = 0;
 
 	/* Get subscription stats */
 	subentry = pgstat_fetch_stat_subscription(subid);
@@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	}
 
 	/* subid */
-	values[0] = ObjectIdGetDatum(subid);
+	values[i++] = ObjectIdGetDatum(subid);
 
 	/* apply_error_count */
-	values[1] = Int64GetDatum(subentry->apply_error_count);
+	values[i++] = Int64GetDatum(subentry->apply_error_count);
 
 	/* sync_error_count */
-	values[2] = Int64GetDatum(subentry->sync_error_count);
+	values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+	/* conflict count */
+	for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+		values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
 
 	/* stats_reset */
 	if (subentry->stat_reset_timestamp == 0)
-		nulls[3] = true;
+		nulls[i] = true;
 	else
-		values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+		values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+	Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 85f42be1b3..ff5436acac 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5538,9 +5538,9 @@
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f63159c55c..be2c91168a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -15,6 +15,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/relcache.h"
@@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 } PgStat_BackendSubEntry;
 
 /* ----------
@@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
  */
 
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
 extern void pgstat_create_subscription(Oid subid);
 extern void pgstat_drop_subscription(Oid subid);
 extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index ca797fb41c..c759677ff5 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -14,6 +14,12 @@
 
 /*
  * Conflict types that could occur while applying remote changes.
+ *
+ * This enum is used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count and
+ * PgStat_BackendSubEntry::conflict_count) as well, therefore, when adding new
+ * values or reordering existing ones, ensure to review and potentially adjust
+ * the corresponding statistics collection codes.
  */
 typedef enum
 {
@@ -42,6 +48,8 @@ typedef enum
 	 */
 } ConflictType;
 
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 									TransactionId *xmin,
 									RepOriginId *localorigin,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 862433ee52..a1626f3fae 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.confl_insert_exists,
+    ss.confl_update_origin_differs,
+    ss.confl_update_exists,
+    ss.confl_update_missing,
+    ss.confl_delete_origin_differs,
+    ss.confl_delete_missing,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index fb3e5629b3..513ab6b00c 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,8 +16,12 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
-$node_subscriber->start;
 
+# Enable track_commit_timestamp to detect origin-differ conflicts in logical
+# replication.
+$node_subscriber->append_conf('postgresql.conf',
+	'track_commit_timestamp = on');
+$node_subscriber->start;
 
 sub create_sub_pub_w_errors
 {
@@ -30,6 +34,7 @@ sub create_sub_pub_w_errors
 		qq[
 	BEGIN;
 	CREATE TABLE $table_name(a int);
+	ALTER TABLE $table_name REPLICA IDENTITY FULL;
 	INSERT INTO $table_name VALUES (1);
 	COMMIT;
 	]);
@@ -91,20 +96,36 @@ sub create_sub_pub_w_errors
 	# subscriber due to violation of the unique constraint on test table.
 	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
 
-	# Wait for the apply error to be reported.
+	# Wait for the subscriber to report both an apply error and an
+	# insert_exists conflict.
 	$node_subscriber->poll_query_until(
 		$db,
 		qq[
-	SELECT apply_error_count > 0
+	SELECT apply_error_count > 0 AND confl_insert_exists > 0
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub_name'
 	])
 	  or die
-	  qq(Timed out while waiting for apply error for subscription '$sub_name');
+	  qq(Timed out while waiting for apply error and insert_exists conflict for subscription '$sub_name');
 
 	# Truncate test table so that apply worker can continue.
 	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
 
+	# Delete data from the test table on the publisher. This delete operation
+	# should be skipped on the subscriber since the table is already empty.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	# Wait for the subscriber to report a delete_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_delete_missing > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
+
 	return ($pub_name, $sub_name);
 }
 
@@ -123,17 +144,19 @@ my ($pub1_name, $sub1_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table1_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	confl_insert_exists > 0,
+	confl_delete_missing > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+	qq(t|t|t|t|t),
+	qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
 );
 
 # Reset a single subscription
@@ -141,17 +164,19 @@ $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
 );
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+	qq(t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
 );
 
 # Get reset timestamp
@@ -181,46 +206,52 @@ my ($pub2_name, $sub2_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table2_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	confl_insert_exists > 0,
+	confl_delete_missing > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+	qq(t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
 );
 
 # Reset all subscriptions
 $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats(NULL)));
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+	qq(t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
 );
 
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+	qq(t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
 );
 
 $reset_time1 = $node_subscriber->safe_psql($db,
-- 
2.30.0.windows.2

#25Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Zhijie Hou (Fujitsu) (#24)
1 attachment(s)
RE: Collect statistics about conflicts in logical replication

On Tuesday, September 3, 2024 7:23 PM Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com> wrote:

On Tuesday, September 3, 2024 7:12 PM Amit Kapila
<amit.kapila16@gmail.com> wrote:

On Fri, Aug 30, 2024 at 12:15 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com>
wrote:

Here is V5 patch which addressed above and Shveta's[1] comments.

Testing the stats for all types of conflicts is not required for this
patch especially because they increase the timings by 3-4s. We can add
tests for one or two types of conflicts.

*
(see
+ * PgStat_StatSubEntry::conflict_count and
+ PgStat_StatSubEntry::conflict_count)

There is a typo in the above comment.

Thanks for the comments. I have addressed them and adjusted the
tests.
In the V6 patch, only insert_exists and delete_missing are tested.

I confirmed that it only increased the testing time by 1 second on my machine.

Sorry, I sent the wrong patch in the last email; please refer to the correct patch here.

Best Regards,
Hou zj

Attachments:

v6_2-0001-Collect-statistics-about-conflicts-in-logical-rep.patch (application/octet-stream)
From cedb2b3fafc57649b53e7013c08d36af10059f1e Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 29 Aug 2024 10:31:32 +0800
Subject: [PATCH v6] Collect statistics about conflicts in logical replication

This commit adds columns to the view pg_stat_subscription_stats to show
information about conflicts that occur during the application of
logical replication changes. Currently, the following columns are added.

confl_insert_exists:
	Number of times a row insertion violated a NOT DEFERRABLE unique constraint.
confl_update_origin_differs:
	Number of times an update was performed on a row that was previously modified by another origin.
confl_update_exists:
	Number of times that the updated value of a row violates a NOT DEFERRABLE unique constraint.
confl_update_missing:
	Number of times that the tuple to be updated is missing.
confl_delete_origin_differs:
	Number of times a delete was performed on a row that was previously modified by another origin.
confl_delete_missing:
	Number of times that the tuple to be deleted is missing.

The update_origin_differs and delete_origin_differs conflicts can be detected
only when track_commit_timestamp is enabled.
---
 doc/src/sgml/logical-replication.sgml         | 17 ++--
 doc/src/sgml/monitoring.sgml                  | 77 ++++++++++++++++++-
 src/backend/catalog/system_views.sql          |  6 ++
 src/backend/replication/logical/conflict.c    |  5 +-
 .../utils/activity/pgstat_subscription.c      | 17 ++++
 src/backend/utils/adt/pgstatfuncs.c           | 33 ++++++--
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/pgstat.h                          |  4 +
 src/include/replication/conflict.h            |  8 ++
 src/test/regress/expected/rules.out           |  8 +-
 src/test/subscription/t/026_stats.pl          | 61 +++++++++++----
 11 files changed, 203 insertions(+), 39 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 46917f9f94..df62eb45ff 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1582,10 +1582,11 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   Additional logging is triggered in the following <firstterm>conflict</firstterm>
-   cases:
+   Additional logging is triggered, and the conflict statistics are collected (displayed in the
+   <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
+   in the following <firstterm>conflict</firstterm> cases:
    <variablelist>
-    <varlistentry>
+    <varlistentry id="conflict-insert-exists" xreflabel="insert_exists">
      <term><literal>insert_exists</literal></term>
      <listitem>
       <para>
@@ -1598,7 +1599,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-origin-differs" xreflabel="update_origin_differs">
      <term><literal>update_origin_differs</literal></term>
      <listitem>
       <para>
@@ -1610,7 +1611,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-exists" xreflabel="update_exists">
      <term><literal>update_exists</literal></term>
      <listitem>
       <para>
@@ -1627,7 +1628,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-missing" xreflabel="update_missing">
      <term><literal>update_missing</literal></term>
      <listitem>
       <para>
@@ -1636,7 +1637,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-delete-origin-differs" xreflabel="delete_origin_differs">
      <term><literal>delete_origin_differs</literal></term>
      <listitem>
       <para>
@@ -1648,7 +1649,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-delete-missing" xreflabel="delete_missing">
      <term><literal>delete_missing</literal></term>
      <listitem>
       <para>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 55417a6fa9..933de6fe07 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -507,7 +507,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
-      <entry>One row per subscription, showing statistics about errors.
+      <entry>One row per subscription, showing statistics about errors and conflicts.
       See <link linkend="monitoring-pg-stat-subscription-stats">
       <structname>pg_stat_subscription_stats</structname></link> for details.
       </entry>
@@ -2157,7 +2157,10 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>apply_error_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times an error occurred while applying changes
+       Number of times an error occurred while applying changes. Note that any
+       conflict resulting in an apply error will be counted in both
+       <literal>apply_error_count</literal> and the corresponding conflict
+       count (e.g., <literal>confl_*</literal>).
       </para></entry>
      </row>
 
@@ -2171,6 +2174,76 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_insert_exists</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a row insertion violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes. See <xref linkend="conflict-insert-exists"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_origin_differs</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times an update was applied to a row that had been previously
+       modified by another source during the application of changes. See
+       <xref linkend="conflict-update-origin-differs"/> for details about this
+       conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_exists</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that an updated row value violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes. See <xref linkend="conflict-update-exists"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_missing</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was not found during the
+       application of changes. See <xref linkend="conflict-update-missing"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_delete_origin_differs</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a delete operation was applied to a row that had been
+       previously modified by another source during the application of changes.
+       See <xref linkend="conflict-delete-origin-differs"/> for details about
+       this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_delete_missing</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be deleted was not found during the application
+       of changes. See <xref linkend="conflict-delete-missing"/> for details
+       about this conflict.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 19cabc9a47..7fd5d256a1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.confl_insert_exists,
+        ss.confl_update_origin_differs,
+        ss.confl_update_exists,
+        ss.confl_update_missing,
+        ss.confl_delete_origin_differs,
+        ss.confl_delete_missing,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index a1437d4f77..5d9ff626bd 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -17,8 +17,9 @@
 #include "access/commit_ts.h"
 #include "access/tableam.h"
 #include "executor/executor.h"
+#include "pgstat.h"
 #include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
 
@@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	Assert(!OidIsValid(indexoid) ||
 		   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
+	pgstat_report_subscription_conflict(MySubscription->oid, type);
+
 	ereport(elevel,
 			errcode_apply_conflict(type),
 			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index d9af8de658..e06c92727e 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
 		pending->sync_error_count++;
 }
 
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+	PgStat_EntryRef *entry_ref;
+	PgStat_BackendSubEntry *pending;
+
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+										  InvalidOid, subid, NULL);
+	pending = entry_ref->pending;
+	pending->conflict_count[type]++;
+}
+
 /*
  * Report creating the subscription.
  */
@@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
 	SUB_ACC(apply_error_count);
 	SUB_ACC(sync_error_count);
+	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+		SUB_ACC(conflict_count[i]);
 #undef SUB_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3221137123..97dc09ac0d 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	10
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	bool		nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	PgStat_StatSubEntry *subentry;
 	PgStat_StatSubEntry allzero;
+	int			i = 0;
 
 	/* Get subscription stats */
 	subentry = pgstat_fetch_stat_subscription(subid);
@@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	}
 
 	/* subid */
-	values[0] = ObjectIdGetDatum(subid);
+	values[i++] = ObjectIdGetDatum(subid);
 
 	/* apply_error_count */
-	values[1] = Int64GetDatum(subentry->apply_error_count);
+	values[i++] = Int64GetDatum(subentry->apply_error_count);
 
 	/* sync_error_count */
-	values[2] = Int64GetDatum(subentry->sync_error_count);
+	values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+	/* conflict count */
+	for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+		values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
 
 	/* stats_reset */
 	if (subentry->stat_reset_timestamp == 0)
-		nulls[3] = true;
+		nulls[i] = true;
 	else
-		values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+		values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+	Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 85f42be1b3..ff5436acac 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5538,9 +5538,9 @@
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f63159c55c..be2c91168a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -15,6 +15,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/relcache.h"
@@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 } PgStat_BackendSubEntry;
 
 /* ----------
@@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
  */
 
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
 extern void pgstat_create_subscription(Oid subid);
 extern void pgstat_drop_subscription(Oid subid);
 extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index ca797fb41c..c759677ff5 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -14,6 +14,12 @@
 
 /*
  * Conflict types that could occur while applying remote changes.
+ *
+ * This enum is used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count and
+ * PgStat_BackendSubEntry::conflict_count) as well; therefore, when adding new
+ * values or reordering existing ones, be sure to review and, if needed, adjust
+ * the corresponding statistics collection code.
  */
 typedef enum
 {
@@ -42,6 +48,8 @@ typedef enum
 	 */
 } ConflictType;
 
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 									TransactionId *xmin,
 									RepOriginId *localorigin,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 862433ee52..a1626f3fae 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.confl_insert_exists,
+    ss.confl_update_origin_differs,
+    ss.confl_update_exists,
+    ss.confl_update_missing,
+    ss.confl_delete_origin_differs,
+    ss.confl_delete_missing,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index fb3e5629b3..6b6a5b0b1b 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -30,6 +30,7 @@ sub create_sub_pub_w_errors
 		qq[
 	BEGIN;
 	CREATE TABLE $table_name(a int);
+	ALTER TABLE $table_name REPLICA IDENTITY FULL;
 	INSERT INTO $table_name VALUES (1);
 	COMMIT;
 	]);
@@ -91,20 +92,36 @@ sub create_sub_pub_w_errors
 	# subscriber due to violation of the unique constraint on test table.
 	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
 
-	# Wait for the apply error to be reported.
+	# Wait for the subscriber to report both an apply error and an
+	# insert_exists conflict.
 	$node_subscriber->poll_query_until(
 		$db,
 		qq[
-	SELECT apply_error_count > 0
+	SELECT apply_error_count > 0 AND confl_insert_exists > 0
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub_name'
 	])
 	  or die
-	  qq(Timed out while waiting for apply error for subscription '$sub_name');
+	  qq(Timed out while waiting for apply error and insert_exists conflict for subscription '$sub_name');
 
 	# Truncate test table so that apply worker can continue.
 	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
 
+	# Delete data from the test table on the publisher. This delete operation
+	# should be skipped on the subscriber since the table is already empty.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	# Wait for the subscriber to report a delete_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT confl_delete_missing > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
+
 	return ($pub_name, $sub_name);
 }
 
@@ -123,17 +140,19 @@ my ($pub1_name, $sub1_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table1_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	confl_insert_exists > 0,
+	confl_delete_missing > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+	qq(t|t|t|t|t),
+	qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
 );
 
 # Reset a single subscription
@@ -141,17 +160,19 @@ $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
 );
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+	qq(t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
 );
 
 # Get reset timestamp
@@ -181,46 +202,52 @@ my ($pub2_name, $sub2_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table2_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	confl_insert_exists > 0,
+	confl_delete_missing > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+	qq(t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
 );
 
 # Reset all subscriptions
 $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats(NULL)));
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+	qq(t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
 );
 
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	confl_insert_exists = 0,
+	confl_delete_missing = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+	qq(t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
 );
 
 $reset_time1 = $node_subscriber->safe_psql($db,
-- 
2.30.0.windows.2
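
For readers following the patch, here is a minimal, self-contained C sketch of the counting scheme it implements, under stated assumptions. There is one counter per ConflictType, indexed by the enum and sized by CONFLICT_NUM_TYPES. Each counter is incremented in backend-local pending stats, later added into the shared entry, and finally emitted as consecutive output columns. The type names (PendingSubStats, SharedSubStats) and helpers (report_conflict, flush_pending, handle_conflict, fill_columns) are hypothetical stand-ins, not PostgreSQL APIs. The enum order is assumed to follow the documentation list and the view's confl_* columns. The real pgstat machinery (entry locking, pending-entry lookup, deleting the pending entry after a flush) is deliberately omitted.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Mirrors ConflictType; order assumed to match the confl_* view columns. */
typedef enum
{
	CT_INSERT_EXISTS,
	CT_UPDATE_ORIGIN_DIFFERS,
	CT_UPDATE_EXISTS,
	CT_UPDATE_MISSING,
	CT_DELETE_ORIGIN_DIFFERS,
	CT_DELETE_MISSING
} ConflictType;

#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)

/* Hypothetical: apply_error_count, sync_error_count, one column per type
 * (subid and stats_reset are left out of this sketch). */
#define SKETCH_COLS (2 + CONFLICT_NUM_TYPES)

typedef int64_t Counter;

/* Backend-local pending counters (cf. PgStat_BackendSubEntry). */
typedef struct
{
	Counter		apply_error_count;
	Counter		sync_error_count;
	Counter		conflict_count[CONFLICT_NUM_TYPES];
} PendingSubStats;

/* Shared, accumulated counters (cf. PgStat_StatSubEntry). */
typedef struct
{
	Counter		apply_error_count;
	Counter		sync_error_count;
	Counter		conflict_count[CONFLICT_NUM_TYPES];
} SharedSubStats;

/* Hot path: bump the local counter for this conflict type only. */
static void
report_conflict(PendingSubStats *pending, ConflictType type)
{
	assert((int) type >= 0 && type < CONFLICT_NUM_TYPES);
	pending->conflict_count[type]++;
}

/*
 * Counting order as in ReportApplyConflict(): the conflict is counted
 * before the report, so an ERROR-level conflict is also counted as an
 * apply error. The increment below stands in for the apply worker's
 * error handling, which reports the apply error.
 */
static void
handle_conflict(PendingSubStats *pending, ConflictType type, int is_error)
{
	report_conflict(pending, type);
	if (is_error)
		pending->apply_error_count++;
}

/* Flush: add pending counters into the shared entry, then clear them. */
static void
flush_pending(SharedSubStats *shared, PendingSubStats *pending)
{
	shared->apply_error_count += pending->apply_error_count;
	shared->sync_error_count += pending->sync_error_count;
	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
		shared->conflict_count[i] += pending->conflict_count[i];
	memset(pending, 0, sizeof(*pending));
}

/* Emit columns in view order; the loop relies on the enum order. */
static int
fill_columns(const SharedSubStats *s, Counter *values, int ncols)
{
	int			i = 0;

	values[i++] = s->apply_error_count;
	values[i++] = s->sync_error_count;
	for (int t = 0; t < CONFLICT_NUM_TYPES; t++)
		values[i++] = s->conflict_count[t];
	assert(i == ncols);
	return i;
}
```

Because the enum order doubles as the column order, adding or reordering ConflictType values silently shifts the confl_* columns. This is why the patch adds a comment in conflict.h asking that the statistics code be revisited in that case.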

#26Peter Smith
smithpb2250@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#24)
Re: Collect statistics about conflicts in logical replication

On Tue, Sep 3, 2024 at 9:23 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

On Tuesday, September 3, 2024 7:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

Testing the stats for all types of conflicts is not required for this patch,
especially because they increase the test time by 3-4s. We can add tests for one
or two types of conflicts.

...

Thanks for the comments. I have addressed the comments and adjusted the tests.
In the V6 patch, only insert_exists and delete_missing are tested.

I confirmed that it only increased the testing time by 1 second on my machine.

Best Regards,
Hou zj

It seems a pity to throw away perfectly good test cases just because
they increase how long the suite takes to run.

This seems like yet another example of where we could have made good
use of the 'PG_TEST_EXTRA' environment variable. I have been trying to
propose adding "subscription" support for this in another thread [1].
By using this variable to make some tests conditional, we could have
the best of both worlds. e.g.
- retain all tests, but
- by default, only run a subset of those tests (to keep default test
execution time low).

I hope that if the idea to use PG_TEST_EXTRA for "subscription" tests
gets accepted then later we can revisit this, and put all the removed
extra test cases back in again.
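
The conditional-test idea above can be illustrated with a small sketch. PG_TEST_EXTRA holds a whitespace-separated list of optional test suites (for example "ssl kerberos"), and PostgreSQL's Perl TAP tests check it with a word-boundary regular expression. The C helper below, test_extra_enabled(), is hypothetical and only approximates that whole-word check by splitting on whitespace. The "subscription" value is the one proposed in [1], not an accepted setting.

```c
#include <ctype.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: returns true if "suite" appears as a whole
 * whitespace-separated word in "extra" (e.g. the PG_TEST_EXTRA value).
 */
static bool
test_extra_enabled(const char *extra, const char *suite)
{
	size_t		len = strlen(suite);
	const char *p = extra;

	if (extra == NULL || len == 0)
		return false;

	while (*p != '\0')
	{
		const char *start;

		/* Skip separators before the next word. */
		while (*p != '\0' && isspace((unsigned char) *p))
			p++;

		/* Find the end of the current word. */
		start = p;
		while (*p != '\0' && !isspace((unsigned char) *p))
			p++;

		/* Match only whole words, so "subscriptions" does not count. */
		if ((size_t) (p - start) == len && strncmp(start, suite, len) == 0)
			return true;
	}
	return false;
}
```

A test driver could call test_extra_enabled(getenv("PG_TEST_EXTRA"), "subscription") and skip the slower cases when it returns false, keeping the default run short while retaining the extra tests.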

======
[1]: /messages/by-id/CAHut+Psgtnr5BgcqYwD3PSf-AsUtVDE_j799AaZeAjJvE6HGtA@mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia

#27Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#26)
Re: Collect statistics about conflicts in logical replication

On Wed, Sep 4, 2024 at 9:17 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Tue, Sep 3, 2024 at 9:23 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

I confirmed that it only increased the testing time by 1 second on my machine.

It seems a pity to throw away perfectly good test cases just because
they increase how long the suite takes to run.

We could add every possible test, but I worry about the time they
consume without adding much value, and about the maintenance burden. I
feel that, as with core code, we should pay attention to tests as well
and not try to jam in every possible test exercising mostly similar
stuff. Each time before committing or otherwise verifying a patch, we
run make check-world, so we don't want that time to go enormously high.
Having said that, I do want the added functionality to be tested
properly, and I try my best to achieve that.

This seems like yet another example of where we could have made good
use of the 'PG_TEST_EXTRA' environment variable. I have been trying to
propose adding "subscription" support for this in another thread [1].
By using this variable to make some tests conditional, we could have
the best of both worlds. e.g.
- retain all tests, but
- by default, only run a subset of those tests (to keep default test
execution time low).

I hope that if the idea to use PG_TEST_EXTRA for "subscription" tests
gets accepted then later we can revisit this, and put all the removed
extra test cases back in again.

I am not convinced that tests which are less useful than others, or which
increase the run time, are good candidates to keep under PG_TEST_EXTRA, but
if more people advocate such an approach, then it is worth considering.

--
With Regards,
Amit Kapila.