diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 4353e14..f6e8331 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -432,14 +432,9 @@ DropSubscription(DropSubscriptionStmt *stmt)
 	Oid			subid;
 	Datum		datum;
 	bool		isnull;
 	char	   *subname;
 	char	   *conninfo;
 	char	   *slotname;
-	char		originname[NAMEDATALEN];
-	char	   *err = NULL;
-	RepOriginId	originid;
-	WalReceiverConn *wrconn = NULL;
-	StringInfoData cmd;
 
 	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -508,56 +503,16 @@ DropSubscription(DropSubscriptionStmt *stmt)
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
-	/* Protect against launcher restarting the worker. */
-	LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
-
-	/* Kill the apply worker so that the slot becomes accessible. */
-	logicalrep_worker_stop(subid);
-
-	LWLockRelease(LogicalRepLauncherLock);
-
-	/* Remove the origin tracking if exists. */
-	snprintf(originname, sizeof(originname), "pg_%u", subid);
-	originid = replorigin_by_name(originname, true);
-	if (originid != InvalidRepOriginId)
-		replorigin_drop(originid);
-
-	/* If the user asked to not drop the slot, we are done mow.*/
-	if (!stmt->drop_slot)
-	{
-		heap_close(rel, NoLock);
-		return;
-	}
-
-	/*
-	 * Otherwise drop the replication slot at the publisher node using
-	 * the replication connection.
-	 */
-	load_file("libpqwalreceiver", false);
-
-	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname);
-
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
-	if (wrconn == NULL)
-		ereport(ERROR,
-				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
-				 errdetail("The error was: %s", err)));
-
-	if (!walrcv_command(wrconn, cmd.data, &err))
-		ereport(ERROR,
-				(errmsg("could not drop the replication slot \"%s\" on publisher",
-						slotname),
-				 errdetail("The error was: %s", err)));
-	else
-		ereport(NOTICE,
-				(errmsg("dropped replication slot \"%s\" on publisher",
-						slotname)));
-
-	walrcv_disconnect(wrconn);
-
-	pfree(cmd.data);
+	/*
+	 * Have the apply worker stopped once this transaction commits; the worker
+	 * then drops the replication origin and, if requested, the replication
+	 * slot as it exits.  Acting on the worker right away could not be undone
+	 * by ROLLBACK.
+	 *
+	 * NOTE(review): if no apply worker is running for the subscription at
+	 * commit time, nothing drops the slot or the origin.
+	 */
+	logicalrep_worker_stop_at_commit(subid, stmt->drop_slot);
 
 	heap_close(rel, NoLock);
 }
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index d222cff..8e40960 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,7 +73,27 @@ static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 
 bool		got_SIGTERM = false;
-static bool	on_commit_laucher_wakeup = false;
+static bool on_commit_launcher_wakeup = false;
+
+/*
+ * Apply workers to be stopped once the current transaction commits, queued
+ * by logicalrep_worker_stop_at_commit().  Entries live in
+ * TopTransactionContext and the list is reset at every commit and abort.
+ */
+typedef struct StopWorkerAtCommit
+{
+	Oid			subid;			/* subscription whose worker is stopped */
+	bool		drop_slot;		/* drop the remote slot while exiting? */
+	SubTransactionId subxid;	/* subtransaction that queued the request */
+} StopWorkerAtCommit;
+
+static List *on_commit_stop_workers = NIL;
+static bool stop_workers_callbacks_registered = false;
+
+static void stop_workers_xact_callback(XactEvent event, void *arg);
+static void stop_workers_subxact_callback(SubXactEvent event,
+							  SubTransactionId mySubid,
+							  SubTransactionId parentSubid, void *arg);
 
 Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
 
@@ -274,6 +294,8 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
 	worker->dbid = dbid;
 	worker->userid = userid;
 	worker->subid = subid;
+	worker->need_to_stop = false;
+	worker->drop_slot = false;
 
 	LWLockRelease(LogicalRepWorkerLock);
 
@@ -305,28 +327,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
 /*
  * Stop the logical replication worker and wait until it detaches from the
  * slot.
- *
- * The caller must hold LogicalRepLauncherLock to ensure that new workers are
- * not being started during this function call.
  */
 void
