From d0089cf006f4d3afbccf1923761a85afee713b91 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 12 May 2017 16:53:08 +0900 Subject: [PATCH 2/2] Wait for table sync worker to finish when apply worker exits. --- src/backend/replication/logical/launcher.c | 52 +++++++++++++++++++++++++++++- src/include/replication/worker_internal.h | 1 + 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index dfce49d..9888758 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -403,8 +403,54 @@ retry: } /* + * Stop all table sync workers associated with given subid. + * + * This function is called by apply worker. Since table sync + * worker associated with same subscription is launched by + * only the apply worker. We don't need to acquire + * LogicalRepLauncherLock here. + */ +void +logicalrep_sync_workers_stop(Oid subid) +{ + List *relid_list = NIL; + ListCell *cell; + int i; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* + * Walks the workers array and get relid list that matches + * given subscription id. + */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (w->in_use && w->subid == subid && + OidIsValid(w->relid)) + relid_list = lappend_oid(relid_list, w->relid); + } + + LWLockRelease(LogicalRepWorkerLock); + + /* Return if there is no table sync worker associated with myself */ + if (relid_list == NIL) + return; + + foreach (cell, relid_list) + { + Oid relid = lfirst_oid(cell); + + logicalrep_worker_stop(subid, relid); + } +} + +/* * Stop the logical replication worker and wait until it detaches from the - * slot. + * slot. This function can be called by both logical replication launcher + * and apply worker to stop apply worker and table sync worker. + * * * The caller must hold LogicalRepLauncherLock to ensure that new workers are * not being started during this function call. @@ -573,6 +619,10 @@ logicalrep_worker_attach(int slot) static void logicalrep_worker_detach(void) { + /* Stop all sync workers associated if apply worker */ + if (!am_tablesync_worker()) + logicalrep_sync_workers_stop(MyLogicalRepWorker->subid); + /* Block concurrent access. */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 26788fe..2fec0b0 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -78,6 +78,7 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, extern void logicalrep_worker_stop(Oid subid, Oid relid); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); +extern void logicalrep_sync_workers_stop(Oid subid); extern int logicalrep_sync_worker_count(Oid subid); -- 2.8.1