Open a streamed block for transactional messages during decoding

Started by Zhijie Hou (Fujitsu) about 2 years ago, 6 messages
#1 Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
1 attachment(s)

Hi,

While reviewing the test_decoding code, I noticed that when skip_empty_xacts
option is specified, it doesn't open the streaming block (e.g.
pg_output_stream_start) before streaming the transactional MESSAGE even if it's
the first change in a streaming block.

It looks inconsistent with what we do when streaming DML
changes (e.g. pg_decode_stream_change()).

Here is a small patch to open the stream block in this case.

Best Regards,
Hou Zhijie

Attachments:

0001-Open-a-streamed-block-for-transactional-messages-dur.patch (application/octet-stream)
From 4bc2f8a0f26f28f5c374cfd90fd6abf0b44c8b5c Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Mon, 23 Oct 2023 20:45:03 +0800
Subject: [PATCH] Open a streamed block for transactional messages during
 decoding.

In test_decoding module, when skip_empty_xacts option was specified, open a
streamed block when streaming transactional messages. This makes the handling
of transactional messages stream consistent irrespective of whether
skip_empty_xacts option was specified.
---
 contrib/test_decoding/expected/stream.out        |  5 ++++-
 .../test_decoding/expected/twophase_stream.out   | 10 ++++++++--
 contrib/test_decoding/test_decoding.c            | 16 ++++++++++++++++
 3 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 0f21dcb8e0..4ab2d47bf8 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -29,7 +29,10 @@ COMMIT;
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
+ opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
  opening a streamed block for transaction
  streaming change for transaction
  streaming change for transaction
@@ -53,7 +56,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  closing a streamed block for transaction
  committing streamed transaction
-(24 rows)
+(27 rows)
 
 -- streaming test for toast changes
 ALTER TABLE stream_test ALTER COLUMN data set storage external;
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index b08bb0e573..a3574f73c8 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -31,7 +31,10 @@ PREPARE TRANSACTION 'test1';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
+ opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
  opening a streamed block for transaction
  streaming change for transaction
  streaming change for transaction
@@ -55,7 +58,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  closing a streamed block for transaction
  preparing streamed transaction 'test1'