-logicalrep_worker_stop(Oid subid)
+logicalrep_worker_stop(LogicalRepWorker *worker)
 {
-	LogicalRepWorker *worker;
-
-	Assert(LWLockHeldByMe(LogicalRepLauncherLock));
-
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
-	worker = logicalrep_worker_find(subid);
-
-	/* No worker, nothing to do. */
-	if (!worker)
-	{
-		LWLockRelease(LogicalRepWorkerLock);
-		return;
-	}
-
 	/*
 	 * If we found worker but it does not have proc set it is starting up,
 	 * wait for it to finish and then kill it.
@@ -335,8 +339,6 @@ logicalrep_worker_stop(Oid subid)
 	{
 		int			rc;
 
-		LWLockRelease(LogicalRepWorkerLock);
-
 		CHECK_FOR_INTERRUPTS();
 
 		/* Wait for signal. */
@@ -370,7 +372,6 @@ logicalrep_worker_stop(Oid subid)
 
 	/* Now terminate the worker ... */
 	kill(worker->proc->pid, SIGTERM);
-	LWLockRelease(LogicalRepWorkerLock);
 
 	/* ... and wait for it to die. */
 	for (;;)
@@ -526,7 +527,11 @@ ApplyLauncherShmemInit(void)
 void
 AtCommit_ApplyLauncher(void)
 {
-	if (on_commit_laucher_wakeup)
-		ApplyLauncherWakeup();
+	if (on_commit_launcher_wakeup)
+	{
+		/* Reset, or every later commit in this backend would wake it again. */
+		on_commit_launcher_wakeup = false;
+		ApplyLauncherWakeup();
+	}
 }
 
@@ -540,8 +545,137 @@ AtCommit_ApplyLauncher(void)
 void
 ApplyLauncherWakeupAtCommit(void)
 {
-	if (!on_commit_laucher_wakeup)
-		on_commit_laucher_wakeup = true;
+	if (!on_commit_launcher_wakeup)
+		on_commit_launcher_wakeup = true;
 }
 
+/*
+ * Request that the apply worker of subscription "subid" be stopped once the
+ * current transaction commits; "drop_slot" tells the worker whether to drop
+ * its replication slot on the publisher as it exits.
+ *
+ * Nothing is written to shared memory before commit, so a DROP SUBSCRIPTION
+ * that is rolled back (as a whole or to a savepoint) leaves the worker, its
+ * slot and its replication origin alone.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, bool drop_slot)
+{
+	MemoryContext oldctx;
+	StopWorkerAtCommit *entry;
+
+	if (!stop_workers_callbacks_registered)
+	{
+		RegisterXactCallback(stop_workers_xact_callback, NULL);
+		RegisterSubXactCallback(stop_workers_subxact_callback, NULL);
+		stop_workers_callbacks_registered = true;
+	}
+
+	/* The queue lives exactly as long as the current transaction. */
+	oldctx = MemoryContextSwitchTo(TopTransactionContext);
+	entry = (StopWorkerAtCommit *) palloc(sizeof(StopWorkerAtCommit));
+	entry->subid = subid;
+	entry->drop_slot = drop_slot;
+	entry->subxid = GetCurrentSubTransactionId();
+	on_commit_stop_workers = lappend(on_commit_stop_workers, entry);
+	MemoryContextSwitchTo(oldctx);
+}
+
+/*
+ * Transaction callback for the queue built by
+ * logicalrep_worker_stop_at_commit().
+ *
+ * At commit the queued workers are flagged (need_to_stop, drop_slot) and the
+ * launcher is woken so that it stops them; at abort the queue is forgotten.
+ * The request cannot be carried across two-phase commit, so PREPARE is
+ * refused while the queue is not empty.
+ */
+static void
+stop_workers_xact_callback(XactEvent event, void *arg)
+{
+	ListCell   *lc;
+
+	switch (event)
+	{
+		case XACT_EVENT_PRE_PREPARE:
+			if (on_commit_stop_workers != NIL)
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("cannot PREPARE a transaction that has dropped a subscription")));
+			break;
+
+		case XACT_EVENT_COMMIT:
+			if (on_commit_stop_workers == NIL)
+				break;
+
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+			foreach(lc, on_commit_stop_workers)
+			{
+				StopWorkerAtCommit *entry = (StopWorkerAtCommit *) lfirst(lc);
+				LogicalRepWorker *worker = logicalrep_worker_find(entry->subid);
+
+				/* No worker slot for this subscription; nothing to flag. */
+				if (worker)
+				{
+					worker->drop_slot = entry->drop_slot;
+					worker->need_to_stop = true;
+				}
+			}
+			LWLockRelease(LogicalRepWorkerLock);
+
+			/* The list cells go away with TopTransactionContext. */
+			on_commit_stop_workers = NIL;
+			ApplyLauncherWakeup();
+			break;
+
+		case XACT_EVENT_ABORT:
+		case XACT_EVENT_PARALLEL_ABORT:
+			on_commit_stop_workers = NIL;
+			break;
+
+		default:
+			break;
+	}
+}
+
+/*
+ * Subtransaction callback for the same queue: requests made inside an
+ * aborted subtransaction are forgotten, and those of a committed
+ * subtransaction are handed over to its parent.
+ */
+static void
+stop_workers_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
+							  SubTransactionId parentSubid, void *arg)
+{
+	ListCell   *lc;
+
+	if (event == SUBXACT_EVENT_COMMIT_SUB)
+	{
+		foreach(lc, on_commit_stop_workers)
+		{
+			StopWorkerAtCommit *entry = (StopWorkerAtCommit *) lfirst(lc);
+
+			if (entry->subxid == mySubid)
+				entry->subxid = parentSubid;
+		}
+	}
+	else if (event == SUBXACT_EVENT_ABORT_SUB)
+	{
+		List	   *remaining = NIL;
+		MemoryContext oldctx;
+
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
+		foreach(lc, on_commit_stop_workers)
+		{
+			StopWorkerAtCommit *entry = (StopWorkerAtCommit *) lfirst(lc);
+
+			if (entry->subxid != mySubid)
+				remaining = lappend(remaining, entry);
+		}
+		MemoryContextSwitchTo(oldctx);
+
+		on_commit_stop_workers = remaining;
+	}
+}
+
 void
