diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8e6aef332c..7879b12af1 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -4637,6 +4637,7 @@ CommitSubTransaction(void)
 	AtEOSubXact_HashTables(true, s->nestingLevel);
 	AtEOSubXact_PgStat(true, s->nestingLevel);
 	AtSubCommit_Snapshot(s->nestingLevel);
+	AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
 	/*
 	 * We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4791,7 @@ AbortSubTransaction(void)
 		AtEOSubXact_HashTables(false, s->nestingLevel);
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
+		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
 	}
 
 	/*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6ef333b725..e3c24fcfae 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -79,7 +79,17 @@ typedef struct LogicalRepWorkerId
 	Oid			relid;
 } LogicalRepWorkerId;
 
-static List *on_commit_stop_workers = NIL;
+typedef struct StopWorkersData
+{
+	int			nestDepth;		/* transaction nesting depth of these requests */
+	List	   *workers;		/* list of LogicalRepWorkerId */
+	struct StopWorkersData *next;	/* entry for a shallower nesting depth */
+} StopWorkersData;
+
+static StopWorkersData on_commit_stop_workers =
+{
+	1, NIL, NULL
+};
 
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -559,17 +569,47 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 void
 logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
 {
+	int			nestDepth = GetCurrentTransactionNestLevel();
 	LogicalRepWorkerId *wid;
 	MemoryContext oldctx;
 
 	/* Make sure we store the info in context that survives until commit. */
 	oldctx = MemoryContextSwitchTo(TopTransactionContext);
 
+	/* Allocate new tracking object. */
 	wid = palloc(sizeof(LogicalRepWorkerId));
 	wid->subid = subid;
 	wid->relid = relid;
 
-	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+	/* Check that previous transactions were properly cleaned up. */
+	Assert(nestDepth >= on_commit_stop_workers.nestDepth);
+
+	/* Now add the new object to the existing list, or create a new one. */
+	if (nestDepth == on_commit_stop_workers.nestDepth ||
+		on_commit_stop_workers.workers == NIL)
+	{
+		/*
+		 * Either we're adding more entries at the same transaction nesting
+		 * depth as before, or there are no entries with any other nesting
+		 * depth.
+		 */
+		on_commit_stop_workers.nestDepth = nestDepth;
+		on_commit_stop_workers.workers =
+			lappend(on_commit_stop_workers.workers, wid);
+	}
+	else
+	{
+		/*
+		 * Entries exist at a shallower nesting depth; don't mix them together.
+		 * Push a copy of the old top and start a new list for this depth.
+		 */
+		StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
+
+		memcpy(newdata, &on_commit_stop_workers, sizeof(StopWorkersData));
+		on_commit_stop_workers.nestDepth = nestDepth;
+		on_commit_stop_workers.workers = list_make1(wid);
+		on_commit_stop_workers.next = newdata;
+	}
 
 	MemoryContextSwitchTo(oldctx);
 }
@@ -823,7 +863,8 @@ ApplyLauncherShmemInit(void)
 bool
 XactManipulatesLogicalReplicationWorkers(void)
 {
-	return (on_commit_stop_workers != NIL);
+	return on_commit_stop_workers.workers != NIL ||
+		on_commit_stop_workers.next != NULL;
 }
 
 /*
@@ -832,11 +873,14 @@ XactManipulatesLogicalReplicationWorkers(void)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+	Assert(on_commit_stop_workers.nestDepth == 1);
+	Assert(on_commit_stop_workers.next == NULL);
+
 	if (isCommit)
 	{
 		ListCell   *lc;
 
-		foreach(lc, on_commit_stop_workers)
+		foreach(lc, on_commit_stop_workers.workers)
 		{
 			LogicalRepWorkerId *wid = lfirst(lc);
 
@@ -848,13 +892,66 @@ AtEOXact_ApplyLauncher(bool isCommit)
 	}
 
 	/*
-	 * No need to pfree on_commit_stop_workers.  It was allocated in
+	 * No need to pfree on_commit_stop_workers.workers.  It was allocated in
 	 * transaction memory context, which is going to be cleaned soon.
 	 */
-	on_commit_stop_workers = NIL;
+	on_commit_stop_workers.workers = NIL;
 	on_commit_launcher_wakeup = false;
 }
 
+/*
+ * At subtransaction end: on abort, discard stop-worker requests made at this
+ * nesting level; then fold what remains into the enclosing level's state.
+ */
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+	StopWorkersData *old = on_commit_stop_workers.next;
+
+	/* Exit immediately if there's no work to do at this level. */
+	Assert(on_commit_stop_workers.nestDepth <= nestDepth);
+	if (nestDepth != on_commit_stop_workers.nestDepth)
+		return;
+
+	/*
+	 * If we're aborting, forget about everything that was done at this
+	 * nesting level.  Explicitly free memory to avoid a transaction-lifespan
+	 * leak.
+	 */
+	if (!isCommit)
+	{
+		list_free_deep(on_commit_stop_workers.workers);
+		on_commit_stop_workers.workers = NIL;
+	}
+
+	/*
+	 * If there are no pending stop-worker requests at outer nesting levels,
+	 * we can just decrement the notional nesting depth without doing any real
+	 * work.
+	 *
+	 * We can also handle it this way if there are separate lists that are
+	 * separated by more than 1 nesting level: they can't yet be merged,
+	 * because there might be a rollback at an intervening level.
+	 */
+	if (old == NULL || (old->nestDepth < nestDepth - 1 &&
+						on_commit_stop_workers.workers != NIL))
+	{
+		on_commit_stop_workers.nestDepth--;
+		return;
+	}
+
+	/*
+	 * We need to pop the stack.  Any remaining entries at this level should
+	 * be merged with those from the next level.
+	 */
+	on_commit_stop_workers.workers =
+		list_concat(on_commit_stop_workers.workers,
+					old->workers);
+	on_commit_stop_workers.nestDepth = old->nestDepth;
+	on_commit_stop_workers.next = old->next;
+	pfree(old);
+}
+
 /*
  * Request wakeup of the launcher on commit of the transaction.
  *
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index ef02512412..76c28c655b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -23,6 +23,7 @@ extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
 extern bool XactManipulatesLogicalReplicationWorkers(void);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);