-(24 rows)
+(27 rows)
 
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
@@ -84,8 +87,11 @@ PREPARE TRANSACTION 'test1_nodecode';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
+ opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
-(1 row)
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
+(4 rows)
 
 COMMIT PREPARED 'test1_nodecode';
 -- should show the inserts but not show a COMMIT PREPARED but a COMMIT
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ab870d9e4d..da57aa4b36 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -944,6 +944,22 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
 						 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 						 const char *prefix, Size sz, const char *message)
 {
+	/*
+	 * Output stream start if we haven't yet, but only for the transactional
+	 * case.
+	 */
+	if (transactional)
+	{
+		TestDecodingData *data = ctx->output_plugin_private;
+		TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+		if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
+		{
+			pg_output_stream_start(ctx, data, txn, false);
+		}
+		txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 
 	if (transactional)
-- 
2.30.0.windows.2

#2 Amit Kapila
amit.kapila16@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#1)
Re: Open a streamed block for transactional messages during decoding

On Tue, Oct 24, 2023 at 5:27 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

> While reviewing the test_decoding code, I noticed that when skip_empty_xacts
> option is specified, it doesn't open the streaming block (e.g.
> pg_output_stream_start) before streaming the transactional MESSAGE even if it's
> the first change in a streaming block.
>
> It looks inconsistent with what we do when streaming DML
> changes (e.g. pg_decode_stream_change()).
>
> Here is a small patch to open the stream block in this case.

The change looks good to me though I haven't tested it yet. BTW, can
we change the comment: "Output stream start if we haven't yet, but
only for the transactional case." to "Output stream start if we
haven't yet for transactional messages"?

I think we should backpatch this fix. What do you think?

--
With Regards,
Amit Kapila.

#3 Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#2)
1 attachment(s)
RE: Open a streamed block for transactional messages during decoding

On Thursday, October 26, 2023 12:42 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

> The change looks good to me though I haven't tested it yet. BTW, can we
> change the comment: "Output stream start if we haven't yet, but only for the
> transactional case." to "Output stream start if we haven't yet for transactional
> messages"?

Thanks for the review and I changed this as suggested.

> I think we should backpatch this fix. What do you think?

I think maybe we can improve the code only for HEAD, as skip_empty_xacts is
primarily used to have consistent test results across different runs, and this
patch won't help with that. I also saw that we didn't backpatch 26dd028 for the
same reason.

Best Regards,
Hou zj

Attachments:

v2-0001-Open-a-streamed-block-for-transactional-messages-.patch (application/octet-stream)
From f988b0cb821e523d8737476294f27cb4f9b56731 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Mon, 23 Oct 2023 20:45:03 +0800
Subject: [PATCH v2] Open a streamed block for transactional messages during
 decoding.

In test_decoding module, when skip_empty_xacts option was specified, open a
streamed block when streaming transactional messages. This makes the handling
of transactional messages stream consistent irrespective of whether
skip_empty_xacts option was specified.
---
 contrib/test_decoding/expected/stream.out          |  5 ++++-
 contrib/test_decoding/expected/twophase_stream.out | 10 ++++++++--
 contrib/test_decoding/test_decoding.c              | 13 +++++++++++++
 3 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 0f21dcb8e0..4ab2d47bf8 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -29,7 +29,10 @@ COMMIT;
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
+ opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
  opening a streamed block for transaction
  streaming change for transaction
  streaming change for transaction
@@ -53,7 +56,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  closing a streamed block for transaction
  committing streamed transaction
-(24 rows)
+(27 rows)
 
 -- streaming test for toast changes
 ALTER TABLE stream_test ALTER COLUMN data set storage external;
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index b08bb0e573..a3574f73c8 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -31,7 +31,10 @@ PREPARE TRANSACTION 'test1';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
+ opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
  opening a streamed block for transaction
  streaming change for transaction
  streaming change for transaction
@@ -55,7 +58,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  closing a streamed block for transaction
  preparing streamed transaction 'test1'
-(24 rows)
+(27 rows)
 
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
@@ -84,8 +87,11 @@ PREPARE TRANSACTION 'test1_nodecode';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
+ opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
-(1 row)
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
+(4 rows)
 
 COMMIT PREPARED 'test1_nodecode';
 -- should show the inserts but not show a COMMIT PREPARED but a COMMIT
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ab870d9e4d..288fd0bb4a 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -944,6 +944,19 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
 						 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 						 const char *prefix, Size sz, const char *message)
 {
+	/* Output stream start if we haven't yet for transactional messages. */
+	if (transactional)
+	{
+		TestDecodingData *data = ctx->output_plugin_private;
+		TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+		if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
+		{
+			pg_output_stream_start(ctx, data, txn, false);
+		}
+		txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 
 	if (transactional)
-- 
2.30.0.windows.2

#4 Amit Kapila
amit.kapila16@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#3)
Re: Open a streamed block for transactional messages during decoding

On Thu, Oct 26, 2023 at 2:01 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

> --- a/contrib/test_decoding/expected/stream.out
> +++ b/contrib/test_decoding/expected/stream.out
> @@ -29,7 +29,10 @@ COMMIT;
>  SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
>                             data
>  ----------------------------------------------------------
> + opening a streamed block for transaction
>   streaming message: transactional: 1 prefix: test, sz: 50
> + closing a streamed block for transaction
> + aborting streamed (sub)transaction

I was analyzing the reason for the additional message: "aborting
streamed (sub)transaction" in the above test and it seems to be due to
the below check in the function pg_decode_stream_abort():

    if (data->skip_empty_xacts && !xact_wrote_changes)
        return;

Before the patch, we didn't set the 'xact_wrote_changes' flag in
txndata; that is fixed now. So, this looks okay to me. However, I have
another observation about this code: for aborts or subtransactions,
we are not checking the flag 'stream_wrote_changes', so we may end up
emitting the abort message even when no actual change has been
streamed. I haven't tried to generate a test to verify this
observation, so I could be wrong, but it is worth analyzing
such cases.

--
With Regards,
Amit Kapila.

#5 Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#4)
RE: Open a streamed block for transactional messages during decoding

On Monday, October 30, 2023 12:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

> On Thu, Oct 26, 2023 at 2:01 PM Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com>
> wrote:

> > --- a/contrib/test_decoding/expected/stream.out
> > +++ b/contrib/test_decoding/expected/stream.out
> > @@ -29,7 +29,10 @@ COMMIT;
> >  SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
> >                             data
> >  ----------------------------------------------------------
> > + opening a streamed block for transaction
> >   streaming message: transactional: 1 prefix: test, sz: 50
> > + closing a streamed block for transaction
> > + aborting streamed (sub)transaction
>
> I was analyzing the reason for the additional message: "aborting streamed
> (sub)transaction" in the above test and it seems to be due to the below check in
> the function pg_decode_stream_abort():
>
>     if (data->skip_empty_xacts && !xact_wrote_changes)
>         return;
>
> Before the patch, we didn't set the 'xact_wrote_changes' flag in txndata;
> that is fixed now. So, this looks okay to me. However, I have another
> observation about this code: for aborts or subtransactions, we are not
> checking the flag 'stream_wrote_changes', so we may end up emitting the
> abort message even when no actual change has been streamed. I haven't tried
> to generate a test to verify this observation, so I could be wrong, but it is
> worth analyzing such cases.

I have confirmed that the mentioned case is possible (steps [1]): the
sub-transaction doesn't output any data, but the stream abort for this
sub-transaction will still be sent.

But I think this may not be problematic behavior, as even pgoutput can
behave similarly, e.g. if all the changes are filtered out by a row filter or
table filter, the stream abort will still be sent. The subscriber will skip
handling the STREAM ABORT if the aborted txn was not applied.

And if we want to fix this in the output plugin, we need to record whether we
have sent any changes for each sub-transaction, so that we can decide whether to
send the following stream abort or not. We cannot use 'stream_wrote_changes'
because it's a per-streamed-block flag and there could be several streamed
blocks for one sub-txn. It looks a bit complicated to me.

[1]:
SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
BEGIN;
savepoint p1;
CREATE TABLE test(a int);
INSERT INTO test VALUES(1);
savepoint p2;
CREATE TABLE test2(a int);
ROLLBACK TO SAVEPOINT p2;
COMMIT;

SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '1', 'stream-changes', '1');

data
--------------------------------------------------
opening a streamed block for transaction TXN 734
streaming change for TXN 734
closing a streamed block for transaction TXN 734
aborting streamed (sub)transaction TXN 736
committing streamed transaction TXN 734

Best Regards,
Hou zj

#6 Amit Kapila
amit.kapila16@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#5)
Re: Open a streamed block for transactional messages during decoding

On Mon, Oct 30, 2023 at 2:17 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

> I have confirmed that the mentioned case is possible (steps [1]): the
> sub-transaction doesn't output any data, but the stream abort for this
> sub-transaction will still be sent.
>
> But I think this may not be problematic behavior, as even pgoutput can
> behave similarly, e.g. if all the changes are filtered out by a row filter or
> table filter, the stream abort will still be sent. The subscriber will skip
> handling the STREAM ABORT if the aborted txn was not applied.
>
> And if we want to fix this in the output plugin, we need to record whether we
> have sent any changes for each sub-transaction, so that we can decide whether to
> send the following stream abort or not. We cannot use 'stream_wrote_changes'
> because it's a per-streamed-block flag and there could be several streamed
> blocks for one sub-txn. It looks a bit complicated to me.

I agree with your analysis. So, pushed the existing patch. BTW, sorry,
by mistake I used Peter's name as author.

--
With Regards,
Amit Kapila.