@@ -579,6 +713,7 @@ ApplyLauncherMain(Datum main_arg)
 	/* Enter main loop */
 	while (!got_SIGTERM)
 	{
+		int			i;
 		int			rc;
 		List	   *sublist;
 		ListCell   *lc;
@@ -590,6 +725,33 @@ ApplyLauncherMain(Datum main_arg)
 
 		now = GetCurrentTimestamp();
 
+		/*
+		 * Stop the workers that a committed DROP SUBSCRIPTION flagged with
+		 * need_to_stop.  The flag is read under LogicalRepWorkerLock, but the
+		 * lock is not held while logicalrep_worker_stop() waits for the
+		 * worker to exit.  Slots not assigned to any subscription are
+		 * skipped, and the flag is cleared afterwards so that each request
+		 * is acted on only once.
+		 */
+		for (i = 0; i < max_logical_replication_workers; i++)
+		{
+			LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+			bool		stop;
+
+			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+			stop = w->need_to_stop && OidIsValid(w->subid);
+			LWLockRelease(LogicalRepWorkerLock);
+
+			if (!stop)
+				continue;
+
+			logicalrep_worker_stop(w);
+
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+			w->need_to_stop = false;
+			LWLockRelease(LogicalRepWorkerLock);
+		}
+
 		/* Limit the start retry to once a wal_retrieve_retry_interval */
 		if (TimestampDifferenceExceeds(last_start_time, now,
 									   wal_retrieve_retry_interval))
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9383960..4e666bc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1022,7 +1022,7 @@ ApplyLoop(void)
 			}
 		}
 
