diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 8e6aef3..25544d6 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -33,6 +33,7 @@ #include "catalog/namespace.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/subscriptioncmds.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/spi.h" @@ -2128,6 +2129,7 @@ CommitTransaction(void) AtEOXact_HashTables(true); AtEOXact_PgStat(true); AtEOXact_Snapshot(true, false); + AtEOXact_Subscription(); AtEOXact_ApplyLauncher(true); pgstat_report_xact_timestamp(0); @@ -2607,6 +2609,7 @@ AbortTransaction(void) AtEOXact_ComboCid(); AtEOXact_HashTables(false); AtEOXact_PgStat(false); + AtEOXact_Subscription(); AtEOXact_ApplyLauncher(false); pgstat_report_xact_timestamp(0); } @@ -4534,6 +4537,7 @@ StartSubTransaction(void) AtSubStart_ResourceOwner(); AtSubStart_Notify(); AfterTriggerBeginSubXact(); + AtSubStart_ApplyLauncher(); s->state = TRANS_INPROGRESS; @@ -4637,6 +4641,7 @@ CommitSubTransaction(void) AtEOSubXact_HashTables(true, s->nestingLevel); AtEOSubXact_PgStat(true, s->nestingLevel); AtSubCommit_Snapshot(s->nestingLevel); + AtEOSubXact_ApplyLauncher(true, s->nestingLevel); /* * We need to restore the upper transaction's read-only state, in case the @@ -4790,6 +4795,7 @@ AbortSubTransaction(void) AtEOSubXact_HashTables(false, s->nestingLevel); AtEOSubXact_PgStat(false, s->nestingLevel); AtSubAbort_Snapshot(s->nestingLevel); + AtEOSubXact_ApplyLauncher(false, s->nestingLevel); } /* diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 8705d8b..0b1212a 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -429,12 +429,12 @@ RemoveSubscriptionRel(Oid subid, Oid relid) /* - * Get all relations for subscription. + * Get reloids of all relations for subscription. 
* - * Returned list is palloc'ed in current memory context. + * Returned OID list is palloc'ed in the specified 'memcxt'. */ List * -GetSubscriptionRelations(Oid subid) +GetSubscriptionRelids(Oid subid, MemoryContext memcxt) { List *res = NIL; Relation rel; @@ -442,6 +442,7 @@ GetSubscriptionRelations(Oid subid) int nkeys = 0; ScanKeyData skey[2]; SysScanDesc scan; + MemoryContext old_context; rel = heap_open(SubscriptionRelRelationId, AccessShareLock); @@ -453,20 +454,15 @@ GetSubscriptionRelations(Oid subid) scan = systable_beginscan(rel, InvalidOid, false, NULL, nkeys, skey); + old_context = MemoryContextSwitchTo(memcxt); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_subscription_rel subrel; - SubscriptionRelState *relstate; subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); - - relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); - relstate->relid = subrel->srrelid; - relstate->state = subrel->srsubstate; - relstate->lsn = subrel->srsublsn; - - res = lappend(res, relstate); + res = lappend_oid(res, subrel->srrelid); } + MemoryContextSwitchTo(old_context); /* Cleanup */ systable_endscan(scan); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f138e61..a4eee2e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -51,6 +51,19 @@ #include "utils/memutils.h" #include "utils/syscache.h" + +/* + * List of subscriptions, each containing the relations for that subscription. + * Each element has the relids for a given subscription that were updated on + * the last COMMIT. For a subid, there exists an entry in this list only when + * the subscription relations are altered. Once the transaction ends, this list + * is again set back to NIL. 
This is done so that during commit, we know + * exactly which workers to stop: the relations for the last altered + * subscription should be compared with the relations for the last committed + * subscription changes. + */ +static List *committed_subrels_list = NIL; + static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); /* @@ -504,9 +517,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) { char *err; List *pubrel_names; - List *subrel_states; + List *subrelids; + SubscriptionRels *committed_subrels = NULL; Oid *subrel_local_oids; Oid *pubrel_local_oids; + List *stop_relids = NIL; ListCell *lc; int off; @@ -525,24 +540,55 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) /* We are done with the remote side, close connection. */ walrcv_disconnect(wrconn); - /* Get local table list. */ - subrel_states = GetSubscriptionRelations(sub->oid); + /* Get the committed subrels for the given subscription */ + foreach(lc, committed_subrels_list) + { + SubscriptionRels *subrels = (SubscriptionRels *) lfirst(lc); + + if (sub->oid == subrels->subid) + { + committed_subrels = subrels; + break; + } + } + + /* + * Get local table list. If we are creating the committed subrel list for + * the first time for this subscription in this transaction, we need to + * maintain this subrel list until transaction end. + */ + subrelids = GetSubscriptionRelids(sub->oid, + committed_subrels ? + CurrentMemoryContext : TopTransactionContext); + + if (!committed_subrels) + { + /* + * We don't have a committed subrel for this subscription. Create one + * in TopTransactionContext. 
+ */ + SubscriptionRels *subrels; + MemoryContext old_context; + + old_context = MemoryContextSwitchTo(TopTransactionContext); + subrels = palloc(sizeof(SubscriptionRels)); + subrels->subid = sub->oid; + /* The subrelids are already allocated in TopTransactionContext */ + subrels->relids = subrelids; + committed_subrels_list = lappend(committed_subrels_list, subrels); + MemoryContextSwitchTo(old_context); + } /* * Build qsorted array of local table oids for faster lookup. This can * potentially contain all tables in the database so speed of lookup is * important. */ - subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + subrel_local_oids = palloc(list_length(subrelids) * sizeof(Oid)); off = 0; - foreach(lc, subrel_states) - { - SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); - - subrel_local_oids[off++] = relstate->relid; - } - qsort(subrel_local_oids, list_length(subrel_states), - sizeof(Oid), oid_cmp); + foreach(lc, subrelids) + subrel_local_oids[off++] = lfirst_oid(lc); + qsort(subrel_local_oids, list_length(subrelids), sizeof(Oid), oid_cmp); /* * Walk over the remote tables and try to match them to locally known @@ -567,7 +613,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) pubrel_local_oids[off++] = relid; if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + list_length(subrelids), sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, copy_data ? 
SUBREL_STATE_INIT : SUBREL_STATE_READY, @@ -585,7 +631,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) qsort(pubrel_local_oids, list_length(pubrel_names), sizeof(Oid), oid_cmp); - for (off = 0; off < list_length(subrel_states); off++) + for (off = 0; off < list_length(subrelids); off++) { Oid relid = subrel_local_oids[off]; @@ -594,7 +640,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) { RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop_at_commit(sub->oid, relid); + /* + * If committed subrels are not allocated, the subrel_local_oids + * already contain the committed subrels, so use them to derive the + * workers to be stopped. + */ + if (!committed_subrels) + stop_relids = lappend_oid(stop_relids, relid); ereport(DEBUG1, (errmsg("table \"%s.%s\" removed from subscription \"%s\"", @@ -603,6 +655,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) sub->name))); } } + + /* + * Now derive the workers to be stopped using the committed reloids. At + * commit time, we will terminate them. + */ + if (committed_subrels) + { + foreach(lc, committed_subrels->relids) + { + Oid relid = lfirst_oid(lc); + + if (!bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + stop_relids = lappend_oid(stop_relids, relid); + } + } + + logicalrep_insert_stop_workers(sub->oid, stop_relids); } /* @@ -1172,3 +1242,17 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } + +/* + * Cleanup function for objects maintained during the transaction by + * subscription-refresh operation. + */ +void +AtEOXact_Subscription(void) +{ + /* + * No need to pfree the list. In fact, it must have been already + * freed because it was allocated in TopTransactionContext. 
+ */ + committed_subrels_list = NIL; +} diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 6ef333b..252114b 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -73,13 +73,28 @@ typedef struct LogicalRepCtxStruct LogicalRepCtxStruct *LogicalRepCtx; -typedef struct LogicalRepWorkerId +/* + * List of SubscriptionRels. This list represents the subscription relations + * for which the synchronization workers are to be stopped. Each subtransaction + * has its own list. Once a subtransaction starts, this list is pushed into + * the 'subtrans_stop_workers' stack. + */ +static List *on_commit_stop_workers = NIL; + +typedef struct SubTransOnCommitStopWorkers { - Oid subid; - Oid relid; -} LogicalRepWorkerId; + struct SubTransOnCommitStopWorkers *parent; /* This might not be an + * immediate parent */ + int nest_level; /* Sub-transaction nest level */ + List *stop_workers; /* on_commit_stop_workers for this + * subtransaction */ +} SubTransOnCommitStopWorkers; -static List *on_commit_stop_workers = NIL; +/* + * Stack of on_commit_stop_workers. Each stack element belongs to one + * particular subtransaction. + */ +static SubTransOnCommitStopWorkers *subtrans_stop_workers = NULL; static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); @@ -554,22 +569,44 @@ logicalrep_worker_stop(Oid subid, Oid relid) } /* - * Request worker for specified sub/rel to be stopped on commit. + * Request workers for the specified relids of a subscription to be stopped on + * commit. This replaces the earlier saved reloids of a given subscription. */ void -logicalrep_worker_stop_at_commit(Oid subid, Oid relid) +logicalrep_insert_stop_workers(Oid subid, List *relids) { - LogicalRepWorkerId *wid; + ListCell *lc; MemoryContext oldctx; - /* Make sure we store the info in context that survives until commit. 
*/ + foreach(lc, on_commit_stop_workers) + { + SubscriptionRels *subrels = lfirst(lc); + if (subrels->subid == subid) + break; + } + + /* Make sure we store the info in a context that survives until commit. */ oldctx = MemoryContextSwitchTo(TopTransactionContext); - wid = palloc(sizeof(LogicalRepWorkerId)); - wid->subid = subid; - wid->relid = relid; + /* Didn't find the subscription? Insert a new one */ + if (lc == NULL) + { + SubscriptionRels *subrels; + + + subrels = palloc(sizeof(SubscriptionRels)); + subrels->subid = subid; + subrels->relids = list_copy(relids); + on_commit_stop_workers = lappend(on_commit_stop_workers, subrels); - on_commit_stop_workers = lappend(on_commit_stop_workers, wid); + } + else + { + /* Replace the existing reloids with the new set */ + SubscriptionRels *subrels = lfirst(lc); + list_free(subrels->relids); + subrels->relids = list_copy(relids); + } MemoryContextSwitchTo(oldctx); } @@ -827,20 +864,54 @@ XactManipulatesLogicalReplicationWorkers(void) } /* + * AtSubStart_ApplyLauncher() --- Take care of subtransaction start. + * + * Push the current on_commit_stop_workers into the stack. + */ +void +AtSubStart_ApplyLauncher(void) +{ + + if (on_commit_stop_workers != NIL) + { + SubTransOnCommitStopWorkers *temp; + MemoryContext old_cxt; + + /* Keep the stack elements in TopTransactionContext for simplicity */ + old_cxt = MemoryContextSwitchTo(TopTransactionContext); + + temp = palloc(sizeof(SubTransOnCommitStopWorkers)); + temp->parent = subtrans_stop_workers; + temp->nest_level = GetCurrentTransactionNestLevel() - 1; + temp->stop_workers = on_commit_stop_workers; + subtrans_stop_workers = temp; + + on_commit_stop_workers = NIL; + + MemoryContextSwitchTo(old_cxt); + } +} + + +/* * Wakeup the launcher on commit if requested. 
*/ void AtEOXact_ApplyLauncher(bool isCommit) { + Assert(subtrans_stop_workers == NULL); + if (isCommit) { - ListCell *lc; + ListCell *wlc; - foreach(lc, on_commit_stop_workers) + foreach(wlc, on_commit_stop_workers) { - LogicalRepWorkerId *wid = lfirst(lc); + SubscriptionRels *subrels = lfirst(wlc); + ListCell *rlc; - logicalrep_worker_stop(wid->subid, wid->relid); + foreach(rlc, subrels->relids) + logicalrep_worker_stop(subrels->subid, lfirst_oid(rlc)); } if (on_commit_launcher_wakeup) @@ -852,10 +923,122 @@ AtEOXact_ApplyLauncher(bool isCommit) * transaction memory context, which is going to be cleaned soon. */ on_commit_stop_workers = NIL; + subtrans_stop_workers = NULL; on_commit_launcher_wakeup = false; } /* + * On commit, merge the on_commit_stop_workers list into the immediate parent, + * if present. + * On rollback, discard the on_commit_stop_workers list. + * Pop out the immediate parent stack element, and assign its workers list + * to the on_commit_stop_workers list. + */ +void +AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth) +{ + + if (isCommit) + { + MemoryContext oldctx; + ListCell *lc; + + /* + * Make sure we store the info in a context that survives until commit. + */ + oldctx = MemoryContextSwitchTo(TopTransactionContext); + + /* + * If the upper level is present, and it is not an immediate + * parent subtransaction, we don't have to do anything; the current + * on_commit_stop_workers will be regarded as belonging to the + * immediate parent sub-transaction. But if the upper level is an + * immediate parent subtransaction, we need to merge the current + * on_commit_stop_workers list into the immediate parent, and make the + * merged list the current on_commit_stop_workers list. + */ + if (subtrans_stop_workers != NULL && + subtrans_stop_workers->nest_level == nestDepth -1) + { + List *temp_list = NIL; + + /* + * Merge the current list into the immediate parent. 
+ * So say, parent has sub1(tab1, tab2), sub2(tab2, tab3), + * and current on_commit_stop_workers has sub2(tab4) and sub3(tab1), + * then the merged list will have : + * sub1(tab1, tab2), sub2(tab4), sub3(tab1) + */ + foreach(lc, on_commit_stop_workers) + { + SubscriptionRels *subrels = lfirst(lc); + ListCell *lc1; + + /* Search this subrel in the subrels of the top of stack. */ + foreach(lc1, subtrans_stop_workers->stop_workers) + { + SubscriptionRels *stack_subrels = lfirst(lc1); + + if (subrels->subid == stack_subrels->subid) + break; + } + + if (lc1 == NULL) + { + /* + * Didn't find a subscription in the stack element. So + * insert it. + */ + temp_list = lappend(temp_list, subrels); + } + else + { + /* + * Replace the earlier subrels of this subscription with + * the new subrels. + */ + SubscriptionRels *stack_subrels = lfirst(lc1); + + list_free(stack_subrels->relids); + pfree(stack_subrels); + lfirst(lc1) = subrels; + } + + } + /* Add the new subscriptions that were not present in outer level */ + subtrans_stop_workers->stop_workers = + list_concat(subtrans_stop_workers->stop_workers, temp_list); + } + + MemoryContextSwitchTo(oldctx); + } + else + { + /* Abandon the current subtransaction workers list. */ + list_free(on_commit_stop_workers); + on_commit_stop_workers = NIL; + } + + /* + * This is common for commit and abort. For commit, above we have already + * merged the current list into parent. + */ + if (subtrans_stop_workers != NULL && + subtrans_stop_workers->nest_level == nestDepth -1) + { + SubTransOnCommitStopWorkers *temp; + + /* Make the parent transaction list as the current stop_workers. */ + on_commit_stop_workers = subtrans_stop_workers->stop_workers; + + /* Pop out the stack element */ + temp = subtrans_stop_workers->parent; + pfree(subtrans_stop_workers); + subtrans_stop_workers = temp; + } +} + +/* * Request wakeup of the launcher on commit of the transaction. 
* * This is used to send launcher signal to stop sleeping and process the diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 556cb94..c27930b 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -67,6 +67,12 @@ typedef struct SubscriptionRelState char state; } SubscriptionRelState; +typedef struct SubscriptionRels +{ + Oid subid; + List *relids; /* Subset of relations in the subscription. */ +} SubscriptionRels; + extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state, @@ -75,7 +81,7 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, bool missing_ok); extern void RemoveSubscriptionRel(Oid subid, Oid relid); -extern List *GetSubscriptionRelations(Oid subid); +extern List *GetSubscriptionRelids(Oid subid, MemoryContext memcxt); extern List *GetSubscriptionNotReadyRelations(Oid subid); #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 6d70ad7..e14b91e 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -25,5 +25,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel); extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId); extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); +extern void AtEOXact_Subscription(void); #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index ef02512..aa02041 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -23,6 +23,8 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherWakeupAtCommit(void); extern bool XactManipulatesLogicalReplicationWorkers(void); 
+extern void AtSubStart_ApplyLauncher(void); +extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth); extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 1ce3b6b..1da6d6d 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -75,7 +75,7 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running); extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid); extern void logicalrep_worker_stop(Oid subid, Oid relid); -extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid); +extern void logicalrep_insert_stop_workers(Oid subid, List *relids); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);