diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5aae7b6..fbb0b96 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -597,8 +597,6 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); - namespace = get_namespace_name(get_rel_namespace(relid)); ereport(NOTICE, (errmsg("removed subscription for table %s.%s", diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 3ff08bf..12e71ea 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -212,8 +212,8 @@ wait_for_relation_state_change(Oid relid, char expected_state) * * Used when transitioning from SYNCWAIT state to CATCHUP. * - * Returns false if the apply worker has disappeared or the table state has been - * reset. + * Returns false if the apply worker or the subscription relation state + * has disappeared or the table state has been reset. */ static bool wait_for_worker_state_change(char expected_state) @@ -223,9 +223,18 @@ wait_for_worker_state_change(char expected_state) for (;;) { LogicalRepWorker *worker; + char relstate; + XLogRecPtr relstate_lsn; CHECK_FOR_INTERRUPTS(); + /* Check our relation's state and bail out if it has disappeared. */ + relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + &relstate_lsn, true); + if (relstate == SUBREL_STATE_UNKNOWN) + return false; + /* Bail if the apply has died. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, @@ -914,8 +923,12 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate_lsn = *origin_startpos; SpinLockRelease(&MyLogicalRepWorker->relmutex); - /* Wait for main apply worker to tell us to catchup. 
*/ - wait_for_worker_state_change(SUBREL_STATE_CATCHUP); + /* + * Wait for main apply worker to tell us to catchup. Exit + * proc if the table sync worker is orphaned. + */ + if(!wait_for_worker_state_change(SUBREL_STATE_CATCHUP)) + finish_sync_worker(); /*---------- * There are now two possible states here: