logical streaming of xacts via test_decoding is broken

Started by Amit Kapila, 26 messages
#1 Amit Kapila
amit.kapila16@gmail.com

Michael reported a BF failure [1] related to one of the logical
streaming test cases, and I've analyzed the issue. As I responded on
pgsql-committers [2], the issue here is that streaming
transactions can be interleaved, and because we maintain
xact_wrote_changes at the LogicalDecodingContext level, a later
transaction can overwrite the flag of an earlier streaming
transaction. I think it is logical to have this flag at the
transaction level (i.e. in ReorderBufferTXN). Until now this was
fine because the changes of each transaction were decoded in one shot,
but that is no longer true. We can keep an output_plugin_private data
pointer in ReorderBufferTXN, which the test_decoding module can use
to keep this and any other such flags in the future. We need to set this
flag in the begin_cb and stream_start_cb APIs and then reset/remove it in
the stream_commit_cb, stream_abort_cb and stream_stop_cb APIs.
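To make the failure mode concrete, here is a minimal C sketch of the idea above, under stated assumptions: `DemoTxn` is a hypothetical stand-in for ReorderBufferTXN carrying an opaque `output_plugin_private` pointer, `DemoTxnData` is a hypothetical per-transaction struct owned by the plugin, and the `demo_*` functions only loosely mirror the begin/change/commit callbacks. It is not PostgreSQL's code; it just contrasts one flag shared through the decoding context with a flag stored per transaction, when an empty transaction is decoded while another one is mid-stream.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/*
 * Hypothetical, simplified stand-ins: DemoTxn plays the role of
 * ReorderBufferTXN and DemoTxnData the role of the plugin's
 * per-transaction private data. NOT PostgreSQL's definitions.
 */
typedef struct DemoTxn
{
    unsigned int xid;
    void       *output_plugin_private;  /* opaque, owned by the plugin */
} DemoTxn;

typedef struct DemoTxnData
{
    bool        xact_wrote_changes;
} DemoTxnData;

/* The problematic scheme: one flag shared by every transaction. */
static bool shared_xact_wrote_changes = false;

/* begin_cb / first stream_start_cb analogue. */
static void
demo_begin(DemoTxn *txn)
{
    DemoTxnData *data = calloc(1, sizeof(DemoTxnData)); /* flag = false */

    assert(data != NULL);
    txn->output_plugin_private = data;
    shared_xact_wrote_changes = false;  /* clobbers other xacts' state */
}

/* change_cb / stream_change_cb analogue. */
static void
demo_change(DemoTxn *txn)
{
    DemoTxnData *data = txn->output_plugin_private;

    assert(data != NULL);
    data->xact_wrote_changes = true;
    shared_xact_wrote_changes = true;
}

/*
 * commit_cb / stream_commit_cb analogue: returns the per-transaction
 * flag (should a COMMIT be printed?), then frees the private data.
 */
static bool
demo_commit(DemoTxn *txn)
{
    DemoTxnData *data = txn->output_plugin_private;
    bool        wrote;

    assert(data != NULL);
    wrote = data->xact_wrote_changes;
    free(data);
    txn->output_plugin_private = NULL;
    return wrote;
}
```

Decoding a change for transaction A and then beginning the empty transaction B leaves the shared flag false by the time A commits, so a context-level flag would wrongly suppress A's COMMIT, while A's own `DemoTxnData` still records that it wrote changes.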

Additionally, we can extend the existing test case
concurrent_stream.spec to cover this scenario by adding a step that runs
an empty transaction before the commit of the transaction whose changes
we are going to stream (i.e. before s1_commit).

Thoughts?

[1]: /messages/by-id/20201109014118.GD1695@paquier.xyz
[2]: /messages/by-id/CAA4eK1JMCm9HURVmOapo+v2u2EEABOuzgp7XJ32C072ygcKktQ@mail.gmail.com

--
With Regards,
Amit Kapila.

#2 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#1)
Re: logical streaming of xacts via test_decoding is broken

On Mon, Nov 9, 2020 at 11:00 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

Michael reported a BF failure [1] related to one of the logical
streaming test cases, and I've analyzed the issue. As I responded on
pgsql-committers [2], the issue here is that streaming
transactions can be interleaved, and because we maintain
xact_wrote_changes at the LogicalDecodingContext level, a later
transaction can overwrite the flag of an earlier streaming
transaction. I think it is logical to have this flag at the
transaction level (i.e. in ReorderBufferTXN). Until now this was
fine because the changes of each transaction were decoded in one shot,
but that is no longer true. We can keep an output_plugin_private data
pointer in ReorderBufferTXN, which the test_decoding module can use
to keep this and any other such flags in the future. We need to set this
flag in the begin_cb and stream_start_cb APIs and then reset/remove it in
the stream_commit_cb, stream_abort_cb and stream_stop_cb APIs.

Additionally, we can extend the existing test case
concurrent_stream.spec to cover this scenario by adding a step that runs
an empty transaction before the commit of the transaction whose changes
we are going to stream (i.e. before s1_commit).

Thoughts?

The analysis seems correct to me; I will work on it.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#3 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Dilip Kumar (#2)
Re: logical streaming of xacts via test_decoding is broken

On Mon, Nov 9, 2020 at 11:04 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Nov 9, 2020 at 11:00 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

Michael reported a BF failure [1] related to one of the logical
streaming test cases, and I've analyzed the issue. As I responded on
pgsql-committers [2], the issue here is that streaming
transactions can be interleaved, and because we maintain
xact_wrote_changes at the LogicalDecodingContext level, a later
transaction can overwrite the flag of an earlier streaming
transaction. I think it is logical to have this flag at the
transaction level (i.e. in ReorderBufferTXN). Until now this was
fine because the changes of each transaction were decoded in one shot,
but that is no longer true. We can keep an output_plugin_private data
pointer in ReorderBufferTXN, which the test_decoding module can use
to keep this and any other such flags in the future. We need to set this
flag in the begin_cb and stream_start_cb APIs and then reset/remove it in
the stream_commit_cb, stream_abort_cb and stream_stop_cb APIs.

So IIUC, we need to keep 'output_plugin_private' in
LogicalDecodingContext as well as in ReorderBufferTXN, so the
output_plugin_private in ReorderBufferTXN will currently keep just
one flag, xact_wrote_changes, and the remaining things will still be
maintained in the output_plugin_private of LogicalDecodingContext. Is
my understanding correct?

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#4 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#3)
Re: logical streaming of xacts via test_decoding is broken

On Mon, Nov 9, 2020 at 11:21 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

So IIUC, we need to keep 'output_plugin_private' in
LogicalDecodingContext as well as in ReorderBufferTXN, so the
output_plugin_private in ReorderBufferTXN will currently keep just
one flag, xact_wrote_changes, and the remaining things will still be
maintained in the output_plugin_private of LogicalDecodingContext. Is
my understanding correct?

Yes. But keep it as void * so that we can add more things later if required.

--
With Regards,
Amit Kapila.

#5 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#4)
Re: logical streaming of xacts via test_decoding is broken

On Mon, Nov 9, 2020 at 11:31 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

Yes. But keep it as void * so that we can add more things later if required.

Yeah, that makes sense to me.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#6 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Dilip Kumar (#5)
Re: logical streaming of xacts via test_decoding is broken

On Mon, Nov 9, 2020 at 1:34 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Yeah, that makes sense to me.

I have made some POC changes and analyzed this further. I think that
for a streaming transaction we need two flags:
1) xact_wrote_changes
2) stream_wrote_changes

So basically, if a stream didn't make any changes, we can skip the
stream start and stream stop messages for that empty stream, but if any
of the streams has made a change, then we need to emit the
transaction commit message. But if we want to avoid tracking the
changes per stream, then maybe once xact_wrote_changes has been set to
true for the txn, we should just emit the messages for all the
streams, without tracking whether each stream is empty or not. What are
your thoughts on this?
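A hypothetical C sketch of the two-flag scheme described above, assuming the stream-start message is deferred until a stream's first change. The struct, the `demo_*` callbacks and the output labels are illustrative names only, not test_decoding's actual code or output format.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical per-transaction state for the two-flag idea. */
typedef struct DemoStreamTxnData
{
    bool        xact_wrote_changes;   /* some stream of this xact had a change */
    bool        stream_wrote_changes; /* the currently open stream had a change */
} DemoStreamTxnData;

static char demo_out[512];   /* collected output, one label per line */

static void
emit(const char *label)
{
    /* label + '\n' + terminating NUL must still fit in the buffer */
    assert(strlen(demo_out) + strlen(label) + 1 < sizeof(demo_out));
    strncat(demo_out, label, sizeof(demo_out) - strlen(demo_out) - 1);
    strncat(demo_out, "\n", sizeof(demo_out) - strlen(demo_out) - 1);
}

/* Stream start: print nothing yet, just reset the per-stream flag. */
static void
demo_stream_start(DemoStreamTxnData *data)
{
    data->stream_wrote_changes = false;
}

/* First change of a stream emits the deferred "stream start". */
static void
demo_stream_change(DemoStreamTxnData *data)
{
    if (!data->stream_wrote_changes)
        emit("stream start");
    data->stream_wrote_changes = true;
    data->xact_wrote_changes = true;
    emit("change");
}

/* Stream stop is printed only if this stream printed a start. */
static void
demo_stream_stop(DemoStreamTxnData *data)
{
    if (data->stream_wrote_changes)
        emit("stream stop");
}

/* Commit is printed only if any stream of the xact had a change. */
static void
demo_stream_commit(DemoStreamTxnData *data)
{
    if (data->xact_wrote_changes)
        emit("stream commit");
}
```

With this shape an empty stream prints nothing at all, and a transaction whose streams were all empty also skips its commit line. The single-flag alternative mentioned above would avoid tracking `stream_wrote_changes`, at the cost of printing empty start/stop pairs once the transaction has written anything.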

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#7 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#6)
Re: logical streaming of xacts via test_decoding is broken

On Mon, Nov 9, 2020 at 3:01 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

I have made some POC changes and analyzed this further. I think that
for a streaming transaction we need two flags:
1) xact_wrote_changes
2) stream_wrote_changes

So basically, if a stream didn't make any changes, we can skip the
stream start and stream stop messages for that empty stream, but if any
of the streams has made a change, then we need to emit the
transaction commit message. But if we want to avoid tracking the
changes per stream, then maybe once xact_wrote_changes has been set to
true for the txn, we should just emit the messages for all the
streams, without tracking whether each stream is empty or not. What are
your thoughts on this?

I would prefer to have two separate flags to control this behavior,
because without them it is quite possible that in some cases we
display empty stream start/stop messages even when that is not
intended. The bigger question is whether we want to give users a
skip_empty_streams option similar to skip_empty_xacts. I would again
prefer to give users a separate option as well. What do you
think?

--
With Regards,
Amit Kapila.

#8 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#7)
Re: logical streaming of xacts via test_decoding is broken

On Mon, Nov 9, 2020 at 4:21 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

I would prefer to have two separate flags to control this behavior,
because without them it is quite possible that in some cases we
display empty stream start/stop messages even when that is not
intended.

+1

The bigger question is whether we want to give users a
skip_empty_streams option similar to skip_empty_xacts. I would again
prefer to give users a separate option as well. What do you
think?

Yeah, I think giving an option would be better.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#9 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Dilip Kumar (#8)
Re: logical streaming of xacts via test_decoding is broken

On Mon, Nov 9, 2020 at 5:37 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Nov 9, 2020 at 4:21 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

I would prefer to have two separate flags to control this behavior,
because without them it is quite possible that in some cases we
display empty stream start/stop messages even when that is not
intended.

+1

The bigger question is whether we want to give users a
skip_empty_streams option similar to skip_empty_xacts. I would again
prefer to give users a separate option as well. What do you
think?

Yeah, I think giving an option would be better.

I think we should also think about the combinations of
skip_empty_xacts and skip_empty_streams. For example, if the user
passes skip_empty_xacts = false and skip_empty_streams = true,
then what should the behavior be, if the complete transaction
option1: It should not print any stream_start/stream_stop and just
print the stream commit, because skip_empty_xacts is false and
skip_empty_streams is true.
option2: It should print the stream_start message for the very first
stream, because it is the first stream of the txn and skip_empty_xacts
is false; later it will print the stream commit.
option3: Or, for the first stream, we first emit a BEGIN message, i.e.
stream begin
stream start
stream stop
stream commit
option4: The user should not be allowed to pass skip_empty_xacts =
false with skip_empty_streams = true, because if the streaming mode
is on, then we cannot print the xact without printing streams.

What is your opinion on this?

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#10 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#9)
Re: logical streaming of xacts via test_decoding is broken

On Mon, Nov 9, 2020 at 6:00 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

I think we should also think about the combinations of
skip_empty_xacts and skip_empty_streams. For example, if the user
passes skip_empty_xacts = false and skip_empty_streams = true,
then what should the behavior be, if the complete transaction
option1: It should not print any stream_start/stream_stop and just
print the stream commit, because skip_empty_xacts is false and
skip_empty_streams is true.
option2: It should print the stream_start message for the very first
stream, because it is the first stream of the txn and skip_empty_xacts
is false; later it will print the stream commit.
option3: Or, for the first stream, we first emit a BEGIN message, i.e.
stream begin
stream start
stream stop
stream commit
option4: The user should not be allowed to pass skip_empty_xacts =
false with skip_empty_streams = true, because if the streaming mode
is on, then we cannot print the xact without printing streams.

What is your opinion on this?

I would prefer option-4, and in addition to that we can ensure that if
skip_empty_xacts = true, then by default skip_empty_streams is also
true.

--
With Regards,
Amit Kapila.

#11 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#10)
Re: logical streaming of xacts via test_decoding is broken

On Tue, Nov 10, 2020 at 8:14 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

I would prefer option-4, and in addition to that we can ensure that if
skip_empty_xacts = true, then by default skip_empty_streams is also
true.

But then it will behave as a single option only, right? Because if
1. skip_empty_xacts = true, then we set skip_empty_streams = true
2. skip_empty_xacts = false, then skip_empty_streams cannot be set to true

So as per the state machine, either both will be true or both will be
false. Am I missing something?

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#12 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#11)
Re: logical streaming of xacts via test_decoding is broken

On Tue, Nov 10, 2020 at 10:26 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 8:14 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

I would prefer option-4, and in addition to that we can ensure that if
skip_empty_xacts = true, then by default skip_empty_streams is also
true.

But then it will behave as a single option only, right? Because if
1. skip_empty_xacts = true, then we set skip_empty_streams = true

For this case, users can use skip_empty_xacts = true and
skip_empty_streams = false. I only mean the default for the case
where the user has specified skip_empty_xacts = true and didn't use
the 'skip_empty_streams' option.

--
With Regards,
Amit Kapila.

#13 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#12)
Re: logical streaming of xacts via test_decoding is broken

On Tue, Nov 10, 2020 at 10:52 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

For this case, users can use skip_empty_xacts = true and
skip_empty_streams = false. I only mean the default for the case
where the user has specified skip_empty_xacts = true and didn't use
the 'skip_empty_streams' option.

Ok, thanks for the clarification.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#14 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Dilip Kumar (#13)
1 attachment(s)
Re: logical streaming of xacts via test_decoding is broken

On Tue, Nov 10, 2020 at 11:18 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 10:52 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Nov 10, 2020 at 10:26 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 8:14 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Nov 9, 2020 at 6:00 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Nov 9, 2020 at 5:37 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Nov 9, 2020 at 4:21 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

The bigger question is do we want to give users an option

for skip_empty_streams similar to skip_empty_xacts? I would again
prefer to give a separate option to the user as well. What do you
think?

Yeah, I think giving an option would be better.

I think we should also think about the combinations of the
skip_empty_xacts and skip_empty_streams. For example, if the user
passes the skip_empty_xacts to false and skip_empty_streams to true
then what should be the behavior, if the complete transaction
option1: It should not print any stream_start/stream_stop and just
print commit stream because skip_empty_xacts is false and
skip_empty_streams is true.
option2: It should print the stream_start message for the very first
stream because it is the first stream if the txn and skip_empty_xacts
is false so print it and later it will print-stream commit.
option3: Or for the first stream we first put the BEGIN message i.e
stream begin
stream start
stream stop
stream commit
option4: the user should not be allowed to pass skip_empty_xacts =
false with skip_empty_streams to true. Because if the streaming mode
is on then we can not print the xact without printing streams.

What is your opinion on this?

I would prefer option-4 and in addition to that we can ensure that if
skip_empty_xacts = true then by default skip_empty_streams is also
true.

But then it will behave as a single option only, right? Because if
1. skip_empty_xacts = true, then we set skip_empty_streams = true

For this case, users can use skip_empty_xacts = true and
skip_empty_streams = false. I am only talking about the case where the
user has set skip_empty_xacts = true and didn't pass the
'skip_empty_streams' option.

Ok, thanks for the clarification.

I have prepared a patch for the same.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachments:

v1-0001-Bug-fix-skip-empty-xacts-in-streaming-mode.patch (application/octet-stream)
From 11cf3de4232e02d38850aa3433cd5ec024f571a6 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Tue, 10 Nov 2020 14:07:30 +0530
Subject: [PATCH v1] Bug fix skip-empty-xacts in streaming mode

In streaming mode a transaction can be decoded in multiple streams,
and those streams can be interleaved with those of other transactions.
Due to that we cannot remember the transaction's write status in the
logical decoding context, because another transaction might change
it, so we need to keep it in the reorder buffer txn.  Along with that
we also support a new option to skip empty streams.
---
 .../test_decoding/expected/concurrent_stream.out   |   5 +-
 contrib/test_decoding/specs/concurrent_stream.spec |   8 +-
 contrib/test_decoding/test_decoding.c              | 100 +++++++++++++++++----
 src/include/replication/reorderbuffer.h            |   5 ++
 4 files changed, 98 insertions(+), 20 deletions(-)

diff --git a/contrib/test_decoding/expected/concurrent_stream.out b/contrib/test_decoding/expected/concurrent_stream.out
index e731d13..6f8b217 100644
--- a/contrib/test_decoding/expected/concurrent_stream.out
+++ b/contrib/test_decoding/expected/concurrent_stream.out
@@ -1,11 +1,12 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
-starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
+starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s2_ddl s1_commit s1_get_stream_changes
 step s0_begin: BEGIN;
 step s0_ddl: CREATE TABLE stream_test1(data text);
 step s1_ddl: CREATE TABLE stream_test(data text);
 step s1_begin: BEGIN;
 step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
+step s2_ddl: CREATE TABLE stream_test2(data text);
 step s1_commit: COMMIT;
 step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 data           
diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec
index ad9fde9..8d24ca1 100644
--- a/contrib/test_decoding/specs/concurrent_stream.spec
+++ b/contrib/test_decoding/specs/concurrent_stream.spec
@@ -8,7 +8,7 @@ setup
 
   -- consume DDL
   SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
+  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 60000) g';
 }
 
 teardown
@@ -23,6 +23,10 @@ setup { SET synchronous_commit=on; }
 step "s0_begin" { BEGIN; }
 step "s0_ddl"   {CREATE TABLE stream_test1(data text);}
 
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_ddl"   {CREATE TABLE stream_test2(data text);}
+
 # The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
 # the currently running s0_ddl and we want to test that s0_ddl should not get
 # streamed when user asked to skip-empty-xacts.
@@ -34,4 +38,4 @@ step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
 step "s1_commit" { COMMIT; }
 step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
 
-permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes"
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 8e33614..921306c 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -34,10 +34,16 @@ typedef struct
 	bool		include_xids;
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
-	bool		xact_wrote_changes;
+	bool		skip_empty_streams;
 	bool		only_local;
 } TestDecodingData;
 
+typedef struct
+{
+	bool		xact_wrote_changes;
+	bool		stream_wrote_changes;
+} TestDecodingTxnData;
+
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 							  bool is_init);
 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
@@ -135,6 +141,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_xids = true;
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
+	data->skip_empty_streams = false;
 	data->only_local = false;
 
 	ctx->output_plugin_private = data;
@@ -194,6 +201,24 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
+
+			/* set skip empty stream to true if skip empty xact is set to true */
+			if (data->skip_empty_xacts)
+				data->skip_empty_streams = true;
+		}
+		else if (strcmp(elem->defname, "skip-empty-streams") == 0)
+		{
+			if (elem->arg == NULL)
+				data->skip_empty_streams = true;
+			else if (!parse_bool(strVal(elem->arg), &data->skip_empty_streams))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+			if (!data->skip_empty_xacts && data->skip_empty_streams)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("the skip-empty-streams can not be true if skip-empty-xacts is false")));
 		}
 		else if (strcmp(elem->defname, "only-local") == 0)
 		{
@@ -255,8 +280,12 @@ static void
 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata =
+		MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+	txndata->xact_wrote_changes = false;
+	txn->output_plugin_private = txndata;
 
-	data->xact_wrote_changes = false;
 	if (data->skip_empty_xacts)
 		return;
 
@@ -280,8 +309,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +472,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 Relation relation, ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	Form_pg_class class_form;
 	TupleDesc	tupdesc;
 	MemoryContext old;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
@@ -527,17 +559,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				   int nrelations, Relation relations[], ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	MemoryContext old;
 	int			i;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
@@ -592,10 +626,26 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	data->xact_wrote_changes = false;
-	if (data->skip_empty_xacts)
+	/*
+	 * If this is the first stream for the txn then allocate the txn plugin
+	 * data and set the xact_wrote_changes to false.
+	 */
+	if (txndata == NULL)
+	{
+		txndata =
+			MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+		txndata->xact_wrote_changes = false;
+		txn->output_plugin_private = txndata;
+	}
+
+	txndata->stream_wrote_changes = false;
+	if (data->skip_empty_streams)
+	{
+		Assert(data->skip_empty_xacts);
 		return;
+	}
 	pg_output_stream_start(ctx, data, txn, true);
 }
 
@@ -615,8 +665,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_streams && !txndata->stream_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -633,8 +684,18 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 					   XLogRecPtr abort_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+	TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	if (txn->toptxn == NULL)
+	{
+		Assert(txn->output_plugin_private != NULL);
+		pfree(txndata);
+		txn->output_plugin_private = NULL;
+	}
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +712,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +747,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 						ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
 	/* output stream start if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_streams && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
@@ -734,12 +801,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						  ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_streams && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index dfdda93..bd9dd7e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN
 
 	/* If we have detected concurrent abort then ignore future changes. */
 	bool		concurrent_abort;
+
+	/*
+	 * Private data pointer of the output plugin.
+	 */
+	void	   *output_plugin_private;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
-- 
1.8.3.1
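To make the failure mode described in the commit message concrete, here is a minimal, self-contained C model of the skip-empty-xacts decision. It is not test_decoding code: the event kinds, the `count_emitted_commits` function and the fixed-size `MAX_TXNS` table are hypothetical simplifications in which a begin resets the write flag, a change sets it, and a commit is printed only if the flag is set. Replaying a streamed transaction interleaved with an empty one shows how a single context-level flag can lose the streamed transaction's COMMIT, while a per-transaction flag keeps it.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_TXNS 4              /* hypothetical: xids are small indexes */

typedef enum
{
    EV_BEGIN,                   /* begin callback of a non-streamed xact */
    EV_STREAM_START,            /* stream start callback */
    EV_CHANGE,                  /* (stream) change callback */
    EV_STREAM_STOP,             /* stream stop callback */
    EV_COMMIT                   /* commit / stream commit callback */
} EventKind;

typedef struct
{
    EventKind   kind;
    int         xid;            /* index into the per-transaction table */
} Event;

/*
 * Count the COMMIT lines a plugin running with skip-empty-xacts would print.
 * per_txn = false models the old layout (one flag in the decoding context);
 * per_txn = true models the fix (one flag per transaction).
 */
static int
count_emitted_commits(const Event *ev, int n, bool per_txn)
{
    bool        shared_flag = false;
    bool        txn_flag[MAX_TXNS] = {false};
    int         commits = 0;

    for (int i = 0; i < n; i++)
    {
        bool       *flag;

        assert(ev[i].xid >= 0 && ev[i].xid < MAX_TXNS);
        flag = per_txn ? &txn_flag[ev[i].xid] : &shared_flag;

        switch (ev[i].kind)
        {
            case EV_BEGIN:
                *flag = false;
                break;
            case EV_STREAM_START:
                /* old code reset the shared flag on every stream start;
                 * per-transaction state starts zeroed and is kept */
                if (!per_txn)
                    *flag = false;
                break;
            case EV_CHANGE:
                *flag = true;
                break;
            case EV_STREAM_STOP:
                break;
            case EV_COMMIT:
                if (*flag)
                    commits++;
                break;
        }
    }
    return commits;
}
```

Under this model, the replay stream_start(0), change(0), stream_stop(0), begin(1), commit(1), commit(0) prints 0 commits with the shared flag and 1 with per-transaction flags; since transaction 0 did write a change, 1 is the expected answer.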

#15Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#14)
Re: logical streaming of xacts via test_decoding is broken

On Tue, Nov 10, 2020 at 2:25 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 11:18 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 10:52 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

For this case, users can use skip_empty_xacts = true and
skip_empty_streams = false. I am only talking about the case where the
user has set skip_empty_xacts = true and didn't pass the
'skip_empty_streams' option.

Ok, thanks for the clarification.

I have prepared a patch for the same.

Few comments:
1.
+ else if (strcmp(elem->defname, "skip-empty-streams") == 0)
+ {
+ if (elem->arg == NULL)
+ data->skip_empty_streams = true;
+ else if (!parse_bool(strVal(elem->arg), &data->skip_empty_streams))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ if (!data->skip_empty_xacts && data->skip_empty_streams)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("the skip-empty-streams can not be true if skip-empty-xacts is false")));
  }

You can probably add a comment as to why we are disallowing this case.
I thought of also checking the 'stream-changes' parameter here,
because it doesn't make sense to pass this parameter without it.
However, that doesn't seem necessary, but adding a comment here to
that effect would probably be a good idea.

2.
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
  TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+

Shall we free this memory at commit time for the sake of consistency,
even though it would otherwise be freed along with the decoding context?

3. Can you please prepare a separate patch for the test case changes so
that it is easier to verify that the test fails without the patch and
passes after the patch?
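Comment 2 concerns the lifetime of the per-transaction data. A minimal sketch of that lifecycle follows, assuming plain `calloc`/`free` as stand-ins for `MemoryContextAllocZero`/`pfree` and hypothetical `Txn`/`TxnData` structs standing in for `ReorderBufferTXN` and `TestDecodingTxnData`. The patch allocates unconditionally in the begin callback and only on the first stream in the stream-start callback; here a single hypothetical helper covers both, and the commit path reads the flag, frees the data and resets the pointer to `NULL`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for TestDecodingTxnData. */
typedef struct
{
    bool        xact_wrote_changes;
    bool        stream_wrote_changes;
} TxnData;

/* Hypothetical stand-in for ReorderBufferTXN: only the new field matters. */
typedef struct
{
    void       *output_plugin_private;
} Txn;

/*
 * Called from begin, and from every stream start: allocate zeroed data the
 * first time, reuse it for later streams of the same transaction.
 */
static TxnData *
txn_data_get_or_create(Txn *txn)
{
    if (txn->output_plugin_private == NULL)
    {
        /* calloc stands in for MemoryContextAllocZero: all flags false */
        txn->output_plugin_private = calloc(1, sizeof(TxnData));
        if (txn->output_plugin_private == NULL)
            abort();
    }
    return txn->output_plugin_private;
}

/*
 * Called from commit / stream commit: copy the flag out, free the data and
 * reset the pointer to NULL.  Returns whether COMMIT should be printed.
 */
static bool
txn_data_release(Txn *txn, bool skip_empty_xacts)
{
    TxnData    *txndata = txn->output_plugin_private;
    bool        wrote;

    assert(txndata != NULL);
    wrote = txndata->xact_wrote_changes;
    free(txndata);
    txn->output_plugin_private = NULL;
    return !skip_empty_xacts || wrote;
}
```

Copying `xact_wrote_changes` into a local before freeing mirrors what the patch does in `pg_decode_stream_commit`, so the decision about printing COMMIT never touches freed memory.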

--
With Regards,
Amit Kapila.

#16Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#15)
Re: logical streaming of xacts via test_decoding is broken

On Tue, Nov 10, 2020 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Nov 10, 2020 at 2:25 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 11:18 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 10:52 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

For this case, users can use skip_empty_xacts = true and
skip_empty_streams = false. I am only talking about the case where the
user has set skip_empty_xacts = true and didn't pass the
'skip_empty_streams' option.

Ok, thanks for the clarification.

I have prepared a patch for the same.

Few comments:
1.
+ else if (strcmp(elem->defname, "skip-empty-streams") == 0)
+ {
+ if (elem->arg == NULL)
+ data->skip_empty_streams = true;
+ else if (!parse_bool(strVal(elem->arg), &data->skip_empty_streams))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ if (!data->skip_empty_xacts && data->skip_empty_streams)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("the skip-empty-streams can not be true if skip-empty-xacts is false")));
}

You can probably add a comment as to why we are disallowing this case.
I thought of also checking the 'stream-changes' parameter here,
because it doesn't make sense to pass this parameter without it.
However, that doesn't seem necessary, but adding a comment here to
that effect would probably be a good idea.

Should we also consider the case where, if the user just passes
skip_empty_streams as true, we automatically set skip_empty_xacts to
true?
And we would allow the 'skip_empty_streams' parameter only if
'stream-changes' is true.

2.
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+

Shall we free this memory at commit time for the sake of consistency,
even though it would otherwise be freed along with the decoding context?

Yeah, actually I freed it in the stream commit but missed it here. I
will do that.

3. Can you please prepare a separate patch for the test case changes so
that it is easier to verify that the test fails without the patch and
passes after the patch?

ok

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#17Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#16)
Re: logical streaming of xacts via test_decoding is broken

On Wed, Nov 11, 2020 at 10:00 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Nov 10, 2020 at 2:25 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

You can probably add a comment as to why we are disallowing this case.
I thought of also checking the 'stream-changes' parameter here,
because it doesn't make sense to pass this parameter without it.
However, that doesn't seem necessary, but adding a comment here to
that effect would probably be a good idea.

Should we also consider the case where, if the user just passes
skip_empty_streams as true, we automatically set skip_empty_xacts to
true?

Is there any problem if we don't do this? Actually, adding
dependencies on parameters is confusing so I want to avoid that unless
it is really required.

And we would allow the 'skip_empty_streams' parameter only if
'stream-changes' is true.

Can't we simply ignore 'skip_empty_streams' if 'stream-changes' is not given?

--
With Regards,
Amit Kapila.

#18Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#17)
Re: logical streaming of xacts via test_decoding is broken

On Wed, Nov 11, 2020 at 6:59 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Nov 11, 2020 at 10:00 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Nov 10, 2020 at 2:25 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

You can probably add a comment as to why we are disallowing this case.
I thought of also checking the 'stream-changes' parameter here,
because it doesn't make sense to pass this parameter without it.
However, that doesn't seem necessary, but adding a comment here to
that effect would probably be a good idea.

Should we also consider the case where, if the user just passes
skip_empty_streams as true, we automatically set skip_empty_xacts to
true?

Is there any problem if we don't do this? Actually, adding
dependencies on parameters is confusing so I want to avoid that unless
it is really required.

And we would allow the 'skip_empty_streams' parameter only if
'stream-changes' is true.

The reason behind this thought is that if the user doesn't pass any
value for 'skip_empty_xacts' then the default value will be false, and
if the user only passes 'skip_empty_streams' as true then we will error
out, assuming that skip_empty_xacts is false but skip_empty_streams is
true. So instead of erroring out, it seems we can assume that
skip_empty_streams = true means skip_empty_xacts is also true if
nothing is passed for it.

Can't we simply ignore 'skip_empty_streams' if 'stream-changes' is not given?

Yeah, we can do that.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#19Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#18)
Re: logical streaming of xacts via test_decoding is broken

On Wed, Nov 11, 2020 at 7:05 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Nov 11, 2020 at 6:59 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Nov 11, 2020 at 10:00 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Nov 10, 2020 at 2:25 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

You can probably add a comment as to why we are disallowing this case.
I thought of also checking the 'stream-changes' parameter here,
because it doesn't make sense to pass this parameter without it.
However, that doesn't seem necessary, but adding a comment here to
that effect would probably be a good idea.

Should we also consider the case where, if the user just passes
skip_empty_streams as true, we automatically set skip_empty_xacts to
true?

Is there any problem if we don't do this? Actually, adding
dependencies on parameters is confusing so I want to avoid that unless
it is really required.

And we would allow the 'skip_empty_streams' parameter only if
'stream-changes' is true.

The reason behind this thought is that if the user doesn't pass any
value for 'skip_empty_xacts' then the default value will be false, and
if the user only passes 'skip_empty_streams' as true then we will error
out, assuming that skip_empty_xacts is false but skip_empty_streams is
true. So instead of erroring out, it seems we can assume that
skip_empty_streams = true means skip_empty_xacts is also true if
nothing is passed for it.

So, let's see the overall picture here. We can have four options:
skip_empty_xacts = true, skip_empty_stream = false;
skip_empty_xacts = true, skip_empty_stream = true;
skip_empty_xacts = false, skip_empty_stream = false;
skip_empty_xacts = false, skip_empty_stream = true;

I think we want to say the first three could be supported, and for the
last one we can either give an error or make its behavior similar to
option-2? Is this your understanding as well?

Another thing I am thinking: let's just not expose skip_empty_stream to
the user and decide the behavior based on the value of
skip_empty_xacts. Internally, in the code, we can still have different
variables to distinguish between empty_xacts and empty_streams.
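The arrangement proposed here, one user-visible option with two internal flags, can be sketched as a small C model. It is a hypothetical simplification rather than test_decoding itself: the `Plugin` and `TxnState` structs, the callback-like functions and the output strings are illustrative (the strings are only loosely modeled on test_decoding's messages), and output is collected into a buffer instead of going through the output-plugin API. `stream_wrote_changes` decides whether a given stream block is printed, while `xact_wrote_changes` decides whether the final commit is printed.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical model of the plugin-wide settings and collected output. */
typedef struct
{
    bool        skip_empty_xacts;   /* the only user-visible option */
    char        out[256];           /* printed lines, each ended by ';' */
} Plugin;

/* Hypothetical model of the per-transaction private data. */
typedef struct
{
    bool        xact_wrote_changes;     /* some stream of this xact wrote */
    bool        stream_wrote_changes;   /* the current stream wrote */
} TxnState;

static void
emit(Plugin *p, const char *line)
{
    assert(p != NULL && line != NULL);
    strncat(p->out, line, sizeof(p->out) - strlen(p->out) - 1);
    strncat(p->out, ";", sizeof(p->out) - strlen(p->out) - 1);
}

static void
stream_start(Plugin *p, TxnState *t)
{
    t->stream_wrote_changes = false;
    /* with skipping enabled, defer the "opening" line to the first change */
    if (!p->skip_empty_xacts)
        emit(p, "opening a streamed block");
}

static void
stream_change(Plugin *p, TxnState *t)
{
    if (p->skip_empty_xacts && !t->stream_wrote_changes)
        emit(p, "opening a streamed block");
    t->xact_wrote_changes = t->stream_wrote_changes = true;
    emit(p, "streaming change");
}

static void
stream_stop(Plugin *p, TxnState *t)
{
    /* an empty stream printed no "opening" line, so print no "closing" */
    if (p->skip_empty_xacts && !t->stream_wrote_changes)
        return;
    emit(p, "closing a streamed block");
}

static void
stream_commit(Plugin *p, TxnState *t)
{
    /* the commit depends on the whole transaction, not the last stream */
    if (p->skip_empty_xacts && !t->xact_wrote_changes)
        return;
    emit(p, "committing streamed transaction");
}
```

With skip_empty_xacts on, a transaction streamed as one non-empty stream followed by one empty stream prints the opening, change and closing lines of the first stream plus the final commit, and nothing for the empty stream; a transaction whose streams are all empty prints nothing at all.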

--
With Regards,
Amit Kapila.

#20Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#19)
Re: logical streaming of xacts via test_decoding is broken

On Thu, Nov 12, 2020 at 8:45 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Nov 11, 2020 at 7:05 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Nov 11, 2020 at 6:59 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Nov 11, 2020 at 10:00 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Nov 10, 2020 at 2:25 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

You can probably add a comment as to why we are disallowing this case.
I thought of also checking the 'stream-changes' parameter here,
because it doesn't make sense to pass this parameter without it.
However, that doesn't seem necessary, but adding a comment here to
that effect would probably be a good idea.

Should we also consider the case where, if the user just passes
skip_empty_streams as true, we automatically set skip_empty_xacts to
true?

Is there any problem if we don't do this? Actually, adding
dependencies on parameters is confusing so I want to avoid that unless
it is really required.

And we would allow the 'skip_empty_streams' parameter only if
'stream-changes' is true.

The reason behind this thought is that if the user doesn't pass any
value for 'skip_empty_xacts' then the default value will be false, and
if the user only passes 'skip_empty_streams' as true then we will error
out, assuming that skip_empty_xacts is false but skip_empty_streams is
true. So instead of erroring out, it seems we can assume that
skip_empty_streams = true means skip_empty_xacts is also true if
nothing is passed for it.

So, let's see the overall picture here. We can have four options:
skip_empty_xacts = true, skip_empty_stream = false;
skip_empty_xacts = true, skip_empty_stream = true;
skip_empty_xacts = false, skip_empty_stream = false;
skip_empty_xacts = false, skip_empty_stream = true;

I think we want to say the first three could be supported, and for the
last one we can either give an error or make its behavior similar to
option-2? Is this your understanding as well?

For the last one, if the user has explicitly passed false for
skip_empty_xacts then we error out, and if the user did not pass
anything for skip_empty_xacts then we make its behavior similar to
option-2.
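The rule proposed above (error out only when skip_empty_xacts is explicitly false, otherwise let skip_empty_streams = true imply skip_empty_xacts = true) needs to tell "not passed" apart from "passed as false". A minimal sketch in C, assuming a hypothetical three-valued `TriBool` for options that may be absent and a hypothetical `resolve_skip_options` helper; it also applies the earlier suggestion that an unset skip_empty_streams follows skip_empty_xacts. A `false` return stands for the case where the plugin would raise an error.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical three-valued setting for an option that may be omitted. */
typedef enum
{
    OPT_UNSET,
    OPT_FALSE,
    OPT_TRUE
} TriBool;

typedef struct
{
    bool        skip_empty_xacts;
    bool        skip_empty_streams;
} SkipOptions;

/*
 * Resolve the user-supplied settings into final flags.  Returns false for
 * the rejected combination: skip_empty_xacts explicitly false together
 * with skip_empty_streams true.
 */
static bool
resolve_skip_options(TriBool xacts, TriBool streams, SkipOptions *out)
{
    if (xacts == OPT_FALSE && streams == OPT_TRUE)
        return false;

    /* an omitted skip_empty_xacts plus skip_empty_streams = true acts as
     * (true, true), i.e. option-2 in the list above */
    out->skip_empty_xacts = (xacts == OPT_TRUE) ||
        (xacts == OPT_UNSET && streams == OPT_TRUE);

    /* an omitted skip_empty_streams follows skip_empty_xacts */
    if (streams == OPT_UNSET)
        out->skip_empty_streams = out->skip_empty_xacts;
    else
        out->skip_empty_streams = (streams == OPT_TRUE);
    return true;
}
```

Using a tri-state for the input keeps the defaulting logic in one place; a plain `bool` parsed with a false default could not distinguish an explicit `false` from an omitted option.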

Another thing I am thinking: let's just not expose skip_empty_stream to
the user and decide the behavior based on the value of
skip_empty_xacts. Internally, in the code, we can still have different
variables to distinguish between empty_xacts and empty_streams.

Yeah, I also think that in most cases it makes more sense for
skip_empty_xacts and skip_empty_stream to have the same value. So it is
better not to expose skip_empty_stream. I agree that we need to keep two
variables to track empty streams and empty xacts.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#21Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#20)
Re: logical streaming of xacts via test_decoding is broken

On Thu, Nov 12, 2020 at 11:29 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Thu, Nov 12, 2020 at 8:45 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

Another thing I am thinking: let's just not expose skip_empty_stream to
the user and decide the behavior based on the value of
skip_empty_xacts. Internally, in the code, we can still have different
variables to distinguish between empty_xacts and empty_streams.

Yeah, I also think that in most cases it makes more sense for
skip_empty_xacts and skip_empty_stream to have the same value. So it is
better not to expose skip_empty_stream. I agree that we need to keep two
variables to track empty streams and empty xacts.

So, let's try it this way, and if we see any problems then we can rethink.

--
With Regards,
Amit Kapila.

#22Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#21)
Re: logical streaming of xacts via test_decoding is broken

On Thu, 12 Nov 2020 at 11:37 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Nov 12, 2020 at 11:29 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Thu, Nov 12, 2020 at 8:45 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

Another thing I am thinking: let's just not expose skip_empty_stream to
the user and decide the behavior based on the value of
skip_empty_xacts. Internally, in the code, we can still have different
variables to distinguish between empty_xacts and empty_streams.

Yeah, I also think that in most cases it makes more sense for
skip_empty_xacts and skip_empty_stream to have the same value. So it is
better not to expose skip_empty_stream. I agree that we need to keep two
variables to track empty streams and empty xacts.

So, let's try it this way, and if we see any problems then we can
rethink.

Sounds good to me, I will send the updated patch.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#23Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#15)
2 attachment(s)
Re: logical streaming of xacts via test_decoding is broken

On Tue, Nov 10, 2020 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Nov 10, 2020 at 2:25 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 11:18 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 10:52 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

For this case, users can use skip_empty_xacts = true and
skip_empty_streams = false. I am only talking about the case where the
user has set skip_empty_xacts = true and didn't pass the
'skip_empty_streams' option.

Ok, thanks for the clarification.

I have prepared a patch for the same.

Few comments:
1.
+ else if (strcmp(elem->defname, "skip-empty-streams") == 0)
+ {
+ if (elem->arg == NULL)
+ data->skip_empty_streams = true;
+ else if (!parse_bool(strVal(elem->arg), &data->skip_empty_streams))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ if (!data->skip_empty_xacts && data->skip_empty_streams)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("the skip-empty-streams can not be true if skip-empty-xacts is false")));
}

You can probably add a comment as to why we are disallowing this case.
I thought of also checking the 'stream-changes' parameter here,
because it doesn't make sense to pass this parameter without it.
However, that doesn't seem necessary, but adding a comment here to
that effect would probably be a good idea.

As per our latest discussion, I have removed the extra input parameter
so this comment is not needed now.

2.
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+

Shall we free this memory at commit time for the sake of consistency,
even though it would otherwise be freed along with the decoding context?

Done

3. Can you please prepare a separate patch for the test case changes so
that it is easier to verify that the test fails without the patch and
passes after the patch?

Done

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachments:

v2-0001-Bug-fix-skip-empty-xacts-in-streaming-mode.patch (application/octet-stream)
From b3067d6c7ea17688fd9ded25ce5b96cef6efb383 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 12 Nov 2020 14:59:58 +0530
Subject: [PATCH v2 1/2] Bug fix skip-empty-xacts in streaming mode

In streaming mode a transaction can be decoded in multiple streams,
and those streams can be interleaved with those of other transactions.
Due to that we cannot remember the transaction's write status in the
logical decoding context, because another transaction might change
it, so we need to keep it in the reorder buffer txn.
---
 contrib/test_decoding/test_decoding.c   | 80 ++++++++++++++++++++++++++-------
 src/include/replication/reorderbuffer.h |  5 +++
 2 files changed, 70 insertions(+), 15 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 8e33614..50d8e24 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -34,10 +34,15 @@ typedef struct
 	bool		include_xids;
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
-	bool		xact_wrote_changes;
 	bool		only_local;
 } TestDecodingData;
 
+typedef struct
+{
+	bool		xact_wrote_changes;
+	bool		stream_wrote_changes;
+} TestDecodingTxnData;
+
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 							  bool is_init);
 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
@@ -255,8 +260,12 @@ static void
 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata =
+		MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+	txndata->xact_wrote_changes = false;
+	txn->output_plugin_private = txndata;
 
-	data->xact_wrote_changes = false;
 	if (data->skip_empty_xacts)
 		return;
 
@@ -280,8 +289,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	pfree(txndata);
+	txn->output_plugin_private = false;
+
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +456,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 Relation relation, ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	Form_pg_class class_form;
 	TupleDesc	tupdesc;
 	MemoryContext old;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
@@ -527,17 +543,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				   int nrelations, Relation relations[], ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	MemoryContext old;
 	int			i;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
@@ -592,10 +610,24 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	data->xact_wrote_changes = false;
+	/*
+	 * If this is the first stream for the txn then allocate the txn plugin
+	 * data and set the xact_wrote_changes to false.
+	 */
+	if (txndata == NULL)
+	{
+		txndata =
+			MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+		txndata->xact_wrote_changes = false;
+		txn->output_plugin_private = txndata;
+	}
+
+	txndata->stream_wrote_changes = false;
 	if (data->skip_empty_xacts)
 		return;
+
 	pg_output_stream_start(ctx, data, txn, true);
 }
 
@@ -615,8 +647,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -633,8 +666,18 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 					   XLogRecPtr abort_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+	TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (txn->toptxn == NULL)
+	{
+		Assert(txn->output_plugin_private != NULL);
+		pfree(txndata);
+		txn->output_plugin_private = false;
+	}
+
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +694,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	pfree(txndata);
+	txn->output_plugin_private = false;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +729,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 						ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
 	/* output stream start if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
@@ -734,12 +783,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						  ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index dfdda93..bd9dd7e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN
 
 	/* If we have detected concurrent abort then ignore future changes. */
 	bool		concurrent_abort;
+
+	/*
+	 * Private data pointer of the output plugin.
+	 */
+	void	   *output_plugin_private;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
-- 
1.8.3.1

v2-0002-Test-case-to-test-the-interleaved-empty-transacti.patch (application/octet-stream)
From a6fc71f6def9adfb56b54c3ac76e153f0b5c99ab Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 12 Nov 2020 15:00:25 +0530
Subject: [PATCH v2 2/2] Test case to test the interleaved empty transactions

---
 contrib/test_decoding/expected/concurrent_stream.out | 5 +++--
 contrib/test_decoding/specs/concurrent_stream.spec   | 8 ++++++--
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/contrib/test_decoding/expected/concurrent_stream.out b/contrib/test_decoding/expected/concurrent_stream.out
index e731d13..6f8b217 100644
--- a/contrib/test_decoding/expected/concurrent_stream.out
+++ b/contrib/test_decoding/expected/concurrent_stream.out
@@ -1,11 +1,12 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
-starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
+starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s2_ddl s1_commit s1_get_stream_changes
 step s0_begin: BEGIN;
 step s0_ddl: CREATE TABLE stream_test1(data text);
 step s1_ddl: CREATE TABLE stream_test(data text);
 step s1_begin: BEGIN;
 step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
+step s2_ddl: CREATE TABLE stream_test2(data text);
 step s1_commit: COMMIT;
 step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 data           
diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec
index ad9fde9..8d24ca1 100644
--- a/contrib/test_decoding/specs/concurrent_stream.spec
+++ b/contrib/test_decoding/specs/concurrent_stream.spec
@@ -8,7 +8,7 @@ setup
 
   -- consume DDL
   SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
+  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 60000) g';
 }
 
 teardown
@@ -23,6 +23,10 @@ setup { SET synchronous_commit=on; }
 step "s0_begin" { BEGIN; }
 step "s0_ddl"   {CREATE TABLE stream_test1(data text);}
 
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_ddl"   {CREATE TABLE stream_test2(data text);}
+
 # The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
 # the currently running s0_ddl and we want to test that s0_ddl should not get
 # streamed when user asked to skip-empty-xacts.
@@ -34,4 +38,4 @@ step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
 step "s1_commit" { COMMIT; }
 step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
 
-permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes"
-- 
1.8.3.1

#24 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#23)
Re: logical streaming of xacts via test_decoding is broken

On Thu, Nov 12, 2020 at 3:10 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Nov 10, 2020 at 2:25 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

3. Can you please prepare a separate patch for test case changes so
that it would be easier to verify that it fails without the patch and
passes after the patch?

Done

Few comments:
=================
1.
   -- consume DDL
   SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
+  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 60000) g';
 }

Is there a reason for this change? Probably fewer rows are sufficient
to serve the purpose of the test, but I am not sure whether it is
related to this patch or there is some other reason behind it.

2.
+typedef struct
+{
+ bool xact_wrote_changes;
+ bool stream_wrote_changes;
+} TestDecodingTxnData;
+

I think a comment here explaining why we need this as a separate
structure, and why it needs two different members, would be better.
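A minimal sketch of why two flags are needed, assuming simplified, hypothetical helpers (`TxnFlags`, `on_stream_start`, `on_stream_change`, `emits_stream_stop`, `emits_commit`) that only mirror the flag handling in the patch, not the real test_decoding callbacks. The per-stream flag decides whether a stream's start/stop lines are printed; the per-transaction flag decides whether the final commit is printed.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in mirroring the two members of TestDecodingTxnData. */
typedef struct
{
	bool		xact_wrote_changes;		/* any change in any stream of the xact */
	bool		stream_wrote_changes;	/* any change in the current stream */
} TxnFlags;

/* stream_start: only the per-stream flag is reset. */
static void
on_stream_start(TxnFlags *t)
{
	t->stream_wrote_changes = false;
}

/* stream_change: a change marks both the stream and the whole xact. */
static void
on_stream_change(TxnFlags *t)
{
	t->xact_wrote_changes = t->stream_wrote_changes = true;
}

/* With skip-empty-xacts on, is the stream-stop line printed? */
static bool
emits_stream_stop(const TxnFlags *t)
{
	return t->stream_wrote_changes;
}

/* With skip-empty-xacts on, is the final commit line printed? */
static bool
emits_commit(const TxnFlags *t)
{
	return t->xact_wrote_changes;
}
```

For example, a transaction whose first stream has a change and whose second stream is empty should print the first stream, skip the second, and still print its commit. A single flag cannot express both answers at once: either the empty stream would be printed, or the commit would be lost.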

3.
pg_decode_commit_txn()
{
..
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ pfree(txndata);
+ txn->output_plugin_private = false;
+

Here, don't we need to set txn->output_plugin_private to NULL, since it
is a pointer and we explicitly test it for NULL elsewhere? Also, make
the same change at the other places where it is set to false.
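A minimal sketch of the cleanup pattern being suggested, assuming hypothetical simplified types (`Txn`, `TxnData`) and using `free` in place of `pfree`. The flag is copied out before the data is freed, and the pointer is reset to `NULL` so that later `== NULL` checks remain meaningful. Assigning `false` to a pointer typically compiles only because `false` is a zero constant, which hides the intent.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins; not PostgreSQL's real structs. */
typedef struct
{
	bool		xact_wrote_changes;
} TxnData;

typedef struct
{
	void	   *output_plugin_private;
} Txn;

/*
 * Commit-time cleanup: read the flag first, then free the per-txn data
 * and reset the pointer to NULL (not false).
 */
static bool
release_txn_data(Txn *txn)
{
	TxnData    *data = txn->output_plugin_private;
	bool		wrote;

	assert(data != NULL);
	wrote = data->xact_wrote_changes;	/* copy out before freeing */
	free(data);
	txn->output_plugin_private = NULL;
	return wrote;
}
```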

4.
@@ -592,10 +610,24 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
     ReorderBufferTXN *txn)
 {
  TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- data->xact_wrote_changes = false;
+ /*
+ * If this is the first stream for the txn then allocate the txn plugin
+ * data and set the xact_wrote_changes to false.
+ */
+ if (txndata == NULL)
+ {
+ txndata =

As we are explicitly testing for NULL here, isn't it better to
explicitly initialize 'output_plugin_private' to NULL in
ReorderBufferGetTXN?
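A minimal sketch of the lazy allocation this relies on, assuming hypothetical stand-ins: `get_txn` plays the role of ReorderBufferGetTXN and `stream_start` the role of pg_decode_stream_start, with `malloc`/`calloc` in place of the memory-context allocators. The `txndata == NULL` test only works if the pointer is known to start out NULL, which is why the explicit initialization matters.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins; not PostgreSQL's real structs. */
typedef struct
{
	bool		xact_wrote_changes;
	bool		stream_wrote_changes;
} TxnData;

typedef struct
{
	void	   *output_plugin_private;
} Txn;

/* Allocator stand-in: initialize the plugin pointer explicitly. */
static Txn *
get_txn(void)
{
	Txn		   *txn = malloc(sizeof(Txn));

	if (txn == NULL)
		abort();
	txn->output_plugin_private = NULL;
	return txn;
}

/* Allocate per-txn data on the first stream only; reset the stream flag. */
static TxnData *
stream_start(Txn *txn)
{
	TxnData    *data = txn->output_plugin_private;

	if (data == NULL)
	{
		data = calloc(1, sizeof(TxnData));	/* both flags start false */
		if (data == NULL)
			abort();
		txn->output_plugin_private = data;
	}
	data->stream_wrote_changes = false;
	return data;
}
```

A second `stream_start` on the same transaction returns the same data, so `xact_wrote_changes` survives across streams while `stream_wrote_changes` is cleared.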

5.
@@ -633,8 +666,18 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
     XLogRecPtr abort_lsn)
 {
  TestDecodingData *data = ctx->output_plugin_private;
+ ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+ TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (txn->toptxn == NULL)
+ {
+ Assert(txn->output_plugin_private != NULL);
+ pfree(txndata);
+ txn->output_plugin_private = false;
+ }
+

Here, if we expect 'output_plugin_private' to be set only for the
toptxn, shouldn't the Assert and the reset also operate on the toptxn?
I find the changes in this function a bit unclear, so adding a comment
here could help.
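A minimal sketch of the abort handling written in terms of the toptxn, as the comment suggests. It assumes hypothetical simplified types (`Txn` with a `toptxn` pointer, `TxnData`) and uses `free` in place of `pfree`. A subtransaction abort only reads the top-level flag. Only a top-level abort frees the data, because only it ends the transaction.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins; not PostgreSQL's real structs. */
typedef struct
{
	bool		xact_wrote_changes;
} TxnData;

typedef struct Txn
{
	struct Txn *toptxn;					/* NULL for a top-level xact */
	void	   *output_plugin_private;	/* kept on the top-level xact only */
} Txn;

/* Returns whether the abort would be printed with skip-empty-xacts on. */
static bool
stream_abort(Txn *txn)
{
	Txn		   *top = txn->toptxn ? txn->toptxn : txn;
	TxnData    *data = top->output_plugin_private;
	bool		wrote;

	assert(data != NULL);
	wrote = data->xact_wrote_changes;

	/* Only a top-level abort ends the transaction, so only it frees. */
	if (txn->toptxn == NULL)
	{
		free(data);
		top->output_plugin_private = NULL;
	}
	return wrote;
}
```

When `txn->toptxn` is NULL, `top` and `txn` are the same object, so resetting through `top` or through `txn` is equivalent. Using `top` just makes the intent explicit.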

--
With Regards,
Amit Kapila.

#25 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#24)
Re: logical streaming of xacts via test_decoding is broken

On Fri, Nov 13, 2020 at 3:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Nov 12, 2020 at 3:10 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Tue, Nov 10, 2020 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Nov 10, 2020 at 2:25 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

3. Can you please prepare a separate patch for test case changes so
that it would be easier to verify that it fails without the patch and
passes after the patch?

Done

Few comments:
=================
1.
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
+  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 60000) g';
}

Is there a reason for this change? Probably fewer rows are sufficient
to serve the purpose of the test, but I am not sure whether it is
related to this patch or there is some other reason behind it.

I think I had changed it for some experiment and it got included in
the patch, so I have reverted it.

2.
+typedef struct
+{
+ bool xact_wrote_changes;
+ bool stream_wrote_changes;
+} TestDecodingTxnData;
+

I think a comment here explaining why we need this as a separate
structure, and why it needs two different members, would be better.

Done

3.
pg_decode_commit_txn()
{
..
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ pfree(txndata);
+ txn->output_plugin_private = false;
+

Here, don't we need to set txn->output_plugin_private to NULL, since it
is a pointer and we explicitly test it for NULL elsewhere? Also, make
the same change at the other places where it is set to false.

Fixed

4.
@@ -592,10 +610,24 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- data->xact_wrote_changes = false;
+ /*
+ * If this is the first stream for the txn then allocate the txn plugin
+ * data and set the xact_wrote_changes to false.
+ */
+ if (txndata == NULL)
+ {
+ txndata =

As we are explicitly testing for NULL here, isn't it better to
explicitly initialize 'output_plugin_private' to NULL in
ReorderBufferGetTXN?

Done

5.
@@ -633,8 +666,18 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
XLogRecPtr abort_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+ TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (txn->toptxn == NULL)
+ {
+ Assert(txn->output_plugin_private != NULL);
+ pfree(txndata);
+ txn->output_plugin_private = false;
+ }
+

Here, if we expect 'output_plugin_private' to be set only for the
toptxn, shouldn't the Assert and the reset also operate on the toptxn?
I find the changes in this function a bit unclear, so adding a comment
here could help.

I have added the comments.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachments:

v3-0002-Test-case-to-test-the-interleaved-empty-transacti.patch (text/x-patch; charset=US-ASCII)
From 037ad84600e308c03fb71d21c2550deccb0bc2aa Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 12 Nov 2020 15:00:25 +0530
Subject: [PATCH v3 2/2] Test case to test the interleaved empty transactions

---
 contrib/test_decoding/expected/concurrent_stream.out | 5 +++--
 contrib/test_decoding/specs/concurrent_stream.spec   | 6 +++++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/concurrent_stream.out b/contrib/test_decoding/expected/concurrent_stream.out
index e731d13..6f8b217 100644
--- a/contrib/test_decoding/expected/concurrent_stream.out
+++ b/contrib/test_decoding/expected/concurrent_stream.out
@@ -1,11 +1,12 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
-starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
+starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s2_ddl s1_commit s1_get_stream_changes
 step s0_begin: BEGIN;
 step s0_ddl: CREATE TABLE stream_test1(data text);
 step s1_ddl: CREATE TABLE stream_test(data text);
 step s1_begin: BEGIN;
 step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
+step s2_ddl: CREATE TABLE stream_test2(data text);
 step s1_commit: COMMIT;
 step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 data           
diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec
index ad9fde9..f82e4d6 100644
--- a/contrib/test_decoding/specs/concurrent_stream.spec
+++ b/contrib/test_decoding/specs/concurrent_stream.spec
@@ -23,6 +23,10 @@ setup { SET synchronous_commit=on; }
 step "s0_begin" { BEGIN; }
 step "s0_ddl"   {CREATE TABLE stream_test1(data text);}
 
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_ddl"   {CREATE TABLE stream_test2(data text);}
+
 # The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
 # the currently running s0_ddl and we want to test that s0_ddl should not get
 # streamed when user asked to skip-empty-xacts.
@@ -34,4 +38,4 @@ step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
 step "s1_commit" { COMMIT; }
 step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
 
-permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes"
-- 
1.8.3.1

v3-0001-Bug-fix-skip-empty-xacts-in-streaming-mode.patch (text/x-patch; charset=US-ASCII)
From 6f6f3638a6c6a41370c43e77b47f5390f13c453b Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 12 Nov 2020 14:59:58 +0530
Subject: [PATCH v3 1/2] Bug fix skip-empty-xacts in streaming mode

In streaming mode a transaction can be decoded in multiple streams,
and those streams can be interleaved with other transactions.  Hence
we cannot remember the transaction's write status in the logical
decoding context, because another transaction may overwrite it;
instead, keep it in the reorder buffer txn.
---
 contrib/test_decoding/test_decoding.c           | 95 +++++++++++++++++++++----
 src/backend/replication/logical/reorderbuffer.c |  1 +
 src/include/replication/reorderbuffer.h         |  5 ++
 3 files changed, 86 insertions(+), 15 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 8e33614..62b3855 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -34,10 +34,24 @@ typedef struct
 	bool		include_xids;
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
-	bool		xact_wrote_changes;
 	bool		only_local;
 } TestDecodingData;
 
+/*
+ * Maintain a per transaction level variable to track whether the transaction
+ * has written any changes or not to identify whether it is an empty transaction
+ * or not.  In streaming mode the transaction can be decoded in streams so
+ * along with maintaining whether the transaction has written any changes or
+ * not we also need to track whether the current stream has written any changes
+ * or not so that if user has requested to skip the empty transactions we can
+ * skip the empty streams even though the transaction has written some changes.
+ */
+typedef struct
+{
+	bool		xact_wrote_changes;
+	bool		stream_wrote_changes;
+} TestDecodingTxnData;
+
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 							  bool is_init);
 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
@@ -255,8 +269,12 @@ static void
 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata =
+		MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+	txndata->xact_wrote_changes = false;
+	txn->output_plugin_private = txndata;
 
-	data->xact_wrote_changes = false;
 	if (data->skip_empty_xacts)
 		return;
 
@@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 Relation relation, ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	Form_pg_class class_form;
 	TupleDesc	tupdesc;
 	MemoryContext old;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
@@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				   int nrelations, Relation relations[], ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	MemoryContext old;
 	int			i;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
@@ -592,10 +619,24 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	data->xact_wrote_changes = false;
+	/*
+	 * If this is the first stream for the txn then allocate the txn plugin
+	 * data and set the xact_wrote_changes to false.
+	 */
+	if (txndata == NULL)
+	{
+		txndata =
+			MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+		txndata->xact_wrote_changes = false;
+		txn->output_plugin_private = txndata;
+	}
+
+	txndata->stream_wrote_changes = false;
 	if (data->skip_empty_xacts)
 		return;
+
 	pg_output_stream_start(ctx, data, txn, true);
 }
 
@@ -615,8 +656,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -634,7 +676,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	/*
+	 * stream abort can be sent for an individual subtransaction but we
+	 * maintain the output_plugin_private only under the toptxn so if this
+	 * is not the toptxn then fetch the toptxn.
+	 */
+	ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+	TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	if (txn->toptxn == NULL)
+	{
+		Assert(txn->output_plugin_private != NULL);
+		pfree(txndata);
+		txn->output_plugin_private = NULL;
+	}
+
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +709,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +744,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 						ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
 	/* output stream start if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
@@ -734,12 +798,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						  ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c1bd680..301baff 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -402,6 +402,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
 
 	/* InvalidCommandId is not zero, so set it explicitly */
 	txn->command_id = InvalidCommandId;
+	txn->output_plugin_private = NULL;
 
 	return txn;
 }
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index dfdda93..bd9dd7e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN
 
 	/* If we have detected concurrent abort then ignore future changes. */
 	bool		concurrent_abort;
+
+	/*
+	 * Private data pointer of the output plugin.
+	 */
+	void	   *output_plugin_private;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
-- 
1.8.3.1

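The commit message of the patch above explains the root cause. A minimal sketch of it follows, assuming hypothetical simplified types and helpers (`SharedDecodingState`, `PerTxnState`, `shared_flag_seen_by_a`, `per_txn_flag_seen_by_a`), not the real test_decoding code. It replays the interleaving the new test exercises: streamed transaction A writes a change, then an empty transaction B begins before A commits. With one flag in the decoding context, B's begin callback clobbers A's state. With one flag per transaction, it does not.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins; not PostgreSQL's real structs. */
typedef struct
{
	bool		xact_wrote_changes;	/* pre-fix: one flag shared by all xacts */
} SharedDecodingState;

typedef struct
{
	bool		xact_wrote_changes;	/* post-fix: one flag per transaction */
} PerTxnState;

/* What A's stream-commit callback sees when the flag is shared. */
static bool
shared_flag_seen_by_a(void)
{
	SharedDecodingState ctx = {false};

	ctx.xact_wrote_changes = true;	/* stream_change for A */
	ctx.xact_wrote_changes = false;	/* begin for B resets the shared flag */
	return ctx.xact_wrote_changes;	/* A now looks empty */
}

/* What A's stream-commit callback sees with per-transaction flags. */
static bool
per_txn_flag_seen_by_a(void)
{
	PerTxnState a = {false};
	PerTxnState b = {false};

	a.xact_wrote_changes = true;	/* stream_change for A */
	b.xact_wrote_changes = false;	/* begin for B touches only B */
	assert(!b.xact_wrote_changes);	/* B is empty and is skipped */
	return a.xact_wrote_changes;	/* A is still known to be non-empty */
}
```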
#26 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#25)
Re: logical streaming of xacts via test_decoding is broken

On Sun, Nov 15, 2020 at 11:34 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Pushed after minor changes.

--
With Regards,
Amit Kapila.