From b73cb8f15c1965a5328fecf13728b4c215b9ef1c Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 1 Jun 2021 07:46:44 -0700 Subject: [PATCH] WIP refactor parse_subscription_options --- src/backend/commands/subscriptioncmds.c | 489 ++++++++++++------------ src/include/commands/subscriptioncmds.h | 32 ++ 2 files changed, 270 insertions(+), 251 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 8aa6de1785..dd782d5ee6 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,160 +60,165 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, * accommodate that. */ static void -parse_subscription_options(List *options, - bool *connect, - bool *enabled_given, bool *enabled, - bool *create_slot, - bool *slot_name_given, char **slot_name, - bool *copy_data, - char **synchronous_commit, - bool *refresh, - bool *binary_given, bool *binary, - bool *streaming_given, bool *streaming) +parse_subscription_options(List *stmt_options, SubOpts *opts) { ListCell *lc; - bool connect_given = false; - bool create_slot_given = false; - bool copy_data_given = false; - bool refresh_given = false; + bits32 supported_opts; + bits32 specified_opts; + SubOptVals *vals; - /* If connect is specified, the others also need to be. */ - Assert(!connect || (enabled && create_slot && copy_data)); + supported_opts = opts->supported_opts; + vals = &opts->vals; + specified_opts = SUBOPT_NONE; - if (connect) - *connect = true; - if (enabled) - { - *enabled_given = false; - *enabled = true; - } - if (create_slot) - *create_slot = true; - if (slot_name) - { - *slot_name_given = false; - *slot_name = NULL; - } - if (copy_data) - *copy_data = true; - if (synchronous_commit) - *synchronous_commit = NULL; - if (refresh) - *refresh = true; - if (binary) - { - *binary_given = false; - *binary = false; - } - if (streaming) - { - *streaming_given = false; - *streaming = false; - } + Assert(supported_opts != SUBOPT_NONE); + + /* If connect option is supported, the others also need to be. */ + Assert((supported_opts & SUBOPT_CONNECT) == 0 || + ((supported_opts & SUBOPT_ENABLED) != 0 && + (supported_opts & SUBOPT_CREATE_SLOT) != 0 && + (supported_opts & SUBOPT_COPY_DATA) != 0)); + + /* Set default values for the supported options. */ + if ((supported_opts & SUBOPT_CONNECT) != 0) + vals->connect = true; + + if ((supported_opts & SUBOPT_ENABLED) != 0) + vals->enabled = true; + + if ((supported_opts & SUBOPT_CREATE_SLOT) != 0) + vals->create_slot = true; + + if ((supported_opts & SUBOPT_SLOT_NAME) != 0) + vals->slot_name = NULL; + + if ((supported_opts & SUBOPT_COPY_DATA) != 0) + vals->copy_data = true; + + if ((supported_opts & SUBOPT_SYNCHRONOUS_COMMIT) != 0) + vals->synchronous_commit = NULL; + + if ((supported_opts & SUBOPT_REFRESH) != 0) + vals->refresh = true; + + if ((supported_opts & SUBOPT_BINARY) != 0) + vals->binary = false; + + if ((supported_opts & SUBOPT_STREAMING) != 0) + vals->streaming = false; /* Parse options */ - foreach(lc, options) + foreach(lc, stmt_options) { DefElem *defel = (DefElem *) lfirst(lc); - if (strcmp(defel->defname, "connect") == 0 && connect) + if ((supported_opts & SUBOPT_CONNECT) != 0 && + strcmp(defel->defname, "connect") == 0) { - if (connect_given) + if ((specified_opts & SUBOPT_CONNECT) != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - connect_given = true; - *connect = defGetBoolean(defel); + specified_opts |= SUBOPT_CONNECT; + vals->connect = defGetBoolean(defel); } - else if (strcmp(defel->defname, "enabled") == 0 && enabled) + else if ((supported_opts & SUBOPT_ENABLED) != 0 && + strcmp(defel->defname, "enabled") == 0) { - if (*enabled_given) + if ((specified_opts & SUBOPT_ENABLED) != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *enabled_given = true; - *enabled = defGetBoolean(defel); + specified_opts |= SUBOPT_ENABLED; + vals->enabled = defGetBoolean(defel); } - else if (strcmp(defel->defname, "create_slot") == 0 && create_slot) + else if ((supported_opts & SUBOPT_CREATE_SLOT) != 0 && + strcmp(defel->defname, "create_slot") == 0) { - if (create_slot_given) + if ((specified_opts & SUBOPT_CREATE_SLOT) != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - create_slot_given = true; - *create_slot = defGetBoolean(defel); + specified_opts |= SUBOPT_CREATE_SLOT; + vals->create_slot = defGetBoolean(defel); } - else if (strcmp(defel->defname, "slot_name") == 0 && slot_name) + else if ((supported_opts & SUBOPT_SLOT_NAME) != 0 && + strcmp(defel->defname, "slot_name") == 0) { - if (*slot_name_given) + if ((specified_opts & SUBOPT_SLOT_NAME) != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *slot_name_given = true; - *slot_name = defGetString(defel); + specified_opts |= SUBOPT_SLOT_NAME; + vals->slot_name = defGetString(defel); /* Setting slot_name = NONE is treated as no slot name. */ - if (strcmp(*slot_name, "none") == 0) - *slot_name = NULL; + if (strcmp(vals->slot_name, "none") == 0) + vals->slot_name = NULL; } - else if (strcmp(defel->defname, "copy_data") == 0 && copy_data) + else if ((supported_opts & SUBOPT_COPY_DATA) != 0 && + strcmp(defel->defname, "copy_data") == 0) { - if (copy_data_given) + if ((specified_opts & SUBOPT_COPY_DATA) != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - copy_data_given = true; - *copy_data = defGetBoolean(defel); + specified_opts |= SUBOPT_COPY_DATA; + vals->copy_data = defGetBoolean(defel); } - else if (strcmp(defel->defname, "synchronous_commit") == 0 && - synchronous_commit) + else if ((supported_opts & SUBOPT_SYNCHRONOUS_COMMIT) != 0 && + strcmp(defel->defname, "synchronous_commit") == 0) { - if (*synchronous_commit) + if ((specified_opts & SUBOPT_SYNCHRONOUS_COMMIT) != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *synchronous_commit = defGetString(defel); + specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT; + vals->synchronous_commit = defGetString(defel); /* Test if the given value is valid for synchronous_commit GUC. */ - (void) set_config_option("synchronous_commit", *synchronous_commit, + (void) set_config_option("synchronous_commit", vals->synchronous_commit, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, false, 0, false); } - else if (strcmp(defel->defname, "refresh") == 0 && refresh) + else if ((supported_opts & SUBOPT_REFRESH) != 0 && + strcmp(defel->defname, "refresh") == 0) { - if (refresh_given) + if ((specified_opts & SUBOPT_REFRESH) != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - refresh_given = true; - *refresh = defGetBoolean(defel); + specified_opts |= SUBOPT_REFRESH; + vals->refresh = defGetBoolean(defel); } - else if (strcmp(defel->defname, "binary") == 0 && binary) + else if ((supported_opts & SUBOPT_BINARY) != 0 && + strcmp(defel->defname, "binary") == 0) { - if (*binary_given) + if ((specified_opts & SUBOPT_BINARY) != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *binary_given = true; - *binary = defGetBoolean(defel); + specified_opts |= SUBOPT_BINARY; + vals->binary = defGetBoolean(defel); } - else if (strcmp(defel->defname, "streaming") == 0 && streaming) + else if ((supported_opts & SUBOPT_STREAMING) != 0 && + strcmp(defel->defname, "streaming") == 0) { - if (*streaming_given) + if ((specified_opts & SUBOPT_STREAMING) != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *streaming_given = true; - *streaming = defGetBoolean(defel); + specified_opts |= SUBOPT_STREAMING; + vals->streaming = defGetBoolean(defel); } else ereport(ERROR, @@ -225,66 +230,83 @@ parse_subscription_options(List *options, * We've been explicitly asked to not connect, that requires some * additional processing. */ - if (connect && !*connect) + if ((supported_opts & SUBOPT_CONNECT) != 0 && + !vals->connect) { + char *incompat_opt = NULL; + /* Check for incompatible options from the user. */ - if (enabled && *enabled_given && *enabled) + if ((supported_opts & SUBOPT_ENABLED) != 0 && + (specified_opts & SUBOPT_ENABLED) != 0 && + vals->enabled) + incompat_opt = "enabled = true"; + else if ((supported_opts & SUBOPT_CREATE_SLOT) != 0 && + (specified_opts & SUBOPT_CREATE_SLOT) != 0 && + vals->create_slot) + incompat_opt = "create_slot = true"; + else if ((supported_opts & SUBOPT_COPY_DATA) != 0 && + (specified_opts & SUBOPT_COPY_DATA) != 0 && + vals->copy_data) + incompat_opt = "copy_data = true"; + + if (incompat_opt) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", - "connect = false", "enabled = true"))); - - if (create_slot && create_slot_given && *create_slot) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "connect = false", "create_slot = true"))); - - if (copy_data && copy_data_given && *copy_data) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "connect = false", "copy_data = true"))); + "connect = false", incompat_opt))); /* Change the defaults of other options. */ - *enabled = false; - *create_slot = false; - *copy_data = false; + vals->enabled = false; + vals->create_slot = false; + vals->copy_data = false; } /* * Do additional checking for disallowed combination when slot_name = NONE * was used. */ - if (slot_name && *slot_name_given && !*slot_name) + if ((supported_opts & SUBOPT_SLOT_NAME) != 0 && + (specified_opts & SUBOPT_SLOT_NAME) != 0 && + !vals->slot_name) { - if (enabled && *enabled_given && *enabled) + char *incompat_opt = NULL; + char *required_opt = NULL; + + if ((supported_opts & SUBOPT_ENABLED) != 0 && + (specified_opts & SUBOPT_ENABLED) != 0 && + vals->enabled) + incompat_opt = "enabled = true"; + else if ((supported_opts & SUBOPT_CREATE_SLOT) != 0 && + (specified_opts & SUBOPT_CREATE_SLOT) != 0 && + vals->create_slot) + incompat_opt = "create_slot = true"; + + if (incompat_opt) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", - "slot_name = NONE", "enabled = true"))); - - if (create_slot && create_slot_given && *create_slot) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "slot_name = NONE", "create_slot = true"))); - - if (enabled && !*enabled_given && *enabled) + "slot_name = NONE", incompat_opt))); + + if ((supported_opts & SUBOPT_ENABLED) != 0 && + (specified_opts & SUBOPT_ENABLED) == 0 && + vals->enabled) + required_opt = "enabled = false"; + else if ((supported_opts & SUBOPT_CREATE_SLOT) != 0 && + (specified_opts & SUBOPT_CREATE_SLOT) == 0 && + vals->create_slot) + required_opt = "create_slot = false"; + + if (required_opt) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("subscription with %s must also set %s", - "slot_name = NONE", "enabled = false"))); - - if (create_slot && !create_slot_given && *create_slot) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("subscription with %s must also set %s", - "slot_name = NONE", "create_slot = false"))); + "slot_name = NONE", required_opt))); } + + opts->specified_opts = specified_opts; } /* @@ -331,37 +353,28 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) Datum values[Natts_pg_subscription]; Oid owner = GetUserId(); HeapTuple tup; - bool connect; - bool enabled_given; - bool enabled; - bool copy_data; - bool streaming; - bool streaming_given; - char *synchronous_commit; char *conninfo; - char *slotname; - bool slotname_given; - bool binary; - bool binary_given; char originname[NAMEDATALEN]; - bool create_slot; List *publications; + SubOpts opts; + + MemSet(&opts, 0, sizeof(SubOpts)); + + opts.supported_opts |= SUBOPT_CONNECT; + opts.supported_opts |= SUBOPT_ENABLED; + opts.supported_opts |= SUBOPT_CREATE_SLOT; + opts.supported_opts |= SUBOPT_SLOT_NAME; + opts.supported_opts |= SUBOPT_COPY_DATA; + opts.supported_opts |= SUBOPT_SYNCHRONOUS_COMMIT; + opts.supported_opts |= SUBOPT_BINARY; + opts.supported_opts |= SUBOPT_STREAMING; /* * Parse and check options. * * Connection and publication should not be specified here. */ - parse_subscription_options(stmt->options, - &connect, - &enabled_given, &enabled, - &create_slot, - &slotname_given, &slotname, - ©_data, - &synchronous_commit, - NULL, /* no "refresh" */ - &binary_given, &binary, - &streaming_given, &streaming); + parse_subscription_options(stmt->options, &opts); /* * Since creating a replication slot is not transactional, rolling back @@ -369,7 +382,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * CREATE SUBSCRIPTION inside a transaction block if creating a * replication slot. */ - if (create_slot) + if (opts.vals.create_slot) PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)"); if (!superuser()) @@ -399,12 +412,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) stmt->subname))); } - if (!slotname_given && slotname == NULL) - slotname = stmt->subname; + if ((opts.specified_opts & SUBOPT_SLOT_NAME) == 0 && + opts.vals.slot_name == NULL) + opts.vals.slot_name = stmt->subname; /* The default for synchronous_commit of subscriptions is off. */ - if (synchronous_commit == NULL) - synchronous_commit = "off"; + if (opts.vals.synchronous_commit == NULL) + opts.vals.synchronous_commit = "off"; conninfo = stmt->conninfo; publications = stmt->publication; @@ -426,18 +440,18 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); - values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); - values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); - values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.vals.enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.vals.binary); + values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.vals.streaming); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); - if (slotname) + if (opts.vals.slot_name) values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + DirectFunctionCall1(namein, CStringGetDatum(opts.vals.slot_name)); else nulls[Anum_pg_subscription_subslotname - 1] = true; values[Anum_pg_subscription_subsynccommit - 1] = - CStringGetTextDatum(synchronous_commit); + CStringGetTextDatum(opts.vals.synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); @@ -456,7 +470,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connect to remote side to execute requested commands and fetch table * info. */ - if (connect) + if (opts.vals.connect) { char *err; WalReceiverConn *wrconn; @@ -476,7 +490,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Set sync state based on if we were asked to do data copy or * not. */ - table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + table_state = opts.vals.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* * Get the table list from publisher and build local table status @@ -503,15 +517,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * won't use the initial snapshot for anything, so no need to * export it. */ - if (create_slot) + if (opts.vals.create_slot) { - Assert(slotname); + Assert(opts.vals.slot_name); - walrcv_create_slot(wrconn, slotname, false, + walrcv_create_slot(wrconn, opts.vals.slot_name, false, CRS_NOEXPORT_SNAPSHOT, NULL); ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", - slotname))); + opts.vals.slot_name))); } } PG_FINALLY(); @@ -528,7 +542,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) table_close(rel, RowExclusiveLock); - if (enabled) + if (opts.vals.enabled) ApplyLauncherWakeupAtCommit(); ObjectAddressSet(myself, SubscriptionRelationId, subid); @@ -797,59 +811,51 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slotname; - bool slotname_given; - char *synchronous_commit; - bool binary_given; - bool binary; - bool streaming_given; - bool streaming; - - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - &slotname_given, &slotname, - NULL, /* no "copy_data" */ - &synchronous_commit, - NULL, /* no "refresh" */ - &binary_given, &binary, - &streaming_given, &streaming); - - if (slotname_given) + SubOpts opts; + + MemSet(&opts, 0, sizeof(SubOpts)); + + opts.supported_opts |= SUBOPT_SLOT_NAME; + opts.supported_opts |= SUBOPT_SYNCHRONOUS_COMMIT; + opts.supported_opts |= SUBOPT_BINARY; + opts.supported_opts |= SUBOPT_STREAMING; + + parse_subscription_options(stmt->options, &opts); + + if ((opts.specified_opts & SUBOPT_SLOT_NAME) != 0) { - if (sub->enabled && !slotname) + if (sub->enabled && !opts.vals.slot_name) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot set %s for enabled subscription", "slot_name = NONE"))); - if (slotname) + if (opts.vals.slot_name) values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + DirectFunctionCall1(namein, CStringGetDatum(opts.vals.slot_name)); else nulls[Anum_pg_subscription_subslotname - 1] = true; replaces[Anum_pg_subscription_subslotname - 1] = true; } - if (synchronous_commit) + if (opts.vals.synchronous_commit) { values[Anum_pg_subscription_subsynccommit - 1] = - CStringGetTextDatum(synchronous_commit); + CStringGetTextDatum(opts.vals.synchronous_commit); replaces[Anum_pg_subscription_subsynccommit - 1] = true; } - if (binary_given) + if ((opts.specified_opts & SUBOPT_BINARY) != 0) { values[Anum_pg_subscription_subbinary - 1] = - BoolGetDatum(binary); + BoolGetDatum(opts.vals.binary); replaces[Anum_pg_subscription_subbinary - 1] = true; } - if (streaming_given) + if ((opts.specified_opts & SUBOPT_STREAMING) != 0) { values[Anum_pg_subscription_substream - 1] = - BoolGetDatum(streaming); + BoolGetDatum(opts.vals.streaming); replaces[Anum_pg_subscription_substream - 1] = true; } @@ -859,31 +865,25 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_ENABLED: { - bool enabled, - enabled_given; - - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - &enabled_given, &enabled, - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - NULL, /* no "copy_data" */ - NULL, /* no "synchronous_commit" */ - NULL, /* no "refresh" */ - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no streaming */ - Assert(enabled_given); - - if (!sub->slotname && enabled) + SubOpts opts; + + MemSet(&opts, 0, sizeof(SubOpts)); + + opts.supported_opts |= SUBOPT_ENABLED; + + parse_subscription_options(stmt->options, &opts); + Assert((opts.specified_opts & SUBOPT_ENABLED) != 0); + + if (!sub->slotname && opts.vals.enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot enable subscription that does not have a slot name"))); values[Anum_pg_subscription_subenabled - 1] = - BoolGetDatum(enabled); + BoolGetDatum(opts.vals.enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; - if (enabled) + if (opts.vals.enabled) ApplyLauncherWakeupAtCommit(); update_tuple = true; @@ -904,19 +904,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_SET_PUBLICATION: { - bool copy_data; - bool refresh; - - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - ©_data, - NULL, /* no "synchronous_commit" */ - &refresh, - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + SubOpts opts; + + MemSet(&opts, 0, sizeof(SubOpts)); + + opts.supported_opts |= SUBOPT_COPY_DATA; + opts.supported_opts |= SUBOPT_REFRESH; + + parse_subscription_options(stmt->options, &opts); + values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -924,7 +920,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) update_tuple = true; /* Refresh if user asked us to. */ - if (refresh) + if (opts.vals.refresh) { if (!sub->enabled) ereport(ERROR, @@ -937,7 +933,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, opts.vals.copy_data); } break; @@ -947,23 +943,19 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_DROP_PUBLICATION: { bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; - bool copy_data; - bool refresh; List *publist; + SubOpts opts; + + MemSet(&opts, 0, sizeof(SubOpts)); + + opts.supported_opts |= SUBOPT_REFRESH; + + if (isadd) + opts.supported_opts |= SUBOPT_COPY_DATA; publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - isadd ? ©_data : NULL, /* for drop, no - * "copy_data" */ - NULL, /* no "synchronous_commit" */ - &refresh, - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + parse_subscription_options(stmt->options, &opts); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publist); @@ -972,7 +964,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) update_tuple = true; /* Refresh if user asked us to. */ - if (refresh) + if (opts.vals.refresh) { if (!sub->enabled) ereport(ERROR, @@ -985,7 +977,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Only refresh the added/dropped list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, opts.vals.copy_data); } break; @@ -993,27 +985,22 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_REFRESH: { - bool copy_data; + SubOpts opts; + + MemSet(&opts, 0, sizeof(SubOpts)); + + opts.supported_opts |= SUBOPT_COPY_DATA; if (!sub->enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - ©_data, - NULL, /* no "synchronous_commit" */ - NULL, /* no "refresh" */ - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + parse_subscription_options(stmt->options, &opts); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, opts.vals.copy_data); break; } diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 3b926f35d7..59461f7b6f 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -26,4 +26,36 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel); extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId); extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); +typedef struct SubOptVals +{ + bool connect; + bool enabled; + bool create_slot; + char *slot_name; + bool copy_data; + char *synchronous_commit; + bool refresh; + bool binary; + bool streaming; +} SubOptVals; + +/* options for CREATE/ALTER SUBSCRIPTION  */ +typedef struct SubOpts +{ + bits32 supported_opts; /* bitmask of supported SUBOPT_* */ + bits32 specified_opts; /* bitmask of user specified SUBOPT_* */ + SubOptVals vals; +} SubOpts; + +#define SUBOPT_NONE 0x00000000 +#define SUBOPT_CONNECT 0x00000001 +#define SUBOPT_ENABLED 0x00000002 +#define SUBOPT_CREATE_SLOT 0x00000004 +#define SUBOPT_SLOT_NAME 0x00000008 +#define SUBOPT_COPY_DATA 0x00000010 +#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020 +#define SUBOPT_REFRESH 0x00000040 +#define SUBOPT_BINARY 0x00000080 +#define SUBOPT_STREAMING 0x00000100 + #endif /* SUBSCRIPTIONCMDS_H */ -- 2.25.1