From 65ad6d794bb242e43232dd34a5f843d2f4c3c3dd Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Mon, 26 Sep 2022 12:59:43 +1000 Subject: [PATCH v3] Add common function ReplicationOriginNameForLogicalRep. Make a common replication origin name formatting function to replace multiple snprintf() expressions. This also includes logic previously done by ReplicationOriginNameForTablesync(). Discussion: https://postgr.es/m/CAHut%2BPsa8hhfSE6ozUK-ih7GkQziAVAf4f3bqiXEj2nQiu-43g%40mail.gmail.com --- src/backend/commands/subscriptioncmds.c | 11 ++++++----- src/backend/replication/logical/tablesync.c | 18 +++--------------- src/backend/replication/logical/worker.c | 29 +++++++++++++++++++++++++++-- src/include/replication/worker_internal.h | 4 ++-- 4 files changed, 38 insertions(+), 24 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f3bfcca..0793234 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -657,7 +657,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); /* @@ -946,7 +946,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * origin and by this time the origin might be already * removed. For these reasons, passing missing_ok = true. */ - ReplicationOriginNameForTablesync(sub->oid, relid, originname, + ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, sizeof(originname)); replorigin_drop_by_name(originname, true, false); } @@ -1315,7 +1315,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, char originname[NAMEDATALEN]; XLogRecPtr remote_lsn; - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, + originname, sizeof(originname)); originid = replorigin_by_name(originname, false); remote_lsn = replorigin_get_progress(originid, false); @@ -1521,7 +1522,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * worker so passing missing_ok = true. This can happen for the states * before SUBREL_STATE_FINISHEDCOPY. */ - ReplicationOriginNameForTablesync(subid, relid, originname, + ReplicationOriginNameForLogicalRep(subid, relid, originname, sizeof(originname)); replorigin_drop_by_name(originname, true, false); } @@ -1533,7 +1534,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) RemoveSubscriptionRel(subid, InvalidOid); /* Remove the origin tracking if exists. */ - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_drop_by_name(originname, true, false); /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 9e52fc4..ff0f359 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -353,7 +353,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ StartTransactionCommand(); - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, originname, sizeof(originname)); @@ -505,7 +505,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, rstate->relid, originname, sizeof(originname)); @@ -1194,18 +1194,6 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, } /* - * Form the origin name for tablesync. - * - * Return the name in the supplied buffer. - */ -void -ReplicationOriginNameForTablesync(Oid suboid, Oid relid, - char *originname, Size szorgname) -{ - snprintf(originname, szorgname, "pg_%u_%u", suboid, relid); -} - -/* * Start syncing the table in the sync worker. * * If nothing needs to be done to sync the table, we exit the worker without @@ -1274,7 +1262,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY); /* Assign the origin tracking record name. */ - ReplicationOriginNameForTablesync(MySubscription->oid, + ReplicationOriginNameForLogicalRep(MySubscription->oid, MyLogicalRepWorker->relid, originname, sizeof(originname)); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 207a580..c2cd61a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -365,6 +365,30 @@ static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr ls static inline void reset_apply_error_context_info(void); /* + * Form the origin name for the subscription. + * + * This is a common function for tablesync and other workers. Tablesync workers + * must pass a valid relid. Other callers must pass relid = InvalidOid. + * + * Return the name in the supplied buffer. + */ +void +ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, + char *originname, Size szoriginname) +{ + if (OidIsValid(relid)) + { + /* Replication origin name for tablesync workers. */ + snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid); + } + else + { + /* Replication origin name for non-tablesync workers. */ + snprintf(originname, szoriginname, "pg_%u", suboid); + } +} + +/* * Should this worker apply changes for given relation. * * This is mainly needed for initial relation data sync as that runs in @@ -3679,7 +3703,7 @@ ApplyWorkerMain(Datum main_arg) * Allocate the origin name in long-lived context for error context * message. */ - ReplicationOriginNameForTablesync(MySubscription->oid, + ReplicationOriginNameForLogicalRep(MySubscription->oid, MyLogicalRepWorker->relid, originname, sizeof(originname)); @@ -3707,7 +3731,8 @@ ApplyWorkerMain(Datum main_arg) /* Setup replication origin tracking. */ StartTransactionCommand(); - snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid); + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + originname, sizeof(originname)); originid = replorigin_by_name(originname, true); if (!OidIsValid(originid)) originid = replorigin_create(originname); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f82bc51..40a0b06 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -92,8 +92,8 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); -extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, - char *originname, Size szorgname); +extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, + char *originname, Size szoriginname); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); extern bool AllTablesyncsReady(void); -- 1.8.3.1