From 2bf4c112198ad0430fbcf499f408aa39de8c4de8 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 3 Jul 2024 10:34:10 +0800 Subject: [PATCH v4 2/2] Collect statistics about conflicts in logical replication This commit adds columns in view pg_stat_subscription_stats to show information about the conflicts that occur during the application of logical replication changes. Currently, the following columns are added. insert_exists_count: Number of times inserting a row that violates a NOT DEFERRABLE unique constraint. update_differ_count: Number of times updating a row that was previously modified by another origin. update_missing_count: Number of times that the tuple to be updated is missing. delete_missing_count: Number of times that the tuple to be deleted is missing. delete_differ_count: Number of times deleting a row that was previously modified by another origin. The conflicts will be tracked only when the detect_conflict option of the subscription is enabled. Additionally, update_differ and delete_differ can be detected only when track_commit_timestamp is enabled. --- doc/src/sgml/monitoring.sgml | 65 ++++++++++- doc/src/sgml/ref/create_subscription.sgml | 4 +- src/backend/catalog/system_views.sql | 5 + src/backend/replication/logical/conflict.c | 4 + .../utils/activity/pgstat_subscription.c | 17 +++ src/backend/utils/adt/pgstatfuncs.c | 22 +++- src/include/catalog/pg_proc.dat | 6 +- src/include/pgstat.h | 4 + src/include/replication/conflict.h | 2 + src/test/regress/expected/rules.out | 7 +- src/test/subscription/t/026_stats.pl | 102 ++++++++++++++++-- 11 files changed, 220 insertions(+), 18 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 991f629907..92fe4b1011 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -507,7 +507,7 @@ postgres 27093 0.0 0.0 30096 2752 ? 
Ss 11:34 0:00 postgres: ser pg_stat_subscription_statspg_stat_subscription_stats - One row per subscription, showing statistics about errors. + One row per subscription, showing statistics about errors and conflicts. See pg_stat_subscription_stats for details. @@ -2171,6 +2171,69 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + insert_exists_count bigint + + + Number of times inserting a row that violates a + NOT DEFERRABLE unique constraint while applying + changes. This conflict is counted only if + detect_conflict + is enabled. + + + + + + update_differ_count bigint + + + Number of times updating a row that was previously modified by another + source while applying changes. This conflict is counted only when the + detect_conflict + option of the subscription and track_commit_timestamp + are enabled. + + + + + + update_missing_count bigint + + + Number of times that the tuple to be updated is not found while applying + changes. This conflict is counted only if + detect_conflict + is enabled. + + + + + + delete_missing_count bigint + + + Number of times that the tuple to be deleted is not found while applying + changes. This conflict is counted only if + detect_conflict + is enabled. + + + + + + delete_differ_count bigint + + + Number of times deleting a row that was previously modified by another + source while applying changes. This conflict is counted only when the + detect_conflict + option of the subscription and track_commit_timestamp + are enabled. + + + stats_reset timestamp with time zone diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index caa523b9bd..2ca7bf20d2 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -437,8 +437,8 @@ CREATE SUBSCRIPTION subscription_namefalse. 
- When conflict detection is enabled, additional logging is triggered - in the following scenarios: + When conflict detection is enabled, additional logging is triggered and + the conflict statistics are collected in the following scenarios: insert_exists diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 30393e6d67..9c7b771ac3 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1370,6 +1370,11 @@ CREATE VIEW pg_stat_subscription_stats AS s.subname, ss.apply_error_count, ss.sync_error_count, + ss.insert_exists_count, + ss.update_differ_count, + ss.update_missing_count, + ss.delete_missing_count, + ss.delete_differ_count, ss.stats_reset FROM pg_subscription as s, pg_stat_get_subscription_stats(s.oid) as ss; diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index b90e64b05b..df5ff0df8e 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -15,8 +15,10 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "pgstat.h" #include "replication/conflict.h" #include "replication/origin.h" +#include "replication/worker_internal.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -76,6 +78,8 @@ ReportApplyConflict(int elevel, ConflictType type, Relation localrel, RepOriginId localorigin, TimestampTz localts, TupleTableSlot *conflictslot) { + pgstat_report_subscription_conflict(MySubscription->oid, type); + ereport(elevel, errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), errmsg("conflict %s detected on relation \"%s.%s\"", diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c index d9af8de658..e06c92727e 100644 --- a/src/backend/utils/activity/pgstat_subscription.c +++ b/src/backend/utils/activity/pgstat_subscription.c @@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error) 
pending->sync_error_count++; } +/* + * Report a subscription conflict. + */ +void +pgstat_report_subscription_conflict(Oid subid, ConflictType type) +{ + PgStat_EntryRef *entry_ref; + PgStat_BackendSubEntry *pending; + + entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION, + InvalidOid, subid, NULL); + pending = entry_ref->pending; + pending->conflict_count[type]++; +} + /* * Report creating the subscription. */ @@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) #define SUB_ACC(fld) shsubent->stats.fld += localent->fld SUB_ACC(apply_error_count); SUB_ACC(sync_error_count); + for (int i = 0; i < CONFLICT_NUM_TYPES; i++) + SUB_ACC(conflict_count[i]); #undef SUB_ACC pgstat_unlock_entry(entry_ref); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 3876339ee1..42f2d0f6ef 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1966,7 +1966,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 4 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 9 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -1985,7 +1985,17 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "insert_exists_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "update_differ_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "update_missing_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "delete_missing_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "delete_differ_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, 
(AttrNumber) 9, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2005,11 +2015,15 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) /* sync_error_count */ values[2] = Int64GetDatum(subentry->sync_error_count); + /* conflict count */ + for (int i = 0; i < CONFLICT_NUM_TYPES; i++) + values[3 + i] = Int64GetDatum(subentry->conflict_count[i]); + /* stats_reset */ if (subentry->stat_reset_timestamp == 0) - nulls[3] = true; + nulls[8] = true; else - values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp); + values[8] = TimestampTzGetDatum(subentry->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d4ac578ae6..bd2ad8548d 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5505,9 +5505,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,sync_error_count,insert_exists_count,update_differ_count,update_missing_count,delete_missing_count,delete_differ_count,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 2136239710..b957e7ad36 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -14,6 +14,7 @@ #include "datatype/timestamp.h" 
#include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ +#include "replication/conflict.h" #include "utils/backend_progress.h" /* for backward compatibility */ #include "utils/backend_status.h" /* for backward compatibility */ #include "utils/relcache.h" @@ -135,6 +136,7 @@ typedef struct PgStat_BackendSubEntry { PgStat_Counter apply_error_count; PgStat_Counter sync_error_count; + PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; } PgStat_BackendSubEntry; /* ---------- @@ -393,6 +395,7 @@ typedef struct PgStat_StatSubEntry { PgStat_Counter apply_error_count; PgStat_Counter sync_error_count; + PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; TimestampTz stat_reset_timestamp; } PgStat_StatSubEntry; @@ -695,6 +698,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void); */ extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error); +extern void pgstat_report_subscription_conflict(Oid subid, ConflictType conflict); extern void pgstat_create_subscription(Oid subid); extern void pgstat_drop_subscription(Oid subid); extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid); diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index a9f521aaca..a872cfd6dd 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -36,6 +36,8 @@ typedef enum CT_DELETE_DIFFER, } ConflictType; +#define CONFLICT_NUM_TYPES (CT_DELETE_DIFFER + 1) + extern bool GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts); extern void ReportApplyConflict(int elevel, ConflictType type, diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e12ef4336a..d1dec3d16c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2142,9 +2142,14 @@ pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, ss.sync_error_count, + 
ss.insert_exists_count, + ss.update_differ_count, + ss.update_missing_count, + ss.delete_missing_count, + ss.delete_differ_count, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, insert_exists_count, update_differ_count, update_missing_count, delete_missing_count, delete_differ_count, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl index fb3e5629b3..6f9d08020c 100644 --- a/src/test/subscription/t/026_stats.pl +++ b/src/test/subscription/t/026_stats.pl @@ -16,6 +16,7 @@ $node_publisher->start; # Create subscriber node. my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', 'track_commit_timestamp = on'); $node_subscriber->start; @@ -30,6 +31,7 @@ sub create_sub_pub_w_errors qq[ BEGIN; CREATE TABLE $table_name(a int); + ALTER TABLE $table_name REPLICA IDENTITY FULL; INSERT INTO $table_name VALUES (1); COMMIT; ]); @@ -53,7 +55,7 @@ sub create_sub_pub_w_errors # infinite error loop due to violating the unique constraint. my $sub_name = $table_name . 
'_sub'; $node_subscriber->safe_psql($db, - qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name) + qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name WITH (detect_conflict = on)) ); $node_publisher->wait_for_catchup($sub_name); @@ -95,7 +97,7 @@ sub create_sub_pub_w_errors $node_subscriber->poll_query_until( $db, qq[ - SELECT apply_error_count > 0 + SELECT apply_error_count > 0 AND insert_exists_count > 0 FROM pg_stat_subscription_stats WHERE subname = '$sub_name' ]) @@ -105,6 +107,67 @@ sub create_sub_pub_w_errors # Truncate test table so that apply worker can continue. $node_subscriber->safe_psql($db, qq(TRUNCATE $table_name)); + # Update and delete data to test table on the publisher, skipping the + # update and delete on the subscriber as there are no data in the test + # table. + $node_publisher->safe_psql($db, qq( + UPDATE $table_name SET a = 2; + DELETE FROM $table_name; + )); + + # Wait for the tuple missing to be reported. + $node_subscriber->poll_query_until( + $db, + qq[ + SELECT update_missing_count > 0 AND delete_missing_count > 0 + FROM pg_stat_subscription_stats + WHERE subname = '$sub_name' + ]) + or die + qq(Timed out while waiting for tuple missing conflict for subscription '$sub_name'); + + # Prepare data for further tests. + $node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1))); + $node_publisher->wait_for_catchup($sub_name); + $node_subscriber->safe_psql($db, qq( + TRUNCATE $table_name; + INSERT INTO $table_name VALUES (1); + )); + + # Update data to test table on the publisher, updating a row on the + # subscriber that was modified by a different origin. 
+ $node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;)); + + $node_subscriber->poll_query_until( + $db, + qq[ + SELECT update_differ_count > 0 + FROM pg_stat_subscription_stats + WHERE subname = '$sub_name' + ]) + or die + qq(Timed out while waiting for update_differ conflict for subscription '$sub_name'); + + # Prepare data for further tests. + $node_subscriber->safe_psql($db, qq( + TRUNCATE $table_name; + INSERT INTO $table_name VALUES (2); + )); + + # Delete data to test table on the publisher, deleting a row on the + # subscriber that was modified by a different origin. + $node_publisher->safe_psql($db, qq(DELETE FROM $table_name;)); + + $node_subscriber->poll_query_until( + $db, + qq[ + SELECT delete_differ_count > 0 + FROM pg_stat_subscription_stats + WHERE subname = '$sub_name' + ]) + or die + qq(Timed out while waiting for delete_differ conflict for subscription '$sub_name'); + return ($pub_name, $sub_name); } @@ -128,11 +191,16 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, sync_error_count > 0, + insert_exists_count > 0, + update_differ_count > 0, + update_missing_count > 0, + delete_missing_count > 0, + delete_differ_count > 0, stats_reset IS NULL FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t), + qq(t|t|t|t|t|t|t|t), qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.) ); @@ -146,11 +214,16 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, sync_error_count = 0, + insert_exists_count = 0, + update_differ_count = 0, + update_missing_count = 0, + delete_missing_count = 0, + delete_differ_count = 0, stats_reset IS NOT NULL FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t), + qq(t|t|t|t|t|t|t|t), qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.) 
); @@ -186,11 +259,16 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, sync_error_count > 0, + insert_exists_count > 0, + update_differ_count > 0, + update_missing_count > 0, + delete_missing_count > 0, + delete_differ_count > 0, stats_reset IS NULL FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t), + qq(t|t|t|t|t|t|t|t), qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.) ); @@ -203,11 +281,16 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, sync_error_count = 0, + insert_exists_count = 0, + update_differ_count = 0, + update_missing_count = 0, + delete_missing_count = 0, + delete_differ_count = 0, stats_reset IS NOT NULL FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t), + qq(t|t|t|t|t|t|t|t), qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.) ); @@ -215,11 +298,16 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, sync_error_count = 0, + insert_exists_count = 0, + update_differ_count = 0, + update_missing_count = 0, + delete_missing_count = 0, + delete_differ_count = 0, stats_reset IS NOT NULL FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t), + qq(t|t|t|t|t|t|t|t), qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.) ); -- 2.30.0.windows.2