From 35d700dabbb583a2496b16ad7bb5a49a54d97f0d Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Thu, 17 Aug 2017 14:13:29 -0700 Subject: [PATCH 4/4] Improve locking for subscriptions and subscribed relations --- src/backend/commands/subscriptioncmds.c | 57 ++----- src/backend/replication/logical/launcher.c | 228 +++++++++++---------------- src/backend/replication/logical/tablesync.c | 63 +++++--- src/include/replication/worker_internal.h | 5 +- 4 files changed, 145 insertions(+), 208 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 354d037..85b97c3 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -633,12 +633,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) errmsg("subscription \"%s\" does not exist", stmt->subname))); + subid = HeapTupleGetOid(tup); + /* must be owner */ - if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId())) + if (!pg_subscription_ownercheck(subid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION, stmt->subname); - subid = HeapTupleGetOid(tup); sub = GetSubscription(subid, false); /* Lock the subscription so nobody else can do anything with it. 
*/ @@ -811,16 +812,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ObjectAddress myself; HeapTuple tup; Oid subid; - Datum datum; - bool isnull; - char *subname; - char *conninfo; - char *slotname; List *subworkers; ListCell *lc; char originname[NAMEDATALEN]; char *err = NULL; RepOriginId originid; + Subscription *sub; WalReceiverConn *wrconn = NULL; StringInfoData cmd; @@ -828,7 +825,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * Lock pg_subscription with AccessExclusiveLock to ensure that the * launcher doesn't restart new worker during dropping the subscription */ - rel = heap_open(SubscriptionRelationId, AccessExclusiveLock); + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId, CStringGetDatum(stmt->subname)); @@ -860,31 +857,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* DROP hook for the subscription being removed */ InvokeObjectDropHook(SubscriptionRelationId, subid, 0); - /* - * Lock the subscription so nobody else can do anything with it (including - * the replication workers). - */ - LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock); + sub = GetSubscription(subid, false); - /* Get subname */ - datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, - Anum_pg_subscription_subname, &isnull); - Assert(!isnull); - subname = pstrdup(NameStr(*DatumGetName(datum))); - - /* Get conninfo */ - datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, - Anum_pg_subscription_subconninfo, &isnull); - Assert(!isnull); - conninfo = TextDatumGetCString(datum); - - /* Get slotname */ - datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, - Anum_pg_subscription_subslotname, &isnull); - if (!isnull) - slotname = pstrdup(NameStr(*DatumGetName(datum))); - else - slotname = NULL; + /* Lock the subscription so nobody else can do anything with it. 
*/ + LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock); /* * Since dropping a replication slot is not transactional, the replication @@ -896,7 +872,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * of a subscription that is associated with a replication slot", but we * don't have the proper facilities for that. */ - if (slotname) + if (sub->slotname) PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION"); @@ -923,7 +899,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - if (slotname) + if (sub->slotname) logicalrep_worker_stop(w->subid, w->relid); else logicalrep_worker_stop_at_commit(w->subid, w->relid); @@ -946,7 +922,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * If there is no slot associated with the subscription, we can finish * here. */ - if (!slotname) + if (!sub->slotname) { heap_close(rel, NoLock); return; @@ -959,13 +935,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) load_file("libpqwalreceiver", false); initStringInfo(&cmd); - appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname)); + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", + quote_identifier(sub->slotname)); - wrconn = walrcv_connect(conninfo, true, subname, &err); + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); if (wrconn == NULL) ereport(ERROR, (errmsg("could not connect to publisher when attempting to " - "drop the replication slot \"%s\"", slotname), + "drop the replication slot \"%s\"", sub->slotname), errdetail("The error was: %s", err), errhint("Use ALTER SUBSCRIPTION ... 
SET (slot_name = NONE) " "to disassociate the subscription from the slot."))); @@ -979,12 +956,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, (errmsg("could not drop the replication slot \"%s\" on publisher", - slotname), + sub->slotname), errdetail("The error was: %s", res->err))); else ereport(NOTICE, (errmsg("dropped replication slot \"%s\" on publisher", - slotname))); + sub->slotname))); walrcv_clear_result(res); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 6c89442..e9bc9af 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -42,6 +42,7 @@ #include "replication/worker_internal.h" #include "storage/ipc.h" +#include "storage/lmgr.h" #include "storage/proc.h" #include "storage/procarray.h" #include "storage/procsignal.h" @@ -94,33 +95,23 @@ static bool on_commit_launcher_wakeup = false; Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); - /* - * Load the list of subscriptions. - * - * Only the fields interesting for worker start/stop functions are filled for - * each subscription. + * Load the list of enabled subscription oids. */ static List * -get_subscription_list(void) +get_subscription_oids(void) { List *res = NIL; Relation rel; HeapScanDesc scan; HeapTuple tup; - MemoryContext resultcxt; - - /* This is the context that we will allocate our output data in */ - resultcxt = CurrentMemoryContext; /* - * Start a transaction so we can access pg_database, and get a snapshot. * We don't have a use for the snapshot itself, but we're interested in * the secondary effect that it sets RecentGlobalXmin. (This is critical * for anything that reads heap pages, because HOT may decide to prune * them even if the process doesn't attempt to modify any tuples.) 
*/ - StartTransactionCommand(); (void) GetTransactionSnapshot(); rel = heap_open(SubscriptionRelationId, AccessShareLock); @@ -129,34 +120,17 @@ get_subscription_list(void) while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) { Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup); - Subscription *sub; - MemoryContext oldcxt; - /* - * Allocate our results in the caller's context, not the - * transaction's. We do this inside the loop, and restore the original - * context at the end, so that leaky things like heap_getnext() are - * not called in a potentially long-lived context. - */ - oldcxt = MemoryContextSwitchTo(resultcxt); - - sub = (Subscription *) palloc0(sizeof(Subscription)); - sub->oid = HeapTupleGetOid(tup); - sub->dbid = subform->subdbid; - sub->owner = subform->subowner; - sub->enabled = subform->subenabled; - sub->name = pstrdup(NameStr(subform->subname)); - /* We don't fill fields we are not interested in. */ - - res = lappend(res, sub); - MemoryContextSwitchTo(oldcxt); + /* We only care about enabled subscriptions. */ + if (!subform->subenabled) + continue; + + res = lappend_oid(res, HeapTupleGetOid(tup)); } heap_endscan(scan); heap_close(rel, AccessShareLock); - CommitTransactionCommand(); - return res; } @@ -282,23 +256,38 @@ logicalrep_workers_find(Oid subid, bool only_running) } /* - * Start new apply background worker. + * Start new logical replication background worker. 
*/ void -logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, - Oid relid) +logicalrep_worker_launch(Oid subid, Oid relid) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; int i; int slot = 0; - LogicalRepWorker *worker = NULL; - int nsyncworkers; + List *subworkers; + ListCell *lc; TimestampTz now; + int nsyncworkers = 0; + Subscription *sub; + LogicalRepWorker *worker = NULL; ereport(DEBUG1, - (errmsg("starting logical replication worker for subscription \"%s\"", - subname))); + (errmsg("starting logical replication worker for subscription %u", + subid))); + + /* Block any concurrent DDL on the subscription. */ + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + + /* Get info about subscription. */ + sub = GetSubscription(subid, true); + if (!sub) + { + ereport(DEBUG1, + (errmsg("subscription %u not found, not starting worker for it", + subid))); + return; + } /* Report this after the initial starting message for consistency. */ if (max_replication_slots == 0) @@ -326,7 +315,14 @@ retry: } } - nsyncworkers = logicalrep_sync_worker_count(subid); + subworkers = logicalrep_workers_find(subid, false); + foreach (lc, subworkers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + if (w->relid != InvalidOid) + nsyncworkers ++; + } + list_free(subworkers); now = GetCurrentTimestamp(); @@ -372,6 +368,7 @@ retry: if (nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); + UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); return; } @@ -382,6 +379,7 @@ retry: if (worker == NULL) { LWLockRelease(LogicalRepWorkerLock); + UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); ereport(WARNING, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("out of logical replication worker slots"), @@ -394,8 +392,8 @@ retry: worker->in_use = true; worker->generation++; worker->proc = NULL; - worker->dbid = dbid; - worker->userid = userid; + 
worker->dbid = sub->dbid; + worker->userid = sub->owner; worker->subid = subid; worker->relid = relid; worker->relstate = SUBREL_STATE_UNKNOWN; @@ -406,8 +404,6 @@ retry: worker->reply_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->reply_time); - LWLockRelease(LogicalRepWorkerLock); - /* Register the new dynamic worker. */ memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | @@ -426,8 +422,13 @@ retry: bgw.bgw_notify_pid = MyProcPid; bgw.bgw_main_arg = Int32GetDatum(slot); + /* Try to register the worker and cleanup in case of failure. */ if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) { + logicalrep_worker_cleanup(worker); + LWLockRelease(LogicalRepWorkerLock); + UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + ereport(WARNING, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("out of background worker slots"), @@ -435,13 +436,24 @@ retry: return; } + /* Done with the worker array. */ + LWLockRelease(LogicalRepWorkerLock); + /* Now wait until it attaches. */ WaitForReplicationWorkerAttach(worker, bgw_handle); + + /* + * Worker either started or died, in any case we are done with the + * subscription. + */ + UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); } /* * Stop the logical replication worker for subid/relid, if any, and wait until * it detaches from the slot. + * + * Callers of this function better have exclusive lock on the subscription. */ void logicalrep_worker_stop(Oid subid, Oid relid) @@ -449,7 +461,8 @@ logicalrep_worker_stop(Oid subid, Oid relid) LogicalRepWorker *worker; uint16 generation; - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + /* Exclusive is needed for logicalrep_worker_cleanup(). */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); worker = logicalrep_worker_find(subid, relid, false); @@ -460,56 +473,20 @@ logicalrep_worker_stop(Oid subid, Oid relid) return; } + /* If there is worker but it's not running, clean it up. 
*/ + if (!worker->proc) + { + logicalrep_worker_cleanup(worker); + LWLockRelease(LogicalRepWorkerLock); + return; + } + /* * Remember which generation was our worker so we can check if what we see * is still the same one. */ generation = worker->generation; - /* - * If we found a worker but it does not have proc set then it is still - * starting up; wait for it to finish starting and then kill it. - */ - while (worker->in_use && !worker->proc) - { - int rc; - - LWLockRelease(LogicalRepWorkerLock); - - /* Wait a bit --- we don't expect to have to wait long. */ - rc = WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - 10L, WAIT_EVENT_BGWORKER_STARTUP); - - /* emergency bailout if postmaster has died */ - if (rc & WL_POSTMASTER_DEATH) - proc_exit(1); - - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - } - - /* Recheck worker status. */ - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - - /* - * Check whether the worker slot is no longer used, which would mean - * that the worker has exited, or whether the worker generation is - * different, meaning that a different worker has taken the slot. - */ - if (!worker->in_use || worker->generation != generation) - { - LWLockRelease(LogicalRepWorkerLock); - return; - } - - /* Worker has assigned proc, so it has started. */ - if (worker->proc) - break; - } - /* Now terminate the worker ... */ kill(worker->proc->pid, SIGTERM); @@ -539,6 +516,11 @@ logicalrep_worker_stop(Oid subid, Oid relid) CHECK_FOR_INTERRUPTS(); } + /* + * Shared lock is enough for the loop as we don't need to do the slot + * cleanup because at this point we know that the worker has attached + * to the shmem and will clean the slot on detach automatically. + */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); } @@ -706,30 +688,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS) } /* - * Count the number of registered (not necessarily running) sync workers - * for a subscription. 
- */ -int -logicalrep_sync_worker_count(Oid subid) -{ - int i; - int res = 0; - - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - - /* Search for attached worker for a given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) - { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - - if (w->subid == subid && OidIsValid(w->relid)) - res++; - } - - return res; -} - -/* * ApplyLauncherShmemSize * Compute space needed for replication launcher shared memory */ @@ -899,8 +857,6 @@ ApplyLauncherMain(Datum main_arg) int rc; List *sublist; ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; TimestampTz now; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; @@ -912,28 +868,29 @@ ApplyLauncherMain(Datum main_arg) if (TimestampDifferenceExceeds(last_start_time, now, wal_retrieve_retry_interval)) { - /* Use temporary context for the database list and worker info. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - oldctx = MemoryContextSwitchTo(subctx); - - /* search for subscriptions to start or stop. */ - sublist = get_subscription_list(); - - /* Start the missing workers for enabled subscriptions. */ + /* + * Start new transaction so that we can take locks and snapshots. + * + * Any allocations will also be made inside the transaction memory + * context. + */ + StartTransactionCommand(); + + /* Search for subscriptions to start. */ + sublist = get_subscription_oids(); + + /* Start the missing workers. 
*/ foreach(lc, sublist) { - Subscription *sub = (Subscription *) lfirst(lc); + Oid subid = lfirst_oid(lc); + Subscription *sub = GetSubscription(subid, false); LogicalRepWorker *w; if (!sub->enabled) continue; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); + w = logicalrep_worker_find(subid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); if (w == NULL) @@ -941,15 +898,12 @@ ApplyLauncherMain(Datum main_arg) last_start_time = now; wait_time = wal_retrieve_retry_interval; - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid); + /* Start the worker. */ + logicalrep_worker_launch(subid, InvalidOid); } } - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); + CommitTransactionCommand(); } else { diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 42460b3..fdc3515 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -102,12 +102,13 @@ #include "replication/walreceiver.h" #include "replication/worker_internal.h" -#include "utils/snapmgr.h" #include "storage/ipc.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/snapmgr.h" static bool table_states_valid = false; @@ -211,10 +212,8 @@ wait_for_relation_state_change(Oid relid, char expected_state) * worker to the expected one. * * Used when transitioning from SYNCWAIT state to CATCHUP. - * - * Returns false if the apply worker has disappeared. */ -static bool +static void wait_for_worker_state_change(char expected_state) { int rc; @@ -230,7 +229,7 @@ wait_for_worker_state_change(char expected_state) * enough to not give a misleading answer if we do it with no lock.) 
*/ if (MyLogicalRepWorker->relstate == expected_state) - return true; + return; /* * Bail out if the apply worker has died, else signal it we're @@ -243,7 +242,10 @@ wait_for_worker_state_change(char expected_state) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); if (!worker) - break; + ereport(FATAL, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("terminating logical replication synchronization " + "worker due to subscription apply worker exit"))); /* * Wait. We expect to get a latch signal back from the apply worker, @@ -260,8 +262,6 @@ wait_for_worker_state_change(char expected_state) if (rc & WL_LATCH_SET) ResetLatch(MyLatch); } - - return false; } /* @@ -346,6 +346,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); +#define ensure_transaction_and_lock() \ + if (!started_tx) \ + {\ + StartTransactionCommand(); \ + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, \ + AccessShareLock); \ + started_tx = true; \ + } + /* We need up-to-date sync state info for subscription tables here. */ if (!table_states_valid) { @@ -358,8 +367,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) list_free_deep(table_states); table_states = NIL; - StartTransactionCommand(); - started_tx = true; + ensure_transaction_and_lock(); /* Fetch all non-ready tables. */ rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); @@ -421,11 +429,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) { rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } + + ensure_transaction_and_lock(); UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, @@ -476,12 +481,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * Enter busy loop and wait for synchronization worker to * reach expected state (or die trying). 
*/ - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } - + ensure_transaction_and_lock(); wait_for_relation_state_change(rstate->relid, SUBREL_STATE_SYNCDONE); } @@ -495,8 +495,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * running sync workers for this subscription, while we have * the lock. */ - int nsyncworkers = - logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + List *subworkers; + ListCell *lc; + int nsyncworkers = 0; + + subworkers = logicalrep_workers_find(MySubscription->oid, + false); + foreach (lc, subworkers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + if (w->relid != InvalidOid) + nsyncworkers ++; + } + list_free(subworkers); /* Now safe to release the LWLock */ LWLockRelease(LogicalRepWorkerLock); @@ -518,10 +529,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) TimestampDifferenceExceeds(hentry->last_start_time, now, wal_retrieve_retry_interval)) { - logicalrep_worker_launch(MyLogicalRepWorker->dbid, - MySubscription->oid, - MySubscription->name, - MyLogicalRepWorker->userid, + ensure_transaction_and_lock(); + logicalrep_worker_launch(MySubscription->oid, rstate->relid); hentry->last_start_time = now; } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 7b8728c..c82396d 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -72,15 +72,12 @@ extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); -extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, - Oid userid, Oid relid); +extern void logicalrep_worker_launch(Oid subid, Oid relid); extern void logicalrep_worker_stop(Oid subid, Oid relid); extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid); extern void 
logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); -extern int logicalrep_sync_worker_count(Oid subid); - extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); void process_syncing_tables(XLogRecPtr current_lsn); void invalidate_syncing_table_states(Datum arg, int cacheid, -- 1.7.1