Force streaming every change in logical decoding

Started by shiy.fnst@fujitsu.com over 3 years ago, 63 messages, hackers
#1 shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to the output plugin in streaming mode. However, the minimum value of
logical_decoding_work_mem is 64kB. I tried to add a GUC to allow sending every
change to the output plugin without waiting until logical_decoding_work_mem is
exceeded.

This helps to test streaming mode. For example, testing "Avoid streaming the
transaction which are skipped" [1] requires many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in less
time. This option also helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

[1]: /messages/by-id/CAFiTN-tHK=7LzfrPs8fbT2ksrOJGQbzywcgXst2bM9-rJJAAUg@mail.gmail.com
[2]: /messages/by-id/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com

Regards,
Shi yu

Attachments:

v1-0001-Allow-streaming-every-change-without-waiting-till.patch (application/octet-stream, +50-9)
#2 Amit Kapila
amit.kapila16@gmail.com
In reply to: shiy.fnst@fujitsu.com (#1)
Re: Force streaming every change in logical decoding

On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, to test "Avoid streaming the
transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in less
time. Also, this new option helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

Yeah, I think this can also help reduce the time for various
tests in test_decoding/stream and the
src/test/subscription/t/*_stream_*.pl files by reducing the number of
changes required to invoke streaming mode. Can we think of making this
GUC extensible so that it can test even more options for server-side (publisher)
and client-side (subscriber) cases? For example, we could have something
like logical_replication_mode with the following valid values: (a)
server_serialize: serialize each change to a file on the
publisher, then on commit restore and send all changes; (b)
server_stream: stream each change, as currently proposed in
your patch. Then, if we want to extend it for subscriber-side testing,
we can introduce new options like client_serialize for the case
being discussed in the email [1].

Thoughts?

[1]: /messages/by-id/CAD21AoAVUfDrm4-=ykihNAmR7bTX-KpHXM9jc42RbHePJv5k1w@mail.gmail.com

--
With Regards,
Amit Kapila.

#3 Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#2)
Re: Force streaming every change in logical decoding

On Tue, Dec 6, 2022 at 7:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, to test "Avoid streaming the
transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in less
time. Also, this new option helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

Yeah, I think this can also help in reducing the time for various
tests in test_decoding/stream and
src/test/subscription/t/*_stream_*.pl file by reducing the number of
changes required to invoke streaming mode.

+1

Can we think of making this
GUC extendible to even test more options on server-side (publisher)
and client-side (subscriber) cases? For example, we can have something
like logical_replication_mode with the following valid values: (a)
server_serialize: this will serialize each change to file on
publishers and then on commit restore and send all changes; (b)
server_stream: this will stream each change as currently proposed in
your patch. Then if we want to extend it for subscriber-side testing
then we can introduce new options like client_serialize for the case
being discussed in the email [1].

Does setting logical_replication_mode = 'client_serialize' imply that the
publisher behaves as server_stream? Or do you mean we can set something like
logical_replication_mode = 'server_stream, client_serialize'?

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#4 Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#2)
Re: Force streaming every change in logical decoding

On Tue, Dec 6, 2022 at 9:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, to test "Avoid streaming the
transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in less
time. Also, this new option helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

+1

Yeah, I think this can also help in reducing the time for various
tests in test_decoding/stream and
src/test/subscription/t/*_stream_*.pl file by reducing the number of
changes required to invoke streaming mode. Can we think of making this
GUC extendible to even test more options on server-side (publisher)
and client-side (subscriber) cases? For example, we can have something
like logical_replication_mode with the following valid values: (a)
server_serialize: this will serialize each change to file on
publishers and then on commit restore and send all changes; (b)
server_stream: this will stream each change as currently proposed in
your patch. Then if we want to extend it for subscriber-side testing
then we can introduce new options like client_serialize for the case
being discussed in the email [1].

Thoughts?

There is potential for lots of developer GUCs for testing/debugging in
the area of logical replication but IMO it might be better to keep
them all separated. Putting everything into a single
'logical_replication_mode' might cause difficulties later when/if you
want combinations of the different modes.

For example, instead of

logical_replication_mode = XXX/YYY/ZZZ

maybe something like below will give more flexibility.

logical_replication_dev_XXX = true/false
logical_replication_dev_YYY = true/false
logical_replication_dev_ZZZ = true/false

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#5 Peter Smith
smithpb2250@gmail.com
In reply to: shiy.fnst@fujitsu.com (#1)
Re: Force streaming every change in logical decoding

On Tue, Dec 6, 2022 at 5:23 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

Some review comments for patch v1-0001.

1. Typos

In several places "Wheather/wheather" -> "Whether/whether"

======

src/backend/replication/logical/reorderbuffer.c

2. ReorderBufferCheckMemoryLimit

{
ReorderBufferTXN *txn;

- /* bail out if we haven't exceeded the memory limit */
- if (rb->size < logical_decoding_work_mem * 1024L)
+ /*
+ * Stream the changes immediately if force_stream_mode is on and the output
+ * plugin supports streaming. Otherwise wait until size exceeds
+ * logical_decoding_work_mem.
+ */
+ bool force_stream = (force_stream_mode && ReorderBufferCanStream(rb));
+
+ /* bail out if force_stream is false and we haven't exceeded the memory limit */
+ if (!force_stream && rb->size < logical_decoding_work_mem * 1024L)
  return;
  /*
- * Loop until we reach under the memory limit.  One might think that just
- * by evicting the largest (sub)transaction we will come under the memory
- * limit based on assumption that the selected transaction is at least as
- * large as the most recent change (which caused us to go over the memory
- * limit). However, that is not true because a user can reduce the
+ * If force_stream is true, loop until there's no change. Otherwise, loop
+ * until we reach under the memory limit. One might think that just by
+ * evicting the largest (sub)transaction we will come under the memory limit
+ * based on assumption that the selected transaction is at least as large as
+ * the most recent change (which caused us to go over the memory limit).
+ * However, that is not true because a user can reduce the
  * logical_decoding_work_mem to a smaller value before the most recent
  * change.
  */
- while (rb->size >= logical_decoding_work_mem * 1024L)
+ while ((!force_stream && rb->size >= logical_decoding_work_mem * 1024L) ||
+    (force_stream && rb->size > 0))
  {
  /*
  * Pick the largest transaction (or subtransaction) and evict it from

IIUC this logic can be simplified quite a lot just by shifting that
"bail out" condition into the loop.

Something like:

while (true)
{
    /* keep going while forced streaming has data left or we are at/over the limit */
    if (!((force_stream && rb->size > 0) ||
          rb->size >= logical_decoding_work_mem * 1024L))
        break;
    ...
}
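As a quick sanity check on folding the bail-out test into the loop, here is a minimal standalone C sketch (not backend code) that compares the v1 patch's loop condition with a single folded condition for every buffer size up to twice the limit. The function names are hypothetical; rb_size and limit_bytes merely stand in for rb->size and logical_decoding_work_mem * 1024L.

```c
#include <assert.h>
#include <stdbool.h>

/* Loop condition as written in the v1 patch (rb_size ~ rb->size). */
static bool
patch_condition(bool force_stream, long rb_size, long limit_bytes)
{
    return (!force_stream && rb_size >= limit_bytes) ||
           (force_stream && rb_size > 0);
}

/* Folded condition: loop while forced streaming has data left,
 * or while we are at or over the memory limit. */
static bool
folded_condition(bool force_stream, long rb_size, long limit_bytes)
{
    return (force_stream && rb_size > 0) || rb_size >= limit_bytes;
}

/*
 * Compare both conditions for every size in [0, 2 * limit_bytes] and both
 * settings of force_stream. Returns true if they always agree.
 */
static bool
conditions_agree(long limit_bytes)
{
    /* logical_decoding_work_mem has a 64kB minimum, so the limit is positive. */
    assert(limit_bytes > 0);

    for (int f = 0; f <= 1; f++)
        for (long size = 0; size <= 2 * limit_bytes; size++)
            if (patch_condition(f, size, limit_bytes) !=
                folded_condition(f, size, limit_bytes))
                return false;
    return true;
}
```

The two forms could only disagree for a zero limit (force_stream on and an empty buffer), which the 64kB minimum of logical_decoding_work_mem rules out.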

------

Kind Regards,
Peter Smith.
Fujitsu Australia

#6 Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Peter Smith (#4)
Re: Force streaming every change in logical decoding

On Wed, Dec 7, 2022 at 8:46 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Tue, Dec 6, 2022 at 9:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, to test "Avoid streaming the
transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in less
time. Also, this new option helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

+1

Yeah, I think this can also help in reducing the time for various
tests in test_decoding/stream and
src/test/subscription/t/*_stream_*.pl file by reducing the number of
changes required to invoke streaming mode. Can we think of making this
GUC extendible to even test more options on server-side (publisher)
and client-side (subscriber) cases? For example, we can have something
like logical_replication_mode with the following valid values: (a)
server_serialize: this will serialize each change to file on
publishers and then on commit restore and send all changes; (b)
server_stream: this will stream each change as currently proposed in
your patch. Then if we want to extend it for subscriber-side testing
then we can introduce new options like client_serialize for the case
being discussed in the email [1].

Thoughts?

There is potential for lots of developer GUCs for testing/debugging in
the area of logical replication but IMO it might be better to keep
them all separated. Putting everything into a single
'logical_replication_mode' might cause difficulties later when/if you
want combinations of the different modes.

I think we want the developer option that forces streaming of changes
during logical decoding to be PGC_USERSET, but the developer
option for testing the parallel apply feature would probably be PGC_SIGHUP.
Also, since streaming changes is not specific to logical replication
but to logical decoding, I'm not sure logical_replication_XXX is a
good name. IMO, having force_stream_mode and a separate GUC for
testing the parallel apply feature makes sense.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#7 Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#3)
Re: Force streaming every change in logical decoding

On Tue, Dec 6, 2022 at 7:18 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Tue, Dec 6, 2022 at 7:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, to test "Avoid streaming the
transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in less
time. Also, this new option helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

Yeah, I think this can also help in reducing the time for various
tests in test_decoding/stream and
src/test/subscription/t/*_stream_*.pl file by reducing the number of
changes required to invoke streaming mode.

+1

Can we think of making this
GUC extendible to even test more options on server-side (publisher)
and client-side (subscriber) cases? For example, we can have something
like logical_replication_mode with the following valid values: (a)
server_serialize: this will serialize each change to file on
publishers and then on commit restore and send all changes; (b)
server_stream: this will stream each change as currently proposed in
your patch. Then if we want to extend it for subscriber-side testing
then we can introduce new options like client_serialize for the case
being discussed in the email [1].

Setting logical_replication_mode = 'client_serialize' implies that the
publisher behaves as server_stream? or do you mean we can set like
logical_replication_mode = 'server_stream, client_serialize'?

The latter one (logical_replication_mode = 'server_stream,
client_serialize'). The idea is to cover more options with one GUC and
each option can be used individually as well as in combination with
others.
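To illustrate the combined-values idea, here is a minimal standalone C sketch that parses a comma-separated setting such as 'server_stream, client_serialize' into a bitmask, so each option can be used alone or together with others. The flag names and parse_logical_replication_mode() are hypothetical; a real GUC would presumably do this in a check hook with the server's own list-parsing helpers rather than this hand-rolled loop.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical flag bits for the proposed logical_replication_mode values. */
#define LRM_SERVER_SERIALIZE 0x01
#define LRM_SERVER_STREAM    0x02
#define LRM_CLIENT_SERIALIZE 0x04

typedef struct
{
    const char *name;
    int         flag;
} ModeOption;

static const ModeOption mode_options[] = {
    {"server_serialize", LRM_SERVER_SERIALIZE},
    {"server_stream", LRM_SERVER_STREAM},
    {"client_serialize", LRM_CLIENT_SERIALIZE},
};

/*
 * Parse a comma-separated value into a bitmask of options.
 * Returns -1 if any item is not a recognized option.
 */
static int
parse_logical_replication_mode(const char *value)
{
    int         flags = 0;
    const char *p = value;

    assert(value != NULL);

    while (*p != '\0')
    {
        size_t      len;
        bool        found = false;

        /* skip separators: commas and whitespace */
        while (*p == ',' || isspace((unsigned char) *p))
            p++;
        if (*p == '\0')
            break;

        /* the item runs until the next comma or whitespace character */
        len = strcspn(p, ", \t\n\r\f\v");
        for (size_t i = 0; i < sizeof(mode_options) / sizeof(mode_options[0]); i++)
        {
            if (strlen(mode_options[i].name) == len &&
                strncmp(p, mode_options[i].name, len) == 0)
            {
                flags |= mode_options[i].flag;
                found = true;
                break;
            }
        }
        if (!found)
            return -1;
        p += len;
    }
    return flags;
}
```

With this shape, 'server_stream, client_serialize' sets both bits, while either item on its own sets just one.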

--
With Regards,
Amit Kapila.

#8 Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#6)
Re: Force streaming every change in logical decoding

On Wed, Dec 7, 2022 at 7:31 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Wed, Dec 7, 2022 at 8:46 AM Peter Smith <smithpb2250@gmail.com> wrote:

Yeah, I think this can also help in reducing the time for various
tests in test_decoding/stream and
src/test/subscription/t/*_stream_*.pl file by reducing the number of
changes required to invoke streaming mode. Can we think of making this
GUC extendible to even test more options on server-side (publisher)
and client-side (subscriber) cases? For example, we can have something
like logical_replication_mode with the following valid values: (a)
server_serialize: this will serialize each change to file on
publishers and then on commit restore and send all changes; (b)
server_stream: this will stream each change as currently proposed in
your patch. Then if we want to extend it for subscriber-side testing
then we can introduce new options like client_serialize for the case
being discussed in the email [1].

Thoughts?

There is potential for lots of developer GUCs for testing/debugging in
the area of logical replication but IMO it might be better to keep
them all separated. Putting everything into a single
'logical_replication_mode' might cause difficulties later when/if you
want combinations of the different modes.

I think we want the developer option that forces streaming changes
during logical decoding to be PGC_USERSET but probably the developer
option for testing the parallel apply feature would be PGC_SIGHUP.

Ideally, that is true, but if we want to combine multiple modes in
one parameter, is there any harm in keeping it as PGC_SIGHUP?

Also, since streaming changes is not specific to logical replication
but to logical decoding, I'm not sure logical_replication_XXX is a
good name. IMO having force_stream_mode and a different GUC for
testing the parallel apply feature makes sense to me.

But if we want to have a separate variable for testing/debugging
streaming like force_stream_mode, why not for serializing as well? And
if we want for both then we can even think of combining them in one
variable as logical_decoding_mode with values as 'stream' and
'serialize'. The first one specified would be given preference. Also,
the name force_stream_mode doesn't seem to convey that it is for
logical decoding. We can probably have a separate variable for the
subscriber side.
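A minimal sketch of how an enum-valued logical_decoding_mode with 'stream' and 'serialize' values might drive the eviction decision. Everything here is hypothetical (the enum names, DECODING_MODE_DEFAULT for the unset case, and choose_action()) and is not the patch's code. As in the v1 patch, forced streaming only applies when the output plugin can stream; otherwise it falls back to the usual memory-limit check.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical values for a logical_decoding_mode enum GUC. */
typedef enum
{
    DECODING_MODE_DEFAULT,      /* wait for logical_decoding_work_mem */
    DECODING_MODE_STREAM,       /* stream every change immediately */
    DECODING_MODE_SERIALIZE     /* serialize every change immediately */
} DecodingMode;

typedef enum
{
    ACTION_NONE,                /* keep buffering */
    ACTION_STREAM,              /* send changes to the output plugin now */
    ACTION_SERIALIZE            /* spill changes to disk */
} EvictAction;

/*
 * Decide what to do after a change is queued. can_stream says whether the
 * output plugin supports streaming; size and limit_bytes stand in for
 * rb->size and logical_decoding_work_mem * 1024L.
 */
static EvictAction
choose_action(DecodingMode mode, bool can_stream, long size, long limit_bytes)
{
    assert(limit_bytes > 0);

    if (size == 0)
        return ACTION_NONE;

    if (mode == DECODING_MODE_STREAM && can_stream)
        return ACTION_STREAM;
    if (mode == DECODING_MODE_SERIALIZE)
        return ACTION_SERIALIZE;

    /* Default behaviour: act only once the memory limit is reached. */
    if (size < limit_bytes)
        return ACTION_NONE;
    return can_stream ? ACTION_STREAM : ACTION_SERIALIZE;
}
```

An enum keeps the two forced behaviours mutually exclusive, which sidesteps the question of which one wins when both are requested.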

On one hand, having separate GUCs for publisher and subscriber seems to
give better flexibility, but having more GUCs also sometimes makes them
less usable. Here, my thought was to have a single GUC, or as few GUCs as
possible, that can be extended by accepting multiple values instead
of adding different GUCs. I was trying to map this to the existing
string parameters in developer options.

--
With Regards,
Amit Kapila.

#9 Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#8)
Re: Force streaming every change in logical decoding

On Wed, Dec 7, 2022 at 12:55 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Dec 7, 2022 at 7:31 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Wed, Dec 7, 2022 at 8:46 AM Peter Smith <smithpb2250@gmail.com> wrote:

Yeah, I think this can also help in reducing the time for various
tests in test_decoding/stream and
src/test/subscription/t/*_stream_*.pl file by reducing the number of
changes required to invoke streaming mode. Can we think of making this
GUC extendible to even test more options on server-side (publisher)
and client-side (subscriber) cases? For example, we can have something
like logical_replication_mode with the following valid values: (a)
server_serialize: this will serialize each change to file on
publishers and then on commit restore and send all changes; (b)
server_stream: this will stream each change as currently proposed in
your patch. Then if we want to extend it for subscriber-side testing
then we can introduce new options like client_serialize for the case
being discussed in the email [1].

Thoughts?

There is potential for lots of developer GUCs for testing/debugging in
the area of logical replication but IMO it might be better to keep
them all separated. Putting everything into a single
'logical_replication_mode' might cause difficulties later when/if you
want combinations of the different modes.

I think we want the developer option that forces streaming changes
during logical decoding to be PGC_USERSET but probably the developer
option for testing the parallel apply feature would be PGC_SIGHUP.

Ideally, that is true but if we want to combine the multiple modes in
one parameter, is there a harm in keeping it as PGC_SIGHUP?

It's not a big harm but we will end up doing ALTER SYSTEM and
pg_reload_conf() even in regression tests (e.g. in
test_decoding/stream.sql).

Also, since streaming changes is not specific to logical replication
but to logical decoding, I'm not sure logical_replication_XXX is a
good name. IMO having force_stream_mode and a different GUC for
testing the parallel apply feature makes sense to me.

But if we want to have a separate variable for testing/debugging
streaming like force_stream_mode, why not for serializing as well? And
if we want for both then we can even think of combining them in one
variable as logical_decoding_mode with values as 'stream' and
'serialize'.

Making it enum makes sense to me.

The first one specified would be given preference. Also,
the name force_stream_mode doesn't seem to convey that it is for
logical decoding.

Agreed.

On one side having separate GUCs for publisher and subscriber seems to
give better flexibility but having more GUCs also sometimes makes them
less usable. Here, my thought was to have a single or as few GUCs as
possible which can be extendible by providing multiple values instead
of having different GUCs. I was trying to map this with the existing
string parameters in developer options.

I see your point. On the other hand, I'm not sure it's a good idea to
control different features by one GUC in general. The developer option
could be an exception?

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#10 Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#9)
Re: Force streaming every change in logical decoding

On Wed, Dec 7, 2022 at 10:55 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Wed, Dec 7, 2022 at 12:55 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On one side having separate GUCs for publisher and subscriber seems to
give better flexibility but having more GUCs also sometimes makes them
less usable. Here, my thought was to have a single or as few GUCs as
possible which can be extendible by providing multiple values instead
of having different GUCs. I was trying to map this with the existing
string parameters in developer options.

I see your point. On the other hand, I'm not sure it's a good idea to
control different features by one GUC in general. The developer option
could be an exception?

I am not sure what the best approach would be if this were proposed as a
non-developer option, but it seems to me that, in this case, a single parameter
for publisher and subscriber can serve our need for
testing/debugging. BTW, even though it is not a very good example,
we use max_replication_slots for different purposes on the publisher
(the limit for slots) and the subscriber (the limit for origins).

--
With Regards,
Amit Kapila.

#11 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Peter Smith (#4)
Re: Force streaming every change in logical decoding

On Wed, Dec 7, 2022 at 5:16 AM Peter Smith <smithpb2250@gmail.com> wrote:

+1 for the idea

There is potential for lots of developer GUCs for testing/debugging in
the area of logical replication but IMO it might be better to keep
them all separated. Putting everything into a single
'logical_replication_mode' might cause difficulties later when/if you
want combinations of the different modes.

For example, instead of

logical_replication_mode = XXX/YYY/ZZZ

maybe something like below will give more flexibility.

logical_replication_dev_XXX = true/false
logical_replication_dev_YYY = true/false
logical_replication_dev_ZZZ = true/false

I also agree that, usability-wise, keeping them independent is better.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#12 Dilip Kumar
dilipbalaut@gmail.com
In reply to: shiy.fnst@fujitsu.com (#1)
Re: Force streaming every change in logical decoding

On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, to test "Avoid streaming the
transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in less
time. Also, this new option helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

Some comments on the patch

1. Can you add a test case that uses this option?

2. +     <varlistentry id="guc-force-stream-mode" xreflabel="force_stream_mode">
+      <term><varname>force_stream_mode</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>force_stream_mode</varname> configuration
parameter</primary>
+      </indexterm>
+      </term>

The GUC name "force_stream_mode" makes it sound as if we are forcing
streaming mode irrespective of whether the
subscriber has requested this mode or not. But it actually just streams
each change when it is enabled. So we might need to rethink the name (at least we
should avoid using *mode* in the name IMHO).

3.
-    while (rb->size >= logical_decoding_work_mem * 1024L)
+    while ((!force_stream && rb->size >= logical_decoding_work_mem * 1024L) ||
+           (force_stream && rb->size > 0))
     {

It seems that if force_stream is on, it indirectly enables forced
serialization as well: once we enter the loop because of
"force_stream", each iteration will either stream or serialize, but I
guess we do not want to force serialization based on this parameter.
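One possible way to address this, sketched as a toy standalone model rather than ReorderBuffer code: with force_stream on, keep streaming the largest streamable transaction, but serialize only when the buffer is actually at or over the memory limit; if nothing is streamable and we are under the limit, stop and keep buffering. ToyTxn, check_memory_limit() and the streamable flag are hypothetical, and the model assumes the output plugin supports streaming.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy model of buffered transactions; all names here are hypothetical. */
typedef struct
{
    long        size;           /* bytes of changes still buffered */
    bool        streamable;     /* could this transaction be streamed now? */
} ToyTxn;

typedef struct
{
    int         streamed;       /* transactions sent to the output plugin */
    int         serialized;     /* transactions spilled to disk */
} EvictCounts;

static long
total_size(const ToyTxn *txns, size_t n)
{
    long        sum = 0;

    for (size_t i = 0; i < n; i++)
        sum += txns[i].size;
    return sum;
}

/* Index of the largest non-empty txn (streamable ones only, if asked), or -1. */
static int
largest_txn(const ToyTxn *txns, size_t n, bool streamable_only)
{
    int         best = -1;

    for (size_t i = 0; i < n; i++)
    {
        if (txns[i].size == 0 || (streamable_only && !txns[i].streamable))
            continue;
        if (best < 0 || txns[i].size > txns[best].size)
            best = (int) i;
    }
    return best;
}

/* force_stream drains by streaming; serialize only when over limit_bytes. */
static EvictCounts
check_memory_limit(ToyTxn *txns, size_t n, bool force_stream, long limit_bytes)
{
    EvictCounts counts = {0, 0};
    long        size;

    assert(limit_bytes > 0);

    while ((size = total_size(txns, n)) > 0 &&
           (force_stream || size >= limit_bytes))
    {
        int         idx = largest_txn(txns, n, true);

        if (idx >= 0)
            counts.streamed++;
        else if (size >= limit_bytes)
        {
            /* genuinely over the limit: fall back to serialization */
            idx = largest_txn(txns, n, false);
            counts.serialized++;
        }
        else
            break;              /* nothing streamable, under the limit */

        txns[idx].size = 0;     /* the chosen txn is evicted either way */
    }
    return counts;
}
```

Under this shape, force_stream never causes serialization below the limit; a non-streamable transaction simply stays buffered until the normal limit applies.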

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#13 Peter Smith
smithpb2250@gmail.com
In reply to: Dilip Kumar (#12)
Re: Force streaming every change in logical decoding

On Sat, Dec 10, 2022 at 5:03 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, to test "Avoid streaming the
transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in less
time. Also, this new option helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

Some comments on the patch

...

This GUC name "force_stream_mode" somehow appears like we are forcing
this streaming mode irrespective of whether the
subscriber has requested for this mode or not. But actually it is not
that, it is just streaming each change if
it is enabled. So we might need to think on the name (at least we
should avoid using *mode* in the name IMHO).

I thought the same. Names like those shown below might be more appropriate:
stream_checks_work_mem = true/false
stream_mode_checks_size = true/false
stream_for_large_tx_only = true/false
... etc.

The GUC name length could get a bit unwieldy but isn't it better for
it to have the correct meaning than to have a shorter but slightly
ambiguous name? Anyway, it is a developer option so I guess longer
names are less of a problem.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#14 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#11)
Re: Force streaming every change in logical decoding

On Sat, Dec 10, 2022 at 11:18 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Dec 7, 2022 at 5:16 AM Peter Smith <smithpb2250@gmail.com> wrote:

+1 for the idea

There is potential for lots of developer GUCs for testing/debugging in
the area of logical replication but IMO it might be better to keep
them all separated. Putting everything into a single
'logical_replication_mode' might cause difficulties later when/if you
want combinations of the different modes.

For example, instead of

logical_replication_mode = XXX/YYY/ZZZ

maybe something like below will give more flexibility.

logical_replication_dev_XXX = true/false
logical_replication_dev_YYY = true/false
logical_replication_dev_ZZZ = true/false

Even I agree that usability wise keeping them independent is better.

But OTOH, doesn't introducing multiple GUCs (one to allow streaming
each change, another to allow serialization, and probably a third one to
test subscriber-side work) just for the purpose of testing and
debugging logical replication code sound like a bit much?

--
With Regards,
Amit Kapila.

#15 Peter Smith
smithpb2250@gmail.com
In reply to: shiy.fnst@fujitsu.com (#1)
Re: Force streaming every change in logical decoding

On Tue, Dec 6, 2022 at 5:23 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, to test "Avoid streaming the
transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in less
time. Also, this new option helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

[1] /messages/by-id/CAFiTN-tHK=7LzfrPs8fbT2ksrOJGQbzywcgXst2bM9-rJJAAUg@mail.gmail.com
[2] /messages/by-id/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com

Hi, I've been doing some testing that makes use of this new developer
GUC `force_stream_mode`.

IIUC this GUC is used by the walsender during the logic of the
ReorderBufferCheckMemoryLimit(). Also, AFAIK the only way that the
walsender is going to know this GUC value is by inheritance from the
parent publisher at the time the walsender process gets launched.

I may be overthinking this. Isn't there potential for this to become
quite confusing depending on the timing of when this GUC is modified?

E.g.1 When the walsender is launched, it will use whatever is the
current value of this GUC.
E.g.2 But if the GUC is changed after the walsender is already
launched, then that will have no effect on the already running
walsender.

Is that understanding correct?

------
Kind Regards,
Peter Smith.
Fujitsu Australia.

#16 Peter Smith
smithpb2250@gmail.com
In reply to: Peter Smith (#15)
Re: Force streaming every change in logical decoding

On Tue, Dec 13, 2022 at 2:33 PM Peter Smith <smithpb2250@gmail.com> wrote:

On Tue, Dec 6, 2022 at 5:23 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, to test "Avoid streaming the
transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in less
time. Also, this new option helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

[1] /messages/by-id/CAFiTN-tHK=7LzfrPs8fbT2ksrOJGQbzywcgXst2bM9-rJJAAUg@mail.gmail.com
[2] /messages/by-id/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com

Hi, I've been doing some testing that makes use of this new developer
GUC `force_stream_mode`.

IIUC this GUC is used by the walsender during the logic of the
ReorderBufferCheckMemoryLimit(). Also, AFAIK the only way that the
walsender is going to know this GUC value is by inheritance from the
parent publisher at the time the walsender process gets launched.

I may be overthinking this. Isn't there potential for this to become
quite confusing depending on the timing of when this GUC is modified?

E.g.1 When the walsender is launched, it will use whatever is the
current value of this GUC.
E.g.2 But if the GUC is changed after the walsender is already
launched, then that will have no effect on the already running
walsender.

Is that understanding correct?

I think I was mistaken above. It looks like even the already-launched
walsender gets the updated GUC value via a SIGHUP on the parent
publisher.

2022-12-13 16:31:33.453 AEDT [1902] LOG: received SIGHUP, reloading configuration files
2022-12-13 16:31:33.455 AEDT [1902] LOG: parameter "force_stream_mode" changed to "true"

Sorry for the noise.
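The reload behavior observed above can be sketched as a small standalone C model. This is not PostgreSQL code. `config_file_value`, `force_stream_mode`, `walsender_start()` and `walsender_loop_iteration()` are hypothetical names. The sketch assumes the usual pattern for PostgreSQL processes: the SIGHUP handler only sets a flag, and the main loop later notices the flag and re-reads the configuration. In PostgreSQL this is roughly `ConfigReloadPending` followed by `ProcessConfigFile(PGC_SIGHUP)`.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <string.h>

/*
 * Hypothetical stand-ins: config_file_value plays the role of the value
 * in the configuration files; force_stream_mode is the process-local
 * copy of the GUC that the walsender consults.
 */
static bool config_file_value = false;
static bool force_stream_mode = false;

/* Set from the signal handler; a flag write is async-signal-safe. */
static volatile sig_atomic_t config_reload_pending = 0;

static void
handle_sighup(int signo)
{
    (void) signo;
    config_reload_pending = 1;
}

/* Process start: the child begins with whatever value is current. */
static void
walsender_start(void)
{
    struct sigaction sa;

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = handle_sighup;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGHUP, &sa, NULL);

    force_stream_mode = config_file_value;
}

/* One main-loop iteration: re-read the setting only if signaled. */
static void
walsender_loop_iteration(void)
{
    if (config_reload_pending)
    {
        config_reload_pending = 0;
        /* roughly what ProcessConfigFile(PGC_SIGHUP) does in PostgreSQL */
        force_stream_mode = config_file_value;
    }
}
```

In this model a running walsender does not see a changed value the moment the configuration is edited. It picks the value up at its next loop iteration after the reload signal arrives, so no relaunch is needed.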

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#17shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Dilip Kumar (#12)
RE: Force streaming every change in logical decoding

On Sat, Dec 10, 2022 2:03 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, to test "Avoid streaming the
transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
messages. With the new option, it can be tested with fewer changes and in
less time. Also, this new option helps to test more scenarios for "Perform
streaming logical transactions by background workers" [2].

Some comments on the patch

Thanks for your comments.

1. Can you add one test case using this option

I added a simple test to confirm the new option works.

2. +     <varlistentry id="guc-force-stream-mode" xreflabel="force_stream_mode">
+      <term><varname>force_stream_mode</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>force_stream_mode</varname> configuration parameter</primary>
+      </indexterm>
+      </term>

The GUC name "force_stream_mode" makes it look as if we are forcing
streaming mode irrespective of whether the subscriber has requested it.
But that is not what it does; it just streams each change if streaming
is enabled. So we might need to rethink the name (at least we should
avoid using *mode* in the name, IMHO).

I think force_parallel_mode is a similar GUC: if the query is parallel
unsafe or max_worker_processes is exceeded, force_parallel_mode does not take
effect. That is similar to what we do in this patch, so maybe it's OK to use
"mode". I didn't change it in the new version of the patch. What do you think?

3.
-    while (rb->size >= logical_decoding_work_mem * 1024L)
+    while ((!force_stream && rb->size >= logical_decoding_work_mem * 1024L) ||
+           (force_stream && rb->size > 0))
{

It seems that if force_stream is on, then it indirectly enables forced
serialization as well, because once we enter the loop based on
"force_stream", it will either stream or serialize. I guess we do not
want to force serialization based on this parameter.
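The point about the loop condition can be checked by evaluating the quoted condition on its own. The sketch below is a hypothetical model of the v1 `while` condition, not patch code: `v1_enters_loop()` is an invented name, `size` stands for `rb->size`, and `work_mem_kb` stands for `logical_decoding_work_mem`. With `force_stream` on, the loop is entered for any non-empty buffer regardless of the memory limit. Because the loop body evicts by streaming or, failing that, serializing, forced entry also means forced serialization whenever streaming is not possible.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical model of the v1 loop condition:
 *   (!force_stream && rb->size >= logical_decoding_work_mem * 1024L) ||
 *   (force_stream && rb->size > 0)
 */
static bool
v1_enters_loop(size_t size, int work_mem_kb, bool force_stream)
{
    /* logical_decoding_work_mem cannot be set below 64kB */
    assert(work_mem_kb >= 64);

    if (force_stream)
        return size > 0;    /* any buffered change triggers eviction */
    return size >= (size_t) work_mem_kb * 1024;
}
```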

Agreed, I refactored the code and modified this point.

Please see the attached patch. I also fixed Peter's comments [1]. The GUC name and
design are still under discussion, so I didn't modify them.

By the way, I noticed that the comment for ReorderBufferCheckMemoryLimit() on
HEAD was missing something, so I fixed it in this patch, too.

[1]: /messages/by-id/CAHut+PtOjZ_e-KLf26i1XLH2ffPEZGOmGSKy0wDjwyB_uvzxBQ@mail.gmail.com

Regards,
Shi yu

Attachments:

v2-0001-Allow-streaming-every-change-without-waiting-till.patch (application/octet-stream, +100-31)
#18Amit Kapila
amit.kapila16@gmail.com
In reply to: shiy.fnst@fujitsu.com (#17)
Re: Force streaming every change in logical decoding

On Wed, Dec 14, 2022 at 2:15 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Please see the attached patch. I also fix Peter's comments[1]. The GUC name and
design are still under discussion, so I didn't modify them.

Let me summarize the discussion on the name and design so far. As per my
understanding, we have three requirements: (a) in the publisher, stream
each change of a transaction instead of waiting till
logical_decoding_work_mem is exceeded or the transaction commits; this
will help us reduce the timings of current and future tests for
replication of in-progress transactions; (b) in the publisher, serialize
each change instead of waiting till logical_decoding_work_mem is
exceeded; this can help reduce the time of tests related to
serialization of changes in logical decoding; (c) in the subscriber,
during parallel apply of in-progress transactions (a new feature being
discussed at [1]), allow the system to switch to serialize mode while
sending changes (which normally happens when there is no more space in
the shared memory queue between the leader and the parallel apply
worker, either because the parallel worker is busy or because it is
waiting on some lock).

Having a GUC that controls these actions/features will allow us to
write tests with shorter duration and better predictability; otherwise,
such tests require a lot of changes. Apart from tests, these options
also make it easy to debug the relevant code. So they fit the Developer
Options category of GUCs [2].

We have discussed three different ways to provide GUC for these
features. (1) Have separate GUCs like force_server_stream_mode,
force_server_serialize_mode, force_client_serialize_mode (we can use
different names for these) for each of these; (2) Have two sets of
GUCs for server and client. We can have logical_decoding_mode with
values as 'stream' and 'serialize' for the server and then
logical_apply_serialize = true/false for the client. (3) Have one GUC
like logical_replication_mode with values as 'server_stream',
'server_serialize', 'client_serialize'.

The names used here are tentative and mainly serve to explain each
option; we can choose different names once we decide among the above.
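For comparison, option (3) could be modeled as a single enum-valued setting. This sketch is hypothetical: `LogicalReplicationMode`, the `"none"` default, and the helper functions are invented for illustration, and the value names are the tentative ones listed above. The name/value table only loosely mirrors how enum GUCs are usually declared.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical values for option (3); names are the tentative ones. */
typedef enum
{
    LRM_NONE,                   /* assumed default: nothing forced */
    LRM_SERVER_STREAM,
    LRM_SERVER_SERIALIZE,
    LRM_CLIENT_SERIALIZE
} LogicalReplicationMode;

typedef struct
{
    const char *name;
    LogicalReplicationMode val;
} ModeEntry;

static const ModeEntry mode_options[] = {
    {"none", LRM_NONE},
    {"server_stream", LRM_SERVER_STREAM},
    {"server_serialize", LRM_SERVER_SERIALIZE},
    {"client_serialize", LRM_CLIENT_SERIALIZE},
};

/* Returns false for an unknown value, as a GUC check would reject it. */
static bool
parse_mode(const char *value, LogicalReplicationMode *out)
{
    assert(value != NULL && out != NULL);

    for (size_t i = 0; i < sizeof(mode_options) / sizeof(mode_options[0]); i++)
    {
        if (strcmp(value, mode_options[i].name) == 0)
        {
            *out = mode_options[i].val;
            return true;
        }
    }
    return false;
}

/* With one GUC, each side tests for the single value relevant to it. */
static bool publisher_forces_stream(LogicalReplicationMode m) { return m == LRM_SERVER_STREAM; }
static bool publisher_forces_serialize(LogicalReplicationMode m) { return m == LRM_SERVER_SERIALIZE; }
static bool subscriber_forces_serialize(LogicalReplicationMode m) { return m == LRM_CLIENT_SERIALIZE; }
```

One property this shape makes visible is that the values are mutually exclusive on a given server, whereas with option (1) each behavior could be toggled independently.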

Thoughts?

[1]: /messages/by-id/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
[2]: https://www.postgresql.org/docs/devel/runtime-config-developer.html

--
With Regards,
Amit Kapila.

#19Dilip Kumar
dilipbalaut@gmail.com
In reply to: shiy.fnst@fujitsu.com (#17)
Re: Force streaming every change in logical decoding

On Wed, Dec 14, 2022 at 2:15 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

-    while (rb->size >= logical_decoding_work_mem * 1024L)
+    while ((!force_stream && rb->size >= logical_decoding_work_mem * 1024L) ||
+           (force_stream && rb->size > 0))
{

It seems like if force_stream is on then indirectly it is enabling
force serialization as well. Because once we enter into the loop
based on "force_stream" then it will either stream or serialize but I
guess we do not want to force serialize based on this parameter.

Agreed, I refactored the code and modified this point.

After thinking more about this, I feel the previous behavior made more
sense. Without this patch, if we cross logical_decoding_work_mem we try
to stream, and if we cannot stream for some reason (e.g. a partial
change), we serialize. Your previous patch mimicked the same behavior
for each change. In the new patch, we try to stream and, if we cannot,
we keep the change queued, so I feel we are creating a new code path
that doesn't actually exist without the force mode.
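The difference between the two patch versions can be put as a small decision function. This is a hypothetical model, not code from either patch: `Outcome`, `over_limit_default()` and `force_mode_after_change()` are invented names, and `can_stream` abstracts whatever prevents streaming (for example, a partial change).

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
    OUT_STREAMED,               /* sent to the output plugin as a stream */
    OUT_SERIALIZED,             /* spilled to disk */
    OUT_KEPT_IN_MEMORY          /* left queued in the reorder buffer */
} Outcome;

/* Default path once the memory limit is crossed: stream, else spill. */
static Outcome
over_limit_default(bool can_stream)
{
    return can_stream ? OUT_STREAMED : OUT_SERIALIZED;
}

/*
 * Hypothetical model of force mode, evaluated after every change.
 * version 1: same fallback as the default path (stream, else serialize).
 * version 2: stream if possible, else leave the change queued.
 */
static Outcome
force_mode_after_change(int version, bool can_stream)
{
    assert(version == 1 || version == 2);

    if (can_stream)
        return OUT_STREAMED;
    return (version == 1) ? OUT_SERIALIZED : OUT_KEPT_IN_MEMORY;
}
```

In this model, version 1's fallback matches the default over-limit path, while version 2 produces an outcome (keeping the change queued) that the default path never does, which is the extra code path described above.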

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#20Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#18)
Re: Force streaming every change in logical decoding

On Wed, Dec 14, 2022 at 5:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Dec 14, 2022 at 2:15 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Please see the attached patch. I also fix Peter's comments[1]. The GUC name and
design are still under discussion, so I didn't modify them.

Let me summarize the discussion on name and design till now. As per my
understanding, we have three requirements: (a) In publisher, stream
each change of transaction instead of waiting till
logical_decoding_work_mem or commit; this will help us to reduce the
test timings of current and future tests for replication of
in-progress transactions; (b) In publisher, serialize each change
instead of waiting till logical_decoding_work_mem; this can help
reduce the test time of tests related to serialization of changes in
logical decoding; (c) In subscriber, during parallel apply for
in-progress transactions (a new feature being discussed at [1]) allow
the system to switch to serialize mode (no more space in shared memory
queue between leader and parallel apply worker either due to a
parallel worker being busy or waiting on some lock) while sending
changes.

Having a GUC that controls these actions/features will allow us to
write tests with shorter duration and better predictability as
otherwise, they require a lot of changes. Apart from tests, these also
help to easily debug the required code. So they fit the Developer
Options category of GUC [2].

We have discussed three different ways to provide GUC for these
features. (1) Have separate GUCs like force_server_stream_mode,
force_server_serialize_mode, force_client_serialize_mode (we can use
different names for these) for each of these; (2) Have two sets of
GUCs for server and client. We can have logical_decoding_mode with
values as 'stream' and 'serialize' for the server and then
logical_apply_serialize = true/false for the client. (3) Have one GUC
like logical_replication_mode with values as 'server_stream',
'server_serialize', 'client_serialize'.

The names used here are tentative mainly to explain each of the
options, we can use different names once we decide among the above.

Thoughts?

I think option 2 makes sense because stream and serialize are two
related options that both depend on the same parameter
(logical_decoding_work_mem), so having a single knob for them is
logical, plus another GUC for serializing in logical apply.
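The argument for a single publisher-side knob can be illustrated by folding both forced behaviors into the one threshold check they share. This is a hypothetical sketch: the enum, the `DECODING_MODE_DEFAULT` value and `decide()` are invented, and only the 'stream' and 'serialize' values come from option (2). It also assumes that forced streaming falls back to serializing when streaming is not possible, as in the default path.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical values for the publisher-side GUC of option (2). */
typedef enum
{
    DECODING_MODE_DEFAULT,      /* assumed: honor logical_decoding_work_mem */
    DECODING_MODE_STREAM,       /* 'stream' */
    DECODING_MODE_SERIALIZE     /* 'serialize' */
} LogicalDecodingMode;

typedef enum { EVICT_NONE, EVICT_STREAM, EVICT_SERIALIZE } EvictDecision;

/*
 * Both forced modes replace the same threshold check: the mode changes
 * only *when* we evict and *how* we prefer to evict.
 */
static EvictDecision
decide(LogicalDecodingMode mode, size_t size, int work_mem_kb, bool can_stream)
{
    assert(work_mem_kb >= 64);  /* minimum of logical_decoding_work_mem */

    bool over_limit = (mode == DECODING_MODE_DEFAULT)
        ? size >= (size_t) work_mem_kb * 1024
        : size > 0;

    if (!over_limit)
        return EVICT_NONE;
    if (mode != DECODING_MODE_SERIALIZE && can_stream)
        return EVICT_STREAM;
    return EVICT_SERIALIZE;
}
```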

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#21Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#20)
#22Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#18)
#23Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#22)
#24Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#23)
#25shveta malik
shveta.malik@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#24)
#26Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#23)
#27Peter Smith
smithpb2250@gmail.com
In reply to: shiy.fnst@fujitsu.com (#17)
#28Peter Smith
smithpb2250@gmail.com
In reply to: Masahiko Sawada (#26)
#29Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#28)
#30Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: shveta malik (#25)
#31shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Amit Kapila (#29)
#32shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Peter Smith (#27)
#33Masahiko Sawada
sawada.mshk@gmail.com
In reply to: shiy.fnst@fujitsu.com (#31)
#34Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#33)
#35Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#34)
#36Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#34)
#37Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Amit Kapila (#34)
#38Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#36)
#39Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Masahiko Sawada (#38)
#40Amit Kapila
amit.kapila16@gmail.com
In reply to: Kyotaro Horiguchi (#39)
#41Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#35)
#42Amit Kapila
amit.kapila16@gmail.com
In reply to: Kyotaro Horiguchi (#37)
#43shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Amit Kapila (#42)
#44Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Amit Kapila (#42)
#45Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#41)
#46Masahiko Sawada
sawada.mshk@gmail.com
In reply to: shiy.fnst@fujitsu.com (#43)
#47Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: shiy.fnst@fujitsu.com (#43)
#48Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#46)
#49Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#47)
#50Amit Kapila
amit.kapila16@gmail.com
In reply to: shiy.fnst@fujitsu.com (#43)
#51Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#47)
#52Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#51)
#53shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Amit Kapila (#50)
#54Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: shiy.fnst@fujitsu.com (#53)
#55shveta malik
shveta.malik@gmail.com
In reply to: shiy.fnst@fujitsu.com (#53)
#56Masahiko Sawada
sawada.mshk@gmail.com
In reply to: shiy.fnst@fujitsu.com (#53)
#57Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#56)
#58Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#57)
#59Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#58)
#60Andres Freund
andres@anarazel.de
In reply to: Amit Kapila (#59)
#61Amit Kapila
amit.kapila16@gmail.com
In reply to: Andres Freund (#60)
#62vignesh C
vignesh21@gmail.com
In reply to: Amit Kapila (#59)
#63shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Masahiko Sawada (#35)