Perform streaming logical transactions by background workers and parallel apply
In this email, I would like to discuss allowing streaming logical
transactions (large in-progress transactions) by background workers
and parallel apply in general. The goal of this work is to improve the
performance of the apply work in logical replication.
Currently, for large transactions, the publisher sends the data in
multiple streams (changes divided into chunks depending upon
logical_decoding_work_mem), and then on the subscriber side, the apply
worker writes the changes into temporary files and, once it receives
the commit, reads from the file and applies the entire transaction. To
improve the performance of such transactions, we can instead allow
them to be applied via background workers. There could be multiple
ways to achieve this:
Approach-1: Assign a new bgworker (if available) as soon as the xact's
first stream arrives, and the main apply worker will send changes to
this new worker via shared memory. We keep this worker assigned until
the transaction's commit arrives and also wait for the worker to
finish at commit. This preserves commit ordering and avoids writing to
and reading from a file in most cases. We still need to spill if there
is no worker available. We also need to wait at stream_stop for the
background worker to finish applying the current stream, to avoid
deadlocks, because T-1's current stream of changes can update rows in
a conflicting order with T-2's next stream of changes.
Approach-2: Assign another worker to spill the changes and only allow
them to be applied at commit time by the same or another worker. Now,
to preserve the commit order, we need to wait at commit so that the
respective assigned workers can finish. This won't avoid spilling to
disk and reading back at commit time, but it can help in receiving and
processing more data than we do currently. I am not sure it can win
over Approach-1, because we still need to write to and read from the
file, and we would probably need a shared memory queue to send the
data to the other background workers for processing.
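As a rough illustration of Approach-1's control flow, here is a
hypothetical Python sketch only: threads and an in-memory queue stand
in for bgworkers and the shared memory channel, and none of these
names are real PostgreSQL APIs.

```python
import queue
import threading

class BgWorker:
    """Stand-in for a background apply worker dedicated to one xact."""
    def __init__(self):
        self.q = queue.Queue()       # stand-in for the shared memory queue
        self.applied = []            # changes applied so far
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        while True:
            change = self.q.get()
            if change == "COMMIT":
                break
            self.applied.append(change)   # apply the change directly

    def send(self, change):
        self.q.put(change)

    def wait_for_commit(self):
        # The main apply worker blocks here, preserving commit ordering.
        self.send("COMMIT")
        self.thread.join()

class MainApplyWorker:
    def __init__(self, max_workers=2):
        self.max_workers = max_workers
        self.workers = {}   # xid -> BgWorker, assigned at the first stream
        self.spilled = {}   # xid -> changes "spilled to disk"

    def handle_stream(self, xid, changes):
        # Assign a bgworker at the xact's first stream, if one is free;
        # otherwise fall back to spilling for this whole xact.
        if xid not in self.workers and xid not in self.spilled:
            if len(self.workers) < self.max_workers:
                self.workers[xid] = BgWorker()
            else:
                self.spilled[xid] = []
        if xid in self.workers:
            for change in changes:
                self.workers[xid].send(change)
        else:
            self.spilled[xid].extend(changes)

    def handle_commit(self, xid):
        # Wait for the assigned worker to finish, or replay the spill.
        if xid in self.workers:
            worker = self.workers.pop(xid)
            worker.wait_for_commit()
            return worker.applied
        return self.spilled.pop(xid, [])
```

A concurrent xact beyond max_workers is spilled for its whole
lifetime, mirroring the "no worker available" fallback described
above.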
We need to change error handling to allow the above parallelization.
The current model for apply is that if any error occurs while
applying, we simply report the error in the server logs and the apply
worker exits. On restart, it will again receive the transaction data
that previously failed and will try to apply it again. Now, in the new
approach (say Approach-1), we need to ensure that all the active
workers applying in-progress transactions also exit before the main
apply worker exits, to allow rollback of the currently applied
transactions and their re-apply once we receive the data again. This
is required to avoid losing transactions: if any later transaction got
committed and updated the replication origin, the earlier transactions
won't be resent. This won't be much different from what we do now,
where, say, two transactions t-1 and t-2 have multiple overlapping
streams. If an error happens before one of them completes via commit
or rollback, all the data needs to be resent by the server and
processed again by the apply worker.
The next step in this area is to parallelize apply of all possible
transactions. I think the main things we need to care about to allow
this are:
1. Transaction dependency: We can't simply allow dependent
transactions to be applied in parallel as that can lead to
inconsistency. Say we insert a row in the first transaction and update
it in the second transaction: if both transactions are applied in
parallel, the insert may occur later and the update will fail.
2. Deadlocks: These can happen because the transactions will now be
applied in parallel. Say transaction T-1 updates row-2 and then row-3,
and transaction T-2 updates row-3 and then row-2; if we allow them in
parallel, there is a chance of deadlock, whereas there is no such risk
in serial execution where the commit order is preserved.
We can solve both problems if we allow only independent xacts to be
parallelized. Transactions would be considered dependent if they
operate on the same set of rows of the same table. Apart from this,
there could be other cases where determining transaction dependency
won't be straightforward, so we can disallow those transactions from
participating in parallel apply. Those are the cases where functions
are used in the table definition expressions. We can think of
identifying safe functions, like all built-in functions and any
immutable functions (and probably stable functions). We need to check
safety for cases such as (a) trigger functions, (b) column default
value expressions (as those can call functions), (c) constraint
expressions, (d) foreign keys, and (e) operations on partitioned
tables (especially those performed via the publish_via_partition_root
option), as we need to check the expressions on all partitions.
Transactions that operate on the same set of tables and perform a
truncate can lead to deadlock, so we need to consider such
transactions as dependent.
The basic idea is that for each running xact we maintain the table
oid, row id (pkey or replica identity), and xid in a hash table in the
apply worker. For any new xact, we check that it doesn't conflict with
one of the previously running xacts and only then allow it to be
applied in parallel. We can collect all the changes of a transaction
in an in-memory buffer while checking its dependencies and then let
one of the available workers apply it at commit. If the rows for a
particular transaction exceed a certain threshold, we need to escalate
to a table-level strategy, which means any other transaction operating
on the same table will be considered dependent. For very large
transactions that don't fit in the in-memory buffer, we either need to
spill them to disk or simply decide not to parallelize them. We need
to remove rows from the hash table once the transaction is applied
completely.
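The row-level tracking plus table-level escalation described above
could look roughly like this. This is a hypothetical sketch: the
names, the threshold value, and the per-xact (rather than per-table)
row counting are all illustrative simplifications, not the proposed
implementation.

```python
ROW_THRESHOLD = 3   # assumed small value, for illustration only

class DependencyTracker:
    def __init__(self):
        self.row_owners = {}     # (table_oid, row_id) -> xid
        self.table_owners = {}   # table_oid -> xid, after escalation
        self.rows_per_xact = {}  # xid -> number of rows registered

    def try_register(self, xid, table_oid, row_id):
        """Return True if xid may apply this change in parallel."""
        owner = self.table_owners.get(table_oid)
        if owner is not None and owner != xid:
            return False                       # table-level conflict
        owner = self.row_owners.get((table_oid, row_id))
        if owner is not None and owner != xid:
            return False                       # row-level conflict
        self.row_owners[(table_oid, row_id)] = xid
        n = self.rows_per_xact.get(xid, 0) + 1
        self.rows_per_xact[xid] = n
        if n > ROW_THRESHOLD:
            # Escalate: any other xact on this table is now dependent.
            self.table_owners[table_oid] = xid
        return True

    def finish(self, xid):
        """Drop all entries once the xact is fully applied."""
        self.row_owners = {k: v for k, v in self.row_owners.items()
                           if v != xid}
        self.table_owners = {k: v for k, v in self.table_owners.items()
                             if v != xid}
        self.rows_per_xact.pop(xid, None)
```

A conflicting xact (try_register returning False) would be held back
and applied serially, after the xact it depends on commits.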
The other thing we need to ensure while parallelizing independent
transactions is to preserve the commit order of transactions. This is
to ensure that, in case of errors, we won't get replicas out of sync.
If we allowed the commit order to change, it would be possible for
some later transaction to have updated the replication_origin LSN to a
value past the transaction whose apply is still in progress. Now, if
an error occurs for such an in-progress transaction, the server won't
resend its changes, as the replication_origin's LSN would have moved
ahead.
Even though we are preserving the commit order, there will still be a
benefit to parallel apply, as we should be able to parallelize most of
the writes in the transactions.
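A minimal sketch of that constraint (hypothetical names; threads stand
in for the parallel apply workers): writes proceed in parallel, but
each worker commits, and advances the origin LSN, only after all
earlier transactions in publisher commit order have committed.

```python
import threading

class CommitSequencer:
    """Releases commits strictly in publisher commit order."""
    def __init__(self):
        self.cond = threading.Condition()
        self.next_to_commit = 0   # position in publisher commit order
        self.origin_lsn = 0       # stand-in for the replication origin LSN
        self.committed = []       # commit order actually observed

    def commit(self, order_index, commit_lsn):
        with self.cond:
            # A later xact waits until every earlier xact has committed,
            # so the origin LSN can never run ahead of an in-progress
            # transaction that might still fail.
            while self.next_to_commit != order_index:
                self.cond.wait()
            self.origin_lsn = commit_lsn
            self.committed.append(order_index)
            self.next_to_commit += 1
            self.cond.notify_all()
```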
Thoughts?
Thanks to Hou-San and Shi-San for helping me to investigate these ideas.
--
With Regards,
Amit Kapila.
On Wednesday, April 6, 2022 1:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
[...]
Attached is the POC patch for Approach-1 of "Perform streaming logical
transactions by background workers". The patch is still a WIP as there
are several TODO items left, including:
* error handling for the bgworker
* support for skipping (SKIP) the transaction in the bgworker
* handling the case when there is no worker available
  (we might need to spill the data to a temp file in this case)
* some potential bugs
The original patch is borrowed from an old thread [1] and was rebased
and extended/cleaned by me. Comments and suggestions are welcome.
[1]: /messages/by-id/8eda5118-2dd0-79a1-4fe9-eec7e334de17@postgrespro.ru
Here are some performance results of the patch, shared by Shi Yu
off-list. The performance was tested by varying
logical_decoding_work_mem, which includes two cases:
1) bulk insert.
2) create savepoint and rollback to savepoint.
I used synchronous logical replication in the test and compared SQL
execution times before and after applying the patch.
The results are as follows. The bar charts and the details of the test
are attached as well.
RESULT - bulk insert (5kk)
----------------------------------
logical_decoding_work_mem 64kB 128kB 256kB 512kB 1MB 2MB 4MB 8MB 16MB 32MB 64MB
HEAD 51.673 51.199 51.166 50.259 52.898 50.651 51.156 51.210 50.678 51.256 51.138
patched 36.198 35.123 34.223 29.198 28.712 29.090 29.709 29.408 34.367 34.716 35.439
RESULT - rollback to savepoint (600k)
----------------------------------
logical_decoding_work_mem 64kB 128kB 256kB 512kB 1MB 2MB 4MB 8MB 16MB 32MB 64MB
HEAD 31.101 31.087 30.931 31.015 30.920 31.109 30.863 31.008 30.875 30.775 29.903
patched 28.115 28.487 27.804 28.175 27.734 29.047 28.279 27.909 28.277 27.345 28.375
Summary:
1) bulk insert
For different logical_decoding_work_mem sizes, it takes about 30% ~ 45% less
time, which looks good to me. After applying this patch, it seems that the
performance is better when logical_decoding_work_mem is between 512kB and 8MB.
2) rollback to savepoint
There is an improvement of about 5% ~ 10% after applying this patch.
In this case, the patch spends less time handling the part that is not
rolled back, because it saves the time spent writing the changes into a
temporary file and reading the file back. For the part that is rolled back,
it spends more time than HEAD, because applying the changes and rolling them
back takes more time than writing them to a temporary file and truncating
the file. Overall, the results look good.
Best regards,
Hou zj
On Friday, April 8, 2022 5:14 PM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:
[...]
Attached is a new version patch which improves the error handling and
handles the case when there is no worker available (it will spill the
data to a temp file in this case).
Currently, it still doesn't support skipping the streamed transaction
in the bgworker, because in this approach we don't know the last lsn
of the streamed transaction being applied, so we cannot get the lsn to
SKIP. I will think more about it and keep testing the patch.
Best regards,
Hou zj
Attachments:
v2-0001-Perform-streaming-logical-transactions-by-background.patch (application/octet-stream)
On Thu, Apr 14, 2022 at 9:12 AM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
[...]
I think we can avoid performing the streaming transaction by a
bgworker if skip_lsn is set. This needs some more thought, but anyway,
I see another problem in this patch. I think we won't be able to make
the decision whether to apply the change for a relation that is not in
the 'READY' state (see should_apply_changes_for_rel), as we won't know
'remote_final_lsn' by that time for streaming transactions. I think
what we can do here is that, before assigning the transaction to a
bgworker, we can check if any of the rels is not in the 'READY' state,
and in that case make the transaction spill the changes as we do now.
Even if we do such a check, it is still possible that some rel on
which this transaction is performing an operation can appear to be in
a 'non-ready' state after starting the bgworker, and for such a case I
think we need to give an error and restart the transaction, as we have
no way to know whether we need to perform an operation on the 'rel'.
This is possible if the user performs REFRESH PUBLICATION in parallel
with this transaction, as that can add a new rel to
pg_subscription_rel.
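The pre-assignment decision rule sketched above could be summarized as
follows. This is hypothetical Python with illustrative names only; the
real check would live in the apply worker before the xact is handed to
a bgworker.

```python
def choose_apply_path(rels_in_xact, rel_states, skiplsn_set=False):
    """Return 'bgworker' or 'spill' for a streamed transaction."""
    if skiplsn_set:
        # Can't SKIP inside a bgworker: the last lsn of the streamed
        # xact isn't known while it is still being applied.
        return "spill"
    if any(rel_states.get(rel) != "READY" for rel in rels_in_xact):
        # A non-READY rel needs remote_final_lsn to decide whether to
        # apply its changes, which is unknown until commit.
        return "spill"
    return "bgworker"
```

A rel turning non-READY after the bgworker has started (e.g. via a
concurrent REFRESH PUBLICATION) would still require the
error-and-restart path described above.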
--
With Regards,
Amit Kapila.
On Tuesday, April 19, 2022 2:58 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
[...]
Changed as suggested.
Attached is the new version patch, which cleans up some code and fixes
the above problem. For now, it won't apply a streaming transaction in
a bgworker if skiplsn is set or any table is not in the 'READY' state.
Besides, I extended the subscription streaming option to
('on/off/apply (apply in bgworker)/spool (spool to file)') so that the
user can control whether to apply the transaction in a bgworker.
Best regards,
Hou zj
Attachments:
v3-0001-Perform-streaming-logical-transactions-by-background.patch (application/octet-stream)
On Wednesday, April 20, 2022 4:57 PM houzj.fnst@fujitsu.com wrote:
[...]
Sorry, there was a miss in the pg_dump testcase which caused a failure
in the CFbot. Attached is a new version patch which fixes that.
Best regards,
Hou zj
Attachments:
v4-0001-Perform-streaming-logical-transactions-by-background.patch (application/octet-stream)
Hello Hou-san. Here are my review comments for v4-0001. Sorry, there
are so many of them (it is a big patch); some are trivial, and others
you might easily dismiss due to my misunderstanding of the code. But
hopefully, there are at least some comments that can be helpful in
improving the patch quality.
======
1. General comment - terms
Needs to be more consistent about what exactly you will call this new
worker. Sometimes called "locally apply worker"; sometimes "bgworker";
sometimes "subworker", sometimes "BGW", sometimes other variations etc
… Need to pick ONE good name then update all the references/comments
in the patch to use that name consistently throughout.
~~~
2. General comment - option values
I felt the "streaming" option values ought to be different from what
this patch proposes so it affected some of my following review
comments. (Later I give example what I thought the values should be).
~~~
3. General comment - bool option change to enum
This option change for "streaming" is similar to the options change
for "copy_data=force" that Vignesh is doing for his "infinite
recursion" patch v9-0002 [1] (/messages/by-id/CALDaNm2Fe=g4Tx-DhzwD6NU0VRAfaPedXwWO01maNU7_OfS8fw@mail.gmail.com). Yet they seem implemented differently
(i.e. char versus enum). I think you should discuss the 2 approaches
with Vignesh and then code these option changes in a consistent way.
~~~
4. General comment - worker.c globals
There seems a growing number of global variables in the worker.c code.
I was wondering: is it really necessary? The logic becomes more
intricate if you have to know that some global was set up as a
side-effect of some other function call. E.g., maybe if you could do a
few more HTAB lookups to identify the bgworker, you might not need to
rely on the globals so much?
======
5. Commit message - typo
and then on the subscriber-side, the apply worker writes the changes into
temporary files and once it receives the commit, it read from the file and
apply the entire transaction. To improve the performance of such transactions,
typo: "read" -> "reads"
typo: "apply" -> "applies"
~~~
6. Commit message - wording
In this approach, we assign a new bgworker (if available) as soon as the xact's
first stream came and the main apply worker will send changes to this new
worker via shared memory. The bgworker will directly apply the change instead
of writing it to temporary files. We keep this worker assigned till the
transaction commit came and also wait for the worker to finish at commit. This
wording: "came" -> "is received" (2x)
~~~
7. Commit message - terms
(this is the same point as comment #1)
I think there is too much changing of terminology. IMO it will be
easier if you always just call the current main apply workers the
"apply worker" and always call this new worker the "bgworker" (or some
better name). But never just call it the "worker".
~~~
8. Commit message - typo
transaction commit came and also wait for the worker to finish at commit. This
preserves commit ordering and avoid writing to and reading from file in most
cases. We still need to spill if there is no worker available. We also need to
typo: "avoid" -> "avoids"
~~~
9. Commit message - wording/typo
Also extend the subscription streaming option so that user can control whether
apply the streaming transaction in a bgworker or spill the change to disk. User
wording: "Also extend" -> "This patch also extends"
typo: "whether apply" -> "whether to apply"
~~~
10. Commit message - option values
apply the streaming transaction in a bgworker or spill the change to disk. User
can set the streaming option to 'on/off', 'apply', 'spool'. For now, 'on' and
Those values do not really seem intuitive to me. E.g. if you set
"apply" then you already said above that sometimes it might have to
spool anyway if there were no bgworkers available. Why not just name
them like "on/off/parallel"?
(I have written more about this in a later comment #14)
======
11. doc/src/sgml/catalogs.sgml - wording
+ Controls in which modes we handle the streaming of in-progress
transactions.
+ <literal>f</literal> = disallow streaming of in-progress transactions
wording: "Controls in which modes we handle..." -> "Controls how to handle..."
~~~
12. doc/src/sgml/catalogs.sgml - wording
+ <literal>a</literal> = apply changes directly in background worker
wording: "in background worker" -> "using a background worker"
~~~
13. doc/src/sgml/catalogs.sgml - option values
Anyway, all this page will be different if I can persuade you to
change the option values (see comment #14)
======
14. doc/src/sgml/ref/create_subscription.sgml - option values
Since the default value is "off" I felt these options would be
better/simpler if they are just like "off/on/parallel". E.g.
Specifically, I think the "on" should behave the same as the current
code does, so the user should deliberately choose to use this new
bgworker approach.
e.g.
- "off" = off, same as current PG15
- "on" = on, same as current PG15
- "parallel" = try to use the new bgworker to apply stream
======
15. src/backend/commands/subscriptioncmds.c - SubOpts
Vignesh uses similar code for his "infinite recursion" patch being
developed [1] but he used an enum but here you use a char. I think you
should discuss together both decide to use either enum or char for the
member so there is a consistency.
~~~
16. src/backend/commands/subscriptioncmds.c - combine conditions
+ /*
+ * The set of strings accepted here should match up with the
+ * grammar's opt_boolean_or_string production.
+ */
+ if (pg_strcasecmp(sval, "true") == 0)
+ return SUBSTREAM_APPLY;
+ if (pg_strcasecmp(sval, "false") == 0)
+ return SUBSTREAM_OFF;
+ if (pg_strcasecmp(sval, "on") == 0)
+ return SUBSTREAM_APPLY;
+ if (pg_strcasecmp(sval, "off") == 0)
+ return SUBSTREAM_OFF;
+ if (pg_strcasecmp(sval, "spool") == 0)
+ return SUBSTREAM_SPOOL;
+ if (pg_strcasecmp(sval, "apply") == 0)
+ return SUBSTREAM_APPLY;
Because I think the possible option values should be different from
these, I can't comment much on this code, except to suggest that IMO
the if conditions should be combined where the options are considered
equivalent.
======
17. src/backend/replication/logical/launcher.c - stop_worker
@@ -72,6 +72,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static void stop_worker(LogicalRepWorker *worker);
The function name does not seem consistent with the other similar static funcs.
~~~
18. src/backend/replication/logical/launcher.c - change if
@@ -225,7 +226,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool
only_running)
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
if (w->in_use && w->subid == subid && w->relid == relid &&
- (!only_running || w->proc))
+ (!only_running || w->proc) && !w->subworker)
{
Maybe the code would be easier (and then you can comment it) if you do
something like:
/* TODO: comment here */
if (w->subworker)
continue;
~~~
19. src/backend/replication/logical/launcher.c -
logicalrep_worker_launch comment
@@ -262,9 +263,9 @@ logicalrep_workers_find(Oid subid, bool only_running)
/*
* Start new apply background worker, if possible.
*/
-void
+bool
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid)
+ Oid relid, dsm_handle subworker_dsm)
Saying "start new apply..." comment feels a bit misleading. E.g. this
is also called to start the sync worker. And also for the main apply
worker (which we are not really calling a "background worker" in other
places). So this is the same kind of terminology problem as my review
comment #1.
~~~
20. src/backend/replication/logical/launcher.c - asserts?
I thought maybe there should be some assertions in this code upfront.
E.g. cannot have OidIsValid(relid) and subworker_dsm valid at the same
time.
~~~
21. src/backend/replication/logical/launcher.c - terms
+ else
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication apply worker for subscription %u", subid);
I think the names of all these workers are still a bit vague in the
messages – e.g. "logical replication worker" versus "logical
replication apply worker" sound too similar to me. So this is kind of
the same as my review comment #1.
~~~
22. src/backend/replication/logical/launcher.c -
logicalrep_worker_stop double unlock?
@@ -450,6 +465,18 @@ logicalrep_worker_stop(Oid subid, Oid relid)
return;
}
+ stop_worker(worker);
+
+ LWLockRelease(LogicalRepWorkerLock);
+}
IIUC, sometimes it seems that stop_worker() function might already
release the lock before it returns. In that case won’t this other
explicit lock release be a problem?
~~~
23. src/backend/replication/logical/launcher.c - logicalrep_worker_detach
@@ -600,6 +625,28 @@ logicalrep_worker_attach(int slot)
static void
logicalrep_worker_detach(void)
{
+ /*
+ * If we are the main apply worker, stop all the sub apply workers we
+ * started before.
+ */
+ if (!MyLogicalRepWorker->subworker)
+ {
+ List *workers;
+ ListCell *lc;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
+ foreach(lc, workers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (w->subworker)
+ stop_worker(w);
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
Can this have the same double-unlock problem as I described in the
previous review comment #22?
~~~
24. src/backend/replication/logical/launcher.c - ApplyLauncherMain
@@ -869,7 +917,7 @@ ApplyLauncherMain(Datum main_arg)
wait_time = wal_retrieve_retry_interval;
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid);
+ sub->owner, InvalidOid, DSM_HANDLE_INVALID);
}
Now that logicalrep_worker_launch is returning a bool, should this
call be checking the return value and taking appropriate action if it
failed?
======
25. src/backend/replication/logical/origin.c - acquire comment
+ /*
+ * We allow the apply worker to get the slot which is acquired by its
+ * leader process.
+ */
+ else if (curstate->acquired_by != 0 && acquire)
The comment was not very clear to me. Does the term "apply worker" in
the comment make sense, or should that say "bgworker"? This might be
another example of my review comment #1.
~~~
26. src/backend/replication/logical/origin.c - acquire code
+ /*
+ * We allow the apply worker to get the slot which is acquired by its
+ * leader process.
+ */
+ else if (curstate->acquired_by != 0 && acquire)
{
ereport(ERROR,
I somehow felt that this param would be better called 'skip_acquire';
then all the callers would have to use the opposite boolean, and this
code would read like below (which seemed easier to me). YMMV.
else if (curstate->acquired_by != 0 && !skip_acquire)
{
ereport(ERROR,
=====
27. src/backend/replication/logical/tablesync.c
@@ -568,7 +568,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
MySubscription->oid,
MySubscription->name,
MyLogicalRepWorker->userid,
- rstate->relid);
+ rstate->relid,
+ DSM_HANDLE_INVALID);
hentry->last_start_time = now;
Now that the logicalrep_worker_launch is returning a bool, should this
call be checking that the launch was successful before it changes the
last_start_time?
======
28. src/backend/replication/logical/worker.c - file comment
+ * 1) Separate background workers
+ *
+ * Assign a new bgworker (if available) as soon as the xact's first stream came
+ * and the main apply worker will send changes to this new worker via shared
+ * memory. We keep this worker assigned till the transaction commit came and
+ * also wait for the worker to finish at commit. This preserves commit ordering
+ * and avoid writing to and reading from file in most cases. We still need to
+ * spill if there is no worker available. We also need to allow stream_stop to
+ * complete by the background worker to finish it to avoid deadlocks because
+ * T-1's current stream of changes can update rows in conflicting order with
+ * T-2's next stream of changes.
This comment fragment looks the same as the commit message so the
typos/wording reported already for the commit message are applicable
here too.
~~~
29. src/backend/replication/logical/worker.c - file comment
+ * If no worker is available to handle streamed transaction, we write the data
* to temporary files and then applied at once when the final commit arrives.
wording: "we write the data" -> "the data is written"
~~~
30. src/backend/replication/logical/worker.c - ParallelState
+typedef struct ParallelState
Add to typedefs.list
~~~
31. src/backend/replication/logical/worker.c - ParallelState flags
+typedef struct ParallelState
+{
+ slock_t mutex;
+ bool attached;
+ bool ready;
+ bool finished;
+ bool failed;
+ Oid subid;
+ TransactionId stream_xid;
+ uint32 n;
+} ParallelState;
Those bool states look independent to me. Should they be one enum
member instead of lots of bool members?
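To illustrate the suggestion, here is a minimal standalone sketch (all names and stand-in types are assumptions, not from the patch) of how the four independent bool flags could collapse into one enum, which also makes impossible combinations (e.g. finished && failed) unrepresentable:

```c
#include <assert.h>

/* Hypothetical replacement for the attached/ready/finished/failed bools */
typedef enum ApplyBgworkerPhase
{
    APPLY_BGW_UNATTACHED,
    APPLY_BGW_ATTACHED,
    APPLY_BGW_READY,
    APPLY_BGW_FINISHED,
    APPLY_BGW_FAILED
} ApplyBgworkerPhase;

typedef struct ParallelStateSketch
{
    ApplyBgworkerPhase phase;   /* one field instead of four bools */
    unsigned int stream_xid;    /* xid of the streamed transaction */
    unsigned int worker_number; /* was the undocumented 'n' */
} ParallelStateSketch;

/* Exactly one phase holds at a time, so contradictory states
 * (e.g. finished && failed) simply cannot be expressed. */
static int
is_terminal(const ParallelStateSketch *ps)
{
    return ps->phase == APPLY_BGW_FINISHED || ps->phase == APPLY_BGW_FAILED;
}
```

(The slock_t mutex is omitted here; in the real struct the enum would still be read/written under the spinlock.)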
~~~
32. src/backend/replication/logical/worker.c - ParallelState comments
+typedef struct ParallelState
+{
+ slock_t mutex;
+ bool attached;
+ bool ready;
+ bool finished;
+ bool failed;
+ Oid subid;
+ TransactionId stream_xid;
+ uint32 n;
+} ParallelState;
Needs some comments. Some might be self-evident but some are not -
e.g. what is 'n'?
~~~
33. src/backend/replication/logical/worker.c - WorkerState
+typedef struct WorkerState
Add to typedefs.list
~~~
34. src/backend/replication/logical/worker.c - WorkerEntry
+typedef struct WorkerEntry
Add to typedefs.list
~~~
35. src/backend/replication/logical/worker.c - static function names
+/* Worker setup and interactions */
+static void setup_dsm(WorkerState *wstate);
+static WorkerState *setup_background_worker(void);
+static void wait_for_worker_ready(WorkerState *wstate, bool notify);
+static void wait_for_transaction_finish(WorkerState *wstate);
+static void send_data_to_worker(WorkerState *wstate, Size nbytes,
+ const void *data);
+static WorkerState *find_or_start_worker(TransactionId xid, bool start);
+static void free_stream_apply_worker(void);
+static bool transaction_applied_in_bgworker(TransactionId xid);
+static void check_workers_status(void);
All these new functions have random-looking names. Since they all are
new to this feature I thought they should all be named similarly...
e.g. something like
bgworker_setup
bgworker_check_status
bgworker_wait_for_ready
etc.
~~~
36. src/backend/replication/logical/worker.c - nchanges
+
+static uint32 nchanges = 0;
+
What is this? Needs a comment.
~~~
37. src/backend/replication/logical/worker.c - handle_streamed_transaction
static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
- TransactionId xid;
+ TransactionId current_xid = InvalidTransactionId;
/* not in streaming mode */
- if (!in_streamed_transaction)
+ if (!in_streamed_transaction && !isLogicalApplyWorker)
return false;
Is it correct to be testing the isLogicalApplyWorker here?
e.g. What if the streaming code is not using bgworkers at all?
At least maybe that comment (/* not in streaming mode */) should be updated?
~~~
38. src/backend/replication/logical/worker.c - handle_streamed_transaction
+ if (current_xid != stream_xid &&
+ !list_member_int(subxactlist, (int) current_xid))
+ {
+ MemoryContext oldctx;
+ char *spname = (char *) palloc(64 * sizeof(char));
+ sprintf(spname, "savepoint_for_xid_%u", current_xid);
Can't the name just be a char[64] on the stack?
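For example, a sketch of the stack-buffer version (the helper name and the 64-byte sizing are assumptions for illustration):

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Build the savepoint name into a caller-provided stack buffer
 * instead of palloc'ing 64 bytes for it. */
static void
savepoint_name(unsigned int xid, char *buf, size_t szbuf)
{
    /* snprintf guarantees NUL-termination within szbuf */
    snprintf(buf, szbuf, "savepoint_for_xid_%u", xid);
}
```

The caller would then just declare `char spname[64];` and avoid the allocation and any question of who frees it.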
~~~
39. src/backend/replication/logical/worker.c - handle_streamed_transaction
+ /*
+ * XXX The publisher side don't always send relation update message
+ * after the streaming transaction, so update the relation in main
+ * worker here.
+ */
typo: "don't" -> "doesn't" ?
~~~
40. src/backend/replication/logical/worker.c - apply_handle_commit_prepared
@@ -976,30 +1116,51 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
+
set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
Spurious whitespace?
~~~
41. src/backend/replication/logical/worker.c - apply_handle_commit_prepared
+ /* Check if we have prepared transaction in another bgworker */
+ if (transaction_applied_in_bgworker(prepare_data.xid))
+ {
+ elog(DEBUG1, "received commit for streamed transaction %u", prepare_data.xid);
- /* There is no transaction when COMMIT PREPARED is called */
- begin_replication_step();
+ /* Send commit message */
+ send_data_to_worker(stream_apply_worker, s->len, s->data);
It seems a bit complex/tricky that the code always relies on the
side-effect that the global stream_apply_worker has been set.
I am not sure if it is possible to remove the global and untangle
everything. E.g. why not change transaction_applied_in_bgworker to
return the bgworker (instead of returning a bool), and then assign it
to a local var in this function?
Or can’t you do HTAB lookup in a few more places instead of carrying
around the knowledge of some global var that was initialized in some
other place?
It would be easier if you can eliminate having to be aware of
side-effects happening behind the scenes.
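A minimal sketch of the lookup-returning variant (the struct layout and names here are assumptions, not the patch's actual code):

```c
#include <assert.h>
#include <stddef.h>

typedef struct WorkerStateSketch
{
    unsigned int xid;           /* streamed transaction this worker handles */
} WorkerStateSketch;

static WorkerStateSketch workers[] = {{100}, {200}};

/*
 * Return the bgworker applying 'xid', or NULL. The caller holds the
 * result in a local variable instead of depending on a global being
 * set as a side-effect elsewhere.
 */
static WorkerStateSketch *
find_bgworker_for_xid(unsigned int xid)
{
    for (size_t i = 0; i < sizeof(workers) / sizeof(workers[0]); i++)
    {
        if (workers[i].xid == xid)
            return &workers[i];
    }
    return NULL;
}
```

In the real code this lookup would presumably be the HTAB search mentioned above, but the shape of the API is the point: the data flow becomes explicit at each call site.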
~~~
42. src/backend/replication/logical/worker.c - apply_handle_rollback_prepared
@@ -1019,35 +1180,51 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
+
set_apply_error_context_xact(rollback_data.xid,
rollback_data.rollback_end_lsn);
Spurious whitespace?
~~~
43. src/backend/replication/logical/worker.c - apply_handle_rollback_prepared
+ /* Check if we are processing the prepared transaction in a bgworker */
+ if (transaction_applied_in_bgworker(rollback_data.xid))
+ {
+ send_data_to_worker(stream_apply_worker, s->len, s->data);
Same as previous comment #41. Relies on the side effect of something
setting the global stream_apply_worker.
~~~
44. src/backend/replication/logical/worker.c - find_or_start_worker
+ /*
+ * For streaming transactions that is being applied in bgworker, we cannot
+ * decide whether to apply the change for a relation that is not in the
+ * READY state (see should_apply_changes_for_rel) as we won't know
+ * remote_final_lsn by that time. So, we don't start new bgworker in this
+ * case.
+ */
typo: "that is" -> "that are"
~~~
45. src/backend/replication/logical/worker.c - find_or_start_worker
+ if (MySubscription->stream != SUBSTREAM_APPLY)
+ return NULL;
...
+ else if (start && !XLogRecPtrIsInvalid(MySubscription->skiplsn))
+ return NULL;
...
+ else if (start && !AllTablesyncsReady())
+ return NULL;
+ else if (!start && ApplyWorkersHash == NULL)
+ return NULL;
I am not sure but I think most of that rejection if/else can probably
just be "if" (not "else if") because otherwise, the code would have
returned anyhow, right? Removing all the "else" might make the code
more readable.
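As a sketch of the guard-clause style being suggested (parameter names are assumptions, condensed from the quoted conditions):

```c
#include <assert.h>
#include <stdbool.h>

/* Each rejection is an independent early return, so no "else" is
 * needed and each condition can be read in isolation. */
static bool
can_use_bgworker(bool stream_apply, bool has_skiplsn, bool syncs_ready)
{
    if (!stream_apply)
        return false;
    if (has_skiplsn)
        return false;
    if (!syncs_ready)
        return false;
    return true;
}
```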
~~~
46. src/backend/replication/logical/worker.c - find_or_start_worker
+ if (wstate == NULL)
+ {
+ /*
+ * If there is no more worker can be launched here, remove the
+ * entry in hash table.
+ */
+ hash_search(ApplyWorkersHash, &xid, HASH_REMOVE, &found);
+ return NULL;
+ }
wording: "If there is no more worker can be launched here, remove" ->
"If the bgworker cannot be launched, remove..."
~~~
47. src/backend/replication/logical/worker.c - free_stream_apply_worker
+/*
+ * Add the worker to the freelist and remove the entry from hash table.
+ */
+static void
+free_stream_apply_worker(void)
IMO it might be better to pass the bgworker here instead of silently
working with the global stream_apply_worker.
~~~
48. src/backend/replication/logical/worker.c - free_stream_apply_worker
+ elog(LOG, "adding finished apply worker #%u for xid %u to the idle list",
+ stream_apply_worker->pstate->n, stream_apply_worker->pstate->stream_xid);
Should there be an Assert here to check that the bgworker state really was FINISHED?
~~~
49. src/backend/replication/logical/worker.c - serialize_stream_prepare
+static void
+serialize_stream_prepare(LogicalRepPreparedTxnData *prepare_data)
Missing function comment.
~~~
50. src/backend/replication/logical/worker.c - serialize_stream_start
-/*
- * Handle STREAM START message.
- */
static void
-apply_handle_stream_start(StringInfo s)
+serialize_stream_start(bool first_segment)
Missing function comment.
~~~
51. src/backend/replication/logical/worker.c - serialize_stream_stop
+static void
+serialize_stream_stop()
+{
Missing function comment.
~~~
52. src/backend/replication/logical/worker.c - general serialize_XXXX
I can see now that you have created many serialize_XXX functions which
seem to only be called one time. It looks like the only purpose is to
encapsulate the code to make the handler function shorter? But it
seems a bit uneven that you did this only for the serialize cases. If
you really want these separate functions then perhaps there ought to
also be the equivalent bgworker functions too. There seem to be always
3 scenarios:
i.e
1. Worker is the bgworker
2. Worker is Main Apply but a bgworker exists
3. Worker is Main apply and bgworker does not exist.
Perhaps every handler function should have THREE other little
functions that it calls appropriately?
~~~
53. src/backend/replication/logical/worker.c - serialize_stream_abort
+
+static void
+serialize_stream_abort(TransactionId xid, TransactionId subxid)
+{
Missing function comment.
~~~
54. src/backend/replication/logical/worker.c - apply_handle_stream_abort
+ if (isLogicalApplyWorker)
+ {
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("[Apply BGW #%u] aborting current transaction xid=%u, subxid=%u",
+ MyParallelState->n, GetCurrentTransactionIdIfAny(),
GetCurrentSubTransactionId())));
Why is the errcode using errcode_for_file_access? (2x)
~~~
55. src/backend/replication/logical/worker.c - apply_handle_stream_abort
+ /*
+ * OK, so it's a subxact. Rollback to the savepoint.
+ *
+ * We also need to read the subxactlist, determine the offset
+ * tracked for the subxact, and truncate the list.
+ */
+ int i;
+ bool found = false;
+ char *spname = (char *) palloc(64 * sizeof(char));
Can that just be char[64] on the stack?
~~~
56. src/backend/replication/logical/worker.c - apply_dispatch
@@ -2511,6 +3061,7 @@ apply_dispatch(StringInfo s)
break;
case LOGICAL_REP_MSG_STREAM_START:
+ elog(LOG, "LOGICAL_REP_MSG_STREAM_START");
apply_handle_stream_start(s);
break;
I guess this is just for debugging purposes so you should put some
FIXME comment here as a reminder to get rid of it later?
~~~
57. src/backend/replication/logical/worker.c - store_flush_position,
isLogicalApplyWorker
@@ -2618,6 +3169,10 @@ store_flush_position(XLogRecPtr remote_lsn)
{
FlushPosition *flushpos;
+ /* We only need to collect the LSN in main apply worker */
+ if (isLogicalApplyWorker)
+ return;
+
This comment is not specific to this function, but for global
isLogicalApplyWorker IMO this should be implemented to look more like
the inline function am_tablesync_worker().
e.g. I think you should replace this global with something like
am_apply_bgworker()
Maybe it should do something like check the value of
MyLogicalRepWorker->subworker?
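A sketch of that suggestion, modeled on am_tablesync_worker() (whether 'subworker' is the right field is an assumption about the patch, and the stand-in struct below is only for illustration):

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the relevant part of LogicalRepWorker */
typedef struct LogicalRepWorkerSketch
{
    bool subworker;             /* true for an apply bgworker */
} LogicalRepWorkerSketch;

static LogicalRepWorkerSketch *MyLogicalRepWorkerSketch;

/* Same shape as am_tablesync_worker(): a small inline predicate
 * instead of a free-floating global flag. */
static inline bool
am_apply_bgworker(void)
{
    return MyLogicalRepWorkerSketch->subworker;
}
```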
~~~
58. src/backend/replication/logical/worker.c - LogicalRepApplyLoop
@@ -3467,6 +4025,7 @@ TwoPhaseTransactionGid(Oid subid, TransactionId
xid, char *gid, int szgid)
snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}
+
/*
* Execute the initial sync with error handling. Disable the subscription,
* if it's required.
Spurious whitespace
~~~
59. src/backend/replication/logical/worker.c - ApplyWorkerMain
@@ -3733,7 +4292,7 @@ ApplyWorkerMain(Datum main_arg)
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
- options.proto.logical.streaming = MySubscription->stream;
+ options.proto.logical.streaming = (MySubscription->stream != SUBSTREAM_OFF);
options.proto.logical.twophase = false;
I was not sure why this is converting from an enum to a boolean? Is it right?
~~~
60. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop
+ shmq_res = shm_mq_receive(mqh, &len, &data, false);
+
+ if (shmq_res != SHM_MQ_SUCCESS)
+ break;
Should this log some more error information here?
~~~
61. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop
+ if (len == 0)
+ {
+ elog(LOG, "[Apply BGW #%u] got zero-length message, stopping", pst->n);
+ break;
+ }
+ else
+ {
+ XLogRecPtr start_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz send_time;
Maybe the "else" is not needed here, and if you remove it then it will
get rid of all the unnecessary indentation.
~~~
62. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop
+ /*
+ * We use first byte of message for additional communication between
+ * main Logical replication worker and Apply BGWorkers, so if it
+ * differs from 'w', then process it first.
+ */
I was thinking maybe this switch should include
case 'w':
break;
because then for the "default" case you should give ERROR because
something unexpected arrived.
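A sketch of the dispatch shape being suggested: make 'w' an explicit case and treat anything unexpected as an error instead of silently continuing. The control bytes other than 'w' are assumptions for illustration only.

```c
#include <assert.h>

/* Classify the first byte of a queue message. 0 = ordinary change
 * message, 1 = recognized control byte, -1 = unexpected (the real
 * code would elog(ERROR, ...) here). */
static int
classify_message_byte(char first_byte)
{
    switch (first_byte)
    {
        case 'w':
            return 0;           /* normal change message: go apply it */
        case 'A':               /* hypothetical "abort" control byte */
            return 1;
        default:
            return -1;          /* something unexpected arrived */
    }
}
```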
~~~
63. src/backend/replication/logical/worker.c - ApplyBgwShutdown
+static void
+ApplyBgwShutdown(int code, Datum arg)
+{
+ SpinLockAcquire(&MyParallelState->mutex);
+ MyParallelState->failed = true;
+ SpinLockRelease(&MyParallelState->mutex);
+
+ dsm_detach((dsm_segment *) DatumGetPointer(arg));
+}
Should this do detach first and set the flag last?
~~~
64. src/backend/replication/logical/worker.c - LogicalApplyBgwMain
+ /*
+ * Acquire a worker number.
+ *
+ * By convention, the process registering this background worker should
+ * have stored the control structure at key 0. We look up that key to
+ * find it. Our worker number gives our identity: there may be just one
+ * worker involved in this parallel operation, or there may be many.
+ */
Maybe there should be another elog closer to this comment? So as soon
as you know the BGW number log something?
e.g.
elog(LOG, "[Apply BGW #%u] starting", pst->n);
~~~
65. src/backend/replication/logical/worker.c - setup_background_worker
+/*
+ * Register background workers.
+ */
+static WorkerState *
+setup_background_worker(void)
I think that comment needs some more info because it is doing more
than just registering... it is successfully launching the worker
first.
~~~
66. src/backend/replication/logical/worker.c - setup_background_worker
+ if (launched)
+ {
+ /* Wait for worker to become ready. */
+ wait_for_worker_ready(wstate, false);
+
+ ApplyWorkersList = lappend(ApplyWorkersList, wstate);
+ nworkers += 1;
+ }
Do you really need to carry around this global 'nworkers' variable?
Can't you just check the length of the ApplyWorkersList to get this
number?
~~~
67. src/backend/replication/logical/worker.c - send_data_to_worker
+/*
+ * Send the data to worker via shared-memory queue.
+ */
+static void
+send_data_to_worker(WorkerState *wstate, Size nbytes, const void *data)
wording: "to worker" -> "to the specified apply bgworker"
This is just another example of my comment #1.
~~~
68. src/backend/replication/logical/worker.c - send_data_to_worker
+ if (result != SHM_MQ_SUCCESS)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not send tuple to shared-memory queue")));
+}
typo: is "tuples" the right word here?
~~~
69. src/backend/replication/logical/worker.c - wait_for_worker_ready
+
+static void
+wait_for_worker_ready(WorkerState *wstate, bool notify)
+{
Missing function comment.
~~~
70. src/backend/replication/logical/worker.c - wait_for_worker_ready
+
+static void
+wait_for_worker_ready(WorkerState *wstate, bool notify)
+{
'notify' seems a bit of a poor name here. And this param seems a bit
of a strange side-effect for something called wait_for_worker_ready.
If really need to do this way maybe name it something more verbose
like 'notify_received_stream_stop'?
~~~
71. src/backend/replication/logical/worker.c - wait_for_worker_ready
+ if (!result)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("one or more background workers failed to start")));
Is the ERROR code reachable? IIUC there is no escape from the previous
for (;;) loop except when the result is set to true.
~~~
72. src/backend/replication/logical/worker.c - wait_for_transaction_finish
+
+static void
+wait_for_transaction_finish(WorkerState *wstate)
+{
Missing function comment.
~~~
73. src/backend/replication/logical/worker.c - wait_for_transaction_finish
+ if (finished)
+ {
+ break;
+ }
The brackets are not needed for 1 statement.
~~~
74. src/backend/replication/logical/worker.c - transaction_applied_in_bgworker
+static bool
+transaction_applied_in_bgworker(TransactionId xid)
Instead of side-effect assigning the global variable, why not return
the bgworker (or NULL) and let the caller work with the result?
~~~
75. src/backend/replication/logical/worker.c - check_workers_status
+/*
+ * Check the status of workers and report an error if any bgworker exit
+ * unexpectedly.
wording: -> "... if any bgworker has exited unexpectedly ..."
~~~
76. src/backend/replication/logical/worker.c - check_workers_status
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("Background worker %u exited unexpectedly",
+ wstate->pstate->n)));
Should that message also give more identifying info about the
*current* worker doing the ERROR - e.g. the one which found that the
other bgworker had failed? Or is the PID in the log message
good enough?
~~~
77. src/backend/replication/logical/worker.c - check_workers_status
+ if (!AllTablesyncsReady() && nfreeworkers != list_length(ApplyWorkersList))
+ {
I did not really understand this code, but isn't there a possibility
that it will cause many restarts if the tablesyncs are taking a long
time to complete?
======
78. src/include/catalog/pg_subscription.
@@ -122,6 +122,18 @@ typedef struct Subscription
List *publications; /* List of publication names to subscribe to */
} Subscription;
+/* Disallow streaming in-progress transactions */
+#define SUBSTREAM_OFF 'f'
+
+/*
+ * Streaming transactions are written to a temporary file and applied only
+ * after the transaction is committed on upstream.
+ */
+#define SUBSTREAM_SPOOL 's'
+
+/* Streaming transactions are appied immediately via a background worker */
+#define SUBSTREAM_APPLY 'a'
IIRC Vignesh had a similar options requirement for his "infinite
recursion" patch [1], except he was using enums instead of #define for
char. Maybe discuss with Vignesh (and either he should change or you
should change) so there is a consistent code style for the options.
======
79. src/include/replication/logicalproto.h - old extern
@@ -243,8 +243,10 @@ extern TransactionId
logicalrep_read_stream_start(StringInfo in,
extern void logicalrep_write_stream_stop(StringInfo out);
extern void logicalrep_write_stream_commit(StringInfo out,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
-extern TransactionId logicalrep_read_stream_commit(StringInfo out,
+extern TransactionId logicalrep_read_stream_commit_old(StringInfo out,
LogicalRepCommitData *commit_data);
Is anybody still using this "old" function? Maybe I missed it.
======
80. src/include/replication/logicalworker.h
@@ -13,6 +13,7 @@
#define LOGICALWORKER_H
extern void ApplyWorkerMain(Datum main_arg);
+extern void LogicalApplyBgwMain(Datum main_arg);
The new name seems inconsistent with the old one. What about calling
it ApplyBgworkerMain?
======
81. src/test/regress/expected/subscription.out
Isn't this missing some test cases for the new options added? E.g. I
never see the streaming value set to 's'.
======
82. src/test/subscription/t/029_on_error.pl
If options values were changed how I suggested (review comment #14)
then I think a change such as this would not be necessary because
everything would be backward compatible.
------
[1]: /messages/by-id/CALDaNm2Fe=g4Tx-DhzwD6NU0VRAfaPedXwWO01maNU7_OfS8fw@mail.gmail.com
Kind Regards,
Peter Smith.
Fujitsu Australia
On Friday, April 22, 2022 12:12 PM Peter Smith <smithpb2250@gmail.com> wrote:
Hello Hou-san. Here are my review comments for v4-0001. Sorry, there
are so many of them (it is a big patch); some are trivial, and others
you might easily dismiss due to my misunderstanding of the code. But
hopefully, there are at least some comments that can be helpful in
improving the patch quality.
Thanks for the comments !
I think most of the comments make sense and here are explanations for
some of them.
24. src/backend/replication/logical/launcher.c - ApplyLauncherMain
@@ -869,7 +917,7 @@ ApplyLauncherMain(Datum main_arg)
wait_time = wal_retrieve_retry_interval;
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid);
+ sub->owner, InvalidOid, DSM_HANDLE_INVALID);
}
Now that the logicalrep_worker_launch is returning a bool, should this
call be checking the return value and taking appropriate action if it
failed?
I'm not sure we should change the logic of the existing caller. I think
only the new caller added by the patch needs to check this.
26. src/backend/replication/logical/origin.c - acquire code
+ /*
+ * We allow the apply worker to get the slot which is acquired by its
+ * leader process.
+ */
+ else if (curstate->acquired_by != 0 && acquire)
{
ereport(ERROR,
I somehow felt that this param would be better called 'skip_acquire',
so all the callers would have to use the opposite boolean and then
this code would say like below (which seemed easier to me). YMMV.
else if (curstate->acquired_by != 0 && !skip_acquire)
{
ereport(ERROR,
Not sure about this.
59. src/backend/replication/logical/worker.c - ApplyWorkerMain
@@ -3733,7 +4292,7 @@ ApplyWorkerMain(Datum main_arg)
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
- options.proto.logical.streaming = MySubscription->stream;
+ options.proto.logical.streaming = (MySubscription->stream != SUBSTREAM_OFF);
options.proto.logical.twophase = false;
I was not sure why this is converting from an enum to a boolean? Is it right?
I think it's OK; "logical.streaming" is used on the publisher, which
doesn't need to know the exact streaming mode (for now it only needs to
know whether streaming is enabled).
63. src/backend/replication/logical/worker.c - ApplyBgwShutdown
+static void
+ApplyBgwShutdown(int code, Datum arg)
+{
+ SpinLockAcquire(&MyParallelState->mutex);
+ MyParallelState->failed = true;
+ SpinLockRelease(&MyParallelState->mutex);
+
+ dsm_detach((dsm_segment *) DatumGetPointer(arg));
+}
Should this do detach first and set the flag last?
Not sure about this. I think it's fine to detach this at the end.
76. src/backend/replication/logical/worker.c - check_workers_status
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("Background worker %u exited unexpectedly",
+ wstate->pstate->n)));
Should that message also give more identifying info about the
*current* worker doing the ERROR - e.g. the one which found that the
other bgworker had failed? Or is the PID in the log message
good enough?
Currently, only the main apply worker should report this error, so I'm
not sure we need to identify the current worker.
77. src/backend/replication/logical/worker.c - check_workers_status
+ if (!AllTablesyncsReady() && nfreeworkers != list_length(ApplyWorkersList))
+ {
I did not really understand this code, but isn't there a possibility
that it will cause many restarts if the tablesyncs are taking a long
time to complete?
I think it's OK; after restarting, we won't start a bgworker until all
the tables are READY.
Best regards,
Hou zj
On Fri, Apr 8, 2022 at 2:44 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
On Wednesday, April 6, 2022 1:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
In this email, I would like to discuss allowing streaming logical
transactions (large in-progress transactions) by background workers
and parallel apply in general. The goal of this work is to improve the
performance of the apply work in logical replication.
Currently, for large transactions, the publisher sends the data in
multiple streams (changes divided into chunks depending upon
logical_decoding_work_mem), and then on the subscriber-side, the apply
worker writes the changes into temporary files and once it receives
the commit, it reads from the file and applies the entire transaction. To
improve the performance of such transactions, we can instead allow
them to be applied via background workers. There could be multiple
ways to achieve this:
Approach-1: Assign a new bgworker (if available) as soon as the xact's
first stream came and the main apply worker will send changes to this
new worker via shared memory. We keep this worker assigned till the
transaction commit came and also wait for the worker to finish at
commit. This preserves commit ordering and avoids writing to and
reading from file in most cases. We still need to spill if there is no
worker available. We also need to allow stream_stop to complete by the
background worker to finish it to avoid deadlocks because T-1's
current stream of changes can update rows in conflicting order with
T-2's next stream of changes.
Attach the POC patch for the Approach-1 of "Perform streaming logical
transactions by background workers". The patch is still a WIP patch as
there are several TODO items left, including:
* error handling for bgworker
* support for SKIP the transaction in bgworker
* handle the case when there is no more worker available
(might need to spill the data to a temp file in this case)
* some potential bugs
The original patch is borrowed from an old thread[1] and was rebased and
extended/cleaned by me. Comments and suggestions are welcome.
[1] /messages/by-id/8eda5118-2dd0-79a1-4fe9-eec7e334de17@postgrespro.ru
Here are some performance results of the patch shared by Shi Yu off-list.
The performance was tested by varying
logical_decoding_work_mem, which includes two cases:
1) bulk insert.
2) create savepoint and rollback to savepoint.
I used synchronous logical replication in the test, and compared SQL
execution times before and after applying the patch.
The results are as follows. The bar charts and the details of the test are
attached as well.

RESULT - bulk insert (5kk)
----------------------------------
logical_decoding_work_mem 64kB 128kB 256kB 512kB 1MB 2MB 4MB 8MB 16MB 32MB 64MB
HEAD 51.673 51.199 51.166 50.259 52.898 50.651 51.156 51.210 50.678 51.256 51.138
patched 36.198 35.123 34.223 29.198 28.712 29.090 29.709 29.408 34.367 34.716 35.439

RESULT - rollback to savepoint (600k)
----------------------------------
logical_decoding_work_mem 64kB 128kB 256kB 512kB 1MB 2MB 4MB 8MB 16MB 32MB 64MB
HEAD 31.101 31.087 30.931 31.015 30.920 31.109 30.863 31.008 30.875 30.775 29.903
patched 28.115 28.487 27.804 28.175 27.734 29.047 28.279 27.909 28.277 27.345 28.375

Summary:
1) bulk insert
For different logical_decoding_work_mem sizes, it takes about 30% ~ 45% less
time, which looks good to me. After applying this patch, it seems that the
performance is better when logical_decoding_work_mem is between 512kB and 8MB.
2) rollback to savepoint
There is an improvement of about 5% ~ 10% after applying this patch.
In this case, the patch spends less time handling the part that is not
rolled back, because it saves the time of writing the changes to a temporary
file and reading them back. For the part that is rolled back, it spends more
time than HEAD, because writing to the filesystem and rolling back takes
longer than writing to a temporary file and truncating the file. Overall,
the results look good.
One comment on the design:
We should have a strategy to release workers that have completed
applying their transactions; otherwise, even though there are some idle
workers for one of the subscriptions, they cannot be used by other
subscriptions.
Like in the following case:
Let's say max_logical_replication_workers is set to 10, if
subscription sub_1 uses all the 10 workers to apply the transactions
and all the 10 workers have finished applying the transactions and
then subscription sub_2 requests some workers for applying
transactions, subscription sub_2 will not get any workers.
Maybe if the workers have completed applying the transactions,
subscription sub_2 should be able to get these workers in this case.
Regards,
Vignesh
On Monday, April 25, 2022 4:35 PM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:
On Friday, April 22, 2022 12:12 PM Peter Smith <smithpb2250@gmail.com>
wrote:
Hello Hou-san. Here are my review comments for v4-0001. Sorry, there
are so many of them (it is a big patch); some are trivial, and others
you might easily dismiss due to my misunderstanding of the code. But
hopefully, there are at least some comments that can be helpful in
improving the patch quality.
Thanks for the comments!
I think most of the comments make sense and here are explanations for some
of them.
Hi,
I addressed the rest of Peter's comments and here is a new version patch.
The naming of the newly introduced option and worker might
need more thought, so I haven't changed all of them. I will think it over
and change them later.
One comment I didn't address:
3. General comment - bool option change to enum
This option change for "streaming" is similar to the options change
for "copy_data=force" that Vignesh is doing for his "infinite
recursion" patch v9-0002 [1]. Yet they seem implemented differently
(i.e. char versus enum). I think you should discuss the 2 approaches
with Vignesh and then code these option changes in a consistent way.
[1] /messages/by-id/CALDaNm2Fe=g4Tx-DhzwD6NU0VRAfaPedXwWO01maNU7_OfS8fw@mail.gmail.com
I think the "streaming" option is a bit different from the "copy_data"
option, because "streaming" is a column of a system table
(pg_subscription), which should use the "char" type to represent different
values in this case (for example:
pg_class.relkind/pg_class.relpersistence/pg_class.relreplident ...).
The "copy_data" option is not a system table column, so I think it's fine
to use an enum for it.
Best regards,
Hou zj
Attachments:
v5-0001-Perform-streaming-logical-transactions-by-background.patch
On Fri, Apr 29, 2022 10:07 AM Hou, Zhijie/侯 志杰 <houzj.fnst@fujitsu.com> wrote:
I addressed the rest of Peter's comments and here is a new version patch.
Thanks for your patch.
The patch modified the streaming option in logical replication; it can be
set to 'on', 'off' and 'apply'. The new option 'apply' hasn't been tested
in the TAP tests.
Attached is a patch which modifies the subscription TAP tests to cover both
the 'on' and 'apply' options. (The main patch is also attached to make
cfbot happy.)
Besides, I noticed that for two-phase commit transactions, if the
transaction is prepared by a background worker, the background worker is
asked to handle the message that commits/rolls back this transaction. Is it
possible for the messages that commit/roll back a prepared transaction to
be handled by the apply worker directly?
Regards,
Shi yu
Attachments:
v5-0002-Test-streaming-apply-option-in-tap-test.patch
v5-0001-Perform-streaming-logical-transactions-by-background.patch
On Fri, Apr 8, 2022 at 6:14 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
On Wednesday, April 6, 2022 1:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
In this email, I would like to discuss allowing streaming logical
transactions (large in-progress transactions) by background workers
and parallel apply in general. The goal of this work is to improve the
performance of the apply work in logical replication.
Currently, for large transactions, the publisher sends the data in
multiple streams (changes divided into chunks depending upon
logical_decoding_work_mem), and then on the subscriber-side, the apply
worker writes the changes into temporary files and once it receives
the commit, it read from the file and apply the entire transaction. To
improve the performance of such transactions, we can instead allow
them to be applied via background workers. There could be multiple
ways to achieve this:Approach-1: Assign a new bgworker (if available) as soon as the xact's
first stream came and the main apply worker will send changes to this
new worker via shared memory. We keep this worker assigned till the
transaction commit came and also wait for the worker to finish at
commit. This preserves commit ordering and avoid writing to and
reading from file in most cases. We still need to spill if there is no
worker available. We also need to allow stream_stop to complete by the
background worker to finish it to avoid deadlocks because T-1's
current stream of changes can update rows in conflicting order with
T-2's next stream of changes.Attach the POC patch for the Approach-1 of "Perform streaming logical
transactions by background workers". The patch is still a WIP patch as
there are serval TODO items left, including:* error handling for bgworker
* support for SKIP the transaction in bgworker
* handle the case when there is no more worker available
(might need spill the data to the temp file in this case)
* some potential bugs
Are you planning to support "Transaction dependency" Amit mentioned in
his first mail in this patch? IIUC, since the background apply worker
applies the streamed changes as soon as it receives them from the main
apply worker, a conflict that doesn't happen in the current streaming
logical replication could happen.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
On Mon, May 2, 2022 at 11:47 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Fri, Apr 8, 2022 at 6:14 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
[...]

Are you planning to support "Transaction dependency" Amit mentioned in
his first mail in this patch? IIUC, since the background apply worker
applies the streamed changes as soon as it receives them from the main
apply worker, a conflict that doesn't happen in the current streaming
logical replication could happen.
This patch seems to be waiting for stream_stop to finish, so I don't
see how the issues related to "Transaction dependency" can arise. What
type of conflicts/issues do you have in mind?
--
With Regards,
Amit Kapila.
On Mon, May 2, 2022 at 6:09 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, May 2, 2022 at 11:47 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Fri, Apr 8, 2022 at 6:14 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
[...]

Are you planning to support "Transaction dependency" Amit mentioned in
his first mail in this patch? IIUC, since the background apply worker
applies the streamed changes as soon as it receives them from the main
apply worker, a conflict that doesn't happen in the current streaming
logical replication could happen.

This patch seems to be waiting for stream_stop to finish, so I don't
see how the issues related to "Transaction dependency" can arise. What
type of conflicts/issues do you have in mind?
Suppose we set both publisher and subscriber:
On publisher:
create table test (i int);
insert into test values (0);
create publication test_pub for table test;
On subscriber:
create table test (i int primary key);
create subscription test_sub connection '...' publication test_pub; --
value 0 is replicated via initial sync
Now, both 'test' tables have value 0.
And suppose two concurrent transactions are executed on the publisher
in the following order:
TX-1:
begin;
insert into test select generate_series(0, 10000); -- changes will be streamed;
TX-2:
begin;
delete from test where i = 0;
commit;
TX-1:
commit;
With the current streaming logical replication, these changes will be
applied successfully since the deletion is applied before the
(streamed) insertion. Whereas with the apply bgworker, it fails due to
a unique constraint violation since the insertion is applied first.
I've confirmed that this happens with the v5 patch.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
On Mon, May 2, 2022 at 5:06 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Mon, May 2, 2022 at 6:09 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, May 2, 2022 at 11:47 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
[...]

With the current streaming logical replication, these changes will be
applied successfully since the deletion is applied before the
(streamed) insertion. Whereas with the apply bgworker, it fails due to
a unique constraint violation since the insertion is applied first.
I've confirmed that this happens with the v5 patch.
Good point, but I am not completely sure if doing transaction
dependency tracking for such cases is really worth it. I feel that for
such concurrent cases users can get conflicts now as well; it is just
a matter of timing. One more thing: to check transaction dependency,
we might need to spill the data for streaming transactions, in which
case we might lose all the benefits of doing it via a background
worker. Do we see any simple way to avoid this?
--
With Regards,
Amit Kapila.
On Tue, May 3, 2022 at 2:15 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, May 2, 2022 at 5:06 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Mon, May 2, 2022 at 6:09 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, May 2, 2022 at 11:47 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
[...]

Good point, but I am not completely sure if doing transaction
dependency tracking for such cases is really worth it. I feel that for
such concurrent cases users can get conflicts now as well; it is just
a matter of timing. One more thing: to check transaction dependency,
we might need to spill the data for streaming transactions, in which
case we might lose all the benefits of doing it via a background
worker. Do we see any simple way to avoid this?
Avoiding unexpected differences like this is why I suggested the
option should have to be explicitly enabled instead of being on by
default as it is in the current patch. See my review comment #14 [1].
It means the user won't have to change their existing code as a
workaround.
------
[1]: /messages/by-id/CAHut+PuqYP5eD5wcSCtk=a6KuMjat2UCzqyGoE7sieCaBsVskQ@mail.gmail.com
Kind Regards,
Peter Smith.
Fujitsu Australia
On Tue, May 3, 2022 at 5:16 PM Peter Smith <smithpb2250@gmail.com> wrote:
...
Avoiding unexpected differences like this is why I suggested the
option should have to be explicitly enabled instead of being on by
default as it is in the current patch. See my review comment #14 [1].
It means the user won't have to change their existing code as a
workaround.

------
[1] /messages/by-id/CAHut+PuqYP5eD5wcSCtk=a6KuMjat2UCzqyGoE7sieCaBsVskQ@mail.gmail.com
Sorry I was wrong above. It seems this behaviour was already changed
in the latest patch v5 so now the option value 'on' means what it
always did. Thanks!
------
Kind Regards,
Peter Smith.
Fujitsu Australia
On Tue, May 3, 2022 at 9:45 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, May 2, 2022 at 5:06 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Mon, May 2, 2022 at 6:09 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, May 2, 2022 at 11:47 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
[...]

Good point, but I am not completely sure if doing transaction
dependency tracking for such cases is really worth it. I feel that for
such concurrent cases users can get conflicts now as well; it is just
a matter of timing. One more thing: to check transaction dependency,
we might need to spill the data for streaming transactions, in which
case we might lose all the benefits of doing it via a background
worker. Do we see any simple way to avoid this?
I think the other kind of problem that can happen here is a delete
followed by an insert. If, in the example provided by you, TX-1
performs the delete (say it is large enough to cause streaming) and
TX-2 performs the insert, then I think it will block the apply worker
because the insert will start waiting infinitely. Currently, I think
it will lead to a conflict due to the insert, but that is still
solvable by allowing users to remove conflicting rows.
It seems both these problems are due to the fact that the table on the
publisher and subscriber has different constraints; otherwise, we
would have seen the same behavior on the publisher as well.
There could be a few ways to avoid these and similar problems:
a. Detect the difference in constraints between publisher and
subscriber, like the primary key and probably others (like whether
there is any volatile function present in an index expression), when
applying the change, and then give an ERROR to the user that she must
change the streaming mode to 'spill' instead of 'apply' (aka parallel
apply).
b. Same as (a), but instead of an ERROR just LOG this information and
change the mode to 'spill' for the transactions that operate on that
particular relation.
I think we can cache this information in LogicalRepRelMapEntry.
Thoughts?
--
With Regards,
Amit Kapila.
Here are my review comments for v5-0001.
I will take a look at the v5-0002 (TAP) patch another time.
======
1. Commit message
The message still refers to "apply background". Should that say "apply
background worker"?
Other parts just call this the "worker". Personally, I think it might
be better to coin some new term for this thing (e.g. "apply-bgworker"
or something like that of your choosing) so that you can just
concisely *always* refer to it everywhere without any ambiguity. E.g.
the same applies to every comment and every message in this patch.
They should all use identical terminology (e.g. "apply-bgworker").
~~~
2. Commit message
"We also need to allow stream_stop to complete by the apply background
to finish it to..."
Wording: ???
~~~
3. Commit message
This patch also extends the subscription streaming option so that user
can control whether apply the streaming transaction in a apply
background or spill the change to disk.
Wording: "user" -> "the user"
Typo: "whether apply" -> "whether to apply"
Typo: "a apply" -> "an apply"
~~~
4. Commit message
User can set the streaming option to 'on/off', 'apply'. For now,
'apply' means the streaming will be applied via a apply background if
available. 'on' means the streaming transaction will be spilled to
disk.
I think "apply" might not be the best choice of values for this
meaning, but I think Hou-san already said [1] that this was being
reconsidered.

[1] /messages/by-id/OS0PR01MB5716E8D536552467EFB512EF94FC9@OS0PR01MB5716.jpnprd01.prod.outlook.com
[PSv4] /messages/by-id/CAHut+PuqYP5eD5wcSCtk=a6KuMjat2UCzqyGoE7sieCaBsVskQ@mail.gmail.com
~~~
5. doc/src/sgml/catalogs.sgml - formatting
@@ -7863,11 +7863,15 @@ SCRAM-SHA-256$<replaceable><iteration
count></replaceable>:<replaceable>&l
<row>
<entry role="catalog_table_entry"><para role="column_definition">
- <structfield>substream</structfield> <type>bool</type>
+ <structfield>substream</structfield> <type>char</type>
</para>
<para>
- If true, the subscription will allow streaming of in-progress
- transactions
+ Controls how to handle the streaming of in-progress transactions.
+ <literal>f</literal> = disallow streaming of in-progress transactions
+ <literal>o</literal> = spill the changes of in-progress transactions to
+ disk and apply at once after the transaction is committed on the
+ publisher.
+ <literal>a</literal> = apply changes directly using a background worker
</para></entry>
</row>
Needs to be consistent with other value lists on this page.
5a. The first sentence to end with ":"
5b. List items to end with ","
~~~
6. doc/src/sgml/ref/create_subscription.sgml
+ <para>
+ If set to <literal>apply</literal> incoming
+ changes are directly applied via one of the background worker, if
+ available. If no background worker is free to handle streaming
+ transaction then the changes are written to a file and applied after
+ the transaction is committed. Note that if error happen when applying
+ changes in background worker, it might not report the finish LSN of
+ the remote transaction in server log.
</para>
6a. Typo: "one of the background worker," -> "one of the background workers,"
6b. Wording
BEFORE
Note that if error happen when applying changes in background worker,
it might not report the finish LSN of the remote transaction in server
log.
SUGGESTION
Note that if an error happens when applying changes in a background
worker, it might not report the finish LSN of the remote transaction
in the server log.
~~~
7. src/backend/commands/subscriptioncmds.c - defGetStreamingMode
+static char
+defGetStreamingMode(DefElem *def)
+{
+ /*
+ * If no parameter given, assume "true" is meant.
+ */
+ if (def->arg == NULL)
+ return SUBSTREAM_ON;
But is that right? IIUC all the docs said that the default is OFF.
~~~
8. src/backend/commands/subscriptioncmds.c - defGetStreamingMode
+ /*
+ * The set of strings accepted here should match up with the
+ * grammar's opt_boolean_or_string production.
+ */
+ if (pg_strcasecmp(sval, "true") == 0 ||
+ pg_strcasecmp(sval, "on") == 0)
+ return SUBSTREAM_ON;
+ if (pg_strcasecmp(sval, "apply") == 0)
+ return SUBSTREAM_APPLY;
+ if (pg_strcasecmp(sval, "false") == 0 ||
+ pg_strcasecmp(sval, "off") == 0)
+ return SUBSTREAM_OFF;
Perhaps should re-order these OFF/ON/APPLY to be consistent with the
T_Integer case above here.
~~~
9. src/backend/replication/logical/launcher.c - logicalrep_worker_launch
The "start new apply background worker ..." function comment feels a
bit misleading now that seems what you are calling this new kind of
worker. E.g. this is also called to start the sync worker. And also
for the apply worker (which we are not really calling a "background
worker" in other places). This comment is the same as [PSv4] #19.
~~~
10. src/backend/replication/logical/launcher.c - logicalrep_worker_launch
@@ -275,6 +280,9 @@ logicalrep_worker_launch(Oid dbid, Oid subid,
const char *subname, Oid userid,
int nsyncworkers;
TimestampTz now;
+ /* We don't support table sync in subworker */
+ Assert(!((subworker_dsm != DSM_HANDLE_INVALID) && OidIsValid(relid)));
I think you should declare a new variable like:
bool is_subworker = subworker_dsm != DSM_HANDLE_INVALID;
Then this Assert can be simplified, and also you can re-use the
'is_subworker' later multiple times in this same function to simplify
lots of other code also.
~~~
11. src/backend/replication/logical/launcher.c - logicalrep_worker_stop_internal
+/*
+ * Workhorse for logicalrep_worker_stop() and logicalrep_worker_detach(). Stop
+ * the worker and wait for wait for it to die.
+ */
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker)
Typo: "wait for" is repeated 2x.
~~~
12. src/backend/replication/logical/origin.c - replorigin_session_setup
@@ -1110,7 +1110,11 @@ replorigin_session_setup(RepOriginId node)
if (curstate->roident != node)
continue;
- else if (curstate->acquired_by != 0)
+ /*
+ * We allow the apply worker to get the slot which is acquired by its
+ * leader process.
+ */
+ else if (curstate->acquired_by != 0 && acquire)
I still feel this is overly confusing. Shouldn't the comment say
"Allow the apply bgworker to get the slot..."?
Also, the parameter name 'acquire' is hard to reconcile with the
comment. E.g. I feel all this would be easier to understand if the
param was refactored with a name like 'bgworker' and the code was
changed to:
else if (curstate->acquired_by != 0 && !bgworker)
Of course, the value true/false would need to be flipped on calls too.
This is the same as my previous comment [PSv4] #26.
~~~
13. src/backend/replication/logical/proto.c
@@ -1138,14 +1138,11 @@ logicalrep_write_stream_commit(StringInfo out,
ReorderBufferTXN *txn,
/*
* Read STREAM COMMIT from the output stream.
*/
-TransactionId
+void
logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
{
- TransactionId xid;
uint8 flags;
- xid = pq_getmsgint(in, 4);
-
/* read flags (unused for now) */
flags = pq_getmsgbyte(in);
There is something incompatible with the read/write functions here.
The write writes the txid before the flags, but the read_commit does
not read it at all; it only reads the flags (???). If this is really
correct, then I think there need to be some comments to explain WHY it
is correct.
NOTE: See also review comment 28 where I proposed another way to write
this code.
~~~
14. src/backend/replication/logical/worker.c - comment
The whole comment is similar to the commit message so any changes
there should be made here also.
~~~
15. src/backend/replication/logical/worker.c - ParallelState
+/*
+ * Shared information among apply workers.
+ */
+typedef struct ParallelState
It looks like there is already another typedef called "ParallelState"
because it is already in the typedefs.list. Maybe this name should be
changed or maybe make it static or something?
~~~
16. src/backend/replication/logical/worker.c - defines
+/*
+ * States for apply background worker.
+ */
+#define APPLY_BGWORKER_ATTACHED 'a'
+#define APPLY_BGWORKER_READY 'r'
+#define APPLY_BGWORKER_BUSY 'b'
+#define APPLY_BGWORKER_FINISHED 'f'
+#define APPLY_BGWORKER_EXIT 'e'
Those char states all look independent. So wouldn’t this be
represented better as an enum to reinforce that fact?
~~~
17. src/backend/replication/logical/worker.c - functions
+/* Worker setup and interactions */
+static WorkerState *apply_bgworker_setup(void);
+static WorkerState *find_or_start_apply_bgworker(TransactionId xid,
+ bool start);
Maybe rename to apply_bgworker_find_or_start() to match the pattern of
the others?
~~~
18. src/backend/replication/logical/worker.c - macros
+#define am_apply_bgworker() (MyLogicalRepWorker->subworker)
+#define applying_changes_in_bgworker() (in_streamed_transaction &&
stream_apply_worker != NULL)
18a. Somehow I felt these are not in the best place.
- Maybe am_apply_bgworker() should be in worker_internal.h?
- Maybe the applying_changes_in_bgworker() should be nearby the
stream_apply_worker declaration
18b. Maybe applying_changes_in_bgworker should be renamed to something
else to match the pattern of the others (e.g. "apply_bgworker_active"
or something)
~~~
19. src/backend/replication/logical/worker.c - handle_streamed_transaction
+ /*
+ * If we decided to apply the changes of this transaction in a apply
+ * background worker, pass the data to the worker.
+ */
Typo: "in a apply" -> "in an apply"
~~~
20. src/backend/replication/logical/worker.c - handle_streamed_transaction
+ /*
+ * XXX The publisher side doesn't always send relation update message
+ * after the streaming transaction, so update the relation in main
+ * apply worker here.
+ */
Wording: "doesn't always send relation update message" -> "doesn't
always send relation update messages" ??
~~~
21. src/backend/replication/logical/worker.c - apply_handle_commit_prepared
+ apply_bgworker_set_state(APPLY_BGWORKER_FINISHED);
It seems somewhat confusing to see calls to apply_bgworker_set_state()
when we may or may not even be an apply bgworker.
I know it adds more code, but I somehow feel it is more readable if
all these calls were changed to look below. Please consider it.
SUGGESTION
if (am_apply_bgworker())
    apply_bgworker_set_state(XXX);
Then you can also change apply_bgworker_set_state() to do
Assert(am_apply_bgworker());
~~~
22. src/backend/replication/logical/worker.c - find_or_start_apply_bgworker
+
+ if (!start && ApplyWorkersHash == NULL)
+ return NULL;
+
IIUC maybe this extra check is not really necessary. I see no harm in
creating the hash table even if it was called in this state. If the
'start' flag is false then nothing is going to be found anyway, so it
will return NULL. E.g. might as well make the code a few lines
shorter/simpler by removing this check.
~~~
23. src/backend/replication/logical/worker.c - apply_bgworker_free
+/*
+ * Add the worker to the freelist and remove the entry from hash table.
+ */
+static void
+apply_bgworker_free(WorkerState *wstate)
+{
+ bool found;
+ MemoryContext oldctx;
+ TransactionId xid = wstate->pstate->stream_xid;
If you are not going to check the value of 'found' then why bother to
pass this param at all? Can't you just pass NULL?
~~~
24. src/backend/replication/logical/worker.c - apply_bgworker_free
Should there be an Assert that the bgworker state really was FINISHED?
I think I asked this already [PSv4] #48.
~~~
24. src/backend/replication/logical/worker.c - apply_handle_stream_start
@@ -1088,24 +1416,71 @@ apply_handle_stream_prepare(StringInfo s)
logicalrep_read_stream_prepare(s, &prepare_data);
set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
- elog(DEBUG1, "received prepare for streamed transaction %u",
prepare_data.xid);
+ /*
+ * If we are in a bgworker, just prepare the transaction.
+ */
+ if (am_apply_bgworker())
Don't need to say "If we are..." because the am_apply_bgworker()
condition makes it clear this is true.
~~~
25. src/backend/replication/logical/worker.c - apply_handle_stream_start
- if (MyLogicalRepWorker->stream_fileset == NULL)
+ stream_apply_worker = find_or_start_apply_bgworker(stream_xid, first_segment);
+
+ if (applying_changes_in_bgworker())
{
IIUC this condition seems overkill. I think you can just say if
(stream_apply_worker)
~~~
26. src/backend/replication/logical/worker.c - apply_handle_stream_abort
+ if (found)
+ {
+ elog(LOG, "rolled back to savepoint %s", spname);
+ RollbackToSavepoint(spname);
+ CommitTransactionCommand();
+ subxactlist = list_truncate(subxactlist, i + 1);
+ }
Should that elog use the "[Apply BGW #%u]" format like the others for BGW?
~~~
27. src/backend/replication/logical/worker.c - apply_handle_stream_abort
Should this function be setting stream_apply_worker = NULL somewhere
when all is done?
~~~
28. src/backend/replication/logical/worker.c - apply_handle_stream_commit
+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+ LogicalRepCommitData commit_data;
+ TransactionId xid;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM COMMIT message without STREAM STOP")));
+
+ xid = pq_getmsgint(s, 4);
+ logicalrep_read_stream_commit(s, &commit_data);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn);
There is something a bit odd about this code. I think the
logicalrep_read_stream_commit() should take another param and the Txid
be extracted/read only INSIDE that logicalrep_read_stream_commit
function. See also review comment #13.
~~~
29. src/backend/replication/logical/worker.c - apply_handle_stream_commit
I am unsure, but should something be setting the stream_apply_worker =
NULL somewhere when all is done?
~~~
30. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop
30a.
+ if (shmq_res != SHM_MQ_SUCCESS)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to the main apply worker")));
30b.
+ default:
+ elog(ERROR, "unexpected message");
+ break;
Should both those error messages have the "[Apply BGW #%u]" prefix
like the other BGW messages?
~~~
31. src/backend/replication/logical/worker.c - ApplyBgwShutdown
+/*
+ * Set the failed flag so that the main apply worker can realize we have
+ * shutdown.
+ */
+static void
+ApplyBgwShutdown(int code, Datum arg)
The comment does not seem to be in sync with the code. E.g.
Wording: "failed flag" -> "exit state" ??
~~~
32. src/backend/replication/logical/worker.c - ApplyBgwShutdown
+/*
+ * Set the failed flag so that the main apply worker can realize we have
+ * shutdown.
+ */
+static void
+ApplyBgwShutdown(int code, Datum arg)
If the 'code' param is deliberately unused it might be better to say
so in the comment...
~~~
33. src/backend/replication/logical/worker.c - LogicalApplyBgwMain
33a.
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("unable to map dynamic shared memory segment")));
33b.
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("bad magic number in dynamic shared memory segment")));
+
33c.
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will not "
+ "start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));
Should all these messages have "[Apply BGW ?]" prefix even though they
are not yet attached?
~~~
34. src/backend/replication/logical/worker.c - setup_dsm
+ * We need one key to register the location of the header, and we need
+ * nworkers keys to track the locations of the message queues.
+ */
This comment about 'nworkers' seems stale because that variable no
longer exists.
~~~
35. src/backend/replication/logical/worker.c - apply_bgworker_setup
+/*
+ * Start apply worker background worker process and allocat shared memory for
+ * it.
+ */
+static WorkerState *
+apply_bgworker_setup(void)
typo: "allocat" -> "allocate"
~~~
36. src/backend/replication/logical/worker.c - apply_bgworker_setup
+ elog(LOG, "setting up apply worker #%u", list_length(ApplyWorkersList) + 1)
Should this message have the standard "[Apply BGW %u]" pattern?
~~~
37. src/backend/replication/logical/worker.c - apply_bgworker_setup
+ if (launched)
+ {
+ /* Wait for worker to become ready. */
+ apply_bgworker_wait_for(wstate, APPLY_BGWORKER_ATTACHED);
+
+ ApplyWorkersList = lappend(ApplyWorkersList, wstate);
+ }
Since there is a state APPLY_BGWORKER_READY, I think either this
comment is wrong or the passed ATTACHED parameter is wrong.
~~~
38. src/backend/replication/logical/worker.c - apply_bgworker_send_data
+ if (result != SHM_MQ_SUCCESS)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not send tuples to shared-memory queue")));
+}
Wording: Is it right to call these "tuples", or is it better just to say
"data"? I am not sure. Already asked this in [PSv4] #68.
~~~
39. src/backend/replication/logical/worker.c - apply_bgworker_wait_for
+/*
+ * Wait until the state of apply background worker reach the 'wait_for_state'
+ */
+static void
+apply_bgworker_wait_for(WorkerState *wstate, char wait_for_state)
typo: "reach" -> "reaches"
~~~
40. src/backend/replication/logical/worker.c - apply_bgworker_wait_for
+ /* If the worker is ready, we have succeeded. */
+ SpinLockAcquire(&wstate->pstate->mutex);
+ status = wstate->pstate->state;
+ SpinLockRelease(&wstate->pstate->mutex);
+
+ if (status == wait_for_state)
+ break;
40a. Why does this comment mention "ready"? This function might be
waiting for a different state than that.
40b. Anyway, I think this comment should be a few lines lower, above
the if (status == wait_for_state)
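(In other words, the suggested arrangement would look something like
this — only a sketch of the comment placement, reusing the quoted
code:)

    SpinLockAcquire(&wstate->pstate->mutex);
    status = wstate->pstate->state;
    SpinLockRelease(&wstate->pstate->mutex);

    /* If we have reached the state we are waiting for, we are done. */
    if (status == wait_for_state)
        break;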
~~~
41. src/backend/replication/logical/worker.c - apply_bgworker_wait_for
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("Background worker %u failed to apply transaction %u",
+ wstate->pstate->n, wstate->pstate->stream_xid)));
Should this message have the standard "[Apply BGW %u]" pattern?
~~~
42. src/backend/replication/logical/worker.c - check_workers_status
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("Background worker %u exited unexpectedly",
+ wstate->pstate->n)));
Should this message have the standard "[Apply BGW %u]" pattern? Or, if
this comes only from the main apply worker, maybe it should be clearer,
like "Apply worker detected apply bgworker %u exited unexpectedly".
~~~
43. src/backend/replication/logical/worker.c - check_workers_status
+ ereport(LOG,
+ (errmsg("logical replication apply workers for subscription \"%s\"
will restart",
+ MySubscription->name),
+ errdetail("Cannot start table synchronization while bgworkers are "
+ "handling streamed replication transaction")));
I am not sure, but isn't the message backwards? E.g. should it say more like:
"Cannot handle streamed transactions using bgworkers while table
synchronization is still in progress".
~~~
44. src/backend/replication/logical/worker.c - apply_bgworker_set_state
+ elog(LOG, "[Apply BGW #%u] set state to %c",
+ MyParallelState->n, state);
The line wrapping seemed overkill here.
~~~
45. src/backend/utils/activity/wait_event.c
@@ -388,6 +388,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT:
event_name = "HashGrowBucketsReinsert";
break;
+ case WAIT_EVENT_LOGICAL_APPLY_WORKER_READY:
+ event_name = "LogicalApplyWorkerReady";
+ break;
I am not sure this is the best name for this event since the only
place it is used (in apply_bgworker_wait_for) is not only waiting for
READY state. Maybe a name like WAIT_EVENT_LOGICAL_APPLY_BGWORKER or
WAIT_EVENT_LOGICAL_APPLY_WORKER_SYNC would be more appropriate? Need
to change the wait_event.h also.
~~~
46. src/include/catalog/pg_subscription.h
+/* Disallow streaming in-progress transactions */
+#define SUBSTREAM_OFF 'f'
+
+/*
+ * Streaming transactions are written to a temporary file and applied only
+ * after the transaction is committed on upstream.
+ */
+#define SUBSTREAM_ON 'o'
+
+/* Streaming transactions are appied immediately via a background worker */
+#define SUBSTREAM_APPLY 'a'
46a. There is not really any overarching comment that associates these
#defines back to the new 'stream' field, so you are just supposed to
guess that is what they are for?
46b. I also feel that using 'o' for ON is not consistent with the 'f'
of OFF. IMO it is better to use 't/f' for true/false instead of 'o/f'. Also
don't forget to update the docs, pg_dump.c, etc.
46c. Typo: "appied" -> "applied"
~~~
47. src/test/regress/expected/subscription.out - missing test
Are some test cases missing for all the new option values? E.g. where
is the test where the streaming value is set to 'apply'? Same comment
as [PSv4] #81.
------
[1]: /messages/by-id/OS0PR01MB5716E8D536552467EFB512EF94FC9@OS0PR01MB5716.jpnprd01.prod.outlook.com
[PSv4] /messages/by-id/CAHut+PuqYP5eD5wcSCtk=a6KuMjat2UCzqyGoE7sieCaBsVskQ@mail.gmail.com
Kind Regards,
Peter Smith.
Fujitsu Australia
On Fri, Apr 29, 2022 at 3:22 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
...
Thanks for your patch.
The patch modified the streaming option in logical replication; it can be set to
'on', 'off' and 'apply'. The new option 'apply' hasn't been tested in the TAP tests.
Attached is a patch which modifies the subscription TAP tests to cover both the 'on'
and 'apply' options. (The main patch is also attached to make cfbot happy.)
Here are my review comments for v5-0002 (TAP tests)
Your changes followed a similar pattern of refactoring, so most of my
comments below are repeated for all the files.
======
1. Commit message
For the tap tests about streaming option in logical replication, test both
'on' and 'apply' option.
SUGGESTION
Change all TAP tests using the PUBLICATION "streaming" option, so they
now test both 'on' and 'apply' values.
~~~
2. src/test/subscription/t/015_stream.pl
+sub test_streaming
+{
I think the function should have a comment to say that its purpose is
to encapsulate all the common (stream related) test steps so the same
code can be run both for the streaming=on and streaming=apply cases.
~~~
3. src/test/subscription/t/015_stream.pl
+
+# Test streaming mode on
+# Test streaming mode apply
These comments feel too small. IMO they should both be more prominent, like:
################################
# Test using streaming mode 'on'
################################
###################################
# Test using streaming mode 'apply'
###################################
~~~
4. src/test/subscription/t/015_stream.pl
+# Test streaming mode apply
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE (a > 2)");
$node_publisher->wait_for_catchup($appname);
I think those 2 lines do not really belong after the "# Test streaming
mode apply" comment. IIUC they are really just doing cleanup from the
prior test part, so they should either
a) be *above* this comment (and say "# cleanup the test data"), or
b) maybe it is best to put all the cleanup lines actually inside the
'test_streaming' function so that the last thing the function does is
clean up after itself.
Option b seems tidier to me.
~~~
5. src/test/subscription/t/016_stream_subxact.pl
sub test_streaming should be commented. (same as comment #2)
~~~
6. src/test/subscription/t/016_stream_subxact.pl
The comments for the different streaming nodes should be more
prominent. (same as comment #3)
~~~
7. src/test/subscription/t/016_stream_subxact.pl
+# Test streaming mode apply
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE (a > 2)");
$node_publisher->wait_for_catchup($appname);
These don't seem to belong here. They are clean up from the prior
test. (same as comment #4)
~~~
8. src/test/subscription/t/017_stream_ddl.pl
sub test_streaming should be commented. (same as comment #2)
~~~
9. src/test/subscription/t/017_stream_ddl.pl
The comments for the different streaming nodes should be more
prominent. (same as comment #3)
~~~
10. src/test/subscription/t/017_stream_ddl.pl
+# Test streaming mode apply
$node_publisher->safe_psql(
'postgres', q{
-BEGIN;
-INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001);
-ALTER TABLE test_tab ADD COLUMN e INT;
-SAVEPOINT s1;
-INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002);
-COMMIT;
+DELETE FROM test_tab WHERE (a > 2);
+ALTER TABLE test_tab DROP COLUMN c, DROP COLUMN d, DROP COLUMN e,
DROP COLUMN f;
});
$node_publisher->wait_for_catchup($appname);
These don't seem to belong here. They are clean up from the prior
test. (same as comment #4)
~~~
11. .../t/018_stream_subxact_abort.pl
sub test_streaming should be commented. (same as comment #2)
~~~
12. .../t/018_stream_subxact_abort.pl
The comments for the different streaming nodes should be more
prominent. (same as comment #3)
~~~
13. .../t/018_stream_subxact_abort.pl
+# Test streaming mode apply
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE (a > 2)");
$node_publisher->wait_for_catchup($appname);
These don't seem to belong here. They are clean up from the prior
test. (same as comment #4)
~~~
14. .../t/019_stream_subxact_ddl_abort.pl
sub test_streaming should be commented. (same as comment #2)
~~~
15. .../t/019_stream_subxact_ddl_abort.pl
The comments for the different streaming nodes should be more
prominent. (same as comment #3)
~~~
16. .../t/019_stream_subxact_ddl_abort.pl
+test_streaming($node_publisher, $node_subscriber, $appname);
+
+# Test streaming mode apply
$node_publisher->safe_psql(
'postgres', q{
-BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
-ALTER TABLE test_tab ADD COLUMN c INT;
-SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text), -i FROM
generate_series(501,1000) s(i);
-ALTER TABLE test_tab ADD COLUMN d INT;
-SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM
generate_series(1001,1500) s(i);
-ALTER TABLE test_tab ADD COLUMN e INT;
-SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM
generate_series(1501,2000) s(i);
+DELETE FROM test_tab WHERE (a > 2);
ALTER TABLE test_tab DROP COLUMN c;
-ROLLBACK TO s1;
-INSERT INTO test_tab SELECT i, md5(i::text), i FROM
generate_series(501,1000) s(i);
-COMMIT;
});
-
$node_publisher->wait_for_catchup($appname);
These don't seem to belong here. They are clean up from the prior
test. (same as comment #4)
~~~
17. .../subscription/t/022_twophase_cascade.
+# ---------------------
+# 2PC + STREAMING TESTS
+# ---------------------
+sub test_streaming
+{
I think maybe that 2PC comment should not have been moved. IMO it
belongs in the main test body...
~~~
18. .../subscription/t/022_twophase_cascade.
sub test_streaming should be commented. (same as comment #2)
~~~
19. .../subscription/t/022_twophase_cascade.
+sub test_streaming
+{
+ my ($node_A, $node_B, $node_C, $appname_B, $appname_C, $streaming) = @_;
If you called that '$streaming' param something more like
'$streaming_mode' it would read better I think.
~~~
20. .../subscription/t/023_twophase_stream.pl
sub test_streaming should be commented. (same as comment #2)
~~~
21. .../subscription/t/023_twophase_stream.pl
The comments for the different streaming nodes should be more
prominent. (same as comment #3)
~~~
22. .../subscription/t/023_twophase_stream.pl
+# Test streaming mode apply
$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
-
-# Then insert, update and delete enough rows to exceed the 64kB limit.
-$node_publisher->safe_psql('postgres', q{
- BEGIN;
- INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,
5000) s(i);
- UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
- DELETE FROM test_tab WHERE mod(a,3) = 0;
- PREPARE TRANSACTION 'test_prepared_tab';});
-
-$node_publisher->wait_for_catchup($appname);
-
-# check that transaction is in prepared state on subscriber
-$result = $node_subscriber->safe_psql('postgres', "SELECT count(*)
FROM pg_prepared_xacts;");
-is($result, qq(1), 'transaction is prepared on subscriber');
-
-# 2PC transaction gets aborted
-$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED
'test_prepared_tab';");
-
$node_publisher->wait_for_catchup($appname);
These don't seem to belong here. They are clean up from the prior
test. (same as comment #4)
------
Kind Regards,
Peter Smith.
Fujitsu Australia