Bug in logical decoding of in-progress transactions
Hi,
There is a recent build farm failure [1] (https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=skink&dt=2020-09-09+03%3A42%3A19) in one of the test_decoding
tests, as pointed out by Tom Lane [2] (/messages/by-id/118303.1599691636@sss.pgh.pa.us). The failure report is shown below:
@@ -71,6 +71,8 @@
data
------------------------------------------
opening a streamed block for transaction
+ closing a streamed block for transaction
+ opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
@@ -83,7 +85,7 @@
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
-(13 rows)
+(15 rows)
Here, the symptoms are quite similar to what we fixed in commit
82a0ba7707, namely that an extra empty transaction is being decoded
in the test. It can happen even if we have instructed the test to 'skip
empty xacts' for streaming transactions, because the test_decoding
plugin APIs (related to streaming changes for in-progress xacts) make
no effort to skip such empty xacts. It was kept intentionally like
that under the assumption that we would never try to stream empty
xacts, but on closer inspection of the code, it seems to me that
assumption was not correct. Basically, we can pick for streaming a
transaction that has only change messages of type
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; we don't send such
messages downstream, rather they are used just to update the internal
state. So, in this particular failure, it is possible that an autovacuum
transaction has got such a change message added by one of the other
committed xacts, and on trying to stream it we get these additional
messages. The fix is to skip empty xacts, when indicated by the user, in
the streaming APIs of test_decoding.
Thoughts?
[1]: https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=skink&dt=2020-09-09+03%3A42%3A19
[2]: /messages/by-id/118303.1599691636@sss.pgh.pa.us
--
With Regards,
Amit Kapila.
On Thu, Sep 10, 2020 at 11:29 AM Amit Kapila <amit.kapila16@gmail.com>
wrote:
Hi,
There is a recent build farm failure [1] in one of the test_decoding
tests as pointed by Tom Lane [2]. The failure report is shown below:
@@ -71,6 +71,8 @@
data
------------------------------------------
opening a streamed block for transaction
+ closing a streamed block for transaction
+ opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
@@ -83,7 +85,7 @@
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
-(13 rows)
+(15 rows)
Here, the symptoms are quite similar to what we have fixed in commit
82a0ba7707 which is that an extra empty transaction is being decoded
in the test. It can happen even if have instructed the test to 'skip
empty xacts' for streaming transactions because the test_decoding
plugin APIs (related to streaming changes for in-progress xacts) makes
no effort to skip such empty xacts. It was kept intentionally like
that under the assumption that we would never try to stream empty
xacts but on closer inspection of the code, it seems to me that
assumption was not correct. Basically, we can pick to stream a
transaction that has change messages for
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT and we don't send such
messages to downstream rather they are just to update the internal
state. So, in this particular failure, it is possible that autovacuum
transaction has got such a change message added by one of the other
committed xact and on trying to stream it we get such additional
messages. The fix is to skip empty xacts when indicated by the user in
streaming APIs of test_decoding.
Thoughts?
Yeah, that's an issue.
[1] -
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=skink&dt=2020-09-09+03%3A42%3A19
[2] -
/messages/by-id/118303.1599691636@sss.pgh.pa.us
I have written a test case to reproduce the same. I have also prepared a
patch to skip the empty transaction, and with it the issue is fixed.
One side effect, however, is that it skips any empty stream even if the
transaction as a whole is not empty. I don't see any problem with that
as such, but it is not exactly what the user has asked for.
logical_decoding_work_mem=64kB
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot',
'test_decoding');
CREATE TABLE stream_test(data text);
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
'include-xids', '0', 'skip-empty-xacts', '1');
CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select
array_agg(md5(g::text))::text from generate_series(1, 80000) g';
--session1
BEGIN;
CREATE TABLE stream_test1(data text);
--session2
BEGIN;
CREATE TABLE stream_test2(data text);
COMMIT;
--session3
BEGIN;
INSERT INTO stream_test SELECT large_val();
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL,
'include-xids', '1', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
--------------------------------------------------
opening a streamed block for transaction TXN 508
closing a streamed block for transaction TXN 508
opening a streamed block for transaction TXN 510
streaming change for TXN 510
closing a streamed block for transaction TXN 510
(5 rows)
After patch
data
--------------------------------------------------
opening a streamed block for transaction TXN 510
streaming change for TXN 510
closing a streamed block for transaction TXN 510
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v1-0001-Skip-printing-empty-stream-in-test-decoding.patchapplication/octet-stream; name=v1-0001-Skip-printing-empty-stream-in-test-decoding.patchDownload
From b433eb1312b86bd15ce5a0727db7d571c20b3b4f Mon Sep 17 00:00:00 2001
From: dilipkumar <dilipbalaut@gmail.com>
Date: Thu, 10 Sep 2020 11:39:27 +0530
Subject: [PATCH v1] Skip printing empty stream in test decoding
---
contrib/test_decoding/test_decoding.c | 34 +++++++++++++++++++++++++++
1 file changed, 34 insertions(+)
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 34745150e9..39e29e13d6 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+ TestDecodingData *data,
+ ReorderBufferTXN *txn,
+ bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
@@ -593,6 +597,15 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ data->xact_wrote_changes = false;
+ if (data->skip_empty_xacts)
+ return;
+ pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
@@ -611,6 +624,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
@@ -630,6 +646,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
@@ -649,6 +668,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
@@ -676,6 +698,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ /* output stream start if we haven't yet */
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -722,6 +751,11 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
--
2.23.0
On Thu, Sep 10, 2020 at 11:42 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Thu, Sep 10, 2020 at 11:29 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
Hi,
There is a recent build farm failure [1] in one of the test_decoding
tests as pointed by Tom Lane [2]. The failure report is shown below:
@@ -71,6 +71,8 @@
data
------------------------------------------
opening a streamed block for transaction
+ closing a streamed block for transaction
+ opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
@@ -83,7 +85,7 @@
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
-(13 rows)
+(15 rows)
Here, the symptoms are quite similar to what we have fixed in commit
82a0ba7707 which is that an extra empty transaction is being decoded
in the test. It can happen even if have instructed the test to 'skip
empty xacts' for streaming transactions because the test_decoding
plugin APIs (related to streaming changes for in-progress xacts) makes
no effort to skip such empty xacts. It was kept intentionally like
that under the assumption that we would never try to stream empty
xacts but on closer inspection of the code, it seems to me that
assumption was not correct. Basically, we can pick to stream a
transaction that has change messages for
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT and we don't send such
messages to downstream rather they are just to update the internal
state. So, in this particular failure, it is possible that autovacuum
transaction has got such a change message added by one of the other
committed xact and on trying to stream it we get such additional
messages. The fix is to skip empty xacts when indicated by the user in
streaming APIs of test_decoding.
Thoughts?
Yeah, that's an issue.
[1] - https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=skink&dt=2020-09-09+03%3A42%3A19
[2] - /messages/by-id/118303.1599691636@sss.pgh.pa.us
I have written a test case to reproduce the same. I have also prepared a patch to skip the empty transaction. And after that, the issue has been fixed. But the extra side effect will be that it would skip any empty stream even if the transaction is not empty. As such I don't see any problem with that but this is not what the user has asked for.
Isn't that true for non-streaming xacts as well? Basically, the
skip-empty-xacts option indicates that if there is no change for
'tuple' or 'message', we skip it.
--
With Regards,
Amit Kapila.
On Thu, Sep 10, 2020 at 11:47 AM Amit Kapila <amit.kapila16@gmail.com>
wrote:
On Thu, Sep 10, 2020 at 11:42 AM Dilip Kumar <dilipbalaut@gmail.com>
wrote:On Thu, Sep 10, 2020 at 11:29 AM Amit Kapila <amit.kapila16@gmail.com>
wrote:
Hi,
There is a recent build farm failure [1] in one of the test_decoding
tests as pointed by Tom Lane [2]. The failure report is shown below:
@@ -71,6 +71,8 @@
data
------------------------------------------
opening a streamed block for transaction
+ closing a streamed block for transaction
+ opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
@@ -83,7 +85,7 @@
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
-(13 rows)
+(15 rows)
Here, the symptoms are quite similar to what we have fixed in commit
82a0ba7707 which is that an extra empty transaction is being decoded
in the test. It can happen even if have instructed the test to 'skip
empty xacts' for streaming transactions because the test_decoding
plugin APIs (related to streaming changes for in-progress xacts) makes
no effort to skip such empty xacts. It was kept intentionally like
that under the assumption that we would never try to stream empty
xacts but on closer inspection of the code, it seems to me that
assumption was not correct. Basically, we can pick to stream a
transaction that has change messages for
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT and we don't send such
messages to downstream rather they are just to update the internal
state. So, in this particular failure, it is possible that autovacuum
transaction has got such a change message added by one of the other
committed xact and on trying to stream it we get such additional
messages. The fix is to skip empty xacts when indicated by the user in
streaming APIs of test_decoding.Thoughts?
Yeah, that's an issue.
[1] -
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=skink&dt=2020-09-09+03%3A42%3A19
[2] -
/messages/by-id/118303.1599691636@sss.pgh.pa.us
I have written a test case to reproduce the same. I have also prepared
a patch to skip the empty transaction. And after that, the issue has been
fixed. But the extra side effect will be that it would skip any empty
stream even if the transaction is not empty. As such I don't see any
problem with that but this is not what the user has asked for.
Isn't that true for non-streaming xacts as well? Basically
skip-empty-xacts option indicates that if there is no change for
'tuple' or 'message', we skip it.
Yeah, that's right.
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On Thu, Sep 10, 2020 at 11:53 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Thu, Sep 10, 2020 at 11:47 AM Amit Kapila <amit.kapila16@gmail.com>
wrote:On Thu, Sep 10, 2020 at 11:42 AM Dilip Kumar <dilipbalaut@gmail.com>
wrote:On Thu, Sep 10, 2020 at 11:29 AM Amit Kapila <amit.kapila16@gmail.com>
wrote:
Hi,
There is a recent build farm failure [1] in one of the test_decoding
tests as pointed by Tom Lane [2]. The failure report is shown below:
@@ -71,6 +71,8 @@
data
------------------------------------------
opening a streamed block for transaction
+ closing a streamed block for transaction
+ opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
@@ -83,7 +85,7 @@
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
-(13 rows)
+(15 rows)
Here, the symptoms are quite similar to what we have fixed in commit
82a0ba7707 which is that an extra empty transaction is being decoded
in the test. It can happen even if have instructed the test to 'skip
empty xacts' for streaming transactions because the test_decoding
plugin APIs (related to streaming changes for in-progress xacts) makes
no effort to skip such empty xacts. It was kept intentionally like
that under the assumption that we would never try to stream empty
xacts but on closer inspection of the code, it seems to me that
assumption was not correct. Basically, we can pick to stream a
transaction that has change messages for
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT and we don't send such
messages to downstream rather they are just to update the internal
state. So, in this particular failure, it is possible that autovacuum
transaction has got such a change message added by one of the other
committed xact and on trying to stream it we get such additional
messages. The fix is to skip empty xacts when indicated by the user in
streaming APIs of test_decoding.Thoughts?
Yeah, that's an issue.
[1] -
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=skink&dt=2020-09-09+03%3A42%3A19
[2] -
/messages/by-id/118303.1599691636@sss.pgh.pa.us
I have written a test case to reproduce the same. I have also prepared
a patch to skip the empty transaction. And after that, the issue has been
fixed. But the extra side effect will be that it would skip any empty
stream even if the transaction is not empty. As such I don't see any
problem with that but this is not what the user has asked for.Isn't that true for non-streaming xacts as well? Basically
skip-empty-xacts option indicates that if there is no change for
'tuple' or 'message', we skip it.
Yeah, that's right.
I have removed some comments which are not valid after this patch.
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v2-0001-Skip-printing-empty-stream-in-test-decoding.patchapplication/octet-stream; name=v2-0001-Skip-printing-empty-stream-in-test-decoding.patchDownload
From c4211450760130d1235216c3d6aef1ea4b6b5f63 Mon Sep 17 00:00:00 2001
From: dilipkumar <dilipbalaut@gmail.com>
Date: Thu, 10 Sep 2020 11:39:27 +0530
Subject: [PATCH v2] Skip printing empty stream in test decoding
---
contrib/test_decoding/test_decoding.c | 50 ++++++++++++++++++---------
1 file changed, 34 insertions(+), 16 deletions(-)
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 34745150e9..c8df868718 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+ TestDecodingData *data,
+ ReorderBufferTXN *txn,
+ bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
@@ -583,16 +587,21 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ data->xact_wrote_changes = false;
+ if (data->skip_empty_xacts)
+ return;
+ pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
@@ -601,16 +610,15 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
@@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
@@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ /* output stream start if we haven't yet */
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -722,6 +735,11 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
--
2.23.0
On Thu, Sep 10, 2020 at 12:00 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Thu, Sep 10, 2020 at 11:53 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
I have written a test case to reproduce the same.
Can we write an isolation test for this scenario? See some similar
tests in contrib/test_decoding/specs. If that is possible then we can
probably remove the test which failed and instead write an isolation
test involving three transactions as shown by you. Also, please
prepare two separate patches (one for test and other for code) if you
are able to convert existing test to an isolation test as that will
make it easier to test the fix.
I have removed some comments which are not valid after this patch.
Few comments:
=============
1. We need to set xact_wrote_changes in pg_decode_stream_truncate() as
well along with the APIs in which you have set it.
2.
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData
*data, ReorderBufferTXN *txn, bool last_write)
+{
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction
TXN %u", txn->xid);
@@ -601,16 +610,15 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
In this API, we need to use 'last_write' in OutputPluginPrepareWrite()
and OutputPluginWrite().
The attached patch fixes both these comments.
--
With Regards,
Amit Kapila.
Attachments:
v3-0001-Skip-printing-empty-stream-in-test-decoding.patchapplication/octet-stream; name=v3-0001-Skip-printing-empty-stream-in-test-decoding.patchDownload
From ad21a1582c88b453cde907b2feb608a7c2f53cff Mon Sep 17 00:00:00 2001
From: dilipkumar <dilipbalaut@gmail.com>
Date: Thu, 10 Sep 2020 11:39:27 +0530
Subject: [PATCH v3] Skip printing empty stream in test decoding
---
contrib/test_decoding/test_decoding.c | 55 ++++++++++++++++++---------
1 file changed, 37 insertions(+), 18 deletions(-)
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 34745150e9..e60ab34a5a 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+ TestDecodingData *data,
+ ReorderBufferTXN *txn,
+ bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
@@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
- OutputPluginPrepareWrite(ctx, true);
+ data->xact_wrote_changes = false;
+ if (data->skip_empty_xacts)
+ return;
+ pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
+ OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "opening a streamed block for transaction");
- OutputPluginWrite(ctx, true);
+ OutputPluginWrite(ctx, last_write);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
@@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
@@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ /* output stream start if we haven't yet */
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
--
2.28.0.windows.1
On Thu, Sep 10, 2020 at 2:48 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Thu, Sep 10, 2020 at 12:00 PM Dilip Kumar <dilipbalaut@gmail.com>
wrote:On Thu, Sep 10, 2020 at 11:53 AM Dilip Kumar <dilipbalaut@gmail.com>
wrote:
I have written a test case to reproduce the same.
Can we write an isolation test for this scenario? See some similar
tests in contrib/test_decoding/specs. If that is possible then we can
probably remove the test which failed and instead write an isolation
test involving three transactions as shown by you. Also, please
prepare two separate patches (one for test and other for code) if you
are able to convert existing test to an isolation test as that will
make it easier to test the fix.
I have written an isolation test for this. IMHO, we should not try to merge
stream.sql into this isolation test, mainly for two reasons: a) this isolation
test is very specific in that, at the time we try to stream, the transaction
must have only incomplete changes, so if we add more operations like
message/truncate/abort then it will become unpredictable. Currently, I
have kept it to just one big tuple, so it is guaranteed that whenever
logical_decoding_work_mem is exceeded the transaction will have only partial
changes. b) we could add other operations in the transaction to cover the
streaming of changes, but those are not specific to the isolation test.
So I feel it is better to put only this specific scenario in the isolation
test.
I have removed some comments which are not valid after this patch.
Few comments:
=============
1. We need to set xact_wrote_changes in pg_decode_stream_truncate() as
well along with the APIs in which you have set it.
2.
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
@@ -601,16 +610,15 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
In this API, we need to use 'last_write' in OutputPluginPrepareWrite()
and OutputPluginWrite().
The attached patch fixes both these comments.
Okay, there is some change in stream.out so I have included that in the
first patch.
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v4-0002-Test-case-to-test-streaming-with-concurrent-empty.patchapplication/octet-stream; name=v4-0002-Test-case-to-test-streaming-with-concurrent-empty.patchDownload
From c163dd25c51416e2911d7d92fa690e18b1374918 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 10 Sep 2020 15:48:35 +0530
Subject: [PATCH v4 2/2] Test case to test streaming with concurrent empty-xact
---
contrib/test_decoding/Makefile | 2 +-
.../expected/concurrent_stream_empty_txn.out | 19 +++++++++++++
.../specs/concurrent_stream_empty_txn.spec | 33 ++++++++++++++++++++++
3 files changed, 53 insertions(+), 1 deletion(-)
create mode 100644 contrib/test_decoding/expected/concurrent_stream_empty_txn.out
create mode 100644 contrib/test_decoding/specs/concurrent_stream_empty_txn.spec
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index ed9a3d6..f6b1bfb 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
spill slot truncate stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
- oldest_xmin snapshot_transfer subxact_without_top
+ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream_empty_txn
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/concurrent_stream_empty_txn.out b/contrib/test_decoding/expected/concurrent_stream_empty_txn.out
new file mode 100644
index 0000000..e731d13
--- /dev/null
+++ b/contrib/test_decoding/expected/concurrent_stream_empty_txn.out
@@ -0,0 +1,19 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
+step s0_begin: BEGIN;
+step s0_ddl: CREATE TABLE stream_test1(data text);
+step s1_ddl: CREATE TABLE stream_test(data text);
+step s1_begin: BEGIN;
+step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
+step s1_commit: COMMIT;
+step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+data
+
+opening a streamed block for transaction
+streaming change for transaction
+closing a streamed block for transaction
+committing streamed transaction
+?column?
+
+stop
diff --git a/contrib/test_decoding/specs/concurrent_stream_empty_txn.spec b/contrib/test_decoding/specs/concurrent_stream_empty_txn.spec
new file mode 100644
index 0000000..0fcd742
--- /dev/null
+++ b/contrib/test_decoding/specs/concurrent_stream_empty_txn.spec
@@ -0,0 +1,33 @@
+# Test streamed decoding of a large transaction while a concurrent transaction
+# that has performed only DDL (and so decodes as an empty xact) is still open.
+
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+
+ -- consume DDL
+ SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS stream_test;
+ DROP TABLE IF EXISTS stream_test1;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_begin" { BEGIN; }
+step "s0_ddl" {CREATE TABLE stream_test1(data text);}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_ddl" { CREATE TABLE stream_test(data text); }
+step "s1_begin" { BEGIN; }
+step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
+step "s1_commit" { COMMIT; }
+step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
--
1.8.3.1
v4-0001-Skip-printing-empty-stream-in-test-decoding.patchapplication/octet-stream; name=v4-0001-Skip-printing-empty-stream-in-test-decoding.patchDownload
From e96b27cedec641255dcc395739b490da7f288910 Mon Sep 17 00:00:00 2001
From: dilipkumar <dilipbalaut@gmail.com>
Date: Thu, 10 Sep 2020 11:39:27 +0530
Subject: [PATCH v4 1/2] Skip printing empty stream in test decoding
---
contrib/test_decoding/expected/stream.out | 5 +--
contrib/test_decoding/test_decoding.c | 55 +++++++++++++++++++++----------
2 files changed, 38 insertions(+), 22 deletions(-)
diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index d7e32f8..e1c3bc8 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -29,10 +29,7 @@ COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
- opening a streamed block for transaction
streaming message: transactional: 1 prefix: test, sz: 50
- closing a streamed block for transaction
- aborting streamed (sub)transaction
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
@@ -56,7 +53,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
-(27 rows)
+(24 rows)
-- streaming test for toast changes
ALTER TABLE stream_test ALTER COLUMN data set storage external;
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 3474515..e60ab34 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+ TestDecodingData *data,
+ ReorderBufferTXN *txn,
+ bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
@@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
- OutputPluginPrepareWrite(ctx, true);
+ data->xact_wrote_changes = false;
+ if (data->skip_empty_xacts)
+ return;
+ pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
+ OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "opening a streamed block for transaction");
- OutputPluginWrite(ctx, true);
+ OutputPluginWrite(ctx, last_write);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
@@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
@@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ /* output stream start if we haven't yet */
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
--
1.8.3.1