-		if (!in_remote_transaction)
+		if (!in_remote_transaction && !MyLogicalRepWorker->need_to_stop)
 		{
 			/*
 			 * If we didn't get any transactions for a while there might be
@@ -1221,7 +1221,7 @@ reread_subscription(void)
 	/*
 	 * Exit if the subscription was removed.
 	 * This normally should not happen as the worker gets killed
-	 * during DROP SUBSCRIPTION.
+	 * once DROP SUBSCRIPTION has committed.
 	 */
 	if (!newsub)
 	{
@@ -1422,8 +1422,68 @@ ApplyWorkerMain(Datum main_arg)
 	/* Run the main loop. */
 	ApplyLoop();
 
 	walrcv_disconnect(wrconn);
 
+	/*
+	 * ApplyLoop() returns once the worker got SIGTERM, which happens both
+	 * when the launcher stops it after a committed DROP SUBSCRIPTION and on
+	 * an ordinary server shutdown.  Only in the former case, which sets
+	 * need_to_stop, may the replication origin and slot be dropped;
+	 * otherwise the subscription still exists and must keep them.
+	 */
+	if (MyLogicalRepWorker->need_to_stop)
+	{
+		/* Remove the origin tracking if exists. */
+		if (originid != InvalidRepOriginId)
+		{
+			replorigin_session_reset();
+			replorigin_session_origin = InvalidRepOriginId;
+			replorigin_session_origin_lsn = InvalidXLogRecPtr;
+			replorigin_session_origin_timestamp = 0;
+
+			StartTransactionCommand();
+			replorigin_drop(originid);
+			CommitTransactionCommand();
+		}
+
+		/*
+		 * Drop the replication slot at the publisher node using a new
+		 * replication connection, if DROP SUBSCRIPTION asked for it.
+		 */
+		if (MyLogicalRepWorker->drop_slot)
+		{
+			WalReceiverConn *conn;
+			StringInfoData cmd;
+
+			conn = walrcv_connect(MySubscription->conninfo, true,
+								  MySubscription->name, &err);
+			if (conn == NULL)
+				ereport(ERROR,
+						(errmsg("could not connect to publisher when attempting to "
+								"drop the replication slot \"%s\"",
+								MySubscription->slotname),
+						 errdetail("The error was: %s", err)));
+
+			initStringInfo(&cmd);
+			appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"",
+							 MySubscription->slotname);
+
+			if (!walrcv_command(conn, cmd.data, &err))
+				ereport(ERROR,
+						(errmsg("could not drop the replication slot \"%s\" on publisher",
+								MySubscription->slotname),
+						 errdetail("The error was: %s", err)));
+
+			/* A background worker has no client to send a NOTICE to. */
+			ereport(LOG,
+					(errmsg("dropped replication slot \"%s\" on publisher",
+							MySubscription->slotname)));
+
+			pfree(cmd.data);
+			walrcv_disconnect(conn);
+		}
+	}
+
 	/* We should only get here if we received SIGTERM */
 	proc_exit(0);
 }
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 8cbf268..3db6f55 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -40,6 +40,11 @@ typedef struct LogicalRepWorker
 	TimestampTz last_recv_time;
 	XLogRecPtr	reply_lsn;
 	TimestampTz reply_time;
+
+	/* Set when a committed DROP SUBSCRIPTION wants this worker to exit. */
+	bool		need_to_stop;
+	/* Should the exiting worker drop its replication slot on the publisher? */
+	bool		drop_slot;
 } LogicalRepWorker;
 
 /* libpqreceiver connection */
@@ -56,7 +61,8 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid);
 extern int logicalrep_worker_count(Oid subid);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid);
-extern void logicalrep_worker_stop(Oid subid);
+extern void logicalrep_worker_stop(LogicalRepWorker *worker);
+extern void logicalrep_worker_stop_at_commit(Oid subid, bool drop_slot);
 extern void logicalrep_worker_wakeup(Oid subid);
 extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
 
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index b51740b..39dc97d 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -4,7 +4,26 @@ use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 11;
+use Test::More tests => 12;
 
+# Run $sql on $node until it returns $expected, trying once a second for up
+# to 30 attempts, and report the last result as a single test named $msg.
+sub poll_and_check
+{
+	my ($node, $expected, $msg, $sql) = @_;
+
+	my $max_attempts = 30;
+	my $result;
+
+	for (my $attempt = 0; $attempt < $max_attempts; $attempt++)
+	{
+		$result = $node->safe_psql('postgres', $sql);
+		last if $result eq $expected;
+		sleep 1;
+	}
+
+	is($result, $expected, $msg);
+}
+
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
 $node_publisher->init(allows_streaming => 'logical');
@@ -169,6 +188,18 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_full");
 is($result, qq(11|0|100), 'check replicated insert after alter publication');
 
+# check that DROP SUBSCRIPTION is transactional: rolling it back must leave
+# the replication slot on the publisher in place
+$node_subscriber->safe_psql('postgres', "
+BEGIN;
+DROP SUBSCRIPTION tap_sub;
+ROLLBACK;
+");
+
+$result =
+  $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(1), 'check replication slot was kept after rolled back DROP SUBSCRIPTION');
+
 # check all the cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
 
@@ -176,13 +207,13 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_subscription");
 is($result, qq(0), 'check subscription was dropped on subscriber');
 
-$result =
-  $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
-is($result, qq(0), 'check replication slot was dropped on publisher');
+poll_and_check($node_publisher, qq(0),
+	'check replication slot was dropped on publisher',
+	"SELECT count(*) FROM pg_replication_slots");
 
-$result =
-  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
-is($result, qq(0), 'check replication origin was dropped on subscriber');
+poll_and_check($node_subscriber, qq(0),
+	'check replication origin was dropped on subscriber',
+	"SELECT count(*) FROM pg_replication_origin");
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');