Subscription code improvements
Hi,
I have done some review of the subscription handling (well, self-review)
and here is the result. It's a slightly improved version of the patches
from another thread [1].
I split it into several patches:
0001 - Makes the subscription worker exit nicely when the subscription is
missing (i.e. it was removed while the worker was starting) and also makes
the disabled message look the same as the message emitted when the
disabled state is detected while the worker is running. This is mostly
just nicer behavior for the user (i.e. no unexpected errors in the log
when you have just disabled the subscription).
0002 - This is a bugfix: the sync worker should exit when it is waiting
for apply and apply dies, otherwise there is a possibility of not being
correctly synchronized.
0003 - Splits the SetSubscriptionRelState function, which tried to be
both an insert and an update, into two functions that don't attempt the
broken upsert and report proper errors instead.
0004 - Solves the issue that Masahiko Sawada reported [2] about ALTER
SUBSCRIPTION handling of workers not being transactional: we now kill
workers transactionally (and we do the same in DROP SUBSCRIPTION when
possible).
0005 - Bugfix to allow syscache access to subscriptions without being
connected to a database. This should work since the subscription catalog
is a pinned catalog; it wasn't caught because nothing in core uses it
(yet).
0006 - Makes the locking of subscriptions more granular (this depends on
0005). This removes the ugly AccessExclusiveLock on the pg_subscription
catalog from DROP SUBSCRIPTION and also solves some potential race
conditions between the launcher, ALTER SUBSCRIPTION and
process_syncing_tables_for_apply(). It also simplifies memory handling in
the launcher as well as in the logicalrep_worker_stop() function. This
basically makes subscriptions behave like every other object in the
database in terms of locking.
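For anyone who wants the gist of 0003 without reading the diff, here is a
minimal standalone sketch of the add/update split. It is not the patch
code: the in-memory table and the names RelStateTable, rel_add and
rel_update are hypothetical stand-ins for pg_subscription_rel and the new
AddSubscriptionRelState()/UpdateSubscriptionRelState() functions, and it
returns error codes where the real functions raise an error with elog().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical in-memory stand-in for pg_subscription_rel (not PostgreSQL code). */
#define MAX_RELS 8

typedef enum
{
	REL_OK = 0,
	REL_ERR_EXISTS,				/* add: the row is already there */
	REL_ERR_MISSING,			/* update: the row is not there */
	REL_ERR_FULL				/* the toy table has no free slot */
} RelResult;

typedef struct
{
	bool		in_use;
	unsigned	subid;
	unsigned	relid;
	char		state;
} RelStateEntry;

typedef struct
{
	RelStateEntry entries[MAX_RELS];
} RelStateTable;

/* Look up the entry for (subid, relid); returns NULL when there is none. */
static RelStateEntry *
rel_find(RelStateTable *t, unsigned subid, unsigned relid)
{
	for (size_t i = 0; i < MAX_RELS; i++)
	{
		RelStateEntry *e = &t->entries[i];

		if (e->in_use && e->subid == subid && e->relid == relid)
			return e;
	}
	return NULL;
}

/* Insert only: an existing row is reported as an error, never updated. */
static RelResult
rel_add(RelStateTable *t, unsigned subid, unsigned relid, char state)
{
	if (rel_find(t, subid, relid) != NULL)
		return REL_ERR_EXISTS;

	for (size_t i = 0; i < MAX_RELS; i++)
	{
		if (!t->entries[i].in_use)
		{
			t->entries[i] = (RelStateEntry) {true, subid, relid, state};
			return REL_OK;
		}
	}
	return REL_ERR_FULL;
}

/* Update only: a missing row is reported as an error, never inserted. */
static RelResult
rel_update(RelStateTable *t, unsigned subid, unsigned relid, char state)
{
	RelStateEntry *e = rel_find(t, subid, relid);

	if (e == NULL)
		return REL_ERR_MISSING;

	e->state = state;
	return REL_OK;
}
```

The point of the split is that each caller states its intent: the DDL
paths would call the add variant and the workers the update variant, so a
row removed concurrently surfaces as an explicit error instead of being
silently re-created or ignored.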
Only 0002, 0004 and 0005 are actual bug fixes, but I'd still like to get
it all into PG10, especially since the locking currently behaves quite
differently from everything else, and that does not seem like a good idea.
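The commit-time worker stop from 0004 can be pictured roughly as follows.
This is a standalone sketch under assumptions, not the patch itself:
XactState, stop_at_commit and at_eoxact are hypothetical names, the
fixed-size array stands in for the list the patch allocates in
TopTransactionContext, and a counter stands in for actually stopping a
worker.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_PENDING 16

/* Identifies a worker by subscription and relation, as in the patch. */
typedef struct
{
	unsigned	subid;
	unsigned	relid;
} WorkerId;

/* Hypothetical per-transaction state (not a PostgreSQL structure). */
typedef struct
{
	WorkerId	pending[MAX_PENDING];	/* stops requested by this xact */
	size_t		npending;
	size_t		nstopped;		/* workers "stopped" so far, for illustration */
} XactState;

/* Queue a stop request; nothing is stopped yet. Returns false if the queue is full. */
static bool
stop_at_commit(XactState *x, unsigned subid, unsigned relid)
{
	if (x->npending >= MAX_PENDING)
		return false;

	x->pending[x->npending++] = (WorkerId) {subid, relid};
	return true;
}

/* True if the transaction has queued stops (the condition PREPARE rejects). */
static bool
xact_has_pending_stops(const XactState *x)
{
	return x->npending > 0;
}

/* End of transaction: carry out the queued stops on commit, drop them on abort. */
static void
at_eoxact(XactState *x, bool is_commit)
{
	if (is_commit)
		x->nstopped += x->npending; /* stand-in for stopping each worker */

	x->npending = 0;
}
```

Because the stop requests are only acted on at commit, a rolled-back
ALTER SUBSCRIPTION leaves the workers running, which is the transactional
behavior the patch is after.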
[1]: /messages/by-id/3e06c16c-f441-c606-985c-6d6239097f54@2ndquadrant.com
[2]: /messages/by-id/CAD21AoBD4T2RwTiWD8YfXd+q+m9swX50YjqT5ibj2MzEBVnBhw@mail.gmail.com
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Improve-messaging-during-logical-replication-worker-.patch (text/x-patch)
From 4c1ef64493ee930dfde3aa787c454a5d68ac3efd Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Thu, 6 Jul 2017 23:42:56 +0200
Subject: [PATCH 1/6] Improve messaging during logical replication worker
startup
---
src/backend/replication/logical/worker.c | 23 +++++++++++++++--------
1 file changed, 15 insertions(+), 8 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 0d48dfa..2e4099c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1527,24 +1527,31 @@ ApplyWorkerMain(Datum main_arg)
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
- MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+ if (!MySubscription)
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will "
+ "stop because the subscription was removed",
+ MyLogicalRepWorker->subid)));
+ proc_exit(0);
+ }
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
- /* Setup synchronous commit according to the user's wishes */
- SetConfigOption("synchronous_commit", MySubscription->synccommit,
- PGC_BACKEND, PGC_S_OVERRIDE);
-
if (!MySubscription->enabled)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will not "
- "start because the subscription was disabled during startup",
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
MySubscription->name)));
-
proc_exit(0);
}
+ /* Setup synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+ PGC_BACKEND, PGC_S_OVERRIDE);
+
/* Keep us informed about subscription changes. */
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb,
--
2.7.4
0002-Exit-in-sync-worker-if-relation-was-removed-during-s.patch (text/x-patch)
From b5d4d9a130658859bcf6e21ca3bed131dbdddb57 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 7 Jul 2017 00:04:43 +0200
Subject: [PATCH 2/6] Exit in sync worker if relation was removed during
startup
---
src/backend/replication/logical/tablesync.c | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 32abf5b..9fbdd8c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -824,6 +824,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
&relstate_lsn, true);
+ /*
+ * The relation is not locked during startup of the sync worker, so it's
+ * possible that it has been removed in the meantime. Exit gracefully in
+ * that case as it's a perfectly normal scenario.
+ */
+ if (relstate == SUBREL_STATE_UNKNOWN)
+ {
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for subscription \"%s\", "
+ "table \"%s\" will stop because the table is no longer subscribed",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid))));
+ proc_exit(0);
+ }
CommitTransactionCommand();
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
--
2.7.4
0003-Split-the-SetSubscriptionRelState-function-into-two.patch (text/x-patch)
From c676c0693cb4e30f0315eebe7424ae43e88c9cb2 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Thu, 6 Jul 2017 23:53:18 +0200
Subject: [PATCH 3/6] Split the SetSubscriptionRelState function into two
Removes ugly code split, broken upsert logic and improves error reporting.
---
src/backend/catalog/pg_subscription.c | 133 ++++++++++++++++------------
src/backend/commands/subscriptioncmds.c | 10 +--
src/backend/replication/logical/tablesync.c | 34 ++++---
src/include/catalog/pg_subscription_rel.h | 6 +-
4 files changed, 99 insertions(+), 84 deletions(-)
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index fb53d71..b643e54 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray)
}
/*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing. This can be used to avoid inserting a new record that was deleted
- * by someone else. Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances. But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
*/
Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
{
Relation rel;
HeapTuple tup;
- Oid subrelid = InvalidOid;
+ Oid subrelid;
bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel];
@@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(subid));
+ if (HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u already exists",
+ relid, subid);
- /*
- * If the record for given table does not exist yet create new record,
- * otherwise update the existing one.
- */
- if (!HeapTupleIsValid(tup) && !update_only)
- {
- /* Form the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
- values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
- tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
- /* Insert tuple into catalog. */
- subrelid = CatalogTupleInsert(rel, tup);
-
- heap_freetuple(tup);
- }
- else if (HeapTupleIsValid(tup))
- {
- bool replaces[Natts_pg_subscription_rel];
+ /* Form the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+ values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
- /* Update the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- memset(replaces, false, sizeof(replaces));
+ tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
- replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ /* Insert tuple into catalog. */
+ subrelid = CatalogTupleInsert(rel, tup);
- replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ heap_freetuple(tup);
- tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
- replaces);
+ /* Cleanup. */
+ heap_close(rel, NoLock);
- /* Update the catalog. */
- CatalogTupleUpdate(rel, &tup->t_self, tup);
+ return subrelid;
+}
- subrelid = HeapTupleGetOid(tup);
- }
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
+{
+ Relation rel;
+ HeapTuple tup;
+ Oid subrelid;
+ bool nulls[Natts_pg_subscription_rel];
+ Datum values[Natts_pg_subscription_rel];
+ bool replaces[Natts_pg_subscription_rel];
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+ /* Try finding existing mapping. */
+ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(subid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u does not exist",
+ relid, subid);
+
+ /* Update the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+ replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+
+ /* Update the catalog. */
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ subrelid = HeapTupleGetOid(tup);
/* Cleanup. */
heap_close(rel, NoLock);
@@ -381,6 +396,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
HeapTuple tup;
int nkeys = 0;
+ Assert(OidIsValid(subid) || OidIsValid(relid));
+
rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
if (OidIsValid(subid))
@@ -404,9 +421,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
/* Do the search and delete what we found. */
scan = heap_beginscan_catalog(rel, nkeys, skey);
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
- {
CatalogTupleDelete(rel, &tup->t_self);
- }
heap_endscan(scan);
heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6dc3f6e..8d144ab 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname);
- SetSubscriptionRelState(subid, relid, table_state,
- InvalidXLogRecPtr, false);
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
}
/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
if (!bsearch(&relid, subrel_local_oids,
list_length(subrel_states), sizeof(Oid), oid_cmp))
{
- SetSubscriptionRelState(sub->oid, relid,
- copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr, false);
+ AddSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
ereport(NOTICE,
(errmsg("added subscription for table %s.%s",
quote_identifier(rv->schemaname),
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 9fbdd8c..2f6c7b4 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
SpinLockRelease(&MyLogicalRepWorker->relmutex);
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
walrcv_endstreaming(wrconn, &tli);
finish_sync_worker();
@@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
StartTransactionCommand();
started_tx = true;
}
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- rstate->relid, rstate->state,
- rstate->lsn, true);
+
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ rstate->relid, rstate->state,
+ rstate->lsn);
}
}
else
@@ -884,11 +884,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/* Update the state and make it visible to others. */
StartTransactionCommand();
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
pgstat_report_stat(false);
@@ -973,11 +972,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* Update the new state in catalog. No need to bother
* with the shmem state as we are exiting for good.
*/
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- SUBREL_STATE_SYNCDONE,
- *origin_startpos,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ SUBREL_STATE_SYNCDONE,
+ *origin_startpos);
finish_sync_worker();
}
break;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 991ca9d..c5b0b9c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
char state;
} SubscriptionRelState;
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
extern char GetSubscriptionRelState(Oid subid, Oid relid,
XLogRecPtr *sublsn, bool missing_ok);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
--
2.7.4
0004-Only-kill-sync-workers-at-commit-time-in-SUBSCRIPTIO.patch (text/x-patch)
From 59debb27c5aefedb8add2925e983815a5e0b9bd6 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Thu, 6 Jul 2017 23:57:05 +0200
Subject: [PATCH 4/6] Only kill sync workers at commit time in SUBSCRIPTION DDL
---
src/backend/access/transam/xact.c | 9 +++++
src/backend/commands/subscriptioncmds.c | 26 +++++++++++--
src/backend/replication/logical/launcher.c | 60 +++++++++++++++++++++++++++++-
src/include/replication/logicallauncher.h | 1 +
src/include/replication/worker_internal.h | 1 +
5 files changed, 91 insertions(+), 6 deletions(-)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0aa69f..322502d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2277,6 +2277,15 @@ PrepareTransaction(void)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has exported snapshots")));
+ /*
+ * Similar to the above, don't allow PREPARE for transactions that kill
+ * logical replication workers.
+ */
+ if (XactManipulatesLogicalReplicationWorkers())
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
+
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8d144ab..f25a79f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
RemoveSubscriptionRel(sub->oid, relid);
- logicalrep_worker_stop(sub->oid, relid);
+ logicalrep_worker_stop_at_commit(sub->oid, relid);
namespace = get_namespace_name(get_rel_namespace(relid));
ereport(NOTICE,
@@ -909,15 +909,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ReleaseSysCache(tup);
+ /*
+ * If we are dropping slot, stop all the subscription workers immediately
+ * so that the slot is accessible, otherwise just schedule the stop at the
+ * end of the transaction.
+ *
+ * New workers won't be started because we hold exclusive lock on the
+ * subscription till the end of transaction.
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ subworkers = logicalrep_sub_workers_find(subid, false);
+ LWLockRelease(LogicalRepWorkerLock);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (sub->slotname)
+ logicalrep_worker_stop(w->subid, w->relid);
+ else
+ logicalrep_worker_stop_at_commit(w->subid, w->relid);
+ }
+ list_free(subworkers);
+
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
RemoveSubscriptionRel(subid, InvalidOid);
- /* Kill the apply worker so that the slot becomes accessible. */
- logicalrep_worker_stop(subid, InvalidOid);
-
/* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index d165d51..caf4844 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct
LogicalRepCtxStruct *LogicalRepCtx;
+typedef struct LogicalRepWorkerId
+{
+ Oid subid;
+ Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
@@ -514,6 +522,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
+ * Request worker for specified sub/rel to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+ LogicalRepWorkerId *wid;
+ MemoryContext oldctx;
+
+ /* Make sure we store the info in context which survives until commit. */
+ oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+ wid = palloc(sizeof(LogicalRepWorkerId));
+ wid->subid = subid;
+ wid->relid = relid;
+
+ on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+ MemoryContextSwitchTo(oldctx);
+}
+
+/*
* Wake up (using latch) any logical replication worker for specified sub/rel.
*/
void
@@ -754,14 +783,41 @@ ApplyLauncherShmemInit(void)
}
/*
+ * XactManipulatesLogicalReplicationWorkers
+ * Check whether current transaction has manipulated logical replication
+ * workers.
+ */
+bool
+XactManipulatesLogicalReplicationWorkers(void)
+{
+ return (on_commit_stop_workers != NIL);
+}
+
+/*
* Wakeup the launcher on commit if requested.
*/
void
AtEOXact_ApplyLauncher(bool isCommit)
{
- if (isCommit && on_commit_launcher_wakeup)
- ApplyLauncherWakeup();
+ ListCell *lc;
+ if (isCommit)
+ {
+ foreach (lc, on_commit_stop_workers)
+ {
+ LogicalRepWorkerId *wid = lfirst(lc);
+ logicalrep_worker_stop(wid->subid, wid->relid);
+ }
+
+ if (on_commit_launcher_wakeup)
+ ApplyLauncherWakeup();
+ }
+
+ /*
+ * No need to pfree on_commit_stop_workers, it's been allocated in
+ * transaction memory context which is going to be cleaned soon.
+ */
+ on_commit_stop_workers = NIL;
on_commit_launcher_wakeup = false;
}
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index aac7d32..78016c4 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherWakeupAtCommit(void);
+extern bool XactManipulatesLogicalReplicationWorkers(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 494a3a3..402b166 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -74,6 +74,7 @@ extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
--
2.7.4
0005-Allow-syscache-access-to-subscriptions-in-database-l.patch (text/x-patch)
From 6d32797e05ccacf627241e60b78661878c814806 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 7 Jul 2017 00:02:21 +0200
Subject: [PATCH 5/6] Allow syscache access to subscriptions in database-less
processes
---
src/backend/utils/cache/catcache.c | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index e7e8e3b..639b4eb 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -1052,10 +1052,12 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
case AUTHNAME:
case AUTHOID:
case AUTHMEMMEMROLE:
+ case SUBSCRIPTIONOID:
+ case SUBSCRIPTIONNAME:
/*
- * Protect authentication lookups occurring before relcache has
- * collected entries for shared indexes.
+ * Protect authentication and subscription lookups occurring
+ * before relcache has collected entries for shared indexes.
*/
if (!criticalSharedRelcachesBuilt)
return false;
--
2.7.4
0006-Improve-locking-for-subscriptions-and-subscribed-rel.patch (text/x-patch)
From c05006a7cb1be1c1949bf0413e5ca363177ece02 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 7 Jul 2017 16:27:17 +0200
Subject: [PATCH 6/6] Improve locking for subscriptions and subscribed
relations
Remove the exclusive lock on the catalog that DROP SUBSCRIPTION was
using and use more granular locking of individual subscriptions.
This should make the overall behavior of subscriptions and their workers
more sane in terms of concurrency and race conditions.
---
src/backend/commands/subscriptioncmds.c | 59 +++----
src/backend/replication/logical/launcher.c | 253 +++++++++++++---------------
src/backend/replication/logical/tablesync.c | 63 ++++---
src/include/replication/worker_internal.h | 5 +-
4 files changed, 172 insertions(+), 208 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f25a79f..3dc1f4c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -636,12 +636,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
errmsg("subscription \"%s\" does not exist",
stmt->subname)));
+ subid = HeapTupleGetOid(tup);
+
/* must be owner */
- if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+ if (!pg_subscription_ownercheck(subid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
stmt->subname);
- subid = HeapTupleGetOid(tup);
sub = GetSubscription(subid, false);
/* Lock the subscription so nobody else can do anything with it. */
@@ -814,14 +815,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ObjectAddress myself;
HeapTuple tup;
Oid subid;
- Datum datum;
- bool isnull;
- char *subname;
- char *conninfo;
- char *slotname;
+ List *subworkers;
+ ListCell *lc;
char originname[NAMEDATALEN];
- char *err = NULL;
RepOriginId originid;
+ char *err = NULL;
+ Subscription *sub;
WalReceiverConn *wrconn = NULL;
StringInfoData cmd;
@@ -829,7 +828,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* Lock pg_subscription with AccessExclusiveLock to ensure that the
* launcher doesn't restart new worker during dropping the subscription
*/
- rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+ rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
CStringGetDatum(stmt->subname));
@@ -861,31 +860,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* DROP hook for the subscription being removed */
InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
- /*
- * Lock the subscription so nobody else can do anything with it (including
- * the replication workers).
- */
- LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+ sub = GetSubscription(subid, false);
- /* Get subname */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subname, &isnull);
- Assert(!isnull);
- subname = pstrdup(NameStr(*DatumGetName(datum)));
-
- /* Get conninfo */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subconninfo, &isnull);
- Assert(!isnull);
- conninfo = TextDatumGetCString(datum);
-
- /* Get slotname */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subslotname, &isnull);
- if (!isnull)
- slotname = pstrdup(NameStr(*DatumGetName(datum)));
- else
- slotname = NULL;
+ /* Lock the subscription so nobody else can do anything with it. */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
/*
* Since dropping a replication slot is not transactional, the replication
@@ -897,7 +875,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* of a subscription that is associated with a replication slot", but we
* don't have the proper facilities for that.
*/
- if (slotname)
+ if (sub->slotname)
PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
@@ -946,7 +924,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* If there is no slot associated with the subscription, we can finish
* here.
*/
- if (!slotname)
+ if (!sub->slotname)
{
heap_close(rel, NoLock);
return;
@@ -959,13 +937,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
load_file("libpqwalreceiver", false);
initStringInfo(&cmd);
- appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
+ appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+ quote_identifier(sub->slotname));
- wrconn = walrcv_connect(conninfo, true, subname, &err);
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
if (wrconn == NULL)
ereport(ERROR,
(errmsg("could not connect to publisher when attempting to "
- "drop the replication slot \"%s\"", slotname),
+ "drop the replication slot \"%s\"", sub->slotname),
errdetail("The error was: %s", err),
errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
"to disassociate the subscription from the slot.")));
@@ -979,12 +958,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("could not drop the replication slot \"%s\" on publisher",
- slotname),
+ sub->slotname),
errdetail("The error was: %s", res->err)));
else
ereport(NOTICE,
(errmsg("dropped replication slot \"%s\" on publisher",
- slotname)));
+ sub->slotname)));
walrcv_clear_result(res);
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index caf4844..eea125b 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -42,6 +42,7 @@
#include "replication/worker_internal.h"
#include "storage/ipc.h"
+#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
@@ -94,33 +95,23 @@ static bool on_commit_launcher_wakeup = false;
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
-
/*
- * Load the list of subscriptions.
- *
- * Only the fields interesting for worker start/stop functions are filled for
- * each subscription.
+ * Load the list of enabled subscription oids.
*/
static List *
-get_subscription_list(void)
+get_subscription_oids(void)
{
List *res = NIL;
Relation rel;
HeapScanDesc scan;
HeapTuple tup;
- MemoryContext resultcxt;
-
- /* This is the context that we will allocate our output data in */
- resultcxt = CurrentMemoryContext;
/*
- * Start a transaction so we can access pg_database, and get a snapshot.
* We don't have a use for the snapshot itself, but we're interested in
* the secondary effect that it sets RecentGlobalXmin. (This is critical
* for anything that reads heap pages, because HOT may decide to prune
* them even if the process doesn't attempt to modify any tuples.)
*/
- StartTransactionCommand();
(void) GetTransactionSnapshot();
rel = heap_open(SubscriptionRelationId, AccessShareLock);
@@ -129,34 +120,17 @@ get_subscription_list(void)
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
{
Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
- Subscription *sub;
- MemoryContext oldcxt;
- /*
- * Allocate our results in the caller's context, not the
- * transaction's. We do this inside the loop, and restore the original
- * context at the end, so that leaky things like heap_getnext() are
- * not called in a potentially long-lived context.
- */
- oldcxt = MemoryContextSwitchTo(resultcxt);
-
- sub = (Subscription *) palloc0(sizeof(Subscription));
- sub->oid = HeapTupleGetOid(tup);
- sub->dbid = subform->subdbid;
- sub->owner = subform->subowner;
- sub->enabled = subform->subenabled;
- sub->name = pstrdup(NameStr(subform->subname));
- /* We don't fill fields we are not interested in. */
-
- res = lappend(res, sub);
- MemoryContextSwitchTo(oldcxt);
+ /* We only care about enabled subscriptions. */
+ if (!subform->subenabled)
+ continue;
+
+ res = lappend_oid(res, HeapTupleGetOid(tup));
}
heap_endscan(scan);
heap_close(rel, AccessShareLock);
- CommitTransactionCommand();
-
return res;
}
@@ -258,23 +232,62 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
}
/*
- * Start new apply background worker.
+ * Similar to logicalrep_worker_find(), but returns a list of all workers
+ * for the subscription instead of just one.
+ */
+List *
+logicalrep_sub_workers_find(Oid subid, bool only_running)
+{
+ int i;
+ List *res = NIL;
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ /* Search for attached worker for a given subscription id. */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (w->in_use && w->subid == subid && (!only_running || w->proc))
+ res = lappend(res, w);
+ }
+
+ return res;
+}
+
+/*
+ * Start new logical replication background worker.
*/
void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid)
+logicalrep_worker_launch(Oid subid, Oid relid)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
int i;
int slot = 0;
- LogicalRepWorker *worker = NULL;
- int nsyncworkers;
+ List *subworkers;
+ ListCell *lc;
TimestampTz now;
+ int nsyncworkers = 0;
+ Subscription *sub;
+ LogicalRepWorker *worker = NULL;
ereport(DEBUG1,
- (errmsg("starting logical replication worker for subscription \"%s\"",
- subname)));
+ (errmsg("starting logical replication worker for subscription %u",
+ subid)));
+
+ /* Block any concurrent DDL on the subscription. */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ /* Get info about subscription. */
+ sub = GetSubscription(subid, true);
+ if (!sub)
+ {
+ ereport(DEBUG1,
+ (errmsg("subscription %u not found, not starting worker for it",
+ subid)));
+ return;
+ }
/* Report this after the initial starting message for consistency. */
if (max_replication_slots == 0)
@@ -302,7 +315,14 @@ retry:
}
}
- nsyncworkers = logicalrep_sync_worker_count(subid);
+ subworkers = logicalrep_sub_workers_find(subid, false);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (w->relid != InvalidOid)
+ nsyncworkers ++;
+ }
+ list_free(subworkers);
now = GetCurrentTimestamp();
@@ -348,6 +368,7 @@ retry:
if (nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
return;
}
@@ -358,6 +379,7 @@ retry:
if (worker == NULL)
{
LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of logical replication worker slots"),
@@ -370,8 +392,8 @@ retry:
worker->in_use = true;
worker->generation++;
worker->proc = NULL;
- worker->dbid = dbid;
- worker->userid = userid;
+ worker->dbid = sub->dbid;
+ worker->userid = sub->owner;
worker->subid = subid;
worker->relid = relid;
worker->relstate = SUBREL_STATE_UNKNOWN;
@@ -382,8 +404,6 @@ retry:
worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time);
- LWLockRelease(LogicalRepWorkerLock);
-
/* Register the new dynamic worker. */
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -402,8 +422,13 @@ retry:
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = Int32GetDatum(slot);
+ /* Try to register the worker and cleanup in case of failure. */
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
+ logicalrep_worker_cleanup(worker);
+ LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
@@ -411,13 +436,24 @@ retry:
return;
}
+ /* Done with the worker array. */
+ LWLockRelease(LogicalRepWorkerLock);
+
/* Now wait until it attaches. */
WaitForReplicationWorkerAttach(worker, bgw_handle);
+
+ /*
+ * Worker either started or died, in any case we are done with the
+ * subscription.
+ */
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
}
/*
* Stop the logical replication worker for subid/relid, if any, and wait until
* it detaches from the slot.
+ *
+ * Callers of this function better have exclusive lock on the subscription.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
@@ -425,7 +461,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LogicalRepWorker *worker;
uint16 generation;
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ /* Exclusive is needed for logicalrep_worker_cleanup(). */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
worker = logicalrep_worker_find(subid, relid, false);
@@ -436,56 +473,20 @@ logicalrep_worker_stop(Oid subid, Oid relid)
return;
}
+ /* If there is worker but it's not running, clean it up. */
+ if (!worker->proc)
+ {
+ logicalrep_worker_cleanup(worker);
+ LWLockRelease(LogicalRepWorkerLock);
+ return;
+ }
+
/*
* Remember which generation was our worker so we can check if what we see
* is still the same one.
*/
generation = worker->generation;
- /*
- * If we found a worker but it does not have proc set then it is still
- * starting up; wait for it to finish starting and then kill it.
- */
- while (worker->in_use && !worker->proc)
- {
- int rc;
-
- LWLockRelease(LogicalRepWorkerLock);
-
- /* Wait a bit --- we don't expect to have to wait long. */
- rc = WaitLatch(MyLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- 10L, WAIT_EVENT_BGWORKER_STARTUP);
-
- /* emergency bailout if postmaster has died */
- if (rc & WL_POSTMASTER_DEATH)
- proc_exit(1);
-
- if (rc & WL_LATCH_SET)
- {
- ResetLatch(MyLatch);
- CHECK_FOR_INTERRUPTS();
- }
-
- /* Recheck worker status. */
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
- /*
- * Check whether the worker slot is no longer used, which would mean
- * that the worker has exited, or whether the worker generation is
- * different, meaning that a different worker has taken the slot.
- */
- if (!worker->in_use || worker->generation != generation)
- {
- LWLockRelease(LogicalRepWorkerLock);
- return;
- }
-
- /* Worker has assigned proc, so it has started. */
- if (worker->proc)
- break;
- }
-
/* Now terminate the worker ... */
kill(worker->proc->pid, SIGTERM);
@@ -515,6 +516,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
CHECK_FOR_INTERRUPTS();
}
+ /*
+ * Shared lock is enough for the loop as we don't need to do the slot
+ * cleanup because at this point we know that the worker has attached
+ * to the shmem and will clean the slot on detach automatically.
+ */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
}
@@ -682,30 +688,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS)
}
/*
- * Count the number of registered (not necessarily running) sync workers
- * for a subscription.
- */
-int
-logicalrep_sync_worker_count(Oid subid)
-{
- int i;
- int res = 0;
-
- Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
- /* Search for attached worker for a given subscription id. */
- for (i = 0; i < max_logical_replication_workers; i++)
- {
- LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
- if (w->subid == subid && OidIsValid(w->relid))
- res++;
- }
-
- return res;
-}
-
-/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
*/
@@ -875,8 +857,6 @@ ApplyLauncherMain(Datum main_arg)
int rc;
List *sublist;
ListCell *lc;
- MemoryContext subctx;
- MemoryContext oldctx;
TimestampTz now;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
@@ -888,41 +868,38 @@ ApplyLauncherMain(Datum main_arg)
if (TimestampDifferenceExceeds(last_start_time, now,
wal_retrieve_retry_interval))
{
- /* Use temporary context for the database list and worker info. */
- subctx = AllocSetContextCreate(TopMemoryContext,
- "Logical Replication Launcher sublist",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
- oldctx = MemoryContextSwitchTo(subctx);
-
- /* search for subscriptions to start or stop. */
- sublist = get_subscription_list();
-
- /* Start the missing workers for enabled subscriptions. */
+ /*
+ * Start new transaction so that we can take locks and snapshots.
+ *
+ * Any allocations will also be made inside the transaction memory
+ * context.
+ */
+ StartTransactionCommand();
+
+ /* Search for subscriptions to start. */
+ sublist = get_subscription_oids();
+
+ /* Start the missing workers. */
foreach(lc, sublist)
{
- Subscription *sub = (Subscription *) lfirst(lc);
+ Oid subid = lfirst_oid(lc);
LogicalRepWorker *w;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+ w = logicalrep_worker_find(subid, InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
- if (sub->enabled && w == NULL)
+ if (w == NULL)
{
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid);
+ /* Start the worker. */
+ logicalrep_worker_launch(subid, InvalidOid);
}
}
- /* Switch back to original memory context. */
- MemoryContextSwitchTo(oldctx);
- /* Clean the temporary memory. */
- MemoryContextDelete(subctx);
+ CommitTransactionCommand();
}
else
{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2f6c7b4..d49d186 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -102,12 +102,13 @@
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
-#include "utils/snapmgr.h"
#include "storage/ipc.h"
+#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/snapmgr.h"
static bool table_states_valid = false;
@@ -211,10 +212,8 @@ wait_for_relation_state_change(Oid relid, char expected_state)
* worker to the expected one.
*
* Used when transitioning from SYNCWAIT state to CATCHUP.
- *
- * Returns false if the apply worker has disappeared.
*/
-static bool
+static void
wait_for_worker_state_change(char expected_state)
{
int rc;
@@ -230,7 +229,7 @@ wait_for_worker_state_change(char expected_state)
* enough to not give a misleading answer if we do it with no lock.)
*/
if (MyLogicalRepWorker->relstate == expected_state)
- return true;
+ return;
/*
* Bail out if the apply worker has died, else signal it we're
@@ -243,7 +242,10 @@ wait_for_worker_state_change(char expected_state)
logicalrep_worker_wakeup_ptr(worker);
LWLockRelease(LogicalRepWorkerLock);
if (!worker)
- break;
+ ereport(FATAL,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("terminating logical replication synchronization "
+ "worker due to subscription apply worker exit")));
/*
* Wait. We expect to get a latch signal back from the apply worker,
@@ -260,8 +262,6 @@ wait_for_worker_state_change(char expected_state)
if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
}
-
- return false;
}
/*
@@ -346,6 +346,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Assert(!IsTransactionState());
+#define ensure_transaction_and_lock() \
+ if (!started_tx) \
+ {\
+ StartTransactionCommand(); \
+ LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, \
+ AccessShareLock); \
+ started_tx = true; \
+ }
+
/* We need up-to-date sync state info for subscription tables here. */
if (!table_states_valid)
{
@@ -358,8 +367,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
list_free_deep(table_states);
table_states = NIL;
- StartTransactionCommand();
- started_tx = true;
+ ensure_transaction_and_lock();
/* Fetch all non-ready tables. */
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -421,11 +429,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
+
+ ensure_transaction_and_lock();
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
@@ -476,12 +481,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* Enter busy loop and wait for synchronization worker to
* reach expected state (or die trying).
*/
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
-
+ ensure_transaction_and_lock();
wait_for_relation_state_change(rstate->relid,
SUBREL_STATE_SYNCDONE);
}
@@ -495,8 +495,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* running sync workers for this subscription, while we have
* the lock.
*/
- int nsyncworkers =
- logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ List *subworkers;
+ ListCell *lc;
+ int nsyncworkers = 0;
+
+ subworkers = logicalrep_sub_workers_find(MySubscription->oid,
+ false);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (w->relid != InvalidOid)
+ nsyncworkers ++;
+ }
+ list_free(subworkers);
/* Now safe to release the LWLock */
LWLockRelease(LogicalRepWorkerLock);
@@ -518,10 +529,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
TimestampDifferenceExceeds(hentry->last_start_time, now,
wal_retrieve_retry_interval))
{
- logicalrep_worker_launch(MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
+ ensure_transaction_and_lock();
+ logicalrep_worker_launch(MySubscription->oid,
rstate->relid);
hentry->last_start_time = now;
}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 402b166..add7841 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,14 +71,13 @@ extern bool in_remote_transaction;
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
- Oid userid, Oid relid);
+extern void logicalrep_worker_launch(Oid subid, Oid relid);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
-extern int logicalrep_sync_worker_count(Oid subid);
+extern List *logicalrep_sub_workers_find(Oid subid, bool only_running);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
void process_syncing_tables(XLogRecPtr current_lsn);
--
2.7.4
On Sat, Jul 8, 2017 at 5:19 AM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:
Hi,
I have done some review of subscription handling (well self-review) and
here is the result of that (It's slightly improved version from another
thread [1]).
Thank you for working on this and patches!
I split it into several patches:
0001 - Makes subscription worker exit nicely when there is subscription
missing (ie was removed while it was starting) and also makes disabled
message look same as the message when disabled state was detected while
worker is running. This is mostly just nicer behavior for user (ie no
unexpected errors in log when you just disabled the subscription).
0002 - This is bugfix - the sync worker should exit when waiting for
apply and apply dies otherwise there is possibility of not being
correctly synchronized.
0003 - Splits the schizophrenic SetSubscriptionRelState function into
two which don't try to do broken upsert and report proper errors instead.
0004 - Solves the issue which Masahiko Sawada reported [2] about ALTER
SUBSCRIPTION handling of workers not being transactional - we now do
killing of workers transactionally (and we do the same when possible in
DROP SUBSCRIPTION).
0005 - Bugfix to allow syscache access to subscription without being
connected to database - this should work since subscription is pinned
catalog, it wasn't caught because nothing in core is using it (yet).
0006 - Makes the locking of subscriptions more granular (this depends on
0005). This removes the ugly AccessExclusiveLock on the pg_subscription
catalog from DROP SUBSCRIPTION and also solves some potential race
conditions between launcher, ALTER SUBSCRIPTION and
process_syncing_tables_for_apply(). Also simplifies memory handling in
launcher as well as logicalrep_worker_stop() function. This basically
makes subscriptions behave like every other object in the database in
terms of locking.
Only the 0002, 0004 and 0005 are actual bug fixes, but I'd still like to
get it all into PG10 as especially the locking now behaves really
differently than everything else and that does not seem like a good idea.
I'm now planning to review 0002, 0004 and 0005 patches first as they
are bug fixes. Should we add them to the open item list?
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, Jul 12, 2017 at 11:19 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Sat, Jul 8, 2017 at 5:19 AM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:
Hi,
I have done some review of subscription handling (well self-review) and
here is the result of that (It's slightly improved version from another
thread [1]).
Thank you for working on this and patches!
I split it into several patches:
0001 - Makes subscription worker exit nicely when there is subscription
missing (ie was removed while it was starting) and also makes disabled
message look same as the message when disabled state was detected while
worker is running. This is mostly just nicer behavior for user (ie no
unexpected errors in log when you just disabled the subscription).
0002 - This is bugfix - the sync worker should exit when waiting for
apply and apply dies otherwise there is possibility of not being
correctly synchronized.
0003 - Splits the schizophrenic SetSubscriptionRelState function into
two which don't try to do broken upsert and report proper errors instead.
0004 - Solves the issue which Masahiko Sawada reported [2] about ALTER
SUBSCRIPTION handling of workers not being transactional - we now do
killing of workers transactionally (and we do the same when possible in
DROP SUBSCRIPTION).
0005 - Bugfix to allow syscache access to subscription without being
connected to database - this should work since subscription is pinned
catalog, it wasn't caught because nothing in core is using it (yet).
0006 - Makes the locking of subscriptions more granular (this depends on
0005). This removes the ugly AccessExclusiveLock on the pg_subscription
catalog from DROP SUBSCRIPTION and also solves some potential race
conditions between launcher, ALTER SUBSCRIPTION and
process_syncing_tables_for_apply(). Also simplifies memory handling in
launcher as well as logicalrep_worker_stop() function. This basically
makes subscriptions behave like every other object in the database in
terms of locking.
Only the 0002, 0004 and 0005 are actual bug fixes, but I'd still like to
get it all into PG10 as especially the locking now behaves really
differently than everything else and that does not seem like a good idea.
I'm now planning to review 0002, 0004 and 0005 patches first as they
are bug fixes.
I've reviewed 0002, 0004 and 0005 patches briefly. Here are some comments.
--
0002-Exit-in-sync-worker-if-relation-was-removed-during-s.patch
As Petr mentioned, if the table subscription is removed before the
table sync worker gets the subscription relation entry,
GetSubscriptionRelState could return SUBREL_STATE_UNKNOWN and this
issue happens.
Actually we now can handle this case properly by commit
8dc7c338129d22a52d4afcf2f83a73041119efda. So this seems to be an
improvement and not a release blocker. Therefore, for this patch, I
think it's better to do ereport in the
switch(MyLogicalRepWorker->relstate) block.
BTW, since missing_ok of GetSubscriptionRelState() is always set to
true I think that we can remove it.
Also, for the reason I mention later, this fix might not be necessary.
--
0004-Only-kill-sync-workers-at-commit-time-in-SUBSCRIPTIO.patch
+ /*
+ * If we are dropping slot, stop all the subscription workers immediately
+ * so that the slot is accessible, otherwise just schedule the stop at the
+ * end of the transaction.
+ *
+ * New workers won't be started because we hold exclusive lock on the
+ * subscription till the end of transaction.
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ subworkers = logicalrep_sub_workers_find(subid, false);
+ LWLockRelease(LogicalRepWorkerLock);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (sub->slotname)
+ logicalrep_worker_stop(w->subid, w->relid);
+ else
+ logicalrep_worker_stop_at_commit(w->subid, w->relid);
+ }
+ list_free(subworkers);
I think if we kill all the workers, including table sync workers, then
the fix by 0002 patch is not actually necessary, because the table
sync worker will not see that the subscription relation state has been
removed.
Also, logicalrep_sub_workers_find() function is defined in 0006 patch
but it would be better to move it to 0004 patch.
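The difference between stopping a worker immediately and stopping it at
commit can be sketched in plain C. This is only a toy model under stated
assumptions, not the code from the patch: PendingStop, stop_worker_now(),
stop_worker_at_commit() and at_end_of_xact() are hypothetical names, and
"stopping" a worker is reduced to bumping a counter.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_PENDING 8

/* Hypothetical stand-in for a worker identified by (subid, relid). */
typedef struct PendingStop
{
    unsigned subid;
    unsigned relid;
} PendingStop;

static PendingStop pending[MAX_PENDING];   /* stops queued by the current transaction */
static size_t npending = 0;
static size_t nstopped = 0;                /* how many workers were really stopped */

/* Immediate stop: takes effect regardless of how the transaction ends. */
void
stop_worker_now(unsigned subid, unsigned relid)
{
    (void) subid;
    (void) relid;
    nstopped++;
}

/* Deferred stop: only remember the request. Returns 0 if the queue is full. */
int
stop_worker_at_commit(unsigned subid, unsigned relid)
{
    if (npending >= MAX_PENDING)
        return 0;
    pending[npending].subid = subid;
    pending[npending].relid = relid;
    npending++;
    return 1;
}

/* End of transaction: run the queue on commit, discard it on abort. */
void
at_end_of_xact(int is_commit)
{
    size_t i;

    assert(npending <= MAX_PENDING);
    if (is_commit)
    {
        for (i = 0; i < npending; i++)
            stop_worker_now(pending[i].subid, pending[i].relid);
    }
    npending = 0;
}

/* Number of workers stopped so far. */
size_t
workers_stopped(void)
{
    return nstopped;
}
```

In this model a rolled-back transaction leaves every worker running, which
is the transactional behaviour the 0004 patch is after; the immediate path
corresponds to the case where the slot has to be dropped right away.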
--
0005-Allow-syscache-access-to-subscriptions-in-database-l.patch
Do we need to update the comment at the top of IndexScanOK()?
To summarize, I think we now have only one issue: ALTER SUBSCRIPTION is
not transactional, and the 0004 patch is addressing this issue. The 0002 patch
seems an improvement patch to me, and it might be resolved by the 0004
patch. The 0005 patch is required by the 0006 patch, which is an improvement
patch. Am I missing something?
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Fri, Jul 07, 2017 at 10:19:19PM +0200, Petr Jelinek wrote:
I have done some review of subscription handling (well self-review) and
here is the result of that (It's slightly improved version from another
thread [1]).
Only the 0002, 0004 and 0005 are actual bug fixes, but I'd still like to
get it all into PG10 as especially the locking now behaves really
differently than everything else and that does not seem like a good idea.
[Action required within three days. This is a generic notification.]
The above-described topic is currently a PostgreSQL 10 open item. Peter,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1] and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.
[1]: /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
On 8/1/17 00:17, Noah Misch wrote:
The above-described topic is currently a PostgreSQL 10 open item. Peter,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1] and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.
I'm looking into this now and will report by Friday.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Petr Jelinek wrote:
I split it into several patches:
I wish you'd stop splitting error message strings across multiple lines.
I've been trapped by a faulty grep not matching a split error message a
number of times :-( I know by now to remove words until I get a match,
but it seems an unnecessary trap for the unwary.
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Alvaro Herrera <alvherre@2ndquadrant.com> writes:
I wish you'd stop splitting error message strings across multiple lines.
I've been trapped by a faulty grep not matching a split error message a
number of times :-( I know by now to remove words until I get a match,
but it seems an unnecessary trap for the unwary.
Yeah, that's my number one reason for not splitting error messages, too.
It's particularly nasty if similar strings appear in multiple places and
they're not all split alike, as you can get misled into thinking that a
reported error must have occurred in a place you found, rather than
someplace you didn't.
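A small C sketch of the trap, using the message text from the tablesync.c
hunk earlier in this thread. The variable and function names are
hypothetical and exist only for illustration. Adjacent string literals are
concatenated by the compiler, so both spellings yield the same runtime
string, yet a source search for a phrase that spans the split (for example
"synchronization worker") only finds the unsplit one.

```c
#include <assert.h>
#include <string.h>

/* Message written as one literal: grepping the source for any phrase works. */
static const char *const msg_unsplit =
    "terminating logical replication synchronization worker due to subscription apply worker exit";

/* Same message split across two adjacent literals, joined at compile time. */
static const char *const msg_split =
    "terminating logical replication synchronization "
    "worker due to subscription apply worker exit";

/* Returns 1 when both spellings produce the identical runtime string. */
int
messages_identical(void)
{
    return strcmp(msg_unsplit, msg_split) == 0;
}

/* Sanity check: the log output is the same, only the source text differs. */
void
check_messages(void)
{
    assert(messages_identical());
    /* The phrase exists in the runtime string even though no source line holds it. */
    assert(strstr(msg_split, "synchronization worker") != NULL);
}
```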
regards, tom lane
* Tom Lane (tgl@sss.pgh.pa.us) wrote:
Alvaro Herrera <alvherre@2ndquadrant.com> writes:
I wish you'd stop splitting error message strings across multiple lines.
I've been trapped by a faulty grep not matching a split error message a
number of times :-( I know by now to remove words until I get a match,
but it seems an unnecessary trap for the unwary.
Yeah, that's my number one reason for not splitting error messages, too.
It's particularly nasty if similar strings appear in multiple places and
they're not all split alike, as you can get misled into thinking that a
reported error must have occurred in a place you found, rather than
someplace you didn't.
+1.
Thanks!
Stephen
On 7/13/17 23:53, Masahiko Sawada wrote:
To summarize, I think we now have only one issue: ALTER SUBSCRIPTION is
not transactional, and the 0004 patch is addressing this issue.
We have two competing patches for this issue. This patch moves the
killing to the end of the DDL transaction. Your earlier patch makes the
tablesync worker itself responsible for exiting. Do you wish to comment
which direction to pursue? (Doing both might also be an option?)
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Fri, Aug 4, 2017 at 2:17 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 7/13/17 23:53, Masahiko Sawada wrote:
To summarize, I think we now have only one issue: ALTER SUBSCRIPTION is
not transactional, and the 0004 patch is addressing this issue.
We have two competing patches for this issue. This patch moves the
killing to the end of the DDL transaction. Your earlier patch makes the
tablesync worker itself responsible for exiting. Do you wish to comment
which direction to pursue? (Doing both might also be an option?)
To make ALTER SUBSCRIPTION REFRESH transactional, I prefer
Petr's proposal, because it can make ALTER SUBSCRIPTION and DROP
SUBSCRIPTION stop the table sync workers that are in the process of
copying data. I'm not sure killing table sync workers in DDL commands
would be acceptable, but since it can free unnecessary slots of logical
replication workers and replication slots, I'd prefer this idea.
However, even with this patch there is still an issue that NOTICE
messages "removed subscription for table public.t1" can appear
even if we roll back the ALTER SUBSCRIPTION REFRESH command, as I mentioned
on the earlier thread. Since I think this behaviour will confuse users who
check server logs, I'd like to deal with it; I don't have a good idea
though.
Also, I think we can incorporate the idea of my earlier proposal with
some changes (i.e. I'd choose the third option). In the current
implementation, in the case where a subscription relation state is
accidentally removed while the corresponding table sync worker is
in the process of copying data, the worker cannot exit from the loop in
wait_for_worker_state_change unless the apply worker dies. So to be
more robust, table sync workers can finish with an error if their
subscription relation state has disappeared.
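The exit conditions of such a wait loop can be sketched as a pure decision
function. This is a minimal model under stated assumptions, not the code in
tablesync.c: SyncView, WaitResult and check_wait_conditions() are
hypothetical names, and the third check (relation state gone) is the
proposed extra condition, which the posted patches do not contain.

```c
#include <assert.h>
#include <stddef.h>

/* Possible outcomes of one pass through the wait loop. */
typedef enum WaitResult
{
    WAIT_STATE_REACHED,     /* expected state seen: return normally */
    WAIT_APPLY_GONE,        /* apply worker died: exit with an error */
    WAIT_RELSTATE_GONE,     /* proposed: relation state removed, exit with an error */
    WAIT_KEEP_WAITING       /* nothing changed: sleep on the latch and retry */
} WaitResult;

/* Hypothetical snapshot of what the sync worker can observe. */
typedef struct SyncView
{
    char relstate;          /* current in-memory relation state */
    int  apply_alive;       /* nonzero while the apply worker exists */
    int  relstate_exists;   /* nonzero while the catalog entry exists */
} SyncView;

/* Decide what the sync worker should do on this iteration. */
WaitResult
check_wait_conditions(const SyncView *v, char expected_state)
{
    assert(v != NULL);

    if (v->relstate == expected_state)
        return WAIT_STATE_REACHED;
    if (!v->apply_alive)
        return WAIT_APPLY_GONE;
    if (!v->relstate_exists)
        return WAIT_RELSTATE_GONE;
    return WAIT_KEEP_WAITING;
}
```

Without the third check, a worker whose relation state was removed while
the apply worker stays alive would keep getting WAIT_KEEP_WAITING forever,
which is the situation described above.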
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On 8/4/17 12:02, Masahiko Sawada wrote:
To make ALTER SUBSCRIPTION REFRESH transactional, I prefer
Petr's proposal, because it can make ALTER SUBSCRIPTION and DROP
SUBSCRIPTION stop the table sync workers that are in the process of
copying data. I'm not sure killing table sync workers in DDL commands
would be acceptable, but since it can free unnecessary slots of logical
replication workers and replication slots, I'd prefer this idea.
OK, I have committed the 0004 patch.
However, even with this patch there is still an issue that NOTICE
messages "removed subscription for table public.t1" can appear
even if we roll back the ALTER SUBSCRIPTION REFRESH command, as I mentioned
on the earlier thread. Since I think this behaviour will confuse users who
check server logs, I'd like to deal with it; I don't have a good idea
though.
Maybe we can just remove those messages?
We don't get messages when we create a subscription about which tables
are part of it. So why do we need such messages when we refresh a
subscription?
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Wed, Aug 02, 2017 at 04:09:43PM -0400, Peter Eisentraut wrote:
On 8/1/17 00:17, Noah Misch wrote:
The above-described topic is currently a PostgreSQL 10 open item. Peter,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1] and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.
I'm looking into this now and will report by Friday.
This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
On Sat, Aug 5, 2017 at 10:29 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 8/4/17 12:02, Masahiko Sawada wrote:
To make ALTER SUBSCRIPTION REFRESH transactional, I prefer
Petr's proposal, because it can make ALTER SUBSCRIPTION and DROP
SUBSCRIPTION stop the table sync workers that are in the middle of
copying data. I'm not sure killing table sync workers in DDL commands
would be acceptable but since it can free unnecessary slots of logical
replication workers and replication slots I'd prefer this idea.
OK, I have committed the 0004 patch.
Thank you!
However, even with this patch there is still an issue that NOTICE
messages "removed subscription for table public.t1" can appear
even if we roll back the ALTER SUBSCRIPTION REFRESH command, as I mentioned
in an earlier thread. Since I think this behaviour will confuse users who
check server logs, I'd like to deal with it, though I don't have a good
idea yet.
Maybe we can just remove those messages?
We don't get messages when we create a subscription about which tables
are part of it. So why do we need such messages when we refresh a
subscription?
I think the messages are useful when we add/remove tables to/from
the publication and then refresh the subscription, so we might want to
change them to DEBUG rather than eliminating them.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Sun, Aug 6, 2017 at 7:44 AM, Noah Misch <noah@leadboat.com> wrote:
On Wed, Aug 02, 2017 at 04:09:43PM -0400, Peter Eisentraut wrote:
On 8/1/17 00:17, Noah Misch wrote:
The above-described topic is currently a PostgreSQL 10 open item. Peter,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1] and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.
I'm looking into this now and will report by Friday.
This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
I think this open item has been closed by commit
7e174fa793a2df89fe03d002a5087ef67abcdde8?
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On 8/7/17 00:32, Masahiko Sawada wrote:
On Sun, Aug 6, 2017 at 7:44 AM, Noah Misch <noah@leadboat.com> wrote:
On Wed, Aug 02, 2017 at 04:09:43PM -0400, Peter Eisentraut wrote:
On 8/1/17 00:17, Noah Misch wrote:
The above-described topic is currently a PostgreSQL 10 open item. Peter,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1] and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.
I'm looking into this now and will report by Friday.
This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
I think this open item has been closed by commit
7e174fa793a2df89fe03d002a5087ef67abcdde8?
Yes. I have updated the wiki page now.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 8/7/17 00:27, Masahiko Sawada wrote:
However, even with this patch there is still an issue that NOTICE
messages "removed subscription for table public.t1" can appear
even if we roll back the ALTER SUBSCRIPTION REFRESH command, as I mentioned
in an earlier thread. Since I think this behaviour will confuse users who
check server logs, I'd like to deal with it, though I don't have a good
idea yet.
Maybe we can just remove those messages?
We don't get messages when we create a subscription about which tables
are part of it. So why do we need such messages when we refresh a
subscription?
I think the messages are useful when we add/remove tables to/from
the publication and then refresh the subscription, so we might want to
change them to DEBUG rather than eliminating them.
Good idea. Done that way.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Mon, Aug 7, 2017 at 10:22 PM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 8/7/17 00:27, Masahiko Sawada wrote:
However, even with this patch there is still an issue that NOTICE
messages "removed subscription for table public.t1" can appear
even if we roll back the ALTER SUBSCRIPTION REFRESH command, as I mentioned
in an earlier thread. Since I think this behaviour will confuse users who
check server logs, I'd like to deal with it, though I don't have a good
idea yet.
Maybe we can just remove those messages?
We don't get messages when we create a subscription about which tables
are part of it. So why do we need such messages when we refresh a
subscription?
I think the messages are useful when we add/remove tables to/from
the publication and then refresh the subscription, so we might want to
change them to DEBUG rather than eliminating them.
Good idea. Done that way.
Thank you!
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Petr,
On Thu, Aug 3, 2017 at 5:24 AM, Stephen Frost <sfrost@snowman.net> wrote:
* Tom Lane (tgl@sss.pgh.pa.us) wrote:
Alvaro Herrera <alvherre@2ndquadrant.com> writes:
I wish you'd stop splitting error message strings across multiple lines.
I've been trapped by a faulty grep not matching a split error message a
number of times :-( I know by now to remove words until I get a match,
but it seems an unnecessary trap for the unwary.
Yeah, that's my number one reason for not splitting error messages, too.
It's particularly nasty if similar strings appear in multiple places and
they're not all split alike, as you can get misled into thinking that a
reported error must have occurred in a place you found, rather than
someplace you didn't.
+1.
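The grep trap described above can be sketched in plain C. This is an illustration only: `any_line_contains` and the arrays are hypothetical helpers, and the message text is the pre-patch string from the 0001 patch in this thread. The compiler joins adjacent string literals, so the runtime message is identical either way, but a line-by-line search of the source for a phrase that spans the break finds nothing.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* The two halves of the message as they were split in the source. */
#define MSG_PART1 "logical replication apply worker for subscription \"%s\" will not "
#define MSG_PART2 "start because the subscription was disabled during startup"

/* Adjacent literals are concatenated at compile time. */
const char split_msg[] = MSG_PART1 MSG_PART2;
const char joined_msg[] = "logical replication apply worker for subscription \"%s\" will not start because the subscription was disabled during startup";

/* What a line-oriented search sees: one entry per source line. */
const char *const split_lines[] = { MSG_PART1, MSG_PART2 };
const char *const joined_lines[] = { MSG_PART1 MSG_PART2 };

/* Hypothetical stand-in for grep: true if any single line has the needle. */
bool any_line_contains(const char *const *lines, size_t nlines,
                       const char *needle)
{
    for (size_t i = 0; i < nlines; i++)
    {
        if (strstr(lines[i], needle) != NULL)
            return true;
    }
    return false;
}
```

Searching `split_lines` for "will not start because" fails even though `split_msg` and `joined_msg` compare equal, which is why a user who copies the phrase from a log can miss the call site.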
Are you planning to work on the remaining patches 0005 and 0006, which
improve the subscription code, in the PG11 cycle? If not, I will take
them over and work on them for the next CF.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On 8/8/17 05:58, Masahiko Sawada wrote:
Are you planning to work on the remaining patches 0005 and 0006, which
improve the subscription code, in the PG11 cycle? If not, I will take
them over and work on them for the next CF.
Based on your assessment, the remaining patches were not required bug
fixes. So I think preparing them for the next commit fest would be great.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Thu, Aug 17, 2017 at 9:10 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 8/8/17 05:58, Masahiko Sawada wrote:
Are you planning to work on the remaining patches 0005 and 0006, which
improve the subscription code, in the PG11 cycle? If not, I will take
them over and work on them for the next CF.
Based on your assessment, the remaining patches were not required bug
fixes. So I think preparing them for the next commit fest would be great.
Thank you for the comment.
After more thought, since the 0001 and 0003 patches from the first mail also
improve the subscription code and are worth considering, I picked up a
total of four patches and updated them. I'm planning to work on
these patches in the next CF.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
0001-Improve-messaging-during-logical-replication-worker-_v2.patch (application/octet-stream)
From 0d67184cc88ff807822bd86fd1862ff5f670da6d Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 17 Aug 2017 14:01:04 -0700
Subject: [PATCH 1/4] Improve messaging during logical replication worker startup
---
src/backend/replication/logical/worker.c | 11 +++++------
1 files changed, 5 insertions(+), 6 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7c2df57..4520c6e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1532,20 +1532,19 @@ ApplyWorkerMain(Datum main_arg)
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
- /* Setup synchronous commit according to the user's wishes */
- SetConfigOption("synchronous_commit", MySubscription->synccommit,
- PGC_BACKEND, PGC_S_OVERRIDE);
-
if (!MySubscription->enabled)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will not "
- "start because the subscription was disabled during startup",
+ (errmsg("logical replication apply worker for subscription \"%s\" will stop because the subscription was disabled",
MySubscription->name)));
proc_exit(0);
}
+ /* Setup synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+ PGC_BACKEND, PGC_S_OVERRIDE);
+
/* Keep us informed about subscription changes. */
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb,
--
1.7.1
0002-Split-the-SetSubscriptionRelState-function-into-two_v2.patch (application/octet-stream)
From 47487786cb969e49ad3cd18d7258d72d03885b57 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 17 Aug 2017 14:03:24 -0700
Subject: [PATCH 2/4] Split the SetSubscriptionRelState function into two
---
src/backend/catalog/pg_subscription.c | 133 +++++++++++++++------------
src/backend/commands/subscriptioncmds.c | 8 +-
src/backend/replication/logical/tablesync.c | 34 +++----
src/include/catalog/pg_subscription_rel.h | 6 +-
4 files changed, 98 insertions(+), 83 deletions(-)
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index fb53d71..b643e54 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray)
}
/*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing. This can be used to avoid inserting a new record that was deleted
- * by someone else. Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances. But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
*/
Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
{
Relation rel;
HeapTuple tup;
- Oid subrelid = InvalidOid;
+ Oid subrelid;
bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel];
@@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(subid));
+ if (HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u already exists",
+ relid, subid);
- /*
- * If the record for given table does not exist yet create new record,
- * otherwise update the existing one.
- */
- if (!HeapTupleIsValid(tup) && !update_only)
- {
- /* Form the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
- values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
- tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
- /* Insert tuple into catalog. */
- subrelid = CatalogTupleInsert(rel, tup);
-
- heap_freetuple(tup);
- }
- else if (HeapTupleIsValid(tup))
- {
- bool replaces[Natts_pg_subscription_rel];
+ /* Form the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+ values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
- /* Update the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- memset(replaces, false, sizeof(replaces));
+ tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
- replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ /* Insert tuple into catalog. */
+ subrelid = CatalogTupleInsert(rel, tup);
- replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ heap_freetuple(tup);
- tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
- replaces);
+ /* Cleanup. */
+ heap_close(rel, NoLock);
- /* Update the catalog. */
- CatalogTupleUpdate(rel, &tup->t_self, tup);
+ return subrelid;
+}
- subrelid = HeapTupleGetOid(tup);
- }
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
+{
+ Relation rel;
+ HeapTuple tup;
+ Oid subrelid;
+ bool nulls[Natts_pg_subscription_rel];
+ Datum values[Natts_pg_subscription_rel];
+ bool replaces[Natts_pg_subscription_rel];
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+ /* Try finding existing mapping. */
+ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(subid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u does not exist",
+ relid, subid);
+
+ /* Update the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+ replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+
+ /* Update the catalog. */
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ subrelid = HeapTupleGetOid(tup);
/* Cleanup. */
heap_close(rel, NoLock);
@@ -381,6 +396,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
HeapTuple tup;
int nkeys = 0;
+ Assert(OidIsValid(subid) || OidIsValid(relid));
+
rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
if (OidIsValid(subid))
@@ -404,9 +421,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
/* Do the search and delete what we found. */
scan = heap_beginscan_catalog(rel, nkeys, skey);
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
- {
CatalogTupleDelete(rel, &tup->t_self);
- }
heap_endscan(scan);
heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9bc1d17..354d037 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname);
- SetSubscriptionRelState(subid, relid, table_state,
- InvalidXLogRecPtr, false);
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
}
/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
if (!bsearch(&relid, subrel_local_oids,
list_length(subrel_states), sizeof(Oid), oid_cmp))
{
- SetSubscriptionRelState(sub->oid, relid,
+ AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr, false);
+ InvalidXLogRecPtr);
ereport(DEBUG1,
(errmsg("table \"%s.%s\" added to subscription \"%s\"",
rv->schemaname, rv->relname, sub->name)));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 4cca0f1..42460b3 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
SpinLockRelease(&MyLogicalRepWorker->relmutex);
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
walrcv_endstreaming(wrconn, &tli);
finish_sync_worker();
@@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
StartTransactionCommand();
started_tx = true;
}
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- rstate->relid, rstate->state,
- rstate->lsn, true);
+
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ rstate->relid, rstate->state,
+ rstate->lsn);
}
}
else
@@ -870,11 +870,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/* Update the state and make it visible to others. */
StartTransactionCommand();
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
pgstat_report_stat(false);
@@ -961,11 +960,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* Update the new state in catalog. No need to bother
* with the shmem state as we are exiting for good.
*/
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- SUBREL_STATE_SYNCDONE,
- *origin_startpos,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ SUBREL_STATE_SYNCDONE,
+ *origin_startpos);
finish_sync_worker();
}
break;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 991ca9d..c5b0b9c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
char state;
} SubscriptionRelState;
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
extern char GetSubscriptionRelState(Oid subid, Oid relid,
XLogRecPtr *sublsn, bool missing_ok);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
--
1.7.1
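The idea behind this patch, replacing one insert-or-update function with two functions that each fail loudly on the wrong precondition, can be sketched outside PostgreSQL as follows. This is a minimal standalone sketch: `RelStateTable`, `add_rel_state`, `update_rel_state` and the `RelResult` codes are hypothetical names, and the sketch returns status codes where the patch raises errors with elog(ERROR).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_RELS 8

/* Toy stand-in for one pg_subscription_rel row (hypothetical layout). */
typedef struct RelState
{
    unsigned subid;
    unsigned relid;
    char     state;
    bool     in_use;
} RelState;

typedef struct RelStateTable
{
    RelState rows[MAX_RELS];
} RelStateTable;

typedef enum { REL_OK, REL_EXISTS, REL_MISSING, REL_FULL } RelResult;

/* Look up the row for (subid, relid); NULL if there is none. */
RelState *find_rel_state(RelStateTable *t, unsigned subid, unsigned relid)
{
    for (size_t i = 0; i < MAX_RELS; i++)
    {
        RelState *row = &t->rows[i];

        if (row->in_use && row->subid == subid && row->relid == relid)
            return row;
    }
    return NULL;
}

/* Insert only: an existing row is reported, never silently updated. */
RelResult add_rel_state(RelStateTable *t, unsigned subid, unsigned relid,
                        char state)
{
    if (find_rel_state(t, subid, relid) != NULL)
        return REL_EXISTS;

    for (size_t i = 0; i < MAX_RELS; i++)
    {
        RelState *row = &t->rows[i];

        if (!row->in_use)
        {
            row->subid = subid;
            row->relid = relid;
            row->state = state;
            row->in_use = true;
            return REL_OK;
        }
    }
    return REL_FULL;
}

/* Update only: a missing row is reported, never silently inserted. */
RelResult update_rel_state(RelStateTable *t, unsigned subid, unsigned relid,
                           char state)
{
    RelState *row = find_rel_state(t, subid, relid);

    if (row == NULL)
        return REL_MISSING;

    row->state = state;
    return REL_OK;
}
```

With a zero-initialized table, adding the same (subid, relid) pair twice yields `REL_EXISTS` on the second call, and updating a pair that was never added yields `REL_MISSING`, so callers such as DDL commands and workers each get an explicit failure instead of an implicit upsert.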
0003-Allow-syscache-access-to-subscriptions-in-database-l_v2.patch (application/octet-stream)
From b503f9fd6badae779f80e27b16821767ddc24b93 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 17 Aug 2017 14:03:58 -0700
Subject: [PATCH 3/4] Allow syscache access to subscriptions in database-less processes
---
src/backend/utils/cache/catcache.c | 6 ++++--
1 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index f894053..290279f 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -1001,10 +1001,12 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
case AUTHNAME:
case AUTHOID:
case AUTHMEMMEMROLE:
+ case SUBSCRIPTIONOID:
+ case SUBSCRIPTIONNAME:
/*
- * Protect authentication lookups occurring before relcache has
- * collected entries for shared indexes.
+ * Protect authentication and subscription lookups occurring
+ * before relcache has collected entries for shared indexes.
*/
if (!criticalSharedRelcachesBuilt)
return false;
--
1.7.1
0004-Improve-locking-for-subscriptions-and-subscribed-rel_v2.patch (application/octet-stream)
From 35d700dabbb583a2496b16ad7bb5a49a54d97f0d Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 17 Aug 2017 14:13:29 -0700
Subject: [PATCH 4/4] Improve locking for subscriptions and subscribed relations
---
src/backend/commands/subscriptioncmds.c | 57 ++-----
src/backend/replication/logical/launcher.c | 228 +++++++++++----------------
src/backend/replication/logical/tablesync.c | 63 +++++---
src/include/replication/worker_internal.h | 5 +-
4 files changed, 145 insertions(+), 208 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 354d037..85b97c3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -633,12 +633,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
errmsg("subscription \"%s\" does not exist",
stmt->subname)));
+ subid = HeapTupleGetOid(tup);
+
/* must be owner */
- if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+ if (!pg_subscription_ownercheck(subid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
stmt->subname);
- subid = HeapTupleGetOid(tup);
sub = GetSubscription(subid, false);
/* Lock the subscription so nobody else can do anything with it. */
@@ -811,16 +812,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ObjectAddress myself;
HeapTuple tup;
Oid subid;
- Datum datum;
- bool isnull;
- char *subname;
- char *conninfo;
- char *slotname;
List *subworkers;
ListCell *lc;
char originname[NAMEDATALEN];
char *err = NULL;
RepOriginId originid;
+ Subscription *sub;
WalReceiverConn *wrconn = NULL;
StringInfoData cmd;
@@ -828,7 +825,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* Lock pg_subscription with AccessExclusiveLock to ensure that the
* launcher doesn't restart new worker during dropping the subscription
*/
- rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+ rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
CStringGetDatum(stmt->subname));
@@ -860,31 +857,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* DROP hook for the subscription being removed */
InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
- /*
- * Lock the subscription so nobody else can do anything with it (including
- * the replication workers).
- */
- LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+ sub = GetSubscription(subid, false);
- /* Get subname */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subname, &isnull);
- Assert(!isnull);
- subname = pstrdup(NameStr(*DatumGetName(datum)));
-
- /* Get conninfo */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subconninfo, &isnull);
- Assert(!isnull);
- conninfo = TextDatumGetCString(datum);
-
- /* Get slotname */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subslotname, &isnull);
- if (!isnull)
- slotname = pstrdup(NameStr(*DatumGetName(datum)));
- else
- slotname = NULL;
+ /* Lock the subscription so nobody else can do anything with it. */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
/*
* Since dropping a replication slot is not transactional, the replication
@@ -896,7 +872,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* of a subscription that is associated with a replication slot", but we
* don't have the proper facilities for that.
*/
- if (slotname)
+ if (sub->slotname)
PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
@@ -923,7 +899,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
- if (slotname)
+ if (sub->slotname)
logicalrep_worker_stop(w->subid, w->relid);
else
logicalrep_worker_stop_at_commit(w->subid, w->relid);
@@ -946,7 +922,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* If there is no slot associated with the subscription, we can finish
* here.
*/
- if (!slotname)
+ if (!sub->slotname)
{
heap_close(rel, NoLock);
return;
@@ -959,13 +935,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
load_file("libpqwalreceiver", false);
initStringInfo(&cmd);
- appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
+ appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+ quote_identifier(sub->slotname));
- wrconn = walrcv_connect(conninfo, true, subname, &err);
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
if (wrconn == NULL)
ereport(ERROR,
(errmsg("could not connect to publisher when attempting to "
- "drop the replication slot \"%s\"", slotname),
+ "drop the replication slot \"%s\"", sub->slotname),
errdetail("The error was: %s", err),
errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
"to disassociate the subscription from the slot.")));
@@ -979,12 +956,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("could not drop the replication slot \"%s\" on publisher",
- slotname),
+ sub->slotname),
errdetail("The error was: %s", res->err)));
else
ereport(NOTICE,
(errmsg("dropped replication slot \"%s\" on publisher",
- slotname)));
+ sub->slotname)));
walrcv_clear_result(res);
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6c89442..e9bc9af 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -42,6 +42,7 @@
#include "replication/worker_internal.h"
#include "storage/ipc.h"
+#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
@@ -94,33 +95,23 @@ static bool on_commit_launcher_wakeup = false;
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
-
/*
- * Load the list of subscriptions.
- *
- * Only the fields interesting for worker start/stop functions are filled for
- * each subscription.
+ * Load the list of enabled subscription oids.
*/
static List *
-get_subscription_list(void)
+get_subscription_oids(void)
{
List *res = NIL;
Relation rel;
HeapScanDesc scan;
HeapTuple tup;
- MemoryContext resultcxt;
-
- /* This is the context that we will allocate our output data in */
- resultcxt = CurrentMemoryContext;
/*
- * Start a transaction so we can access pg_database, and get a snapshot.
* We don't have a use for the snapshot itself, but we're interested in
* the secondary effect that it sets RecentGlobalXmin. (This is critical
* for anything that reads heap pages, because HOT may decide to prune
* them even if the process doesn't attempt to modify any tuples.)
*/
- StartTransactionCommand();
(void) GetTransactionSnapshot();
rel = heap_open(SubscriptionRelationId, AccessShareLock);
@@ -129,34 +120,17 @@ get_subscription_list(void)
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
{
Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
- Subscription *sub;
- MemoryContext oldcxt;
- /*
- * Allocate our results in the caller's context, not the
- * transaction's. We do this inside the loop, and restore the original
- * context at the end, so that leaky things like heap_getnext() are
- * not called in a potentially long-lived context.
- */
- oldcxt = MemoryContextSwitchTo(resultcxt);
-
- sub = (Subscription *) palloc0(sizeof(Subscription));
- sub->oid = HeapTupleGetOid(tup);
- sub->dbid = subform->subdbid;
- sub->owner = subform->subowner;
- sub->enabled = subform->subenabled;
- sub->name = pstrdup(NameStr(subform->subname));
- /* We don't fill fields we are not interested in. */
-
- res = lappend(res, sub);
- MemoryContextSwitchTo(oldcxt);
+ /* We only care about enabled subscriptions. */
+ if (!subform->subenabled)
+ continue;
+
+ res = lappend_oid(res, HeapTupleGetOid(tup));
}
heap_endscan(scan);
heap_close(rel, AccessShareLock);
- CommitTransactionCommand();
-
return res;
}
@@ -282,23 +256,38 @@ logicalrep_workers_find(Oid subid, bool only_running)
}
/*
- * Start new apply background worker.
+ * Start new logical replication background worker.
*/
void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid)
+logicalrep_worker_launch(Oid subid, Oid relid)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
int i;
int slot = 0;
- LogicalRepWorker *worker = NULL;
- int nsyncworkers;
+ List *subworkers;
+ ListCell *lc;
TimestampTz now;
+ int nsyncworkers = 0;
+ Subscription *sub;
+ LogicalRepWorker *worker = NULL;
ereport(DEBUG1,
- (errmsg("starting logical replication worker for subscription \"%s\"",
- subname)));
+ (errmsg("starting logical replication worker for subscription %u",
+ subid)));
+
+ /* Block any concurrent DDL on the subscription. */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ /* Get info about subscription. */
+ sub = GetSubscription(subid, true);
+ if (!sub)
+ {
+ ereport(DEBUG1,
+ (errmsg("subscription %u not found, not starting worker for it",
+ subid)));
+ return;
+ }
/* Report this after the initial starting message for consistency. */
if (max_replication_slots == 0)
@@ -326,7 +315,14 @@ retry:
}
}
- nsyncworkers = logicalrep_sync_worker_count(subid);
+ subworkers = logicalrep_workers_find(subid, false);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (w->relid != InvalidOid)
+ nsyncworkers ++;
+ }
+ list_free(subworkers);
now = GetCurrentTimestamp();
@@ -372,6 +368,7 @@ retry:
if (nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
return;
}
@@ -382,6 +379,7 @@ retry:
if (worker == NULL)
{
LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of logical replication worker slots"),
@@ -394,8 +392,8 @@ retry:
worker->in_use = true;
worker->generation++;
worker->proc = NULL;
- worker->dbid = dbid;
- worker->userid = userid;
+ worker->dbid = sub->dbid;
+ worker->userid = sub->owner;
worker->subid = subid;
worker->relid = relid;
worker->relstate = SUBREL_STATE_UNKNOWN;
@@ -406,8 +404,6 @@ retry:
worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time);
- LWLockRelease(LogicalRepWorkerLock);
-
/* Register the new dynamic worker. */
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -426,8 +422,13 @@ retry:
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = Int32GetDatum(slot);
+ /* Try to register the worker and cleanup in case of failure. */
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
+ logicalrep_worker_cleanup(worker);
+ LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
@@ -435,13 +436,24 @@ retry:
return;
}
+ /* Done with the worker array. */
+ LWLockRelease(LogicalRepWorkerLock);
+
/* Now wait until it attaches. */
WaitForReplicationWorkerAttach(worker, bgw_handle);
+
+ /*
+ * Worker either started or died, in any case we are done with the
+ * subscription.
+ */
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
}
/*
* Stop the logical replication worker for subid/relid, if any, and wait until
* it detaches from the slot.
+ *
+ * Callers of this function better have exclusive lock on the subscription.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
@@ -449,7 +461,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LogicalRepWorker *worker;
uint16 generation;
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ /* Exclusive is needed for logicalrep_worker_cleanup(). */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
worker = logicalrep_worker_find(subid, relid, false);
@@ -460,56 +473,20 @@ logicalrep_worker_stop(Oid subid, Oid relid)
return;
}
+ /* If there is worker but it's not running, clean it up. */
+ if (!worker->proc)
+ {
+ logicalrep_worker_cleanup(worker);
+ LWLockRelease(LogicalRepWorkerLock);
+ return;
+ }
+
/*
* Remember which generation was our worker so we can check if what we see
* is still the same one.
*/
generation = worker->generation;
- /*
- * If we found a worker but it does not have proc set then it is still
- * starting up; wait for it to finish starting and then kill it.
- */
- while (worker->in_use && !worker->proc)
- {
- int rc;
-
- LWLockRelease(LogicalRepWorkerLock);
-
- /* Wait a bit --- we don't expect to have to wait long. */
- rc = WaitLatch(MyLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- 10L, WAIT_EVENT_BGWORKER_STARTUP);
-
- /* emergency bailout if postmaster has died */
- if (rc & WL_POSTMASTER_DEATH)
- proc_exit(1);
-
- if (rc & WL_LATCH_SET)
- {
- ResetLatch(MyLatch);
- CHECK_FOR_INTERRUPTS();
- }
-
- /* Recheck worker status. */
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
- /*
- * Check whether the worker slot is no longer used, which would mean
- * that the worker has exited, or whether the worker generation is
- * different, meaning that a different worker has taken the slot.
- */
- if (!worker->in_use || worker->generation != generation)
- {
- LWLockRelease(LogicalRepWorkerLock);
- return;
- }
-
- /* Worker has assigned proc, so it has started. */
- if (worker->proc)
- break;
- }
-
/* Now terminate the worker ... */
kill(worker->proc->pid, SIGTERM);
@@ -539,6 +516,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
CHECK_FOR_INTERRUPTS();
}
+ /*
+ * Shared lock is enough for the loop as we don't need to do the slot
+ * cleanup because at this point we know that the worker has attached
+ * to the shmem and will clean the slot on detach automatically.
+ */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
}
@@ -706,30 +688,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS)
}
/*
- * Count the number of registered (not necessarily running) sync workers
- * for a subscription.
- */
-int
-logicalrep_sync_worker_count(Oid subid)
-{
- int i;
- int res = 0;
-
- Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
- /* Search for attached worker for a given subscription id. */
- for (i = 0; i < max_logical_replication_workers; i++)
- {
- LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
- if (w->subid == subid && OidIsValid(w->relid))
- res++;
- }
-
- return res;
-}
-
-/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
*/
@@ -899,8 +857,6 @@ ApplyLauncherMain(Datum main_arg)
int rc;
List *sublist;
ListCell *lc;
- MemoryContext subctx;
- MemoryContext oldctx;
TimestampTz now;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
@@ -912,28 +868,29 @@ ApplyLauncherMain(Datum main_arg)
if (TimestampDifferenceExceeds(last_start_time, now,
wal_retrieve_retry_interval))
{
- /* Use temporary context for the database list and worker info. */
- subctx = AllocSetContextCreate(TopMemoryContext,
- "Logical Replication Launcher sublist",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
- oldctx = MemoryContextSwitchTo(subctx);
-
- /* search for subscriptions to start or stop. */
- sublist = get_subscription_list();
-
- /* Start the missing workers for enabled subscriptions. */
+ /*
+ * Start new transaction so that we can take locks and snapshots.
+ *
+ * Any allocations will also be made inside the transaction memory
+ * context.
+ */
+ StartTransactionCommand();
+
+ /* Search for subscriptions to start. */
+ sublist = get_subscription_oids();
+
+ /* Start the missing workers. */
foreach(lc, sublist)
{
- Subscription *sub = (Subscription *) lfirst(lc);
+ Oid subid = lfirst_oid(lc);
+ Subscription *sub = GetSubscription(subid, false);
LogicalRepWorker *w;
if (!sub->enabled)
continue;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+ w = logicalrep_worker_find(subid, InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
if (w == NULL)
@@ -941,15 +898,12 @@ ApplyLauncherMain(Datum main_arg)
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid);
+ /* Start the worker. */
+ logicalrep_worker_launch(subid, InvalidOid);
}
}
- /* Switch back to original memory context. */
- MemoryContextSwitchTo(oldctx);
- /* Clean the temporary memory. */
- MemoryContextDelete(subctx);
+ CommitTransactionCommand();
}
else
{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 42460b3..fdc3515 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -102,12 +102,13 @@
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
-#include "utils/snapmgr.h"
#include "storage/ipc.h"
+#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/snapmgr.h"
static bool table_states_valid = false;
@@ -211,10 +212,8 @@ wait_for_relation_state_change(Oid relid, char expected_state)
* worker to the expected one.
*
* Used when transitioning from SYNCWAIT state to CATCHUP.
- *
- * Returns false if the apply worker has disappeared.
*/
-static bool
+static void
wait_for_worker_state_change(char expected_state)
{
int rc;
@@ -230,7 +229,7 @@ wait_for_worker_state_change(char expected_state)
* enough to not give a misleading answer if we do it with no lock.)
*/
if (MyLogicalRepWorker->relstate == expected_state)
- return true;
+ return;
/*
* Bail out if the apply worker has died, else signal it we're
@@ -243,7 +242,10 @@ wait_for_worker_state_change(char expected_state)
logicalrep_worker_wakeup_ptr(worker);
LWLockRelease(LogicalRepWorkerLock);
if (!worker)
- break;
+ ereport(FATAL,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("terminating logical replication synchronization "
+ "worker due to subscription apply worker exit")));
/*
* Wait. We expect to get a latch signal back from the apply worker,
@@ -260,8 +262,6 @@ wait_for_worker_state_change(char expected_state)
if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
}
-
- return false;
}
/*
@@ -346,6 +346,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Assert(!IsTransactionState());
+#define ensure_transaction_and_lock() \
+ if (!started_tx) \
+ {\
+ StartTransactionCommand(); \
+ LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, \
+ AccessShareLock); \
+ started_tx = true; \
+ }
+
/* We need up-to-date sync state info for subscription tables here. */
if (!table_states_valid)
{
@@ -358,8 +367,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
list_free_deep(table_states);
table_states = NIL;
- StartTransactionCommand();
- started_tx = true;
+ ensure_transaction_and_lock();
/* Fetch all non-ready tables. */
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -421,11 +429,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
+
+ ensure_transaction_and_lock();
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
@@ -476,12 +481,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* Enter busy loop and wait for synchronization worker to
* reach expected state (or die trying).
*/
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
-
+ ensure_transaction_and_lock();
wait_for_relation_state_change(rstate->relid,
SUBREL_STATE_SYNCDONE);
}
@@ -495,8 +495,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* running sync workers for this subscription, while we have
* the lock.
*/
- int nsyncworkers =
- logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ List *subworkers;
+ ListCell *lc;
+ int nsyncworkers = 0;
+
+ subworkers = logicalrep_workers_find(MySubscription->oid,
+ false);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (w->relid != InvalidOid)
+ nsyncworkers ++;
+ }
+ list_free(subworkers);
/* Now safe to release the LWLock */
LWLockRelease(LogicalRepWorkerLock);
@@ -518,10 +529,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
TimestampDifferenceExceeds(hentry->last_start_time, now,
wal_retrieve_retry_interval))
{
- logicalrep_worker_launch(MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
+ ensure_transaction_and_lock();
+ logicalrep_worker_launch(MySubscription->oid,
rstate->relid);
hentry->last_start_time = now;
}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7b8728c..c82396d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -72,15 +72,12 @@ extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
- Oid userid, Oid relid);
+extern void logicalrep_worker_launch(Oid subid, Oid relid);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
-extern int logicalrep_sync_worker_count(Oid subid);
-
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
void process_syncing_tables(XLogRecPtr current_lsn);
void invalidate_syncing_table_states(Datum arg, int cacheid,
--
1.7.1
On Thu, Aug 17, 2017 at 8:17 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Thu, Aug 17, 2017 at 9:10 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 8/8/17 05:58, Masahiko Sawada wrote:
Are you planning to work on remaining patches 0005 and 0006 that
improve the subscription codes in PG11 cycle? If not, I will take over
them and work on the next CF.
Based on your assessment, the remaining patches were not required bug
fixes. So I think preparing them for the next commit fest would be great.
Thank you for the comment.
After more thought, since 0001 and 0003 patches on the first mail also
improve the subscription codes and are worth to be considered, I
picked total 4 patches up and updated them. I'm planning to work on
these patches in the next CF.
Added this item to the next commit fest.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Fri, Aug 18, 2017 at 2:09 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Added this item to the next commit fest.
The patch set fails to apply. Please provide a rebased version. I am
moving this entry to next CF with waiting on author as status.
--
Michael
Greetings,
* Michael Paquier (michael.paquier@gmail.com) wrote:
The patch set fails to apply. Please provide a rebased version. I am
moving this entry to next CF with waiting on author as status.
Masahiko Sawada, this patch is still in Waiting on Author and hasn't
progressed in a very long time. Is there any chance you'll be able to
provide an updated patch soon for review? Or should this patch be
closed out?
Thanks!
Stephen
On Tue, Jan 23, 2018 at 7:58 AM, Stephen Frost <sfrost@snowman.net> wrote:
Greetings,
* Michael Paquier (michael.paquier@gmail.com) wrote:
The patch set fails to apply. Please provide a rebased version. I am
moving this entry to next CF with waiting on author as status.
Masahiko Sawada, this patch is still in Waiting on Author and hasn't
progressed in a very long time. Is there any chance you'll be able to
provide an updated patch soon for review? Or should this patch be
closed out?
Thank you for notification. Since it seems to me that no one is
interested in this patch, it would be better to close out this patch.
This patch is a refactoring patch but subscription code seems to work
fine so far. If a problem appears around subscriptions, I might
propose it again.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On 1/24/18 02:33, Masahiko Sawada wrote:
Thank you for notification. Since it seems to me that no one is
interested in this patch, it would be better to close out this patch.
This patch is a refactoring patch but subscription code seems to work
fine so far. If a problem appears around subscriptions, I might
propose it again.
I have looked at the patches again. They seem generally reasonable, but
I don't see quite why we want or need them. More details would help
review them. Do they fix any bugs, or are they just rearranging code?
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Fri, Jan 26, 2018 at 11:41 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
I have looked at the patches again.
Thank you for looking at this.
They seem generally reasonable, but
I don't see quite why we want or need them. More details would help
review them. Do they fix any bugs, or are they just rearranging code?
The 0002 patch rearranges the code. Currently SetSubscriptionRelState()
not only updates a record but can also register a new one, controlled by
its update_only argument. This patch splits SetSubscriptionRelState() into
AddSubscriptionRelState() and UpdateSubscriptionRelState(). The 0001, 0003
and 0004 patches are improvements. The 0001 patch improves messaging
during worker startup. The 0004 patch, which requires the 0003 patch,
reduces the lock level for DROP SUBSCRIPTION to RowExclusiveLock.
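To illustrate the shape of that split, here is a minimal standalone sketch. It is not the patch code: the in-memory table and the names add_rel_state(), update_rel_state(), find_rel_state() and RelStateResult are hypothetical stand-ins for the catalog and for AddSubscriptionRelState()/UpdateSubscriptionRelState(). The point is only that each function has one job and reports the unexpected case as an error instead of silently falling back to the other behavior.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for one (subscription, relation) state row. */
typedef struct RelState
{
    unsigned int subid;
    unsigned int relid;
    char         state;
    bool         used;
} RelState;

typedef enum RelStateResult
{
    REL_STATE_OK,
    REL_STATE_ALREADY_EXISTS,
    REL_STATE_NOT_FOUND,
    REL_STATE_TABLE_FULL
} RelStateResult;

#define MAX_REL_STATES 16

/* Hypothetical in-memory table; the real code works on a system catalog. */
static RelState rel_states[MAX_REL_STATES];

/* Return the row for (subid, relid), or NULL if there is none. */
RelState *
find_rel_state(unsigned int subid, unsigned int relid)
{
    for (size_t i = 0; i < MAX_REL_STATES; i++)
    {
        RelState *rs = &rel_states[i];

        if (rs->used && rs->subid == subid && rs->relid == relid)
            return rs;
    }
    return NULL;
}

/* Insert only: an existing row is an error, it is never overwritten. */
RelStateResult
add_rel_state(unsigned int subid, unsigned int relid, char state)
{
    if (find_rel_state(subid, relid) != NULL)
        return REL_STATE_ALREADY_EXISTS;

    for (size_t i = 0; i < MAX_REL_STATES; i++)
    {
        if (!rel_states[i].used)
        {
            rel_states[i] = (RelState) {subid, relid, state, true};
            return REL_STATE_OK;
        }
    }
    return REL_STATE_TABLE_FULL;
}

/* Update only: a missing row is an error, it is never created. */
RelStateResult
update_rel_state(unsigned int subid, unsigned int relid, char state)
{
    RelState *rs = find_rel_state(subid, relid);

    if (rs == NULL)
        return REL_STATE_NOT_FOUND;

    rs->state = state;
    return REL_STATE_OK;
}
```

With this shape, a caller that expects the row to exist gets REL_STATE_NOT_FOUND if something removed it concurrently, rather than having the row quietly recreated by an upsert.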
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Hi Masahiko,
On 1/30/18 5:00 AM, Masahiko Sawada wrote:
On Fri, Jan 26, 2018 at 11:41 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 1/24/18 02:33, Masahiko Sawada wrote:
Thank you for notification. Since it seems to me that no one is
interested in this patch, it would be better to close out this patch.
This patch is a refactoring patch but subscription code seems to work
fine so far. If a problem appears around subscriptions, I might
propose it again.
This patch is still in the Waiting for Author state but it looks like
you intended to close it. Should I do so now?
Thanks,
--
-David
david@pgmasters.net
On Tue, Mar 6, 2018 at 11:17 PM, David Steele <david@pgmasters.net> wrote:
Hi Masahiko,
This patch is still in the Waiting for Author state but it looks like
you intended to close it. Should I do so now?
Uh, actually two (0001 and 0002) of the four patches apply cleanly to
current HEAD, and I think we can regard them as "Needs Review". Only
the 0003 and 0004 patches are "Waiting on Author". Since 0001 and 0002
are very simple, I hope these patches get reviewed. It was my fault for
putting different patches into one CF entry.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
0001:
there are a bunch of other messages of the same ilk in the file. I
don't like how the current messages are worded; maybe Peter or Petr had
some reason why they're like that, but I would have split out the reason
for not starting or stopping into errdetail. Something like
errmsg("logical replication apply worker for subscription \"%s\" will not start", ...)
errdetail("Subscription has been disabled during startup.")
But I think we should change all these messages in unison rather than
only one of them.
Now, your patch changes "will not start" to "will stop". But is that
correct? ISTM that this happens when a worker is starting and
determines that it is not needed. So "will not start" is more correct.
"Will stop" would be correct if the worker had been running for some
time and suddenly decided to terminate, but that doesn't seem to be the
case, unless I misread the code.
Your patch also changes "disabled during startup" to just "disabled".
Is that a useful change? ISTM that if the subscription had been
disabled prior to startup, then the worker would not have started at
all, so we would not be seeing this message in the first place. Again,
maybe I am misreading the code? Please explain.
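For readers unfamiliar with the errmsg/errdetail convention referred to above, the following is a small standalone sketch of the idea, not PostgreSQL code. The WorkerReport struct, make_not_started_report() and format_report() are hypothetical, and the "LOG:"/"DETAIL:" prefixes only imitate the server log layout. It shows the primary message stating what happened while the reason travels in a separate detail field, so the same primary text can be reused with different reasons.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical report with the reason split out of the primary message. */
typedef struct WorkerReport
{
    char primary[160];
    char detail[160];
} WorkerReport;

/* Build a "will not start" report; the reason goes into the detail field. */
void
make_not_started_report(WorkerReport *r, const char *subname,
                        const char *reason)
{
    snprintf(r->primary, sizeof(r->primary),
             "logical replication apply worker for subscription \"%s\" will not start",
             subname);
    snprintf(r->detail, sizeof(r->detail), "%s", reason);
}

/* Render the report as two log lines; returns what snprintf() returns. */
int
format_report(const WorkerReport *r, char *buf, size_t buflen)
{
    return snprintf(buf, buflen, "LOG:  %s\nDETAIL:  %s\n",
                    r->primary, r->detail);
}
```

Calling make_not_started_report() once with "Subscription has been disabled during startup." and once with a "removed during startup" reason yields the same primary line in both cases, which is what changing all these messages in unison would achieve.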
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
0002 looks like a good improvement to me. The existing routine is
messy, and apparently it's so just to save one LockSharedObject plus
cache lookup; IMO it's not worth it. Patched code looks simpler. If
there are cases where having the combined behavior is useful, it's not
clear what they are. (If I understand correctly, the reason is that a
sync worker could try to insert-or-update the row after some other
process deleted it [because of removing the table from subscription?]
... but that seems to work out *simpler* with the new code. So what's
up?)
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 3/7/18 7:41 AM, Masahiko Sawada wrote:
Uh, actually two (0001 and 0002) of the four patches apply cleanly to
current HEAD, and I think we can regard them as "Needs Review". Only
the 0003 and 0004 patches are "Waiting on Author". Since 0001 and 0002
are very simple, I hope these patches get reviewed. It was my fault for
putting different patches into one CF entry.
I rather doubt you are going to attract any review as long as this stays
in Waiting on Author state -- which I notice it has been in since the
end of the November CF. That is reason enough to return it since we
have been taking a pretty firm stance on patches that have not been
updated for the CF.
I'm marking this submission Returned with Feedback.
Regards,
--
-David
david@pgmasters.net
David Steele wrote:
I'm marking this submission Returned with Feedback.
Not yet please.
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 3/7/18 9:37 AM, Alvaro Herrera wrote:
David Steele wrote:
I'm marking this submission Returned with Feedback.
Not yet please.
Back to Waiting on Author state.
Regards,
--
-David
david@pgmasters.net
On Wed, Mar 7, 2018 at 11:13 PM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
0001:
there are a bunch of other messages of the same ilk in the file. I
don't like how the current messages are worded; maybe Peter or Petr had
some reason why they're like that, but I would have split out the reason
for not starting or stopping into errdetail. Something like
errmsg("logical replication apply worker for subscription \"%s\" will not start", ...)
errdetail("Subscription has been disabled during startup.")
But I think we should change all these messages in unison rather than
only one of them.
Now, your patch changes "will not start" to "will stop". But is that
correct? ISTM that this happens when a worker is starting and
determines that it is not needed. So "will not start" is more correct.
"Will stop" would be correct if the worker had been running for some
time and suddenly decided to terminate, but that doesn't seem to be the
case, unless I misread the code.
Your patch also changes "disabled during startup" to just "disabled".
Is that a useful change? ISTM that if the subscription had been
disabled prior to startup, then the worker would not have started at
all, so we would not be seeing this message in the first place. Again,
maybe I am misreading the code? Please explain.
I think you're not misreading the code. I agree with you: "will not
start" and "disabled during startup" are more appropriate here,
although Petr might have another opinion on this. I also noticed that I
had overlooked one change from the v1 patch proposed by Petr. Attached is
a new patch that incorporates your review comment and the hunk I overlooked.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
0001-Improve-messaging-during-logical-replication-worker-_v3.patch
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 04985c9..f8785b0 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1533,24 +1533,32 @@ ApplyWorkerMain(Datum main_arg)
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
- MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+
+ if (!MySubscription)
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will not start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));
+ proc_exit(0);
+ }
+
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
- /* Setup synchronous commit according to the user's wishes */
- SetConfigOption("synchronous_commit", MySubscription->synccommit,
- PGC_BACKEND, PGC_S_OVERRIDE);
-
if (!MySubscription->enabled)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will not "
- "start because the subscription was disabled during startup",
+ (errmsg("logical replication apply worker for subscription \"%s\" will not start because the subscription was disabled during startup",
MySubscription->name)));
proc_exit(0);
}
+ /* Setup synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+ PGC_BACKEND, PGC_S_OVERRIDE);
+
/* Keep us informed about subscription changes. */
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb,
On Wed, Mar 7, 2018 at 11:13 PM, Alvaro Herrera <alvherre@alvh.no-ip.org>
wrote:
0002 looks like a good improvement to me. The existing routine is
messy, and apparently it's so just to save one LockSharedObject plus
cache lookup; IMO it's not worth it. Patched code looks simpler. If
there are cases where having the combined behavior is useful, it's not
clear what they are. (If I understand correctly, the reason is that a
sync worker could try to insert-or-update the row after some other
process deleted it [because of removing the table from subscription?]
... but that seems to work out *simpler* with the new code. So what's
up?)
SetSubscriptionRelState() is called with update_only=false (i.e.
asking for insert-or-update) from two functions: CreateSubscription() and
AlterSubscription_refresh(). AFAICS these two functions don't actually
need insert-or-update behavior, because a backend never creates or alters
a subscription entry that already exists under the same name. Since
CreateSubscription() inserts a tuple into the system catalog, if two
processes try to create a subscription with the same name, one
transaction fails with a duplicate-key error. Similarly, for
AlterSubscription_refresh(), since we acquire AccessExclusiveLock on the
subscription object before getting the new table list from the
publication, updating an existing entry doesn't happen. So this patch
changes the SetSubscriptionRelState() calls with update_only=false to
AddSubscriptionRelState() and the others to UpdateSubscriptionRelState().
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
I have committed the patches
0001 Improve messaging during logical replication worker startup
0002 Split the SetSubscriptionRelState function into two
0004 doesn't apply anymore, 0003 doesn't seem to be very useful without
0004, as I understand it. It's also a bit more than I'm comfortable
reviewing or committing right now. So maybe move those to the next
commitfest or close the entry as returned with feedback?
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services