From 1ba4534a0efef3f0ff8e354a1b1192e3c50b5dff Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Mon, 6 Mar 2017 13:07:45 +0100 Subject: [PATCH] Add option to modify sync commit per subscription This also changes default behaviour of subscription workers to synchronous_commit = off --- doc/src/sgml/catalogs.sgml | 11 ++++++ doc/src/sgml/ref/alter_subscription.sgml | 1 + doc/src/sgml/ref/create_subscription.sgml | 15 ++++++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/commands/subscriptioncmds.c | 61 ++++++++++++++++++++++++------ src/backend/replication/logical/launcher.c | 2 +- src/backend/replication/logical/worker.c | 28 +++++++++++++- src/bin/pg_dump/pg_dump.c | 12 +++++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 5 ++- src/include/catalog/pg_subscription.h | 11 ++++-- src/test/regress/expected/subscription.out | 27 ++++++------- src/test/regress/sql/subscription.sql | 3 +- 13 files changed, 144 insertions(+), 34 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index daa85f2..71667bd 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6383,6 +6383,17 @@ + subsynccommit + bool + + + If true, the apply for the subscription will run with + synchronous_commit set to local. + Otherwise it will be set to off. + + + + subconninfo text diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index b34386d..2bb74f3 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -26,6 +26,7 @@ ALTER SUBSCRIPTION name WITH ( where suboption can be: SLOT NAME = slot_name + | SYNCHRONOUS_COMMIT = boolean ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] WITH ( puboption [, ... ] ) ALTER SUBSCRIPTION name REFRESH PUBLICATION WITH ( puboption [, ... 
] ) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 91127ea..97432e0 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -29,6 +29,7 @@ CREATE SUBSCRIPTION subscription_name @@ -145,6 +146,20 @@ CREATE SUBSCRIPTION subscription_name + SYNCHRONOUS_COMMIT = boolean + + + Modifies the synchronous_commit setting of the + subscription workers. When set to true, the + synchronous_commit for the worker will be set to + local; otherwise it will be set to off. The + default value is false, independently of the default + synchronous_commit setting for the instance. + + + + + NOCONNECT diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 8850b7e..4dd527d 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -68,6 +68,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; + sub->synccommit = subform->subsynccommit; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index ef33ece..39c0f11 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -56,12 +56,13 @@ static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, char **slot_name, - bool *copy_data) + bool *copy_data, bool *synchronous_commit) { ListCell *lc; bool connect_given = false; bool create_slot_given = false; bool copy_data_given = false; + bool synchronous_commit_given = false; if (connect) *connect = true; @@ -76,6 +77,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *slot_name = NULL; if (copy_data) *copy_data = true; + if (synchronous_commit) + *synchronous_commit = false; /* 
Parse options */ foreach (lc, options) @@ -161,6 +164,26 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, copy_data_given = true; *copy_data = !defGetBoolean(defel); } + else if (strcmp(defel->defname, "synchronous_commit") == 0 && synchronous_commit) + { + if (synchronous_commit_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + synchronous_commit_given = true; + *synchronous_commit = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "nosynchronous_commit") == 0 && synchronous_commit) + { + if (synchronous_commit_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + synchronous_commit_given = true; + *synchronous_commit = !defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -265,6 +288,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool enabled_given; bool enabled; bool copy_data; + bool synchronous_commit; char *conninfo; char *slotname; char originname[NAMEDATALEN]; @@ -276,7 +300,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. 
*/ parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname, &copy_data); + &enabled, &create_slot, &slotname, &copy_data, + &synchronous_commit); /* * Since creating a replication slot is not transactional, rolling back @@ -326,6 +351,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subsynccommit - 1] = + BoolGetDatum(synchronous_commit); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); values[Anum_pg_subscription_subslotname - 1] = @@ -573,14 +600,26 @@ AlterSubscription(AlterSubscriptionStmt *stmt) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slot_name; + char *slot_name; + bool synchronous_commit; + Form_pg_subscription form; - parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slot_name, NULL); + form = (Form_pg_subscription) GETSTRUCT(tup); + synchronous_commit = form->subsynccommit; - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); - replaces[Anum_pg_subscription_subslotname - 1] = true; + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, &slot_name, NULL, + &synchronous_commit); + + if (slot_name) + { + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + replaces[Anum_pg_subscription_subslotname - 1] = true; + } + values[Anum_pg_subscription_subsynccommit - 1] = + BoolGetDatum(synchronous_commit); + replaces[Anum_pg_subscription_subsynccommit - 1] = true; update_tuple = true; break; @@ -593,7 +632,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL); + NULL, NULL, NULL); Assert(enabled_given); 
values[Anum_pg_subscription_subenabled - 1] = @@ -617,7 +656,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, &copy_data); + NULL, NULL, &copy_data, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -638,7 +677,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, &copy_data); + NULL, NULL, &copy_data, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 3e724de..b536a35 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -126,7 +126,7 @@ get_subscription_list(void) */ oldcxt = MemoryContextSwitchTo(resultcxt); - sub = (Subscription *) palloc(sizeof(Subscription)); + sub = (Subscription *) palloc0(sizeof(Subscription)); sub->oid = HeapTupleGetOid(tup); sub->dbid = subform->subdbid; sub->owner = subform->subowner; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5383364..6d3a269 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1340,6 +1340,21 @@ reread_subscription(void) } /* + * We need to make a new connection to the new slot if the slot name has + * changed, so exit here as well if that's the case. + */ + if (strcmp(newsub->slotname, MySubscription->slotname) != 0) + { + ereport(LOG, + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because the replication slot name was changed", + MySubscription->name))); + + walrcv_disconnect(wrconn); + proc_exit(0); + } + + /* * Exit if publication list was changed. The launcher will start * new worker. 
*/ @@ -1371,8 +1386,7 @@ reread_subscription(void) } /* Check for other changes that should never happen too. */ - if (newsub->dbid != MySubscription->dbid || - strcmp(newsub->slotname, MySubscription->slotname) != 0) + if (newsub->dbid != MySubscription->dbid) { elog(ERROR, "subscription %u changed unexpectedly", MyLogicalRepWorker->subid); @@ -1384,6 +1398,11 @@ reread_subscription(void) MemoryContextSwitchTo(oldctx); + /* Change synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", + MySubscription->synccommit ? "local" : "off", + PGC_BACKEND, PGC_S_OVERRIDE); + if (started_tx) CommitTransactionCommand(); @@ -1451,6 +1470,11 @@ ApplyWorkerMain(Datum main_arg) MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); + /* Setup synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", + MySubscription->synccommit ? "local" : "off", + PGC_BACKEND, PGC_S_OVERRIDE); + if (!MySubscription->enabled) { ereport(LOG, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index c7876fe..7f22ac5 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3622,6 +3622,7 @@ getSubscriptions(Archive *fout) int i_subname; int i_rolname; int i_subenabled; + int i_subsynccommit; int i_subconninfo; int i_subslotname; int i_subpublications; @@ -3639,7 +3640,8 @@ getSubscriptions(Archive *fout) appendPQExpBuffer(query, "SELECT s.tableoid, s.oid, s.subname," "(%s s.subowner) AS rolname, s.subenabled, " - " s.subconninfo, s.subslotname, s.subpublications " + " s.subsynccommit, s.subconninfo, s.subslotname, " + " s.subpublications " "FROM pg_catalog.pg_subscription s " "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database" " WHERE datname = current_database())", @@ -3653,6 +3655,7 @@ getSubscriptions(Archive *fout) i_subname = PQfnumber(res, "subname"); i_rolname = PQfnumber(res, "rolname"); i_subenabled = PQfnumber(res, "subenabled"); + i_subsynccommit = PQfnumber(res, 
"subsynccommit"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subpublications = PQfnumber(res, "subpublications"); @@ -3670,6 +3673,8 @@ getSubscriptions(Archive *fout) subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname)); subinfo[i].subenabled = (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0); + subinfo[i].subsynccommit = + (strcmp(PQgetvalue(res, i, i_subsynccommit), "t") == 0); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname)); subinfo[i].subpublications = @@ -3739,6 +3744,11 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) else appendPQExpBufferStr(query, "DISABLED"); + if (subinfo->subsynccommit) + appendPQExpBufferStr(query, "SYNCHRONOUS_COMMIT = true"); + else + appendPQExpBufferStr(query, "SYNCHRONOUS_COMMIT = false"); + appendPQExpBufferStr(query, ", SLOT NAME = "); appendStringLiteralAH(query, subinfo->subslotname, fout); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index a466527..5934eb0 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -604,6 +604,7 @@ typedef struct _SubscriptionInfo DumpableObject dobj; char *rolname; bool subenabled; + bool subsynccommit; char *subconninfo; char *subslotname; char *subpublications; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index e2e4cbc..3d4cc84 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5078,7 +5078,8 @@ describeSubscriptions(const char *pattern, bool verbose) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, + false, false}; if (pset.sversion < 100000) { @@ -5104,7 +5105,9 @@ describeSubscriptions(const char *pattern, bool verbose) if (verbose) { appendPQExpBuffer(&buf, + ", subsynccommit 
AS \"%s\"\n" ", subconninfo AS \"%s\"\n", + gettext_noop("Synchronous Commit"), gettext_noop("Conninfo")); } diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0811880..62845e9 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -39,6 +39,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE bool subenabled; /* True if the subscription is enabled * (the worker should be running) */ + bool subsynccommit; /* Should apply use synchronous commit? */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ text subconninfo; /* Connection string to the publisher */ @@ -54,14 +55,15 @@ typedef FormData_pg_subscription *Form_pg_subscription; * compiler constants for pg_subscription * ---------------- */ -#define Natts_pg_subscription 7 +#define Natts_pg_subscription 8 #define Anum_pg_subscription_subdbid 1 #define Anum_pg_subscription_subname 2 #define Anum_pg_subscription_subowner 3 #define Anum_pg_subscription_subenabled 4 -#define Anum_pg_subscription_subconninfo 5 -#define Anum_pg_subscription_subslotname 6 -#define Anum_pg_subscription_subpublications 7 +#define Anum_pg_subscription_subsynccommit 5 +#define Anum_pg_subscription_subconninfo 6 +#define Anum_pg_subscription_subslotname 7 +#define Anum_pg_subscription_subpublications 8 typedef struct Subscription @@ -71,6 +73,7 @@ typedef struct Subscription char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ + bool synccommit; /* Indicates if apply should use synchronous commit */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ List *publications; /* List of publication names to subscribe to */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index d2187ee..213669b 100644 --- 
a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -26,18 +26,18 @@ reset client_min_messages; CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (NOCONNECT); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+-------------+--------------------- - testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +---------+---------------------------+---------+-------------+--------------------+--------------------- + testsub | regress_subscription_user | f | {testpub} | f | dbname=doesnotexist (1 row) ALTER SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist2'; \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+-------------+---------------------- - testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +---------+---------------------------+---------+-------------+--------------------+---------------------- + testsub | regress_subscription_user | f | {testpub} | f | dbname=doesnotexist2 (1 row) BEGIN; @@ -59,11 +59,12 @@ ALTER SUBSCRIPTION testsub DISABLE; COMMIT; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; -\dRs - List of subscriptions - Name | Owner | Enabled | Publication --------------+---------------------------+---------+------------- - testsub_foo | regress_subscription_user | f | {testpub} +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = true); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo 
+-------------+---------------------------+---------+-------------+--------------------+---------------------- + testsub_foo | regress_subscription_user | f | {testpub} | t | dbname=doesnotexist2 (1 row) -- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 3bdf79d..41d3055 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -40,8 +40,9 @@ ALTER SUBSCRIPTION testsub DISABLE; COMMIT; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = true); -\dRs +\dRs+ -- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block BEGIN; -- 2.7.4