From d331ee8fbc9f8b38c29479cdcf86cf5e3c8590bd Mon Sep 17 00:00:00 2001 From: Nisha Moond Date: Tue, 17 Feb 2026 12:06:02 +0530 Subject: [PATCH v1] Split pgstat_report_subscription_error by worker type Refactor the original function into three worker-specific functions instead of passing the worker type to pgstat_report_subscription_error(), which required inclusion of the internal header replication/worker_internal.h. New functions: pgstat_report_subscription_apply_error(Oid subid) pgstat_report_subscription_sequencesync_error(Oid subid) pgstat_report_subscription_tablesync_error(Oid subid) --- src/backend/commands/functioncmds.c | 1 + .../replication/logical/sequencesync.c | 3 +- src/backend/replication/logical/tablesync.c | 3 +- src/backend/replication/logical/worker.c | 14 +++-- src/backend/storage/ipc/procsignal.c | 1 + .../utils/activity/pgstat_subscription.c | 54 +++++++++++-------- src/include/pgstat.h | 7 +-- .../test_custom_stats/test_custom_var_stats.c | 1 + 8 files changed, 52 insertions(+), 32 deletions(-) diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c index a516b037dea..242372b1e68 100644 --- a/src/backend/commands/functioncmds.c +++ b/src/backend/commands/functioncmds.c @@ -34,6 +34,7 @@ #include "access/htup_details.h" #include "access/table.h" +#include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" #include "catalog/indexing.h" diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index 165f909b3ba..deef43a59ac 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -750,8 +750,7 @@ start_sequence_sync(void) * idle state. 
*/ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, - WORKERTYPE_SEQUENCESYNC); + pgstat_report_subscription_sequencesync_error(MySubscription->oid); PG_RE_THROW(); } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 19a3c21a863..5b92eb0d856 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1527,8 +1527,7 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, - WORKERTYPE_TABLESYNC); + pgstat_report_subscription_tablesync_error(MySubscription->oid); PG_RE_THROW(); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 32725c48623..d718c158a11 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -5606,8 +5606,10 @@ start_apply(XLogRecPtr origin_startpos) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, - MyLogicalRepWorker->type); + if (am_tablesync_worker()) + pgstat_report_subscription_tablesync_error(MySubscription->oid); + else + pgstat_report_subscription_apply_error(MySubscription->oid); PG_RE_THROW(); } @@ -5960,8 +5962,12 @@ DisableSubscriptionAndExit(void) * Report the worker failed during sequence synchronization, table * synchronization, or apply. 
*/ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - MyLogicalRepWorker->type); + if (am_tablesync_worker()) + pgstat_report_subscription_tablesync_error(MySubscription->oid); + else if (am_sequencesync_worker()) + pgstat_report_subscription_sequencesync_error(MySubscription->oid); + else + pgstat_report_subscription_apply_error(MySubscription->oid); /* Disable the subscription */ StartTransactionCommand(); diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 5d33559926a..7505c9d3a37 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -22,6 +22,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "port/pg_bitutils.h" +#include "replication/logicalctl.h" #include "replication/logicalworker.h" #include "replication/walsender.h" #include "storage/condition_variable.h" diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c index 500b1899188..89b5532f7f6 100644 --- a/src/backend/utils/activity/pgstat_subscription.c +++ b/src/backend/utils/activity/pgstat_subscription.c @@ -22,10 +22,10 @@ /* - * Report a subscription error. + * Report a subscription error for apply worker. */ void -pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype) +pgstat_report_subscription_apply_error(Oid subid) { PgStat_EntryRef *entry_ref; PgStat_BackendSubEntry *pending; @@ -34,25 +34,37 @@ pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype) InvalidOid, subid, NULL); pending = entry_ref->pending; - switch (wtype) - { - case WORKERTYPE_APPLY: - pending->apply_error_count++; - break; - - case WORKERTYPE_SEQUENCESYNC: - pending->sync_seq_error_count++; - break; - - case WORKERTYPE_TABLESYNC: - pending->sync_table_error_count++; - break; - - default: - /* Should never happen. */ - Assert(0); - break; - } + pending->apply_error_count++; +} + +/* + * Report a subscription error for tablesync worker. 
+ */ +void +pgstat_report_subscription_tablesync_error(Oid subid) +{ + PgStat_EntryRef *entry_ref; + PgStat_BackendSubEntry *pending; + + entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION, + InvalidOid, subid, NULL); + pending = entry_ref->pending; + pending->sync_table_error_count++; +} + +/* + * Report a subscription error for sequencesync worker. + */ +void +pgstat_report_subscription_sequencesync_error(Oid subid) +{ + PgStat_EntryRef *entry_ref; + PgStat_BackendSubEntry *pending; + + entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION, + InvalidOid, subid, NULL); + pending = entry_ref->pending; + pending->sync_seq_error_count++; } /* diff --git a/src/include/pgstat.h b/src/include/pgstat.h index fff7ecc2533..1f9bd6c86c2 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -16,7 +16,6 @@ #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ #include "replication/conflict.h" -#include "replication/worker_internal.h" #include "utils/backend_progress.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/backend_status.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/pgstat_kind.h" @@ -775,8 +774,10 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void); * Functions in pgstat_subscription.c */ -extern void pgstat_report_subscription_error(Oid subid, - LogicalRepWorkerType wtype); +extern void pgstat_report_subscription_apply_error(Oid subid); +extern void pgstat_report_subscription_tablesync_error(Oid subid); +extern void pgstat_report_subscription_sequencesync_error(Oid subid); + extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type); extern void pgstat_create_subscription(Oid subid); extern void pgstat_drop_subscription(Oid subid); diff --git a/src/test/modules/test_custom_stats/test_custom_var_stats.c b/src/test/modules/test_custom_stats/test_custom_var_stats.c index 64a8fe63cce..da28afbd929 100644 --- 
a/src/test/modules/test_custom_stats/test_custom_var_stats.c +++ b/src/test/modules/test_custom_stats/test_custom_var_stats.c @@ -12,6 +12,7 @@ */ #include "postgres.h" +#include "access/htup_details.h" #include "common/hashfn.h" #include "funcapi.h" #include "storage/dsm_registry.h" -- 2.50.1 (Apple Git-155)