logical replication empty transactions

Started by Jeff Janes about 6 years ago, 108 messages
#1 Jeff Janes
jeff.janes@gmail.com

After setting up logical replication of a slowly changing table using the
built-in pub/sub facility, I noticed way more network traffic than made
sense. Looking into it, I see that every transaction in that database on the
master gets sent to the replica. 99.999+% of them are empty transactions
('B' message and 'C' message with nothing in between) because the
transactions don't touch any tables in the publication, only non-replicated
tables. Is doing it this way necessary for some reason? Couldn't we hold
the transmission of 'B' until something else comes along, and then if that
next thing is 'C' drop both of them?
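
A minimal sketch of that idea, in Python for illustration only. The class and method names are hypothetical and this is not walsender or pgoutput code; it only models holding back 'B' until a change arrives and dropping the 'B'/'C' pair when nothing does:

```python
from dataclasses import dataclass, field


@dataclass
class OutputBuffer:
    """Hypothetical stand-in for the stream sent to the subscriber."""
    sent: list = field(default_factory=list)
    pending_begin: object = None  # a 'B' message held back, or None

    def begin(self, xid):
        # Do not transmit 'B' yet; just remember it.
        self.pending_begin = ("B", xid)

    def change(self, xid, payload):
        # First published change: flush the held 'B' ahead of it.
        if self.pending_begin is not None:
            self.sent.append(self.pending_begin)
            self.pending_begin = None
        self.sent.append(("I", xid, payload))

    def commit(self, xid):
        if self.pending_begin is not None:
            # Nothing was sent for this transaction: drop both 'B' and 'C'.
            self.pending_begin = None
            return
        self.sent.append(("C", xid))


out = OutputBuffer()
out.begin(1)
out.commit(1)             # empty transaction, nothing goes out
out.begin(2)
out.change(2, "row")      # touches a published table
out.commit(2)
print(out.sent)
```

Only the second transaction reaches the wire in this model.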

There is a comment for WalSndPrepareWrite which seems to foreshadow such a
thing, but I don't really see how to use it in this case. I want to drop
two messages, not one.

 * Don't do anything lasting in here, it's quite possible that nothing will be done
 * with the data.

This applies to all versions that support pub/sub, including the
recent commits to 13dev.

I've searched through the voluminous mailing list threads for when this
feature was being presented to see if it was already discussed, but since
every word I can think to search on occurs in virtually every message in
the threads in some context or another, I didn't have much luck.

Cheers,

Jeff

#2 Euler Taveira
euler@timbira.com.br
In reply to: Jeff Janes (#1)
1 attachment(s)
Re: logical replication empty transactions

On Mon, Oct 21, 2019 at 21:20, Jeff Janes
<jeff.janes@gmail.com> wrote:

After setting up logical replication of a slowly changing table using the built in pub/sub facility, I noticed way more network traffic than made sense. Looking into I see that every transaction in that database on the master gets sent to the replica. 99.999+% of them are empty transactions ('B' message and 'C' message with nothing in between) because the transactions don't touch any tables in the publication, only non-replicated tables. Is doing it this way necessary for some reason? Couldn't we hold the transmission of 'B' until something else comes along, and then if that next thing is 'C' drop both of them?

That is not optimal. Those empty transactions are a waste of bandwidth.
We can suppress them if no changes will be sent. test_decoding
implements "skip empty transaction" as you described above and I did
something similar to it. Patch is attached.

--
Euler Taveira Timbira - http://www.timbira.com.br/
PostgreSQL: Consulting, Development, 24x7 Support and Training

Attachments:

0001-Skip-empty-transactions-for-logical-replication.patch (text/x-patch; charset=US-ASCII)

#3 Jeff Janes
jeff.janes@gmail.com
In reply to: Euler Taveira (#2)
Re: logical replication empty transactions

On Fri, Nov 8, 2019 at 8:59 PM Euler Taveira <euler@timbira.com.br> wrote:

On Mon, Oct 21, 2019 at 21:20, Jeff Janes
<jeff.janes@gmail.com> wrote:

After setting up logical replication of a slowly changing table using
the built in pub/sub facility, I noticed way more network traffic than made
sense. Looking into I see that every transaction in that database on the
master gets sent to the replica. 99.999+% of them are empty transactions
('B' message and 'C' message with nothing in between) because the
transactions don't touch any tables in the publication, only non-replicated
tables. Is doing it this way necessary for some reason? Couldn't we hold
the transmission of 'B' until something else comes along, and then if that
next thing is 'C' drop both of them?

That is not optimal. Those empty transactions is a waste of bandwidth.
We can suppress them if no changes will be sent. test_decoding
implements "skip empty transaction" as you described above and I did
something similar to it. Patch is attached.

Thanks. I didn't think it would be that simple, because I thought we would
need some way to fake an acknowledgement for any dropped empty
transactions, to keep the LSN advancing and allow WAL to get recycled on
the master. But it turns out to be the opposite. While your patch drops the
network traffic by a lot, there is still a lot of traffic. Now it is
keep-alives, rather than 'B' and 'C'. I don't know why I am getting a few
hundred keep-alives every second when the timeouts are at their defaults,
but it is better than several thousand 'B' and 'C'.

My setup here was just to create, publish, and subscribe to an inactive
dummy table, while having pgbench running on the master (with unpublished
tables). I have not created an intentionally slow network, but I am
testing it over wifi, which is inherently kind of slow.

Cheers,

Jeff

#4 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Euler Taveira (#2)
Re: logical replication empty transactions

On Sat, Nov 9, 2019 at 7:29 AM Euler Taveira <euler@timbira.com.br> wrote:

On Mon, Oct 21, 2019 at 21:20, Jeff Janes
<jeff.janes@gmail.com> wrote:

After setting up logical replication of a slowly changing table using the built in pub/sub facility, I noticed way more network traffic than made sense. Looking into I see that every transaction in that database on the master gets sent to the replica. 99.999+% of them are empty transactions ('B' message and 'C' message with nothing in between) because the transactions don't touch any tables in the publication, only non-replicated tables. Is doing it this way necessary for some reason? Couldn't we hold the transmission of 'B' until something else comes along, and then if that next thing is 'C' drop both of them?

That is not optimal. Those empty transactions is a waste of bandwidth.
We can suppress them if no changes will be sent. test_decoding
implements "skip empty transaction" as you described above and I did
something similar to it. Patch is attached.

I think this significantly reduces the network bandwidth for empty
transactions. I have briefly reviewed the patch and it looks good to
me.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#5 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#4)
Re: logical replication empty transactions

On Mon, Mar 2, 2020 at 9:01 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Sat, Nov 9, 2019 at 7:29 AM Euler Taveira <euler@timbira.com.br> wrote:

On Mon, Oct 21, 2019 at 21:20, Jeff Janes
<jeff.janes@gmail.com> wrote:

After setting up logical replication of a slowly changing table using the built in pub/sub facility, I noticed way more network traffic than made sense. Looking into I see that every transaction in that database on the master gets sent to the replica. 99.999+% of them are empty transactions ('B' message and 'C' message with nothing in between) because the transactions don't touch any tables in the publication, only non-replicated tables. Is doing it this way necessary for some reason? Couldn't we hold the transmission of 'B' until something else comes along, and then if that next thing is 'C' drop both of them?

That is not optimal. Those empty transactions is a waste of bandwidth.
We can suppress them if no changes will be sent. test_decoding
implements "skip empty transaction" as you described above and I did
something similar to it. Patch is attached.

I think this significantly reduces the network bandwidth for empty
transactions. I have briefly reviewed the patch and it looks good to
me.

One thing that is not clear to me is how we will advance restart_lsn
if we don't send any empty xact in a system where there are many such
xacts. IIRC, the restart_lsn is advanced based on the confirmed_flush LSN
sent by the subscriber. After this change, the subscriber won't be able
to send the confirmed_flush and, for a long time, we won't be able to
advance restart_lsn. Is that correct? If so, why do we think that is
acceptable? One might argue that restart_lsn will be advanced as soon
as we send the first non-empty xact, but I am not sure that is good
enough. What do you think?

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#6 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#5)
Re: logical replication empty transactions

On Mon, Mar 2, 2020 at 4:56 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Mar 2, 2020 at 9:01 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Sat, Nov 9, 2019 at 7:29 AM Euler Taveira <euler@timbira.com.br> wrote:

On Mon, Oct 21, 2019 at 21:20, Jeff Janes
<jeff.janes@gmail.com> wrote:

After setting up logical replication of a slowly changing table using the built in pub/sub facility, I noticed way more network traffic than made sense. Looking into I see that every transaction in that database on the master gets sent to the replica. 99.999+% of them are empty transactions ('B' message and 'C' message with nothing in between) because the transactions don't touch any tables in the publication, only non-replicated tables. Is doing it this way necessary for some reason? Couldn't we hold the transmission of 'B' until something else comes along, and then if that next thing is 'C' drop both of them?

That is not optimal. Those empty transactions is a waste of bandwidth.
We can suppress them if no changes will be sent. test_decoding
implements "skip empty transaction" as you described above and I did
something similar to it. Patch is attached.

I think this significantly reduces the network bandwidth for empty
transactions. I have briefly reviewed the patch and it looks good to
me.

One thing that is not clear to me is how will we advance restart_lsn
if we don't send any empty xact in a system where there are many such
xacts? IIRC, the restart_lsn is advanced based on confirmed_flush lsn
sent by subscriber. After this change, the subscriber won't be able
to send the confirmed_flush and for a long time, we won't be able to
advance restart_lsn. Is that correct, if so, why do we think that is
acceptable? One might argue that restart_lsn will be advanced as soon
as we send the first non-empty xact, but not sure if that is good
enough. What do you think?

It seems like a valid point. One idea could be to track the last
commit LSN that we streamed; if the confirmed flush location is
already greater than that, then even if we skip sending the commit
message we can advance the confirmed flush location locally.
Logically, it should not cause any problem, because we have already
got the confirmation for whatever we have streamed so far. So for the
other commits (which we are skipping), we can advance it locally because
we are sure that we don't have any streamed commit which is not yet
confirmed by the subscriber. This is just my thought, but from the
code and design perspective it might complicate things and sounds
hackish.
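
To make the idea concrete, here is a rough sketch in Python, for illustration only. `SlotTracker` and its methods are hypothetical names, LSNs are modeled as plain integers, and treating "confirmed up to exactly the last streamed commit" as sufficient is an assumption of the sketch, not something taken from the patch:

```python
class SlotTracker:
    """Hypothetical model of what the sender tracks for one slot."""

    def __init__(self):
        self.last_streamed_commit_lsn = 0  # in memory only
        self.confirmed_flush_lsn = 0

    def on_commit(self, commit_lsn, is_empty):
        if not is_empty:
            # The commit is streamed; the subscriber must confirm it.
            self.last_streamed_commit_lsn = commit_lsn
            return "streamed"
        # Empty transaction: the commit message is skipped.
        if self.confirmed_flush_lsn >= self.last_streamed_commit_lsn:
            # Nothing streamed is still unconfirmed, so advance locally.
            self.confirmed_flush_lsn = max(self.confirmed_flush_lsn, commit_lsn)
            return "skipped, advanced locally"
        return "skipped, waiting for subscriber"

    def on_subscriber_feedback(self, flush_lsn):
        self.confirmed_flush_lsn = max(self.confirmed_flush_lsn, flush_lsn)


t = SlotTracker()
r1 = t.on_commit(100, is_empty=False)  # streamed, not yet confirmed
r2 = t.on_commit(200, is_empty=True)   # cannot advance past unconfirmed 100
t.on_subscriber_feedback(100)          # subscriber confirms 100
r3 = t.on_commit(300, is_empty=True)   # now safe to advance locally
print(r1, r2, r3, t.confirmed_flush_lsn)
```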

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#7 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#6)
Re: logical replication empty transactions

On Tue, Mar 3, 2020 at 9:35 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Mar 2, 2020 at 4:56 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

One thing that is not clear to me is how will we advance restart_lsn
if we don't send any empty xact in a system where there are many such
xacts? IIRC, the restart_lsn is advanced based on confirmed_flush lsn
sent by subscriber. After this change, the subscriber won't be able
to send the confirmed_flush and for a long time, we won't be able to
advance restart_lsn. Is that correct, if so, why do we think that is
acceptable? One might argue that restart_lsn will be advanced as soon
as we send the first non-empty xact, but not sure if that is good
enough. What do you think?

It seems like a valid point. One idea could be that we can track the
last commit LSN which we streamed and if the confirmed flush location
is already greater than that then even if we skip the sending the
commit message we can increase the confirm flush location locally.
Logically, it should not cause any problem because once we have got
the confirmation for whatever we have streamed so far. So for other
commits(which we are skipping), we can we advance it locally because
we are sure that we don't have any streamed commit which is not yet
confirmed by the subscriber.

Will this work after restart? Do you want to persist the information
of last streamed commit LSN?

This is just my thought, but if we
think from the code and design perspective then it might complicate
the things and sounds hackish.

Another idea could be that we stream the transaction after some
threshold number (say 100 or anything we think is reasonable) of empty
xacts. This will reduce the traffic without tinkering with the core
design too much.
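
Roughly, as a sketch in Python for illustration only. The names are hypothetical, the value 100 is just the example number above, and resetting the counter whenever a non-empty transaction is sent is an assumption of the sketch:

```python
EMPTY_XACT_THRESHOLD = 100  # the "say 100" above; not a tuned value


class EmptyXactCounter:
    """Hypothetical policy: let every Nth consecutive empty xact through."""

    def __init__(self, threshold=EMPTY_XACT_THRESHOLD):
        self.threshold = threshold
        self.skipped = 0

    def should_send(self, is_empty):
        if not is_empty:
            # Non-empty transactions are always sent (assumed to reset the count).
            self.skipped = 0
            return True
        self.skipped += 1
        if self.skipped >= self.threshold:
            # Threshold reached: send this empty xact so the subscriber
            # has something to acknowledge.
            self.skipped = 0
            return True
        return False


counter = EmptyXactCounter(threshold=100)
sent = sum(counter.should_send(is_empty=True) for _ in range(1000))
print(sent)
```

With 1000 consecutive empty transactions and a threshold of 100, only 10 of them are sent in this model.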

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#8 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#7)
Re: logical replication empty transactions

On Tue, Mar 3, 2020 at 1:54 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 3, 2020 at 9:35 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Mar 2, 2020 at 4:56 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

One thing that is not clear to me is how will we advance restart_lsn
if we don't send any empty xact in a system where there are many such
xacts? IIRC, the restart_lsn is advanced based on confirmed_flush lsn
sent by subscriber. After this change, the subscriber won't be able
to send the confirmed_flush and for a long time, we won't be able to
advance restart_lsn. Is that correct, if so, why do we think that is
acceptable? One might argue that restart_lsn will be advanced as soon
as we send the first non-empty xact, but not sure if that is good
enough. What do you think?

It seems like a valid point. One idea could be that we can track the
last commit LSN which we streamed and if the confirmed flush location
is already greater than that then even if we skip the sending the
commit message we can increase the confirm flush location locally.
Logically, it should not cause any problem because once we have got
the confirmation for whatever we have streamed so far. So for other
commits(which we are skipping), we can we advance it locally because
we are sure that we don't have any streamed commit which is not yet
confirmed by the subscriber.

Will this work after restart? Do you want to persist the information
of last streamed commit LSN?

We will not persist the last streamed commit LSN. This variable is in
memory, just to track whether we have got confirmation up to that
location or not. Once we have confirmation up to that location, and if
we are not streaming any transaction (because those are empty
transactions), then we can just advance the confirmed flush location,
and based on that we can update the restart point as well; those
will be persisted. Basically, "last streamed commit LSN" is just a
marker that there is still something pending to be confirmed by the
subscriber, so until then we cannot simply advance the confirmed flush
location or restart point based on the empty transactions. But if
there is nothing pending to be confirmed, we can advance. So if we are
streaming, we will get confirmation from the subscriber; otherwise we
can advance it locally. In either case, the confirmed flush
location and restart point will keep moving.

This is just my thought, but if we
think from the code and design perspective then it might complicate
the things and sounds hackish.

Another idea could be that we stream the transaction after some
threshold number (say 100 or anything we think is reasonable) of empty
xacts. This will reduce the traffic without tinkering with the core
design too much.

Yeah, this could also be an option.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#9 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#8)
Re: logical replication empty transactions

On Tue, Mar 3, 2020 at 2:17 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Mar 3, 2020 at 1:54 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 3, 2020 at 9:35 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Mar 2, 2020 at 4:56 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

One thing that is not clear to me is how will we advance restart_lsn
if we don't send any empty xact in a system where there are many such
xacts? IIRC, the restart_lsn is advanced based on confirmed_flush lsn
sent by subscriber. After this change, the subscriber won't be able
to send the confirmed_flush and for a long time, we won't be able to
advance restart_lsn. Is that correct, if so, why do we think that is
acceptable? One might argue that restart_lsn will be advanced as soon
as we send the first non-empty xact, but not sure if that is good
enough. What do you think?

It seems like a valid point. One idea could be that we can track the
last commit LSN which we streamed and if the confirmed flush location
is already greater than that then even if we skip the sending the
commit message we can increase the confirm flush location locally.
Logically, it should not cause any problem because once we have got
the confirmation for whatever we have streamed so far. So for other
commits(which we are skipping), we can we advance it locally because
we are sure that we don't have any streamed commit which is not yet
confirmed by the subscriber.

Will this work after restart? Do you want to persist the information
of last streamed commit LSN?

We will not persist the last streamed commit LSN, this variable is in
memory just to track whether we have got confirmation up to that
location or not, once we have confirmation up to that location and if
we are not streaming any transaction (because those are empty
transactions) then we can just advance the confirmed flush location
and based on that we can update the restart point as well and those
will be persisted. Basically, "last streamed commit LSN" is just a
marker that their still something pending to be confirmed from the
subscriber so until that we can not simply advance the confirm flush
location or restart point based on the empty transactions. But, if
there is nothing pending to be confirmed we can advance. So if we are
streaming then we will get confirmation from subscriber otherwise we
can advance it locally. So, in either case, the confirmed flush
location and restart point will keep moving.

Okay, so this might work out, but it might look a bit ad-hoc.

This is just my thought, but if we
think from the code and design perspective then it might complicate
the things and sounds hackish.

Another idea could be that we stream the transaction after some
threshold number (say 100 or anything we think is reasonable) of empty
xacts. This will reduce the traffic without tinkering with the core
design too much.

Yeah, this could be also an option.

Okay.

Peter E, Petr J, others, do you have any opinion on what is the best
way forward for this thread? I think it would be really good if we
can reduce the network traffic due to these empty transactions.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#10 Euler Taveira
euler.taveira@2ndquadrant.com
In reply to: Amit Kapila (#7)
Re: logical replication empty transactions

On Tue, 3 Mar 2020 at 05:24, Amit Kapila <amit.kapila16@gmail.com> wrote:

Another idea could be that we stream the transaction after some
threshold number (say 100 or anything we think is reasonable) of empty
xacts. This will reduce the traffic without tinkering with the core
design too much.

Amit, I suggest an interval to control this setting. Time is something we
have control over; the number of transactions isn't (it depends on the workload).
The pg_stat_replication query interval is usually not in milliseconds; however,
you can execute thousands of transactions in a second. If we agree on that
idea I can add it to the patch.

Regards,

--
Euler Taveira http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#11 Amit Kapila
amit.kapila16@gmail.com
In reply to: Euler Taveira (#10)
Re: logical replication empty transactions

On Wed, Mar 4, 2020 at 7:17 AM Euler Taveira
<euler.taveira@2ndquadrant.com> wrote:

On Tue, 3 Mar 2020 at 05:24, Amit Kapila <amit.kapila16@gmail.com> wrote:

Another idea could be that we stream the transaction after some
threshold number (say 100 or anything we think is reasonable) of empty
xacts. This will reduce the traffic without tinkering with the core
design too much.

Amit, I suggest an interval to control this setting. Time is something we have control; transactions aren't (depending on workload). pg_stat_replication query interval usually is not milliseconds, however, you can execute thousands of transactions in a second. If we agree on that idea I can add it to the patch.

Do you mean to say that if for some threshold interval we didn't
stream any transaction, then we can send the next empty transaction to
the subscriber? If so, then isn't it possible that the empty xacts
happen irregularly after the specified interval and then we still end
up sending them all? I might be missing something here, so can you
please explain your idea in detail? Basically, how will it work and
how will it solve the problem?

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#12 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#11)
Re: logical replication empty transactions

On Wed, Mar 4, 2020 at 9:12 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 4, 2020 at 7:17 AM Euler Taveira
<euler.taveira@2ndquadrant.com> wrote:

On Tue, 3 Mar 2020 at 05:24, Amit Kapila <amit.kapila16@gmail.com> wrote:

Another idea could be that we stream the transaction after some
threshold number (say 100 or anything we think is reasonable) of empty
xacts. This will reduce the traffic without tinkering with the core
design too much.

Amit, I suggest an interval to control this setting. Time is something we have control; transactions aren't (depending on workload). pg_stat_replication query interval usually is not milliseconds, however, you can execute thousands of transactions in a second. If we agree on that idea I can add it to the patch.

Do you mean to say that if for some threshold interval we didn't
stream any transaction, then we can send the next empty transaction to
the subscriber? If so, then isn't it possible that the empty xacts
happen irregularly after the specified interval and then we still end
up sending them all. I might be missing something here, so can you
please explain your idea in detail? Basically, how will it work and
how will it solve the problem.

IMHO, the threshold should be based on the commit LSN. The main
reason we want to send empty transactions after a certain
transaction count/duration is that we want the restart_lsn to keep moving
forward, so that if we need to restart the replication slot we don't
need to process a lot of extra WAL. If we set the threshold
based on transaction count, there is still a possibility that we
might process a few very big transactions and then have to process
them again after the restart. OTOH, if we set it based on an interval,
then even if there is not much work going on, we still end up sending
the empty transactions, as pointed out by Amit.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#13 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#12)
Re: logical replication empty transactions

On Wed, Mar 4, 2020 at 9:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Mar 4, 2020 at 9:12 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 4, 2020 at 7:17 AM Euler Taveira
<euler.taveira@2ndquadrant.com> wrote:

On Tue, 3 Mar 2020 at 05:24, Amit Kapila <amit.kapila16@gmail.com> wrote:

Another idea could be that we stream the transaction after some
threshold number (say 100 or anything we think is reasonable) of empty
xacts. This will reduce the traffic without tinkering with the core
design too much.

Amit, I suggest an interval to control this setting. Time is something we have control; transactions aren't (depending on workload). pg_stat_replication query interval usually is not milliseconds, however, you can execute thousands of transactions in a second. If we agree on that idea I can add it to the patch.

Do you mean to say that if for some threshold interval we didn't
stream any transaction, then we can send the next empty transaction to
the subscriber? If so, then isn't it possible that the empty xacts
happen irregularly after the specified interval and then we still end
up sending them all. I might be missing something here, so can you
please explain your idea in detail? Basically, how will it work and
how will it solve the problem.

IMHO, the threshold should be based on the commit LSN. Our main
reason we want to send empty transactions after a certain
transaction/duration is that we want the restart_lsn to be moving
forward so that if we need to restart the replication slot we don't
need to process a lot of extra WAL. So assume we set the threshold
based on transaction count then there is still a possibility that we
might process a few very big transactions then we will have to process
them again after the restart.

Won't the subscriber eventually send the flush location for the large
transactions which will move the restart_lsn?

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#14 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#13)
Re: logical replication empty transactions

On Wed, Mar 4, 2020 at 10:50 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 4, 2020 at 9:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Mar 4, 2020 at 9:12 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 4, 2020 at 7:17 AM Euler Taveira
<euler.taveira@2ndquadrant.com> wrote:

On Tue, 3 Mar 2020 at 05:24, Amit Kapila <amit.kapila16@gmail.com> wrote:

Another idea could be that we stream the transaction after some
threshold number (say 100 or anything we think is reasonable) of empty
xacts. This will reduce the traffic without tinkering with the core
design too much.

Amit, I suggest an interval to control this setting. Time is something we have control; transactions aren't (depending on workload). pg_stat_replication query interval usually is not milliseconds, however, you can execute thousands of transactions in a second. If we agree on that idea I can add it to the patch.

Do you mean to say that if for some threshold interval we didn't
stream any transaction, then we can send the next empty transaction to
the subscriber? If so, then isn't it possible that the empty xacts
happen irregularly after the specified interval and then we still end
up sending them all. I might be missing something here, so can you
please explain your idea in detail? Basically, how will it work and
how will it solve the problem.

IMHO, the threshold should be based on the commit LSN. Our main
reason we want to send empty transactions after a certain
transaction/duration is that we want the restart_lsn to be moving
forward so that if we need to restart the replication slot we don't
need to process a lot of extra WAL. So assume we set the threshold
based on transaction count then there is still a possibility that we
might process a few very big transactions then we will have to process
them again after the restart.

Won't the subscriber eventually send the flush location for the large
transactions which will move the restart_lsn?

I meant large empty transactions (basically, ones for which we cannot send
anything to the subscriber). My point was: if there are only large
transactions in the system which we cannot stream because those
tables are not published, then a threshold based on transaction
count will not help much, because even if we don't reach the
transaction count threshold, we still might need to process a lot of
data if we don't stream the commit for the empty transactions. So
instead of tracking the transaction count, can we track the LSN? If the
LSN difference since we last streamed some change crosses the threshold,
then we will stream the next empty transaction.
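
A small sketch of what I mean, in Python for illustration only. `LsnThreshold` is a hypothetical name, LSNs are modeled as byte offsets, and the 16MB threshold is an arbitrary value picked for the example:

```python
MB = 1024 * 1024


class LsnThreshold:
    """Hypothetical policy: send an empty xact once enough WAL has gone by."""

    def __init__(self, threshold_bytes):
        self.threshold_bytes = threshold_bytes
        self.last_sent_lsn = 0  # commit LSN of the last transaction we sent

    def should_send(self, commit_lsn, is_empty):
        wal_since_last_send = commit_lsn - self.last_sent_lsn
        if not is_empty or wal_since_last_send >= self.threshold_bytes:
            self.last_sent_lsn = commit_lsn
            return True
        return False


policy = LsnThreshold(threshold_bytes=16 * MB)
small = policy.should_send(1 * MB, is_empty=True)    # little WAL so far: skip
large = policy.should_send(20 * MB, is_empty=True)   # one big empty xact: send
after = policy.should_send(21 * MB, is_empty=True)   # only 1MB since last send
print(small, large, after)
```

A single large empty transaction is enough to cross the threshold here, which a count of 100 transactions would not catch.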

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#15 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#14)
Re: logical replication empty transactions

On Wed, Mar 4, 2020 at 11:16 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Mar 4, 2020 at 10:50 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 4, 2020 at 9:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

IMHO, the threshold should be based on the commit LSN. Our main
reason we want to send empty transactions after a certain
transaction/duration is that we want the restart_lsn to be moving
forward so that if we need to restart the replication slot we don't
need to process a lot of extra WAL. So assume we set the threshold
based on transaction count then there is still a possibility that we
might process a few very big transactions then we will have to process
them again after the restart.

Won't the subscriber eventually send the flush location for the large
transactions which will move the restart_lsn?

I meant large empty transactions (basically we can not send anything
to the subscriber). So my point was if there are only large
transactions in the system which we can not stream because those
tables are not published. Then keeping threshold based on transaction
count will not help much because even if we don't reach the
transaction count threshold, we still might need to process a lot of
data if we don't stream the commit for the empty transactions. So
instead of tracking transaction count can we track LSN, and LSN
different since we last stream some change cross the threshold then we
will stream the next empty transaction.

You have a point, and it may be better to base the threshold on LSN
if we want to keep any threshold, but basing it on transaction count
seems a bit more straightforward. Let us see if anyone else has an
opinion on this matter.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#16 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#15)
Re: logical replication empty transactions

On Wed, Mar 4, 2020 at 3:47 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 4, 2020 at 11:16 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Mar 4, 2020 at 10:50 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 4, 2020 at 9:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

IMHO, the threshold should be based on the commit LSN. Our main
reason we want to send empty transactions after a certain
transaction/duration is that we want the restart_lsn to be moving
forward so that if we need to restart the replication slot we don't
need to process a lot of extra WAL. So assume we set the threshold
based on transaction count then there is still a possibility that we
might process a few very big transactions then we will have to process
them again after the restart.

Won't the subscriber eventually send the flush location for the large
transactions which will move the restart_lsn?

I meant large empty transactions (basically we can not send anything
to the subscriber). So my point was if there are only large
transactions in the system which we can not stream because those
tables are not published. Then keeping threshold based on transaction
count will not help much because even if we don't reach the
transaction count threshold, we still might need to process a lot of
data if we don't stream the commit for the empty transactions. So
instead of tracking transaction count can we track LSN, and LSN
different since we last stream some change cross the threshold then we
will stream the next empty transaction.

You have a point, and it may be better to base the threshold on LSN
if we want to keep any threshold, but basing it on transaction count
seems a bit more straightforward. Does anyone else have an
opinion on this matter?

OK, that makes sense.
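
To make the two options concrete, here is a minimal sketch of both rules side by side. It is only an illustration and not code from the patch: EmptyXactTracker, send_by_count, send_by_lsn and note_empty_xact are hypothetical names, and XLogRecPtr is a plain 64-bit stand-in for the server type.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's LSN type */

/* Hypothetical per-slot bookkeeping; none of these names come from the patch. */
typedef struct EmptyXactTracker
{
    XLogRecPtr  last_sent_lsn;  /* LSN of the last message actually sent */
    uint32_t    skipped_xacts;  /* empty transactions skipped since then */
} EmptyXactTracker;

/* Count-based rule: send once max_skipped empty transactions were skipped. */
bool
send_by_count(const EmptyXactTracker *t, uint32_t max_skipped)
{
    return t->skipped_xacts >= max_skipped;
}

/* LSN-based rule: send once the WAL distance since the last send is large. */
bool
send_by_lsn(const EmptyXactTracker *t, XLogRecPtr commit_lsn, uint64_t max_bytes)
{
    assert(commit_lsn >= t->last_sent_lsn);     /* LSNs only move forward */
    return commit_lsn - t->last_sent_lsn >= max_bytes;
}

/* Record the outcome for one empty transaction that committed at commit_lsn. */
void
note_empty_xact(EmptyXactTracker *t, XLogRecPtr commit_lsn, bool sent)
{
    if (sent)
    {
        t->last_sent_lsn = commit_lsn;
        t->skipped_xacts = 0;
    }
    else
        t->skipped_xacts++;
}
```

With a single very large unpublished transaction, the count rule sees only one skipped transaction and stays quiet, while the LSN rule fires as soon as the distance from last_sent_lsn reaches the limit. That is the case I was worried about above.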

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#17Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#16)
Re: logical replication empty transactions

On Wed, Mar 4, 2020 at 4:04 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Mar 4, 2020 at 3:47 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 4, 2020 at 11:16 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Mar 4, 2020 at 10:50 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 4, 2020 at 9:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

IMHO, the threshold should be based on the commit LSN. Our main
reason we want to send empty transactions after a certain
transaction/duration is that we want the restart_lsn to be moving
forward so that if we need to restart the replication slot we don't
need to process a lot of extra WAL. So assume we set the threshold
based on transaction count then there is still a possibility that we
might process a few very big transactions then we will have to process
them again after the restart.

Won't the subscriber eventually send the flush location for the large
transactions which will move the restart_lsn?

I meant large empty transactions (basically we can not send anything
to the subscriber). So my point was if there are only large
transactions in the system which we can not stream because those
tables are not published. Then keeping threshold based on transaction
count will not help much because even if we don't reach the
transaction count threshold, we still might need to process a lot of
data if we don't stream the commit for the empty transactions. So
instead of tracking transaction count, can we track LSN, and if the LSN
difference since we last streamed some change crosses the threshold then we
will stream the next empty transaction?

You have a point, and it may be better to base the threshold on LSN
if we want to keep any threshold, but basing it on transaction count
seems a bit more straightforward. Does anyone else have an
opinion on this matter?

OK, that makes sense.

Euler, can we try to update the patch based on the number of
transactions threshold and see how it works?

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#18Euler Taveira
Euler Taveira
euler.taveira@2ndquadrant.com
In reply to: Amit Kapila (#17)
Re: logical replication empty transactions

On Thu, 5 Mar 2020 at 05:45, Amit Kapila <amit.kapila16@gmail.com> wrote:

Euler, can we try to update the patch based on the number of
transactions threshold and see how it works?

I will do.

--
Euler Taveira http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#19Craig Ringer
Craig Ringer
craig@2ndquadrant.com
In reply to: Amit Kapila (#5)
Re: logical replication empty transactions

On Mon, 2 Mar 2020 at 19:26, Amit Kapila <amit.kapila16@gmail.com> wrote:

One thing that is not clear to me is how will we advance restart_lsn
if we don't send any empty xact in a system where there are many such
xacts?

Same way we already do it for writes that are not replicated over
logical replication, like vacuum work etc. The upstream sends feedback
with reply-requested. The downstream replies. The upstream advances
confirmed_flush_lsn, and that lazily updates restart_lsn.

The bigger issue here is that if you don't send empty txns on logical
replication you don't get an eager, timely response from the
replica(s), which delays synchronous replication. You need to send
empty txns when synchronous replication is enabled, or instead poke
the walsender to force immediate feedback with reply requested.

--
Craig Ringer http://www.2ndQuadrant.com/
2ndQuadrant - PostgreSQL Solutions for the Enterprise

#20Andres Freund
Andres Freund
andres@anarazel.de
In reply to: Craig Ringer (#19)
Re: logical replication empty transactions

Hi,

On 2020-03-06 13:53:02 +0800, Craig Ringer wrote:

On Mon, 2 Mar 2020 at 19:26, Amit Kapila <amit.kapila16@gmail.com> wrote:

One thing that is not clear to me is how will we advance restart_lsn
if we don't send any empty xact in a system where there are many such
xacts?

Same way we already do it for writes that are not replicated over
logical replication, like vacuum work etc. The upstream sends feedback
with reply-requested. The downstream replies. The upstream advances
confirmed_flush_lsn, and that lazily updates restart_lsn.

It'll still delay it a bit.

The bigger issue here is that if you don't send empty txns on logical
replication you don't get an eager, timely response from the
replica(s), which delays synchronous replication. You need to send
empty txns when synchronous replication is enabled, or instead poke
the walsender to force immediate feedback with reply requested.

Somewhat independent from the issue at hand: It'd be really good if we
could evolve the syncrep framework to support per-database waiting... It
shouldn't be that hard, and the current situation sucks quite a bit (and
yes, I'm to blame).

I'm not quite sure what you mean by "poke the walsender"? Kinda sounds
like sending a signal, but decoding happens inside the walsender,
so there's no need for that. Do you just mean somehow requesting that
walsender sends a feedback message?

To address the volume we could:

1a) Introduce a pgoutput message type to indicate that the LSN has
advanced, without needing separate BEGIN/COMMIT. Right now BEGIN is
21 bytes, COMMIT is 26. But we really don't need that much here. A
single message should do the trick.

1b) Add a LogicalOutputPluginWriterUpdateProgress parameter (and
possibly rename) that indicates that we are intentionally "ignoring"
WAL. For walsender that callback then could check if it could just
forward the position of the client (if it was entirely caught up
before), or if it should send a feedback request (if syncrep is
enabled, or distance is big).

2) Reduce the rate of 'empty transaction'/feedback request messages. If
we know that we're not going to be blocked waiting for more WAL, or
blocked sending messages out to the network, we don't immediately need
to send out the messages. Instead we could continue decoding until
there's actual data, or until we're going to get blocked.

We could e.g. have a new LogicalDecodingContext callback that is
called whenever WalSndWaitForWal() would wait. That'd check if there's
a pending "need" to send out a 'empty transaction'/feedback request
message. The "need" flag would get cleared whenever we send out data
bearing an LSN for other reasons.
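
A rough sketch of that "need" flag, just to illustrate the idea. Nothing here is existing walsender code: FeedbackState and the three functions are hypothetical names, and the counter stands in for actually writing a keepalive or feedback request.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's LSN type */

/* Hypothetical sender-side state; not an existing PostgreSQL struct. */
typedef struct FeedbackState
{
    bool        need_feedback;  /* something was skipped the client has not seen */
    XLogRecPtr  skipped_upto;   /* end LSN of the last skipped empty transaction */
    XLogRecPtr  last_sent_lsn;  /* last LSN the client was told about */
    int         requests_sent;  /* counts feedback requests, for illustration */
} FeedbackState;

/* An empty transaction was decoded and nothing was sent for it. */
void
on_empty_xact_skipped(FeedbackState *s, XLogRecPtr end_lsn)
{
    assert(end_lsn >= s->skipped_upto);
    s->need_feedback = true;
    s->skipped_upto = end_lsn;
}

/* Real data carrying an LSN went out, so the pending need is satisfied. */
void
on_data_sent(FeedbackState *s, XLogRecPtr lsn)
{
    s->last_sent_lsn = lsn;
    s->need_feedback = false;
}

/* Hypothetical hook, called right before the sender would block for WAL. */
void
before_wait_for_wal(FeedbackState *s)
{
    if (!s->need_feedback)
        return;
    /* a real walsender would write a keepalive/feedback request here */
    s->requests_sent++;
    s->last_sent_lsn = s->skipped_upto;
    s->need_feedback = false;
}
```

Any number of skipped empty transactions collapse into at most one message per wait, and none at all if real data went out in between.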

Greetings,

Andres Freund

#21Craig Ringer
Craig Ringer
craig@2ndquadrant.com
In reply to: Andres Freund (#20)
Re: logical replication empty transactions

On Tue, 10 Mar 2020 at 02:30, Andres Freund <andres@anarazel.de> wrote:

Hi,

On 2020-03-06 13:53:02 +0800, Craig Ringer wrote:

On Mon, 2 Mar 2020 at 19:26, Amit Kapila <amit.kapila16@gmail.com> wrote:

One thing that is not clear to me is how will we advance restart_lsn
if we don't send any empty xact in a system where there are many such
xacts?

Same way we already do it for writes that are not replicated over
logical replication, like vacuum work etc. The upstream sends feedback
with reply-requested. The downstream replies. The upstream advances
confirmed_flush_lsn, and that lazily updates restart_lsn.

It'll still delay it a bit.

Right, but we don't generally care because there's no sync rep txn waiting
for confirmation. If we lose progress due to a crash it doesn't matter. It
does delay removal of old WAL a little, but it hardly matters.

Somewhat independent from the issue at hand: It'd be really good if we
could evolve the syncrep framework to support per-database waiting... It
shouldn't be that hard, and the current situation sucks quite a bit (and
yes, I'm to blame).

Hardly, you just didn't get the chance to fix that on top of the umpteen
other things you had to change to make all the logical stuff work. You
didn't break it, just didn't implement every single possible enhancement
all at once. Shocking, I tell you.

I'm not quite sure what you mean by "poke the walsender"? Kinda sounds
like sending a signal, but decoding happens inside the walsender,
so there's no need for that. Do you just mean somehow requesting that
walsender sends a feedback message?

Right. I had in mind something like sending a ProcSignal via our funky
multiplexed signal mechanism to ask the walsender to immediately generate a
keepalive message with a reply-requested flag, then set the walsender's
latch so we wake it promptly.

To address the volume we could:

1a) Introduce a pgoutput message type to indicate that the LSN has
advanced, without needing separate BEGIN/COMMIT. Right now BEGIN is
21 bytes, COMMIT is 26. But we really don't need that much here. A
single message should do the trick.

It would. Is it worth caring though? Especially since it seems rather
unlikely that the actual network data volume of begin/commit msgs will be
much of a concern. It's not like we're PITRing logical streams, and if we
did, we could just filter out empty commits on the receiver side.

That message pretty much already exists in the form of a walsender
keepalive anyway so we might as well re-use that and not upset the protocol.
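
For scale, the arithmetic behind those numbers can be written out as below. The field layouts follow the documented pgoutput Begin and Commit messages and the primary keepalive message; the enum and function names are made up for this sketch, and the CopyData/XLogData framing around each message is deliberately ignored.

```c
#include <assert.h>
#include <stdint.h>

enum
{
    /* 'B' + final LSN (8) + commit timestamp (8) + xid (4) */
    BEGIN_MSG_SIZE = 1 + 8 + 8 + 4,
    /* 'C' + flags (1) + commit LSN (8) + end LSN (8) + commit timestamp (8) */
    COMMIT_MSG_SIZE = 1 + 1 + 8 + 8 + 8,
    /* 'k' + WAL end (8) + server clock (8) + reply-requested flag (1) */
    KEEPALIVE_MSG_SIZE = 1 + 8 + 8 + 1
};

/* Payload bytes spent on n empty transactions sent as BEGIN + COMMIT pairs. */
uint64_t
empty_xact_payload_bytes(uint64_t n)
{
    assert(n <= UINT64_MAX / (BEGIN_MSG_SIZE + COMMIT_MSG_SIZE));
    return n * (BEGIN_MSG_SIZE + COMMIT_MSG_SIZE);
}

/* Payload bytes if the same n events were each reported by one keepalive. */
uint64_t
keepalive_payload_bytes(uint64_t n)
{
    assert(n <= UINT64_MAX / KEEPALIVE_MSG_SIZE);
    return n * KEEPALIVE_MSG_SIZE;
}
```

So an empty transaction costs 47 payload bytes against 18 for a keepalive, and a keepalive does not have to be sent per transaction at all.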

1b) Add a LogicalOutputPluginWriterUpdateProgress parameter (and
possibly rename) that indicates that we are intentionally "ignoring"
WAL. For walsender that callback then could check if it could just
forward the position of the client (if it was entirely caught up
before), or if it should send a feedback request (if syncrep is
enabled, or distance is big).

I can see something like that being very useful, because at present only
the output plugin knows if a txn is "empty" as far as that particular slot
and output plugin is concerned. The reorder buffering mechanism cannot do
relation-level filtering before it sends the changes to the output plugin
during ReorderBufferCommit, since it only knows about relfilenodes not
relation oids. And the output plugin might be doing finer grained filtering
using row-filter expressions or who knows what else.

But as described above that will only help for txns done in DBs other than
the one the logical slot is for or txns known to have an empty
ReorderBuffer when the commit is seen.

If there's a txn in the slot's db with a non-empty reorderbuffer, the
output plugin won't know if the txn is empty or not until it finishes
processing all callbacks and sees the commit for the txn. So it will
generally have emitted the Begin message on the wire by the time it knows
it has nothing useful to say. And Pg won't know that this txn is empty as
far as this output plugin with this particular slot, set of output plugin
params, and current user-catalog state is concerned, so it won't have any
way to call the output plugin's "update progress" callback instead of the
usual begin/change/commit callbacks.

But I think we can already skip empty txns unless sync-rep is enabled with
no core changes, and send empty txns as walsender keepalives instead, by
altering only output plugins, like this:

* Stash BEGIN data in plugin's LogicalDecodingContext.output_plugin_private
when plugin's begin callback called, don't write anything to the outstream
* Write out BEGIN message lazily when any other callback generates a
message that does need to be written out
* If no BEGIN written by the time COMMIT callback called, discard the
COMMIT too. Check if sync rep is enabled. If it is,
call LogicalDecodingContext.update_progress from within the output plugin
commit handler, otherwise just ignore the commit totally. Probably by
calling OutputPluginUpdateProgress().
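
As a toy model of those three steps (not real output plugin code): LazyBeginState and the lazy_* callbacks are hypothetical names, the out buffer stands in for the output stream, and a counter stands in for calling OutputPluginUpdateProgress().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the plugin's output_plugin_private state. */
typedef struct LazyBeginState
{
    bool        begin_pending;      /* BEGIN stashed but not yet written */
    unsigned    stashed_xid;        /* the stashed BEGIN data */
    bool        sync_rep_enabled;   /* assumed to be known to the plugin */
    int         progress_updates;   /* stands in for OutputPluginUpdateProgress() */
    char        out[64];            /* stands in for the output stream */
} LazyBeginState;

/* Append one message tag to the fake output stream. */
static void
emit(LazyBeginState *s, const char *tag)
{
    size_t      used = strlen(s->out);

    snprintf(s->out + used, sizeof(s->out) - used, "%s", tag);
}

/* Begin callback: stash the data, write nothing. */
void
lazy_begin_cb(LazyBeginState *s, unsigned xid)
{
    assert(!s->begin_pending);      /* previous transaction must have ended */
    s->begin_pending = true;
    s->stashed_xid = xid;
}

/* Change callback: write the stashed BEGIN first if this change is sent. */
void
lazy_change_cb(LazyBeginState *s, bool published)
{
    if (!published)
        return;                     /* filtered out, nothing to write */
    if (s->begin_pending)
    {
        emit(s, "B");
        s->begin_pending = false;
    }
    emit(s, "I");
}

/* Commit callback: drop the COMMIT too if BEGIN was never written. */
void
lazy_commit_cb(LazyBeginState *s)
{
    if (s->begin_pending)
    {
        s->begin_pending = false;
        if (s->sync_rep_enabled)
            s->progress_updates++;  /* report progress instead of BEGIN/COMMIT */
        return;
    }
    emit(s, "C");
}
```

A transaction that touches only unpublished tables leaves the stream untouched and, with sync rep on, triggers one progress update. A transaction with two published inserts writes B, I, I, C.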

We could e.g. have a new LogicalDecodingContext callback that is
called whenever WalSndWaitForWal() would wait. That'd check if there's
a pending "need" to send out a 'empty transaction'/feedback request
message. The "need" flag would get cleared whenever we send out data
bearing an LSN for other reasons.

I can see that being handy, yes. But it won't necessarily help with the
sync rep issue, since other sync rep txns may continue to generate WAL
while others wait for commit-confirmations that won't come from the logical
replica.

While we're speaking of adding output plugin hooks, I keep on trying to
think of a sensible way to do a plugin-defined reply handler, so the
downstream end can send COPY BOTH messages of some new msgkind back to the
walsender, which will pass them to the output plugin if it implements the
appropriate handle_reply_message (or whatever) callback. That much is
trivial to implement, where I keep getting a bit stuck is with whether
there's a sensible snapshot that can be set to call the output plugin reply
handler with. We wouldn't want to switch to a current non-historic snapshot
because of all the cache flushes that'd cause, but there isn't necessarily
a valid and safe historic snapshot to set when we're not within
ReorderBufferCommit is there?

I'd love to get rid of the need to "connect back" to a provider over plain
libpq connections to communicate with it. The ability to run SQL on the
walsender conn helps. But really, so much more would be possible if we
could just have the downstream end *reply* on the same connection using
COPY BOTH, much like it sends replay progress updates right now. It'd let
us manage relation/attribute/type metadata caches better for example.

Thoughts?

--
Craig Ringer http://www.2ndQuadrant.com/
2ndQuadrant - PostgreSQL Solutions for the Enterprise

#22Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Craig Ringer (#21)
Re: logical replication empty transactions

The patch no longer applies, because of additions in the test source. Otherwise, I have tested the patch and confirmed that updates and deletes on tables with deferred primary keys work with logical replication.

The new status of this patch is: Waiting on Author

#23Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Ajin Cherian (#22)
Re: logical replication empty transactions

Sorry, I replied in the wrong thread. Please ignore above mail.

#24Rahila Syed
Rahila Syed
rahila.syed@2ndquadrant.com
In reply to: Craig Ringer (#21)
Re: logical replication empty transactions

Hi,

Please see below review of the
0001-Skip-empty-transactions-for-logical-replication.patch

The make check passes.

 +               /* output BEGIN if we haven't yet */
 +               if (!data->xact_wrote_changes)
 +                       pgoutput_begin(ctx, txn);
 +
 +               data->xact_wrote_changes = true;
 +
IMO, the xact_wrote_changes flag is better set inside the if condition, as it
does not need to be set repeatedly in subsequent calls to the same function.

* Stash BEGIN data in plugin's
LogicalDecodingContext.output_plugin_private when plugin's begin
callback called, don't write anything to the outstream
* Write out BEGIN message lazily when any other callback generates a
message that does need to be written out
* If no BEGIN written by the time COMMIT callback called, discard the
COMMIT too. Check if sync rep is enabled. If it is,
call LogicalDecodingContext.update_progress
from within the output plugin commit handler, otherwise just ignore
the commit totally. Probably by calling OutputPluginUpdateProgress().

I think the code in the patch is similar to what has been described by
Craig in the above snippet,
except instead of stashing the BEGIN message and sending the message
lazily, it simply maintains a flag
in LogicalDecodingContext.output_plugin_private which defers calling
output plugin's begin callback,
until any other callback actually generates a remote write.

Also, the patch does not contain the last part where he describes
having OutputPluginUpdateProgress()
for synchronous replication enabled transactions.
However, some basic testing suggests that the patch does not have any
notable adverse effect on
either the replication lag or the sync_rep performance.

I performed tests by setting up publisher and subscriber on the same
machine with synchronous_commit = on and
ran pgbench -c 12 -j 6 -T 300 on unpublished pgbench tables.

I see that  confirmed_flush_lsn is catching up just fine without any
notable delay as compared to the test results without
the patch.

Also, the TPS for synchronous replication of empty txns with and without
the patch remains similar.

Having said that, these are initial findings and I understand better
performance tests are required to measure
reduction in consumption of network bandwidth and impact on synchronous
replication and replication lag.

Thank you,
Rahila Syed

#25Michael Paquier
Michael Paquier
michael@paquier.xyz
In reply to: Rahila Syed (#24)
Re: logical replication empty transactions

On Wed, Jul 29, 2020 at 08:08:06PM +0530, Rahila Syed wrote:

The make check passes.

Since then, the patch has been failing to apply and waiting on author, and
the thread died 6 weeks or so ago, so I am marking it as RwF in the
CF.
--
Michael

#26Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Michael Paquier (#25)
1 attachment(s)
Re: logical replication empty transactions

On Thu, Sep 17, 2020 at 3:29 PM Michael Paquier <michael@paquier.xyz> wrote:

On Wed, Jul 29, 2020 at 08:08:06PM +0530, Rahila Syed wrote:

The make check passes.

Since then, the patch has been failing to apply and waiting on author, and
the thread died 6 weeks or so ago, so I am marking it as RwF in the
CF.

I've rebased the patch and made changes so that the patch supports
"streaming in-progress transactions" and handling of logical decoding
messages (transactional and non-transactional).
I see that this patch not only makes sure that empty transactions are not
sent but also does call OutputPluginUpdateProgress when an empty
transaction is not sent, as a result the confirmed_flush_lsn is kept
moving. I also see no hangs when synchronous_standby is configured.
Do let me know your thoughts on this patch.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v2-0001-Skip-empty-transactions-for-logical-replication.patch (application/octet-stream)
#27Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Ajin Cherian (#26)
1 attachment(s)
Re: logical replication empty transactions

On Thu, Apr 15, 2021 at 1:29 PM Ajin Cherian <itsajin@gmail.com> wrote:

I've rebased the patch and made changes so that the patch supports
"streaming in-progress transactions" and handling of logical decoding
messages (transactional and non-transactional).
I see that this patch not only makes sure that empty transactions are not
sent but also does call OutputPluginUpdateProgress when an empty
transaction is not sent, as a result the confirmed_flush_lsn is kept
moving. I also see no hangs when synchronous_standby is configured.
Do let me know your thoughts on this patch.

Removed some debug logs and typos.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v3-0001-Skip-empty-transactions-for-logical-replication.patch (application/octet-stream)
#28Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#27)
Re: logical replication empty transactions

On Thu, Apr 15, 2021 at 4:39 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Thu, Apr 15, 2021 at 1:29 PM Ajin Cherian <itsajin@gmail.com> wrote:

I've rebased the patch and made changes so that the patch supports "streaming in-progress transactions" and handling of logical decoding
messages (transactional and non-transactional).
I see that this patch not only makes sure that empty transactions are not sent but also does call OutputPluginUpdateProgress when an empty
transaction is not sent, as a result the confirmed_flush_lsn is kept moving. I also see no hangs when synchronous_standby is configured.
Do let me know your thoughts on this patch.

REVIEW COMMENTS

I applied this patch to today's HEAD and successfully ran "make check"
and also the subscription TAP tests.

Here are some review comments:

------

1. The patch v3 applied OK but with whitespace warnings

[postgres@CentOS7-x64 oss_postgres_2PC]$ git apply
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:98:
indent with spaces.
/* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:99:
indent with spaces.
if (!data->xact_wrote_changes && !in_streaming && transactional)
warning: 2 lines add whitespace errors.

------

2. Please create a CF entry in [1] for this patch.

------

3. Patch comment

The comment describes the problem and then suddenly just says
"Postpone the BEGIN message until the first change."

I suggest changing it to say more like... "(blank line) This patch
addresses the above problem by postponing the BEGIN message until the
first change."

------

4. pgoutput.h

Maybe for consistency with the context member, the comment for the new
member should be to the right instead of above it?

@@ -20,6 +20,9 @@ typedef struct PGOutputData
MemoryContext context; /* private memory context for transient
* allocations */

+ /* flag indicating whether messages have previously been sent */
+ bool        xact_wrote_changes;
+

------

5. pgoutput.h

+ /* flag indicating whether messages have previously been sent */

"previously been sent" --> "already been sent" ??

------

6. pgoutput.h - misleading member name

Actually, now that I have read all the rest of the code and how this
member is used I feel that this name is very misleading. e.g. For
"streaming" case then you still are writing changes but are not
setting this member at all - therefore it does not always mean what it
says.

I feel a better name for this would be something like
"sent_begin_txn". Then if you have sent BEGIN it is true. If you
haven't sent BEGIN it is false. It eliminates all ambiguity naming it
this way instead.

(This makes my feedback #5 redundant because the comment will be a bit
different if you do this).

------

7. pgoutput.c - function pgoutput_begin_txn

@@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{

I guess that you still needed to pass the txn because that is how the
API is documented, right?

But I am wondering if you ought to flag it as unused so you won't get
some BF machine giving warnings about it.

e.g. Syntax like this?

pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN * txn) {
(void)txn;
...

------

8. pgoutput.c - function pgoutput_begin_txn

@@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputData *data = ctx->output_plugin_private;
+
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
+ */
+ data->xact_wrote_changes = false;
+ elog(LOG,"Holding of begin");
+}

Why is this loglevel LOG? Looks like leftover debugging.

------

9. pgoutput.c - function pgoutput_commit_txn

@@ -384,8 +401,14 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  XLogRecPtr commit_lsn)
 {
+ PGOutputData *data = ctx->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);
+ /* skip COMMIT message if nothing was sent */
+ if (!data->xact_wrote_changes)
+ return;
+

In the case where you decided to do nothing does it make sense that
you still called the function OutputPluginUpdateProgress(ctx); ?
I thought perhaps that your new check should come first so this call
would never happen.

------

10. pgoutput.c - variable declarations without casts

+ PGOutputData *data = ctx->output_plugin_private;

I noticed the new stack variables you declare have no casts.

This differs from the existing code which always looks like:
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;

There are a couple of examples of this so please search new code to
find them all.

------

11. pgoutput.c - function pgoutput_change

@@ -551,6 +574,13 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Assert(false);
}

+ /* output BEGIN if we haven't yet */
+ if (!data->xact_wrote_changes && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

------

12. pgoutput.c - pgoutput_truncate function

@@ -693,6 +723,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

  if (nrelids > 0)
  {
+ /* output BEGIN if we haven't yet */
+ if (!data->xact_wrote_changes && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

(same comment as above)

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

13. pgoutput.c - pgoutput_message

@@ -725,6 +762,13 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+    /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+    if (!data->xact_wrote_changes && !in_streaming && transactional)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

(same comment as above)

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.
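
Comments 11 to 13 ask for the same refactoring, which could look roughly like the sketch below. This is an assumption about the shape, not the patch itself: PGOutputDataSketch and the sketch_* functions are stand-ins, and the sketch_maybe_send_begin helper is my own addition for illustration.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for PGOutputData with the renamed member. */
typedef struct PGOutputDataSketch
{
    bool        sent_begin_txn;     /* true once BEGIN has been sent */
    int         begins_written;     /* counts BEGIN messages, for illustration */
} PGOutputDataSketch;

/* Begin callback: only reset the flag, send nothing yet. */
void
sketch_begin_txn(PGOutputDataSketch *data)
{
    data->sent_begin_txn = false;
}

/* The one place that writes BEGIN and sets the flag. */
void
sketch_send_begin(PGOutputDataSketch *data)
{
    assert(!data->sent_begin_txn);  /* BEGIN goes out at most once per txn */
    data->begins_written++;         /* a real plugin would write BEGIN here */
    data->sent_begin_txn = true;
}

/* Shared guard used by every callback that may be first to write. */
void
sketch_maybe_send_begin(PGOutputDataSketch *data, bool in_streaming)
{
    if (!data->sent_begin_txn && !in_streaming)
        sketch_send_begin(data);
}

/* Change and truncate callbacks would both start like this. */
void
sketch_change(PGOutputDataSketch *data, bool in_streaming)
{
    sketch_maybe_send_begin(data, in_streaming);
    /* ... write the change ... */
}

/* Message callback: non-transactional messages never need a BEGIN. */
void
sketch_message(PGOutputDataSketch *data, bool in_streaming, bool transactional)
{
    if (transactional)
        sketch_maybe_send_begin(data, in_streaming);
    /* ... write the message ... */
}
```

With the assignment living in one function, the three callers cannot forget to set the flag or set it inconsistently.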

------

14. Test Code.

I noticed that there is no test code specifically for seeing if empty
transactions get sent or not. Is it possible to write such a test or
is this traffic improvement only observable using the debugger?

------
[1]: https://commitfest.postgresql.org/33/

Kind Regards,
Peter Smith.
Fujitsu Australia

#29Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Peter Smith (#28)
1 attachment(s)
Re: logical replication empty transactions

On Mon, Apr 19, 2021 at 6:22 PM Peter Smith <smithpb2250@gmail.com> wrote:

Here are some review comments:

------

1. The patch v3 applied OK but with whitespace warnings

[postgres@CentOS7-x64 oss_postgres_2PC]$ git apply

../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch

../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:98:
indent with spaces.
/* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */

../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:99:
indent with spaces.
if (!data->xact_wrote_changes && !in_streaming && transactional)
warning: 2 lines add whitespace errors.

------

Fixed.

2. Please create a CF entry in [1] for this patch.

------

3. Patch comment

The comment describes the problem and then suddenly just says
"Postpone the BEGIN message until the first change."

I suggest changing it to say more like... "(blank line) This patch
addresses the above problem by postponing the BEGIN message until the
first change."

------

Updated.

4. pgoutput.h

Maybe for consistency with the context member, the comment for the new
member should be to the right instead of above it?

@@ -20,6 +20,9 @@ typedef struct PGOutputData
MemoryContext context; /* private memory context for transient
* allocations */

+ /* flag indicating whether messages have previously been sent */
+ bool        xact_wrote_changes;
+

------

5. pgoutput.h

+ /* flag indicating whether messages have previously been sent */

"previously been sent" --> "already been sent" ??

------

6. pgoutput.h - misleading member name

Actually, now that I have read all the rest of the code and how this
member is used I feel that this name is very misleading. e.g. For
"streaming" case then you still are writing changes but are not
setting this member at all - therefore it does not always mean what it
says.

I feel a better name for this would be something like
"sent_begin_txn". Then if you have sent BEGIN it is true. If you
haven't sent BEGIN it is false. It eliminates all ambiguity naming it
this way instead.

(This makes my feedback #5 redundant because the comment will be a bit
different if you do this).

------

Fixed above comments.

7. pgoutput.c - function pgoutput_begin_txn

@@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{

I guess that you still needed to pass the txn because that is how the
API is documented, right?

But I am wondering if you ought to flag it as unused so you won't get
some BF machine giving warnings about it.

e.g. Syntax like this?

pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN * txn) {
(void)txn;
...

Updated.

------

8. pgoutput.c - function pgoutput_begin_txn

@@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
+ PGOutputData *data = ctx->output_plugin_private;
+
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
+ */
+ data->xact_wrote_changes = false;
+ elog(LOG,"Holding of begin");
+}

Why is this loglevel LOG? Looks like leftover debugging.

Removed.

------

9. pgoutput.c - function pgoutput_commit_txn

@@ -384,8 +401,14 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
+ PGOutputData *data = ctx->output_plugin_private;
+
OutputPluginUpdateProgress(ctx);
+ /* skip COMMIT message if nothing was sent */
+ if (!data->xact_wrote_changes)
+ return;
+

In the case where you decided to do nothing does it make sense that
you still called the function OutputPluginUpdateProgress(ctx); ?
I thought perhaps that your new check should come first so this call
would never happen.

Even though the empty transaction is not sent, the LSN is tracked as
decoded, hence the progress needs to be updated.

------

10. pgoutput.c - variable declarations without casts

+ PGOutputData *data = ctx->output_plugin_private;

I noticed the new stack variables you declare have no casts.

This differs from the existing code which always looks like:
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;

There are a couple of examples of this so please search new code to
find them all.

-----

Fixed.

11. pgoutput.c - function pgoutput_change

@@ -551,6 +574,13 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Assert(false);
}

+ /* output BEGIN if we haven't yet */
+ if (!data->xact_wrote_changes && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

------

Updated.

12. pgoutput.c - pgoutput_truncate function

@@ -693,6 +723,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

if (nrelids > 0)
{
+ /* output BEGIN if we haven't yet */
+ if (!data->xact_wrote_changes && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

(same comment as above)

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

13. pgoutput.c - pgoutput_message

@@ -725,6 +762,13 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+    /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+    if (!data->xact_wrote_changes && !in_streaming && transactional)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

(same comment as above)

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

------

Fixed.

14. Test Code.

I noticed that there is no test code specifically for seeing if empty
transactions get sent or not. Is it possible to write such a test or
is this traffic improvement only observable using the debugger?

The 020_messages.pl actually has a test case for tracking empty messages
even though it is part of the messages test.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v4-0001-Skip-empty-transactions-for-logical-replication.patch (application/octet-stream)
#30Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Ajin Cherian (#29)
Re: logical replication empty transactions

An earlier comment from Andres:

We could e.g. have a new LogicalDecodingContext callback that is
called whenever WalSndWaitForWal() would wait. That'd check if there's
a pending "need" to send out a 'empty transaction'/feedback request
message. The "need" flag would get cleared whenever we send out data
bearing an LSN for other reasons.

I think the current Keep Alive messages already achieve this by
sending the current LSN as part of the Keep Alive messages.
/* construct the message... */
resetStringInfo(&output_message);
pq_sendbyte(&output_message, 'k');
pq_sendint64(&output_message, sentPtr); <=== Last sent WAL LSN
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);

I'm not sure if anything more is required to keep empty transactions
updated as part of synchronous replicas. If my understanding on this
is not correct, let me know.
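
For readers unfamiliar with the keepalive layout, the snippet quoted above can be modeled outside the server roughly as follows. This is only a sketch: sketch_put_int64 and sketch_build_keepalive are hypothetical helpers, not PostgreSQL functions, and the big-endian encoding mirrors what pq_sendint64 does on the wire.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Append a 64-bit value in network (big-endian) byte order. */
size_t
sketch_put_int64(uint8_t *buf, size_t off, uint64_t value)
{
	for (int i = 7; i >= 0; i--)
		buf[off++] = (uint8_t) (value >> (8 * i));
	return off;
}

/*
 * Build a keepalive payload: 'k', last sent WAL position, send timestamp,
 * reply-requested flag.  The caller must provide at least 18 bytes.
 * Returns the payload length.
 */
size_t
sketch_build_keepalive(uint8_t *buf, uint64_t sent_ptr, int64_t now,
					   int request_reply)
{
	size_t		off = 0;

	assert(buf != NULL);
	buf[off++] = 'k';
	off = sketch_put_int64(buf, off, sent_ptr);		/* last sent WAL LSN */
	off = sketch_put_int64(buf, off, (uint64_t) now);	/* sender's clock */
	buf[off++] = request_reply ? 1 : 0;
	return off;
}
```

The LSN travels in every keepalive, so a receiver can advance its confirmed position even when no transaction data is sent.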

regards,
Ajin Cherian
Fujitsu Australia

#31Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#29)
Re: logical replication empty transactions

On Fri, Apr 23, 2021 at 3:46 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Mon, Apr 19, 2021 at 6:22 PM Peter Smith <smithpb2250@gmail.com> wrote:

Here are some review comments:

------

1. The patch v3 applied OK but with whitespace warnings

[postgres@CentOS7-x64 oss_postgres_2PC]$ git apply
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:98:
indent with spaces.
/* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:99:
indent with spaces.
if (!data->xact_wrote_changes && !in_streaming && transactional)
warning: 2 lines add whitespace errors.

------

Fixed.

2. Please create a CF entry in [1] for this patch.

------

3. Patch comment

The comment describes the problem and then suddenly just says
"Postpone the BEGIN message until the first change."

I suggest changing it to say more like... "(blank line) This patch
addresses the above problem by postponing the BEGIN message until the
first change."

------

Updated.

4. pgoutput.h

Maybe for consistency with the context member, the comment for the new
member should be to the right instead of above it?

@@ -20,6 +20,9 @@ typedef struct PGOutputData
MemoryContext context; /* private memory context for transient
* allocations */

+ /* flag indicating whether messages have previously been sent */
+ bool        xact_wrote_changes;
+

------

5. pgoutput.h

+ /* flag indicating whether messages have previously been sent */

"previously been sent" --> "already been sent" ??

------

6. pgoutput.h - misleading member name

Actually, now that I have read all the rest of the code and how this
member is used I feel that this name is very misleading. e.g. For
"streaming" case then you still are writing changes but are not
setting this member at all - therefore it does not always mean what it
says.

I feel a better name for this would be something like
"sent_begin_txn". Then if you have sent BEGIN it is true. If you
haven't sent BEGIN it is false. It eliminates all ambiguity naming it
this way instead.

(This makes my feedback #5 redundant because the comment will be a bit
different if you do this).

------

Fixed above comments.

7. pgoutput.c - function pgoutput_begin_txn

@@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{

I guess that you still needed to pass the txn because that is how the
API is documented, right?

But I am wondering if you ought to flag it as unused so you won't get
some BF machine giving warnings about it.

e.g. Syntax like this?

pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN * txn) {
(void)txn;
...

Updated.

------

8. pgoutput.c - function pgoutput_begin_txn

@@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
+ PGOutputData *data = ctx->output_plugin_private;
+
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
+ */
+ data->xact_wrote_changes = false;
+ elog(LOG,"Holding of begin");
+}

Why is this loglevel LOG? Looks like leftover debugging.

Removed.

------

9. pgoutput.c - function pgoutput_commit_txn

@@ -384,8 +401,14 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
+ PGOutputData *data = ctx->output_plugin_private;
+
OutputPluginUpdateProgress(ctx);
+ /* skip COMMIT message if nothing was sent */
+ if (!data->xact_wrote_changes)
+ return;
+

In the case where you decided to do nothing does it make sense that
you still called the function OutputPluginUpdateProgress(ctx); ?
I thought perhaps that your new check should come first so this call
would never happen.

Even though the empty transaction is not sent, the LSN is tracked as decoded, hence the progress needs to be updated.

------

10. pgoutput.c - variable declarations without casts

+ PGOutputData *data = ctx->output_plugin_private;

I noticed the new stack variables you declare have no casts.

This differs from the existing code which always looks like:
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;

There are a couple of examples of this so please search new code to
find them all.

-----

Fixed.

11. pgoutput.c - function pgoutput_change

@@ -551,6 +574,13 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Assert(false);
}

+ /* output BEGIN if we haven't yet */
+ if (!data->xact_wrote_changes && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

------

Updated.

12. pgoutput.c - pgoutput_truncate function

@@ -693,6 +723,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

if (nrelids > 0)
{
+ /* output BEGIN if we haven't yet */
+ if (!data->xact_wrote_changes && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

(same comment as above)

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

13. pgoutput.c - pgoutput_message

@@ -725,6 +762,13 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+    /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+    if (!data->xact_wrote_changes && !in_streaming && transactional)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

(same comment as above)

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

------

Fixed.

14. Test Code.

I noticed that there is no test code specifically for seeing if empty
transactions get sent or not. Is it possible to write such a test or
is this traffic improvement only observable using the debugger?

The 020_messages.pl actually has a test case for tracking empty messages even though it is part of the messages test.

regards,
Ajin Cherian
Fujitsu Australia

Thanks for addressing my v3 review comments above.

I tested the latest v4.

The v4 patch applied cleanly.

make check-world completed successfully.

So this patch v4 looks LGTM, apart from the following 2 nitpick comments:

======

1. Suggest to add a blank line after the (void)txn; ?

@@ -345,10 +345,29 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+ (void)txn; /* keep compiler quiet */
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first

======

2. Unnecessary statement blocks?

AFAIK those { } are not the usual PG code-style when there is only one
statement, so suggest to remove them.

Applies to 3 places:

@@ -551,6 +576,12 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Assert(false);
}

+ /* output BEGIN if we haven't yet */
+ if (!data->sent_begin_txn && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ }

@@ -693,6 +724,12 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

  if (nrelids > 0)
  {
+ /* output BEGIN if we haven't yet */
+ if (!data->sent_begin_txn && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ }

@@ -725,6 +762,12 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+ /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+ if (!data->sent_begin_txn && !in_streaming && transactional)
+ {
+ pgoutput_begin(ctx, txn);
+ }

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#32Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Peter Smith (#31)
1 attachment(s)
Re: logical replication empty transactions

On Mon, Apr 26, 2021 at 4:29 PM Peter Smith <smithpb2250@gmail.com> wrote:

The v4 patch applied cleanly.

make check-world completed successfully.

So this patch v4 looks LGTM, apart from the following 2 nitpick comments:

======

1. Suggest to add a blank line after the (void)txn; ?

@@ -345,10 +345,29 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+ (void)txn; /* keep compiler quiet */
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first

Fixed.

======

2. Unnecessary statement blocks?

AFAIK those { } are not the usual PG code-style when there is only one
statement, so suggest to remove them.

Applies to 3 places:

@@ -551,6 +576,12 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Assert(false);
}

+ /* output BEGIN if we haven't yet */
+ if (!data->sent_begin_txn && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ }

@@ -693,6 +724,12 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

if (nrelids > 0)
{
+ /* output BEGIN if we haven't yet */
+ if (!data->sent_begin_txn && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ }

@@ -725,6 +762,12 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+ /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+ if (!data->sent_begin_txn && !in_streaming && transactional)
+ {
+ pgoutput_begin(ctx, txn);
+ }

Fixed.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v5-0001-Skip-empty-transactions-for-logical-replication.patch (application/octet-stream)
#33Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Ajin Cherian (#32)
1 attachment(s)
Re: logical replication empty transactions

On Tue, Apr 27, 2021 at 1:49 PM Ajin Cherian <itsajin@gmail.com> wrote:

Rebased the patch as it was no longer applying.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v6-0001-Skip-empty-transactions-for-logical-replication.patch (application/octet-stream)
#34vignesh C
vignesh C
vignesh21@gmail.com
In reply to: Ajin Cherian (#33)
Re: logical replication empty transactions

On Tue, May 25, 2021 at 6:36 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Tue, Apr 27, 2021 at 1:49 PM Ajin Cherian <itsajin@gmail.com> wrote:

Rebased the patch as it was no longer applying.

Thanks for the updated patch, few comments:
1) I'm not sure if we could add some tests for skip empty
transactions, if possible add a few tests.

2) We could add some debug level log messages for the transaction that
will be skipped.

3) You could keep this variable below the other bool variables in the structure:
+       bool        sent_begin_txn;     /* flag indicating whether begin
+                                        * has already been sent */
+
4) You can split the comments to multi-line as it exceeds 80 chars
+       /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+       if (!data->sent_begin_txn && !in_streaming && transactional)
+               pgoutput_begin(ctx, txn);

Regards,
Vignesh

#35Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: vignesh C (#34)
1 attachment(s)
Re: logical replication empty transactions

On Thu, May 27, 2021 at 8:58 PM vignesh C <vignesh21@gmail.com> wrote:

Thanks for the updated patch, few comments:
1) I'm not sure if we could add some tests for skip empty
transactions, if possible add a few tests.

Added a few tests for prepared transactions as well as the existing
test in 020_messages.pl also tests regular transactions.

2) We could add some debug level log messages for the transaction that
will be skipped.

Added.

3) You could keep this variable below the other bool variables in the structure:
+       bool        sent_begin_txn;     /* flag indicating whether begin
+                                        * has already been sent */
+

I've moved this variable around, so this comment no longer is valid.

4) You can split the comments to multi-line as it exceeds 80 chars
+       /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+       if (!data->sent_begin_txn && !in_streaming && transactional)
+               pgoutput_begin(ctx, txn);

Done.

I've had to rebase the patch after a recent commit by Amit Kapila of
supporting two-phase commits in pub-sub [1].
Also I've modified the patch to also skip replicating empty prepared
transactions. Do let me know if you have any comments.

regards,
Ajin Cherian
Fujitsu Australia
[1]: /messages/by-id/CAHut+PueG6u3vwG8DU=JhJiWa2TwmZ=bDqPchZkBky7ykzA7MA@mail.gmail.com

Attachments:

v7-0001-Skip-empty-transactions-for-logical-replication.patch (application/octet-stream)
#36osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: Ajin Cherian (#35)
RE: logical replication empty transactions

On Wednesday, July 14, 2021 9:30 PM Ajin Cherian <itsajin@gmail.com> wrote:

I've had to rebase the patch after a recent commit by Amit Kapila of supporting
two-phase commits in pub-sub [1].
Also I've modified the patch to also skip replicating empty prepared
transactions. Do let me know if you have any comments.

Hi

I started to test this patch, but first here is some quick, really minor feedback.

(1) pg_logical_slot_get_binary_changes() params.

Technically, it looks better to pass proto_version 3 and the two_phase option to the function
when testing empty prepares. I believe proto_version 1 doesn't support 2PC:
according to [1], the two-phase commit messages "are available since protocol version 3."
So, if the test is meant to check that empty *prepares* are skipped,
I suggest updating proto_version and setting two_phase to 'on'.

+##############################
+# Test empty prepares
+##############################
...
+# peek at the contents of the slot
+$result = $node_publisher->safe_psql(
+   'postgres', qq(
+       SELECT get_byte(data, 0)
+       FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
+           'proto_version', '1',
+           'publication_names', 'tap_pub')
+));

(2) The following message should probably start with a lowercase letter,
as similar messages elsewhere in the code do.

+ elog(DEBUG1, "Skipping replication of an empty transaction");

[1]: https://www.postgresql.org/docs/devel/protocol-logicalrep-message-formats.html

Best Regards,
Takamichi Osumi

#37Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: osumi.takamichi@fujitsu.com (#36)
Re: logical replication empty transactions

Hi Ajin,

I have reviewed the v7 patch and given my feedback comments below.

Apply OK
Build OK
make check OK
TAP (subscriptions) make check OK
Build PG Docs (html) OK

Although I made lots of review comments below, the important point is
that none of them are functional - they are only minor rewordings
and some code refactoring that I thought would make the code simpler
and/or easier to read. YMMV, so please feel free to disagree with any
of them.

//////////

1a. Commit Comment - wording

BEFORE
This patch addresses the above problem by postponing the BEGIN / BEGIN
PREPARE message until the first change.

AFTER
This patch addresses the above problem by postponing the BEGIN / BEGIN
PREPARE messages until the first change is encountered.

------

1b. Commit Comment - wording

BEFORE
While processing a COMMIT message or a PREPARE message, if there is no
other change for that transaction, do not send COMMIT message or
PREPARE message.

AFTER
If (when processing a COMMIT / PREPARE message) we find there had been
no other change for that transaction, then do not send the COMMIT /
PREPARE message.

------

2. doc/src/sgml/logicaldecoding.sgml - wording

@@ -884,11 +884,19 @@ typedef void (*LogicalDecodePrepareCB) (struct
LogicalDecodingContext *ctx,
       The required <function>commit_prepared_cb</function> callback is called
       whenever a transaction <command>COMMIT PREPARED</command> has
been decoded.
       The <parameter>gid</parameter> field, which is part of the
-      <parameter>txn</parameter> parameter, can be used in this callback.
+      <parameter>txn</parameter> parameter, can be used in this callback. The
+      parameters <parameter>prepare_end_lsn</parameter> and
+      <parameter>prepare_time</parameter> can be used to check if the plugin
+      has received this <command>PREPARE TRANSACTION</command> in which case
+      it can commit the transaction, otherwise, it can skip the commit. The
+      <parameter>gid</parameter> alone is not sufficient because the downstream
+      node can have a prepared transaction with the same identifier.

=>

(some minor rewording of the last part)

AFTER:

The parameters <parameter>prepare_end_lsn</parameter> and
<parameter>prepare_time</parameter> can be used to check if the plugin
has received this <command>PREPARE TRANSACTION</command> or not. If
yes, it can commit the transaction, otherwise, it can skip the commit.
The <parameter>gid</parameter> alone is not sufficient to determine
this because the downstream node may already have a prepared
transaction with the same identifier.
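
As an illustration of why the gid alone is not enough, a downstream plugin could match on all three values before committing. The sketch below is an assumption about how such a check might look; SketchPreparedXact and sketch_prepare_was_received are hypothetical names and do not exist in PostgreSQL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical record of a prepared transaction the downstream has stored. */
typedef struct SketchPreparedXact
{
	const char *gid;				/* global transaction identifier */
	uint64_t	prepare_end_lsn;	/* end LSN of the PREPARE upstream */
	int64_t		prepare_time;		/* timestamp of the PREPARE upstream */
} SketchPreparedXact;

/*
 * Return true only if a stored prepared transaction matches the gid AND
 * the prepare position and time carried by COMMIT PREPARED.  A matching
 * gid with different LSN/time is treated as an unrelated local transaction.
 */
bool
sketch_prepare_was_received(const SketchPreparedXact *xacts, size_t n,
							const char *gid, uint64_t prepare_end_lsn,
							int64_t prepare_time)
{
	assert(gid != NULL);

	for (size_t i = 0; i < n; i++)
	{
		if (strcmp(xacts[i].gid, gid) == 0 &&
			xacts[i].prepare_end_lsn == prepare_end_lsn &&
			xacts[i].prepare_time == prepare_time)
			return true;
	}
	return false;
}
```

If the function returns false, the downstream would skip the commit, since the PREPARE was presumably dropped upstream as empty.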

------

3. src/backend/replication/logical/proto.c - whitespace

@@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in,
LogicalRepCommitPreparedTxnData *
elog(ERROR, "unrecognized flags %u in commit prepared message", flags);

  /* read fields */
+ prepare_data->prepare_end_lsn = pq_getmsgint64(in);
+ if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR,"prepare_end_lsn is not set in commit prepared message");

=>

There is missing space before the 2nd elog param.

------

4. src/backend/replication/logical/worker.c - comment typos

  /*
- * Update origin state so we can restart streaming from correct position
- * in case of crash.
+ * It is possible that we haven't received the prepare because
+ * the transaction did not have any changes relevant to this
+ * subscription and was essentially an empty prepare. In which case,
+ * the walsender is optimized to drop the empty transaction and the
+ * accompanying prepare. Silently ignore if we don't find the prepared
+ * transaction.
  */

4a. =>

"and was essentially an empty prepare" --> "so was essentially an empty prepare"

4b. =>

"In which case" --> "In this case"

------

5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn

@@ -410,10 +417,32 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
+ */
+ data->sent_begin_txn = false;
+ txn->output_plugin_private = data;
+}

=>

I felt that since this message postponement is now the new behaviour
of this function then probably this should all be a function level
comment instead of the comment being in the body of the function

------

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin

+
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

=>

Even though it is kind of obvious, it is probably better to provide a
function comment here too

------

7. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_txn

@@ -428,8 +457,22 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  XLogRecPtr commit_lsn)
 {
+ PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
+ bool            skip;
+
+ Assert(data);
+ skip = !data->sent_begin_txn;
+ pfree(data);
+ txn->output_plugin_private = NULL;
  OutputPluginUpdateProgress(ctx);
+ /* skip COMMIT message if nothing was sent */
+ if (skip)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction");
+ return;
+ }
+

7a. =>

I felt that the comment "skip COMMIT message if nothing was sent"
should be done at the point where you *decide* to skip or not. So you
could either move that comment to where the skip variable is assigned.
Or (my preference) leave the comment where it is but change the
variable name to be sent_begin = !data->sent_begin_txn;

------

Regardless I think the comment should be elaborated a bit to describe
the reason more.

7b. =>

BEFORE
/* skip COMMIT message if nothing was sent */

AFTER
/* If a BEGIN message was not yet sent, then it means there were no
relevant changes encountered, so we can skip the COMMIT message too.
*/
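
The decision described in 7a/7b can be sketched with per-transaction state that is allocated in the BEGIN callback and released in the COMMIT callback. This is a simplified model, assuming plain calloc/free in place of the memory-context calls; PtxTxn, PtxTxnData and the ptx_* functions are hypothetical stand-ins for ReorderBufferTXN, PGOutputTxnData and the pgoutput callbacks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins for PGOutputTxnData and ReorderBufferTXN. */
typedef struct PtxTxnData
{
	bool		sent_begin_txn;
} PtxTxnData;

typedef struct PtxTxn
{
	void	   *output_plugin_private;
} PtxTxn;

/* BEGIN callback: allocate zeroed per-transaction state, send nothing. */
void
ptx_begin_txn(PtxTxn *txn)
{
	PtxTxnData *data = calloc(1, sizeof(PtxTxnData));

	assert(data != NULL);
	txn->output_plugin_private = data;
}

/* Called when the first relevant change forces BEGIN to be sent. */
void
ptx_mark_begin_sent(PtxTxn *txn)
{
	PtxTxnData *data = (PtxTxnData *) txn->output_plugin_private;

	assert(data != NULL);
	data->sent_begin_txn = true;
}

/*
 * COMMIT callback: returns true if a COMMIT message should be sent.
 * If BEGIN was never sent, no relevant change was seen, so COMMIT is
 * skipped too.  The per-transaction state is released either way.
 */
bool
ptx_commit_txn(PtxTxn *txn)
{
	PtxTxnData *data = (PtxTxnData *) txn->output_plugin_private;
	bool		sent_begin;

	assert(data != NULL);
	sent_begin = data->sent_begin_txn;
	free(data);
	txn->output_plugin_private = NULL;

	return sent_begin;
}
```

Reading the flag into a local before freeing keeps the skip decision and its explanatory comment in one place.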

------

8. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare_txn

@@ -441,10 +484,28 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
 static void
 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ /*
+ * Don't send BEGIN PREPARE message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN PREPARE and COMMIT PREPARED messages
+ * to subscribers, using bandwidth on something with little/no use
+ * for logical replication.
+ */
+ pgoutput_begin_txn(ctx, txn);
+}

8a. =>

Like previously, I felt that this big comment should be at the
function level of pgoutput_begin_prepare_txn instead of in the body of
the function.

------

8b. =>

And then the body comment would be something simple like:

/* Delegate to assign the begin sent flag as false same as for the
BEGIN message. */
pgoutput_begin_txn(ctx, txn);

------

9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare

+
+static void
+pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

=>

Probably this needs a function comment.

------

10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_prepare_txn

@@ -459,8 +520,18 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  XLogRecPtr prepare_lsn)
 {
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
+ Assert(data);
  OutputPluginUpdateProgress(ctx);
+ /* skip PREPARE message if nothing was sent */
+ if (!data->sent_begin_txn)

=>

Maybe elaborate on that "skip PREPARE message if nothing was sent"
comment in a way similar to my review comment 7b. For example,

AFTER
/* If the BEGIN was not yet sent, then it means there were no relevant
changes encountered, so we can skip the PREPARE message too. */

------

11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_prepared_txn

@@ -471,12 +542,33 @@ pgoutput_prepare_txn(LogicalDecodingContext
*ctx, ReorderBufferTXN *txn,
  */
 static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
 {
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);
+ /*
+ * skip sending COMMIT PREPARED message if prepared transaction
+ * has not been sent.
+ */
+ if (data)

=>

Similar to previous review comment 10, I think the reason for the skip
should be elaborated a little bit. For example,

AFTER
/* If the BEGIN PREPARE was not yet sent, then it means there were no
relevant changes encountered, so we can skip the COMMIT PREPARED
message too. */

------

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_rollback_prepared_txn

=> Similar as for pgoutput_commit_prepared_txn (see review comment 11)

------

13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -639,11 +749,16 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  Relation relation, ReorderBufferChange *change)
 {
  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
  MemoryContext old;
  RelationSyncEntry *relentry;
  TransactionId xid = InvalidTransactionId;
  Relation ancestor = NULL;
+ /* If not streaming, should have setup txndata as part of
BEGIN/BEGIN PREPARE */
+ if (!in_streaming)
+ Assert(txndata);
+
  if (!is_publishable_relation(relation))
  return;

13a. =>

I felt the streaming logic with the txndata is a bit confusing. I
think it would be easier to have another local variable (sent_begin)
and use it like this...

bool sent_begin;

if (in_streaming)
{
sent_begin = true;
}
else
{
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

Assert(txndata);
sent_begin = txndata->sent_begin_txn;
}

...

------

+ /* output BEGIN if we haven't yet */

13b. =>

I thought the comment is not quite right

AFTER
/* Output BEGIN / BEGIN PREPARE if we haven't yet */

------

+ if (!in_streaming && !txndata->sent_begin_txn)
+ {
+ if (rbtxn_prepared(txn))
+ pgoutput_begin_prepare(ctx, txn);
+ else
+ pgoutput_begin(ctx, txn);
+ }
+

13.c =>

If you introduce the variable (as suggested in 13a) this code becomes
much simpler:

AFTER

if (!sent_begin)
{
if (rbtxn_prepared(txn))
pgoutput_begin_prepare(ctx, txn);
else
pgoutput_begin(ctx, txn);
}
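
Taken together, suggestions 13a and 13c amount to a small decision function. The following is a standalone sketch of that logic only; SketchTxnState, SketchBeginAction and sketch_begin_action are hypothetical names, and the is_prepared argument stands in for rbtxn_prepared(txn).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the per-transaction plugin state. */
typedef struct SketchTxnState
{
	bool		sent_begin_txn;
} SketchTxnState;

typedef enum SketchBeginAction
{
	SKETCH_SEND_NOTHING,
	SKETCH_SEND_BEGIN,
	SKETCH_SEND_BEGIN_PREPARE
} SketchBeginAction;

/*
 * Decide which (if any) deferred begin message must precede a change.
 * In streaming mode no deferred BEGIN is needed, so it is treated as
 * already sent and txndata may be NULL.
 */
SketchBeginAction
sketch_begin_action(bool in_streaming, bool is_prepared,
					const SketchTxnState *txndata)
{
	bool		sent_begin;

	if (in_streaming)
		sent_begin = true;
	else
	{
		assert(txndata != NULL);
		sent_begin = txndata->sent_begin_txn;
	}

	if (sent_begin)
		return SKETCH_SEND_NOTHING;

	return is_prepared ? SKETCH_SEND_BEGIN_PREPARE : SKETCH_SEND_BEGIN;
}
```

Folding the streaming case into a single local flag means the later check no longer needs to test in_streaming at all.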

------

14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

=>

All the similar review comments made for pgoutput_change (13a, 13b, 13c)
apply to pgoutput_truncate here also.

------

15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message

@@ -842,6 +980,7 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
const char *message)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata;
TransactionId xid = InvalidTransactionId;

=>

This variable should be declared in the block where it is used,
similar to the suggestion 13a.

Also is it just an accidental omission that you did Assert(txndata)
for all the other places but not here?

------

Kind Regards,
Peter Smith.
Fujitsu Australia

#38Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Peter Smith (#37)
1 attachment(s)
Re: logical replication empty transactions

On Mon, Jul 19, 2021 at 3:24 PM Peter Smith <smithpb2250@gmail.com> wrote:

1a. Commit Comment - wording

updated.

1b. Commit Comment - wording

updated.

2. doc/src/sgml/logicaldecoding.sgml - wording

@@ -884,11 +884,19 @@ typedef void (*LogicalDecodePrepareCB) (struct
LogicalDecodingContext *ctx,
The required <function>commit_prepared_cb</function> callback is called
whenever a transaction <command>COMMIT PREPARED</command> has
been decoded.
The <parameter>gid</parameter> field, which is part of the
-      <parameter>txn</parameter> parameter, can be used in this callback.
+      <parameter>txn</parameter> parameter, can be used in this callback. The
+      parameters <parameter>prepare_end_lsn</parameter> and
+      <parameter>prepare_time</parameter> can be used to check if the plugin
+      has received this <command>PREPARE TRANSACTION</command> in which case
+      it can commit the transaction, otherwise, it can skip the commit. The
+      <parameter>gid</parameter> alone is not sufficient because the downstream
+      node can have a prepared transaction with the same identifier.

=>

(some minor rewording of the last part)

updated.

3. src/backend/replication/logical/proto.c - whitespace

@@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in,
LogicalRepCommitPreparedTxnData *
elog(ERROR, "unrecognized flags %u in commit prepared message", flags);

/* read fields */
+ prepare_data->prepare_end_lsn = pq_getmsgint64(in);
+ if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR,"prepare_end_lsn is not set in commit prepared message");

=>

There is missing space before the 2nd elog param.

fixed.

4a. =>

"and was essentially an empty prepare" --> "so was essentially an empty prepare"

4b. =>

"In which case" --> "In this case"

------

fixed.

I felt that since this message postponement is now the new behaviour
of this function then probably this should all be a function level
comment instead of the comment being in the body of the function

------

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin

+
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

=>

Even though it is kind of obvious, it is probably better to provide a
function comment here too

------

Changed accordingly.

I felt that the comment "skip COMMIT message if nothing was sent"
should be done at the point where you *decide* to skip or not. So you
could either move that comment to where the skip variable is assigned.
Or (my preference) leave the comment where it is but change the
variable name to be sent_begin = !data->sent_begin_txn;

Updated the comment to where the skip variable is assigned.

------

Regardless I think the comment should be elaborated a bit to describe
the reason more.

7b. =>

BEFORE
/* skip COMMIT message if nothing was sent */

AFTER
/* If a BEGIN message was not yet sent, then it means there were no
relevant changes encountered, so we can skip the COMMIT message too.
*/

Updated accordingly.
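
As a rough illustration of the postponed-BEGIN idea in the comments above, here is a minimal standalone C sketch. It is a toy model under stated assumptions, not the patch's code: ToyTxnData, ToyStream and the toy_* functions are invented for illustration, and only the sent_begin_txn flag name is taken from the patch.

```c
#include <stdbool.h>

/* Toy stand-in for the patch's per-transaction state (PGOutputTxnData). */
typedef struct ToyTxnData
{
    bool sent_begin_txn;    /* has BEGIN already gone out on the wire? */
} ToyTxnData;

/* Counts the messages "sent" so the effect can be inspected. */
typedef struct ToyStream
{
    int begins;
    int changes;
    int commits;
} ToyStream;

/* BEGIN callback: send nothing, just record that BEGIN is still pending. */
static void
toy_begin_txn(ToyTxnData *txndata)
{
    txndata->sent_begin_txn = false;
}

/* Change callback: emit the postponed BEGIN before the first relevant change. */
static void
toy_change(ToyStream *out, ToyTxnData *txndata, bool is_published)
{
    if (!is_published)
        return;                 /* change on a non-published table: ignore */

    if (!txndata->sent_begin_txn)
    {
        out->begins++;
        txndata->sent_begin_txn = true;
    }
    out->changes++;
}

/* COMMIT callback: returns true if COMMIT was sent, false if it was skipped. */
static bool
toy_commit_txn(ToyStream *out, const ToyTxnData *txndata)
{
    /* No BEGIN sent means no relevant changes, so skip the COMMIT too. */
    if (!txndata->sent_begin_txn)
        return false;

    out->commits++;
    return true;
}
```

In this model a transaction that touches only non-published tables produces no messages at all, while a transaction with at least one published change produces exactly one BEGIN and one COMMIT.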

------

Like previously, I felt that this big comment should be at the
function level of pgoutput_begin_prepare_txn instead of in the body of
the function.

------

8b. =>

And then the body comment would be something simple like:

/* Delegate to assign the begin sent flag as false same as for the
BEGIN message. */
pgoutput_begin_txn(ctx, txn);

Updated accordingly.

------

9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare

+
+static void
+pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

=>

Probably this needs a function comment.

Updated.

------

10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_prepare_txn

@@ -459,8 +520,18 @@ static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
+ Assert(data);
OutputPluginUpdateProgress(ctx);
+ /* skip PREPARE message if nothing was sent */
+ if (!data->sent_begin_txn)

=>

Maybe elaborate on that "skip PREPARE message if nothing was sent"
comment in a way similar to my review comment 7b. For example,

AFTER
/* If the BEGIN was not yet sent, then it means there were no relevant
changes encountered, so we can skip the PREPARE message too. */

Updated.

------

11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_prepared_txn

@@ -471,12 +542,33 @@ pgoutput_prepare_txn(LogicalDecodingContext
*ctx, ReorderBufferTXN *txn,
*/
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
{
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
OutputPluginUpdateProgress(ctx);
+ /*
+ * skip sending COMMIT PREPARED message if prepared transaction
+ * has not been sent.
+ */
+ if (data)

=>

Similar to previous review comment 10, I think the reason for the skip
should be elaborated a little bit. For example,

AFTER
/* If the BEGIN PREPARE was not yet sent, then it means there were no
relevant changes encountered, so we can skip the COMMIT PREPARED
message too. */

------

Updated accordingly.

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_rollback_prepared_txn

=> Similar to pgoutput_commit_prepared_txn (see review comment 11)

------

Updated.

13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -639,11 +749,16 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId;
Relation ancestor = NULL;
+ /* If not streaming, should have setup txndata as part of
BEGIN/BEGIN PREPARE */
+ if (!in_streaming)
+ Assert(txndata);
+
if (!is_publishable_relation(relation))
return;

13a. =>

I felt the streaming logic with the txndata is a bit confusing. I
think it would be easier to have another local variable (sent_begin)
and use it like this...

bool sent_begin;
if (in_streaming)
{
sent_begin = true;
}
else
{
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
Assert(txndata);
sent_begin = txndata->sent_begin_txn;
}

I did not make the change, because in the streaming case "sent_begin"
is not true, so coding it that way seemed incorrect.
Instead, I have modified the comment to mention that
streaming transactions do not send BEGIN / BEGIN PREPARE.

...

------

+ /* output BEGIN if we haven't yet */

13b. =>

I thought the comment is not quite right

AFTER
/* Output BEGIN / BEGIN PREPARE if we haven't yet */

------

Updated.

+ if (!in_streaming && !txndata->sent_begin_txn)
+ {
+ if (rbtxn_prepared(txn))
+ pgoutput_begin_prepare(ctx, txn);
+ else
+ pgoutput_begin(ctx, txn);
+ }
+

13.c =>

If you introduce the variable (as suggested in 13a) this code becomes
much simpler:

Skipped this. (reason mentioned above)

------

14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

=>

All the similar review comments made for pgoutput_change (13a, 13b, 13c)
apply to pgoutput_truncate here also.

------

Updated.

15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message

@@ -842,6 +980,7 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
const char *message)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata;
TransactionId xid = InvalidTransactionId;

=>

This variable should be declared in the block where it is used,
similar to the suggestion 13a.

Also is it just an accidental omission that you did Assert(txndata)
for all the other places but not here?

Moved location of the variable and added an assert.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v8-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v8-0001-Skip-empty-transactions-for-logical-replication.patch
#39Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: osumi.takamichi@fujitsu.com (#36)
Re: logical replication empty transactions

On Thu, Jul 15, 2021 at 3:50 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

I have started to test this patch, but first here is some minor quick feedback.

(1) pg_logical_slot_get_binary_changes() params.

Technically, it looks better to pass proto_version 3 and the two_phase option to the function
to test empty prepares. I believe proto_version 1 doesn't support 2PC.
[1] says "The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared)
are available since protocol version 3." So, if the test wants to skip empty *prepares*,
I suggest updating the proto_version and setting two_phase to 'on'.

Updated accordingly.

(2) The following message should start with a lowercase letter.
Other similar code does the same.

+ elog(DEBUG1, "Skipping replication of an empty transaction");

Fixed this.

I've addressed these comments in version 8 of the patch.

regards,
Ajin Cherian
Fujitsu Australia

#40Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#39)
Re: logical replication empty transactions

Hi Ajin.

I have reviewed the v8 patch and my feedback comments are below:

//////////

1. Apply v8 gave multiple whitespace warnings.

------

2. Commit comment - wording

If (when processing a COMMIT / PREPARE message) we find there had been
no other change for that transaction, then do not send the COMMIT /
PREPARE message. This means that pgoutput will skip BEGIN / COMMIT
or BEGIN PREPARE / PREPARE messages for transactions that are empty.

=>

Shouldn't this also mention some other messages that may be skipped?
- COMMIT PREPARED
- ROLLBACK PREPARED

------

3. doc/src/sgml/logicaldecoding.sgml - wording

@@ -884,11 +884,20 @@ typedef void (*LogicalDecodePrepareCB) (struct
LogicalDecodingContext *ctx,
       The required <function>commit_prepared_cb</function> callback is called
       whenever a transaction <command>COMMIT PREPARED</command> has
been decoded.
       The <parameter>gid</parameter> field, which is part of the
-      <parameter>txn</parameter> parameter, can be used in this callback.
+      <parameter>txn</parameter> parameter, can be used in this callback. The
+      parameters <parameter>prepare_end_lsn</parameter> and
+      <parameter>prepare_time</parameter> can be used to check if the plugin
+      has received this <command>PREPARE TRANSACTION</command> command or not.
+      If yes, it can commit the transaction, otherwise, it can skip the commit.
+      The <parameter>gid</parameter> alone is not sufficient to determine this
+      because the downstream may already have a prepared transaction with the
+      same identifier.

=>

Typo: Should that say "downstream node" instead of just "downstream" ?
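
To make the quoted documentation text concrete, here is a small standalone C sketch of how a downstream node might decide whether an incoming COMMIT PREPARED refers to a prepare it actually received. Everything here is hypothetical (the ToyPrepared record and toy_have_matching_prepare are invented for illustration); it only shows why matching on gid alone is weaker than also matching prepare_end_lsn and prepare_time.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical record a downstream node might keep per prepared transaction. */
typedef struct ToyPrepared
{
    const char *gid;
    uint64_t    prepare_end_lsn;
    int64_t     prepare_time;
} ToyPrepared;

/*
 * Returns true only when a known prepared transaction matches on gid AND on
 * prepare_end_lsn and prepare_time. A locally created prepared transaction
 * that merely shares the gid does not match.
 */
static bool
toy_have_matching_prepare(const ToyPrepared *known, size_t nknown,
                          const char *gid, uint64_t prepare_end_lsn,
                          int64_t prepare_time)
{
    for (size_t i = 0; i < nknown; i++)
    {
        if (strcmp(known[i].gid, gid) == 0 &&
            known[i].prepare_end_lsn == prepare_end_lsn &&
            known[i].prepare_time == prepare_time)
            return true;
    }
    return false;
}
```

If the function returns false, the downstream node in this model would skip the commit, because the corresponding prepare was never sent to it.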

------

4. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn
callback comment

@@ -406,14 +413,38 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,

 /*
  * BEGIN callback
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on

=>

Typo: "BEGIN callback" --> "BEGIN callback." (with the period).

And, I think maybe it will be better if it has a separating blank line too.

e.g.

/*
* BEGIN callback.
*
* Don't send BEGIN ....

(NOTE: this review comment applies to other callback function comments
too, so please hunt them all down)

------

5. src/backend/replication/pgoutput/pgoutput.c - data / txndata

 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));

=>

There is some inconsistent naming of the local variable in the patch.
Sometimes it is called "data"; Sometimes it is called "txdata" etc. It
would be better to just stick with the same variable name everywhere.

(NOTE: this comment applies to several places in this patch)

------

6. src/backend/replication/pgoutput/pgoutput.c - Strange way to use Assert

+ /* If not streaming, should have setup txndata as part of
BEGIN/BEGIN PREPARE */
+ if (!in_streaming)
+ Assert(txndata);
+

=>

This style of Assert code seemed strange to me. In production mode
isn't that going to evaluate to some condition with a ((void) true)
body? IMO it might be better to just include the streaming check as
part of the Assert. For example:

BEFORE
if (!in_streaming)
Assert(txndata);

AFTER
Assert(in_streaming || txndata);

(NOTE: This same review comment applies in at least 3 places in this
patch, so please hunt them all down)
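
The point about assertion style can be sketched in standalone C. This is an illustration only: ToyAssert is a simplified stand-in for an assertion macro that expands to ((void) true) when assertions are disabled, and TOY_ASSERT_CHECKING and the function names are invented here.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/*
 * Simplified stand-in for an assertion macro. When TOY_ASSERT_CHECKING is
 * not defined it compiles to a no-op expression and does not evaluate cond.
 */
#ifdef TOY_ASSERT_CHECKING
#define ToyAssert(cond) do { if (!(cond)) abort(); } while (0)
#else
#define ToyAssert(cond) ((void) true)
#endif

/* The invariant itself: outside streaming mode, txndata must be set. */
static bool
txndata_invariant_holds(bool in_streaming, const void *txndata)
{
    return in_streaming || txndata != NULL;
}

/* BEFORE style: with assertions disabled, the "if" guards an empty statement. */
static void
check_before(bool in_streaming, const void *txndata)
{
    if (!in_streaming)
        ToyAssert(txndata);
}

/* AFTER style: a single assertion that disappears entirely when disabled. */
static void
check_after(bool in_streaming, const void *txndata)
{
    ToyAssert(in_streaming || txndata);
}
```

Both styles check the same invariant when assertions are enabled. The AFTER form simply avoids leaving a conditional with a no-op body in non-assert builds.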

------

7. src/backend/replication/pgoutput/pgoutput.c - comment wording

@@ -677,6 +810,18 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Assert(false);
}

+ /*
+ * output BEGIN / BEGIN PREPARE if we haven't yet,
+     * while streaming no need to send BEGIN / BEGIN PREPARE.
+ */
+ if (!in_streaming && !txndata->sent_begin_txn)

=>

English not really that comment is. The comment should also start with
uppercase.

(NOTE: This same comment was in couple of places in the patch)

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#41Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Peter Smith (#40)
1 attachment(s)
Re: logical replication empty transactions

On Thu, Jul 22, 2021 at 6:11 PM Peter Smith <smithpb2250@gmail.com> wrote:

Hi Ajin.

I have reviewed the v8 patch and my feedback comments are below:

//////////

1. Apply v8 gave multiple whitespace warnings.

------

2. Commit comment - wording

If (when processing a COMMIT / PREPARE message) we find there had been
no other change for that transaction, then do not send the COMMIT /
PREPARE message. This means that pgoutput will skip BEGIN / COMMIT
or BEGIN PREPARE / PREPARE messages for transactions that are empty.

=>

Shouldn't this also mention some other messages that may be skipped?
- COMMIT PREPARED
- ROLLBACK PREPARED

Updated.

------

3. doc/src/sgml/logicaldecoding.sgml - wording

@@ -884,11 +884,20 @@ typedef void (*LogicalDecodePrepareCB) (struct
LogicalDecodingContext *ctx,
The required <function>commit_prepared_cb</function> callback is called
whenever a transaction <command>COMMIT PREPARED</command> has
been decoded.
The <parameter>gid</parameter> field, which is part of the
-      <parameter>txn</parameter> parameter, can be used in this callback.
+      <parameter>txn</parameter> parameter, can be used in this callback. The
+      parameters <parameter>prepare_end_lsn</parameter> and
+      <parameter>prepare_time</parameter> can be used to check if the plugin
+      has received this <command>PREPARE TRANSACTION</command> command or not.
+      If yes, it can commit the transaction, otherwise, it can skip the commit.
+      The <parameter>gid</parameter> alone is not sufficient to determine this
+      because the downstream may already have a prepared transaction with the
+      same identifier.

=>

Typo: Should that say "downstream node" instead of just "downstream" ?

------

Updated.

4. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn
callback comment

@@ -406,14 +413,38 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,

/*
* BEGIN callback
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on

=>

Typo: "BEGIN callback" --> "BEGIN callback." (with the period).

And, I think maybe it will be better if it has a separating blank line too.

e.g.

/*
* BEGIN callback.
*
* Don't send BEGIN ....

(NOTE: this review comment applies to other callback function comments
too, so please hunt them all down)

------

Updated.

5. src/backend/replication/pgoutput/pgoutput.c - data / txndata

static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
+ PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));

=>

There is some inconsistent naming of the local variable in the patch.
Sometimes it is called "data"; Sometimes it is called "txdata" etc. It
would be better to just stick with the same variable name everywhere.

(NOTE: this comment applies to several places in this patch)

------

I've renamed all variables of type PGOutputTxnData to txndata. Note that
there is another structure, PGOutputData, which still uses the name
data.

6. src/backend/replication/pgoutput/pgoutput.c - Strange way to use Assert

+ /* If not streaming, should have setup txndata as part of
BEGIN/BEGIN PREPARE */
+ if (!in_streaming)
+ Assert(txndata);
+

=>

This style of Assert code seemed strange to me. In production mode
isn't that going to evaluate to some condition with a ((void) true)
body? IMO it might be better to just include the streaming check as
part of the Assert. For example:

BEFORE
if (!in_streaming)
Assert(txndata);

AFTER
Assert(in_streaming || txndata);

(NOTE: This same review comment applies in at least 3 places in this
patch, so please hunt them all down)

Updated.

------

7. src/backend/replication/pgoutput/pgoutput.c - comment wording

@@ -677,6 +810,18 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Assert(false);
}

+ /*
+ * output BEGIN / BEGIN PREPARE if we haven't yet,
+     * while streaming no need to send BEGIN / BEGIN PREPARE.
+ */
+ if (!in_streaming && !txndata->sent_begin_txn)

=>

English not really that comment is. The comment should also start with
uppercase.

(NOTE: This same comment was in couple of places in the patch)

Updated.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v9-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v9-0001-Skip-empty-transactions-for-logical-replication.patch
#42Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#41)
Re: logical replication empty transactions

I have reviewed the v9 patch and my feedback comments are below:

//////////

1. Apply v9 gave multiple whitespace warnings

$ git apply v9-0001-Skip-empty-transactions-for-logical-replication.patch
v9-0001-Skip-empty-transactions-for-logical-replication.patch:479:
indent with spaces.
* If the BEGIN PREPARE was not yet sent, then it means there were no
v9-0001-Skip-empty-transactions-for-logical-replication.patch:480:
indent with spaces.
* relevant changes encountered, so we can skip the ROLLBACK PREPARED
v9-0001-Skip-empty-transactions-for-logical-replication.patch:481:
indent with spaces.
* messsage too.
v9-0001-Skip-empty-transactions-for-logical-replication.patch:482:
indent with spaces.
*/
warning: 4 lines add whitespace errors.

------

2. Commit comment - wording

pgoutput will also skip COMMIT PREPARED and ROLLBACK PREPARED messages
for transactions which were skipped.

=>

Is that correct? Or did you mean to say:

AFTER
pgoutput will also skip COMMIT PREPARED and ROLLBACK PREPARED messages
for transactions that are empty.

------

3. src/backend/replication/pgoutput/pgoutput.c - typo

+ /*
+ * If the BEGIN PREPARE was not yet sent, then it means there were no
+ * relevant changes encountered, so we can skip the COMMIT PREPARED
+ * messsage too.
+ */

Typo: "messsage" --> "message"

(NOTE this same typo is in 2 places)

------

Kind Regards,
Peter Smith.
Fujitsu Australia

#43Greg Nancarrow
Greg Nancarrow
gregn4422@gmail.com
In reply to: Ajin Cherian (#41)
Re: logical replication empty transactions

On Thu, Jul 22, 2021 at 11:37 PM Ajin Cherian <itsajin@gmail.com> wrote:

I have some minor comments on the v9 patch:

(1) Several whitespace warnings on patch application

(2) Suggested patch comment change:

BEFORE:
The current logical replication behaviour is to send every transaction to
subscriber even though the transaction is empty (because it does not
AFTER:
The current logical replication behaviour is to send every transaction to
subscriber even though the transaction might be empty (because it does not

(3) Comment needed for added struct defn:

typedef struct PGOutputTxnData

(4) Improve comment.

Can you add a comma (or add words) in the below sentence, so we know
how to read it?

+ /*
+ * Delegate to assign the begin sent flag as false same as for the
+ * BEGIN message.
+ */

Regards,
Greg Nancarrow
Fujitsu Australia

#44Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Greg Nancarrow (#43)
1 attachment(s)
Re: logical replication empty transactions

On Fri, Jul 23, 2021 at 10:26 AM Greg Nancarrow <gregn4422@gmail.com> wrote:

On Thu, Jul 22, 2021 at 11:37 PM Ajin Cherian <itsajin@gmail.com> wrote:

I have some minor comments on the v9 patch:

(1) Several whitespace warnings on patch application

Fixed.

(2) Suggested patch comment change:

BEFORE:
The current logical replication behaviour is to send every transaction to
subscriber even though the transaction is empty (because it does not
AFTER:
The current logical replication behaviour is to send every transaction to
subscriber even though the transaction might be empty (because it does not

Changed accordingly.

(3) Comment needed for added struct defn:

typedef struct PGOutputTxnData

Added.

(4) Improve comment.

Can you add a comma (or add words) in the below sentence, so we know
how to read it?

Updated.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v10-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v10-0001-Skip-empty-transactions-for-logical-replication.patch
#45Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Peter Smith (#42)
Re: logical replication empty transactions

On Fri, Jul 23, 2021 at 10:13 AM Peter Smith <smithpb2250@gmail.com> wrote:

I have reviewed the v9 patch and my feedback comments are below:

//////////

1. Apply v9 gave multiple whitespace warnings

Fixed.

------

2. Commit comment - wording

pgoutput will also skip COMMIT PREPARED and ROLLBACK PREPARED messages
for transactions which were skipped.

=>

Is that correct? Or did you mean to say:

AFTER
pgoutput will also skip COMMIT PREPARED and ROLLBACK PREPARED messages
for transactions that are empty.

------

Updated.

3. src/backend/replication/pgoutput/pgoutput.c - typo

+ /*
+ * If the BEGIN PREPARE was not yet sent, then it means there were no
+ * relevant changes encountered, so we can skip the COMMIT PREPARED
+ * messsage too.
+ */

Typo: "messsage" --> "message"

(NOTE this same typo is in 2 places)

Fixed.

I have made these changes in v10 of the patch.

regards,
Ajin Cherian
Fujitsu Australia

#46Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#45)
Re: logical replication empty transactions

I have reviewed the v10 patch.

Apply / build / test was all OK.

Just one review comment:

//////////

1. Typo

@@ -130,6 +132,17 @@ typedef struct RelationSyncEntry
TupleConversionMap *map;
} RelationSyncEntry;

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN or BEGIN PREPARE. BEGIN or BEGIN PREPARE
+ * is only sent when the first change in a transaction is processed.
+ * This make it possible to skip transactions that are empty.
+ */

=>

typo: "make it possible" --> "makes it possible"

------

Kind Regards,
Peter Smith.
Fujitsu Australia

#47Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Peter Smith (#46)
1 attachment(s)
Re: logical replication empty transactions

On Fri, Jul 23, 2021 at 7:38 PM Peter Smith <smithpb2250@gmail.com> wrote:

I have reviewed the v10 patch.

Apply / build / test was all OK.

Just one review comment:

//////////

1. Typo

@@ -130,6 +132,17 @@ typedef struct RelationSyncEntry
TupleConversionMap *map;
} RelationSyncEntry;

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN or BEGIN PREPARE. BEGIN or BEGIN PREPARE
+ * is only sent when the first change in a transaction is processed.
+ * This make it possible to skip transactions that are empty.
+ */

=>

typo: "make it possible" --> "makes it possible"

fixed.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v11-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v11-0001-Skip-empty-transactions-for-logical-replication.patch
#48Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#47)
Re: logical replication empty transactions

FYI - I have checked the v11 patch. Everything applies, builds, and
tests OK for me, and I have no more review comments. So v11 LGTM.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#49Greg Nancarrow
Greg Nancarrow
gregn4422@gmail.com
In reply to: Ajin Cherian (#47)
Re: logical replication empty transactions

On Fri, Jul 23, 2021 at 8:09 PM Ajin Cherian <itsajin@gmail.com> wrote:

fixed.

The v11 patch LGTM.

Regards,
Greg Nancarrow
Fujitsu Australia

#50osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: Ajin Cherian (#47)
RE: logical replication empty transactions

On Friday, July 23, 2021 7:10 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Fri, Jul 23, 2021 at 7:38 PM Peter Smith <smithpb2250@gmail.com> wrote:

I have reviewed the v10 patch.

The patch v11 looks good to me as well.
Thanks for addressing my past comments.

Best Regards,
Takamichi Osumi

#51Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#47)
7 attachment(s)
Re: logical replication empty transactions

Hi Ajin.

I have spent some time studying how your "empty transaction" (v11)
patch will affect network traffic and transaction throughput.

BLUF
====

For my test environment the general observations with the patch applied are:
- There is a potentially large reduction of network traffic (depends
on the number of empty transactions sent)
- Transaction throughput improved up to 7% (average ~2% across
mixtures) for Synchronous mode
- Transaction throughput improved up to 7% (average ~3% across
mixtures) for NOT Synchronous mode

So this patch LGTM.

TEST INFORMATION
================

Overview
-------------

1. There are 2 similar tables. One table is published; the other is not.

2. Equivalent simple SQL operations are performed on these tables. E.g.
- INSERT/UPDATE/DELETE using normal COMMIT
- INSERT/UPDATE/DELETE using 2PC COMMIT PREPARED

3. pgbench is used to measure the throughput for different mixes of
empty and not-empty transactions sent. E.g.
- 0% are empty
- 25% are empty
- 50% are empty
- 75% are empty
- 100% are empty

4. The apply_dispatch code has been temporarily modified to log the
number of protocol messages/bytes being processed.
- At the conclusion of the test run the logs are processed to extract
the numbers.

5. Each test run is 15 minutes elapsed time.

6. The tests are repeated without/with your patch applied
- So, there are 2 (without/with patch) x 5 (different mixes) = 10 test results
- Transaction throughput results are from pgbench
- Protocol message bytes are extracted from the logs (from modified
apply_dispatch)

7. Also, the entire set of 10 test cases was repeated with the
synchronous_standby_names setting enabled/disabled.
- Enabled, so the results are for total round-trip processing of the pub/sub.
- Disabled, so there is no waiting at the publisher side.

Configuration
-------------------

My environment is a single test machine with 2 PG instances (for pub and sub).

Using default configs except:

PUB-node
- wal_level = logical
- max_wal_senders = 10
- logical_decoding_work_mem = 64kB
- checkpoint_timeout = 30min
- min_wal_size = 10GB
- max_wal_size = 20GB
- shared_buffers = 2GB
- synchronous_standby_names = 'sync_sub' (for synchronous testing only)

SUB-node
- max_worker_processes = 11
- max_logical_replication_workers = 10
- checkpoint_timeout = 30min
- min_wal_size = 10GB
- max_wal_size = 20GB
- shared_buffers = 2GB

SQL files
-------------

Contents of test_empty_not_published.sql:

-- Operations for table not published
BEGIN;
INSERT INTO test_tab_nopub VALUES(1, 'foo');
UPDATE test_tab_nopub SET b = 'bar' WHERE a = 1;
DELETE FROM test_tab_nopub WHERE a = 1;
COMMIT;

-- 2PC operations for table not published
BEGIN;
INSERT INTO test_tab_nopub VALUES(2, 'fizz');
UPDATE test_tab_nopub SET b = 'bang' WHERE a = 2;
DELETE FROM test_tab_nopub WHERE a = 2;
PREPARE TRANSACTION 'gid_nopub';
COMMIT PREPARED 'gid_nopub';

~~

Contents of test_empty_published.sql:

(same as above but the table is called test_tab)

SQL Tables
----------------

(tables are the same apart from the name)

CREATE TABLE test_tab (a int primary key, b text, c timestamptz
DEFAULT now(), d bigint DEFAULT 999);

CREATE TABLE test_tab_nopub (a int primary key, b text, c timestamptz
DEFAULT now(), d bigint DEFAULT 999);

Example pgbench command
-----------------------

(this example is showing a test for a 25% mix of empty transactions)

pgbench -s 100 -T 900 -c 1 -f test_empty_not_published.sql@5 -f
test_empty_published.sql@15 test_pub
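
The 25% figure follows from the script weights: pgbench picks each script with probability equal to its weight divided by the sum of all weights, so @5 against @15 gives 5 / (5 + 15). A minimal C sketch of that arithmetic (the function name is invented for illustration):

```c
/*
 * Percentage of script executions expected to run the "empty" (not
 * published) script, given the two pgbench @weights.
 */
static double
empty_txn_percent(int empty_weight, int published_weight)
{
    int total = empty_weight + published_weight;

    if (total <= 0)
        return 0.0;             /* no scripts selected at all */

    return 100.0 * empty_weight / total;
}
```

For example, empty_txn_percent(5, 15) evaluates to 25.0, matching the mix shown in the command above. Because both script files contain the same number of transactions, the same ratio applies per transaction.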

RESULTS / OBSERVATIONS
======================

Synchronous Mode
----------------

- As the percentage mix of empty transactions increases, so does the
transaction throughput. I assume this is because we are using
synchronous mode; so when there is less waiting time, then there is
more time available for transaction processing

- The performance was generally similar before/after the patch, but
there was an observed throughput improvement of ~2% (averaged across
all mixes)

- The number of protocol bytes is associated with the number of
transactions that are processed during the test time of 15 minutes.
This adds up to a significant number of bytes even when the
transactions are empty.

- For the unpatched code, as the transaction rate increases, so
does the number of traffic bytes.

- The patch improves this significantly by eliminating all the empty
transaction traffic.

- Before the patch, even "empty transactions" are processing some
bytes, so it can never reach zero. After the patch, empty transaction
traffic is eliminated entirely.

NOT Synchronous Mode
--------------------

- Since there is no synchronous waiting for round trips, the
transaction throughput is generally consistent regardless of the empty
transaction mix.

- There is a hint of a small overall improvement in throughput as the
empty transaction mix approaches near 100%. For my test environment
both the pub/sub nodes are using the same machine/CPU, so my guess is
that when there is less CPU spent processing messages in the Apply
Worker then there is more CPU available to pump transactions at the
publisher side.

- With the patch, transaction throughput seems ~3% better than
without it. This might also be attributable to the same reason
mentioned above - less CPU spent processing empty messages at the
subscriber side leaves more CPU available to pump transactions from
the publisher side.

- The number of protocol bytes is associated with the number of
transactions that are processed during the test time of 15 minutes.

- Because the transaction throughput is consistent, the traffic of
protocol bytes here is determined mainly by the proportion of "empty
transactions" in the mixture.

- Before the patch, even “empty transactions” are processing some
bytes, so it can never reach zero. After the patch, the empty
transaction traffic is eliminated entirely.

ATTACHMENTS
===========

PSA

A1. A PDF version of my test report (also includes raw result data)
A2. Sync: Graph of Transaction throughput
A3. Sync: Graph of Protocol bytes (total)
A4. Sync: Graph of Protocol bytes (per transaction)
A5. Not-Sync: Graph of Transaction throughput
A6. Not-Sync: Graph of Protocol bytes (total)
A7. Not-Sync: Graph of Protocol bytes (per transaction)

------
Kind Regards,
Peter Smith.
Fujitsu Australia.

Attachments:

PS-empty-tx-testing-15min.pdfapplication/pdf; name=PS-empty-tx-testing-15min.pdf
Sync-bytes-per-tx.PNGimage/png; name=Sync-bytes-per-tx.PNG
Sync-bytes-total.PNGimage/png; name=Sync-bytes-total.PNG
Sync-tx-throughput.PNGimage/png; name=Sync-tx-throughput.PNG
NotSync-tx-throughput.PNGimage/png; name=NotSync-tx-throughput.PNG
NotSync-bytes-total.PNGimage/png; name=NotSync-bytes-total.PNG
NotSync-bytes-per-tx.PNGimage/png; name=NotSync-bytes-per-tx.PNG
#52Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: Ajin Cherian (#47)
Re: logical replication empty transactions

On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:

Let's first split the patch for prepared and non-prepared cases as
that will help to focus on each of them separately. BTW, why haven't
you considered implementing point 1b as explained by Andres in his
email [1]? I think we can send a keepalive message in case of
synchronous replication when we skip an empty transaction, otherwise,
it might delay responding to transactions in synchronous_commit mode.
I think in the tests done in the thread, it might not have been shown
because we are already sending keepalives too frequently. But what if
someone disables wal_sender_timeout or sets it to a very large value?
See WalSndKeepaliveIfNecessary. The other thing you might want to look
at is if the reason for frequent keepalives is the same as described
in the email [2].

Few other miscellaneous comments:
1.
static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
 {
+ PGOutputTxnData    *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);
+ /*
+ * If the BEGIN PREPARE was not yet sent, then it means there were no
+ * relevant changes encountered, so we can skip the COMMIT PREPARED
+ * message too.
+ */
+ if (txndata)
+ {
+ bool skip = !txndata->sent_begin_txn;
+ pfree(txndata);
+ txn->output_plugin_private = NULL;

How is this supposed to work after the restart when prepared is sent
before the restart and we are just sending commit_prepared after
restart? Won't this lead to sending commit_prepared even when the
corresponding prepare is not sent? Can we think of a better way to
deal with this?

2.
@@ -222,8 +224,10 @@ logicalrep_write_commit_prepared(StringInfo out,
ReorderBufferTXN *txn,
pq_sendbyte(out, flags);

/* send fields */
+ pq_sendint64(out, prepare_end_lsn);
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, prepare_time);

Doesn't this mean a change of protocol, and how is it supposed to work
when, say, the publisher is 15 and the subscriber is 14, which I think
works without such a change?

[1]: /messages/by-id/20200309183018.tzkzwu635sd366ej@alap3.anarazel.de
[2]: /messages/by-id/CALtH27cip5uQNJb4uHjLXtx1R52ELqXVfcP9fhHr=AvFo1dtqw@mail.gmail.com

--
With Regards,
Amit Kapila.

#53Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Amit Kapila (#52)
2 attachment(s)
Re: logical replication empty transactions

On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:

Let's first split the patch for prepared and non-prepared cases as
that will help to focus on each of them separately.

As a first shot, I have split the patch into prepared and non-prepared cases.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v12-0002-Skip-empty-prepared-transactions-for-logical-rep.patchapplication/octet-stream; name=v12-0002-Skip-empty-prepared-transactions-for-logical-rep.patch
v12-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v12-0001-Skip-empty-transactions-for-logical-replication.patch
#54Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#53)
Re: logical replication empty transactions

On Sat, Aug 7, 2021 at 12:01 AM Ajin Cherian <itsajin@gmail.com> wrote:

On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:

Let's first split the patch for prepared and non-prepared cases as
that will help to focus on each of them separately.

As a first shot, I have split the patch into prepared and non-prepared cases.

I have reviewed the v12* split patch set.

Apply / build / test was all OK

Below are my code review comments (mostly cosmetic).

//////////

Comments for v12-0001
=====================

1. Patch comment

=>

This comment as-is might have been OK before the 2PC code was
committed, but now that the 2PC is part of the HEAD perhaps this
comment needs to be expanded just to say this patch is ONLY for fixing
empty transactions for the cases of non-"streaming" and
non-"two_phase", and the other kinds will be tackled separately.

------

2. src/backend/replication/pgoutput/pgoutput.c - PGOutputTxnData comment

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN or BEGIN PREPARE. BEGIN or BEGIN PREPARE
+ * is only sent when the first change in a transaction is processed.
+ * This makes it possible to skip transactions that are empty.
+ */

=>

Maybe this is true for the combined v12-0001/v12-0002 case but just
for the v12-0001 patch I think it is not right to imply that some
skipping of the BEGIN PREPARE is possible, because IIUC it isn't
implemented in *this* patch.
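
For readers following along, the deferred-BEGIN idea that the quoted comment describes can be sketched roughly as below. This is only a standalone illustration, not the patch code: TxnState, Stream, emit() and the txn_* functions are hypothetical stand-ins for PGOutputTxnData and the pgoutput callbacks, and each protocol message is reduced to a single character.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the per-transaction state the patch keeps. */
typedef struct TxnState
{
    bool sent_begin;            /* has BEGIN been emitted for this txn? */
} TxnState;

/* Hypothetical output stream: one character per protocol message. */
typedef struct Stream
{
    char   msgs[64];
    size_t len;
} Stream;

/* Append one message character, keeping the buffer NUL-terminated. */
static void
emit(Stream *s, char msg)
{
    if (s->len + 1 < sizeof(s->msgs))
    {
        s->msgs[s->len++] = msg;
        s->msgs[s->len] = '\0';
    }
}

/* BEGIN callback: remember the transaction, but send nothing yet. */
static void
txn_begin(TxnState *txn)
{
    txn->sent_begin = false;
}

/* Change callback: emit the deferred BEGIN before the first published change. */
static void
txn_change(Stream *s, TxnState *txn, bool published)
{
    if (!published)
        return;                 /* table not in the publication: nothing to send */
    if (!txn->sent_begin)
    {
        emit(s, 'B');
        txn->sent_begin = true;
    }
    emit(s, 'I');
}

/* COMMIT callback: skip COMMIT when BEGIN was never sent. Returns true if skipped. */
static bool
txn_commit(Stream *s, TxnState *txn)
{
    if (!txn->sent_begin)
        return true;
    emit(s, 'C');
    return false;
}
```

With a zero-initialized Stream, a transaction that only touches unpublished tables leaves the stream empty, while one with two published changes produces B, I, I, C.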

------

3. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn whitespace

+ PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));

=>

Misaligned indentation?

------

4. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change brackets

+ /*
+ * Output BEGIN if we haven't yet, unless streaming.
+ */
+ if (!in_streaming && !txndata->sent_begin_txn)
+ {
+ pgoutput_begin(ctx, txn);
+ }

=>

The brackets are not needed for the if with a single statement.

------

5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate
brackets/comment

+ /*
+ * output BEGIN if we haven't yet,
+ * while streaming no need to send BEGIN / BEGIN PREPARE.
+ */
+ if (!in_streaming && !txndata->sent_begin_txn)
+ {
+ pgoutput_begin(ctx, txn);
+ }

5a. =>

Same as review comment 4. The brackets are not needed for the if with
a single statement.

5b. =>

Notice this code is the same as cited in review comment 4. So probably
the code comment should be consistent/same also?

------

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message brackets

+ Assert(txndata);
+ if (!txndata->sent_begin_txn)
+ {
+ pgoutput_begin(ctx, txn);
+ }

=>

The brackets are not needed for the if with a single statement.

------

7. typedefs.list

=> The structure PGOutputTxnData was added in v12-0001, so the
typedefs.list probably should also be updated.

//////////

Comments for v12-0002
=====================

8. Patch comment

This patch addresses the above problem by postponing the BEGIN / BEGIN
PREPARE messages until the first change is encountered.
If (when processing a COMMIT / PREPARE message) we find there had been
no other change for that transaction, then do not send the COMMIT /
PREPARE message. This means that pgoutput will skip BEGIN / COMMIT
or BEGIN PREPARE / PREPARE messages for transactions that are empty.
pgoutput will also skip COMMIT PREPARED and ROLLBACK PREPARED messages
for transactions that are empty.

8a. =>

I’m not sure this comment is 100% correct for this specific patch. The
whole BEGIN/COMMIT was already handled by the v12-0001 patch, right?
So really this comment should only mention BEGIN PREPARE
and COMMIT PREPARED, I thought.

8b. =>

I think there should also be some mention that this patch is not
handling the "streaming" case of empty tx at all.

------

9. src/backend/replication/logical/proto.c - protocol version

@@ -248,8 +250,10 @@ logicalrep_write_commit_prepared(StringInfo out,
ReorderBufferTXN *txn,
pq_sendbyte(out, flags);

/* send fields */
+ pq_sendint64(out, prepare_end_lsn);
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, prepare_time);
pq_sendint64(out, txn->xact_time.commit_time);
pq_sendint32(out, txn->xid);

=>

I agree with a previous feedback comment from Amit - Probably there is
some protocol version requirement/implications here because the
message format has been changed in logicalrep_write_commit_prepared
and logicalrep_read_commit_prepared.

e.g. Does this code need to be cognisant of the version and behave
differently accordingly?
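
To make the question concrete, a version-aware writer could look something like the sketch below. Everything here is assumed for illustration: Buf, put_int64(), CommitPrepared and the threshold HYPOTHETICAL_EXTENDED_PROTO_VERSION are invented names, the value 100 is not a real protocol version, and the real logicalrep_write_commit_prepared takes different arguments and writes more fields.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Invented threshold; not a real logical replication protocol version. */
#define HYPOTHETICAL_EXTENDED_PROTO_VERSION 100

/* Hypothetical output buffer, large enough for four 8-byte fields. */
typedef struct Buf
{
    uint8_t data[64];
    size_t  len;
} Buf;

/* Append a 64-bit value in network (big-endian) byte order. */
static void
put_int64(Buf *b, uint64_t v)
{
    for (int shift = 56; shift >= 0; shift -= 8)
        b->data[b->len++] = (uint8_t) (v >> shift);
}

/* Hypothetical subset of the COMMIT PREPARED fields discussed above. */
typedef struct CommitPrepared
{
    uint64_t prepare_end_lsn;
    uint64_t commit_lsn;
    uint64_t end_lsn;
    uint64_t prepare_time;
} CommitPrepared;

/*
 * Write the message body, adding the new fields only when the negotiated
 * version says the peer can parse them. Returns the number of bytes written.
 */
static size_t
write_commit_prepared(Buf *b, const CommitPrepared *m, int proto_version)
{
    bool extended = proto_version >= HYPOTHETICAL_EXTENDED_PROTO_VERSION;

    b->len = 0;
    if (extended)
        put_int64(b, m->prepare_end_lsn);
    put_int64(b, m->commit_lsn);
    put_int64(b, m->end_lsn);
    if (extended)
        put_int64(b, m->prepare_time);
    return b->len;
}
```

An older peer would then still receive the original two-field layout, and the reader side would need the same version check to stay in step.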

------

10. src/backend/replication/pgoutput/pgoutput.c -
pgoutput_begin_prepare flag moved?

+ Assert(txndata);
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin_prepare(ctx->out, txn);
+ txndata->sent_begin_txn = true;

send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
send_replication_origin);

OutputPluginWrite(ctx, true);
- txndata->sent_begin_txn = true;
- txn->output_plugin_private = txndata;
}

=>

In the v12-0001 patch, you set the begin_txn flags AFTER the
OutputPluginWrite, but in the v12-0002 you set them BEFORE the
OutputPluginWrite. Why the difference? Maybe it should be consistent?

------

11. src/test/subscription/t/021_twophase.pl - proto_version tests needed?

Does this need some other tests to make sure the older proto_version
is still usable? Refer also to the review comment 9.

------

12. src/tools/pgindent/typedefs.list - PGOutputTxnData

PGOutputData
+PGOutputTxnData
PGPROC

=>

This change looks good, but I think it should have been done in
v12-0001 and not here in v12-0002.

------

Kind Regards,
Peter Smith.
Fujitsu Australia

#55Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Amit Kapila (#52)
1 attachment(s)
Re: logical replication empty transactions

On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:

Let's first split the patch for prepared and non-prepared cases as
that will help to focus on each of them separately. BTW, why haven't
you considered implementing point 1b as explained by Andres in his
email [1]? I think we can send a keepalive message in case of
synchronous replication when we skip an empty transaction; otherwise,
it might delay responding to transactions in synchronous_commit mode.
I think in the tests done in the thread, it might not have been shown
because we are already sending keepalives too frequently. But what if
someone disables wal_sender_timeout or sets it to a very large value?
See WalSndKeepaliveIfNecessary. The other thing you might want to look
at is if the reason for frequent keepalives is the same as described
in the email [2].

I have tried to address the comment here by modifying the
ctx->update_progress callback function (WalSndUpdateProgress) provided
for plugins. I have added an option
by which the callback can specify if it wants to send keep_alives. And
when the callback is called with that option set, walsender updates a
flag force_keep_alive_syncrep.
The Walsender in the WalSndWaitForWal for loop, checks this flag and
if synchronous replication is enabled, then sends a keep alive.
Currently this logic
is added as an else to the current logic that is already there in
WalSndWaitForWal, which is probably considered unnecessary and a
source of the keep alive flood
that you talked about. So, I can change that according to how that fix
shapes up there. I have also added an extern function in syncrep.c
that makes it possible
for walsender to query if synchronous replication is turned on.

The reason I had to turn on a flag and rely on the WalSndWaitForWal to
send the keep alive in its next iteration is because I tried doing
this directly when a
commit is skipped but it didn't work. The reason for this is that when
the commit is being decoded the sentptr at the moment is at the commit
LSN and the keep alive
will be sent for the commit LSN but the syncrep wait is waiting for
end_lsn of the transaction which is the next LSN. So, sending a keep
alive at the moment the
commit is decoded doesn't seem to solve the problem of the waiting
synchronous reply.
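
The timing problem described above can be modelled with a small standalone sketch. The names here (Sender, skip_empty_commit, wait_loop_iteration and so on) are hypothetical and only loosely mirror the walsender; in particular, clearing the flag after sending is an assumption of this sketch, not something stated in the thread.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned long long Lsn;

/* Hypothetical, much reduced view of the walsender state. */
typedef struct Sender
{
    Lsn  sent_ptr;              /* position a keepalive would report */
    bool force_keepalive;       /* set when an empty txn is skipped */
    bool syncrep_enabled;       /* is synchronous replication in use? */
    Lsn  last_keepalive_lsn;    /* 0 means no keepalive sent yet */
    int  keepalives_sent;
} Sender;

/* Called while decoding the commit: only record that a keepalive is needed. */
static void
skip_empty_commit(Sender *ws)
{
    ws->force_keepalive = ws->syncrep_enabled;
}

/* Decoding of the commit record finishes; the sent position moves to end_lsn. */
static void
advance_sent_ptr(Sender *ws, Lsn end_lsn)
{
    if (end_lsn > ws->sent_ptr)
        ws->sent_ptr = end_lsn;
}

/* One pass of the wait loop: send the deferred keepalive, if requested. */
static void
wait_loop_iteration(Sender *ws)
{
    if (ws->force_keepalive)
    {
        ws->last_keepalive_lsn = ws->sent_ptr;
        ws->keepalives_sent++;
        ws->force_keepalive = false;    /* assumption: flag is one-shot */
    }
}

/* A synchronous waiter is released once a reported position reaches its LSN. */
static bool
sync_waiter_released(const Sender *ws, Lsn wait_lsn)
{
    return ws->last_keepalive_lsn >= wait_lsn;
}
```

If the commit record starts at 100 and ends at 108, a keepalive sent inside skip_empty_commit would report 100 and leave the waiter at 108 blocked; deferring it to the next loop pass reports 108.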

Few other miscellaneous comments:
1.
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
{
+ PGOutputTxnData    *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
OutputPluginUpdateProgress(ctx);
+ /*
+ * If the BEGIN PREPARE was not yet sent, then it means there were no
+ * relevant changes encountered, so we can skip the COMMIT PREPARED
+ * message too.
+ */
+ if (txndata)
+ {
+ bool skip = !txndata->sent_begin_txn;
+ pfree(txndata);
+ txn->output_plugin_private = NULL;

How is this supposed to work after the restart when prepared is sent
before the restart and we are just sending commit_prepared after
restart? Won't this lead to sending commit_prepared even when the
corresponding prepare is not sent? Can we think of a better way to
deal with this?

I have tried to resolve this by adding logic in worker.c to silently
ignore spurious commit_prepareds. But this change required checking if
the prepare exists on the
subscriber before attempting the commit_prepared but the current API
that checks this requires prepare time and transaction end_lsn. But
for this I had to
change the protocol of commit_prepared, and I understand that this
would break backward compatibility between subscriber and publisher
(you have raised this issue as well).
I am not sure how else to handle this, let me know if you have any
other ideas. One option could be to have another API to check if the
prepare exists on the subscriber with
the prepared 'gid' alone, without checking prepare_time or end_lsn.
Let me know if this idea works.

I have left out the patch 0002 for prepared transactions until we
arrive at a decision on how to address the above issue.

Peter,
I have also addressed the comments you've raised on patch 0001, please
have a look and confirm.

Regards,
Ajin Cherian
Fujitsu Australia.

Attachments:

v13-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v13-0001-Skip-empty-transactions-for-logical-replication.patch
#56Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#55)
Re: logical replication empty transactions

On Fri, Aug 13, 2021 at 9:01 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:

Let's first split the patch for prepared and non-prepared cases as
that will help to focus on each of them separately. BTW, why haven't
you considered implementing point 1b as explained by Andres in his
email [1]? I think we can send a keepalive message in case of
synchronous replication when we skip an empty transaction; otherwise,
it might delay responding to transactions in synchronous_commit mode.
I think in the tests done in the thread, it might not have been shown
because we are already sending keepalives too frequently. But what if
someone disables wal_sender_timeout or sets it to a very large value?
See WalSndKeepaliveIfNecessary. The other thing you might want to look
at is if the reason for frequent keepalives is the same as described
in the email [2].

I have tried to address the comment here by modifying the
ctx->update_progress callback function (WalSndUpdateProgress) provided
for plugins. I have added an option
by which the callback can specify if it wants to send keep_alives. And
when the callback is called with that option set, walsender updates a
flag force_keep_alive_syncrep.
The Walsender in the WalSndWaitForWal for loop, checks this flag and
if synchronous replication is enabled, then sends a keep alive.
Currently this logic
is added as an else to the current logic that is already there in
WalSndWaitForWal, which is probably considered unnecessary and a
source of the keep alive flood
that you talked about. So, I can change that according to how that fix
shapes up there. I have also added an extern function in syncrep.c
that makes it possible
for walsender to query if synchronous replication is turned on.

The reason I had to turn on a flag and rely on the WalSndWaitForWal to
send the keep alive in its next iteration is because I tried doing
this directly when a
commit is skipped but it didn't work. The reason for this is that when
the commit is being decoded the sentptr at the moment is at the commit
LSN and the keep alive
will be sent for the commit LSN but the syncrep wait is waiting for
end_lsn of the transaction which is the next LSN. So, sending a keep
alive at the moment the
commit is decoded doesn't seem to solve the problem of the waiting
synchronous reply.

Few other miscellaneous comments:
1.
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
{
+ PGOutputTxnData    *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
OutputPluginUpdateProgress(ctx);
+ /*
+ * If the BEGIN PREPARE was not yet sent, then it means there were no
+ * relevant changes encountered, so we can skip the COMMIT PREPARED
+ * message too.
+ */
+ if (txndata)
+ {
+ bool skip = !txndata->sent_begin_txn;
+ pfree(txndata);
+ txn->output_plugin_private = NULL;

How is this supposed to work after the restart when prepared is sent
before the restart and we are just sending commit_prepared after
restart? Won't this lead to sending commit_prepared even when the
corresponding prepare is not sent? Can we think of a better way to
deal with this?

I have tried to resolve this by adding logic in worker.c to silently
ignore spurious commit_prepareds. But this change required checking if
the prepare exists on the
subscriber before attempting the commit_prepared but the current API
that checks this requires prepare time and transaction end_lsn. But
for this I had to
change the protocol of commit_prepared, and I understand that this
would break backward compatibility between subscriber and publisher
(you have raised this issue as well).
I am not sure how else to handle this, let me know if you have any
other ideas. One option could be to have another API to check if the
prepare exists on the subscriber with
the prepared 'gid' alone, without checking prepare_time or end_lsn.
Let me know if this idea works.

I have left out the patch 0002 for prepared transactions until we
arrive at a decision on how to address the above issue.

Peter,
I have also addressed the comments you've raised on patch 0001, please
have a look and confirm.

I have reviewed the v13-0001 patch.

Apply / build / test was all OK

Below are my code review comments.

//////////

Comments for v13-0001
=====================

1. Patch comment

=>

Probably this comment should include some description for the new
"keepalive" logic as well.

------

2. src/backend/replication/syncrep.c - new function

@@ -330,6 +330,18 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
}

 /*
+ * Check if Sync Rep is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+ if (SyncRepRequested() && ((volatile WalSndCtlData *)
WalSndCtl)->sync_standbys_defined)
+ return true;
+ else
+ return false;
+}
+

2a. Function comment =>

Why abbreviations in the comment? Why not say "synchronous
replication" instead of "Sync Rep".

~~

2b. if/else =>

Remove the if/else. e.g.

return SyncRepRequested() && ((volatile WalSndCtlData *)
WalSndCtl)->sync_standbys_defined;

~~

2c. Call the new function =>

There is some existing similar code in SyncRepWaitForLSN(), e.g.

if (!SyncRepRequested() ||
!((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
return;

Now that you have a new function you maybe can call it from here, e.g.

if (!SyncRepEnabled())
return;

------

3. src/backend/replication/walsender.c - whitespace

+ if (send_keep_alive)
+ force_keep_alive_syncrep = true;
+
+

=>

Extra blank line?

------

4. src/backend/replication/walsender.c - call keepalive

  if (MyWalSnd->flush < sentPtr &&
  MyWalSnd->write < sentPtr &&
  !waiting_for_ping_response)
+ {
  WalSndKeepalive(false);
+ }
+ else
+ {
+ if (force_keep_alive_syncrep && SyncRepEnabled())
+ WalSndKeepalive(false);
+ }

4a. Move the SyncRepEnabled() call =>

I think it is not necessary to call SyncRepEnabled() here. Instead,
it might be better if this is called back when you assign the
force_keep_alive_syncrep flag. So change the WalSndUpdateProgress,
e.g.

BEFORE
if (send_keep_alive)
force_keep_alive_syncrep = true;
AFTER
force_keep_alive_syncrep = send_keep_alive && SyncRepEnabled();

Note: That assignment also deserves a big comment to say what it is doing.

~~

4b. Change the if/else =>

If you make the change for 4a. then perhaps the keepalive if/else is
overkill and could be changed, e.g.

if (force_keep_alive_syncrep ||
MyWalSnd->flush < sentPtr &&
MyWalSnd->write < sentPtr &&
!waiting_for_ping_response)
WalSndKeepalive(false);
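
A side note on the condition above: in C, && binds more tightly than ||, so the unparenthesized form already groups as "force OR (flush AND write AND NOT waiting)". Explicit parentheses only make that grouping visible and avoid compiler warnings. The sketch below checks the equivalence exhaustively; the function and parameter names are hypothetical and stand in for the walsender variables.

```c
#include <assert.h>
#include <stdbool.h>

/* Condition as written in the suggestion; some compilers warn about this form. */
static bool
unparenthesized(bool force, bool flush_behind, bool write_behind, bool waiting)
{
    return force || flush_behind && write_behind && !waiting;
}

/* Same condition with the grouping spelled out. */
static bool
parenthesized(bool force, bool flush_behind, bool write_behind, bool waiting)
{
    return force || (flush_behind && write_behind && !waiting);
}

/* Compare both forms over all 16 input combinations. */
static int
count_mismatches(void)
{
    int mismatches = 0;

    for (int bits = 0; bits < 16; bits++)
    {
        bool force = (bits & 1) != 0;
        bool flush_behind = (bits & 2) != 0;
        bool write_behind = (bits & 4) != 0;
        bool waiting = (bits & 8) != 0;

        if (unparenthesized(force, flush_behind, write_behind, waiting) !=
            parenthesized(force, flush_behind, write_behind, waiting))
            mismatches++;
    }
    return mismatches;
}
```

Note that with the force flag set, a keepalive is requested even while a ping response is still outstanding, which is a behavior change compared with the original condition.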

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#57Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Peter Smith (#56)
1 attachment(s)
Re: logical replication empty transactions

On Mon, Aug 16, 2021 at 4:44 PM Peter Smith <smithpb2250@gmail.com> wrote:

I have reviewed the v13-0001 patch.

Apply / build / test was all OK

Below are my code review comments.

//////////

Comments for v13-0001
=====================

1. Patch comment

=>

Probably this comment should include some description for the new
"keepalive" logic as well.

Added.

------

2. src/backend/replication/syncrep.c - new function

@@ -330,6 +330,18 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
}

/*
+ * Check if Sync Rep is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+ if (SyncRepRequested() && ((volatile WalSndCtlData *)
WalSndCtl)->sync_standbys_defined)
+ return true;
+ else
+ return false;
+}
+

2a. Function comment =>

Why abbreviations in the comment? Why not say "synchronous
replication" instead of "Sync Rep".

Changed.

~~

2b. if/else =>

Remove the if/else. e.g.

return SyncRepRequested() && ((volatile WalSndCtlData *)
WalSndCtl)->sync_standbys_defined;

~~

Changed.

2c. Call the new function =>

There is some existing similar code in SyncRepWaitForLSN(), e.g.

if (!SyncRepRequested() ||
!((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
return;

Now that you have a new function you maybe can call it from here, e.g.

if (!SyncRepEnabled())
return;

Updated.

------

3. src/backend/replication/walsender.c - whitespace

+ if (send_keep_alive)
+ force_keep_alive_syncrep = true;
+
+

=>

Extra blank line?

Removed.

------

4. src/backend/replication/walsender.c - call keepalive

if (MyWalSnd->flush < sentPtr &&
MyWalSnd->write < sentPtr &&
!waiting_for_ping_response)
+ {
WalSndKeepalive(false);
+ }
+ else
+ {
+ if (force_keep_alive_syncrep && SyncRepEnabled())
+ WalSndKeepalive(false);
+ }

4a. Move the SyncRepEnabled() call =>

I think it is not necessary to call SyncRepEnabled() here. Instead,
it might be better if this is called back when you assign the
force_keep_alive_syncrep flag. So change the WalSndUpdateProgress,
e.g.

BEFORE
if (send_keep_alive)
force_keep_alive_syncrep = true;
AFTER
force_keep_alive_syncrep = send_keep_alive && SyncRepEnabled();

Note: That assignment also deserves a big comment to say what it is doing.

~~

changed.

4b. Change the if/else =>

If you make the change for 4a. then perhaps the keepalive if/else is
overkill and could be changed, e.g.

if (force_keep_alive_syncrep ||
MyWalSnd->flush < sentPtr &&
MyWalSnd->write < sentPtr &&
!waiting_for_ping_response)
WalSndKeepalive(false);

Changed.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v14-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v14-0001-Skip-empty-transactions-for-logical-replication.patch
#58Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#57)
Re: logical replication empty transactions

I reviewed the v14-0001 patch.

All my previous comments have been addressed.

Apply / build / test was all OK.

------

More review comments:

1. Parameter names in the function declarations should match the rest of the code.

1a. src/include/replication/logical.h

@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite
LogicalOutputPluginWriterPrepareWrite;

 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct
LogicalDecodingContext *lr,
  XLogRecPtr Ptr,
- TransactionId xid
+ TransactionId xid,
+ bool send_keep_alive

=>
Change "send_keep_alive" --> "send_keepalive"

~~

1b. src/include/replication/output_plugin.h

@@ -243,6 +243,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext
*ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx,
bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext
*ctx, bool send_keep_alive);

=>
Change "send_keep_alive" --> "send_keepalive"

------

2. Comment should be capitalized - src/backend/replication/walsender.c

@@ -170,6 +170,9 @@ static TimestampTz last_reply_timestamp = 0;
/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool waiting_for_ping_response = false;

+/* force keep alive when skipping transactions in synchronous
replication mode */
+static bool force_keepalive_syncrep = false;

=>
"force" --> "Force"

------

Otherwise, v14-0001 LGTM.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#59Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Peter Smith (#58)
1 attachment(s)
Re: logical replication empty transactions

On Wed, Aug 25, 2021 at 5:15 PM Peter Smith <smithpb2250@gmail.com> wrote:

I reviewed the v14-0001 patch.

All my previous comments have been addressed.

Apply / build / test was all OK.

------

More review comments:

1. Parameter names in the function declarations should match the rest of the code.

1a. src/include/replication/logical.h

@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite
LogicalOutputPluginWriterPrepareWrite;

typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct
LogicalDecodingContext *lr,
XLogRecPtr Ptr,
- TransactionId xid
+ TransactionId xid,
+ bool send_keep_alive

=>
Change "send_keep_alive" --> "send_keepalive"

~~

1b. src/include/replication/output_plugin.h

@@ -243,6 +243,6 @@ typedef struct OutputPluginCallbacks
/* Functions in replication/logical/logical.c */
extern void OutputPluginPrepareWrite(struct LogicalDecodingContext
*ctx, bool last_write);
extern void OutputPluginWrite(struct LogicalDecodingContext *ctx,
bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext
*ctx, bool send_keep_alive);

=>
Change "send_keep_alive" --> "send_keepalive"

------

2. Comment should be capitalized - src/backend/replication/walsender.c

@@ -170,6 +170,9 @@ static TimestampTz last_reply_timestamp = 0;
/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool waiting_for_ping_response = false;

+/* force keep alive when skipping transactions in synchronous
replication mode */
+static bool force_keepalive_syncrep = false;

=>
"force" --> "Force"

------

Otherwise, v14-0001 LGTM.

Thanks for the comments. Addressed them in the attached patch.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v15-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v15-0001-Skip-empty-transactions-for-logical-replication.patch
#60Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Ajin Cherian (#59)
1 attachment(s)
Re: logical replication empty transactions

On Wed, Sep 1, 2021 at 8:57 PM Ajin Cherian <itsajin@gmail.com> wrote:

Thanks for the comments. Addressed them in the attached patch.

regards,
Ajin Cherian
Fujitsu Australia

Minor update to rebase the patch so that it applies clean on HEAD.

regards,
Ajin Cherian

Attachments:

v16-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v16-0001-Skip-empty-transactions-for-logical-replication.patch
#61osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: Ajin Cherian (#60)
RE: logical replication empty transactions

On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com> wrote:

Minor update to rebase the patch so that it applies clean on HEAD.

Hi, thanks for your rebase.

Several comments.

(1) the commit message

"
transactions, keepalive messages are sent to keep the LSN locations updated
on the standby.
This patch does not skip empty transactions that are "streaming" or "two-phase".
"

I suggest that one blank line might be needed before the last paragraph.

(2) Could you please remove the pair of curly brackets around the single statement below?

@@ -1546,10 +1557,13 @@ WalSndWaitForWal(XLogRecPtr loc)
                 * otherwise idle, this keepalive will trigger a reply. Processing the
                 * reply will update these MyWalSnd locations.
                 */
-               if (MyWalSnd->flush < sentPtr &&
+               if (force_keepalive_syncrep ||
+                       (MyWalSnd->flush < sentPtr &&
                        MyWalSnd->write < sentPtr &&
-                       !waiting_for_ping_response)
+                       !waiting_for_ping_response))
+               {
                        WalSndKeepalive(false);
+               }

(3) Is it this patch's responsibility to initialize the data in pgoutput_begin_prepare_txn?

@@ -433,6 +487,8 @@ static void
 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
        bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
+       PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+                                                                                                                sizeof(PGOutputTxnData));

OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin_prepare(ctx->out, txn);

Even if we need this initialization for either the non-streaming case
or the non-two_phase case, there can be another issue.
We don't free the allocated memory for this data, right?
There's only one place that frees it in the entire patch,
which is in pgoutput_commit_txn(). So, a
corresponding free of the memory looks necessary
in the two-phase commit functions.
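
The pairing I have in mind can be sketched as below. This is a standalone illustration with hypothetical names (Txn, TxnData, on_begin_prepare, on_prepare, ...) and plain calloc/free in place of MemoryContextAllocZero/pfree; the live_allocations counter exists only to make a leak visible.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins for PGOutputTxnData and ReorderBufferTXN. */
typedef struct TxnData
{
    bool sent_begin;
} TxnData;

typedef struct Txn
{
    void *plugin_private;
} Txn;

/* Number of TxnData objects currently allocated; used to spot leaks. */
static int live_allocations = 0;

/* Allocate zeroed per-transaction data and attach it to the transaction. */
static void
txn_data_attach(Txn *txn)
{
    TxnData *d = calloc(1, sizeof(TxnData));

    if (d == NULL)
        abort();
    txn->plugin_private = d;
    live_allocations++;
}

/* Free the data if present; returns whether BEGIN had been sent. */
static bool
txn_data_release(Txn *txn)
{
    TxnData *d = txn->plugin_private;
    bool     sent = false;

    if (d != NULL)
    {
        sent = d->sent_begin;
        free(d);
        txn->plugin_private = NULL;
        live_allocations--;
    }
    return sent;
}

/* Every callback that starts a transaction allocates ... */
static void on_begin(Txn *txn)         { txn_data_attach(txn); }
static void on_begin_prepare(Txn *txn) { txn_data_attach(txn); }

/* ... and every callback that ends one releases, including the 2PC path. */
static bool on_commit(Txn *txn)        { return txn_data_release(txn); }
static bool on_prepare(Txn *txn)       { return txn_data_release(txn); }
```

Routing all terminal callbacks through one release helper keeps the allocation count balanced regardless of which path ends the transaction.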

(4) SyncRepEnabled's better alignment.

IIUC, SyncRepEnabled is called not only by the walsender but also by other backends
via CommitTransaction -> RecordTransactionCommit -> SyncRepWaitForLSN.
Then, the place to add the prototype function for SyncRepEnabled seems not appropriate,
strictly speaking or requires a comment like /* called by wal sender or other backends */.

@@ -90,6 +90,7 @@ extern void SyncRepCleanupAtProcExit(void);
/* called by wal sender */
extern void SyncRepInitConfig(void);
extern void SyncRepReleaseWaiters(void);
+extern bool SyncRepEnabled(void);

Even if we intend it is only used by the walsender, the current code place
of SyncRepEnabled in the syncrep.c might not be perfect.
In this file, seemingly we have a section for functions for wal sender processes
and the place where you wrote it is not here.

at src/backend/replication/syncrep.c, find a comment below.
/*
* ===========================================================
* Synchronous Replication functions for wal sender processes
* ===========================================================
*/

(5) minor alignment for expressing a couple of messages.

@@ -777,6 +846,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Oid *relids;
TransactionId xid = InvalidTransactionId;

+       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+       Assert(in_streaming || txndata);

In the commit message, the way you write is below.
...
skip BEGIN / COMMIT messages for transactions that are empty. The patch
...

In this case, we have spaces before and after the slash in "BEGIN / COMMIT".
So, I suggest unifying all of those for consistency.

Best Regards,
Takamichi Osumi

#62osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: Ajin Cherian (#60)
RE: logical replication empty transactions

On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com> wrote:

Minor update to rebase the patch so that it applies clean on HEAD.

Hi, let me share some additional comments on v16.

(1) comment of pgoutput_change

@@ -630,11 +688,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                Relation relation, ReorderBufferChange *change)
 {
        PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
        MemoryContext old;
        RelationSyncEntry *relentry;
        TransactionId xid = InvalidTransactionId;
        Relation        ancestor = NULL;
+       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+       Assert(in_streaming || txndata);
+

In my humble opinion, the comment should not touch BEGIN PREPARE,
because this patch's scope doesn't include two phase commit.
(We could add this in another patch to extend the scope after the commit ?)

This applies to pgoutput_truncate's comment.

(2) "keep alive" should be "keepalive" in WalSndUpdateProgress

        /*
+        * When skipping empty transactions in synchronous replication, we need
+        * to send a keep alive to keep the MyWalSnd locations updated.
+        */
+       force_keepalive_syncrep = send_keepalive && SyncRepEnabled();
+

Also, this applies to the comment for force_keepalive_syncrep.

(3) Should finish the second sentence with period in the comment of pgoutput_message.

@@ -845,6 +923,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+       /*
+        * Output BEGIN if we haven't yet.
+        * Avoid for streaming and non-transactional messages

(4) "begin" can be changed to "BEGIN" in the comment of PGOutputTxnData definition.

In the entire patch, when we express BEGIN message,
we use capital letters "BEGIN" except for one place.
We can apply the same to this place as well.

+typedef struct PGOutputTxnData
+{
+       bool sent_begin_txn;    /* flag indicating whether begin has been sent */
+} PGOutputTxnData;
+

(5) inconsistent way to write Assert statements with blank lines

In the below case, it'd be better to insert one blank line
after the Assert();

+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
        bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

+ Assert(txndata);
OutputPluginPrepareWrite(ctx, !send_replication_origin);

(6) new code in pgoutput_commit_txn looks slightly messy

@@ -419,7 +455,25 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                        XLogRecPtr commit_lsn)
 {
-       OutputPluginUpdateProgress(ctx);
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+       bool            skip;
+
+       Assert(txndata);
+
+       /*
+        * If a BEGIN message was not yet sent, then it means there were no relevant
+        * changes encountered, so we can skip the COMMIT message too.
+        */
+       skip = !txndata->sent_begin_txn;
+       pfree(txndata);
+       txn->output_plugin_private = NULL;
+       OutputPluginUpdateProgress(ctx, skip);

Could we conduct a refactoring for this new part ?
IMO, writing codes to free the data structure at the top
of function seems weird.

One idea is to export some part there
and write a new function, something like below.

static bool
txn_sent_begin(ReorderBufferTXN *txn)
{
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
bool needs_skip;

Assert(txndata);

needs_skip = !txndata->sent_begin_txn;

pfree(txndata);
txn->output_plugin_private = NULL;

return needs_skip;
}

FYI, I had a look at the v12-0002-Skip-empty-prepared-transactions-for-logical-rep.patch
for reference of pgoutput_rollback_prepared_txn and pgoutput_commit_prepared_txn.
It looks like this kind of function might work for future extensions as well.
What do you think?

Best Regards,
Takamichi Osumi

#63Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: osumi.takamichi@fujitsu.com (#61)
1 attachment(s)
Re: logical replication empty transactions

On Wed, Jan 26, 2022 at 8:33 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com> wrote:

Minor update to rebase the patch so that it applies clean on HEAD.

Hi, thanks for your rebase.

Several comments.

(1) the commit message

"
transactions, keepalive messages are sent to keep the LSN locations updated
on the standby.
This patch does not skip empty transactions that are "streaming" or "two-phase".
"

I suggest that one blank line might be needed before the last paragraph.

Changed.

(2) Could you please remove one pair of curly brackets for one sentence below ?

@@ -1546,10 +1557,13 @@ WalSndWaitForWal(XLogRecPtr loc)
* otherwise idle, this keepalive will trigger a reply. Processing the
* reply will update these MyWalSnd locations.
*/
-               if (MyWalSnd->flush < sentPtr &&
+               if (force_keepalive_syncrep ||
+                       (MyWalSnd->flush < sentPtr &&
MyWalSnd->write < sentPtr &&
-                       !waiting_for_ping_response)
+                       !waiting_for_ping_response))
+               {
WalSndKeepalive(false);
+               }

Changed.

(3) Is it this patch's responsibility to initialize the data in pgoutput_begin_prepare_txn ?

@@ -433,6 +487,8 @@ static void
pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
+       PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+                                                                                                                sizeof(PGOutputTxnData));

OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin_prepare(ctx->out, txn);

Even if we need this initialization for either non streaming case
or non two_phase case, there can be another issue.
We don't free the allocated memory for this data, right ?
There's only one place to use free in the entire patch,
which is in the pgoutput_commit_txn(). So,
corresponding free of memory looked necessary
in the two phase commit functions.

Actually it is required for begin_prepare to set the data type, so
that the checks in the pgoutput_change can make sure that
the begin prepare is sent. I've also added a free in commit_prepared code.

(4) SyncRepEnabled's better alignment.

IIUC, SyncRepEnabled is called not only by the walsender but also by other backends
via CommitTransaction -> RecordTransactionCommit -> SyncRepWaitForLSN.
Then, the place to add the prototype function for SyncRepEnabled seems not appropriate,
strictly speaking or requires a comment like /* called by wal sender or other backends */.

@@ -90,6 +90,7 @@ extern void SyncRepCleanupAtProcExit(void);
/* called by wal sender */
extern void SyncRepInitConfig(void);
extern void SyncRepReleaseWaiters(void);
+extern bool SyncRepEnabled(void);

Even if we intend it is only used by the walsender, the current code place
of SyncRepEnabled in the syncrep.c might not be perfect.
In this file, seemingly we have a section for functions for wal sender processes
and the place where you wrote it is not here.

at src/backend/replication/syncrep.c, find a comment below.
/*
* ===========================================================
* Synchronous Replication functions for wal sender processes
* ===========================================================
*/

Changed.

(5) minor alignment for expressing a couple of messages.

@@ -777,6 +846,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Oid *relids;
TransactionId xid = InvalidTransactionId;

+       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+       Assert(in_streaming || txndata);

In the commit message, the way you write is below.
...
skip BEGIN / COMMIT messages for transactions that are empty. The patch
...

In this case, we have spaces back and forth for "BEGIN / COMMIT".
Then, I suggest to unify all of those to show better alignment.

fixed.

regards,
Ajin Cherian

Attachments:

v17-0001-Skip-empty-transactions-for-logical-replication.patch (application/octet-stream)
#64Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: osumi.takamichi@fujitsu.com (#62)
Re: logical replication empty transactions

On Thu, Jan 27, 2022 at 12:16 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com> wrote:

Minor update to rebase the patch so that it applies clean on HEAD.

Hi, let me share some additional comments on v16.

(1) comment of pgoutput_change

@@ -630,11 +688,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId;
Relation        ancestor = NULL;
+       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+       Assert(in_streaming || txndata);
+

In my humble opinion, the comment should not touch BEGIN PREPARE,
because this patch's scope doesn't include two phase commit.
(We could add this in another patch to extend the scope after the commit ?)

We have to include BEGIN PREPARE as well, as the txndata has to be
setup. Only difference is that we will not skip empty transaction in
BEGIN PREPARE

This applies to pgoutput_truncate's comment.

(2) "keep alive" should be "keepalive" in WalSndUpdateProgress

/*
+        * When skipping empty transactions in synchronous replication, we need
+        * to send a keep alive to keep the MyWalSnd locations updated.
+        */
+       force_keepalive_syncrep = send_keepalive && SyncRepEnabled();
+

Also, this applies to the comment for force_keepalive_syncrep.

Fixed.

(3) Should finish the second sentence with period in the comment of pgoutput_message.

@@ -845,6 +923,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+       /*
+        * Output BEGIN if we haven't yet.
+        * Avoid for streaming and non-transactional messages

Fixed.

(4) "begin" can be changed to "BEGIN" in the comment of PGOutputTxnData definition.

In the entire patch, when we express BEGIN message,
we use capital letters "BEGIN" except for one place.
We can apply the same to this place as well.

+typedef struct PGOutputTxnData
+{
+       bool sent_begin_txn;    /* flag indicating whether begin has been sent */
+} PGOutputTxnData;
+

Fixed.

(5) inconsistent way to write Assert statements with blank lines

In the below case, it'd be better to insert one blank line
after the Assert();

+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

+ Assert(txndata);
OutputPluginPrepareWrite(ctx, !send_replication_origin);

Fixed.

(6) new code in pgoutput_commit_txn looks slightly messy

@@ -419,7 +455,25 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
-       OutputPluginUpdateProgress(ctx);
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+       bool            skip;
+
+       Assert(txndata);
+
+       /*
+        * If a BEGIN message was not yet sent, then it means there were no relevant
+        * changes encountered, so we can skip the COMMIT message too.
+        */
+       skip = !txndata->sent_begin_txn;
+       pfree(txndata);
+       txn->output_plugin_private = NULL;
+       OutputPluginUpdateProgress(ctx, skip);

Could we conduct a refactoring for this new part ?
IMO, writing codes to free the data structure at the top
of function seems weird.

One idea is to export some part there
and write a new function, something like below.

static bool
txn_sent_begin(ReorderBufferTXN *txn)
{
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
bool needs_skip;

Assert(txndata);

needs_skip = !txndata->sent_begin_txn;

pfree(txndata);
txn->output_plugin_private = NULL;

return needs_skip;
}

FYI, I had a look at the v12-0002-Skip-empty-prepared-transactions-for-logical-rep.patch
for reference of pgoutput_rollback_prepared_txn and pgoutput_commit_prepared_txn.
It looks like this kind of function might work for future extensions as well.
What do you think?

I changed a bit, but I'd hold a comprehensive rewrite when a future
patch supports skipping
empty transactions in two-phase transactions and streaming transactions.

regards,
Ajin Cherian

#65osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: Ajin Cherian (#63)
RE: logical replication empty transactions

On Thursday, January 27, 2022 9:57 PM Ajin Cherian <itsajin@gmail.com> wrote:
Hi, thanks for your patch update.

On Wed, Jan 26, 2022 at 8:33 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com>

wrote:

(3) Is it this patch's responsibility to initialize the data in

pgoutput_begin_prepare_txn ?

@@ -433,6 +487,8 @@ static void
pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
bool send_replication_origin = txn->origin_id !=

InvalidRepOriginId;

+ PGOutputTxnData *txndata =

MemoryContextAllocZero(ctx->context,

+
+ sizeof(PGOutputTxnData));

OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin_prepare(ctx->out, txn);

Even if we need this initialization for either non streaming case or
non two_phase case, there can be another issue.
We don't free the allocated memory for this data, right ?
There's only one place to use free in the entire patch, which is in
the pgoutput_commit_txn(). So, corresponding free of memory looked
necessary in the two phase commit functions.

Actually it is required for begin_prepare to set the data type, so that the checks
in the pgoutput_change can make sure that the begin prepare is sent. I've also
added a free in commit_prepared code.

Okay, but if we choose the design that this patch takes
care of the initialization in pgoutput_begin_prepare_txn(),
we need another free in pgoutput_rollback_prepared_txn().
Could you please add some codes similar to pgoutput_commit_prepared_txn() to the same ?
If we simply execute rollback prepared for non streaming transaction,
we don't free it.
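
To illustrate the point about pairing every allocation with a free, here is a minimal standalone sketch. It is not the patch's code: TxnState, Txn, txn_state_init, txn_state_release, the end_* callbacks and the live_states counter are all hypothetical stand-ins, and plain calloc/free replace MemoryContextAllocZero/pfree. The idea is only that commit, commit prepared and rollback prepared all go through one release helper, so none of the terminal paths can leak the per-transaction data.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for PGOutputTxnData. */
typedef struct TxnState
{
	bool		sent_begin_txn;	/* has BEGIN been sent for this transaction? */
} TxnState;

/* Hypothetical stand-in for the part of ReorderBufferTXN we care about. */
typedef struct Txn
{
	TxnState   *state;			/* plays the role of output_plugin_private */
} Txn;

/* Number of TxnState objects currently allocated (for leak checking only). */
static int	live_states = 0;

/* Called from the BEGIN and BEGIN PREPARE paths. */
static void
txn_state_init(Txn *txn)
{
	txn->state = calloc(1, sizeof(TxnState));
	assert(txn->state != NULL);
	live_states++;
}

/*
 * Free the per-transaction state, if any.  Returns true when the state
 * existed and no BEGIN had been sent, i.e. the transaction was empty.
 */
static bool
txn_state_release(Txn *txn)
{
	bool		was_empty;

	if (txn->state == NULL)
		return false;

	was_empty = !txn->state->sent_begin_txn;
	free(txn->state);
	txn->state = NULL;
	live_states--;
	return was_empty;
}

/* Every terminal callback releases the state through the same helper. */
static bool end_commit(Txn *txn)            { return txn_state_release(txn); }
static bool end_commit_prepared(Txn *txn)   { return txn_state_release(txn); }
static bool end_rollback_prepared(Txn *txn) { return txn_state_release(txn); }
```

With this shape, a rollback prepared on a non-streaming transaction frees the data exactly like a commit does, and calling a terminal callback twice is harmless because the pointer is reset to NULL.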

Some other new minor comments.

(a) can be "synchronous replication", instead of "Synchronous Replication"

When we have a look at the syncrep.c, we use the former usually in
a normal comment.

 /*
+ * Check if Synchronous Replication is enabled
+ */

(b) move below pgoutput_truncate two codes to the case where if nrelids > 0.

@@ -770,6 +850,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                  int nrelations, Relation relations[], ReorderBufferChange *change)
 {
        PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
        MemoryContext old;
        RelationSyncEntry *relentry;
        int                     i;
@@ -777,6 +858,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        Oid                *relids;
        TransactionId xid = InvalidTransactionId;
+       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+       Assert(in_streaming || txndata);
+

(c) fix indent with spaces (for the one sentence of SyncRepEnabled)

@@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
}

 /*
+ * Check if Synchronous Replication is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+    return SyncRepRequested() && ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined;
+}
+
+/*

This can be detected by git am.

Best Regards,
Takamichi Osumi

#66Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: osumi.takamichi@fujitsu.com (#65)
1 attachment(s)
Re: logical replication empty transactions

On Sun, Jan 30, 2022 at 7:04 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

On Thursday, January 27, 2022 9:57 PM Ajin Cherian <itsajin@gmail.com> wrote:
Hi, thanks for your patch update.

On Wed, Jan 26, 2022 at 8:33 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com>

wrote:

(3) Is it this patch's responsibility to initialize the data in

pgoutput_begin_prepare_txn ?

@@ -433,6 +487,8 @@ static void
pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
bool send_replication_origin = txn->origin_id !=

InvalidRepOriginId;

+ PGOutputTxnData *txndata =

MemoryContextAllocZero(ctx->context,

+
+ sizeof(PGOutputTxnData));

OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin_prepare(ctx->out, txn);

Even if we need this initialization for either non streaming case or
non two_phase case, there can be another issue.
We don't free the allocated memory for this data, right ?
There's only one place to use free in the entire patch, which is in
the pgoutput_commit_txn(). So, corresponding free of memory looked
necessary in the two phase commit functions.

Actually it is required for begin_prepare to set the data type, so that the checks
in the pgoutput_change can make sure that the begin prepare is sent. I've also
added a free in commit_prepared code.

Okay, but if we choose the design that this patch takes
care of the initialization in pgoutput_begin_prepare_txn(),
we need another free in pgoutput_rollback_prepared_txn().
Could you please add some codes similar to pgoutput_commit_prepared_txn() to the same ?
If we simply execute rollback prepared for non streaming transaction,
we don't free it.

Fixed.

Some other new minor comments.

(a) can be "synchronous replication", instead of "Synchronous Replication"

When we have a look at the syncrep.c, we use the former usually in
a normal comment.

/*
+ * Check if Synchronous Replication is enabled
+ */

Fixed.

(b) move below pgoutput_truncate two codes to the case where if nrelids > 0.

@@ -770,6 +850,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
int                     i;
@@ -777,6 +858,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Oid                *relids;
TransactionId xid = InvalidTransactionId;
+       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+       Assert(in_streaming || txndata);
+

Fixed.

(c) fix indent with spaces (for the one sentence of SyncRepEnabled)

@@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
}

/*
+ * Check if Synchronous Replication is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+    return SyncRepRequested() && ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined;
+}
+
+/*

This can be detected by git am.

Fixed.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v18-0001-Skip-empty-transactions-for-logical-replication.patch (application/octet-stream)
#67osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: Ajin Cherian (#66)
RE: logical replication empty transactions

Hi,

Thank you for updating the patch.

I'll quote one of the past discussions
in order to make this thread go forward or more active.
On Friday, August 13, 2021 8:01 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:

Let's first split the patch for prepared and non-prepared cases as
that will help to focus on each of them separately. BTW, why haven't
you considered implementing point 1b as explained by Andres in his
email [1]? I think we can send a keepalive message in case of
synchronous replication when we skip an empty transaction, otherwise,
it might delay the response to transactions in synchronous_commit mode.
I think in the tests done in the thread, it might not have been shown
because we are already sending keepalives too frequently. But what if
someone disables wal_sender_timeout or kept it to a very large value?
See WalSndKeepaliveIfNecessary. The other thing you might want to look
at is if the reason for frequent keepalives is the same as described
in the email [2].

I have tried to address the comment here by modifying the
ctx->update_progress callback function (WalSndUpdateProgress) provided
for plugins. I have added an option
by which the callback can specify if it wants to send keep_alives. And when
the callback is called with that option set, walsender updates a flag
force_keep_alive_syncrep.
The Walsender in the WalSndWaitForWal for loop, checks this flag and if
synchronous replication is enabled, then sends a keep alive.
Currently this logic
is added as an else to the current logic that is already there in
WalSndWaitForWal, which is probably considered unnecessary and a source of
the keep alive flood that you talked about. So, I can change that according to
how that fix shapes up there. I have also added an extern function in syncrep.c
that makes it possible for walsender to query if synchronous replication is
turned on.

Changing the timing to send the keepalive to the decoding commit
timing didn't look impossible to me, although my suggestion
can be ad-hoc.

After the initialization of sentPtr(by confirmed_flush lsn),
sentPtr is updated from logical_decoding_ctx->reader->EndRecPtr in XLogSendLogical.
In the XLogSendLogical, we update it after we execute LogicalDecodingProcessRecord.
This order leads to the current implementation to wait the next iteration
to send a keepalive in WalSndWaitForWal.

But, I felt we can utilize end_lsn passed to ReorderBufferCommit for updating
sentPtr. The end_lsn is the lsn same as the ctx->reader->EndRecPtr,
which means advancing the timing to update the sentPtr for the commit case.
Then if the transaction is empty in synchronous mode,
send the keepalive in WalSndUpdateProgress directly,
instead of having the force_keepalive_syncrep flag and having it true.

Best Regards,
Takamichi Osumi

#68osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: Ajin Cherian (#66)
RE: logical replication empty transactions

Hi

I'll quote one other remaining discussion of this thread again
to draw more attention from the community.
On Friday, August 13, 2021 8:01 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

Few other miscellaneous comments:
1.
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, TimestampTz
+ prepare_time)
{
+ PGOutputTxnData    *txndata = (PGOutputTxnData *)

txn->output_plugin_private;

+
OutputPluginUpdateProgress(ctx);

+ /*
+ * If the BEGIN PREPARE was not yet sent, then it means there were no
+ * relevant changes encountered, so we can skip the COMMIT PREPARED
+ * message too.
+ */
+ if (txndata)
+ {
+ bool skip = !txndata->sent_begin_txn;
+ pfree(txndata);
+ txn->output_plugin_private = NULL;

How is this supposed to work after the restart when prepared is sent
before the restart and we are just sending commit_prepared after
restart? Won't this lead to sending commit_prepared even when the
corresponding prepare is not sent? Can we think of a better way to
deal with this?

I have tried to resolve this by adding logic in worker.c to silently ignore spurious
commit_prepareds. But this change required checking if the prepare exists on
the subscriber before attempting the commit_prepared but the current API that
checks this requires prepare time and transaction end_lsn. But for this I had to
change the protocol of commit_prepared, and I understand that this would
break backward compatibility between subscriber and publisher (you have
raised this issue as well).
I am not sure how else to handle this, let me know if you have any other ideas.

I feel if we don't want to change the protocol of commit_prepared,
we need to make the publisher solely judge whether the prepare was empty or not,
after the restart.

One idea I thought at the beginning was to utilize and apply
the existing mechanism to spill ReorderBufferSerializeTXN object to local disk,
by postponing the prepare txn object cleanup and when the walsender exits
and commit prepared didn't come, spilling the transaction's data,
then restoring it after the restart in the DecodePrepare.
However, this idea wasn't crash-safe fundamentally. It means,
if the publisher crashes before spilling the empty prepare transaction,
we fail to detect the prepare was empty and end up sending the commit_prepared
in the situation where the subscriber didn't get the prepare data again.
So, I concluded that utilizing the spill mechanism wouldn't work for this purpose.

Another idea would be, to create an empty file under the pg_replslot/slotname
with a prefix different from "xid" in the DecodePrepare before the shutdown
if the prepare was empty, and bypass the cleanup of the serialized txns
and check the existence after the restart. But, this is pretty ad-hoc and I wasn't sure
if to address the corner case of the restart has the strong enough justification
to create this new file format.

Therefore, in my humble opinion, the idea of protocol change slightly wins,
since the impact of the protocol change would not be big. We introduced
the protocol version 3 in the devel version and the number of users should be small.

Best Regards,
Takamichi Osumi

#69Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: Ajin Cherian (#66)
Re: logical replication empty transactions

On Mon, Jan 31, 2022 at 6:18 PM Ajin Cherian <itsajin@gmail.com> wrote:

Few comments:
=============
1. Is there any particular reason why the patch is not skipping empty xacts
for streaming (in-progress) transactions as noted in the commit
message as well?

2.
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ Assert(txndata);

I think here you can add an assert for sent_begin_txn to be always false?

3.
+/*
+ * Send BEGIN.
+ * This is where the BEGIN is actually sent. This is called
+ * while processing the first change of the transaction.
+ */

Have an empty line between the first two lines to ensure consistency
with nearby comments. Also, the formatting of these lines appears
awkward, either run pgindent or make sure lines are not too short.

4. Do we really need to make any changes in PREPARE
transaction-related functions if can't skip in that case? I think you
can have a check if the output plugin private variable is not set then
ignore special optimization for sending begin.
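
To make comments 2 and 4 concrete, here is a small standalone sketch of the deferred-BEGIN idea. It is a simplified model, not pgoutput itself: TxnData, Txn, txn_begin, txn_send_begin, txn_change, txn_commit and the messages_sent counter are hypothetical names, and calloc/free stand in for the memory context calls. It shows the assert on sent_begin_txn being false when BEGIN is finally emitted, and the "private data not set means no special handling" check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for PGOutputTxnData. */
typedef struct TxnData
{
	bool		sent_begin_txn;	/* has BEGIN been sent for this transaction? */
} TxnData;

/* Hypothetical stand-in for a decoded transaction. */
typedef struct Txn
{
	TxnData    *private_data;	/* NULL when the optimization is not in use */
	int			messages_sent;	/* counts messages "written" to the stream */
} Txn;

/* BEGIN callback: remember the transaction, but send nothing yet. */
static void
txn_begin(Txn *txn)
{
	txn->private_data = calloc(1, sizeof(TxnData));
	assert(txn->private_data != NULL);
}

/* Actually emit BEGIN; called lazily on the first relevant change. */
static void
txn_send_begin(Txn *txn)
{
	assert(txn->private_data != NULL);
	assert(!txn->private_data->sent_begin_txn);	/* must not be sent twice */

	txn->messages_sent++;
	txn->private_data->sent_begin_txn = true;
}

/* Change callback: emit the deferred BEGIN if needed, then the change. */
static void
txn_change(Txn *txn)
{
	if (txn->private_data != NULL && !txn->private_data->sent_begin_txn)
		txn_send_begin(txn);

	txn->messages_sent++;
}

/* Commit callback: returns true when the COMMIT was skipped. */
static bool
txn_commit(Txn *txn)
{
	bool		skip = false;

	/* Without private data, fall back to always sending COMMIT. */
	if (txn->private_data != NULL)
	{
		skip = !txn->private_data->sent_begin_txn;
		free(txn->private_data);
		txn->private_data = NULL;
	}

	if (!skip)
		txn->messages_sent++;

	return skip;
}
```

In this model an empty transaction sends nothing at all, while a transaction with two changes sends four messages (BEGIN, two changes, COMMIT). A path that never sets the private data, such as a prepared transaction under suggestion 4, simply never skips.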

--
With Regards,
Amit Kapila.

#70Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: osumi.takamichi@fujitsu.com (#68)
Re: logical replication empty transactions

On Wed, Feb 16, 2022 at 8:45 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

[ideas to skip empty prepare/commit_prepare ....]

I feel if we don't want to change the protocol of commit_prepared,
we need to make the publisher solely judge whether the prepare was empty or not,
after the restart.

One idea I thought at the beginning was to utilize and apply
the existing mechanism to spill ReorderBufferSerializeTXN object to local disk,
by postponing the prepare txn object cleanup and when the walsender exits
and commit prepared didn't come, spilling the transaction's data,
then restoring it after the restart in the DecodePrepare.
However, this idea wasn't crash-safe fundamentally. It means,
if the publisher crashes before spilling the empty prepare transaction,
we fail to detect the prepare was empty and come down to send the commit_prepared
in the situation where the subscriber didn't get the prepare data again.
So, I thought to utilize the spill mechanism didn't work for this purpose.

Another idea would be, to create an empty file under the pg_replslot/slotname
with a prefix different from "xid" in the DecodePrepare before the shutdown
if the prepare was empty, and bypass the cleanup of the serialized txns
and check the existence after the restart. But, this is pretty ad-hoc and I wasn't sure
if to address the corner case of the restart has the strong enough justification
to create this new file format.

I think for this idea to work you need to create such an empty file
each time we skip empty prepare as the system might crash after
prepare and we won't get time to create such a file. I don't think it
is advisable to do I/O to save the network message.

Therefore, in my humble opinion, the idea of protocol change slightly wins,
since the impact of the protocol change would not be big. We introduced
the protocol version 3 in the devel version and the number of users should be little.

There is also the cost of the additional check (whether prepared xact
exists) at the time of processing each commit prepared message. I
think if we want to go in this direction then it is better to do it
via a subscription parameter (say skip_empty_prepare_xact or something
like that) so that we can pay the additional cost of such a check
conditionally when such a parameter is set by the user. I feel for now
we can document in comments why we can't skip empty prepared
transactions and maybe as an idea(s) worth exploring to implement the
same. OTOH, if multiple agree on such a solution we can even try to
implement it and see if that works.
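
As a rough sketch of what gating the check behind a subscription parameter could look like on the apply side: the names below (CommitPreparedAction, prepared_exists, on_commit_prepared) are hypothetical, skip_empty_prepare_xact is only the parameter name floated above, and the list of prepared GIDs is a stand-in for whatever lookup the subscriber would really use. Nothing here is existing PostgreSQL API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* What the apply side decides to do with a COMMIT PREPARED message. */
typedef enum CommitPreparedAction
{
	CP_APPLY,					/* run COMMIT PREPARED as usual */
	CP_IGNORE					/* prepare was skipped upstream; do nothing */
} CommitPreparedAction;

/* Hypothetical lookup: is a transaction with this GID prepared locally? */
static bool
prepared_exists(const char *gid, const char *const *prepared, size_t nprepared)
{
	size_t		i;

	for (i = 0; i < nprepared; i++)
	{
		if (strcmp(prepared[i], gid) == 0)
			return true;
	}
	return false;
}

/*
 * Pay for the existence check only when the (hypothetical) subscription
 * parameter is set; otherwise behave exactly as without the optimization.
 */
static CommitPreparedAction
on_commit_prepared(bool skip_empty_prepare_xact, const char *gid,
				   const char *const *prepared, size_t nprepared)
{
	if (!skip_empty_prepare_xact)
		return CP_APPLY;

	return prepared_exists(gid, prepared, nprepared) ? CP_APPLY : CP_IGNORE;
}
```

The point of the shape is that subscriptions which do not set the parameter never perform the lookup, so the additional cost is strictly opt-in.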

--
With Regards,
Amit Kapila.

#71Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: osumi.takamichi@fujitsu.com (#68)
Re: logical replication empty transactions

On Wed, Feb 16, 2022 at 2:15 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

Another idea would be, to create an empty file under the pg_replslot/slotname
with a prefix different from "xid" in the DecodePrepare before the shutdown
if the prepare was empty, and bypass the cleanup of the serialized txns
and check the existence after the restart. But, this is pretty ad-hoc and I wasn't sure
if to address the corner case of the restart has the strong enough justification
to create this new file format.

Yes, this doesn't look very efficient.

Therefore, in my humble opinion, the idea of protocol change slightly wins,
since the impact of the protocol change would not be big. We introduced
the protocol version 3 in the devel version and the number of users should be little.

Yes, but we don't want to break backward compatibility for this small
added optimization.

Amit,

I will work on your comments.

regards,
Ajin Cherian
Fujitsu Australia

#72Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#69)
Re: logical replication empty transactions

On Thu, Feb 17, 2022 at 4:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Jan 31, 2022 at 6:18 PM Ajin Cherian <itsajin@gmail.com> wrote:

Few comments:
=============

One more comment:
@@ -1546,10 +1557,11 @@ WalSndWaitForWal(XLogRecPtr loc)
  * otherwise idle, this keepalive will trigger a reply. Processing the
  * reply will update these MyWalSnd locations.
  */
- if (MyWalSnd->flush < sentPtr &&
+ if (force_keepalive_syncrep ||
+ (MyWalSnd->flush < sentPtr &&
  MyWalSnd->write < sentPtr &&
- !waiting_for_ping_response)
- WalSndKeepalive(false);
+ !waiting_for_ping_response))
+ WalSndKeepalive(false);

Will this allow syncrep to proceed in case we are skipping the
transaction? Won't we need to send a feedback message with
'requestReply' true in this case, since we release syncrep waiters while
processing the standby reply message (see
ProcessStandbyReplyMessage->SyncRepReleaseWaiters)? Without
'requestReply', the subscriber might not send any message and the
syncrep won't proceed. Why did you decide to delay sending this message
till WalSndWaitForWal()? It may not be called for each transaction.

I feel we should try to devise a test case to test this sync
replication mechanism such that without this particular change the
sync rep transaction waits momentarily but with this change it doesn't
wait. I am not entirely sure whether we can devise an automated test
as this is timing related issue but I guess we can at least manually
try to produce a case.

--
With Regards,
Amit Kapila.

#73Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: osumi.takamichi@fujitsu.com (#67)
Re: logical replication empty transactions

On Tue, Feb 8, 2022 at 5:27 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

On Friday, August 13, 2021 8:01 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

Changing the timing to send the keepalive to the decoding commit
timing didn't look impossible to me, although my suggestion
can be ad-hoc.

After the initialization of sentPtr(by confirmed_flush lsn),
sentPtr is updated from logical_decoding_ctx->reader->EndRecPtr in XLogSendLogical.
In the XLogSendLogical, we update it after we execute LogicalDecodingProcessRecord.
This order leads to the current implementation to wait the next iteration
to send a keepalive in WalSndWaitForWal.

But, I felt we can utilize end_lsn passed to ReorderBufferCommit for updating
sentPtr. The end_lsn is the lsn same as the ctx->reader->EndRecPtr,
which means advancing the timing to update the sentPtr for the commit case.
Then if the transaction is empty in synchronous mode,
send the keepalive in WalSndUpdateProgress directly,
instead of having the force_keepalive_syncrep flag and having it true.

You have a point in that we don't need to delay sending this message
till next WalSndWaitForWal() but I don't see why we need to change
anything about update of sentPtr.

--
With Regards,
Amit Kapila.

#74osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: Amit Kapila (#73)
RE: logical replication empty transactions

On Friday, February 18, 2022 6:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Feb 8, 2022 at 5:27 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

On Friday, August 13, 2021 8:01 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

Changing the timing to send the keepalive to the decoding commit
timing didn't look impossible to me, although my suggestion can be
ad-hoc.

After the initialization of sentPtr(by confirmed_flush lsn), sentPtr
is updated from logical_decoding_ctx->reader->EndRecPtr in
XLogSendLogical.
In the XLogSendLogical, we update it after we execute
LogicalDecodingProcessRecord.
This order leads to the current implementation to wait the next
iteration to send a keepalive in WalSndWaitForWal.

But, I felt we can utilize end_lsn passed to ReorderBufferCommit for
updating sentPtr. The end_lsn is the lsn same as the
ctx->reader->EndRecPtr, which means advancing the timing to update the
sentPtr for the commit case.
Then if the transaction is empty in synchronous mode, send the
keepalive in WalSndUpdateProgress directly, instead of having the
force_keepalive_syncrep flag and having it true.

You have a point in that we don't need to delay sending this message till next
WalSndWaitForWal() but I don't see why we need to change anything about
update of sentPtr.

Yeah, you're right.
Now I think we don't need the update of sentPtr to send a keepalive.

I thought we can send a keepalive message
after its update in XLogSendLogical or any appropriate place for it after the existing update.

Best Regards,
Takamichi Osumi

#75Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: osumi.takamichi@fujitsu.com (#74)
Re: logical replication empty transactions

On Fri, Feb 18, 2022 at 3:06 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

On Friday, February 18, 2022 6:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Feb 8, 2022 at 5:27 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

On Friday, August 13, 2021 8:01 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

Changing the timing to send the keepalive to the decoding commit
timing didn't look impossible to me, although my suggestion can be
ad-hoc.

After the initialization of sentPtr(by confirmed_flush lsn), sentPtr
is updated from logical_decoding_ctx->reader->EndRecPtr in
XLogSendLogical.
In the XLogSendLogical, we update it after we execute
LogicalDecodingProcessRecord.
This order leads to the current implementation to wait the next
iteration to send a keepalive in WalSndWaitForWal.

But, I felt we can utilize end_lsn passed to ReorderBufferCommit for
updating sentPtr. The end_lsn is the lsn same as the
ctx->reader->EndRecPtr, which means advancing the timing to update the
sentPtr for the commit case.
Then if the transaction is empty in synchronous mode, send the
keepalive in WalSndUpdateProgress directly, instead of having the
force_keepalive_syncrep flag and having it true.

You have a point in that we don't need to delay sending this message till next
WalSndWaitForWal() but I don't see why we need to change anything about
update of sentPtr.

Yeah, you're right.
Now I think we don't need the update of sentPtr to send a keepalive.

I thought we can send a keepalive message
after its update in XLogSendLogical or any appropriate place for it after the existing update.

Yeah, I think there could be multiple ways (a) We can send such a keep
alive in WalSndUpdateProgress() itself by using ctx->write_location.
For this, we need to modify WalSndKeepalive() to take sentPtr as
input. (b) set some flag in WalSndUpdateProgress() and then do it
somewhere in WalSndLoop probably in WalSndKeepaliveIfNecessary, or
maybe there is another better way.
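
To make option (a) concrete, here is a minimal standalone sketch of a keepalive that optionally takes an LSN. It is only a model, not walsender code: Lsn, INVALID_LSN, Keepalive, build_keepalive() and keepalive_for_skipped_txn() are all hypothetical names, and requesting a reply for the skipped-transaction case is an assumption taken from the earlier question about 'requestReply'.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;               /* hypothetical stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)       /* hypothetical stand-in for InvalidXLogRecPtr */

/* The two fields of a keepalive that matter for this discussion. */
typedef struct Keepalive
{
    Lsn  write_ptr;                 /* position reported to the subscriber */
    bool request_reply;             /* ask the subscriber to answer at once */
} Keepalive;

/*
 * Model of a keepalive that optionally takes an LSN: when the caller
 * passes INVALID_LSN we fall back to the last sent position.
 */
static Keepalive
build_keepalive(Lsn sent_ptr, Lsn write_ptr, bool request_reply)
{
    Keepalive msg;

    msg.write_ptr = (write_ptr != INVALID_LSN) ? write_ptr : sent_ptr;
    msg.request_reply = request_reply;
    return msg;
}

/*
 * Model of the progress-update path for a skipped (empty) transaction.
 * Only when synchronous replication is enabled do we build a keepalive,
 * carrying the write location of the skipped commit. Returns true if a
 * keepalive was produced in *out.
 */
static bool
keepalive_for_skipped_txn(bool sync_rep_enabled, Lsn sent_ptr,
                          Lsn write_location, Keepalive *out)
{
    if (!sync_rep_enabled)
        return false;

    /* Assumption: a reply is requested so that sync rep waiters get released. */
    *out = build_keepalive(sent_ptr, write_location, true);
    return true;
}
```

With this shape the existing callers would keep passing an invalid LSN and behave as before, while the skipped-transaction path supplies its own location.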

--
With Regards,
Amit Kapila.

#76Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#66)
Re: logical replication empty transactions

FYI - the latest v18 patch no longer applies due to a recent push [1].

------
[1]: https://github.com/postgres/postgres/commit/52e4f0cd472d39d07732b99559989ea3b615be78

Kind Regards,
Peter Smith.
Fujitsu Australia

#77Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Amit Kapila (#69)
1 attachment(s)
Re: logical replication empty transactions

On Thu, Feb 17, 2022 at 9:42 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Jan 31, 2022 at 6:18 PM Ajin Cherian <itsajin@gmail.com> wrote:

Few comments:
=============
1. Is there any particular reason why the patch is not skipping empty
xacts for streaming (in-progress) transactions, as noted in the commit
message as well?

I have added support for skipping streaming transaction.

2.
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ Assert(txndata);

I think here you can add an assert for sent_begin_txn to be always false?

Added.

3.
+/*
+ * Send BEGIN.
+ * This is where the BEGIN is actually sent. This is called
+ * while processing the first change of the transaction.
+ */

Have an empty line between the first two lines to ensure consistency
with nearby comments. Also, the formatting of these lines appears
awkward, either run pgindent or make sure lines are not too short.

Changed.

4. Do we really need to make any changes in PREPARE
transaction-related functions if we can't skip in that case? I think you
can have a check: if the output plugin private variable is not set, then
ignore the special optimization for sending begin.

I have modified this as well.

I have also rebased the patch after it did not apply due to a new commit.

I will next work on testing and improving the keepalive logic while
skipping transactions.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v19-0001-Skip-empty-transactions-for-logical-replication.patch (application/octet-stream)

#78wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Ajin Cherian (#77)
RE: logical replication empty transactions

On Wed, Feb 23, 2022 at 10:58 PM Ajin Cherian <itsajin@gmail.com> wrote:

Few comments to V19-0001:

1. I think we should adjust the alignment format.
git am ../v19-0001-Skip-empty-transactions-for-logical-replication.patch
.git/rebase-apply/patch:197: indent with spaces.
* Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
.git/rebase-apply/patch:198: indent with spaces.
* is sent. If not, send now.
.git/rebase-apply/patch:199: indent with spaces.
*/
.git/rebase-apply/patch:201: indent with spaces.
pgoutput_send_stream_start(ctx, toptxn);
.git/rebase-apply/patch:204: indent with spaces.
pgoutput_begin(ctx, toptxn);
warning: 5 lines add whitespace errors.

2. Structure member initialization.
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+	PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+														 sizeof(PGOutputTxnData));
+
+	txndata->sent_begin_txn = false;
+	txn->output_plugin_private = txndata;
+}
Do we need to set sent_stream_start and sent_any_stream to false here?

3. Maybe we should add Assert(txndata) like function pgoutput_commit_txn in
other functions.

4. In addition, I think we should keep a unified style.
a). log style (maybe first one is better.)
First style : "Skipping replication of an empty transaction in XXX"
Second style : "skipping replication of an empty transaction"
b) flag name (maybe second one is better.)
First style : variable "sent_begin_txn" in function pgoutput_stream_*.
Second style : variable "skip" in function pgoutput_commit_txn.

Regards,
Wang wei

#79Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#77)
Re: logical replication empty transactions

Hi. Here are my review comments for the v19 patch.

======

1. Commit message

The current logical replication behavior is to send every transaction to
subscriber even though the transaction is empty (because it does not
contain changes from the selected publications).

SUGGESTION
"to subscriber even though" --> "to the subscriber even if"

~~~

2. Commit message

This patch addresses the above problem by postponing the BEGIN message
until the first change. While processing a COMMIT message,
if there is no other change for that transaction,
do not send COMMIT message. It means that pgoutput will
skip BEGIN/COMMIT messages for transactions that are empty.

SUGGESTION
"if there is" --> "if there was"
"do not send COMMIT message" --> "do not send the COMMIT message"
"It means that pgoutput" --> "This means that pgoutput"

~~~

3. Commit message

Shouldn't there be some similar description about using a lazy send
mechanism for STREAM START?

~~~

4. src/backend/replication/pgoutput/pgoutput.c - typedef struct PGOutputTxnData

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN. BEGIN is only sent when the first
+ * change in a transaction is processed. This makes it possible
+ * to skip transactions that are empty.
+ */
+typedef struct PGOutputTxnData
+{
+   bool sent_begin_txn;    /* flag indicating whether BEGIN has been sent */
+   bool sent_stream_start; /* flag indicating if stream start has been sent */
+   bool sent_any_stream;   /* flag indicating if any stream has been sent */
+} PGOutputTxnData;
+

The struct comment looks stale because it doesn't mention anything
about the similar lazy send mechanism for STREAM_START.

~~~

5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn

 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ txndata->sent_begin_txn = false;
+ txn->output_plugin_private = txndata;
+}

You don’t need to assign the other members 'sent_stream_start',
'sent_any_stream' because you are doing MemoryContextAllocZero anyway,
but for the same reason you did not really need to assign the
'sent_begin_txn' flag either.

I guess for consistency maybe it is better to (a) set all of them or
(b) set none of them. I prefer (b).
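
A small standalone illustration of why (b) is safe. This is a sketch only: calloc() is used as a stand-in for MemoryContextAllocZero(), and TxnFlags and alloc_txn_flags() are hypothetical names that mirror the three flags of PGOutputTxnData.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Mirrors the three flags of PGOutputTxnData quoted in the patch. */
typedef struct TxnFlags
{
    bool sent_begin_txn;
    bool sent_stream_start;
    bool sent_any_stream;
} TxnFlags;

/*
 * Hypothetical stand-in for MemoryContextAllocZero(): calloc() also
 * hands back zero-filled memory, so every bool member starts out false
 * without any per-member assignment.
 */
static TxnFlags *
alloc_txn_flags(void)
{
    TxnFlags *flags = calloc(1, sizeof(TxnFlags));

    assert(flags != NULL);
    return flags;
}
```

The caller frees the struct when the transaction ends; the patch does the equivalent with pfree().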

~~~

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin

I feel the 'pgoutput_begin' function is not well named. It makes some
of the code where they are called look quite confusing.

For streaming there is:
1. pgoutput_stream_start (does not send)
2. pgoutput_send_stream_start (does send)
so it is very clear.

OTOH there are
3. pgoutput_begin_txn (does not send)
4. pgoutput_begin (does send)

For consistency I think the 'pgoutput_begin' name should be changed to
include the "send" verb
1. pgoutput_begin_txn (does not send)
2. pgoutput_send_begin_txn (does send)

~~~

7. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

@@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
if (schema_sent)
return;

+   /* set up txndata */
+   txndata = toptxn->output_plugin_private;
+
+   /*
+    * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
+    * is sent. If not, send now.
+    */
+   if (in_streaming && !txndata->sent_stream_start)
+       pgoutput_send_stream_start(ctx, toptxn);
+   else if (txndata && !txndata->sent_begin_txn)
+   {
+       pgoutput_begin(ctx, toptxn);
+   }
+

How come the in_streaming case is not checking for a NULL txndata
before referencing it? Even if it is OK to do that, some more comments
or assertions might help for this piece of code.
(Stop-Press: see later comments #9, #10)

~~~

8. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

@@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
if (schema_sent)
return;

+   /* set up txndata */
+   txndata = toptxn->output_plugin_private;
+
+   /*
+    * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
+    * is sent. If not, send now.
+    */

What part of this code is doing anything about "BEGIN PREPARE" ?

~~~

9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -1183,6 +1267,15 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Assert(false);
}

+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ /*
+ * Output BEGIN if we haven't yet, unless streaming.
+ */
+ else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
+ pgoutput_begin(ctx, txn);
+

The above code fragment looks more like what I was expecting should
be in 'maybe_send_schema'.

If you expand it out (and tweak the comments) it can become much less
complex looking IMO

e.g.

if (in_streaming)
{
/* If streaming, send STREAM START if we haven't yet */
if (txndata && !txndata->sent_stream_start)
pgoutput_send_stream_start(ctx, txn);
}
else
{
/* If not streaming, send BEGIN if we haven't yet */
if (txndata && !txndata->sent_begin_txn)
pgoutput_begin(ctx, txn);
}

Also, IIUC for the 'in_streaming' case you can Assert(txndata); so
then the code can be made even simpler.

~~~

10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

@@ -1397,6 +1491,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

  if (nrelids > 0)
  {
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ /*
+ * output BEGIN if we haven't yet, unless streaming.
+ */
+ else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
+ pgoutput_begin(ctx, txn);

So now I have seen almost identical code repeated in 3 places so I am
beginning to think these should just be encapsulated in some common
function to call to do the deferred "send". Thoughts?
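
For discussion, one possible shape for such a common function, written as a standalone model rather than as pgoutput code. TxnData, SentMessage and send_deferred_start() are hypothetical names; the real send functions are replaced by a return value, and the flag updates that pgoutput_send_stream_start and pgoutput_begin would perform are done inline here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-transaction state, reduced to the two deferred flags. */
typedef struct TxnData
{
    bool sent_begin_txn;
    bool sent_stream_start;
} TxnData;

/* What the helper decided to emit; stands in for the real send calls. */
typedef enum SentMessage
{
    SENT_NONE,
    SENT_BEGIN,
    SENT_STREAM_START
} SentMessage;

/*
 * Emit the deferred STREAM START (when streaming) or BEGIN (otherwise)
 * the first time a change is about to be sent, and nothing afterwards.
 * A NULL txndata means no deferral state was set up, so nothing is sent.
 */
static SentMessage
send_deferred_start(TxnData *txndata, bool in_streaming)
{
    if (txndata == NULL)
        return SENT_NONE;

    if (in_streaming)
    {
        /* If streaming, send STREAM START if we haven't yet */
        if (!txndata->sent_stream_start)
        {
            txndata->sent_stream_start = true;
            return SENT_STREAM_START;
        }
    }
    else
    {
        /* If not streaming, send BEGIN if we haven't yet */
        if (!txndata->sent_begin_txn)
        {
            txndata->sent_begin_txn = true;
            return SENT_BEGIN;
        }
    }

    return SENT_NONE;
}
```

Each of the three call sites would then reduce to a single call, and the NULL handling would live in one place.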

~~~

11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message

@@ -1429,6 +1534,24 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+ /*
+ * Output BEGIN if we haven't yet.
+ * Avoid for streaming and non-transactional messages.
+ */
+ if (in_streaming || transactional)
+ {
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ else if (transactional)
+ {
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_begin(ctx, txn);
+ }
+ }

Does that comment at the top of that code fragment accurately match
this code? It seemed a bit muddled/stale to me.

~~~

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_start

  /*
+ * Don't actually send stream start here, instead set a flag that indicates
+ * that stream start hasn't been sent and wait for the first actual change
+ * for this stream to be sent and then send stream start. This is done
+ * to avoid sending empty streams without any changes.
+ */
+ if (txndata == NULL)
+ {
+ txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
+ txndata->sent_begin_txn = false;
+ txndata->sent_any_stream = false;
+ txn->output_plugin_private = txndata;
+ }

IMO there is no need to set the members – just let the
MemoryContextAllocZero take care of all that. Then the code is simpler
and it also saves wondering if anything was accidentally missed.

~~~

13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start

+pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
+   ReorderBufferTXN *txn)
+{
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+
+ /*
  * If we already sent the first stream for this transaction then don't
  * send the origin id in the subsequent streams.
  */
- if (rbtxn_is_streamed(txn))
+ if (txndata->sent_any_stream)
  send_replication_origin = false;

Given this usage, I wonder if there is a better name for the txndata
member - e.g. 'sent_first_stream' ?

~~~

14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start

- /* we're streaming a chunk of transaction now */
- in_streaming = true;
+ /*
+ * Set the flags that indicate that changes were sent as part of
+ * the transaction and the stream.
+ */
+ txndata->sent_begin_txn = txndata->sent_stream_start = true;
+ txndata->sent_any_stream = true;

Why is this setting member 'sent_begin_txn' true also? It seems odd to
say so because the BEGIN was not actually sent at all, right?

~~~

15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort

@@ -1572,6 +1740,20 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,

  /* determine the toplevel transaction */
  toptxn = (txn->toptxn) ? txn->toptxn : txn;
+ txndata = toptxn->output_plugin_private;
+ sent_begin_txn = txndata->sent_begin_txn;
+
+ if (txn->toptxn == NULL)
+ {
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+ }
+
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction in stream abort");
+ return;
+ }

I didn't really understand why this code is checking the
'sent_begin_txn' member instead of the 'sent_stream_start' member?

~~~

16. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_commit

@@ -1598,7 +1782,17 @@ pgoutput_stream_commit(struct
LogicalDecodingContext *ctx,
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));

- OutputPluginUpdateProgress(ctx);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+
+ /* If no changes were part of this transaction then drop the commit */
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction in stream commit");
+ return;
+ }

(Same as previous comment #15). I didn't really understand why this
code is checking the 'sent_begin_txn' member instead of the
'sent_stream_start' member?

~~~

17. src/backend/replication/syncrep.c - SyncRepEnabled

@@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
}

 /*
+ * Check if synchronous replication is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+ return SyncRepRequested() && ((volatile WalSndCtlData *)
WalSndCtl)->sync_standbys_defined;
+}

That code was once inline in 'SyncRepWaitForLSN' before it was turned
into a function, and there is a long comment in SyncRepWaitForLSN
describing the risks of this logic. e.g.

<quote>
... If it's true, we need to check it again
* later while holding the lock, to check the flag and operate the sync
* rep queue atomically. This is necessary to avoid the race condition
* described in SyncRepUpdateSyncStandbysDefined().
</quote>

This same function is now called from walsender.c. I think maybe it is
OK but please confirm it.

Anyway, the point is maybe this SyncRepEnabled function should be
better commented to make some reference about the race concerns of the
original comment. Otherwise some future caller of this function may be
unaware of it and come to grief.

-------
Kind Regards,
Peter Smith.
Fujitsu Australia

#80Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Amit Kapila (#75)
1 attachment(s)
Re: logical replication empty transactions

On Fri, Feb 18, 2022 at 9:27 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

Yeah, I think there could be multiple ways (a) We can send such a keep
alive in WalSndUpdateProgress() itself by using ctx->write_location.
For this, we need to modify WalSndKeepalive() to take sentPtr as
input. (b) set some flag in WalSndUpdateProgress() and then do it
somewhere in WalSndLoop probably in WalSndKeepaliveIfNecessary, or
maybe there is another better way.

Thanks for the suggestions, Amit and Osumi-san. I experimented with both
suggestions but finally decided to use (a): modifying
WalSndKeepalive() to optionally take an LSN as input, and passing in
ctx->write_location.

I also verified that if I block the WalSndKeepalive() in
WalSndWaitForWal, then my new code sends the keepalive
when skipping transactions and the syncrep gets back feedback.

I will address comments from Peter and Wang in my next patch update.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v20-0001-Skip-empty-transactions-for-logical-replication.patch (application/octet-stream)

#81Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Peter Smith (#79)
1 attachment(s)
Re: logical replication empty transactions

On Fri, Feb 25, 2022 at 9:17 PM Peter Smith <smithpb2250@gmail.com> wrote:

Hi. Here are my review comments for the v19 patch.

======

1. Commit message

The current logical replication behavior is to send every transaction to
subscriber even though the transaction is empty (because it does not
contain changes from the selected publications).

SUGGESTION
"to subscriber even though" --> "to the subscriber even if"

Fixed.

~~~

2. Commit message

This patch addresses the above problem by postponing the BEGIN message
until the first change. While processing a COMMIT message,
if there is no other change for that transaction,
do not send COMMIT message. It means that pgoutput will
skip BEGIN/COMMIT messages for transactions that are empty.

SUGGESTION
"if there is" --> "if there was"
"do not send COMMIT message" --> "do not send the COMMIT message"
"It means that pgoutput" --> "This means that pgoutput"

~~~

Fixed.

3. Commit message

Shouldn't there be some similar description about using a lazy send
mechanism for STREAM START?

~~~

Added.

4. src/backend/replication/pgoutput/pgoutput.c - typedef struct PGOutputTxnData

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN. BEGIN is only sent when the first
+ * change in a transaction is processed. This makes it possible
+ * to skip transactions that are empty.
+ */
+typedef struct PGOutputTxnData
+{
+   bool sent_begin_txn;    /* flag indicating whether BEGIN has been sent */
+   bool sent_stream_start; /* flag indicating if stream start has been sent */
+   bool sent_any_stream;   /* flag indicating if any stream has been sent */
+} PGOutputTxnData;
+

The struct comment looks stale because it doesn't mention anything
about the similar lazy send mechanism for STREAM_START.

~~~

Added.

5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn

static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
+ PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ txndata->sent_begin_txn = false;
+ txn->output_plugin_private = txndata;
+}

You don’t need to assign the other members 'sent_stream_start',
'sent_any_stream' because you are doing MemoryContextAllocZero anyway,
but for the same reason you did not really need to assign the
'sent_begin_txn' flag either.

I guess for consistency maybe it is better to (a) set all of them or
(b) set none of them. I prefer (b).

~~~

Did (b)

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin

I feel the 'pgoutput_begin' function is not well named. It makes some
of the code where they are called look quite confusing.

For streaming there is:
1. pgoutput_stream_start (does not send)
2. pgoutput_send_stream_start (does send)
so it is very clear.

OTOH there are
3. pgoutput_begin_txn (does not send)
4. pgoutput_begin (does send)

For consistency I think the 'pgoutput_begin' name should be changed to
include the "send" verb
1. pgoutput_begin_txn (does not send)
2. pgoutput_send_begin_txn (does send)

~~~

Changed as mentioned.

7. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

@@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
if (schema_sent)
return;

+   /* set up txndata */
+   txndata = toptxn->output_plugin_private;
+
+   /*
+    * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
+    * is sent. If not, send now.
+    */
+   if (in_streaming && !txndata->sent_stream_start)
+       pgoutput_send_stream_start(ctx, toptxn);
+   else if (txndata && !txndata->sent_begin_txn)
+   {
+       pgoutput_begin(ctx, toptxn);
+   }
+

How come the in_streaming case is not checking for a NULL txndata
before referencing it? Even if it is OK to do that, some more comments
or assertions might help for this piece of code.
(Stop-Press: see later comments #9, #10)

~~~

Updated.

8. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

@@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
if (schema_sent)
return;

+   /* set up txndata */
+   txndata = toptxn->output_plugin_private;
+
+   /*
+    * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
+    * is sent. If not, send now.
+    */

What part of this code is doing anything about "BEGIN PREPARE" ?

~~~

Removed that reference.

9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -1183,6 +1267,15 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Assert(false);
}

+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ /*
+ * Output BEGIN if we haven't yet, unless streaming.
+ */
+ else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
+ pgoutput_begin(ctx, txn);
+

The above code fragment looks more like what I was expecting should
be in 'maybe_send_schema'.

If you expand it out (and tweak the comments) it can become much less
complex looking IMO

e.g.

if (in_streaming)
{
/* If streaming, send STREAM START if we haven't yet */
if (txndata && !txndata->sent_stream_start)
pgoutput_send_stream_start(ctx, txn);
}
else
{
/* If not streaming, send BEGIN if we haven't yet */
if (txndata && !txndata->sent_begin_txn)
pgoutput_begin(ctx, txn);
}

Also, IIUC for the 'in_streaming' case you can Assert(txndata); so
then the code can be made even simpler.

Chose your example.

~~~

10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

@@ -1397,6 +1491,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

if (nrelids > 0)
{
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ /*
+ * output BEGIN if we haven't yet, unless streaming.
+ */
+ else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
+ pgoutput_begin(ctx, txn);

So now I have seen almost identical code repeated in 3 places so I am
beginning to think these should just be encapsulated in some common
function to call to do the deferred "send". Thoughts?

~~~

Not sure if we want to add a function call overhead.

11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message

@@ -1429,6 +1534,24 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+ /*
+ * Output BEGIN if we haven't yet.
+ * Avoid for streaming and non-transactional messages.
+ */
+ if (in_streaming || transactional)
+ {
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ else if (transactional)
+ {
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_begin(ctx, txn);
+ }
+ }

Does that comment at the top of that code fragment accurately match
this code? It seemed a bit muddled/stale to me.

~~~

Fixed.

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_start

/*
+ * Don't actually send stream start here, instead set a flag that indicates
+ * that stream start hasn't been sent and wait for the first actual change
+ * for this stream to be sent and then send stream start. This is done
+ * to avoid sending empty streams without any changes.
+ */
+ if (txndata == NULL)
+ {
+ txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
+ txndata->sent_begin_txn = false;
+ txndata->sent_any_stream = false;
+ txn->output_plugin_private = txndata;
+ }

IMO there is no need to set the members – just let the
MemoryContextAllocZero take care of all that. Then the code is simpler
and it also saves wondering if anything was accidentally missed.

Fixed.

~~~

13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start

+pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
+   ReorderBufferTXN *txn)
+{
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+
+ /*
* If we already sent the first stream for this transaction then don't
* send the origin id in the subsequent streams.
*/
- if (rbtxn_is_streamed(txn))
+ if (txndata->sent_any_stream)
send_replication_origin = false;

Given this usage, I wonder if there is a better name for the txndata
member - e.g. 'sent_first_stream' ?

~~~

Changed.

14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start

- /* we're streaming a chunk of transaction now */
- in_streaming = true;
+ /*
+ * Set the flags that indicate that changes were sent as part of
+ * the transaction and the stream.
+ */
+ txndata->sent_begin_txn = txndata->sent_stream_start = true;
+ txndata->sent_any_stream = true;

Why is this setting member 'sent_begin_txn' true also? It seems odd to
say so because the BEGIN was not actually sent at all, right?

~~~

You can have transactions that are partially streamed and partially
not. So if a transaction started out being streamed, but its remaining
changes are replicated as part of the commit, then we shouldn't send a
"begin" again when those changes are decoded.
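
The reasoning in the reply to comment 14 can be modelled with a small sketch. This is not the patch's code: TxnFlags, send_stream_start and on_change are hypothetical names, and the counters exist only to make the behaviour observable. The assumption being illustrated is that sending STREAM START also marks the transaction as begun, so a later non-streamed change does not trigger a second start-of-transaction message.

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-in for the per-transaction flags quoted above. */
typedef struct TxnFlags
{
    bool sent_begin_txn;
    bool sent_stream_start;
    int  begins;         /* number of BEGIN messages emitted */
    int  stream_starts;  /* number of STREAM START messages emitted */
} TxnFlags;

/* Emitting STREAM START also marks the transaction as "begun". */
static void
send_stream_start(TxnFlags *t)
{
    t->stream_starts++;
    t->sent_stream_start = true;
    t->sent_begin_txn = true;
}

/* Called per change; in_streaming says in which mode the change is decoded. */
static void
on_change(TxnFlags *t, bool in_streaming)
{
    if (in_streaming)
    {
        if (!t->sent_stream_start)
            send_stream_start(t);
    }
    else if (!t->sent_begin_txn)
    {
        t->begins++;
        t->sent_begin_txn = true;
    }
}
```

With a streamed change followed by a non-streamed one, this model emits one STREAM START and no BEGIN.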

15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort

@@ -1572,6 +1740,20 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,

/* determine the toplevel transaction */
toptxn = (txn->toptxn) ? txn->toptxn : txn;
+ txndata = toptxn->output_plugin_private;
+ sent_begin_txn = txndata->sent_begin_txn;
+
+ if (txn->toptxn == NULL)
+ {
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+ }
+
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction in stream abort");
+ return;
+ }

I didn't really understand why this code is checking the
'sent_begin_txn' member instead of the 'sent_stream_start' member?

Yes, changed this to check "sent_first_stream"

~~~

16. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_commit

@@ -1598,7 +1782,17 @@ pgoutput_stream_commit(struct
LogicalDecodingContext *ctx,
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));

- OutputPluginUpdateProgress(ctx);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+
+ /* If no changes were part of this transaction then drop the commit */
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction in stream commit");
+ return;
+ }

(Same as previous comment #15). I didn't really understand why this
code is checking the 'sent_begin_txn' member instead of the
'sent_stream_start' member?

~~~

Changed.

17. src/backend/replication/syncrep.c - SyncRepEnabled

@@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
}

/*
+ * Check if synchronous replication is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+ return SyncRepRequested() && ((volatile WalSndCtlData *)
WalSndCtl)->sync_standbys_defined;
+}

That code was once inline in 'SyncRepWaitForLSN' before it was turned
into a function, and there is a long comment in SyncRepWaitForLSN
describing the risks of this logic. e.g.

<quote>
... If it's true, we need to check it again
* later while holding the lock, to check the flag and operate the sync
* rep queue atomically. This is necessary to avoid the race condition
* described in SyncRepUpdateSyncStandbysDefined().
</quote>

This same function is now called from walsender.c. I think maybe it is
OK but please confirm it.

Anyway, the point is maybe this SyncRepEnabled function should be
better commented to make some reference about the race concerns of the
original comment. Otherwise some future caller of this function may be
unaware of it and come to grief.

Leaving this for now, not sure what wording is appropriate to use here.
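
The race concern in comment 17 is the usual "check without the lock, then check again with it" pattern. The sketch below is a generic illustration using a pthread mutex, not PostgreSQL's LWLock code; queue_lock, sync_standbys_defined (as a plain global), waiters_queued and enqueue_if_sync_rep_enabled are all hypothetical stand-ins.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical shared state standing in for the flag in WalSndCtl. */
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static volatile bool sync_standbys_defined = false;
static int waiters_queued = 0;

/*
 * Returns true if the caller was queued as a waiter.
 *
 * The unlocked read is only a fast-path hint. The decision that matters is
 * made again while holding the lock, so that checking the flag and updating
 * the queue happen atomically with respect to other lock holders.
 */
static bool
enqueue_if_sync_rep_enabled(void)
{
    bool queued = false;

    /* Fast exit: cheap, but possibly stale by the time we act on it. */
    if (!sync_standbys_defined)
        return false;

    pthread_mutex_lock(&queue_lock);
    if (sync_standbys_defined)      /* recheck under the lock */
    {
        waiters_queued++;
        queued = true;
    }
    pthread_mutex_unlock(&queue_lock);

    return queued;
}
```

A caller that relies only on the unlocked read, as a bare SyncRepEnabled() would allow, skips the second check, which is why a comment pointing at the original explanation seems worthwhile.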

On Wed, Feb 23, 2022 at 5:24 PM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

On Wed, Feb 23, 2022 at 10:58 PM Ajin Cherian <itsajin@gmail.com> wrote:

Few comments to V19-0001:

1. I think we should adjust the alignment format.
git am ../v19-0001-Skip-empty-transactions-for-logical-replication.patch
.git/rebase-apply/patch:197: indent with spaces.
* Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
.git/rebase-apply/patch:198: indent with spaces.
* is sent. If not, send now.
.git/rebase-apply/patch:199: indent with spaces.
*/
.git/rebase-apply/patch:201: indent with spaces.
pgoutput_send_stream_start(ctx, toptxn);
.git/rebase-apply/patch:204: indent with spaces.
pgoutput_begin(ctx, toptxn);
warning: 5 lines add whitespace errors.

Fixed.

2. Structure member initialization.
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
+       PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+                                                                                                                sizeof(PGOutputTxnData));
+
+       txndata->sent_begin_txn = false;
+       txn->output_plugin_private = txndata;
+}
Do we need to set sent_stream_start and sent_any_stream to false here?

Fixed

3. Maybe we should add Assert(txndata) like function pgoutput_commit_txn in
other functions.

4. In addition, I think we should keep a unified style.
a). log style (maybe first one is better.)
First style : "Skipping replication of an empty transaction in XXX"
Second style : "skipping replication of an empty transaction"
b) flag name (maybe second one is better.)
First style : variable "sent_begin_txn" in function pgoutput_stream_*.
Second style : variable "skip" in function pgoutput_commit_txn.

Fixed,

Regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v21-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v21-0001-Skip-empty-transactions-for-logical-replication.patch
#82shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Ajin Cherian (#81)
RE: logical replication empty transactions

Hi,

Here are some comments on the v21 patch.

1.
+ WalSndKeepalive(false, 0);

Maybe we can use InvalidXLogRecPtr here, instead of 0.

2.
+ pq_sendint64(&output_message, writePtr ? writePtr : sentPtr);

Similarly, should we use XLogRecPtrIsInvalid()?

3.
@@ -1183,6 +1269,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Assert(false);
}

+   if (in_streaming)
+	{
+		/* If streaming, send STREAM START if we haven't yet */
+		if (txndata && !txndata->sent_stream_start)
+		pgoutput_send_stream_start(ctx, txn);
+	}
+	else
+	{
+		/* If not streaming, send BEGIN if we haven't yet */
+		if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
+	}
+
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);

I am not sure if it is suitable to send begin or stream_start here, because the
row filter is not checked yet. That means, empty transactions caused by row
filter are not skipped.

4.
@@ -1617,9 +1829,21 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 							ReorderBufferTXN *txn,
 							XLogRecPtr prepare_lsn)
 {
+	PGOutputTxnData *txndata = txn->output_plugin_private;
+	bool			sent_begin_txn = txndata->sent_begin_txn;
+
 	Assert(rbtxn_is_streamed(txn));
-	OutputPluginUpdateProgress(ctx);
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
+
+	if (!sent_begin_txn)
+	{
+		elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare");
+		return;
+	}
+
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);

I notice that the patch skips stream-prepared transactions. This would cause an
error on the subscriber side when committing the transaction on the publisher
side, so I think we'd better not do that.

For example:
(set logical_decoding_work_mem = 64kB, max_prepared_transactions = 10 in
postgresql.conf)

-- publisher
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create publication pub for table test;

-- subscriber
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create subscription sub connection 'dbname=postgres port=5432' publication pub with(two_phase=on, streaming=on);

-- publisher
begin;
INSERT INTO test2 SELECT i, md5(i::text) FROM generate_series(1, 1000) s(i);
prepare transaction 't';
commit prepared 't';

The error message in subscriber log:
ERROR: prepared transaction with identifier "pg_gid_16391_722" does not exist

Regards,
Shi yu

#83Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: shiy.fnst@fujitsu.com (#82)
Re: logical replication empty transactions

On Wed, Mar 2, 2022 at 1:01 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

4.
@@ -1617,9 +1829,21 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
+       PGOutputTxnData *txndata = txn->output_plugin_private;
+       bool                    sent_begin_txn = txndata->sent_begin_txn;
+
Assert(rbtxn_is_streamed(txn));
-       OutputPluginUpdateProgress(ctx);
+       pfree(txndata);
+       txn->output_plugin_private = NULL;
+
+       if (!sent_begin_txn)
+       {
+               elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare");
+               return;
+       }
+
+       OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);

I notice that the patch skips stream-prepared transactions. This would cause an
error on the subscriber side when committing the transaction on the publisher
side, so I think we'd better not do that.

For example:
(set logical_decoding_work_mem = 64kB, max_prepared_transactions = 10 in
postgresql.conf)

-- publisher
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create publication pub for table test;

-- subscriber
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create subscription sub connection 'dbname=postgres port=5432' publication pub with(two_phase=on, streaming=on);

-- publisher
begin;
INSERT INTO test2 SELECT i, md5(i::text) FROM generate_series(1, 1000) s(i);
prepare transaction 't';
commit prepared 't';

The error message in subscriber log:
ERROR: prepared transaction with identifier "pg_gid_16391_722" does not exist

Thanks for the test. I guess this mixed streaming + two-phase case runs
into the same problem we had with skipping two-phase transactions. If
the eventual COMMIT PREPARED comes after a restart, there is no way of
knowing whether the original transaction was skipped, so we can't know
whether the COMMIT PREPARED needs to be sent. I tried not skipping the
"stream prepare", but that causes a crash in the apply worker as it
tries to find the non-existent streamed file. We could add logic to
silently ignore a spurious "stream prepare", but that might not be
ideal. Any thoughts on how to address this? Otherwise, we will need to
avoid skipping streamed transactions as well.

regards,
Ajin Cherian
Fujitsu Australia

#84Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: shiy.fnst@fujitsu.com (#82)
1 attachment(s)
Re: logical replication empty transactions

On Wed, Mar 2, 2022 at 1:01 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Hi,

Here are some comments on the v21 patch.

1.
+ WalSndKeepalive(false, 0);

Maybe we can use InvalidXLogRecPtr here, instead of 0.

Fixed.

2.
+ pq_sendint64(&output_message, writePtr ? writePtr : sentPtr);

Similarly, should we use XLogRecPtrIsInvalid()?

Fixed

3.
@@ -1183,6 +1269,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Assert(false);
}

+   if (in_streaming)
+       {
+               /* If streaming, send STREAM START if we haven't yet */
+               if (txndata && !txndata->sent_stream_start)
+               pgoutput_send_stream_start(ctx, txn);
+       }
+       else
+       {
+               /* If not streaming, send BEGIN if we haven't yet */
+               if (txndata && !txndata->sent_begin_txn)
+               pgoutput_send_begin(ctx, txn);
+       }
+
+
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);

I am not sure if it is suitable to send begin or stream_start here, because the
row filter is not checked yet. That means, empty transactions caused by row
filter are not skipped.

Moved the check down, so that row_filters are taken into account.
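
A minimal sketch of why the ordering in comment 3 matters, under stated assumptions: TxnState, row_passes_filter and process_change are hypothetical names, and the "filter" is just a positive-key test. The point is that BEGIN is sent only once a change has survived the filter, so a transaction whose rows are all filtered out stays empty.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct TxnState
{
    bool sent_begin;      /* has BEGIN gone out for this transaction? */
    int  messages_sent;   /* counts BEGIN and change messages */
} TxnState;

/* Hypothetical row filter: only rows with a positive key are published. */
static bool
row_passes_filter(int key)
{
    return key > 0;
}

/*
 * Process one change. The filter is evaluated first; BEGIN is sent only
 * when a change is actually going to be written.
 */
static void
process_change(TxnState *txn, int key)
{
    if (!row_passes_filter(key))
        return;                 /* nothing sent, transaction may stay empty */

    if (!txn->sent_begin)
    {
        txn->messages_sent++;   /* BEGIN */
        txn->sent_begin = true;
    }
    txn->messages_sent++;       /* the change itself */
}
```

If the BEGIN step were placed before the filter check, the first filtered-out row would already have put a BEGIN on the wire.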

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v22-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v22-0001-Skip-empty-transactions-for-logical-replication.patch
#85Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Ajin Cherian (#84)
2 attachment(s)
Re: logical replication empty transactions

I have split the patch into two. I have kept the logic of skipping
streaming changes in the second patch.
I will work on the second patch once we can figure out a solution for
the COMMIT PREPARED after restart problem.

regards,
Ajin Cherian

Attachments:

v23-0002-Skip-empty-streamed-transactions-for-logical-rep.patchapplication/octet-stream; name=v23-0002-Skip-empty-streamed-transactions-for-logical-rep.patch
v23-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v23-0001-Skip-empty-transactions-for-logical-replication.patch
#86Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Ajin Cherian (#85)
Re: logical replication empty transactions

On Fri, Mar 4, 2022 at 12:41 PM Ajin Cherian <itsajin@gmail.com> wrote:

I have split the patch into two. I have kept the logic of skipping
streaming changes in the second patch.
I will work on the second patch once we can figure out a solution for
the COMMIT PREPARED after restart problem.

Please see below my review comments for the first patch only (v23-0001)

======

1. Patch failed to apply cleanly - whitespace warnings.

git apply ../patches_misc/v23-0001-Skip-empty-transactions-for-logical-replication.patch
../patches_misc/v23-0001-Skip-empty-transactions-for-logical-replication.patch:68:
trailing whitespace.
* change in a transaction is processed. This makes it possible
warning: 1 line adds whitespace errors.

~~~

2. src/backend/replication/pgoutput/pgoutput.c - typedef struct PGOutputTxnData

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN. BEGIN is only sent when the first
+ * change in a transaction is processed. This makes it possible
+ * to skip transactions that are empty.
+ */
+typedef struct PGOutputTxnData

I felt that this comment is describing details all about its bool
member but I think maybe it should be describing something also about
the structure itself (because this is the structure comment). E.g. it
should say about it only being allocated by the pgoutput_begin_txn()
and it is accessible via txn->output_plugin_private. Maybe also say
this has subtle implications if this is NULL then it means the tx
can't be 2PC etc...

~~~

3. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_begin

+/*
+ * Send BEGIN.
+ *
+ * This is where the BEGIN is actually sent. This is called while processing
+ * the first change of the transaction.
+ */
+static void
+pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

IMO there is no need to repeat "This is where the BEGIN is actually
sent.", because "Send BEGIN." already said the same thing :-)

~~~

4. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_txn

+ /*
+ * If a BEGIN message was not yet sent, then it means there were no relevant
+ * changes encountered, so we can skip the COMMIT message too.
+ */
+ sent_begin_txn = txndata->sent_begin_txn;
+ txn->output_plugin_private = NULL;
+ OutputPluginUpdateProgress(ctx, !sent_begin_txn);
+
+ pfree(txndata);

Not quite sure why this pfree is positioned where it is (after that
function call). I felt this should be a couple of lines up so txndata
is freed as soon as you had no more use for it (i.e. after you copied
the bool from it)
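
The ordering suggested in comment 4 might look like the sketch below. It is a simplified model, not the patch: Txn, TxnData and commit_txn are hypothetical, free() stands in for pfree(), and the return value stands in for "COMMIT was written".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct TxnData
{
    bool sent_begin_txn;
} TxnData;

typedef struct Txn
{
    void *output_plugin_private;
} Txn;

/*
 * Returns true if a COMMIT would be sent, false if the transaction is
 * skipped as empty. The flag is copied first, then the private data is
 * released straight away, since nothing after this point needs it.
 */
static bool
commit_txn(Txn *txn)
{
    TxnData *txndata = txn->output_plugin_private;
    bool     sent_begin_txn;

    assert(txndata != NULL);

    sent_begin_txn = txndata->sent_begin_txn;
    free(txndata);
    txn->output_plugin_private = NULL;

    /* No BEGIN was sent, so there is nothing for COMMIT to close. */
    return sent_begin_txn;
}
```

Freeing immediately after the copy keeps the lifetime of txndata obvious and leaves no window in which a stale pointer could be used.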

~~~

5. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

@@ -594,6 +658,13 @@ maybe_send_schema(LogicalDecodingContext *ctx,
if (schema_sent)
return;

+   /* set up txndata */
+   txndata = toptxn->output_plugin_private;

The comment does not quite feel right. Nothing is "setting up" anything.
Really, all this does is assign a reference to the tx private data.
Probably better with no comment at all?

~~~

6. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

I observed that every call to the maybe_send_schema function also has
adjacent code that already/always is checking to call
pgoutput_send_begin_tx function.

So then I am wondering is the added logic to the maybe_send_schema
even needed at all? It looks a bit redundant. Thoughts?

~~~

7. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -1141,6 +1212,7 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  Relation relation, ReorderBufferChange *change)
 {
  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
  MemoryContext old;

Maybe it is worth deferring this assignment until after the row-filter
check. Otherwise, you are maybe doing it for nothing and IIRC this is
hot code so the less you do here the better. OTOH a single assignment
probably amounts to almost nothing.

~~~

8. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

@@ -1354,6 +1438,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata;
MemoryContext old;

This variable declaration should be done later in the block where it
is assigned.

~~~

9. src/backend/replication/pgoutput/pgoutput.c - suggestion

I notice there are quite a few places in the patch that look like:

+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+

It might be worth considering encapsulating all those in a helper function like:
pgoutput_maybe_send_begin(ctx, txn)

It would certainly be a lot tidier.
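
One possible shape for such a helper, sketched outside the PostgreSQL tree: Txn, TxnData, send_begin and maybe_send_begin are hypothetical stand-ins for ReorderBufferTXN, PGOutputTxnData, pgoutput_send_begin and the suggested pgoutput_maybe_send_begin, and begins_sent exists only so the effect can be observed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct TxnData
{
    bool sent_begin_txn;
} TxnData;

typedef struct Txn
{
    TxnData *output_plugin_private;
    int      begins_sent;       /* for illustration only */
} Txn;

/* Stand-in for the function that actually writes BEGIN. */
static void
send_begin(Txn *txn)
{
    txn->begins_sent++;
    txn->output_plugin_private->sent_begin_txn = true;
}

/*
 * Send BEGIN if it has not been sent yet. A NULL private pointer means
 * there is no per-transaction state to consult, so nothing is done.
 */
static void
maybe_send_begin(Txn *txn)
{
    TxnData *txndata = txn->output_plugin_private;

    if (txndata && !txndata->sent_begin_txn)
        send_begin(txn);
}
```

Each call site then shrinks to a single maybe_send_begin(txn) call, and the NULL check lives in one place.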

~~~

10. src/backend/replication/syncrep.c - SyncRepEnabled

@@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
}

 /*
+ * Check if synchronous replication is enabled
+ */
+bool
+SyncRepEnabled(void)

Missing period for that function comment.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#87shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Ajin Cherian (#85)
RE: logical replication empty transactions

On Fri, Mar 4, 2022 9:41 AM Ajin Cherian <itsajin@gmail.com> wrote:

I have split the patch into two. I have kept the logic of skipping
streaming changes in the second patch.
I will work on the second patch once we can figure out a solution for
the COMMIT PREPARED after restart problem.

Thanks for updating the patch.

A comment on v23-0001 patch.

@@ -1429,6 +1520,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+	/*
+	 * Output BEGIN if we haven't yet.
+	 * Avoid for non-transactional messages.
+	 */
+	if (in_streaming || transactional)
+	{
+		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+		/* Send BEGIN if we haven't yet */
+		if (txndata && !txndata->sent_begin_txn)
+			pgoutput_send_begin(ctx, txn);
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
 							 xid,

I think we don't need to send BEGIN if in_streaming is true, right? The first
patch doesn't skip streamed transaction, so should we modify
+ if (in_streaming || transactional)
to
+ if (!in_streaming && transactional)
?
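
For reference, the two conditions differ exactly when in_streaming is true. The tiny sketch below (old_condition and new_condition are hypothetical names) makes the truth table explicit.

```c
#include <assert.h>
#include <stdbool.h>

/* Condition as written in v23-0001. */
static bool
old_condition(bool in_streaming, bool transactional)
{
    return in_streaming || transactional;
}

/* Condition proposed in this review. */
static bool
new_condition(bool in_streaming, bool transactional)
{
    return !in_streaming && transactional;
}
```

Both agree when in_streaming is false; with in_streaming true, the old form always enters the block and the new form never does, whatever the value of transactional.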

Regards,
Shi yu

#88Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: shiy.fnst@fujitsu.com (#87)
2 attachment(s)
Re: logical replication empty transactions

On Mon, Mar 7, 2022 at 7:50 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

On Fri, Mar 4, 2022 9:41 AM Ajin Cherian <itsajin@gmail.com> wrote:

I have split the patch into two. I have kept the logic of skipping
streaming changes in the second patch.
I will work on the second patch once we can figure out a solution for
the COMMIT PREPARED after restart problem.

Thanks for updating the patch.

A comment on v23-0001 patch.

@@ -1429,6 +1520,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+       /*
+        * Output BEGIN if we haven't yet.
+        * Avoid for non-transactional messages.
+        */
+       if (in_streaming || transactional)
+       {
+               PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+               /* Send BEGIN if we haven't yet */
+               if (txndata && !txndata->sent_begin_txn)
+                       pgoutput_send_begin(ctx, txn);
+       }
+
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_message(ctx->out,
xid,
I think we don't need to send BEGIN if in_streaming is true, right? The first
patch doesn't skip streamed transaction, so should we modify
+       if (in_streaming || transactional)
to
+       if (!in_streaming && transactional)
?

Fixed.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v24-0002-Skip-empty-streamed-transactions-for-logical-rep.patchapplication/octet-stream; name=v24-0002-Skip-empty-streamed-transactions-for-logical-rep.patch
v24-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v24-0001-Skip-empty-transactions-for-logical-replication.patch
#89Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Ajin Cherian (#88)
2 attachment(s)
Re: logical replication empty transactions

On Mon, Mar 7, 2022 at 11:44 PM Ajin Cherian <itsajin@gmail.com> wrote:

Fixed.

regards,
Ajin Cherian
Fujitsu Australia

Rebased the patch and fixed some whitespace errors.
regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v25-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v25-0001-Skip-empty-transactions-for-logical-replication.patch
v25-0002-Skip-empty-streamed-transactions-for-logical-rep.patchapplication/octet-stream; name=v25-0002-Skip-empty-streamed-transactions-for-logical-rep.patch
#90Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: Ajin Cherian (#89)
Re: logical replication empty transactions

On Wed, Mar 16, 2022 at 12:33 PM Ajin Cherian <itsajin@gmail.com> wrote:

On Mon, Mar 7, 2022 at 11:44 PM Ajin Cherian <itsajin@gmail.com> wrote:

Fixed.

Review comments/suggestions:
=========================
1. Isn't it sufficient to call pgoutput_send_begin from
maybe_send_schema as that is commonplace for all others and is always
the first message we send? If so, I think we can remove it from other
places?
2. Can we write some comments to explain why we don't skip streaming
or prepared empty transactions and some possible solutions (the
protocol change and additional subscription parameter as discussed
[1]) as discussed in this thread pgoutput.c?
3. Can we add a simple test for it in one of the existing test
files(say in 001_rep_changes.pl)?
4. I think we can drop the skip streaming patch as we can't do that for now.

--
With Regards,
Amit Kapila.

#91Ajin Cherian
Ajin Cherian
itsajin@gmail.com
In reply to: Amit Kapila (#90)
1 attachment(s)
Re: logical replication empty transactions

On Thu, Mar 17, 2022 at 10:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

Review comments/suggestions:
=========================
1. Isn't it sufficient to call pgoutput_send_begin from
maybe_send_schema as that is commonplace for all others and is always
the first message we send? If so, I think we can remove it from other
places?

I've done it the other way around: I've removed it from
maybe_send_schema, since we always send the BEGIN
prior to calling maybe_send_schema.

2. Can we write some comments to explain why we don't skip streaming
or prepared empty transactions and some possible solutions (the
protocol change and additional subscription parameter as discussed
[1]) as discussed in this thread pgoutput.c?

I've added comments in the headers of pgoutput_begin_prepare_txn() and
pgoutput_stream_start().

3. Can we add a simple test for it in one of the existing test
files(say in 001_rep_changes.pl)?

added a simple test.

4. I think we can drop the skip streaming patch as we can't do that for now.

Dropped,

In addition, I have also added a few more comments explaining why the begin send
is delayed in pgoutput_change till row_filter is checked and also ran pgindent.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v26-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v26-0001-Skip-empty-transactions-for-logical-replication.patch
#92Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: Ajin Cherian (#91)
1 attachment(s)
Re: logical replication empty transactions

On Sat, Mar 19, 2022 at 9:10 AM Ajin Cherian <itsajin@gmail.com> wrote:

On Thu, Mar 17, 2022 at 10:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

3. Can we add a simple test for it in one of the existing test
files(say in 001_rep_changes.pl)?

added a simple test.

This doesn't verify if the transaction is skipped. I think we should
extend this test to check for a DEBUG message in the Logs (you need to
probably set log_min_messages to DEBUG1 for this test). As an example,
you can check the patch [1]. Also, it seems by mistake you have added
wait_for_catchup() twice.

Few other comments:
=================
1. Let's keep the parameter name as skipped_empty_xact in
OutputPluginUpdateProgress so as to not confuse with the other patch's
[2] keep_alive parameter. I think in this case we must send the
keep_alive message so as to not make the syncrep wait whereas in the
other patch we only need to send it periodically based on
wal_sender_timeout parameter.
2. The new function SyncRepEnabled() seems confusing to me as the
comments in SyncRepWaitForLSN() clearly state why we need to first
read the parameter 'sync_standbys_defined' without any lock then read
it again with a lock if the parameter is true. So, I just put that
check back and also added a similar check in WalSndUpdateProgress.
3.
@@ -1392,11 +1481,21 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
continue;

  relids[nrelids++] = relid;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
  maybe_send_schema(ctx, change, relation, relentry);
  }
  if (nrelids > 0)
  {
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+

Why do we need to try sending the begin in the second check? I think
it should be sufficient to do it in the above loop.
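
To illustrate the argument in comment 3, here is a simplified model of the truncate loop; TxnState and collect_published are hypothetical, and a bool array stands in for the per-relation publication check. BEGIN is sent inside the loop for the first published relation, so by the time nrelids > 0 is tested it has necessarily gone out already.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct TxnState
{
    bool sent_begin;
    int  begins_sent;
} TxnState;

/*
 * Walk the relations of a TRUNCATE and count the published ones.
 * BEGIN is sent at most once, when the first published relation is seen.
 */
static int
collect_published(TxnState *txn, const bool *published, int nrelations)
{
    int nrelids = 0;

    for (int i = 0; i < nrelations; i++)
    {
        if (!published[i])
            continue;

        nrelids++;

        if (!txn->sent_begin)
        {
            txn->begins_sent++;
            txn->sent_begin = true;
        }
    }

    /* Anything collected implies BEGIN was sent; a second check adds nothing. */
    assert(nrelids == 0 || txn->sent_begin);
    return nrelids;
}
```

The assertion at the end states the invariant that makes the second "send BEGIN if we haven't yet" block redundant.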

I have made these and a number of other changes in the attached patch.
Do let me know what you think of the attached?

[1]: /messages/by-id/CAA4eK1JbLRj6pSUENfDFsqj0+adNob_=RPXpnUnWFBskVi5JhA@mail.gmail.com
[2]: /messages/by-id/CAA4eK1LGnaPuWs2M4sDfpd6JQZjoh4DGAsgUvNW=Or8i9z6K8w@mail.gmail.com

--
With Regards,
Amit Kapila.

Attachments:

v27-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v27-0001-Skip-empty-transactions-for-logical-replication.patch
#93houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#92)
1 attachment(s)
RE: logical replication empty transactions

On Monday, March 21, 2022 6:01 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Sat, Mar 19, 2022 at 9:10 AM Ajin Cherian <itsajin@gmail.com> wrote:

On Thu, Mar 17, 2022 at 10:43 PM Amit Kapila <amit.kapila16@gmail.com>

wrote:

3. Can we add a simple test for it in one of the existing test
files(say in 001_rep_changes.pl)?

added a simple test.

This doesn't verify if the transaction is skipped. I think we should
extend this test to check for a DEBUG message in the Logs (you need to
probably set log_min_messages to DEBUG1 for this test). As an example,
you can check the patch [1]. Also, it seems by mistake you have added
wait_for_catchup() twice.

I added a testcase to check the DEBUG message.

Few other comments:
=================
1. Let's keep the parameter name as skipped_empty_xact in
OutputPluginUpdateProgress so as to not confuse with the other patch's
[2] keep_alive parameter. I think in this case we must send the
keep_alive message so as to not make the syncrep wait whereas in the
other patch we only need to send it periodically based on
wal_sender_timeout parameter.
2. The new function SyncRepEnabled() seems confusing to me as the
comments in SyncRepWaitForLSN() clearly state why we need to first
read the parameter 'sync_standbys_defined' without any lock then read
it again with a lock if the parameter is true. So, I just put that
check back and also added a similar check in WalSndUpdateProgress.
3.
@@ -1392,11 +1481,21 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
continue;

relids[nrelids++] = relid;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
maybe_send_schema(ctx, change, relation, relentry);
}
if (nrelids > 0)
{
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+

Why do we need to try sending the begin in the second check? I think
it should be sufficient to do it in the above loop.

I have made these and a number of other changes in the attached patch.
Do let me know what you think of the attached?

The changes look good to me.
I also ran some basic tests on the patch and didn't find any other problems.

Attached is the new version of the patch.

Best regards,
Hou zj

Attachments:

v28-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v28-0001-Skip-empty-transactions-for-logical-replication.patch
#94houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: houzj.fnst@fujitsu.com (#93)
1 attachment(s)
RE: logical replication empty transactions

On Monday, March 21, 2022 6:01 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

On Sat, Mar 19, 2022 at 9:10 AM Ajin Cherian <itsajin@gmail.com> wrote:

On Thu, Mar 17, 2022 at 10:43 PM Amit Kapila
<amit.kapila16@gmail.com>

wrote:

3. Can we add a simple test for it in one of the existing test
files(say in 001_rep_changes.pl)?

added a simple test.

This doesn't verify if the transaction is skipped. I think we should
extend this test to check for a DEBUG message in the Logs (you need to
probably set log_min_messages to DEBUG1 for this test). As an example,
you can check the patch [1]. Also, it seems by mistake you have added
wait_for_catchup() twice.

I added a testcase to check the DEBUG message.

Few other comments:
=================
1. Let's keep the parameter name as skipped_empty_xact in
OutputPluginUpdateProgress so as to not confuse with the other patch's
[2] keep_alive parameter. I think in this case we must send the
keep_alive message so as to not make the syncrep wait whereas in the
other patch we only need to send it periodically based on
wal_sender_timeout parameter.
2. The new function SyncRepEnabled() seems confusing to me as the
comments in SyncRepWaitForLSN() clearly state why we need to first
read the parameter 'sync_standbys_defined' without any lock then read
it again with a lock if the parameter is true. So, I just put that
check back and also added a similar check in WalSndUpdateProgress.
3.
@@ -1392,11 +1481,21 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
continue;

relids[nrelids++] = relid;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
maybe_send_schema(ctx, change, relation, relentry);
}
if (nrelids > 0)
{
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn) pgoutput_send_begin(ctx,
+ txn);
+

Why do we need to try sending the begin in the second check? I think
it should be sufficient to do it in the above loop.
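
For readers following the deferred-BEGIN idea behind this comment, here is a minimal standalone sketch of it. It is a toy model and not the pgoutput code: the ToyTxn struct and the toy_* functions are hypothetical names introduced only for illustration, and "messages" are simply counted rather than written to a stream. BEGIN is remembered but not emitted, the first published change emits it, and commit drops both BEGIN and COMMIT if nothing was sent.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the per-transaction state of an output plugin. */
typedef struct ToyTxn
{
	bool		sent_begin;		/* has BEGIN been emitted yet? */
	int			nmessages;		/* messages emitted for this transaction */
} ToyTxn;

/* BEGIN callback: remember the transaction, but emit nothing yet. */
static void
toy_begin(ToyTxn *txn)
{
	assert(txn != NULL);
	txn->sent_begin = false;
	txn->nmessages = 0;
}

/* Emit the deferred BEGIN the first time something has to be sent. */
static void
toy_send_begin_if_needed(ToyTxn *txn)
{
	if (!txn->sent_begin)
	{
		txn->nmessages++;		/* 'B' */
		txn->sent_begin = true;
	}
}

/* Change callback: only changes to published tables reach the stream. */
static void
toy_change(ToyTxn *txn, bool published)
{
	if (!published)
		return;
	toy_send_begin_if_needed(txn);
	txn->nmessages++;			/* the change itself */
}

/* Commit callback: returns true if the transaction was skipped as empty. */
static bool
toy_commit(ToyTxn *txn)
{
	assert(txn != NULL);
	if (!txn->sent_begin)
		return true;			/* nothing was sent, so drop 'C' as well */
	txn->nmessages++;			/* 'C' */
	return false;
}
```

In this model the check lives in one helper that every sending path calls once before its first message, which is the same reasoning as doing it only in the loop above rather than repeating it afterwards.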

I have made these and a number of other changes in the attached patch.
Do let me know what you think of the attached?

The changes look good to me.
I also ran some basic tests on the patch and didn't find any other problems.

Attached is the new version of the patch.

Oh, sorry, I posted the wrong patch, here is the correct one.

Best regards,
Hou zj

Attachments:

v28-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v28-0001-Skip-empty-transactions-for-logical-replication.patch
#95Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: houzj.fnst@fujitsu.com (#94)
2 attachment(s)
Re: logical replication empty transactions

On Tue, Mar 22, 2022 at 7:25 AM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Monday, March 21, 2022 6:01 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

Oh, sorry, I posted the wrong patch, here is the correct one.

The test change looks good to me. I think additionally we can verify
that the record is not reflected in the subscriber table. Apart from
that, I had made minor changes mostly in the comments in the attached
patch. If those look okay to you, please include those in the next
version.

--
With Regards,
Amit Kapila.

Attachments:

v28-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v28-0001-Skip-empty-transactions-for-logical-replication.patch
v28_diff_amit.1.patchapplication/octet-stream; name=v28_diff_amit.1.patch
#96houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#95)
1 attachment(s)
RE: logical replication empty transactions

On Tuesday, March 22, 2022 7:50 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 22, 2022 at 7:25 AM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Monday, March 21, 2022 6:01 PM Amit Kapila
<amit.kapila16@gmail.com>
wrote:

Oh, sorry, I posted the wrong patch, here is the correct one.

The test change looks good to me. I think additionally we can verify that the
record is not reflected in the subscriber table. Apart from that, I had made
minor changes mostly in the comments in the attached patch. If those look
okay to you, please include those in the next version.

Thanks, the changes look good to me, I merged the diff patch.

Attached is the new version of the patch, which includes the following changes:

- Fix a typo.
- Change the requestreply flag of the newly added WalSndKeepalive to false,
because the subscriber can judge whether it's necessary to post a reply based
on the received LSN.
- Add a testcase to make sure there is no data on the subscriber side when the
transaction is skipped.
- Change the name of the flag skipped_empty_xact to skipped_xact, which seems more
understandable.
- Merge Amit's suggested changes.

Best regards,
Hou zj

Attachments:

v29-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v29-0001-Skip-empty-transactions-for-logical-replication.patch
#97shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: houzj.fnst@fujitsu.com (#96)
2 attachment(s)
RE: logical replication empty transactions

On Thursday, March 24, 2022 11:19 AM Hou, Zhijie/侯 志杰 <houzj.fnst@fujitsu.com> wrote:

Attached is the new version of the patch, which includes the following changes:

- Fix a typo.
- Change the requestreply flag of the newly added WalSndKeepalive to false,
because the subscriber can judge whether it's necessary to post a reply based
on the received LSN.
- Add a testcase to make sure there is no data on the subscriber side when the
transaction is skipped.
- Change the name of the flag skipped_empty_xact to skipped_xact, which seems more
understandable.
- Merge Amit's suggested changes.

Hi,

This patch skips sending BEGIN/COMMIT messages for empty transactions and saves
network bandwidth, so I ran a test to see how it affects bandwidth.

This test is based on the previous test by Peter[1]. I temporarily modified the
code in worker.c to log the length of the data received by the subscriber (after
calling walrcv_receive()). At the conclusion of the test run, the logs are
processed to extract the numbers.

[1]: /messages/by-id/CAHut+PuyqcDJO0X2BxY+9ycF+ew3x77FiCbTJQGnLDbNmMASZQ@mail.gmail.com

The number of transactions is fixed (1000), and I tested different mixes of
empty and non-empty transactions: 0%, 25%, 50%, 75% and 100% empty. The patch
sends a keepalive message when skipping an empty transaction in synchronous
replication mode, so I tested both synchronous and asynchronous replication.

The results are as follows; a bar chart is attached.

Sync replication - size of data sent, by percentage of empty transactions
--------------------------------------------------------------------
               0%      25%      50%      75%     100%
HEAD       335211   281655   223661   170271   115108
patched    335217   256617   173878    98095    18108

Async replication - size of data sent, by percentage of empty transactions
--------------------------------------------------------------------
               0%      25%      50%      75%     100%
HEAD       339379   285835   236343   184227   115000
patched    335077   260953   180022   113333    18126

The details of the test are also attached.

Summary of results:
In both synchronous and asynchronous replication mode, the more empty
transactions there are, the more obvious the improvement is. Even when there
are no empty transactions, I can't see any overhead.
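
To put a number on the improvement, the relative reduction in data sent can be computed from the HEAD and patched figures. The helper below is only a small illustrative sketch; the name reduction_pct is not part of the test scripts and is introduced here as an assumption.

```c
#include <assert.h>

/*
 * Percentage by which the patched figure is smaller than the HEAD figure.
 * A negative result means the patched run sent more data than HEAD.
 */
static double
reduction_pct(long head, long patched)
{
	assert(head > 0);
	return 100.0 * (double) (head - patched) / (double) head;
}
```

For the sync replication run with 100% empty transactions, reduction_pct(115108, 18108) gives roughly 84%, while for the 0% case the two figures differ by only a few units, which matches the observation that there is no visible overhead.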

Regards,
Shi yu

Attachments:

performance_test.PNGimage/png; name=performance_test.PNG
details.txttext/plain; name=details.txt
#98houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: houzj.fnst@fujitsu.com (#96)
RE: logical replication empty transactions

On Thursday, March 24, 2022 11:19 AM houzj.fnst@fujitsu.com wrote:

On Tuesday, March 22, 2022 7:50 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

On Tue, Mar 22, 2022 at 7:25 AM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Monday, March 21, 2022 6:01 PM Amit Kapila
<amit.kapila16@gmail.com>
wrote:

Oh, sorry, I posted the wrong patch, here is the correct one.

The test change looks good to me. I think additionally we can verify
that the record is not reflected in the subscriber table. Apart from
that, I had made minor changes mostly in the comments in the attached
patch. If those look okay to you, please include those in the next version.

Thanks, the changes look good to me, I merged the diff patch.

Attached is the new version of the patch, which includes the following changes:

- Fix a typo.
- Change the requestreply flag of the newly added WalSndKeepalive to false,
because the subscriber can judge whether it's necessary to post a reply based
on the received LSN.
- Add a testcase to make sure there is no data on the subscriber side when the
transaction is skipped.
- Change the name of the flag skipped_empty_xact to skipped_xact, which seems more
understandable.
- Merge Amit's suggested changes.

I did some more review for the newly added keepalive message and confirmed that
it's necessary to send this in sync mode.

+	if (skipped_xact &&
+		SyncRepRequested() &&
+		((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+		WalSndKeepalive(false, ctx->write_location);

In sync replication, the publisher needs to get a reply from the subscriber to
release the waiter. After applying the patch, we don't send empty transactions
to the subscriber, so we won't get a reply without this keepalive message. The
walsender usually invokes WalSndWaitForWal(), which also sends a keepalive
message to the subscriber, so we could get a reply and release the waiter that
way. But WalSndWaitForWal() is not always invoked for each record: when reading
a page, we won't invoke WalSndWaitForWal() if we already have the record in our
buffer[1].

[1]: ReadPageInternal(
...
/* check whether we have all the requested data already */
if (targetSegNo == state->seg.ws_segno &&
targetPageOff == state->segoff && reqLen <= state->readLen)
return state->readLen;
...

Based on the above, if we don't have the newly added keepalive message in the
patch, the transaction could wait a bit longer to finish.

For example, I did some experiments to confirm this:
1. Set LOG_SNAPSHOT_INTERVAL_MS and checkpoint_timeout to bigger values to
make sure no extra WAL is generated that could affect the test.
2. Attach a debugger to the walsender and let it stop in WalSndWaitForWal().
3. Start two clients and modify an unpublished table:
postgres1 # INSERT INTO not_rep VALUES(1);
---- waiting
postgres2 # INSERT INTO not_rep VALUES(1);
---- waiting
4. Release the walsender. We can see that it won't send a keepalive to the
subscriber until it has handled both of the above transactions, which means
the two transactions will wait until both of them have been decoded. This
behavior doesn't look good and is inconsistent with the current behavior
(a transaction finishes after it is decoded, or after it is sent to the
subscriber if necessary).

So, I think the newly added keepalive message makes sense.
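
The effect seen in the experiment can be summarized with a toy model. This is only a sketch under simplifying assumptions, not walsender code: all transactions are assumed to be empty and already buffered when the walsender resumes, and the function name decoded_before_release is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy model (not PostgreSQL code): ntxns empty transactions are already
 * buffered when the walsender resumes, and it decodes them in order.
 * Returns how many transactions must be decoded before the backend that
 * committed transaction idx (0-based) can be released from its sync wait.
 */
static int
decoded_before_release(int ntxns, int idx, bool keepalive_on_skip)
{
	assert(ntxns > 0 && idx >= 0 && idx < ntxns);

	/* With a keepalive per skipped transaction, a reply can follow at once. */
	if (keepalive_on_skip)
		return idx + 1;

	/*
	 * Without it, the next keepalive is assumed to be sent only once the
	 * walsender runs out of buffered WAL, i.e. after the whole batch.
	 */
	return ntxns;
}
```

With two buffered transactions, the first waiter is released after one decoded transaction when the keepalive is sent, but only after both when it is not, which is the behavior described in step 4.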

Best regards,
Hou zj

#99houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: houzj.fnst@fujitsu.com (#98)
1 attachment(s)
RE: logical replication empty transactions

On Friday, March 25, 2022 8:31 AM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:

On Thursday, March 24, 2022 11:19 AM houzj.fnst@fujitsu.com wrote:

On Tuesday, March 22, 2022 7:50 PM Amit Kapila

<amit.kapila16@gmail.com>

wrote:

On Tue, Mar 22, 2022 at 7:25 AM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Monday, March 21, 2022 6:01 PM Amit Kapila
<amit.kapila16@gmail.com>
wrote:

Oh, sorry, I posted the wrong patch, here is the correct one.

The test change looks good to me. I think additionally we can verify
that the record is not reflected in the subscriber table. Apart from
that, I had made minor changes mostly in the comments in the attached
patch. If those look okay to you, please include those in the next version.

Thanks, the changes look good to me, I merged the diff patch.

Attached is the new version of the patch, which includes the following changes:

- Fix a typo.
- Change the requestreply flag of the newly added WalSndKeepalive to false,
because the subscriber can judge whether it's necessary to post a reply based
on the received LSN.
- Add a testcase to make sure there is no data on the subscriber side when the
transaction is skipped.
- Change the name of the flag skipped_empty_xact to skipped_xact, which seems more
understandable.
- Merge Amit's suggested changes.

I did some more review for the newly added keepalive message and confirmed
that it's necessary to send this in sync mode.

Since commit 75b1521 added decoding of sequences to logical
replication, this patch needs to send the BEGIN message in
pgoutput_sequence if necessary.

Attached is the new version of the patch with this change.

Best regards,
Hou zj

Attachments:

v30-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v30-0001-Skip-empty-transactions-for-logical-replication.patch
#100Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: houzj.fnst@fujitsu.com (#99)
Re: logical replication empty transactions

On Fri, Mar 25, 2022 at 12:50 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

Attach the new version patch with this change.

Few comments:
=================
1. I think we can move the keep_alive check after the tracklag record
check to keep it consistent with another patch [1].
2. Add the comment about the new parameter skipped_xact atop
WalSndUpdateProgress.
3. I think we need to call pq_flush_if_writable after sending a
keepalive message to avoid delaying sync transactions.

[1]: /messages/by-id/OS3PR01MB6275C64F264662E84D2FB7AE9E1D9@OS3PR01MB6275.jpnprd01.prod.outlook.com

--
With Regards,
Amit Kapila.

#101houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#100)
1 attachment(s)
RE: logical replication empty transactions

On Monday, March 28, 2022 3:08 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Mar 25, 2022 at 12:50 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

Attach the new version patch with this change.

Few comments:

Thanks for the comments.

=================
1. I think we can move the keep_alive check after the tracklag record
check to keep it consistent with another patch [1].

Changed.

2. Add the comment about the new parameter skipped_xact atop
WalSndUpdateProgress.

Added.

3. I think we need to call pq_flush_if_writable after sending a
keepalive message to avoid delaying sync transactions.

Agreed.
If we don’t flush the data, the keepalive might be flushed later than before,
so the reply could arrive later as well, and the release of the sync wait could
be delayed.

Attached is the new version of the patch, which addresses the above comments.
The patch also adds a loop after the newly added keepalive message
to make sure the message is actually flushed to the client, as is
done in WalSndWriteData.

Best regards,
Hou zj

Attachments:

v32-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v32-0001-Skip-empty-transactions-for-logical-replication.patch
#102Masahiko Sawada
Masahiko Sawada
sawada.mshk@gmail.com
In reply to: houzj.fnst@fujitsu.com (#101)
Re: logical replication empty transactions

On Mon, Mar 28, 2022 at 9:22 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Monday, March 28, 2022 3:08 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Mar 25, 2022 at 12:50 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

Attach the new version patch with this change.

Few comments:

Thanks for the comments.

=================
1. I think we can move the keep_alive check after the tracklag record
check to keep it consistent with another patch [1].

Changed.

2. Add the comment about the new parameter skipped_xact atop
WalSndUpdateProgress.

Added.

3. I think we need to call pq_flush_if_writable after sending a
keepalive message to avoid delaying sync transactions.

Agreed.
If we don’t flush the data, we might flush the keepalive later than before. And
we could get the reply later as well and then the release of syncwait could be
delayed.

Attach the new version patch which addressed the above comments.
The patch also adds a loop after the newly added keepalive message
to make sure the message is actually flushed to the client like what
did in WalSndWriteData.

Thank you for updating the patch!

Some comments:

+       if (skipped_xact &&
+               SyncRepRequested() &&
+               ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+       {
+               WalSndKeepalive(false, ctx->write_location);

I think we can use 'lsn' since it is actually ctx->write_location.

---
+       if (!sent_begin_txn)
+       {
+               elog(DEBUG1, "Skipped replication of an empty transaction with XID: %u", txn->xid);
+               return;
+       }

The log message should start with lowercase.

---
+# Note that the current location of the log file is not grabbed immediately
+# after reloading the configuration, but after sending one SQL command to
+# the node so as we are sure that the reloading has taken effect.
+$log_location = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_notrep VALUES (11)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$logfile = slurp_file($node_publisher->logfile, $log_location);

I think we should get the log location of the publisher node, not
subscriber node.

Regards,

--
Masahiko Sawada
EDB: https://www.enterprisedb.com/

#103houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Masahiko Sawada (#102)
1 attachment(s)
RE: logical replication empty transactions

On Tuesday, March 29, 2022 3:20 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

Some comments:

Thanks for the comments!

+       if (skipped_xact &&
+               SyncRepRequested() &&
+               ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+       {
+               WalSndKeepalive(false, ctx->write_location);

I think we can use 'lsn' since it is actually ctx->write_location.

Agreed, and changed.

---
+       if (!sent_begin_txn)
+       {
+               elog(DEBUG1, "Skipped replication of an empty transaction with XID: %u", txn->xid);
+               return;
+       }

The log message should start with lowercase.

Changed.

---
+# Note that the current location of the log file is not grabbed immediately
+# after reloading the configuration, but after sending one SQL command to
+# the node so as we are sure that the reloading has taken effect.
+$log_location = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_notrep VALUES (11)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$logfile = slurp_file($node_publisher->logfile, $log_location);

I think we should get the log location of the publisher node, not subscriber
node.

Changed.

Attached is the new version of the patch, which addresses the
above comments and slightly adjusts some code comments.

Best regards,
Hou zj

Attachments:

v33-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v33-0001-Skip-empty-transactions-for-logical-replication.patch
#104Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: houzj.fnst@fujitsu.com (#103)
Re: logical replication empty transactions

On Tue, Mar 29, 2022 at 2:05 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

Attach the new version patch which addressed the
above comments and slightly adjusted some code comments.

The patch looks good to me. One minor suggestion is to change the
function name ProcessPendingWritesAndTimeOut() to
ProcessPendingWrites().

--
With Regards,
Amit Kapila.

#105houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#104)
1 attachment(s)
RE: logical replication empty transactions

On Tuesday, March 29, 2022 5:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 29, 2022 at 2:05 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

Attach the new version patch which addressed the above comments and
slightly adjusted some code comments.

The patch looks good to me. One minor suggestion is to change the function
name ProcessPendingWritesAndTimeOut() to ProcessPendingWrites().

Thanks for the comment.
Attached is the new version of the patch with this change.

Best regards,
Hou zj

Attachments:

v34-0001-Skip-empty-transactions-for-logical-replication.patchapplication/octet-stream; name=v34-0001-Skip-empty-transactions-for-logical-replication.patch
#106shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: houzj.fnst@fujitsu.com (#105)
3 attachment(s)
RE: logical replication empty transactions

On Tue, Mar 29, 2022 5:15 PM Hou, Zhijie/侯 志杰 <houzj.fnst@fujitsu.com> wrote:

Thanks for the comment.
Attach the new version patch with this change.

Hi,

I did a performance test for this patch, based on the v32 patch, to see if it
affects performance when publishing empty transactions.

In this test, I used synchronous logical replication and published a table with
no operations on it. The test uses pgbench; each run takes 15 minutes, and I
took the median of 3 runs. The database was dropped and recreated after each run.

The results are as follows; bar charts are attached. The details of the test are
also attached.

TPS - publishing empty transactions (scale factor 1)
--------------------------------------------------------------------
            4 threads     8 threads    16 threads
HEAD        4818.2837     4353.6243     3888.5995
patched     5111.2936     4555.1629     4024.4286

TPS - publishing empty transactions (scale factor 100)
--------------------------------------------------------------------
            4 threads     8 threads    16 threads
HEAD        9066.6465    16118.0453    21485.1207
patched     9357.3361    16638.6409    24503.6829

There is an improvement of more than 3% after applying this patch, and in the
best case, it improves by 14%, which looks good to me.

Regards,
Shi yu

Attachments:

TPS_empty_transactions_scale_factor_1.PNGimage/png; name=TPS_empty_transactions_scale_factor_1.PNG
TPS_empty_transactions_scale_factor_100.PNGimage/png; name=TPS_empty_transactions_scale_factor_100.PNG
details.txttext/plain; name=details.txt
#107Masahiko Sawada
Masahiko Sawada
sawada.mshk@gmail.com
In reply to: houzj.fnst@fujitsu.com (#105)
Re: logical replication empty transactions

On Tue, Mar 29, 2022 at 6:15 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Tuesday, March 29, 2022 5:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 29, 2022 at 2:05 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

Attach the new version patch which addressed the above comments and
slightly adjusted some code comments.

The patch looks good to me. One minor suggestion is to change the function
name ProcessPendingWritesAndTimeOut() to ProcessPendingWrites().

Thanks for the comment.
Attach the new version patch with this change.

Thank you for updating the patch. Looks good to me.

Regards,

--
Masahiko Sawada
EDB: https://www.enterprisedb.com/

#108Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#107)
Re: logical replication empty transactions

On Wed, Mar 30, 2022 at 7:15 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Tue, Mar 29, 2022 at 6:15 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

Thanks for the comment.
Attach the new version patch with this change.

Thank you for updating the patch. Looks good to me.

Pushed.

--
With Regards,
Amit Kapila.