AtEOXact_ApplyLauncher() and subtransactions

Started by Amit Khandekar over 7 years ago, 12 messages
#1 Amit Khandekar
amitdkhan.pg@gmail.com
2 attachment(s)

Hi,

When a SUBSCRIPTION is altered, the currently running
table-synchronization workers that are no longer needed for the
altered subscription are terminated. This is done by the function
AtEOXact_ApplyLauncher() inside CommitTransaction(). So during each
ALTER-SUBSCRIPTION command, a new set of workers to be stopped is
appended to the on_commit_stop_workers list, and then at commit,
AtEOXact_ApplyLauncher() stops the workers in the list.

But there is no handling for sub-transaction abort. Consider this:

-- On secondary, create a subscription that will initiate the table sync
CREATE SUBSCRIPTION mysub CONNECTION
'dbname=postgres host=localhost user=rep password=Password port=5432'
PUBLICATION mypub;

-- While the sync is going on, change the publication of the
-- subscription in a subtransaction, so that the synchronization workers
-- for the earlier publication are added to the 'on_commit_stop_workers' list
select pg_sleep(1);
begin;
savepoint a;
ALTER SUBSCRIPTION mysub SET PUBLICATION mypub_2;
rollback to a;
-- Commit will stop the above sync.
commit;

So the ALTER-SUBSCRIPTION has been rolled back, but
on_commit_stop_workers is not reverted to its earlier state, and
then at commit the workers are killed unnecessarily. I have observed
that the killed workers are restarted. But consider the case where
the original subscription was synchronizing hundreds of tables. Just
when the synchronization is about to finish, someone changes the
subscription inside a subtransaction, rolls it back, and then commits
the transaction. The whole synchronization gets restarted from
scratch.
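To make the failure mode concrete, here is a minimal standalone C model of a flat deferred-stop list that, like the current code, has no subtransaction hook. It is not PostgreSQL code: FlatStopList, flat_stop_at_commit() and the other names are hypothetical, and "stopping" a worker is reduced to counting entries. The entry recorded inside the savepoint survives the rollback and is acted on at commit.

```c
#include <assert.h>

/*
 * Hypothetical model of a flat, per-transaction "stop at commit" list.
 * Not PostgreSQL code; stopping a worker is reduced to counting entries.
 */
#define MAX_PENDING 64

typedef struct PendingStop
{
    unsigned int subid;         /* subscription OID */
    unsigned int relid;         /* table whose sync worker should stop */
} PendingStop;

typedef struct FlatStopList
{
    PendingStop items[MAX_PENDING];
    int         n;
} FlatStopList;

/* ALTER SUBSCRIPTION: remember a worker to stop at commit. */
static void
flat_stop_at_commit(FlatStopList *l, unsigned int subid, unsigned int relid)
{
    assert(l->n < MAX_PENDING);
    l->items[l->n].subid = subid;
    l->items[l->n].relid = relid;
    l->n++;
}

/* ROLLBACK TO SAVEPOINT: no hook exists, so the list is left untouched. */
static void
flat_rollback_to_savepoint(FlatStopList *l)
{
    (void) l;
}

/* COMMIT: "stop" every recorded worker and return how many there were. */
static int
flat_commit(FlatStopList *l)
{
    int         stopped = l->n;

    l->n = 0;
    return stopped;
}
```

Recording one worker, rolling back to the savepoint and committing still reports one stopped worker, which is exactly the unwanted restart seen in the logs.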

The log snippets below show this behaviour:

< CREATE-SUBSCRIPTION command spawns workers >
2018-06-05 15:04:07.841 IST [39951] LOG: logical replication apply
worker for subscription "mysub" has started
2018-06-05 15:04:07.848 IST [39953] LOG: logical replication table
synchronization worker for subscription "mysub", table "article" has
started

< After some time, ALTER-SUBSCRIPTION is rolled back inside a
subtransaction, and the transaction commits. Workers are killed >
2018-06-05 15:04:32.903 IST [39953] FATAL: terminating logical
replication worker due to administrator command
2018-06-05 15:04:32.903 IST [39953] CONTEXT: COPY article, line 37116293
2018-06-05 15:04:32.904 IST [39666] LOG: background worker "logical
replication worker" (PID 39953) exited with exit code 1

< Workers are restarted >
2018-06-05 15:04:32.909 IST [40003] LOG: logical replication table
synchronization worker for subscription "mysub", table "article" has
started
< Synchronization done after some time >
2018-06-05 15:05:10.042 IST [40003] LOG: logical replication table
synchronization worker for subscription "mysub", table "article" has
finished

-----------

To reproduce the issue:
1. On the master server, create the tables and publications using the
attached setup_master.sql.
2. On the secondary server, run the attached run_on_secondary.sql. The
issue can then be seen in the log output.

-----------

Fix:

I haven't written a patch for it, but I think we should have a
separate on_commit_stop_workers list for each subtransaction. At
subtransaction commit, we replace the on_commit_stop_workers list of
the parent (sub)transaction with the one from the committed
subtransaction; on abort, we discard the list of the current
subtransaction. In other words, keep a stack of lists.
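A minimal sketch of this stack-of-lists proposal, as a standalone C model rather than PostgreSQL code. The names (StopStack, stack_subxact_start() and so on) are hypothetical, each level's list is a fixed-size array of relation ids, and it assumes a new subtransaction starts with a copy of its parent's list, so that replacing the parent's list on commit keeps the parent's earlier entries.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical model of the proposed fix; not PostgreSQL code. */
#define MAX_PENDING 64
#define MAX_DEPTH   16

typedef struct StopList
{
    unsigned int relids[MAX_PENDING];   /* tables whose sync workers to stop */
    int          n;
} StopList;

typedef struct StopStack
{
    StopList    levels[MAX_DEPTH];  /* levels[0] belongs to the top-level xact */
    int         depth;              /* index of the innermost open level */
} StopStack;

static void
stack_begin(StopStack *s)
{
    memset(s, 0, sizeof(*s));
}

/* ALTER SUBSCRIPTION: record only in the innermost level. */
static void
stack_stop_at_commit(StopStack *s, unsigned int relid)
{
    StopList   *cur = &s->levels[s->depth];

    assert(cur->n < MAX_PENDING);
    cur->relids[cur->n++] = relid;
}

/* Subtransaction start: the child begins with a copy of the parent's list. */
static void
stack_subxact_start(StopStack *s)
{
    assert(s->depth + 1 < MAX_DEPTH);
    s->levels[s->depth + 1] = s->levels[s->depth];
    s->depth++;
}

/* Subtransaction commit: the child's list replaces the parent's. */
static void
stack_subxact_commit(StopStack *s)
{
    assert(s->depth > 0);
    s->levels[s->depth - 1] = s->levels[s->depth];
    s->depth--;
}

/* Subtransaction abort: the child's list is simply discarded. */
static void
stack_subxact_abort(StopStack *s)
{
    assert(s->depth > 0);
    s->depth--;
}

/* Top-level commit: how many workers would be stopped. */
static int
stack_commit(const StopStack *s)
{
    assert(s->depth == 0);
    return s->levels[0].n;
}
```

For the savepoint scenario in the report (start a subtransaction, record a worker, abort it, commit), stack_commit() returns 0, so no worker is stopped; entries recorded before the savepoint are kept either way.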

-----------

Furthermore, before fixing this, we may have to fix another issue
which, I suspect, would surface even without subtransactions, as
explained below :

Suppose publication pubx is set for tables tx1 and tx2,
and publication puby is for tables ty1 and ty2.

Subscription mysub is set to synchronise tables tx1 and tx2:
CREATE SUBSCRIPTION mysub ... PUBLICATION pubx;

Now suppose the subscription is altered to synchronise ty1 and ty2,
and then again altered back to synchronise tx1 and tx2 in the same
transaction.
begin;
ALTER SUBSCRIPTION mysub set publication puby;
ALTER SUBSCRIPTION mysub set publication pubx;
commit;

Here, workers for tx1 and tx2 are added to on_commit_stop_workers
after the publication is set to puby. Then workers for ty1 and ty2
are further added to that list by the second ALTER command, which sets
the publication to pubx, because the earlier ALTER command has
already changed the catalogs to denote that ty1 and ty2 are being
synchronised. Effectively, at commit, all the workers are targeted to
be stopped, when actually at commit time there is no net change in
the tables to be synchronised. What we should actually do is: for
each ALTER command, compare against the set of tables that was being
synchronised as of the last committed ALTER command, rather than
against the current contents of the catalogs.
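A small standalone C sketch of the difference between the two ways of computing the stop set. It is not the patch's code: add_removed() and set_contains() are hypothetical helpers over plain int arrays standing in for relation OIDs. Calling add_removed() once per ALTER mimics diffing against the catalogs; calling it once with the committed set mimics the proposed comparison.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical sketch, not PostgreSQL code: tables are small integer ids and
 * a set is an int array plus its length.
 */
static bool
set_contains(const int *set, int n, int id)
{
    for (int i = 0; i < n; i++)
        if (set[i] == id)
            return true;
    return false;
}

/*
 * Append to out[] every id that is in 'from' but not in 'to' (skipping ids
 * already in out[]), and return the new length of out[].  The caller must
 * size out[] to hold the result.
 */
static int
add_removed(const int *from, int nfrom, const int *to, int nto,
            int *out, int nout)
{
    for (int i = 0; i < nfrom; i++)
        if (!set_contains(to, nto, from[i]) &&
            !set_contains(out, nout, from[i]))
            out[nout++] = from[i];
    return nout;
}
```

For the example above, diffing each ALTER against the catalog state ({tx1, tx2} to {ty1, ty2}, then {ty1, ty2} to {tx1, tx2}) accumulates all four tables, while one comparison of the committed set {tx1, tx2} with the final set {tx1, tx2} yields none.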

--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company

Attachments:

setup_master.sql (application/octet-stream)
run_on_secondary.sql (application/octet-stream)
#2 Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Amit Khandekar (#1)
Re: AtEOXact_ApplyLauncher() and subtransactions

On 2018-Jun-05, Amit Khandekar wrote:

When a SUBSCRIPTION is altered, then the currently running
table-synchronization workers that are no longer needed for the
altered subscription, are terminated. This is done by the function
AtEOXact_ApplyLauncher() inside CommitTransaction(). So during each
ALTER-SUBSCRIPTION command, the on_commit_stop_workers list is
appended with new set of workers to be stopped. And then at commit,
AtEOXact_ApplyLauncher() stops the workers in the list.

But there is no handling for sub-transaction abort.

Peter, any comments here?

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#3 Peter Eisentraut
peter.eisentraut@2ndquadrant.com
In reply to: Amit Khandekar (#1)
Re: AtEOXact_ApplyLauncher() and subtransactions

On 6/5/18 07:02, Amit Khandekar wrote:

I haven't written a patch for it, but I think we should have a
separate on_commit_stop_workers for each subtransaction. At
subtransaction commit, we replace the on_commit_stop_workers list of
the parent subtransaction with the one from the committed
subtransaction; and on abort, discard the list of the current
subtransaction. So have a stack of the lists.

Is there maybe a more general mechanism we could attach this to? Maybe
resource owners?

Subscription mysub is set to synchronise tables tx1 and tx2 :
CREATE SUBSCRIPTION mysub ... PUBLICATION pubx;

Now suppose the subscription is altered to synchronise ty1 and ty2,
and then again altered back to synchronise tx1 and tx2 in the same
transaction.
begin;
ALTER SUBSCRIPTION mysub set publication puby;
ALTER SUBSCRIPTION mysub set publication pubx;
commit;

Here, workers for tx1 and tx2 are added to on_commit_stop_workers
after the publication is set to puby. And then workers for ty1 and ty2
are further added to that list after the 2nd ALTER command where
publication is set to pubx, because the earlier ALTER command has
already changed the catalogs to denote that ty1 and ty2 are being
synchronised. Effectively, at commit, all the workers are targeted to
be stopped, when actually at commit time there is no final change in
the tables to be synchronised.

I'm not so bothered about this scenario. When you drop and then
recreate a table in the same transaction, that doesn't mean you keep the
data that was previously in the table. If you want to *undo* a change,
you need to do rollback, not commit further changes that try to recreate
the previous state.

--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#4 Amit Khandekar
amitdkhan.pg@gmail.com
In reply to: Peter Eisentraut (#3)
1 attachment(s)
Re: AtEOXact_ApplyLauncher() and subtransactions

On 15 June 2018 at 09:46, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:

On 6/5/18 07:02, Amit Khandekar wrote:

I haven't written a patch for it, but I think we should have a
separate on_commit_stop_workers for each subtransaction. At
subtransaction commit, we replace the on_commit_stop_workers list of
the parent subtransaction with the one from the committed
subtransaction; and on abort, discard the list of the current
subtransaction. So have a stack of the lists.

Is there maybe a more general mechanism we could attach this to? Maybe
resource owners?

You mean using something like ResourceOwnerRelease()? We need to
merge the on_commit_stop_workers list into the parent transaction's
list, so we can't simply release the whole list at subtransaction
commit.

The way I am implementing this can be seen in the attached
apply_launcher_subtrans_WIP.patch (check the launcher.c changes). I
haven't started testing it yet, though.
on_commit_stop_workers denotes a list of subscriptions, and each
element also contains a list of relations for that subscription.
This list is built by the ALTER-SUBSCRIPTION commands that have run in
the current (sub)transaction.

At sub-transaction start, the current on_commit_stop_workers is pushed
onto a stack, and on_commit_stop_workers starts again as an empty list.
This list is then populated by the ALTER-SUBSCRIPTION commands of the
current sub-transaction.

At sub-transaction commit, this list is merged into the list of the
parent (sub)transaction, the parent's stack element is popped, and the
merged list becomes the new on_commit_stop_workers.
For example, say the parent has sub1(tab1, tab2) and sub2(tab2, tab3),
where "sub" means subscription, and the current on_commit_stop_workers
has sub2(tab4) and sub3(tab1). At commit, for subscription sub2, we
should replace the outer sub2(tab2, tab3) with the newer sub2(tab4).
So the merged list will have:
sub1(tab1, tab2), sub2(tab4), sub3(tab1)
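The merge rule in this example can be sketched as a standalone C model. It is hypothetical and not the patch's List-based code: SubStops and SubStopList are illustrative fixed-size structs, and subscriptions and tables are small integer ids. A subscription present in both lists takes the child's relations; child-only subscriptions are appended after the parent's entries.

```c
#include <assert.h>

/* Hypothetical model of the merge rule; not PostgreSQL code. */
#define MAX_SUBS 16
#define MAX_RELS 16

typedef struct SubStops
{
    int         subid;              /* subscription id */
    int         nrels;
    int         relids[MAX_RELS];   /* tables whose workers to stop */
} SubStops;

typedef struct SubStopList
{
    int         nsubs;
    SubStops    subs[MAX_SUBS];
} SubStopList;

/* Merge a committed child's list into its parent's list, in place. */
static void
merge_into_parent(SubStopList *parent, const SubStopList *child)
{
    for (int i = 0; i < child->nsubs; i++)
    {
        const SubStops *c = &child->subs[i];
        int         j;

        for (j = 0; j < parent->nsubs; j++)
            if (parent->subs[j].subid == c->subid)
                break;

        if (j < parent->nsubs)
            parent->subs[j] = *c;   /* newer entry replaces the older one */
        else
        {
            assert(parent->nsubs < MAX_SUBS);
            parent->subs[parent->nsubs++] = *c;     /* new subscription */
        }
    }
}
```

Feeding it the example (parent sub1(1, 2), sub2(2, 3); child sub2(4), sub3(1)) yields sub1(1, 2), sub2(4), sub3(1), in that order.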

At sub-transaction abort, the current on_commit_stop_workers is
discarded, and the parent transaction's worker list is assigned back to
on_commit_stop_workers.
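In the attached patch the push is lazy: AtSubStart_ApplyLauncher() pushes only when on_commit_stop_workers is non-empty and tags the element with GetCurrentTransactionNestLevel() - 1, and AtEOSubXact_ApplyLauncher() pops only when the top element's level is nestDepth - 1. The following standalone C model sketches that lifecycle under hypothetical names (LazyStack, lazy_substart(), lazy_subend()); for brevity its merge is a plain union of relation ids instead of the patch's per-subscription replacement.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical model of the lazy stack; not PostgreSQL code. */
#define MAX_RELS   32
#define MAX_FRAMES 16

typedef struct RelList
{
    int         n;
    int         relids[MAX_RELS];
} RelList;

typedef struct Frame
{
    int         nest_level;     /* nesting level that owns 'saved' */
    RelList     saved;
} Frame;

typedef struct LazyStack
{
    RelList     current;        /* list of the innermost level */
    Frame       frames[MAX_FRAMES];
    int         nframes;
} LazyStack;

static void
lazy_init(LazyStack *s)
{
    memset(s, 0, sizeof(*s));
}

/* Record a relation, ignoring duplicates. */
static void
rel_add(RelList *l, int relid)
{
    for (int i = 0; i < l->n; i++)
        if (l->relids[i] == relid)
            return;
    assert(l->n < MAX_RELS);
    l->relids[l->n++] = relid;
}

/* Subtransaction start; new_level is the new subtransaction's level. */
static void
lazy_substart(LazyStack *s, int new_level)
{
    if (s->current.n == 0)
        return;                 /* nothing to save: skip the push */
    assert(s->nframes < MAX_FRAMES);
    s->frames[s->nframes].nest_level = new_level - 1;
    s->frames[s->nframes].saved = s->current;
    s->nframes++;
    s->current.n = 0;
}

/* Subtransaction end at nesting level 'level'. */
static void
lazy_subend(LazyStack *s, int level, int is_commit)
{
    Frame      *top = (s->nframes > 0) ? &s->frames[s->nframes - 1] : NULL;
    int         owns = (top != NULL && top->nest_level == level - 1);

    if (!is_commit)
        s->current.n = 0;       /* abort: drop this level's entries */
    else if (owns)
    {
        /* commit: merge into the parent's saved list */
        for (int i = 0; i < s->current.n; i++)
            rel_add(&top->saved, s->current.relids[i]);
    }
    /* commit without a frame: current simply becomes the parent's list */

    if (owns)
    {
        s->current = top->saved;    /* parent's list becomes current */
        s->nframes--;
    }
}
```

Skipping the push when the list is empty is safe because, in that case, an aborting subtransaction's parent had nothing recorded, so clearing the current list restores the parent's state.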

Subscription mysub is set to synchronise tables tx1 and tx2 :
CREATE SUBSCRIPTION mysub ... PUBLICATION pubx;

Now suppose the subscription is altered to synchronise ty1 and ty2,
and then again altered back to synchronise tx1 and tx2 in the same
transaction.
begin;
ALTER SUBSCRIPTION mysub set publication puby;
ALTER SUBSCRIPTION mysub set publication pubx;
commit;

Here, workers for tx1 and tx2 are added to on_commit_stop_workers
after the publication is set to puby. And then workers for ty1 and ty2
are further added to that list after the 2nd ALTER command where
publication is set to pubx, because the earlier ALTER command has
already changed the catalogs to denote that ty1 and ty2 are being
synchronised. Effectively, at commit, all the workers are targeted to
be stopped, when actually at commit time there is no final change in
the tables to be synchronised.

I'm not so bothered about this scenario. When you drop and then
recreate a table in the same transaction, that doesn't mean you keep the
data that was previously in the table. If you want to *undo* a change,
you need to do rollback, not commit further changes that try to recreate
the previous state.

Maybe the example I gave is not practical. But a user can just as well
do something like:

CREATE SUBSCRIPTION mysub ... PUBLICATION pub1;
...
begin;
ALTER SUBSCRIPTION mysub set publication pub2;
....
ALTER SUBSCRIPTION mysub set publication pub3;
commit;
Here pub3 is yet another publication, so the old one is not set back again.

Or there could be an ALTER SUBSCRIPTION ... REFRESH PUBLICATION.

I believe on_commit_stop_workers was designed with the idea that all
actions are done only at commit, rather than stopping workers
immediately. That implies the set of workers it determines at commit
time should also be accurate. In any case, I have included the changes
for this in the same attached patch (the changes are in
subscriptioncmds.c).

--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company

Attachments:

apply_launcher_subtrans_WIP.patch (application/octet-stream)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index f4e5ea8..44b8fe5 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -33,6 +33,7 @@
 #include "catalog/namespace.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/subscriptioncmds.h"
 #include "commands/tablecmds.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
@@ -2128,6 +2129,7 @@ CommitTransaction(void)
 	AtEOXact_HashTables(true);
 	AtEOXact_PgStat(true);
 	AtEOXact_Snapshot(true, false);
+	AtEOXact_Subscription();
 	AtEOXact_ApplyLauncher(true);
 	pgstat_report_xact_timestamp(0);
 
@@ -2607,6 +2609,7 @@ AbortTransaction(void)
 		AtEOXact_ComboCid();
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false);
+		AtEOXact_Subscription();
 		AtEOXact_ApplyLauncher(false);
 		pgstat_report_xact_timestamp(0);
 	}
@@ -4534,6 +4537,7 @@ StartSubTransaction(void)
 	AtSubStart_ResourceOwner();
 	AtSubStart_Notify();
 	AfterTriggerBeginSubXact();
+	AtSubStart_ApplyLauncher();
 
 	s->state = TRANS_INPROGRESS;
 
@@ -4637,6 +4641,7 @@ CommitSubTransaction(void)
 	AtEOSubXact_HashTables(true, s->nestingLevel);
 	AtEOSubXact_PgStat(true, s->nestingLevel);
 	AtSubCommit_Snapshot(s->nestingLevel);
+	AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
 	/*
 	 * We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4795,7 @@ AbortSubTransaction(void)
 		AtEOSubXact_HashTables(false, s->nestingLevel);
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
+		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
 	}
 
 	/*
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f138e61..4f2c930 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -51,7 +51,29 @@
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
+
+/*
+ * List of subscriptions, each containing the relations for that subscription.
+ * Each element has the relids for a given subscription that were present at
+ * the last COMMIT. For a subid, there exists an entry in this list only when
+ * the subscription relations are altered. Once the transaction ends, this list
+ * is again set back to NIL. This is done so that during commit, we know
+ * exactly which workers to stop: the relations for the last altered
+ * subscription should be compared with the relations for the last committed
+ * subscription changes.
+ */
+static List *committed_subrel_list = NIL;
+
+typedef struct SubscriptionRels
+{
+	Oid			subid;
+	int			numrels;
+	Oid		   *relids;
+} SubscriptionRels;
+
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static SubscriptionRels *get_subrels(Oid sub_oid,
+									 SubscriptionRels **commited_subrels);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -504,9 +526,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 {
 	char	   *err;
 	List	   *pubrel_names;
-	List	   *subrel_states;
-	Oid		   *subrel_local_oids;
+	SubscriptionRels *subrels;
+	SubscriptionRels *committed_subrels;
 	Oid		   *pubrel_local_oids;
+	List	   *stop_relids = NIL;
 	ListCell   *lc;
 	int			off;
 
@@ -525,24 +548,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	/* We are done with the remote side, close connection. */
 	walrcv_disconnect(wrconn);
 
-	/* Get local table list. */
-	subrel_states = GetSubscriptionRelations(sub->oid);
-
-	/*
-	 * Build qsorted array of local table oids for faster lookup. This can
-	 * potentially contain all tables in the database so speed of lookup is
-	 * important.
-	 */
-	subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
-	off = 0;
-	foreach(lc, subrel_states)
-	{
-		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
-
-		subrel_local_oids[off++] = relstate->relid;
-	}
-	qsort(subrel_local_oids, list_length(subrel_states),
-		  sizeof(Oid), oid_cmp);
+	subrels = get_subrels(sub->oid, &committed_subrels);
 
 	/*
 	 * Walk over the remote tables and try to match them to locally known
@@ -566,8 +572,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 		pubrel_local_oids[off++] = relid;
 
-		if (!bsearch(&relid, subrel_local_oids,
-					 list_length(subrel_states), sizeof(Oid), oid_cmp))
+		if (!bsearch(&relid, subrels->relids,
+					 subrels->numrels, sizeof(Oid), oid_cmp))
 		{
 			AddSubscriptionRelState(sub->oid, relid,
 									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
@@ -585,16 +591,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	qsort(pubrel_local_oids, list_length(pubrel_names),
 		  sizeof(Oid), oid_cmp);
 
-	for (off = 0; off < list_length(subrel_states); off++)
+	for (off = 0; off < subrels->numrels; off++)
 	{
-		Oid			relid = subrel_local_oids[off];
+		Oid			relid = subrels->relids[off];
 
 		if (!bsearch(&relid, pubrel_local_oids,
 					 list_length(pubrel_names), sizeof(Oid), oid_cmp))
 		{
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop_at_commit(sub->oid, relid);
+			/* If these are the committed subrels, build the "stop" list right away */
+			if (subrels == committed_subrels)
+				stop_relids = lappend_oid(stop_relids, relid);
 
 			ereport(DEBUG1,
 					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
@@ -603,6 +611,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 							sub->name)));
 		}
 	}
+
+	/*
+	 * Now derive the workers to be stopped using the committed reloids. At
+	 * commit time, we will terminate them.
+	 */
+	if (subrels != committed_subrels)
+	{
+		for (off = 0; off < committed_subrels->numrels; off++)
+		{
+			Oid			relid = committed_subrels->relids[off];
+
+			if (!bsearch(&relid, pubrel_local_oids,
+						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+				stop_relids = lappend_oid(stop_relids, relid);
+		}
+	}
+
+	logicalrep_insert_stop_workers(sub->oid, stop_relids);
 }
 
 /*
@@ -1172,3 +1198,88 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 
 	return tablelist;
 }
+
+/*
+ * Get the relation oids for a given subscription. Also update committed_subrels
+ * parameter with the rel oids for the relations that were present in the last
+ * committed change in the subscription.
+ */
+static SubscriptionRels *
+get_subrels(Oid sub_oid, SubscriptionRels **commited_subrels)
+{
+	ListCell   *lc;
+	int			off;
+	List	   *subrel_states;
+	Oid		   *subrel_local_oids;
+	SubscriptionRels *subrels = NULL;
+	MemoryContext old_context = CurrentMemoryContext;
+
+	/* Get the committed subrels for the given subscription */
+	foreach(lc, committed_subrel_list)
+	{
+		SubscriptionRels *subrel = (SubscriptionRels *) lfirst(lc);
+
+		if (sub_oid == subrel->subid)
+			break;
+	}
+
+	/*
+	 * If we found a committed entry, that means an earlier ALTER-SUBSCRIPTION
+	 * has generated this entry.
+	 */
+	if (lc != NULL)
+		*commited_subrels = (SubscriptionRels *) lfirst(lc);
+
+	subrel_states = GetSubscriptionRelations(sub_oid);
+
+	/*
+	 * If we are creating this list for the first time in this transaction, we
+	 * need to maintain this list until transaction end.
+	 */
+	if (lc == NULL)
+		old_context = MemoryContextSwitchTo(TopTransactionContext);
+
+	/*
+	 * Build qsorted array of local table oids for faster lookup. This can
+	 * potentially contain all tables in the database so speed of lookup is
+	 * important.
+	 */
+	subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+	off = 0;
+	foreach(lc, subrel_states)
+	{
+		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+		subrel_local_oids[off++] = relstate->relid;
+	}
+	qsort(subrel_local_oids, list_length(subrel_states), sizeof(Oid), oid_cmp);
+
+	subrels = palloc(sizeof(SubscriptionRels));
+	subrels->subid = sub_oid;
+	subrels->relids = subrel_local_oids;
+
+	/*
+	 * If there isn't already a subrel for this subscription saved, save this
+	 * one into the committed_subrel_list. And also pass this as the committed
+	 * rel oids.
+	 */
+	if (lc == NULL)
+	{
+		committed_subrel_list = lappend(committed_subrel_list, subrels);
+		*commited_subrels = subrels;
+	}
+
+	MemoryContextSwitchTo(old_context);
+
+	return subrels;
+}
+
+void
+AtEOXact_Subscription(void)
+{
+	/*
+	 * No need to pfree the list. In fact, it must have been already
+	 * freed because it was allocated in TopTransactionContext.
+	 */
+	committed_subrel_list = NIL;
+}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6ef333b..6f462ee 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -76,11 +76,32 @@ LogicalRepCtxStruct *LogicalRepCtx;
 typedef struct LogicalRepWorkerId
 {
 	Oid			subid;
-	Oid			relid;
+	int			numrels;
+	List	   *relids;
 } LogicalRepWorkerId;
 
+typedef struct SubTransOnCommitStopWorkers
+{
+	struct SubTransOnCommitStopWorkers *parent; /* This might not be an
+												 * immediate parent */
+	int			nest_level;
+
+	/* List of subscriptions for current subtransaction nest level */
+	List	   *sub;
+} SubTransOnCommitStopWorkers;
+
+/*
+ * List of LogicalRepWorkerId elements. This list belongs to current
+ * subtransaction level
+ */
 static List *on_commit_stop_workers = NIL;
 
+/*
+ * Stack of subscription lists. Each stack element belongs to one particular
+ * subtransaction.
+ */
+static SubTransOnCommitStopWorkers *subtrans_stop_workers = NULL;
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -553,25 +574,42 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 	LWLockRelease(LogicalRepWorkerLock);
 }
 
-/*
- * Request worker for specified sub/rel to be stopped on commit.
- */
 void
-logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+logicalrep_insert_stop_workers(Oid subid, List *relids)
 {
-	LogicalRepWorkerId *wid;
-	MemoryContext oldctx;
+	ListCell   *lc;
 
-	/* Make sure we store the info in context that survives until commit. */
-	oldctx = MemoryContextSwitchTo(TopTransactionContext);
+	foreach(lc, on_commit_stop_workers)
+	{
+		LogicalRepWorkerId *wid = lfirst(lc);
+		if (wid->subid == subid)
+			break;
+	}
+
+	/* Didn't find a sub ? Insert a new one */
+	if (lc == NULL)
+	{
+		MemoryContext oldctx;
+		LogicalRepWorkerId *wid;
 
-	wid = palloc(sizeof(LogicalRepWorkerId));
-	wid->subid = subid;
-	wid->relid = relid;
+		/* Make sure we store the info in context that survives until commit. */
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
 
-	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+		wid = palloc(sizeof(LogicalRepWorkerId));
+		wid->subid = subid;
+		wid->relids = list_copy(relids); /* TODO: Avoid the copy. */
+		on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+	{
+		/* Replace the existing reloids with the new set */
+		LogicalRepWorkerId *wid = lfirst(lc);
+		list_free(wid->relids);
+		wid->relids = relids;
+	}
 
-	MemoryContextSwitchTo(oldctx);
 }
 
 /*
@@ -827,20 +865,54 @@ XactManipulatesLogicalReplicationWorkers(void)
 }
 
 /*
+ * AtSubStart_ApplyLauncher() --- Take care of subtransaction start.
+ *
+ * Push the current on_commit_stop_workers into the stack.
+ */
+void
+AtSubStart_ApplyLauncher(void)
+{
+
+	if (on_commit_stop_workers != NIL)
+	{
+		SubTransOnCommitStopWorkers *temp;
+		MemoryContext old_cxt;
+
+		/* Keep the stack elements in TopTransactionContext for simplicity */
+		old_cxt = MemoryContextSwitchTo(TopTransactionContext);
+
+		temp = palloc(sizeof(SubTransOnCommitStopWorkers));
+		temp->parent = subtrans_stop_workers;
+		temp->nest_level = GetCurrentTransactionNestLevel() - 1;
+		temp->sub = on_commit_stop_workers;
+		subtrans_stop_workers = temp;
+
+		on_commit_stop_workers = NIL;
+
+		MemoryContextSwitchTo(old_cxt);
+	}
+}
+
+
+/*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+	Assert(subtrans_stop_workers == NULL);
+
 	if (isCommit)
 	{
-		ListCell   *lc;
+		ListCell   *wlc;
 
-		foreach(lc, on_commit_stop_workers)
+		foreach(wlc, on_commit_stop_workers)
 		{
-			LogicalRepWorkerId *wid = lfirst(lc);
+			LogicalRepWorkerId *wid = lfirst(wlc);
+			ListCell   *rlc;
 
-			logicalrep_worker_stop(wid->subid, wid->relid);
+			foreach(rlc, wid->relids)
+				logicalrep_worker_stop(wid->subid, lfirst_oid(rlc));
 		}
 
 		if (on_commit_launcher_wakeup)
@@ -853,6 +925,116 @@ AtEOXact_ApplyLauncher(bool isCommit)
 	 */
 	on_commit_stop_workers = NIL;
 	on_commit_launcher_wakeup = false;
+	subtrans_stop_workers = NULL;
+}
+
+/*
+ * On commit, merge the on_commit_stop_workers list into the immediate parent,
+ * if present.
+ * On rollback, discard the on_commit_stop_workers list.
+ * Pop out the immediate parent stack element, and assign its workers list
+ * to the on_commit_stop_workers list.
+ */
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+
+	if (isCommit)
+	{
+		MemoryContext oldctx;
+		ListCell   *lc;
+
+		/* Make sure we store the info in context that survives until commit. */
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+		/*
+		 * If the upper level is present, and it is not an immediate
+		 * parent subtransaction, we don't have to do anything; the current
+		 * on_commit_stop_workers will be regarded as belonging to the
+		 * immediate parent sub-transaction. But if the upper level is an
+		 * immediate parent subtransaction, we need to merge the current
+		 * on_commit_stop_workers list into the immediate parent, make this
+		 * merged list as the current on_commit_stop_workers list.
+		 */
+		if (subtrans_stop_workers != NULL &&
+			subtrans_stop_workers->nest_level == nestDepth -1)
+		{
+			List	*temp_list = NIL;
+
+			/*
+			 * Merge the current list into the immediate parent.
+			 * So say, parent has sub1(tab1, tab2), sub2(tab2, tab3),
+			 * and current on_commit_workers has sub2(tab4) and sub3(tab1),
+			 * then the merged list will have :
+			 * sub1(tab1, tab2), sub2(tab4), sub3(tab1)
+			 */
+			foreach(lc, on_commit_stop_workers)
+			{
+				LogicalRepWorkerId *wid = lfirst(lc);
+				ListCell *lc1;
+
+				/* Search this subrel into the subrels of the top stack element */
+				foreach(lc1, subtrans_stop_workers->sub)
+				{
+					LogicalRepWorkerId *wid1 = lfirst(lc1);
+
+					if (wid->subid == wid1->subid)
+						break;
+				}
+
+				if (lc1 == NULL)
+				{
+					/*
+					 * Didn't find a subscription in the stack element. So
+					 * insert it.
+					 */
+					temp_list = lappend(temp_list, wid);
+				}
+				else
+				{
+					/*
+					 * Replace the earlier subrels of this subscription with
+					 * the new subrels.
+					 */
+					LogicalRepWorkerId *wid1 = lfirst(lc1);
+
+					list_free(wid1->relids);
+					pfree(wid1);
+					lfirst(lc1) = wid;
+				}
+
+			}
+			/* Add the new subscriptions that were not present in outer level */
+			subtrans_stop_workers->sub =
+				list_concat(subtrans_stop_workers->sub, temp_list);
+		}
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+	{
+		/* Abandon the current subtransaction workers list. */
+		list_free(on_commit_stop_workers);
+		on_commit_stop_workers = NIL;
+	}
+
+	/*
+	 * This is common for commit and abort. For commit, above we have already
+	 * merged the current list into parent.
+	 */
+	if (subtrans_stop_workers != NULL &&
+		subtrans_stop_workers->nest_level == nestDepth -1)
+	{
+		SubTransOnCommitStopWorkers *temp;
+
+		/* Make the parent transaction list as the current on_commit_stop_workers. */
+		on_commit_stop_workers = subtrans_stop_workers->sub;
+
+		/* Pop out the stack element */
+		temp = subtrans_stop_workers->parent;
+		pfree(subtrans_stop_workers);
+		subtrans_stop_workers = temp;
+	}
 }
 
 /*
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 6d70ad7..e14b91e 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -25,5 +25,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
 
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
+extern void AtEOXact_Subscription(void);
 
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index ef02512..aa02041 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -23,6 +23,8 @@ extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
 extern bool XactManipulatesLogicalReplicationWorkers(void);
+extern void AtSubStart_ApplyLauncher(void);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1ce3b6b..1da6d6d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -75,7 +75,7 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 						 Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
+extern void logicalrep_insert_stop_workers(Oid subid, List *relids);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
#5 Amit Khandekar
amitdkhan.pg@gmail.com
In reply to: Amit Khandekar (#4)
1 attachment(s)
Re: AtEOXact_ApplyLauncher() and subtransactions

On 16 June 2018 at 00:03, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:

The way I am implementing this can be seen in attached
apply_launcher_subtrans_WIP.patch. (check launcher.c changes). I
haven't started testing it yet though.

The attached patch passes some basic testing I did. I will do some
more testing, self-review and code reorganising, if required. I will
also split the patch into two: one for the main issue regarding
subtransactions, and the other for the other issue I mentioned
earlier, which shows up even without subtransactions.

--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company

Attachments:

apply_launcher_subtrans.patch (application/octet-stream)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8e6aef3..25544d6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -33,6 +33,7 @@
 #include "catalog/namespace.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/subscriptioncmds.h"
 #include "commands/tablecmds.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
@@ -2128,6 +2129,7 @@ CommitTransaction(void)
 	AtEOXact_HashTables(true);
 	AtEOXact_PgStat(true);
 	AtEOXact_Snapshot(true, false);
+	AtEOXact_Subscription();
 	AtEOXact_ApplyLauncher(true);
 	pgstat_report_xact_timestamp(0);
 
@@ -2607,6 +2609,7 @@ AbortTransaction(void)
 		AtEOXact_ComboCid();
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false);
+		AtEOXact_Subscription();
 		AtEOXact_ApplyLauncher(false);
 		pgstat_report_xact_timestamp(0);
 	}
@@ -4534,6 +4537,7 @@ StartSubTransaction(void)
 	AtSubStart_ResourceOwner();
 	AtSubStart_Notify();
 	AfterTriggerBeginSubXact();
+	AtSubStart_ApplyLauncher();
 
 	s->state = TRANS_INPROGRESS;
 
@@ -4637,6 +4641,7 @@ CommitSubTransaction(void)
 	AtEOSubXact_HashTables(true, s->nestingLevel);
 	AtEOSubXact_PgStat(true, s->nestingLevel);
 	AtSubCommit_Snapshot(s->nestingLevel);
+	AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
 	/*
 	 * We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4795,7 @@ AbortSubTransaction(void)
 		AtEOSubXact_HashTables(false, s->nestingLevel);
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
+		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
 	}
 
 	/*
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f138e61..dcbe803 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -51,7 +51,29 @@
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
+
+/*
+ * List of subscriptions, each containing the relations for that subscription.
+ * Each element has the relids for a given subscription that were present at
+ * the last COMMIT. For a subid, there exists an entry in this list only when
+ * the subscription relations are altered. Once the transaction ends, this list
+ * is again set back to NIL. This is done so that during commit, we know
+ * exactly which workers to stop: the relations for the last altered
+ * subscription should be compared with the relations for the last committed
+ * subscription changes.
+ */
+static List *committed_subrel_list = NIL;
+
+typedef struct SubscriptionRels
+{
+	Oid			subid;
+	int			numrels;
+	Oid		   *relids;
+} SubscriptionRels;
+
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static SubscriptionRels *get_subrels(Oid sub_oid,
+									 SubscriptionRels **commited_subrels);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -504,9 +526,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 {
 	char	   *err;
 	List	   *pubrel_names;
-	List	   *subrel_states;
-	Oid		   *subrel_local_oids;
+	SubscriptionRels *subrels;
+	SubscriptionRels *committed_subrels;
 	Oid		   *pubrel_local_oids;
+	List	   *stop_relids = NIL;
 	ListCell   *lc;
 	int			off;
 
@@ -525,24 +548,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	/* We are done with the remote side, close connection. */
 	walrcv_disconnect(wrconn);
 
-	/* Get local table list. */
-	subrel_states = GetSubscriptionRelations(sub->oid);
-
-	/*
-	 * Build qsorted array of local table oids for faster lookup. This can
-	 * potentially contain all tables in the database so speed of lookup is
-	 * important.
-	 */
-	subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
-	off = 0;
-	foreach(lc, subrel_states)
-	{
-		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
-
-		subrel_local_oids[off++] = relstate->relid;
-	}
-	qsort(subrel_local_oids, list_length(subrel_states),
-		  sizeof(Oid), oid_cmp);
+	subrels = get_subrels(sub->oid, &committed_subrels);
 
 	/*
 	 * Walk over the remote tables and try to match them to locally known
@@ -566,8 +572,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 		pubrel_local_oids[off++] = relid;
 
-		if (!bsearch(&relid, subrel_local_oids,
-					 list_length(subrel_states), sizeof(Oid), oid_cmp))
+		if (!bsearch(&relid, subrels->relids,
+					 subrels->numrels, sizeof(Oid), oid_cmp))
 		{
 			AddSubscriptionRelState(sub->oid, relid,
 									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
@@ -585,16 +591,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	qsort(pubrel_local_oids, list_length(pubrel_names),
 		  sizeof(Oid), oid_cmp);
 
-	for (off = 0; off < list_length(subrel_states); off++)
+	for (off = 0; off < subrels->numrels; off++)
 	{
-		Oid			relid = subrel_local_oids[off];
+		Oid			relid = subrels->relids[off];
 
 		if (!bsearch(&relid, pubrel_local_oids,
 					 list_length(pubrel_names), sizeof(Oid), oid_cmp))
 		{
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop_at_commit(sub->oid, relid);
+			/* If these are the committed subrels, build the "stop" list right away */
+			if (subrels == committed_subrels)
+				stop_relids = lappend_oid(stop_relids, relid);
 
 			ereport(DEBUG1,
 					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
@@ -603,6 +611,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 							sub->name)));
 		}
 	}
+
+	/*
+	 * Now derive the workers to be stopped using the committed reloids. At
+	 * commit time, we will terminate them.
+	 */
+	if (committed_subrels != NULL && subrels != committed_subrels)
+	{
+		for (off = 0; off < committed_subrels->numrels; off++)
+		{
+			Oid			relid = committed_subrels->relids[off];
+
+			if (!bsearch(&relid, pubrel_local_oids,
+						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+				stop_relids = lappend_oid(stop_relids, relid);
+		}
+	}
+
+	logicalrep_insert_stop_workers(sub->oid, stop_relids);
 }
 
 /*
@@ -1172,3 +1198,88 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 
 	return tablelist;
 }
+
+/*
+ * Get the relation oids for a given subscription. Also update committed_subrels
+ * parameter with the rel oids for the relations that were present in the last
+ * committed change in the subscription.
+ */
+static SubscriptionRels *
+get_subrels(Oid sub_oid, SubscriptionRels **commited_subrels)
+{
+	ListCell   *lc;
+	int			off;
+	List	   *subrel_states;
+	Oid		   *subrel_local_oids;
+	SubscriptionRels *subrels = NULL;
+	MemoryContext old_context = CurrentMemoryContext;
+	bool		found_committed_subrel = false;
+
+	*commited_subrels = NULL;
+
+	/* Get the committed subrels for the given subscription */
+	foreach(lc, committed_subrel_list)
+	{
+		SubscriptionRels *subrel = (SubscriptionRels *) lfirst(lc);
+
+		if (sub_oid == subrel->subid)
+		{
+			found_committed_subrel = true;
+			break;
+		}
+	}
+
+	subrel_states = GetSubscriptionRelations(sub_oid);
+
+	/*
+	 * If we are creating this list for the first time in this transaction, we
+	 * need to maintain this list until transaction end.
+	 */
+	if (!found_committed_subrel)
+		old_context = MemoryContextSwitchTo(TopTransactionContext);
+
+	/*
+	 * Build qsorted array of local table oids for faster lookup. This can
+	 * potentially contain all tables in the database so speed of lookup is
+	 * important.
+	 */
+	subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+	off = 0;
+	foreach(lc, subrel_states)
+	{
+		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+		subrel_local_oids[off++] = relstate->relid;
+	}
+	qsort(subrel_local_oids, list_length(subrel_states), sizeof(Oid), oid_cmp);
+
+	subrels = palloc(sizeof(SubscriptionRels));
+	subrels->subid = sub_oid;
+	subrels->numrels = list_length(subrel_states);
+	subrels->relids = subrel_local_oids;
+
+	/*
+	 * If there isn't already a subrel for this subscription saved, save this
+	 * one into the committed_subrel_list. And also pass this as the committed
+	 * rel oids.
+	 */
+	if (!found_committed_subrel)
+	{
+		committed_subrel_list = lappend(committed_subrel_list, subrels);
+		*commited_subrels = subrels;
+	}
+
+	MemoryContextSwitchTo(old_context);
+
+	return subrels;
+}
+
+void
+AtEOXact_Subscription(void)
+{
+	/*
+	 * No need to pfree the list; it is allocated in TopTransactionContext
+	 * and will be freed along with that context.
+	 */
+	committed_subrel_list = NIL;
+}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6ef333b..dc95c19 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -76,11 +76,31 @@ LogicalRepCtxStruct *LogicalRepCtx;
 typedef struct LogicalRepWorkerId
 {
 	Oid			subid;
-	Oid			relid;
+	List	   *relids;
 } LogicalRepWorkerId;
 
+typedef struct SubTransOnCommitStopWorkers
+{
+	struct SubTransOnCommitStopWorkers *parent; /* This might not be an
+												 * immediate parent */
+	int			nest_level;
+
+	/* List of subscriptions for current subtransaction nest level */
+	List	   *sub;
+} SubTransOnCommitStopWorkers;
+
+/*
+ * List of LogicalRepWorkerId elements. This list belongs to current
+ * subtransaction level
+ */
 static List *on_commit_stop_workers = NIL;
 
+/*
+ * Stack of subscription lists. Each stack element belongs to one particular
+ * subtransaction.
+ */
+static SubTransOnCommitStopWorkers *subtrans_stop_workers = NULL;
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -553,25 +573,42 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 	LWLockRelease(LogicalRepWorkerLock);
 }
 
-/*
- * Request worker for specified sub/rel to be stopped on commit.
- */
 void
-logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+logicalrep_insert_stop_workers(Oid subid, List *relids)
 {
-	LogicalRepWorkerId *wid;
-	MemoryContext oldctx;
+	ListCell   *lc;
 
-	/* Make sure we store the info in context that survives until commit. */
-	oldctx = MemoryContextSwitchTo(TopTransactionContext);
+	foreach(lc, on_commit_stop_workers)
+	{
+		LogicalRepWorkerId *wid = lfirst(lc);
+		if (wid->subid == subid)
+			break;
+	}
+
+	/* Didn't find a sub ? Insert a new one */
+	if (lc == NULL)
+	{
+		MemoryContext oldctx;
+		LogicalRepWorkerId *wid;
 
-	wid = palloc(sizeof(LogicalRepWorkerId));
-	wid->subid = subid;
-	wid->relid = relid;
+		/* Make sure we store the info in context that survives until commit. */
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
 
-	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+		wid = palloc(sizeof(LogicalRepWorkerId));
+		wid->subid = subid;
+		wid->relids = list_copy(relids); /* TODO: Avoid the copy. */
+		on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+	{
+		/* Replace the existing reloids with the new set */
+		LogicalRepWorkerId *wid = lfirst(lc);
+		list_free(wid->relids);
+		wid->relids = relids;
+	}
 
-	MemoryContextSwitchTo(oldctx);
 }
 
 /*
@@ -827,20 +864,54 @@ XactManipulatesLogicalReplicationWorkers(void)
 }
 
 /*
+ * AtSubStart_ApplyLauncher() --- Take care of subtransaction start.
+ *
+ * Push the current on_commit_stop_workers into the stack.
+ */
+void
+AtSubStart_ApplyLauncher(void)
+{
+
+	if (on_commit_stop_workers != NIL)
+	{
+		SubTransOnCommitStopWorkers *temp;
+		MemoryContext old_cxt;
+
+		/* Keep the stack elements in TopTransactionContext for simplicity */
+		old_cxt = MemoryContextSwitchTo(TopTransactionContext);
+
+		temp = palloc(sizeof(SubTransOnCommitStopWorkers));
+		temp->parent = subtrans_stop_workers;
+		temp->nest_level = GetCurrentTransactionNestLevel() - 1;
+		temp->sub = on_commit_stop_workers;
+		subtrans_stop_workers = temp;
+
+		on_commit_stop_workers = NIL;
+
+		MemoryContextSwitchTo(old_cxt);
+	}
+}
+
+
+/*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+	Assert(subtrans_stop_workers == NULL);
+
 	if (isCommit)
 	{
-		ListCell   *lc;
+		ListCell   *wlc;
 
-		foreach(lc, on_commit_stop_workers)
+		foreach(wlc, on_commit_stop_workers)
 		{
-			LogicalRepWorkerId *wid = lfirst(lc);
+			LogicalRepWorkerId *wid = lfirst(wlc);
+			ListCell   *rlc;
 
-			logicalrep_worker_stop(wid->subid, wid->relid);
+			foreach(rlc, wid->relids)
+				logicalrep_worker_stop(wid->subid, lfirst_oid(rlc));
 		}
 
 		if (on_commit_launcher_wakeup)
@@ -853,6 +924,116 @@ AtEOXact_ApplyLauncher(bool isCommit)
 	 */
 	on_commit_stop_workers = NIL;
 	on_commit_launcher_wakeup = false;
+	subtrans_stop_workers = NULL;
+}
+
+/*
+ * On commit, merge the on_commit_stop_workers list into the immediate parent,
+ * if present.
+ * On rollback, discard the on_commit_stop_workers list.
+ * Pop out the immediate parent stack element, and assign its workers list
+ * to the on_commit_stop_workers list.
+ */
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+
+	if (isCommit)
+	{
+		MemoryContext oldctx;
+		ListCell   *lc;
+
+		/* Make sure we store the info in context that survives until commit. */
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+		/*
+		 * If the upper level is present, and it is not an immediate
+		 * parent subtransaction, we don't have to do anything; the current
+		 * on_commit_stop_workers will be regarded as belonging to the
+		 * immediate parent sub-transaction. But if the upper level is an
+		 * immediate parent subtransaction, we need to merge the current
+		 * on_commit_stop_workers list into the immediate parent, and make this
+		 * merged list the current on_commit_stop_workers list.
+		 */
+		if (subtrans_stop_workers != NULL &&
+			subtrans_stop_workers->nest_level == nestDepth -1)
+		{
+			List	*temp_list = NIL;
+
+			/*
+			 * Merge the current list into the immediate parent.
+			 * So say, parent has sub1(tab1, tab2), sub2(tab2, tab3),
+			 * and current on_commit_workers has sub2(tab4) and sub3(tab1),
+			 * then the merged list will have :
+			 * sub1(tab1, tab2), sub2(tab4), sub3(tab1)
+			 */
+			foreach(lc, on_commit_stop_workers)
+			{
+				LogicalRepWorkerId *wid = lfirst(lc);
+				ListCell *lc1;
+
+				/* Search for this subrel in the subrels of the top stack element */
+				foreach(lc1, subtrans_stop_workers->sub)
+				{
+					LogicalRepWorkerId *wid1 = lfirst(lc1);
+
+					if (wid->subid == wid1->subid)
+						break;
+				}
+
+				if (lc1 == NULL)
+				{
+					/*
+					 * Didn't find a subscription in the stack element. So
+					 * insert it.
+					 */
+					temp_list = lappend(temp_list, wid);
+				}
+				else
+				{
+					/*
+					 * Replace the earlier subrels of this subscription with
+					 * the new subrels.
+					 */
+					LogicalRepWorkerId *wid1 = lfirst(lc1);
+
+					list_free(wid1->relids);
+					pfree(wid1);
+					lfirst(lc1) = wid;
+				}
+
+			}
+			/* Add the new subscriptions that were not present in outer level */
+			subtrans_stop_workers->sub =
+				list_concat(subtrans_stop_workers->sub, temp_list);
+		}
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+	{
+		/* Abandon the current subtransaction workers list. */
+		list_free(on_commit_stop_workers);
+		on_commit_stop_workers = NIL;
+	}
+
+	/*
+	 * This is common for commit and abort. For commit, above we have already
+	 * merged the current list into parent.
+	 */
+	if (subtrans_stop_workers != NULL &&
+		subtrans_stop_workers->nest_level == nestDepth -1)
+	{
+		SubTransOnCommitStopWorkers *temp;
+
+		/* Make the parent transaction's list the current on_commit_stop_workers. */
+		on_commit_stop_workers = subtrans_stop_workers->sub;
+
+		/* Pop out the stack element */
+		temp = subtrans_stop_workers->parent;
+		pfree(subtrans_stop_workers);
+		subtrans_stop_workers = temp;
+	}
 }
 
 /*
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 6d70ad7..e14b91e 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -25,5 +25,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
 
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
+extern void AtEOXact_Subscription(void);
 
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index ef02512..aa02041 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -23,6 +23,8 @@ extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
 extern bool XactManipulatesLogicalReplicationWorkers(void);
+extern void AtSubStart_ApplyLauncher(void);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1ce3b6b..1da6d6d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -75,7 +75,7 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 						 Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
+extern void logicalrep_insert_stop_workers(Oid subid, List *relids);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
#6 Amit Khandekar
amitdkhan.pg@gmail.com
In reply to: Amit Khandekar (#5)
1 attachment(s)
Re: AtEOXact_ApplyLauncher() and subtransactions

On 18 June 2018 at 15:02, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:

> On 16 June 2018 at 00:03, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:
>
>> The way I am implementing this can be seen in attached
>> apply_launcher_subtrans_WIP.patch. (check launcher.c changes). I
>> haven't started testing it yet though.
>
> Attached patch passes some basic testing I did. Will do some more
> testing, and some self-review and code organising, if required.

Done. Attached is v2 version of the patch. Comments welcome.

Changed GetSubscriptionRelations() to GetSubscriptionRelids(), which
now returns only the oids, not the subrel states. This made it
convenient to store the returned list directly as the committed
subscription rels. Anyway, the subrel states were not used anywhere.

> I will also split the patch into two: one containing the main issue
> regarding subtransaction, and the other containing the other issue I
> mentioned earlier that shows up without subtransaction as well.

Did not split the patch. The changes for the other issue that shows up
without subtransaction are all in subscriptioncmds.c, and that file
contains mostly the changes for this issue. So kept it as a single
patch. But if it gets inconvenient for someone while reviewing, I will
be happy to split it.
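The subscriptioncmds.c part of v2 reduces to one derivation. The relids a subscription had before the transaction's first ALTER are remembered in committed_subrels_list. The stop list is then "committed relids not present in the new publication". Via logicalrep_insert_stop_workers(), that list replaces any earlier request for the same subscription.

Below is a minimal standalone sketch of just that derivation. Plain unsigned arrays stand in for Oid lists, and derive_stop_relids() and relid_cmp() are hypothetical names. It uses the same qsort()/bsearch() lookup that AlterSubscription_refresh() applies to pubrel_local_oids.

```c
#include <assert.h>
#include <stdlib.h>

/* Comparator for qsort()/bsearch() over unsigned relids. */
static int
relid_cmp(const void *a, const void *b)
{
	unsigned	x = *(const unsigned *) a;
	unsigned	y = *(const unsigned *) b;

	return (x > y) - (x < y);
}

/*
 * Write into 'stop' every relid in 'committed' that is absent from 'pub'
 * and return how many were written. 'pub' is sorted in place so that it
 * can be searched with bsearch(). 'stop' needs room for ncommitted entries.
 */
static int
derive_stop_relids(const unsigned *committed, int ncommitted,
				   unsigned *pub, int npub, unsigned *stop)
{
	int			nstop = 0;

	assert(ncommitted >= 0 && npub >= 0);
	qsort(pub, npub, sizeof(unsigned), relid_cmp);
	for (int i = 0; i < ncommitted; i++)
		if (!bsearch(&committed[i], pub, npub, sizeof(unsigned), relid_cmp))
			stop[nstop++] = committed[i];
	return nstop;
}
```

Take committed relids {10, 20, 30}. A first ALTER whose publication covers only {10} yields {20, 30}. A second ALTER in the same transaction that publishes all three tables again yields an empty list. Because that result replaces the earlier request, no workers would be stopped at commit under this model.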

--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company

Attachments:

apply_launcher_subtrans_v2.patch (application/octet-stream)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8e6aef3..25544d6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -33,6 +33,7 @@
 #include "catalog/namespace.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/subscriptioncmds.h"
 #include "commands/tablecmds.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
@@ -2128,6 +2129,7 @@ CommitTransaction(void)
 	AtEOXact_HashTables(true);
 	AtEOXact_PgStat(true);
 	AtEOXact_Snapshot(true, false);
+	AtEOXact_Subscription();
 	AtEOXact_ApplyLauncher(true);
 	pgstat_report_xact_timestamp(0);
 
@@ -2607,6 +2609,7 @@ AbortTransaction(void)
 		AtEOXact_ComboCid();
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false);
+		AtEOXact_Subscription();
 		AtEOXact_ApplyLauncher(false);
 		pgstat_report_xact_timestamp(0);
 	}
@@ -4534,6 +4537,7 @@ StartSubTransaction(void)
 	AtSubStart_ResourceOwner();
 	AtSubStart_Notify();
 	AfterTriggerBeginSubXact();
+	AtSubStart_ApplyLauncher();
 
 	s->state = TRANS_INPROGRESS;
 
@@ -4637,6 +4641,7 @@ CommitSubTransaction(void)
 	AtEOSubXact_HashTables(true, s->nestingLevel);
 	AtEOSubXact_PgStat(true, s->nestingLevel);
 	AtSubCommit_Snapshot(s->nestingLevel);
+	AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
 	/*
 	 * We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4795,7 @@ AbortSubTransaction(void)
 		AtEOSubXact_HashTables(false, s->nestingLevel);
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
+		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
 	}
 
 	/*
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 8705d8b..0b1212a 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -429,12 +429,12 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 
 
 /*
- * Get all relations for subscription.
+ * Get reloids of all relations for subscription.
  *
- * Returned list is palloc'ed in current memory context.
+ * Returned list is palloc'ed in the specified 'memcxt'
  */
 List *
-GetSubscriptionRelations(Oid subid)
+GetSubscriptionRelids(Oid subid, MemoryContext memcxt)
 {
 	List	   *res = NIL;
 	Relation	rel;
@@ -442,6 +442,7 @@ GetSubscriptionRelations(Oid subid)
 	int			nkeys = 0;
 	ScanKeyData skey[2];
 	SysScanDesc scan;
+	MemoryContext old_context;
 
 	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
 
@@ -453,20 +454,15 @@ GetSubscriptionRelations(Oid subid)
 	scan = systable_beginscan(rel, InvalidOid, false,
 							  NULL, nkeys, skey);
 
+	old_context = MemoryContextSwitchTo(memcxt);
 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
 	{
 		Form_pg_subscription_rel subrel;
-		SubscriptionRelState *relstate;
 
 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
-
-		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
-		relstate->relid = subrel->srrelid;
-		relstate->state = subrel->srsubstate;
-		relstate->lsn = subrel->srsublsn;
-
-		res = lappend(res, relstate);
+		res = lappend_oid(res, subrel->srrelid);
 	}
+	MemoryContextSwitchTo(old_context);
 
 	/* Cleanup */
 	systable_endscan(scan);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f138e61..a4eee2e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -51,6 +51,19 @@
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
+
+/*
+ * List of subscriptions, each containing the relations for that subscription.
+ * Each element has the relids for a given subscription that were updated on
+ * the last COMMIT. For a subid, there exists an entry in this list only when
+ * the subscription relations are altered. Once the transaction ends, this list
+ * is again set back to NIL. This is done so that during commit, we know
+ * exactly which workers to stop: the relations for the last altered
+ * subscription should be compared with the relations for the last committed
+ * subscription changes.
+ */
+static List *committed_subrels_list = NIL;
+
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 
 /*
@@ -504,9 +517,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 {
 	char	   *err;
 	List	   *pubrel_names;
-	List	   *subrel_states;
+	List	   *subrelids;
+	SubscriptionRels *committed_subrels = NULL;
 	Oid		   *subrel_local_oids;
 	Oid		   *pubrel_local_oids;
+	List	   *stop_relids = NIL;
 	ListCell   *lc;
 	int			off;
 
@@ -525,24 +540,55 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	/* We are done with the remote side, close connection. */
 	walrcv_disconnect(wrconn);
 
-	/* Get local table list. */
-	subrel_states = GetSubscriptionRelations(sub->oid);
+	/* Get the committed subrels for the given subscription */
+	foreach(lc, committed_subrels_list)
+	{
+		SubscriptionRels *subrels = (SubscriptionRels *) lfirst(lc);
+
+		if (sub->oid == subrels->subid)
+		{
+			committed_subrels = subrels;
+			break;
+		}
+	}
+
+	/*
+	 * Get local table list. If we are creating the committed subrel list for
+	 * the first time for this subscription in this transaction, we need to
+	 * maintain this subrel list until transaction end.
+	 */
+	subrelids = GetSubscriptionRelids(sub->oid,
+						committed_subrels ?
+						CurrentMemoryContext : TopTransactionContext);
+
+	if (!committed_subrels)
+	{
+		/*
+		 * We don't have a committed subrel for this subscription. Create one
+		 * in TopTransactionContext.
+		 */
+		SubscriptionRels *subrels;
+		MemoryContext old_context;
+
+		old_context = MemoryContextSwitchTo(TopTransactionContext);
+		subrels = palloc(sizeof(SubscriptionRels));
+		subrels->subid = sub->oid;
+		/* The subrelids are already allocated in TopTransactionContext */
+		subrels->relids = subrelids;
+		committed_subrels_list = lappend(committed_subrels_list, subrels);
+		MemoryContextSwitchTo(old_context);
+	}
 
 	/*
 	 * Build qsorted array of local table oids for faster lookup. This can
 	 * potentially contain all tables in the database so speed of lookup is
 	 * important.
 	 */
-	subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+	subrel_local_oids = palloc(list_length(subrelids) * sizeof(Oid));
 	off = 0;
-	foreach(lc, subrel_states)
-	{
-		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
-
-		subrel_local_oids[off++] = relstate->relid;
-	}
-	qsort(subrel_local_oids, list_length(subrel_states),
-		  sizeof(Oid), oid_cmp);
+	foreach(lc, subrelids)
+		subrel_local_oids[off++] = lfirst_oid(lc);
+	qsort(subrel_local_oids, list_length(subrelids), sizeof(Oid), oid_cmp);
 
 	/*
 	 * Walk over the remote tables and try to match them to locally known
@@ -567,7 +613,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		pubrel_local_oids[off++] = relid;
 
 		if (!bsearch(&relid, subrel_local_oids,
-					 list_length(subrel_states), sizeof(Oid), oid_cmp))
+					 list_length(subrelids), sizeof(Oid), oid_cmp))
 		{
 			AddSubscriptionRelState(sub->oid, relid,
 									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
@@ -585,7 +631,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	qsort(pubrel_local_oids, list_length(pubrel_names),
 		  sizeof(Oid), oid_cmp);
 
-	for (off = 0; off < list_length(subrel_states); off++)
+	for (off = 0; off < list_length(subrelids); off++)
 	{
 		Oid			relid = subrel_local_oids[off];
 
@@ -594,7 +640,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		{
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop_at_commit(sub->oid, relid);
+			/*
+			 * If committed subrels are not allocated, the subrel_local_oids
+			 * already contain the committed subrels, so use them to derive the
+			 * workers to be stopped.
+			 */
+			if (!committed_subrels)
+				stop_relids = lappend_oid(stop_relids, relid);
 
 			ereport(DEBUG1,
 					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
@@ -603,6 +655,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 							sub->name)));
 		}
 	}
+
+	/*
+	 * Now derive the workers to be stopped using the committed reloids. At
+	 * commit time, we will terminate them.
+	 */
+	if (committed_subrels)
+	{
+		foreach(lc, committed_subrels->relids)
+		{
+			Oid			relid = lfirst_oid(lc);
+
+			if (!bsearch(&relid, pubrel_local_oids,
+						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+				stop_relids = lappend_oid(stop_relids, relid);
+		}
+	}
+
+	logicalrep_insert_stop_workers(sub->oid, stop_relids);
 }
 
 /*
@@ -1172,3 +1242,17 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 
 	return tablelist;
 }
+
+/*
+ * Cleanup function for objects maintained during the transaction by
+ * subscription-refresh operation.
+ */
+void
+AtEOXact_Subscription(void)
+{
+	/*
+	 * No need to pfree the list; it is allocated in TopTransactionContext
+	 * and will be freed along with that context.
+	 */
+	committed_subrels_list = NIL;
+}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6ef333b..252114b 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,13 +73,28 @@ typedef struct LogicalRepCtxStruct
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
-typedef struct LogicalRepWorkerId
+/*
+ * List of SubscriptionRels. This list represents the subscription relations
+ * for which the synchronization workers are to be stopped. Each subtransaction
+ * has its own list. Once a subtransaction starts, this list is pushed into
+ * the 'subtrans_stop_workers' stack.
+ */
+static List *on_commit_stop_workers = NIL;
+
+typedef struct SubTransOnCommitStopWorkers
 {
-	Oid			subid;
-	Oid			relid;
-} LogicalRepWorkerId;
+	struct SubTransOnCommitStopWorkers *parent; /* This might not be an
+												 * immediate parent */
+	int			nest_level;		/* Sub-transaction nest level */
+	List	   *stop_workers;	/* on_commit_stop_workers for this
+								 * subtransaction */
+} SubTransOnCommitStopWorkers;
 
-static List *on_commit_stop_workers = NIL;
+/*
+ * Stack of on_commit_stop_workers. Each stack element belongs to one
+ * particular subtransaction.
+ */
+static SubTransOnCommitStopWorkers *subtrans_stop_workers = NULL;
 
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -554,22 +569,44 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 }
 
 /*
- * Request worker for specified sub/rel to be stopped on commit.
+ * Request workers for the specified relids of a subscription to be stopped on
+ * commit. This replaces the earlier saved reloids of a given subscription.
  */
 void
-logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+logicalrep_insert_stop_workers(Oid subid, List *relids)
 {
-	LogicalRepWorkerId *wid;
+	ListCell   *lc;
 	MemoryContext oldctx;
 
-	/* Make sure we store the info in context that survives until commit. */
+	foreach(lc, on_commit_stop_workers)
+	{
+		SubscriptionRels *subrels = lfirst(lc);
+		if (subrels->subid == subid)
+			break;
+	}
+
+	/* Make sure we store the info in a context that survives until commit. */
 	oldctx = MemoryContextSwitchTo(TopTransactionContext);
 
-	wid = palloc(sizeof(LogicalRepWorkerId));
-	wid->subid = subid;
-	wid->relid = relid;
+	/* Didn't find a sub ? Insert a new one */
+	if (lc == NULL)
+	{
+		SubscriptionRels *subrels;
+
+
+		subrels = palloc(sizeof(SubscriptionRels));
+		subrels->subid = subid;
+		subrels->relids = list_copy(relids);
+		on_commit_stop_workers = lappend(on_commit_stop_workers, subrels);
 
-	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+	}
+	else
+	{
+		/* Replace the existing reloids with the new set */
+		SubscriptionRels *subrels = lfirst(lc);
+		list_free(subrels->relids);
+		subrels->relids = list_copy(relids);
+	}
 
 	MemoryContextSwitchTo(oldctx);
 }
@@ -827,20 +864,54 @@ XactManipulatesLogicalReplicationWorkers(void)
 }
 
 /*
+ * AtSubStart_ApplyLauncher() --- Take care of subtransaction start.
+ *
+ * Push the current on_commit_stop_workers into the stack.
+ */
+void
+AtSubStart_ApplyLauncher(void)
+{
+
+	if (on_commit_stop_workers != NIL)
+	{
+		SubTransOnCommitStopWorkers *temp;
+		MemoryContext old_cxt;
+
+		/* Keep the stack elements in TopTransactionContext for simplicity */
+		old_cxt = MemoryContextSwitchTo(TopTransactionContext);
+
+		temp = palloc(sizeof(SubTransOnCommitStopWorkers));
+		temp->parent = subtrans_stop_workers;
+		temp->nest_level = GetCurrentTransactionNestLevel() - 1;
+		temp->stop_workers = on_commit_stop_workers;
+		subtrans_stop_workers = temp;
+
+		on_commit_stop_workers = NIL;
+
+		MemoryContextSwitchTo(old_cxt);
+	}
+}
+
+
+/*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+	Assert(subtrans_stop_workers == NULL);
+
 	if (isCommit)
 	{
-		ListCell   *lc;
+		ListCell   *wlc;
 
-		foreach(lc, on_commit_stop_workers)
+		foreach(wlc, on_commit_stop_workers)
 		{
-			LogicalRepWorkerId *wid = lfirst(lc);
+			SubscriptionRels *subrels = lfirst(wlc);
+			ListCell   *rlc;
 
-			logicalrep_worker_stop(wid->subid, wid->relid);
+			foreach(rlc, subrels->relids)
+				logicalrep_worker_stop(subrels->subid, lfirst_oid(rlc));
 		}
 
 		if (on_commit_launcher_wakeup)
@@ -852,10 +923,122 @@ AtEOXact_ApplyLauncher(bool isCommit)
 	 * transaction memory context, which is going to be cleaned soon.
 	 */
 	on_commit_stop_workers = NIL;
+	subtrans_stop_workers = NULL;
 	on_commit_launcher_wakeup = false;
 }
 
 /*
+ * On commit, merge the on_commit_stop_workers list into the immediate parent,
+ * if present.
+ * On rollback, discard the on_commit_stop_workers list.
+ * Pop out the immediate parent stack element, and assign its workers list
+ * to the on_commit_stop_workers list.
+ */
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+
+	if (isCommit)
+	{
+		MemoryContext oldctx;
+		ListCell   *lc;
+
+		/*
+		 * Make sure we store the info in a context that survives until commit.
+		 */
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+		/*
+		 * If the upper level is present, and it is not an immediate
+		 * parent subtransaction, we don't have to do anything; the current
+		 * on_commit_stop_workers will be regarded as belonging to the
+		 * immediate parent sub-transaction. But if the upper level is an
+		 * immediate parent subtransaction, we need to merge the current
+		 * on_commit_stop_workers list into the immediate parent, make this
+		 * merged list as the current on_commit_stop_workers list.
+		 */
+		if (subtrans_stop_workers != NULL &&
+			subtrans_stop_workers->nest_level == nestDepth -1)
+		{
+			List	*temp_list = NIL;
+
+			/*
+			 * Merge the current list into the immediate parent.
+			 * So say, parent has sub1(tab1, tab2), sub2(tab2, tab3),
+			 * and current on_commit_workers has sub2(tab4) and sub3(tab1),
+			 * then the merged list will have :
+			 * sub1(tab1, tab2), sub2(tab4), sub3(tab1)
+			 */
+			foreach(lc, on_commit_stop_workers)
+			{
+				SubscriptionRels *subrels = lfirst(lc);
+				ListCell *lc1;
+
+				/* Search this subrel in the subrels of the top of stack. */
+				foreach(lc1, subtrans_stop_workers->stop_workers)
+				{
+					SubscriptionRels *stack_subrels = lfirst(lc1);
+
+					if (subrels->subid == stack_subrels->subid)
+						break;
+				}
+
+				if (lc1 == NULL)
+				{
+					/*
+					 * Didn't find a subscription in the stack element. So
+					 * insert it.
+					 */
+					temp_list = lappend(temp_list, subrels);
+				}
+				else
+				{
+					/*
+					 * Replace the earlier subrels of this subscription with
+					 * the new subrels.
+					 */
+					SubscriptionRels *stack_subrels = lfirst(lc1);
+
+					list_free(stack_subrels->relids);
+					pfree(stack_subrels);
+					lfirst(lc1) = subrels;
+				}
+
+			}
+			/* Add the new subscriptions that were not present in outer level */
+			subtrans_stop_workers->stop_workers =
+				list_concat(subtrans_stop_workers->stop_workers, temp_list);
+		}
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+	{
+		/* Abandon the current subtransaction workers list. */
+		list_free(on_commit_stop_workers);
+		on_commit_stop_workers = NIL;
+	}
+
+	/*
+	 * This is common for commit and abort. For commit, above we have already
+	 * merged the current list into parent.
+	 */
+	if (subtrans_stop_workers != NULL &&
+		subtrans_stop_workers->nest_level == nestDepth -1)
+	{
+		SubTransOnCommitStopWorkers *temp;
+
+		/* Make the parent transaction list as the current stop_workers. */
+		on_commit_stop_workers = subtrans_stop_workers->stop_workers;
+
+		/* Pop out the stack element */
+		temp = subtrans_stop_workers->parent;
+		pfree(subtrans_stop_workers);
+		subtrans_stop_workers = temp;
+	}
+}
+
+/*
  * Request wakeup of the launcher on commit of the transaction.
  *
  * This is used to send launcher signal to stop sleeping and process the
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 556cb94..c27930b 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -67,6 +67,12 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
+typedef struct SubscriptionRels
+{
+	Oid			subid;
+	List	   *relids; /* Subset of relations in the subscription. */
+} SubscriptionRels;
+
 extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
 						XLogRecPtr sublsn);
 extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
@@ -75,7 +81,7 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid,
 						XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
-extern List *GetSubscriptionRelations(Oid subid);
+extern List *GetSubscriptionRelids(Oid subid, MemoryContext memcxt);
 extern List *GetSubscriptionNotReadyRelations(Oid subid);
 
 #endif							/* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 6d70ad7..e14b91e 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -25,5 +25,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
 
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
+extern void AtEOXact_Subscription(void);
 
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index ef02512..aa02041 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -23,6 +23,8 @@ extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
 extern bool XactManipulatesLogicalReplicationWorkers(void);
+extern void AtSubStart_ApplyLauncher(void);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1ce3b6b..1da6d6d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -75,7 +75,7 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 						 Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
+extern void logicalrep_insert_stop_workers(Oid subid, List *relids);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
#7Amit Khandekar
amitdkhan.pg@gmail.com
In reply to: Amit Khandekar (#6)
Re: AtEOXact_ApplyLauncher() and subtransactions

Added this to the July 2018 commitfest:

https://commitfest.postgresql.org/18/1696/

On 20 June 2018 at 22:22, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:

On 18 June 2018 at 15:02, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:

On 16 June 2018 at 00:03, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:

The way I am implementing this can be seen in the attached
apply_launcher_subtrans_WIP.patch (check the launcher.c changes). I
haven't started testing it yet, though.

Attached patch passes some basic testing I did. Will do some more
testing, and some self-review and code organising, if required.

Done. Attached is v2 version of the patch. Comments welcome.

Changed GetSubscriptionRelations() to GetSubscriptionRelids(), which
now returns only the OIDs, not the subrel states. This was convenient
for storing the exact returned list as the committed subscription
rels. In any case, the subrel states were not used anywhere.

I will also split the patch into two : one containing the main issue
regarding subtransaction, and the other containing the other issue I
mentioned earlier that shows up without subtransaction as well.

Did not split the patch. The changes for the other issue that shows up
without subtransaction are all in subscriptioncmds.c , and that file
contains mostly the changes for this issue. So kept it as a single
patch. But if it gets inconvenient for someone while reviewing, I will
be happy to split it.

--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company


#8Robert Haas
robertmhaas@gmail.com
In reply to: Amit Khandekar (#7)
1 attachment(s)
Re: AtEOXact_ApplyLauncher() and subtransactions

On Tue, Jun 26, 2018 at 6:25 AM, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:

Added this into the July 2018 commitfest :

https://commitfest.postgresql.org/18/1696/

It seems to me that it would probably be better to separate this into
two patches, because I think there are really two separate issues.
With regard to the lack of proper subtransaction handling, I think it
would be best if we could avoid introducing a new AtSubStart function.
I wrote a patch for this issue that uses a slightly
different kind of stack than yours, which I have attached to this
email, and it creates stack levels lazily so that it doesn't need an
AtSubStart function. It would probably also be possible to adapt your
patch to create new stack levels on demand instead of at
subtransaction start. I'm not sure which approach is better, but I do
think it would be best not to use your patch as you have it now,
because that does unnecessary work at the beginning and end of every
subtransaction if there is an ALTER SUBSCRIPTION command pending at an
outer level, even though the subtransaction may never touch the
subscription state.
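A rough standalone model of the lazy approach may help: a stack level is pushed only when a stop request is recorded at a deeper nesting depth than the current top, so nothing has to run at subtransaction start. This is an illustrative sketch, not either patch: plain malloc() stands in for palloc() in TopTransactionContext, and the names `stop_at_commit()`, `at_subxact_end()`, `pending_count()`, `Req` and `Level` are hypothetical stand-ins for logicalrep_worker_stop_at_commit(), AtEOSubXact_ApplyLauncher() and the patches' list types.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for LogicalRepWorkerId: one pending stop request. */
typedef struct Req
{
    unsigned    subid;
    unsigned    relid;
    struct Req *next;
} Req;

/* One stack level: requests recorded at a given subtransaction depth. */
typedef struct Level
{
    int           depth;
    Req          *reqs;
    struct Level *parent;   /* need not be at depth - 1 */
} Level;

static Level *top = NULL;   /* stays NULL until the first request */

static void
free_reqs(Req *r)
{
    while (r)
    {
        Req *n = r->next;
        free(r);
        r = n;
    }
}

/* Record a request at 'depth', pushing a new level lazily if needed. */
static void
stop_at_commit(int depth, unsigned subid, unsigned relid)
{
    assert(top == NULL || depth >= top->depth);
    if (top == NULL || depth > top->depth)
    {
        Level *l = malloc(sizeof *l);
        l->depth = depth;
        l->reqs = NULL;
        l->parent = top;
        top = l;
    }
    Req *r = malloc(sizeof *r);
    r->subid = subid;
    r->relid = relid;
    r->next = top->reqs;
    top->reqs = r;
}

/* Subtransaction commit/abort at 'depth'. */
static void
at_subxact_end(int is_commit, int depth)
{
    if (top == NULL || top->depth < depth)
        return;                 /* nothing recorded at this level: cheap exit */
    assert(top->depth == depth);
    if (!is_commit)
    {
        free_reqs(top->reqs);   /* rollback: forget this level's requests */
        top->reqs = NULL;
    }
    Level *p = top->parent;
    if (p == NULL || p->depth < depth - 1)
    {
        top->depth--;           /* relabel as belonging to the parent */
        return;
    }
    /* The immediate parent has its own level: splice our list onto it, pop. */
    Req **tail = &top->reqs;
    while (*tail)
        tail = &(*tail)->next;
    *tail = p->reqs;
    p->reqs = top->reqs;
    free(top);
    top = p;
}

static int
pending_count(void)
{
    int n = 0;
    for (Level *l = top; l; l = l->parent)
        for (Req *r = l->reqs; r; r = r->next)
            n++;
    return n;
}
```

Replaying the thread's scenario (BEGIN; SAVEPOINT a; ALTER SUBSCRIPTION ...; ROLLBACK TO a; COMMIT) as `stop_at_commit(2, ...)` followed by `at_subxact_end(0, 2)` leaves nothing pending, so in this model no worker would be stopped at top-level commit. A subtransaction that never touches a subscription costs only the early-return check.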

As for the other part of your fix, which I think basically boils down
to comparing the final states instead of just looking at what got
changed, the logic looks complicated and I don't think I fully
understand it, but here are a few comments.

+       subrelids = GetSubscriptionRelids(sub->oid,
+                                               committed_subrels ?
+                                               CurrentMemoryContext :
+                                               TopTransactionContext);

This looks ugly and dangerous. In the first place, if
GetSubscriptionRelids() needs to work in one of several memory
contexts, the best thing would probably be for the caller to be
responsible for saving and restoring the memory context as needed,
rather than passing it as an argument. Secondly, it's not very clear
why we need to do this. The comment says we have to do it, but it
doesn't give a reason.

+                        * Merge the current list into the immediate parent.
+                        * So say, parent has sub1(tab1, tab2), sub2(tab2, tab3),
+                        * and current on_commit_workers has sub2(tab4) and sub3(tab1),
+                        * then the merged list will have :
+                        * sub1(tab1, tab2), sub2(tab4), sub3(tab1)

I don't think this is very clear. Also, why is that the correct
answer? Why not sub2(tab2, tab3, tab4)?

+                       foreach(lc, on_commit_stop_workers)
+                       {
+                               SubscriptionRels *subrels = lfirst(lc);
+                               ListCell *lc1;
+
+                               /* Search this subrel in the subrels of the top of stack. */
+                               foreach(lc1, subtrans_stop_workers->stop_workers)

This might be very expensive if both lists are long. I guess that's
probably not very likely. You would need to modify a lot of
subscriptions and then, within a subtransaction, modify a lot of
subscriptions again.

+       foreach(lc, committed_subrels_list)
+       {
+               SubscriptionRels *subrels = (SubscriptionRels *) lfirst(lc);
+
+               if (sub->oid == subrels->subid)
+               {
+                       committed_subrels = subrels;
+                       break;
+               }
+       }

This looks to be O(n^2) in the number of subscriptions modified in a
single transaction.

Like Peter, I'm not entirely sure how much we should worry about this
problem. However, if we're going to worry about it, I wonder if we
should worry harder and try to come up with a better data structure
than a bunch of linked lists. Maybe a hash table indexed by subid?
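For comparison with the nested foreach() scans quoted above, here is a minimal sketch of a lookup keyed by subscription OID, assuming a small fixed-size open-addressing table. Inside the backend one would more likely use dynahash (hash_create() and hash_search() with HASH_ENTER); the standalone table, its size, the multiplicative hash, and the `SubEntry`/`sub_lookup()` names are assumptions for illustration only.

```c
#include <assert.h>
#include <stddef.h>

#define NSLOTS 64               /* power of two; fixed size for this sketch */

/* Hypothetical per-subscription entry. */
typedef struct SubEntry
{
    unsigned    subid;          /* key; 0 marks an empty slot (InvalidOid) */
    int         nrels;          /* payload, e.g. number of pending relids */
} SubEntry;

static SubEntry table[NSLOTS];

/*
 * Return the entry for subid, creating an empty one if absent (similar in
 * spirit to HASH_ENTER).  *found reports whether it already existed.
 * Returns NULL only if the table is full.
 */
static SubEntry *
sub_lookup(unsigned subid, int *found)
{
    assert(subid != 0);
    size_t i = (subid * 2654435761u) & (NSLOTS - 1);

    for (size_t probes = 0; probes < NSLOTS; probes++, i = (i + 1) & (NSLOTS - 1))
    {
        if (table[i].subid == subid)
        {
            *found = 1;
            return &table[i];
        }
        if (table[i].subid == 0)
        {
            table[i].subid = subid;
            table[i].nrels = 0;
            *found = 0;
            return &table[i];
        }
    }
    return NULL;
}
```

Each ALTER SUBSCRIPTION could then find its subscription's entry in expected constant time, instead of scanning every entry recorded so far in the transaction.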

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

Attachments:

apply-launcher-subtrans-rmh.patch (application/octet-stream)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8e6aef332c..7879b12af1 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -4637,6 +4637,7 @@ CommitSubTransaction(void)
 	AtEOSubXact_HashTables(true, s->nestingLevel);
 	AtEOSubXact_PgStat(true, s->nestingLevel);
 	AtSubCommit_Snapshot(s->nestingLevel);
+	AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
 	/*
 	 * We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4791,7 @@ AbortSubTransaction(void)
 		AtEOSubXact_HashTables(false, s->nestingLevel);
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
+		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
 	}
 
 	/*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6ef333b725..e3c24fcfae 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -79,7 +79,17 @@ typedef struct LogicalRepWorkerId
 	Oid			relid;
 } LogicalRepWorkerId;
 
-static List *on_commit_stop_workers = NIL;
+typedef struct StopWorkersData
+{
+	int			nestDepth;
+	List	   *workers;
+	struct StopWorkersData *next;
+}			StopWorkersData;
+
+static StopWorkersData on_commit_stop_workers =
+{
+	1, NIL, NULL
+};
 
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -559,17 +569,46 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 void
 logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
 {
+	int			nestDepth = GetCurrentTransactionNestLevel();
 	LogicalRepWorkerId *wid;
 	MemoryContext oldctx;
 
 	/* Make sure we store the info in context that survives until commit. */
 	oldctx = MemoryContextSwitchTo(TopTransactionContext);
 
+	/* Allocate new tracking object. */
 	wid = palloc(sizeof(LogicalRepWorkerId));
 	wid->subid = subid;
 	wid->relid = relid;
 
-	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+	/* Check that previous transactions were properly cleaned up. */
+	Assert(nestDepth >= on_commit_stop_workers.nestDepth);
+
+	/* Now add the new object to the existing list, or create a new one. */
+	if (nestDepth == on_commit_stop_workers.nestDepth ||
+		on_commit_stop_workers.workers == NIL)
+	{
+		/*
+		 * Either we're adding more entries at the same transaction nesting
+		 * depth as before, or there are no entries with any other nesting
+		 * depth.
+		 */
+		on_commit_stop_workers.nestDepth = nestDepth;
+		on_commit_stop_workers.workers =
+			lappend(on_commit_stop_workers.workers, wid);
+	}
+	else
+	{
+		/*
+		 * There are entries at multiple nesting depths; don't mix them
+		 * together.
+		 */
+		StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
+
+		memcpy(newdata, &on_commit_stop_workers, sizeof(StopWorkersData));
+		on_commit_stop_workers.workers = list_make1(wid);
+		on_commit_stop_workers.next = newdata;
+	}
 
 	MemoryContextSwitchTo(oldctx);
 }
@@ -823,7 +862,8 @@ ApplyLauncherShmemInit(void)
 bool
 XactManipulatesLogicalReplicationWorkers(void)
 {
-	return (on_commit_stop_workers != NIL);
+	return on_commit_stop_workers.workers != NIL ||
+		on_commit_stop_workers.next != NULL;
 }
 
 /*
@@ -832,11 +872,14 @@ XactManipulatesLogicalReplicationWorkers(void)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+	Assert(on_commit_stop_workers.nestDepth == 1);
+	Assert(on_commit_stop_workers.next == NULL);
+
 	if (isCommit)
 	{
 		ListCell   *lc;
 
-		foreach(lc, on_commit_stop_workers)
+		foreach(lc, on_commit_stop_workers.workers)
 		{
 			LogicalRepWorkerId *wid = lfirst(lc);
 
@@ -848,13 +891,62 @@ AtEOXact_ApplyLauncher(bool isCommit)
 	}
 
 	/*
-	 * No need to pfree on_commit_stop_workers.  It was allocated in
+	 * No need to pfree on_commit_stop_workers.workers. It was allocated in
 	 * transaction memory context, which is going to be cleaned soon.
 	 */
-	on_commit_stop_workers = NIL;
+	on_commit_stop_workers.workers = NIL;
 	on_commit_launcher_wakeup = false;
 }
 
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+	StopWorkersData *old = on_commit_stop_workers.next;
+
+	/* Exit immediately if there's no work to do at this level. */
+	Assert(on_commit_stop_workers.nestDepth <= nestDepth);
+	if (nestDepth != on_commit_stop_workers.nestDepth)
+		return;
+
+	/*
+	 * If we're aborting, forget about everything that was done at this
+	 * nesting level.  Explicitly free memory to avoid a transaction-lifespan
+	 * leak.
+	 */
+	if (!isCommit)
+	{
+		list_free_deep(on_commit_stop_workers.workers);
+		on_commit_stop_workers.workers = NIL;
+	}
+
+	/*
+	 * If there are no pending stop-worker requests at outer nesting levels,
+	 * we can just decrement the notional nesting depth without doing any real
+	 * work.
+	 *
+	 * We can also handle it this way if there are separate lists that are
+	 * separated by more than 1 nesting level: they can't yet be merged,
+	 * because there might be a rollback at an interventing level.
+	 * because there might be a rollback at an intervening level.
+	if (old == NULL || (old->nestDepth < nestDepth - 1 &&
+						on_commit_stop_workers.workers != NIL))
+	{
+		on_commit_stop_workers.nestDepth--;
+		return;
+	}
+
+	/*
+	 * We need to pop the stack.  Any remaining entries at this level should
+	 * be merged with those from the next level.
+	 */
+	on_commit_stop_workers.workers =
+		list_concat(on_commit_stop_workers.workers,
+					on_commit_stop_workers.next->workers);
+	on_commit_stop_workers.nestDepth = old->nestDepth;
+	on_commit_stop_workers.next = old->next;
+	pfree(old);
+}
+
 /*
  * Request wakeup of the launcher on commit of the transaction.
  *
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index ef02512412..76c28c655b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -23,6 +23,7 @@ extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
 extern bool XactManipulatesLogicalReplicationWorkers(void);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
#9Amit Khandekar
amitdkhan.pg@gmail.com
In reply to: Robert Haas (#8)
Re: AtEOXact_ApplyLauncher() and subtransactions

On 4 July 2018 at 00:27, Robert Haas <robertmhaas@gmail.com> wrote:

On Tue, Jun 26, 2018 at 6:25 AM, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:

Added this into the July 2018 commitfest :

https://commitfest.postgresql.org/18/1696/

It seems to me that it would probably be better to separate this into
two patches, because I think there are really two separate issues.

Ok, will do that.

With regard to the lack of proper subtransaction handling, I think it
would be best if we could avoid introducing a new AtSubStart function.
I wrote a patch for this issue that uses a slightly
different kind of stack than yours, which I have attached to this
email, and it creates stack levels lazily so that it doesn't need an
AtSubStart function.

Yes, I agree that if we can avoid this function, that would be good.
Couldn't find a proper way to do this. Will have a look at your patch.

As for the other part of your fix, which I think basically boils down
to comparing the final states instead of just looking at what got
changed, the logic looks complicated and I don't think I fully
understand it.

Once I split the patch, let me try to add some comments to make it clearer.

but here are a few comments.

+       subrelids = GetSubscriptionRelids(sub->oid,
+                                               committed_subrels ?
+                                               CurrentMemoryContext :
+                                               TopTransactionContext);

This looks ugly and dangerous. In the first place, if
GetSubscriptionRelids() needs to work in one of several memory
contexts, the best thing would probably be for the caller to be
responsible for saving and restoring the memory context as needed,
rather than passing it as an argument.

I wanted to switch the memory context only around the code that
allocates the list. I understand that the rest of the code in
GetSubscriptionRelids(), such as heap_open(), systable_beginscan(), etc.,
would not be affected if we ran it in TopTransactionContext, because
everything it allocates is freed by the cleanup calls at the bottom of
the function. Still, I thought this way would be more robust in the
future, in case new code is added that allocates some temporary
memory without freeing it.

Secondly, it's not very clear
why we need to do this. The comment says we have to do it, but it
doesn't give a reason.

When committed_subrels is NULL, it means this is the first time we are
running the ALTER command since the main transaction started, and we
want to store the committed subrels list in the transaction context,
so that subsequent ALTER commands compare against this list. Maybe
when I split the patch, I will come up with an example.

+                        * Merge the current list into the immediate parent.
+                        * So say, parent has sub1(tab1, tab2), sub2(tab2, tab3),
+                        * and current on_commit_workers has sub2(tab4) and sub3(tab1),
+                        * then the merged list will have :
+                        * sub1(tab1, tab2), sub2(tab4), sub3(tab1)

I don't think this is very clear. Also, why is that the correct
answer? Why not sub2(tab2, tab3, tab4)?

The parent subtransaction has sub2(tab2, tab3), meaning that when that
subtransaction commits (and the top-level COMMIT happens), we should
stop the worker for (subscription sub2, table tab2) and the worker
for (subscription sub2, table tab3).

And the current subtransaction has modified subscription sub2
such that the workers to be stopped are sub2(tab4). Note that this is the
final list of workers to be stopped, regardless of the earlier ALTER
SUBSCRIPTION commands run in the upper subtransaction. So this
list should completely replace the parent's sub2(tab2, tab3). We
always compare the subscription's tables with the ones that are
committed, not with the ones that were last altered in the same
transaction. Hence the other patch.
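The replace-not-append rule can be sketched with fixed-size arrays. This is a hypothetical simplification of the patch's SubscriptionRels lists (`SubRels`, `Level`, `merge_into_parent()` and the array bounds are assumptions): on subtransaction commit, a subscription present in both levels takes the child's list wholesale, and subscriptions that appear only in the child are appended.

```c
#include <assert.h>

#define MAXSUBS 8
#define MAXRELS 8

/* Hypothetical stand-in for SubscriptionRels. */
typedef struct SubRels
{
    unsigned    subid;
    int         nrels;
    unsigned    relids[MAXRELS];
} SubRels;

/* Stop-worker lists recorded at one subtransaction level. */
typedef struct Level
{
    int         nsubs;
    SubRels     subs[MAXSUBS];
} Level;

/*
 * Merge a committed child level into its immediate parent.  The child's
 * list for a subscription is already the final answer, so it replaces the
 * parent's list for that subscription instead of being added to it.
 */
static void
merge_into_parent(Level *parent, const Level *child)
{
    for (int c = 0; c < child->nsubs; c++)
    {
        int p;

        for (p = 0; p < parent->nsubs; p++)
            if (parent->subs[p].subid == child->subs[c].subid)
                break;
        if (p == parent->nsubs)
        {
            /* Not present in the parent: take a new slot at the end. */
            assert(parent->nsubs < MAXSUBS);
            parent->nsubs++;
        }
        parent->subs[p] = child->subs[c];   /* replace or fill */
    }
}
```

With the example from the review (parent sub1(tab1, tab2), sub2(tab2, tab3); child sub2(tab4), sub3(tab1)), the merged level is sub1(tab1, tab2), sub2(tab4), sub3(tab1). The inner loop is still a linear scan, which is the cost Robert points out.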

+                       foreach(lc, on_commit_stop_workers)
+                       {
+                               SubscriptionRels *subrels = lfirst(lc);
+                               ListCell *lc1;
+
+                               /* Search this subrel in the subrels of the top of stack. */
+                               foreach(lc1, subtrans_stop_workers->stop_workers)

This might be very expensive if both lists are long. I guess that's
probably not very likely. You would need to modify a lot of
subscriptions and then, within a subtransaction, modify a lot of
subscriptions again.

Yes, that's what I think too.

+       foreach(lc, committed_subrels_list)
+       {
+               SubscriptionRels *subrels = (SubscriptionRels *) lfirst(lc);
+
+               if (sub->oid == subrels->subid)
+               {
+                       committed_subrels = subrels;
+                       break;
+               }
+       }

This looks to be O(n^2) in the number of subscriptions modified in a
single transaction.

Like Peter, I'm not entirely sure how much we should worry about this
problem.

I have explained this a bit in the sections above. Basically, without
this second patch, the main patch won't work if the same
subscription is altered more than once in the subtransaction stack.

However, if we're going to worry about it, I wonder if we
should worry harder and try to come up with a better data structure
than a bunch of linked lists. Maybe a hash table indexed by subid?

I had thought about using a hash table to look up a subscription id, but
then thought that in practice there would not be many subscriptions
altered in the same transaction. Hence I continued to use the existing
linked-list data structure; except that instead of a single linked list
holding everything, I kept the tables belonging to each subscription in
their own linked list. The reason was that it is easy to abandon or
merge a subscription's table list when a subtransaction rolls back or
commits.

--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company

#10Amit Khandekar
amitdkhan.pg@gmail.com
In reply to: Amit Khandekar (#9)
2 attachment(s)
Re: AtEOXact_ApplyLauncher() and subtransactions

On Thu, 5 Jul 2018 at 3:37 PM, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:

On 4 July 2018 at 00:27, Robert Haas <robertmhaas@gmail.com> wrote:

On Tue, Jun 26, 2018 at 6:25 AM, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:

Added this into the July 2018 commitfest :

https://commitfest.postgresql.org/18/1696/

It seems to me that it would probably be better to separate this into
two patches, because I think there are really two separate issues.

Ok, will do that.

With regard to the lack of proper subtransaction handling, I think it
would be best if we could avoid introducing a new AtSubStart function.
I wrote a patch for this issue that uses a slightly
different kind of stack than yours, which I have attached to this
email, and it creates stack levels lazily so that it doesn't need an
AtSubStart function.

Yes, I agree that if we can avoid this function, that would be good.
Couldn't find a proper way to do this. Will have a look at your patch.

I have split it into two patches.

The 0001 patch contains the main fix. In this patch I have used some
naming conventions and comments from your patch, and I used your method
of lazily allocating a new stack level. The stack is initially NULL.

The fix for the other issue is in the 0002 patch. Having a separate list
of rel OIDs for each subid is essential only for that issue, so the
changes that introduce this structure are in 0002, not 0001. As you
suggested, I have kept the subids in a hash table rather than a linked
list.

As for the other part of your fix, which I think basically boils down
to comparing the final states instead of just looking at what got
changed, the logic looks complicated and I don't think I fully
understand it.

Once I split the patch, let me try to add some comments to make it clearer.

When a subscription is altered for the *first* time in a transaction,
an entry is created for that subscription in the committed_subrels_table
hash table. That entry holds a cached list of the tables belonging to
the subscription as of the last committed change. If each ALTER
SUBSCRIPTION command computes its stop workers by comparing against
this cached list, we get the final list of workers to stop if the
transaction commits in that state. So if there are two ALTER commands
for the same subscription, the second one replaces the earlier stop-worker list with
its own list.
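A minimal sketch of comparing against the cached committed list, assuming plain arrays of relation OIDs (the names `compute_stop_relids()` and `contains()` are hypothetical): the workers to stop are the committed tables missing from the subscription's current table set, and each ALTER recomputes the whole list rather than adding to it.

```c
#include <assert.h>
#include <stddef.h>

/* Returns 1 if oid occurs in arr[0..n), else 0. */
static int
contains(const unsigned *arr, size_t n, unsigned oid)
{
    for (size_t i = 0; i < n; i++)
        if (arr[i] == oid)
            return 1;
    return 0;
}

/*
 * Tables whose sync workers should be stopped if the transaction commits
 * now: those in the committed snapshot (cached at the first ALTER of this
 * subscription in the transaction) that are absent from the current set.
 * The result replaces any earlier list for this subscription.
 * 'out' must have room for ncommitted entries; returns the count written.
 */
static size_t
compute_stop_relids(const unsigned *committed, size_t ncommitted,
                    const unsigned *current, size_t ncurrent,
                    unsigned *out)
{
    size_t n = 0;

    assert(out != NULL);
    for (size_t i = 0; i < ncommitted; i++)
        if (!contains(current, ncurrent, committed[i]))
            out[n++] = committed[i];
    return n;
}
```

In this model, for a subscription committed with tables {1, 2, 3}, an ALTER leaving only {3} yields {1, 2}; a second ALTER in the same transaction that leaves {1, 2, 3} yields an empty list, which replaces the first result. Simply accumulating the per-ALTER results would still stop the workers for tables 1 and 2.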

I have added some more comments in the snippet below. I hope
this helps:

@@ -594,7 +619,16 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
RemoveSubscriptionRel(sub->oid, relid);

- logicalrep_worker_stop_at_commit(sub->oid, relid);
+ /*
+ * If we found an entry in committed_subrels for this subid, that
+ * means subrelids represents a modified version of the
+ * committed_subrels_entry->relids. If we didn't find an entry, it
+ * means this is the first time we are altering the sub, so they
+ * both have the same committed list; so in that case, we avoid
+ * another iteration below, and create the stop workers here itself.
+ */
+ if (!sub_found)
+     stop_relids = lappend_oid(stop_relids, relid);

ereport(DEBUG1,
(errmsg("table \"%s.%s\" removed from subscription \"%s\"",

Attachments:

0001-Handle-subscriptions-workers-with-subtransactions.patch (application/octet-stream)
From 504ab6ec63c2c9667decfc4a000e80ff83cf0183 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khandekar@enterprisedb.com>
Date: Mon, 16 Jul 2018 11:31:23 +0530
Subject: [PATCH 1/2] Handle subscriptions workers with subtransactions.

AtEOXact_ApplyLauncher() did not handle the case where a subscription
REFRESH operation in a subtransaction is aborted. The function
did not consider subtransactions at all.

Fix this by keeping a stack of stop-worker lists, each one belonging
to the corresponding subtransaction depth.
---
 src/backend/access/transam/xact.c          |   2 +
 src/backend/replication/logical/launcher.c | 115 +++++++++++++++++++++++++++--
 src/include/replication/logicallauncher.h  |   1 +
 3 files changed, 110 insertions(+), 8 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1da1f13..9aa63c8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -4637,6 +4637,7 @@ CommitSubTransaction(void)
 	AtEOSubXact_HashTables(true, s->nestingLevel);
 	AtEOSubXact_PgStat(true, s->nestingLevel);
 	AtSubCommit_Snapshot(s->nestingLevel);
+	AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
 	/*
 	 * We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4791,7 @@ AbortSubTransaction(void)
 		AtEOSubXact_HashTables(false, s->nestingLevel);
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
+		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
 	}
 
 	/*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6ef333b..32ab193 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
 	Oid			relid;
 } LogicalRepWorkerId;
 
-static List *on_commit_stop_workers = NIL;
+typedef struct StopWorkersData
+{
+	int			nestDepth;				/* Sub-transaction nest level */
+	List	   *workers;				/* List of LogicalRepWorkerId */
+	struct StopWorkersData *parent;		/* This need not be an immediate
+										 * subtransaction parent */
+} StopWorkersData;
+
+/*
+ * Stack of StopWorkersData elements. Each stack element contains the workers
+ * to be stopped for that subtransaction.
+ */
+static StopWorkersData *on_commit_stop_workers = NULL;
 
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -559,17 +571,40 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 void
 logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
 {
+	int			nestDepth = GetCurrentTransactionNestLevel();
 	LogicalRepWorkerId *wid;
 	MemoryContext oldctx;
 
 	/* Make sure we store the info in context that survives until commit. */
 	oldctx = MemoryContextSwitchTo(TopTransactionContext);
 
+	/* Check that previous transactions were properly cleaned up. */
+	Assert(on_commit_stop_workers == NULL ||
+		   nestDepth >= on_commit_stop_workers->nestDepth);
+
+	/*
+	 * Push a new stack element if we don't already have one for the current
+	 * nestDepth.
+	 */
+	if (on_commit_stop_workers == NULL ||
+		nestDepth > on_commit_stop_workers->nestDepth)
+	{
+		StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
+		newdata->nestDepth = nestDepth;
+		newdata->workers = NIL;
+		newdata->parent = on_commit_stop_workers;
+		on_commit_stop_workers = newdata;
+	}
+
+	/*
+	 * Finally add a new worker into the worker list of the current
+	 * subtransaction.
+	 */
 	wid = palloc(sizeof(LogicalRepWorkerId));
 	wid->subid = subid;
 	wid->relid = relid;
-
-	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+	on_commit_stop_workers->workers =
+		lappend(on_commit_stop_workers->workers, wid);
 
 	MemoryContextSwitchTo(oldctx);
 }
@@ -823,7 +858,7 @@ ApplyLauncherShmemInit(void)
 bool
 XactManipulatesLogicalReplicationWorkers(void)
 {
-	return (on_commit_stop_workers != NIL);
+	return (on_commit_stop_workers != NULL);
 }
 
 /*
@@ -832,15 +867,25 @@ XactManipulatesLogicalReplicationWorkers(void)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+
+	Assert(on_commit_stop_workers == NULL ||
+		   (on_commit_stop_workers->nestDepth == 1 &&
+			on_commit_stop_workers->parent == NULL));
+
 	if (isCommit)
 	{
 		ListCell   *lc;
 
-		foreach(lc, on_commit_stop_workers)
+		if (on_commit_stop_workers != NULL)
 		{
-			LogicalRepWorkerId *wid = lfirst(lc);
+			List	*workers = on_commit_stop_workers->workers;
 
-			logicalrep_worker_stop(wid->subid, wid->relid);
+			foreach(lc, workers)
+			{
+				LogicalRepWorkerId *wid = lfirst(lc);
+
+				logicalrep_worker_stop(wid->subid, wid->relid);
+			}
 		}
 
 		if (on_commit_launcher_wakeup)
@@ -851,11 +896,65 @@ AtEOXact_ApplyLauncher(bool isCommit)
 	 * No need to pfree on_commit_stop_workers.  It was allocated in
 	 * transaction memory context, which is going to be cleaned soon.
 	 */
-	on_commit_stop_workers = NIL;
+	on_commit_stop_workers = NULL;
 	on_commit_launcher_wakeup = false;
 }
 
 /*
+ * On commit, merge the current on_commit_stop_workers list into the
+ * immediate parent, if present.
+ * On rollback, discard the current on_commit_stop_workers list.
+ * Pop out the stack.
+ */
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+	StopWorkersData *parent;
+
+	/* Exit immediately if there's no work to do at this level. */
+	if (on_commit_stop_workers == NULL ||
+		on_commit_stop_workers->nestDepth < nestDepth)
+		return;
+
+	Assert(on_commit_stop_workers->nestDepth == nestDepth);
+
+	parent = on_commit_stop_workers->parent;
+
+	if (isCommit)
+	{
+		/*
+		 * If the upper stack element is not an immediate parent
+		 * subtransaction, just decrement the notional nesting depth without
+		 * doing any real work.  Else, we need to merge the current workers
+		 * list into the parent.
+		 */
+		if (!parent || parent->nestDepth < nestDepth - 1)
+		{
+			on_commit_stop_workers->nestDepth--;
+			return;
+		}
+
+		parent->workers =
+			list_concat(parent->workers, on_commit_stop_workers->workers);
+	}
+	else
+	{
+		/*
+		 * Abandon everything that was done at this nesting level.  Explicitly
+		 * free memory to avoid a transaction-lifespan leak.
+		 */
+		list_free_deep(on_commit_stop_workers->workers);
+	}
+
+	/*
+	 * We have taken care of the current subtransaction workers list for both
+	 * abort or commit. So we are ready to pop the stack.
+	 */
+	pfree(on_commit_stop_workers);
+	on_commit_stop_workers = parent;
+}
+
+/*
  * Request wakeup of the launcher on commit of the transaction.
  *
  * This is used to send launcher signal to stop sleeping and process the
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index ef02512..9f840b7 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
 extern void ApplyLauncherWakeupAtCommit(void);
 extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 
 extern bool IsLogicalLauncher(void);
 
-- 
2.1.4
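Patch 0001 spreads its subtransaction handling across three functions. The following is a minimal standalone model of that stack discipline in C. It is a sketch under stated assumptions, not the committed code. `WorkerId`, `StopLevel`, `stop_stack`, `stop_at_commit()`, `at_eosubxact()`, `at_eoxact()` and `rollback_to_savepoint_scenario()` are hypothetical names standing in for `LogicalRepWorkerId`, `StopWorkersData`, `on_commit_stop_workers` and the three launcher functions. A fixed-size array replaces the PostgreSQL `List`, the nesting level is passed in explicitly instead of coming from `GetCurrentTransactionNestLevel()`, and the OIDs are made up.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

#define MAXWORKERS 16			/* fixed capacity keeps the sketch short */

/* Hypothetical stand-in for LogicalRepWorkerId. */
typedef struct { unsigned int subid, relid; } WorkerId;

/* Hypothetical stand-in for StopWorkersData; a fixed array replaces the List. */
typedef struct StopLevel
{
	int			nestDepth;		/* subtransaction nest level */
	int			nworkers;
	WorkerId	workers[MAXWORKERS];
	struct StopLevel *parent;	/* need not be the immediate parent level */
} StopLevel;

static StopLevel *stop_stack = NULL;	/* plays the role of on_commit_stop_workers */

static void
append_worker(StopLevel *lvl, WorkerId w)
{
	if (lvl->nworkers >= MAXWORKERS)
		abort();
	lvl->workers[lvl->nworkers++] = w;
}

/* Like logicalrep_worker_stop_at_commit(): push a level lazily, then append. */
static void
stop_at_commit(int nestDepth, unsigned int subid, unsigned int relid)
{
	assert(stop_stack == NULL || nestDepth >= stop_stack->nestDepth);
	if (stop_stack == NULL || nestDepth > stop_stack->nestDepth)
	{
		StopLevel  *lvl = calloc(1, sizeof(StopLevel));

		if (lvl == NULL)
			abort();
		lvl->nestDepth = nestDepth;
		lvl->parent = stop_stack;
		stop_stack = lvl;
	}
	append_worker(stop_stack, (WorkerId) {subid, relid});
}

/* Like AtEOSubXact_ApplyLauncher(): merge into parent on commit, drop on abort. */
static void
at_eosubxact(bool isCommit, int nestDepth)
{
	StopLevel  *parent;

	if (stop_stack == NULL || stop_stack->nestDepth < nestDepth)
		return;					/* nothing was recorded at this level */
	assert(stop_stack->nestDepth == nestDepth);
	parent = stop_stack->parent;

	if (isCommit)
	{
		if (parent == NULL || parent->nestDepth < nestDepth - 1)
		{
			stop_stack->nestDepth--;	/* relabel; the level now belongs to the parent */
			return;
		}
		for (int i = 0; i < stop_stack->nworkers; i++)
			append_worker(parent, stop_stack->workers[i]);
	}
	free(stop_stack);			/* on abort, this level's workers are discarded */
	stop_stack = parent;
}

/* Like AtEOXact_ApplyLauncher(): return how many workers commit would stop. */
static int
at_eoxact(bool isCommit)
{
	int			nstopped = 0;

	assert(stop_stack == NULL ||
		   (stop_stack->nestDepth == 1 && stop_stack->parent == NULL));
	if (stop_stack != NULL && isCommit)
		nstopped = stop_stack->nworkers;	/* real code calls logicalrep_worker_stop() */
	free(stop_stack);			/* free(NULL) is a no-op */
	stop_stack = NULL;
	return nstopped;
}

/* The first message's sequence: BEGIN; SAVEPOINT a; ALTER ...; ROLLBACK TO a; COMMIT; */
static int
rollback_to_savepoint_scenario(void)
{
	stop_at_commit(2, 16400, 16401);	/* ALTER queues two workers at level 2 */
	stop_at_commit(2, 16400, 16402);
	at_eosubxact(false, 2);				/* ROLLBACK TO a discards them */
	return at_eoxact(true);				/* COMMIT: nothing to stop */
}
```

`rollback_to_savepoint_scenario()` replays the sequence from the first message and returns 0, so nothing is stopped at COMMIT. If instead one worker is queued at level 1 and another at level 3, and both savepoints are then released (`at_eosubxact(true, 3)` followed by `at_eosubxact(true, 2)`), `at_eoxact(true)` reports 2. The first release only relabels the level-3 element because its stack parent sits at level 1, which is why `parent` need not be the immediate parent subtransaction.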

0002-Fix-issue-with-subscriptions-when-altered-twice-in-s.patch
From ea17efb14d856995f4e7c3e87b78631c7cb13ced Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khandekar@enterprisedb.com>
Date: Mon, 16 Jul 2018 12:00:31 +0530
Subject: [PATCH 2/2] Fix issue with subscriptions when altered twice in same
 transaction.

We keep track of workers to be stopped by adding them to an existing
list. But if a subscription is altered/refreshed twice in the same
transaction, this does not work. For instance:

Suppose publication pubx is set for tables tx1 and tx2.
Publication puby is for tables ty1 and ty2.
And publication pubz is for tables tx1, tx2, tx3.

Subscription mysub is initially set to synchronise tables tx1 and tx2:
CREATE SUBSCRIPTION mysub ... PUBLICATION pubx;
It is then altered like this:
begin;
ALTER SUBSCRIPTION mysub set publication puby;
ALTER SUBSCRIPTION mysub set publication pubz;
commit;

Here, at the 2nd ALTER command, while deriving the relations that no
longer belong to mysub, we consider the pg_subscription_rel contents
that were already updated by the 1st ALTER command. This just adds more
workers to the ones derived for the 1st ALTER command. Effectively,
at the commit above, the stop-workers list would contain tx1, tx2, ty1
and ty2, when it should contain nothing: before the transaction, mysub
contained tx1 and tx2, and both are still present in mysub at
transaction end, since pubz contains those tables.

Fix this by keeping track of which tables were present at
transaction start.
---
 src/backend/access/transam/xact.c          |  3 +
 src/backend/catalog/pg_subscription.c      | 43 +++++++++----
 src/backend/commands/subscriptioncmds.c    | 96 +++++++++++++++++++++++++-----
 src/backend/replication/logical/launcher.c | 82 +++++++++++++++----------
 src/include/catalog/pg_subscription_rel.h  |  9 +++
 src/include/commands/subscriptioncmds.h    |  1 +
 src/include/replication/worker_internal.h  |  2 +-
 7 files changed, 179 insertions(+), 57 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9aa63c8..aefdba2 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -33,6 +33,7 @@
 #include "catalog/namespace.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/subscriptioncmds.h"
 #include "commands/tablecmds.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
@@ -2128,6 +2129,7 @@ CommitTransaction(void)
 	AtEOXact_HashTables(true);
 	AtEOXact_PgStat(true);
 	AtEOXact_Snapshot(true, false);
+	AtEOXact_Subscription();
 	AtEOXact_ApplyLauncher(true);
 	pgstat_report_xact_timestamp(0);
 
@@ -2607,6 +2609,7 @@ AbortTransaction(void)
 		AtEOXact_ComboCid();
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false);
+		AtEOXact_Subscription();
 		AtEOXact_ApplyLauncher(false);
 		pgstat_report_xact_timestamp(0);
 	}
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 8705d8b..3d4695a 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -33,6 +33,7 @@
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -429,12 +430,12 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 
 
 /*
- * Get all relations for subscription.
+ * Get reloids of all relations for subscription.
  *
- * Returned list is palloc'ed in current memory context.
+ * Returned list is palloc'ed in the specified 'memcxt'.
  */
 List *
-GetSubscriptionRelations(Oid subid)
+GetSubscriptionRelids(Oid subid, MemoryContext memcxt)
 {
 	List	   *res = NIL;
 	Relation	rel;
@@ -442,6 +443,7 @@ GetSubscriptionRelations(Oid subid)
 	int			nkeys = 0;
 	ScanKeyData skey[2];
 	SysScanDesc scan;
+	MemoryContext old_context;
 
 	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
 
@@ -453,20 +455,15 @@ GetSubscriptionRelations(Oid subid)
 	scan = systable_beginscan(rel, InvalidOid, false,
 							  NULL, nkeys, skey);
 
+	old_context = MemoryContextSwitchTo(memcxt);
 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
 	{
 		Form_pg_subscription_rel subrel;
-		SubscriptionRelState *relstate;
 
 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
-
-		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
-		relstate->relid = subrel->srrelid;
-		relstate->state = subrel->srsubstate;
-		relstate->lsn = subrel->srsublsn;
-
-		res = lappend(res, relstate);
+		res = lappend_oid(res, subrel->srrelid);
 	}
+	MemoryContextSwitchTo(old_context);
 
 	/* Cleanup */
 	systable_endscan(scan);
@@ -526,3 +523,27 @@ GetSubscriptionNotReadyRelations(Oid subid)
 
 	return res;
 }
+
+/*
+ * Create a hash table, hashed by subid. Each entry will contain a subset of
+ * relations belonging to the given subscription.
+ */
+HTAB *
+CreateSubscriptionRelHash(void)
+{
+	HASHCTL		ctl;
+
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(SubscriptionRelEntry);
+
+	/*
+	 * All current users require the allocations to be valid until transaction
+	 * end.
+	 */
+	ctl.hcxt = TopTransactionContext;
+
+	return hash_create("SubscriptionRels",
+					   2,		/* typically, this will be small */
+					   &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f138e61..e377ff0 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -51,6 +51,18 @@
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
+
+/*
+ * Hash table of subscriptions.  Each entry has the relids for a given
+ * subscription that were updated on the last COMMIT.  For a subid, there
+ * exists an entry in this hash table only when the subscription relations are
+ * altered.  Once the transaction ends, the hash table is destroyed and reset
+ * to NULL.  This is done so that during commit, we know exactly which workers
+ * to stop: the relations for the last altered subscription should be compared
+ * with the relations for the last committed subscription changes.
+ */
+static HTAB *committed_subrels_table = NULL;
+
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 
 /*
@@ -504,9 +516,12 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 {
 	char	   *err;
 	List	   *pubrel_names;
-	List	   *subrel_states;
+	List	   *subrelids;
+	SubscriptionRelEntry *committed_subrels_entry;
+	bool		sub_found;
 	Oid		   *subrel_local_oids;
 	Oid		   *pubrel_local_oids;
+	List	   *stop_relids = NIL;
 	ListCell   *lc;
 	int			off;
 
@@ -525,24 +540,34 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	/* We are done with the remote side, close connection. */
 	walrcv_disconnect(wrconn);
 
-	/* Get local table list. */
-	subrel_states = GetSubscriptionRelations(sub->oid);
+	/* Get the committed subrels for the given subscription */
+	if (committed_subrels_table == NULL)
+		committed_subrels_table = CreateSubscriptionRelHash();
+	committed_subrels_entry =
+		(SubscriptionRelEntry *) hash_search(committed_subrels_table,
+										 &sub->oid, HASH_ENTER, &sub_found);
+
+	/*
+	 * Get local table list. If we are creating the committed subrel list for
+	 * the first time for this subscription in this transaction, add them into
+	 * the committed_subrels_table, and also make sure the list is maintained
+	 * until transaction end.
+	 */
+	subrelids = GetSubscriptionRelids(sub->oid,
+					sub_found ?  CurrentMemoryContext : TopTransactionContext);
+	if (!sub_found)
+		committed_subrels_entry->relids = subrelids;
 
 	/*
 	 * Build qsorted array of local table oids for faster lookup. This can
 	 * potentially contain all tables in the database so speed of lookup is
 	 * important.
 	 */
-	subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+	subrel_local_oids = palloc(list_length(subrelids) * sizeof(Oid));
 	off = 0;
-	foreach(lc, subrel_states)
-	{
-		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
-
-		subrel_local_oids[off++] = relstate->relid;
-	}
-	qsort(subrel_local_oids, list_length(subrel_states),
-		  sizeof(Oid), oid_cmp);
+	foreach(lc, subrelids)
+		subrel_local_oids[off++] = lfirst_oid(lc);
+	qsort(subrel_local_oids, list_length(subrelids), sizeof(Oid), oid_cmp);
 
 	/*
 	 * Walk over the remote tables and try to match them to locally known
@@ -567,7 +592,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		pubrel_local_oids[off++] = relid;
 
 		if (!bsearch(&relid, subrel_local_oids,
-					 list_length(subrel_states), sizeof(Oid), oid_cmp))
+					 list_length(subrelids), sizeof(Oid), oid_cmp))
 		{
 			AddSubscriptionRelState(sub->oid, relid,
 									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
@@ -585,7 +610,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	qsort(pubrel_local_oids, list_length(pubrel_names),
 		  sizeof(Oid), oid_cmp);
 
-	for (off = 0; off < list_length(subrel_states); off++)
+	for (off = 0; off < list_length(subrelids); off++)
 	{
 		Oid			relid = subrel_local_oids[off];
 
@@ -594,7 +619,16 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		{
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop_at_commit(sub->oid, relid);
+			/*
+			 * If we found an entry in committed_subrels for this subid, that
+			 * means subrelids represents a modified version of the
+			 * committed_subrels_entry->relids. If we didn't find an entry, it
+			 * means this is the first time we are altering the sub, so they
+			 * both have the same committed list; so in that case, we avoid
+			 * another iteration below, and create the stop workers here itself.
+			 */
+			if (!sub_found)
+				stop_relids = lappend_oid(stop_relids, relid);
 
 			ereport(DEBUG1,
 					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
@@ -603,6 +637,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 							sub->name)));
 		}
 	}
+
+	/*
+	 * Now derive the workers to be stopped using the committed reloids. At
+	 * commit time, we will terminate them.
+	 */
+	if (sub_found)
+	{
+		foreach(lc, committed_subrels_entry->relids)
+		{
+			Oid			relid = lfirst_oid(lc);
+
+			if (!bsearch(&relid, pubrel_local_oids,
+						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+				stop_relids = lappend_oid(stop_relids, relid);
+		}
+	}
+
+	logicalrep_worker_stop_at_commit(sub->oid, stop_relids);
 }
 
 /*
@@ -1172,3 +1224,17 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 
 	return tablelist;
 }
+
+/*
+ * Cleanup function for objects maintained during the transaction by
+ * subscription-refresh operation.
+ */
+void
+AtEOXact_Subscription(void)
+{
+	/*
+	 * No need to free the hash table here; it lives in TopTransactionContext,
+	 * which is released at transaction end.
+	 */
+	committed_subrels_table = NULL;
+}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 32ab193..e3059d6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,16 +73,10 @@ typedef struct LogicalRepCtxStruct
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
-typedef struct LogicalRepWorkerId
-{
-	Oid			subid;
-	Oid			relid;
-} LogicalRepWorkerId;
-
 typedef struct StopWorkersData
 {
 	int			nestDepth;				/* Sub-transaction nest level */
-	List	   *workers;				/* List of LogicalRepWorkerId */
+	HTAB	   *workers;		/* Workers to be stopped. Hashed by subid */
 	struct StopWorkersData *parent;		/* This need not be an immediate
 										 * subtransaction parent */
 } StopWorkersData;
@@ -566,14 +560,16 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 }
 
 /*
- * Request worker for specified sub/rel to be stopped on commit.
+ * Request workers for the specified relids of a subscription to be stopped on
+ * commit. This replaces the earlier saved reloids of a given subscription.
  */
 void
-logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+logicalrep_worker_stop_at_commit(Oid subid, List *relids)
 {
 	int			nestDepth = GetCurrentTransactionNestLevel();
-	LogicalRepWorkerId *wid;
 	MemoryContext oldctx;
+	SubscriptionRelEntry *subrel_entry;
+	bool		sub_found;
 
 	/* Make sure we store the info in context that survives until commit. */
 	oldctx = MemoryContextSwitchTo(TopTransactionContext);
@@ -591,20 +587,22 @@ logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
 	{
 		StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
 		newdata->nestDepth = nestDepth;
-		newdata->workers = NIL;
+		newdata->workers = CreateSubscriptionRelHash();
 		newdata->parent = on_commit_stop_workers;
 		on_commit_stop_workers = newdata;
 	}
 
 	/*
-	 * Finally add a new worker into the worker list of the current
-	 * subtransaction.
+	 * If there's an existing entry, it means the same subscription was already
+	 * refreshed earlier in the current subtransaction. In that case, replace
+	 * the existing relations with the new ones.
 	 */
-	wid = palloc(sizeof(LogicalRepWorkerId));
-	wid->subid = subid;
-	wid->relid = relid;
-	on_commit_stop_workers->workers =
-		lappend(on_commit_stop_workers->workers, wid);
+	subrel_entry =
+		(SubscriptionRelEntry *) hash_search(on_commit_stop_workers->workers,
+											 &subid, HASH_ENTER, &sub_found);
+	if (sub_found)
+		list_free(subrel_entry->relids);
+	subrel_entry->relids = list_copy(relids);
 
 	MemoryContextSwitchTo(oldctx);
 }
@@ -874,17 +872,17 @@ AtEOXact_ApplyLauncher(bool isCommit)
 
 	if (isCommit)
 	{
-		ListCell   *lc;
-
 		if (on_commit_stop_workers != NULL)
 		{
-			List	*workers = on_commit_stop_workers->workers;
+			SubscriptionRelEntry *entry;
+			HASH_SEQ_STATUS hash_seq;
+			ListCell   *lc;
 
-			foreach(lc, workers)
+			hash_seq_init(&hash_seq, on_commit_stop_workers->workers);
+			while ((entry = (SubscriptionRelEntry *) hash_seq_search(&hash_seq)) != NULL)
 			{
-				LogicalRepWorkerId *wid = lfirst(lc);
-
-				logicalrep_worker_stop(wid->subid, wid->relid);
+				foreach(lc, entry->relids)
+					logicalrep_worker_stop(entry->subid, lfirst_oid(lc));
 			}
 		}
 
@@ -910,6 +908,10 @@ void
 AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
 {
 	StopWorkersData *parent;
+	HASH_SEQ_STATUS hash_seq;
+	HTAB *workers;
+	SubscriptionRelEntry *entry;
+
 
 	/* Exit immediately if there's no work to do at this level. */
 	if (on_commit_stop_workers == NULL ||
@@ -919,14 +921,14 @@ AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
 	Assert(on_commit_stop_workers->nestDepth == nestDepth);
 
 	parent = on_commit_stop_workers->parent;
+	workers = on_commit_stop_workers->workers;
 
 	if (isCommit)
 	{
 		/*
 		 * If the upper stack element is not an immediate parent
 		 * subtransaction, just decrement the notional nesting depth without
-		 * doing any real work.  Else, we need to merge the current workers
-		 * list into the parent.
+		 * doing any real work.
 		 */
 		if (!parent || parent->nestDepth < nestDepth - 1)
 		{
@@ -934,8 +936,24 @@ AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
 			return;
 		}
 
-		parent->workers =
-			list_concat(parent->workers, on_commit_stop_workers->workers);
+		/* Else, we need to merge the current workers list into the parent. */
+		hash_seq_init(&hash_seq, workers);
+		while ((entry = (SubscriptionRelEntry *) hash_seq_search(&hash_seq)) != NULL)
+		{
+			bool		sub_found;
+			SubscriptionRelEntry *parent_entry;
+
+			parent_entry =
+				(SubscriptionRelEntry *) hash_search(parent->workers,
+										 &entry->subid, HASH_ENTER, &sub_found);
+			/*
+			 * Replace the parent's workers with the current subtransaction's
+			 * workers.
+			 */
+			if (sub_found)
+				list_free(parent_entry->relids);
+			parent_entry->relids = entry->relids;
+		}
 	}
 	else
 	{
@@ -943,7 +961,11 @@ AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
 		 * Abandon everything that was done at this nesting level.  Explicitly
 		 * free memory to avoid a transaction-lifespan leak.
 		 */
-		list_free_deep(on_commit_stop_workers->workers);
+		hash_seq_init(&hash_seq, workers);
+		while ((entry = (SubscriptionRelEntry *) hash_seq_search(&hash_seq)) != NULL)
+			list_free(entry->relids);
+
+		hash_destroy(workers);
 	}
 
 	/*
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 556cb94..acc6905 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -23,6 +23,7 @@
 
 #include "access/xlogdefs.h"
 #include "nodes/pg_list.h"
+#include "utils/hsearch.h"
 
 /* ----------------
  *		pg_subscription_rel definition. cpp turns this into
@@ -67,6 +68,12 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
+typedef struct SubscriptionRelEntry
+{
+	Oid			subid;		/* hash key - must be first */
+	List	   *relids;		/* Subset of relations in the subscription. */
+} SubscriptionRelEntry;
+
 extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
 						XLogRecPtr sublsn);
 extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
@@ -76,6 +83,8 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid,
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern List *GetSubscriptionRelations(Oid subid);
+extern List *GetSubscriptionRelids(Oid subid, MemoryContext memcxt);
 extern List *GetSubscriptionNotReadyRelations(Oid subid);
+extern HTAB *CreateSubscriptionRelHash(void);
 
 #endif							/* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 6d70ad7..e14b91e 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -25,5 +25,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
 
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
+extern void AtEOXact_Subscription(void);
 
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1ce3b6b..1c48a33 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -75,7 +75,7 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 						 Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, List *relids);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
-- 
2.1.4
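The example in the 0002 commit message boils down to a set difference. Below is a minimal standalone C model, assuming relation sets small enough for fixed arrays. `RelSet`, `relset_add_difference()`, `stop_set_accumulated()`, `stop_set_from_committed()` and `commit_message_example()` are hypothetical names, and the OIDs assigned to tx1..tx3, ty1 and ty2 are invented. `stop_set_accumulated()` mimics the pre-patch behaviour of appending each ALTER's difference; `stop_set_from_committed()` mimics what the patch does with `committed_subrels_table` plus the per-subscription replacement in `logicalrep_worker_stop_at_commit()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid */

#define MAXRELS 16				/* fixed capacity keeps the sketch short */

/* Hypothetical: a small set of relation OIDs belonging to one subscription. */
typedef struct RelSet
{
	Oid			rels[MAXRELS];
	int			n;
} RelSet;

static bool
relset_contains(const RelSet *s, Oid relid)
{
	for (int i = 0; i < s->n; i++)
		if (s->rels[i] == relid)
			return true;
	return false;
}

/* Add every member of 'from' that is missing from 'to' into *out (no duplicates). */
static void
relset_add_difference(RelSet *out, const RelSet *from, const RelSet *to)
{
	for (int i = 0; i < from->n; i++)
	{
		Oid			relid = from->rels[i];

		if (relset_contains(to, relid) || relset_contains(out, relid))
			continue;
		if (out->n >= MAXRELS)
			abort();
		out->rels[out->n++] = relid;
	}
}

/*
 * Pre-patch behaviour: each ALTER diffs the catalog contents, as already
 * rewritten by earlier ALTERs in this transaction, against its new
 * publication, and appends the result to the stop list.
 */
static RelSet
stop_set_accumulated(RelSet committed, const RelSet *alters, int nalters)
{
	RelSet		stop = {.n = 0};
	RelSet		current = committed;

	for (int i = 0; i < nalters; i++)
	{
		relset_add_difference(&stop, &current, &alters[i]);
		current = alters[i];	/* SET PUBLICATION rewrites the subscription's rels */
	}
	return stop;
}

/*
 * Patched behaviour: each ALTER diffs the relations present at transaction
 * start against its new publication and replaces any earlier stop list.
 */
static RelSet
stop_set_from_committed(RelSet committed, const RelSet *alters, int nalters)
{
	RelSet		stop = {.n = 0};

	for (int i = 0; i < nalters; i++)
	{
		stop.n = 0;				/* replace, do not accumulate */
		relset_add_difference(&stop, &committed, &alters[i]);
	}
	return stop;
}

/* The commit message's example; made-up OIDs: tx1..tx3 = 101..103, ty1, ty2 = 201, 202. */
static void
commit_message_example(RelSet *accumulated, RelSet *from_committed)
{
	RelSet		committed = {{101, 102}, 2};	/* mysub before BEGIN (pubx) */
	RelSet		alters[2] = {
		{{201, 202}, 2},		/* SET PUBLICATION puby */
		{{101, 102, 103}, 3}	/* SET PUBLICATION pubz */
	};

	*accumulated = stop_set_accumulated(committed, alters, 2);
	*from_committed = stop_set_from_committed(committed, alters, 2);
}
```

`commit_message_example()` yields four relations (tx1, tx2, ty1, ty2) for the accumulated list and none for the committed-baseline version, matching the expected result described in the commit message.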

#11Robert Haas
robertmhaas@gmail.com
In reply to: Amit Khandekar (#10)
Re: AtEOXact_ApplyLauncher() and subtransactions

On Mon, Jul 16, 2018 at 2:36 AM, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:

> 0001 patch contains the main fix. In this patch I have used some
> naming conventions and some comments that you used in your patch,
> plus, I used your method of lazily allocating new stack level. The
> stack is initially Null.

Committed and back-patched to v11 and v10.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#12Amit Khandekar
amitdkhan.pg@gmail.com
In reply to: Robert Haas (#11)
Re: AtEOXact_ApplyLauncher() and subtransactions

On 17 July 2018 at 03:29, Robert Haas <robertmhaas@gmail.com> wrote:

> On Mon, Jul 16, 2018 at 2:36 AM, Amit Khandekar <amitdkhan.pg@gmail.com> wrote:
>
>> 0001 patch contains the main fix. In this patch I have used some
>> naming conventions and some comments that you used in your patch,
>> plus, I used your method of lazily allocating new stack level. The
>> stack is initially Null.
>
> Committed and back-patched to v11 and v10.

Thanks Robert.